
Functions for running many subprocesses in parallel


Utility classes and functions for running subprocess commands in parallel:

>>> from random import randrange
>>> from commandpool import run

>>> commands = ['sleep %s' % randrange(5) for _ in range(100)]

>>> for proc, cmd in run(commands):
...    print(proc.returncode, proc, cmd, sep=', ')
0, <subprocess.Popen object at 0x7fa470b5e278>, sleep 1
0, <subprocess.Popen object at 0x7fa470b449b0>, sleep 2
0, <subprocess.Popen object at 0x7fa470b53d30>, sleep 2
0, <subprocess.Popen object at 0x7fa470b44b70>, sleep 3
0, <subprocess.Popen object at 0x7fa470b53cf8>, sleep 3
0, <subprocess.Popen object at 0x7fa470b53d68>, sleep 4

One way to think of this library is as a subprocess equivalent of:

echo $commands | xargs -P $concurrency sh -c

This library works by periodically checking if started processes have finished and then starting new ones in their place.
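To illustrate that tick-based approach, here is a minimal, self-contained sketch of such a scheduling loop, using only the standard library. This is an illustration of the idea, not commandpool's actual implementation; the function name `run_commands` is hypothetical.

```python
import subprocess
import time

def run_commands(commands, concurrency=None, sleep_seconds=0.1):
    """Sketch of a tick-based scheduler: each tick, reap finished
    processes and start new commands in their place, keeping at
    most `concurrency` processes alive (no limit if None)."""
    pending = list(commands)
    running = {}  # Popen object -> command

    while pending or running:
        # Reap processes that have finished since the last tick.
        for proc in [p for p in running if p.poll() is not None]:
            yield proc, running.pop(proc)
        # Start new commands while below the concurrency limit.
        while pending and (concurrency is None or len(running) < concurrency):
            cmd = pending.pop(0)
            running[subprocess.Popen(cmd, shell=True)] = cmd
        time.sleep(sleep_seconds)
```

The real library is more featureful; this sketch only shows the scheduling idea behind the `concurrency` and `sleep_seconds` parameters.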


Installation

The latest stable version of commandpool can be installed from PyPI:

$ pip install commandpool



Usage

import subprocess
from commandpool import run

# Run at most 5 commands at a time.
run(commands, concurrency=5)

# Start all commands at the same time (this is the default).
run(commands, concurrency=None)

# The duration between 'ticks' is configurable.
run(commands, sleep_seconds=0.1)

# Process commands as they finish.
for proc, cmd in run(commands):
    assert isinstance(proc, subprocess.Popen)

# The way commands are started is configurable through `start_command`.
from subprocess import Popen, PIPE

commands = {i: ('echo', str(i*i)) for i in range(5, 10)}
start_command = lambda num: Popen(commands[num], stdout=PIPE)

for proc, cmd in run(commands, start_command=start_command):
    print(proc.stdout.read(), cmd, commands[cmd])
# b'25\n' 5 ('echo', '25')
# b'36\n' 6 ('echo', '36')
# ...
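Note that reading `proc.stdout` directly is fine for small outputs, but `Popen.communicate()` is the safer way to drain a pipe without risking a deadlock once output exceeds the pipe buffer. A self-contained stdlib sketch (the helper name `collect_output` is hypothetical, not part of the commandpool API):

```python
from subprocess import Popen, PIPE

def collect_output(args):
    """Run a command, drain stdout with communicate() to avoid
    pipe-buffer deadlocks, and return (returncode, stdout)."""
    proc = Popen(args, stdout=PIPE)
    out, _ = proc.communicate()
    return proc.returncode, out

rc, out = collect_output(('echo', str(6 * 6)))
# rc == 0; out == b'36\n' on POSIX systems
```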


from commandpool import ConcurrentCommandRunner

class MyCommandRunner(ConcurrentCommandRunner):
    def start_command(self, cmd):
        # Start the process for `cmd` and return it.
        ...

    def command_finished(self, proc, cmd):
        # Called when the process for `cmd` has finished.
        ...

runner = MyCommandRunner(commands, sleep_interval=1.0)


Todo

  • Add tests.
  • Complete documentation.


Alternatives

ConcurrentCommandRunner can be implemented in a few lines with the help of concurrent.futures, assuming that spawning a thread per command is acceptable. This approach also has the benefit of yielding results as soon as a command (wrapped in a future) completes, instead of at sleep_seconds intervals, as is the case with ConcurrentCommandRunner.

from concurrent.futures import ThreadPoolExecutor, as_completed
from subprocess import run

with ThreadPoolExecutor(max_workers=10) as pool:
    futures = {pool.submit(run, cmd): cmd for cmd in commands}
    for future in as_completed(futures):
        print(futures[future], future.result().returncode)


License

Released under the terms of the Revised BSD License.
