Skip to main content

Python wrapper for macOS Grand Central Dispatch (GCD)

Project description

cygcd

Python wrapper for Apple's Grand Central Dispatch (GCD) framework on macOS.

GCD provides a powerful API for concurrent programming, allowing you to execute tasks asynchronously on system-managed thread pools. This wrapper exposes the core GCD primitives to Python with proper GIL handling for true parallelism.

Installation

# Clone and install
git clone <repository>
cd cygcd
make

Requires macOS and Python 3.9+.

Quick Start

import cygcd

# Create a serial queue
q = cygcd.Queue("com.example.myqueue")

# Execute tasks asynchronously
q.run_async(lambda: print("Hello from GCD!"))

# Execute synchronously (blocks until complete)
q.run_sync(lambda: print("This runs and waits"))

Core Concepts

Queues

Queues manage task execution. Serial queues execute one task at a time in FIFO order. Concurrent queues can execute multiple tasks simultaneously.

import cygcd

# Serial queue (default) - tasks run one at a time
serial = cygcd.Queue("com.example.serial")

# Concurrent queue - tasks can run in parallel
concurrent = cygcd.Queue("com.example.concurrent", concurrent=True)

# Global queues (system-managed, concurrent)
high_priority = cygcd.Queue.global_queue(cygcd.QUEUE_PRIORITY_HIGH)
default = cygcd.Queue.global_queue(cygcd.QOS_CLASS_DEFAULT)
background = cygcd.Queue.global_queue(cygcd.QUEUE_PRIORITY_BACKGROUND)

Async and Sync Execution

q = cygcd.Queue("example")
results = []

# Async - returns immediately, task runs later
q.run_async(lambda: results.append("async"))

# Sync - blocks until task completes
q.run_sync(lambda: results.append("sync"))

print(results)  # ['async', 'sync']

Barriers (Reader-Writer Pattern)

Barriers on concurrent queues wait for all previous tasks to complete, execute exclusively, then allow subsequent tasks.

q = cygcd.Queue("rw", concurrent=True)
data = {"value": 0}

def read():
    print(f"Read: {data['value']}")

def write():
    data["value"] += 1
    print(f"Write: {data['value']}")

# Readers can run concurrently
q.run_async(read)
q.run_async(read)

# Writer runs exclusively
q.barrier_async(write)

# More readers after write completes
q.run_async(read)

q.barrier_sync(lambda: None)  # Wait for all

Groups

Groups track completion of multiple tasks across queues.

import cygcd

g = cygcd.Group()
q = cygcd.Queue.global_queue()
results = []

# Submit tasks to the group
for i in range(5):
    g.run_async(q, lambda i=i: results.append(i))

# Wait for all tasks to complete
g.wait()
print(f"Completed: {sorted(results)}")

# Or get notified when done
g.notify(q, lambda: print("All done!"))

Semaphores

Semaphores limit concurrent access to resources.

import time
import cygcd

# Allow 2 concurrent operations
sem = cygcd.Semaphore(2)
q = cygcd.Queue.global_queue()

def limited_task(task_id):
    sem.wait()  # Acquire (blocks if at limit)
    print(f"Task {task_id} running")
    time.sleep(1)
    sem.signal()  # Release

# Only 2 of these will run at a time
for i in range(5):
    q.run_async(lambda i=i: limited_task(i))

Parallel Apply

Execute a function N times in parallel (like a parallel for loop).

import threading
import cygcd

results = []
lock = threading.Lock()

def process(index):
    result = index * index
    with lock:
        results.append(result)

# Execute 10 times in parallel, blocks until complete
cygcd.apply(10, process)

print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Delayed Execution

Schedule tasks to run after a delay.

import time
import cygcd

q = cygcd.Queue("timer")
start = time.time()

q.after(0.5, lambda: print(f"Executed after {time.time() - start:.2f}s"))
q.after(1.0, lambda: print(f"Executed after {time.time() - start:.2f}s"))

time.sleep(1.5)

Once (Thread-Safe Initialization)

Execute a callable exactly once, even from multiple threads.

import cygcd

once = cygcd.Once()
initialized = False

