
A Python wrapper around multiprocessing for easy cross-machine computation


distributed-worker

A Python 3.7 wrapper around multiprocessing for easy cross-machine computation

What is this?

Distributed Worker is a wrapper around the multiprocessing package that lets you focus on the distributed computation itself instead of managing nodes and communication. You can use regular Python primitives to communicate, just as in multiprocessing, and you can also use remote machines as workers.
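As a minimal sketch of what "regular Python primitives" means, the standard-library `multiprocessing.Pipe` already lets two endpoints exchange picklable objects such as lists and dicts. This uses only the standard library, not distributed-worker itself, and the helper name `round_trip` is made up for illustration.

```python
from multiprocessing import Pipe


def round_trip(obj):
    """Send a small picklable object through a pipe and return what arrives."""
    parent_end, child_end = Pipe()
    try:
        parent_end.send(obj)      # pickles the object and writes it
        return child_end.recv()   # reads and unpickles it on the other end
    finally:
        parent_end.close()
        child_end.close()


task = [2, 3, 4, 5]
result = round_trip({n: n % 2 == 1 for n in task})
print(result)  # {2: False, 3: True, 4: False, 5: True}
```

In a real setup the two ends would live in different processes or on different machines; the send/receive calls look the same.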

How does it work?

You create a DistributedManager which creates a central server for your workers to connect to. You can create some logic around it to manage tasks/messages.

Then you implement one or more DistributedWorkers to handle those tasks.

And now you can run your distributed computations.

Example Setup

Check /example.py for a fully functional version

Let's say we want a distributed prime checker. Here is a very naive implementation:

```py
def is_prime(x):
    if x < 2:
        # 0 and 1 are not prime
        return False
    for num in range(2, x):
        if (x % num) == 0:
            return False
    return True
```

Checking a large range of numbers this way would take ages, so we want to distribute the work.

To be clear, this task is likely overkill for a distributed setup, but it's a good example.

First we will create a task server:

```py
from typing import Any

from distributed_worker import DistributedManager


class PrimeManager(DistributedManager):
    def __init__(self):
        super().__init__()
        self.pending = list(range(150000))
        self.results = {}
        self.tasked = set()
        self.chunks = 1000  # Task size

    def loop(self):
        # No tasks pending, wait on exit
        if len(self.pending) < 1:
            return

        # Assign tasks to workers
        active = set(self.get_active_workers())
        available_workers = active - self.tasked
        for x in available_workers:
            if not self.pending:
                # Ran out of work partway through this pass
                break
            chunk = min(len(self.pending), self.chunks)
            # Send chunks (1000) of numbers for processing
            task = self.pending[:chunk]
            self.pending = self.pending[chunk:]
            print('Send task %d-%d to worker %d' % (task[0], task[-1], x))
            self.send(x, task)
            self.tasked.add(x)

    def on_new_worker(self, worker: int):
        print('New worker added %d' % worker)

    def on_worker_disconnect(self, worker: int):
        print('Worker disconnected %d' % worker)

    def handle_msg(self, worker: int, msg: Any):
        # Worker finished its task
        for num in msg:
            self.results[num] = msg[num]
        self.tasked.remove(worker)


if __name__ == "__main__":
    manager = PrimeManager()

    # For adding remote workers
    print('client creds:', manager.get_client_args())

    while manager.tasked or manager.pending:
        manager.run_once()
        # Can do other tasks here as well

    # Print the results
    print(manager.results)

    # Stop all workers
    manager.stop()
```
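The chunk hand-out inside `loop` is plain list slicing plus set arithmetic, so it can be tried without any server. The sketch below uses a hypothetical helper, `assign_tasks`, that is not part of distributed-worker; it mirrors the example's bookkeeping and stops early once the pending list runs dry.

```python
def assign_tasks(pending, active, tasked, chunk_size):
    """Hand one chunk to each idle worker.

    Returns (assignments, remaining), where assignments maps a worker id
    to the list of numbers it should process.
    """
    assignments = {}
    idle = sorted(set(active) - set(tasked))  # sorted only for a stable demo
    for worker in idle:
        if not pending:
            break  # nothing left to hand out
        assignments[worker] = pending[:chunk_size]
        pending = pending[chunk_size:]
    return assignments, pending


assignments, remaining = assign_tasks(list(range(2500)), [1, 2, 3], {2}, 1000)
for worker, task in assignments.items():
    print('worker %d gets %d-%d' % (worker, task[0], task[-1]))
print('left over:', len(remaining))
```

Worker 2 is skipped because it is already marked as busy, which is the role `self.tasked` plays in the manager.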

Then we’ll create a worker:

```py
from distributed_worker import DistributedWorker

# is_prime is the function defined earlier; import or define it in this module


class PrimeWorker(DistributedWorker):
    def __init__(self, pipe, *args, **kwargs):
        super().__init__(pipe)
        self.task = []
        self.results = {}

    def loop(self):
        # Ideally keep the execution here within 1 hour or adjust the TTL on the server
        if len(self.task):
            # Tasks available
            ctask = self.task.pop()
            self.results[ctask] = is_prime(ctask)
        else:
            # Finished tasks
            if len(self.results):
                self.send(self.results)
                # Clear results so we don't resend
                self.results = {}

    def handle_msg(self, msg):
        if isinstance(msg, list):
            self.task = msg
```
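The worker is a small state machine: receive a list, pop one number per `loop` call, then send the collected results once the list is empty. A rough way to check that logic without a running server is a stand-in class with the same method shapes. `FakePrimeWorker` below is hypothetical and does not inherit from `DistributedWorker`; its `send` only records what would have been sent.

```python
def is_prime(x):
    """Naive trial-division primality test, with a guard for numbers below 2."""
    if x < 2:
        return False
    for num in range(2, x):
        if x % num == 0:
            return False
    return True


class FakePrimeWorker:
    """Stand-in with the same loop/handle_msg shape, no network involved."""

    def __init__(self):
        self.task = []
        self.results = {}
        self.sent = []  # messages that would have gone to the manager

    def send(self, msg):
        self.sent.append(msg)

    def handle_msg(self, msg):
        if isinstance(msg, list):
            self.task = msg

    def loop(self):
        if self.task:
            number = self.task.pop()
            self.results[number] = is_prime(number)
        elif self.results:
            self.send(self.results)
            self.results = {}  # fresh dict so the sent one is not cleared


worker = FakePrimeWorker()
worker.handle_msg([2, 3, 4])
for _ in range(4):  # three calls to compute, one more to flush
    worker.loop()
print(worker.sent)  # [{4: False, 3: True, 2: True}]
```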

Now we can create distributed workers galore:

```py
from distributed_worker import create_remote_worker
# Import PrimeWorker as well

# Example values; check the console of the server for the real ones
client_args = (('localhost', 6000), 'AF_INET', b'secret password')

# Create 10 workers (10 threads)
for x in range(10):
    # Add PrimeWorker constructor args if needed
    create_remote_worker(PrimeWorker, client_args)
```
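The three-element `client_args` tuple has the same shape as the `(address, family, authkey)` arguments of the standard library's `multiprocessing.connection.Client`. Assuming that is roughly what distributed-worker builds on (the README does not confirm this), a standard-library-only sketch of an authenticated connection might look like the following. The function names `serve_one` and `demo` are made up, and `127.0.0.1` with port 0 is used so the OS picks a free port.

```python
import threading
from multiprocessing.connection import Client, Listener

AUTHKEY = b"secret password"  # placeholder, same as in the example above


def serve_one(listener, received):
    """Accept one authenticated connection and double every number received."""
    with listener.accept() as conn:
        numbers = conn.recv()
        received.append(numbers)
        conn.send([n * 2 for n in numbers])


def demo():
    # Port 0 asks the OS for any free port; listener.address reports the choice.
    with Listener(("127.0.0.1", 0), family="AF_INET", authkey=AUTHKEY) as listener:
        received = []
        server = threading.Thread(target=serve_one, args=(listener, received))
        server.start()
        # The client must present the same authkey or the handshake fails.
        with Client(listener.address, family="AF_INET", authkey=AUTHKEY) as conn:
            conn.send([1, 2, 3])
            reply = conn.recv()
        server.join()
    return received, reply


if __name__ == "__main__":
    print(demo())
```

The shared key is why remote workers need the credentials printed by the manager.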

If we just want to utilize our local CPU cores (and not deal with the authentication), we can create workers through the manager:

```py
manager = DistributedManager()
# Again, add PrimeWorker constructor args if needed
manager.create_local_worker(PrimeWorker)
```
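To use every local core, one option is to call `create_local_worker` once per core. The helper below, `spawn_local_workers`, is a hypothetical convenience wrapper and not part of distributed-worker; it only assumes the manager exposes `create_local_worker(worker_cls)` as shown above.

```python
import os


def spawn_local_workers(manager, worker_cls, count=None):
    """Call manager.create_local_worker(worker_cls) `count` times.

    Defaults to one worker per CPU core reported by os.cpu_count().
    Returns the number of workers requested.
    """
    if count is None:
        count = os.cpu_count() or 1  # cpu_count() may return None
    for _ in range(count):
        manager.create_local_worker(worker_cls)
    return count


# Usage, assuming the PrimeManager and PrimeWorker classes from the example:
# manager = PrimeManager()
# spawn_local_workers(manager, PrimeWorker)
```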

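The speedup over a single process generally grows as individual tasks take longer to complete, because the fixed cost of sending tasks and results becomes a smaller share of the total runtime.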

Running example.py vs a single-threaded version:

```
$ time example.py
…
real 0m11.631s
user 0m57.648s
sys 0m0.395s

$ time single.py
real 0m43.581s
user 0m43.550s
sys 0m0.018s
```
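To read these timings, two ratios are useful: wall-clock speedup (single-threaded `real` divided by distributed `real`) and average CPU utilization (`user` divided by `real` for the distributed run). A small sketch of the arithmetic, with helper names chosen for illustration:

```python
def speedup(serial_seconds, parallel_seconds):
    """How many times faster the parallel run finished in wall-clock time."""
    return serial_seconds / parallel_seconds


def cpu_utilization(user_seconds, real_seconds):
    """Average number of cores kept busy during the run."""
    return user_seconds / real_seconds


print(round(speedup(43.581, 11.631), 2))          # 3.75
print(round(cpu_utilization(57.648, 11.631), 2))  # 4.96
```

So the distributed run finished about 3.75 times faster while keeping roughly five cores busy on average; the gap between the two numbers reflects the extra CPU time spent on coordination.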
