
distributed-worker

A Python 3.7 wrapper around multiprocessing for easy cross-machine computation

What is this?

Distributed Worker is a wrapper around the multiprocessing package allowing you to focus on the distributed computations instead of managing nodes/communication. You can use regular python primitives to communicate just as in multiprocessing, but you can also use remote machines as workers.

How does it work?

You create a DistributedManager which creates a central server for your workers to connect to. You can create some logic around it to manage tasks/messages.

Then you implement one or multiple DistributedWorkers to handle those tasks.

And now you can run your distributed computations.
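In code, that means subclassing the two base classes and overriding a few hooks. A minimal skeleton, mirroring the hooks used in the full example below (the exact call semantics are inferred from that example rather than from separate documentation), might look like this:

```py
from distributed_worker import DistributedManager, DistributedWorker

class MyManager(DistributedManager):
  def loop(self):
    # Invoked on each run_once(): hand out pending work with self.send(worker, task)
    pass

  def handle_msg(self, worker, msg):
    # Invoked when a worker reports a result back
    pass

class MyWorker(DistributedWorker):
  def loop(self):
    # Invoked repeatedly on the worker: do a slice of work, report it with self.send(result)
    pass

  def handle_msg(self, msg):
    # Invoked when the manager sends this worker a message
    pass
```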

Example Setup

Check /example.py for a fully functional version

Let’s say we want a distributed prime checker. Here is a very naive implementation:

```py
def is_prime(x):
  for num in range(2, x):
    if (x % num) == 0:
      return False
  return True
```

Checking a large range of numbers this way would take ages, so we want to distribute the work.

To be clear, this task is likely overkill for a distributed setup, but it makes for a good example.

First we will create a task server:

```py
from typing import Any

from distributed_worker import DistributedManager

class PrimeManager(DistributedManager):
  def __init__(self):
    super().__init__()
    self.pending = list(range(150000))
    self.results = {}
    self.tasked = set()
    self.chunks = 1000  # Task size

  def loop(self):
    # No tasks pending, wait on exit
    if len(self.pending) < 1:
      return

    # Assign tasks to workers
    active = set(self.get_active_workers())
    available_workers = active - self.tasked
    for x in available_workers:
      chunk = min(len(self.pending), self.chunks)
      if chunk == 0:
        # Nothing left to hand out
        break
      # Send chunks (1000) of numbers for processing
      task = self.pending[:chunk]
      self.pending = self.pending[chunk:]
      print('Send task %d-%d to worker %d' % (task[0], task[-1], x))
      self.send(x, task)
      self.tasked.add(x)

  def on_new_worker(self, worker: int):
    print('New worker added %d' % worker)

  def on_worker_disconnect(self, worker: int):
    print('Worker disconnected %d' % worker)

  def handle_msg(self, worker: int, msg: Any):
    # Worker finished its task
    for num in msg:
      self.results[num] = msg[num]
    self.tasked.remove(worker)

if __name__ == "__main__":
  manager = PrimeManager()

  # For adding remote workers
  print('client creds:', manager.get_client_args())

  while manager.tasked or manager.pending:
    manager.run_once()
    # Can do other tasks here as well

  # Print the results
  print(manager.results)

  # Stop all workers
  manager.stop()
```

Then we’ll create a worker:

```py
from distributed_worker import DistributedWorker
# is_prime is the function defined above (same module in example.py)

class PrimeWorker(DistributedWorker):
  def __init__(self, pipe, *args, **kwargs):
    super().__init__(pipe)
    self.task = []
    self.results = {}

  def loop(self):
    # Ideally keep the execution here within 1 hour or adjust TTL on the server
    if len(self.task):
      # Tasks available
      ctask = self.task.pop()
      self.results[ctask] = is_prime(ctask)
    else:
      # Finished tasks
      if len(self.results):
        self.send(self.results)
        # Clear results so we don't resend
        self.results = {}

  def handle_msg(self, msg):
    if type(msg) == list:
      self.task = msg
```

Now we can create distributed workers galore:

```py
from distributed_worker import create_remote_worker
# import PrimeWorker as well

# Example, check the console of the server
client_args = (('localhost', 6000), 'AF_INET', b'secret password')

# Create 10 workers (10 threads)
for x in range(10):
  # Add PrimeWorker constructor args if needed
  create_remote_worker(PrimeWorker, client_args)
```
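For reference, the credentials tuple printed by the manager appears to follow the (address, family, authkey) layout familiar from Python's multiprocessing.connection; that reading is inferred from the example values rather than from documented API, so treat the annotations below as assumptions:

```py
client_args = (
  ('localhost', 6000),   # (host, port) the manager is listening on
  'AF_INET',             # connection family, as in multiprocessing.connection
  b'secret password',    # shared secret used to authenticate workers
)
```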

If we just want to utilize our local CPU cores (and not deal with the authentication) we can create workers through the manager:

```py
manager = DistributedManager()
# Again, add PrimeWorker constructor args if needed
manager.create_local_worker(PrimeWorker)
```
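Putting the pieces together, a purely local end-to-end run might look like the sketch below (it assumes PrimeManager, PrimeWorker and is_prime are defined as above; the worker count of 4 is arbitrary):

```py
if __name__ == '__main__':
  manager = PrimeManager()

  # Spawn a few local workers; no credentials needed
  for _ in range(4):
    manager.create_local_worker(PrimeWorker)

  # Drive the manager until all chunks have been handed out and reported back
  while manager.tasked or manager.pending:
    manager.run_once()

  # results maps each number to True/False, so the sum counts the primes found
  print(sum(manager.results.values()), 'primes found')
  manager.stop()
```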

Distributing pays off more as individual tasks take longer: the more time each task needs relative to the communication overhead, the bigger the overall speedup.

Running example.py vs a single-threaded version:

```
$ time example.py
…
real    0m11.631s
user    0m57.648s
sys     0m0.395s

$ time single.py
real    0m43.581s
user    0m43.550s
sys     0m0.018s
```