def init():
    global initialized
    initialized = True
    print("Initialized!")

# Only prints once, even if called multiple times
once(init)
once(init)
once(init)

Timers

Repeating or one-shot timers using dispatch sources.

import cygcd

count = 0

def tick():
    global count
    count += 1
    print(f"Tick {count}")

# Create a repeating timer (fires every 0.5 seconds)
timer = cygcd.Timer(0.5, tick)
timer.start()

# ... later
timer.cancel()

# One-shot timer (fires once after delay)
one_shot = cygcd.Timer(1.0, lambda: print("Done!"), repeating=False)
one_shot.start()

Signal Handling

Handle Unix signals asynchronously on a dispatch queue.

import os
import signal
import cygcd

def on_signal():
    print("Received SIGUSR1!")

# Disable default handling
signal.signal(signal.SIGUSR1, signal.SIG_IGN)

# Create signal source
source = cygcd.SignalSource(signal.SIGUSR1, on_signal)
source.start()

# Send signal to self
os.kill(os.getpid(), signal.SIGUSR1)

# ... later
source.cancel()

File Descriptor Monitoring

Monitor file descriptors for read/write availability.

import os
import cygcd

r, w = os.pipe()

def on_readable():
    data = os.read(r, 1024)
    print(f"Received: {data}")

# Monitor for readable data
reader = cygcd.ReadSource(r, on_readable)
reader.start()

# Write triggers the handler
os.write(w, b"Hello!")

# ... later
reader.cancel()

Process Monitoring

Monitor processes for lifecycle events.

import subprocess
import cygcd

def on_exit():
    print("Child process exited!")

# Launch a subprocess
proc = subprocess.Popen(["sleep", "1"])

# Monitor for exit
source = cygcd.ProcessSource(proc.pid, on_exit, events=cygcd.PROC_EXIT)
source.start()

# Wait for exit notification
proc.wait()

source.cancel()

Suspend and Resume

Control queue execution.

q = cygcd.Queue("work")

q.suspend()
# Queue accepts tasks but doesn't execute them
q.run_async(lambda: print("Waiting..."))

q.resume()
# Now tasks execute

Inactive Queues

Create queues that don't execute until activated. Useful for batch configuration.

import cygcd

# Create an inactive queue
q = cygcd.Queue("lazy", inactive=True)
print(q.is_inactive)  # True

# Submit work (queued but not executed)
q.run_async(lambda: print("Waiting to run..."))

# Configure the queue while inactive
q.set_target_queue(cygcd.Queue.global_queue())

# Activate to begin execution
q.activate()
print(q.is_inactive)  # False

Main Queue

Access the main thread's queue (useful for GUI apps).

main = cygcd.Queue.main_queue()
main.run_async(update_ui)

Wall Clock Time

Schedule based on wall clock (adjusts for system time changes).

import time
import cygcd

# Schedule 60 seconds from now using wall clock
t = cygcd.walltime(delta_seconds=60)

# Schedule at a specific Unix timestamp
future = time.time() + 3600  # 1 hour from now
t = cygcd.walltime(timestamp=future)

Dispatch Data

Efficient immutable buffers for I/O operations.

import cygcd

# Create from bytes
data = cygcd.Data(b"Hello, World!")
print(len(data))  # 13
print(bytes(data))  # b'Hello, World!'

# Concatenate (returns new Data, originals unchanged)
part1 = cygcd.Data(b"Hello, ")
part2 = cygcd.Data(b"World!")
combined = part1.concat(part2)

# Extract subrange
message = cygcd.Data(b"The quick brown fox")
word = message.subrange(4, 5)  # b"quick"

Async I/O

Non-blocking file read and write operations.

import os
import cygcd

# Async read
fd = os.open("file.txt", os.O_RDONLY)
sem = cygcd.Semaphore(0)

def on_read(data, error):
    if error == 0:
        print(f"Read {len(data)} bytes: {data}")
    sem.signal()

cygcd.read_async(fd, 1024, on_read)
sem.wait()
os.close(fd)

# Async write
fd = os.open("output.txt", os.O_WRONLY | os.O_CREAT, 0o644)

def on_write(remaining, error):
    if error == 0:
        print("Write complete")
    sem.signal()

cygcd.write_async(fd, b"Hello!", on_write)
sem.wait()
os.close(fd)

Workloops

Priority-inversion-avoiding queues for priority-sensitive workloads.

import cygcd

# Create a workloop
wl = cygcd.Workloop("com.example.workloop")

# Submit work (same API as queues)
wl.run_async(lambda: print("Async task"))
wl.run_sync(lambda: print("Sync task"))

# Inactive workloops (cannot submit work until activated)
inactive_wl = cygcd.Workloop("lazy", inactive=True)
inactive_wl.activate()  # Now ready for work
inactive_wl.run_sync(lambda: print("Running!"))

API Reference

Queue

Method Description
Queue(label=None, concurrent=False, ...) Create a queue
Queue.global_queue(priority=0) Get a global queue
Queue.main_queue() Get the main queue
run_async(func) Submit for async execution
run_sync(func) Submit and wait for completion
barrier_async(func) Async barrier (concurrent queues)
barrier_sync(func) Sync barrier
after(delay_seconds, func) Delayed execution
suspend() Suspend queue execution
resume() Resume queue execution
activate() Activate an inactive queue
set_target_queue(queue) Set target queue for hierarchy
label Queue's label (property)
is_inactive Check if queue is inactive (property)

Queue constructor parameters:

  • label: Optional string label for debugging
  • concurrent: If True, create a concurrent queue
  • qos: Quality of Service class (QOS_CLASS_* constants)
  • relative_priority: Priority offset within QOS class (-15 to 0)
  • target: Target queue for queue hierarchy
  • inactive: If True, create in inactive state (must call activate())

Group

Method Description
Group() Create a group
run_async(queue, func) Submit task to group
wait(timeout=-1) Wait for completion (returns bool)
notify(queue, func) Callback when group completes
enter() Manually increment task count
leave() Manually decrement task count

Semaphore

Method Description
Semaphore(value) Create with initial value
wait(timeout=-1) Decrement, block if zero (returns bool)
signal() Increment, wake waiters (returns bool)

Once

Method Description
Once() Create a once token
__call__(func) Execute func exactly once

Timer

Method Description
Timer(interval, handler, queue=None, ...) Create a timer
start() Start the timer
cancel() Cancel the timer
set_timer(interval, start_delay, ...) Reconfigure the timer
is_cancelled Check if cancelled (property)

Timer constructor parameters:

  • interval: Seconds between firings
  • handler: Callable to invoke
  • queue: Target queue (default: global)
  • start_delay: Initial delay (default: 0)
  • leeway: Power optimization leeway (default: 0)
  • repeating: True for repeating, False for one-shot

SignalSource

Method Description
SignalSource(signum, handler, queue=None) Create a signal source
start() Start monitoring
cancel() Stop monitoring
signal Signal number (property)
count Signals received since last handler (property)
is_cancelled Check if cancelled (property)

ReadSource

Method Description
ReadSource(fd, handler, queue=None) Create a read source
start() Start monitoring
cancel() Stop monitoring
fd File descriptor (property)
bytes_available Estimated bytes available (property)
is_cancelled Check if cancelled (property)

WriteSource

Method Description
WriteSource(fd, handler, queue=None) Create a write source
start() Start monitoring
cancel() Stop monitoring
fd File descriptor (property)
buffer_space Estimated buffer space (property)
is_cancelled Check if cancelled (property)

ProcessSource

Method Description
ProcessSource(pid, handler, events=PROC_EXIT, queue=None) Create a process source
start() Start monitoring
cancel() Stop monitoring
pid Process ID (property)
events_pending Events that occurred (property)
is_cancelled Check if cancelled (property)

Data

Method Description
Data(bytes=None) Create from bytes (or empty)
concat(other) Concatenate with another Data
subrange(offset, length) Extract a subrange
size Size in bytes (property)
__len__() Size in bytes
__bytes__() Convert to bytes

Workloop

Method Description
Workloop(label, inactive=False) Create a workloop
run_async(func) Submit for async execution
run_sync(func) Submit and wait for completion
activate() Activate an inactive workloop
is_inactive Check if inactive (property)

Note: Work cannot be submitted to inactive workloops (raises RuntimeError).

Functions

Function Description
apply(iterations, func, queue=None) Parallel for loop
time_from_now(seconds) Create dispatch time (monotonic)
walltime(timestamp=0, delta_seconds=0) Create dispatch time (wall clock)
read_async(fd, length, callback, queue=None) Async file read
write_async(fd, data, callback, queue=None) Async file write

Async I/O callbacks receive (data, error) where error is 0 on success.

Constants

Time:

  • DISPATCH_TIME_NOW, DISPATCH_TIME_FOREVER
  • NSEC_PER_SEC, NSEC_PER_MSEC, NSEC_PER_USEC
  • USEC_PER_SEC, MSEC_PER_SEC

Queue Priority:

  • QUEUE_PRIORITY_HIGH, QUEUE_PRIORITY_DEFAULT
  • QUEUE_PRIORITY_LOW, QUEUE_PRIORITY_BACKGROUND

QOS Classes:

  • QOS_CLASS_USER_INTERACTIVE, QOS_CLASS_USER_INITIATED
  • QOS_CLASS_DEFAULT, QOS_CLASS_UTILITY
  • QOS_CLASS_BACKGROUND, QOS_CLASS_UNSPECIFIED

Process Events:

  • PROC_EXIT - Process exited
  • PROC_FORK - Process forked
  • PROC_EXEC - Process executed exec()
  • PROC_SIGNAL - Process received a signal

I/O Types:

  • IO_STREAM - Stream-based I/O
  • IO_RANDOM - Random access I/O

Examples

See the examples/ directory for complete examples:

  • serial_queue.py - Serial FIFO execution
  • concurrent_queue.py - Concurrent queue with barriers
  • semaphore.py - Resource limiting
  • dispatch_once.py - One-time initialization
  • gcd_groups.py - Task groups and notifications
  • parallel_apply.py - Parallel loop execution
  • producer_consumer.py - Semaphore coordination
  • delayed_execution.py - Scheduled tasks
  • timer.py - Repeating and one-shot timers
  • signal_source.py - Unix signal handling
  • fd_source.py - File descriptor I/O monitoring
  • process_source.py - Process lifecycle monitoring
  • inactive_queue.py - Creating and activating inactive queues
  • dispatch_data.py - Buffer management with Data objects
  • async_io.py - Asynchronous file read/write
  • workloop.py - Priority-inversion-avoiding workloops

Notes

  • All GCD operations release the Python GIL, enabling true parallelism
  • Callbacks are executed with the GIL held (Python code is thread-safe)
  • Global queues are system-managed and should not be released
  • Avoid run_sync() on a serial queue from within that queue (deadlock)

License

GCD is licensed under the Apache 2.0 license

This library is licensed under the MIT license

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

cygcd-0.1.1.tar.gz (95.4 kB view details)

Uploaded Source

Built Distributions

If you're not sure about the file name format, learn more about wheel file names.

cygcd-0.1.1-cp314-cp314-macosx_15_0_arm64.whl (77.2 kB view details)

Uploaded CPython 3.14macOS 15.0+ ARM64

cygcd-0.1.1-cp313-cp313-macosx_15_0_arm64.whl (76.0 kB view details)

Uploaded CPython 3.13macOS 15.0+ ARM64

cygcd-0.1.1-cp312-cp312-macosx_15_0_arm64.whl (76.3 kB view details)

Uploaded CPython 3.12macOS 15.0+ ARM64

cygcd-0.1.1-cp311-cp311-macosx_15_0_arm64.whl (75.9 kB view details)

Uploaded CPython 3.11macOS 15.0+ ARM64

cygcd-0.1.1-cp310-cp310-macosx_15_0_arm64.whl (75.9 kB view details)

Uploaded CPython 3.10macOS 15.0+ ARM64

cygcd-0.1.1-cp39-cp39-macosx_15_0_arm64.whl (76.3 kB view details)

Uploaded CPython 3.9macOS 15.0+ ARM64

File details

Details for the file cygcd-0.1.1.tar.gz.

File metadata

  • Download URL: cygcd-0.1.1.tar.gz
  • Upload date:
  • Size: 95.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.2

File hashes

Hashes for cygcd-0.1.1.tar.gz
Algorithm Hash digest
SHA256 e046590fd669826d25638b0c170655d9c47e69857739baa5fe7b3b63df3bf048
MD5 3ab38015f7cf7268bb5b600966c79a9c
BLAKE2b-256 e6f88b2cd0005d2d596b04b0d65ab5d8b87c6eb0fcaa4d8ad6437ef8fa9b6b41

See more details on using hashes here.

File details

Details for the file cygcd-0.1.1-cp314-cp314-macosx_15_0_arm64.whl.

File metadata

File hashes

Hashes for cygcd-0.1.1-cp314-cp314-macosx_15_0_arm64.whl
Algorithm Hash digest
SHA256 909b3a3059ab9b6672b7b86aac8d0c4cd86f97fa638bfada8da1ef1a4c94c32c
MD5 6cd1082de77fffc69ec096a051a68e51
BLAKE2b-256 774e42a0ce8ff0f045304fa7dc1013c259e69c4e69bcda894accaed533b60782

See more details on using hashes here.

File details

Details for the file cygcd-0.1.1-cp313-cp313-macosx_15_0_arm64.whl.

File metadata

File hashes

Hashes for cygcd-0.1.1-cp313-cp313-macosx_15_0_arm64.whl
Algorithm Hash digest
SHA256 48a648c673b073dc72efa7cbf06f0a020f7ca48efd320702e5f85337b76768b4
MD5 ce3d8e676716f16746f65efda7dd3f8e
BLAKE2b-256 e9c418d4258bbcb23213cddc273d8ac32307215df3df1f9da7b68bcb5139ff3b

See more details on using hashes here.

File details

Details for the file cygcd-0.1.1-cp312-cp312-macosx_15_0_arm64.whl.

File metadata

File hashes

Hashes for cygcd-0.1.1-cp312-cp312-macosx_15_0_arm64.whl
Algorithm Hash digest
SHA256 ed31e3ea0a5d8534f4cc288cbe8c579d42f36f94eea554770e8ca849267e2785
MD5 8068f24f7bb02731f6859a984d33a18c
BLAKE2b-256 733613b2cad007b93d5d667d6b397b5a55c28eeb784fb9f24a28b4adf85e8357

See more details on using hashes here.

File details

Details for the file cygcd-0.1.1-cp311-cp311-macosx_15_0_arm64.whl.

File metadata

File hashes

Hashes for cygcd-0.1.1-cp311-cp311-macosx_15_0_arm64.whl
Algorithm Hash digest
SHA256 4547f6a1b88279c7bd0ef72c616034a04b2889e9ffeb7555615b5345e0cad206
MD5 c30e1cdfa875bfaf468fd842ae2bae75
BLAKE2b-256 85cd71b08e72f4994d3402af8ee93097d8340aab5c62cc5faf3b2a6ac174c9a0

See more details on using hashes here.

File details

Details for the file cygcd-0.1.1-cp310-cp310-macosx_15_0_arm64.whl.

File metadata

File hashes

Hashes for cygcd-0.1.1-cp310-cp310-macosx_15_0_arm64.whl
Algorithm Hash digest
SHA256 b3f20552c5d3f8269e75da747967f7c3b80548903357ddfda4e7782faf05d1e5
MD5 7df5d4e2be832e7a7593bd7995b4dab3
BLAKE2b-256 816b73dd295f88da7554fcc0b54c39cb403219b40d9ec00dc570d828b1842ced

See more details on using hashes here.

File details

Details for the file cygcd-0.1.1-cp39-cp39-macosx_15_0_arm64.whl.

File metadata

File hashes

Hashes for cygcd-0.1.1-cp39-cp39-macosx_15_0_arm64.whl
Algorithm Hash digest
SHA256 e1148698cb19e4db2b9f35cae77f105fc9ec1304abc8fcd55da7d0c25b915f5b
MD5 8485b60816243734a6f10450ae6f97e7
BLAKE2b-256 d72474e3fd748904333ae2fbbd55e6b45bf4f6d188fd163b43f38e8a68422505

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page