A “write-once, run-anywhere” sync/async bridge that’s thread-safe, decorator-driven, and plays nicely with FastAPI (or other frameworks) and with DB connections.
synchronaut Overview
synchronaut is a tiny bridge that lets you write your business logic once and run it in both sync and async contexts: thread-safe, decorator-driven, and DB-friendly. It provides:
- A single call_any entrypoint for all sync↔️async combinations, where you can optionally pass executor=
- 🆕 A decorator @synchronaut(...) with .sync / .async_ bypass methods
- Batch helper parallel_map, with per-call timeouts and exception capture
- Context-var propagation across threads
- Customizable timeouts with CallAnyTimeout
Quickstart
Install:
# “standard” install (no uvloop):
pip install synchronaut
# “fast” (with uvloop) for maximum asyncio performance:
pip install "synchronaut[fast]"
Create quickstart.py:
import time
import asyncio

from synchronaut import synchronaut, call_any, call_map, CallAnyTimeout

# --- plain functions ---
def sync_add(a, b):
    return a + b

async def async_add(a, b):
    return a + b

# --- decorated versions ---
@synchronaut()
def dec_sync_add(a, b):
    return a + b

@synchronaut(timeout=1.0)
async def dec_async_add(a, b):
    return a + b

async def main():
    # sync → sync
    print('sync_add:', sync_add(1, 2))
    print('call_any(sync_add):', await call_any(sync_add, 3, 4))

    # sync → async (in async context, sync funcs auto-offload)
    print('offloaded sync_add:', await call_any(sync_add, 5, 6))

    # async → async
    print('async_add:', await async_add(7, 8))
    print('call_any(async_add):', await call_any(async_add, 7, 8))

    # batch helper in async
    print('call_map:', await call_map([sync_add, async_add], 4, 5))

    # decorator shortcuts in async
    print('await dec_sync_add.async_:', await dec_sync_add.async_(6, 7))
    print('await dec_async_add:', await dec_async_add(8, 9))

    # timeout demo (pure-sync offload)
    try:
        await call_any(lambda: time.sleep(2), timeout=0.5)
    except CallAnyTimeout as e:
        print('Timeout caught:', e)

if __name__ == '__main__':
    # sync-land examples
    print('dec_sync_add(2,3):', dec_sync_add(2, 3))
    print('call_any(async_add) in sync:', call_any(async_add, 9, 10))

    # then run the async demonstrations
    asyncio.run(main())
Run it:
python quickstart.py
Expected output:
dec_sync_add(2,3): 5
call_any(async_add) in sync: 19
sync_add: 3
call_any(sync_add): 7
offloaded sync_add: 11
async_add: 15
call_any(async_add): 15
call_map: [9, 9]
await dec_sync_add.async_: 13
await dec_async_add: 17
Timeout caught: Function <lambda> timed out after 0.5s
FastAPI Integration
Copy this into app.py. It should work as-is once you pip install synchronaut, along with fastapi and uvicorn, which the example imports and runs under:
from typing import AsyncGenerator

from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel

from synchronaut import synchronaut

# --- Dummy DB & models ---
class User(BaseModel):
    id: int
    name: str

class DummyDB:
    def __init__(self):
        self._data = {
            1: {'id': 1, 'name': 'Alice'},
            2: {'id': 2, 'name': 'Bob'},
        }

    def query(self, user_id: int):
        return self._data.get(user_id)

async def get_db_async() -> AsyncGenerator[DummyDB, None]:
    db = DummyDB()
    try:
        yield db
    finally:
        ...

# --- App & routes ---
app = FastAPI()

@synchronaut()
def get_user(user_id: int, db: DummyDB = Depends(get_db_async)) -> User:
    data = db.query(user_id)
    if not data:
        raise HTTPException(status_code=404, detail='User not found')
    return User(**data)

@app.get('/')
async def hello():
    return {"Hello, @syncronauts!"}

@app.get('/users/{user_id}', response_model=User)
async def read_user(user: User = Depends(get_user)):
    return user
Run:
uvicorn app:app --reload
This will produce:
When you go to http://127.0.0.1:8000/ -> ["Hello, @syncronauts!"] (hello() returns a set, which FastAPI serializes as a JSON array)
When you go to http://127.0.0.1:8000/users/1 -> {"id":1,"name":"Alice"}
When you go to http://127.0.0.1:8000/users/2 -> {"id":2,"name":"Bob"}
When you go to http://127.0.0.1:8000/users/3 -> {"detail":"User not found"}
Note: if you ever need to offload into your own thread pool, you can write call_any(some_sync_fn, arg1, arg2, executor=my_custom_executor) rather than relying on the built-in default.
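To make the idea of offloading into a caller-supplied pool concrete, here is a minimal sketch that uses only the standard library (asyncio's run_in_executor), not synchronaut itself. The names offload, blocking_add, and my_custom_executor are hypothetical, and this is not a description of how call_any is implemented; it only shows a sync function running on a chosen ThreadPoolExecutor while the event loop stays free.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def blocking_add(a, b):
    # Return the worker thread's name so the offload is visible.
    return a + b, threading.current_thread().name


async def offload(fn, *args, executor=None, **kwargs):
    # Hypothetical helper: run a sync callable on the given executor,
    # or on the loop's default executor when executor is None.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, partial(fn, *args, **kwargs))


async def demo():
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="my-pool") as my_custom_executor:
        return await offload(blocking_add, 1, 2, executor=my_custom_executor)


total, thread_name = asyncio.run(demo())
print(total, thread_name)
```

The thread name prefix is only there to show which pool did the work; a dedicated pool is mostly useful when you want to cap or isolate the threads used for blocking calls.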
Context Propagation
Put this in ctx_prop.py:
from synchronaut.utils import (
    request_context,
    spawn_thread_with_ctx,
    set_request_ctx,
    get_request_ctx,
)

# set a global context
set_request_ctx({'user_id': 42})
print('Global, user_id:', get_request_ctx()['user_id'])  # 42

# override in a block
with request_context({'user_id': 99}):
    print('Inside block, user_id:', get_request_ctx()['user_id'])  # 99

# back to global
print('Global again, user_id:', get_request_ctx()['user_id'])  # 42

# worker in a thread sees the global context
def work():
    print('Inside thread, user_id:', get_request_ctx()['user_id'])  # 42

thread = spawn_thread_with_ctx(work)
thread.join()
Run:
python ctx_prop.py
Expected:
Global, user_id: 42
Inside block, user_id: 99
Global again, user_id: 42
Inside thread, user_id: 42
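For readers wondering why a helper is needed at all, the sketch below shows the general standard-library technique of carrying ContextVar state into a thread with contextvars.copy_context(). It is an assumption-laden illustration, not synchronaut's code: request_ctx, spawn_with_ctx, and work are hypothetical names.

```python
import contextvars
import threading

request_ctx = contextvars.ContextVar("request_ctx", default=None)
seen = {}


def work(label):
    # Record what this thread sees in the context variable.
    ctx = request_ctx.get()
    seen[label] = ctx["user_id"] if ctx else None


def spawn_with_ctx(fn, *args):
    # Hypothetical helper: snapshot the caller's context and run fn inside it.
    ctx = contextvars.copy_context()
    thread = threading.Thread(target=ctx.run, args=(fn, *args))
    thread.start()
    return thread


request_ctx.set({"user_id": 42})

# A plain thread usually starts from an empty context and may not see 42.
plain = threading.Thread(target=work, args=("plain",))
plain.start()
plain.join()

# A thread started through the helper runs inside the copied context.
spawn_with_ctx(work, "copied").join()

print(seen)
```

What the plain thread sees depends on the Python version and build, which is why a dedicated helper is the safer way to start threads that need the request context.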
Batch Helper: parallel_map
A key feature in synchronaut is the ability to run multiple calls in parallel (in both sync and async contexts) with individual timeouts. This helper is exposed as parallel_map.
- In sync-land, all calls are submitted to a thread pool at once and run concurrently (up to max_workers threads).
- In asyncio-land, each call is wrapped in an asyncio.create_task(...) and then awaited with a single asyncio.gather(...).
- In Trio-land, calls are run sequentially under Trio’s task runner, but any sync call still offloads to threads if needed.
Signature
def parallel_map(
    calls: list[tuple[Callable, tuple, dict, float | None]],
    *,
    executor: ThreadPoolExecutor | None = None,
    return_exceptions: bool = False,
) -> Any
- calls is a list of 4-tuples: (func, args_tuple, kwargs_dict, per_call_timeout), where per_call_timeout is a float (seconds) or None (no timeout).
- executor (optional) lets you supply your own ThreadPoolExecutor for offloading in sync-land or asyncio-land; if omitted, the built-in _SHARED_EXECUTOR is used.
- return_exceptions (bool) controls whether exceptions get returned in the results list (instead of immediately propagating).
Per-Function Timeouts
Each call’s 4th element is either:
- A float (e.g. 0.2), causing call_any(..., timeout=0.2, …) to be used, so that if the function runs longer, a CallAnyTimeout is returned or raised.
- None, meaning no timeout is applied on that call.
Exception Capture
- If return_exceptions=False (the default), the first exception (or timeout) anywhere will immediately bubble up.
- If return_exceptions=True, each call is wrapped in a try/except that returns the exception object in that position instead of raising.
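The two behaviours above (per-call timeouts and exception capture) can be sketched with plain asyncio. This is only an illustration of the semantics under stated assumptions: run_with_timeouts and slow are hypothetical names, only coroutine functions are handled, and the standard asyncio.TimeoutError stands in for CallAnyTimeout.

```python
import asyncio


async def slow(delay, value):
    await asyncio.sleep(delay)
    return value


async def run_with_timeouts(calls, return_exceptions=False):
    # calls: list of (coroutine function, args, kwargs, timeout-or-None)
    async def run_one(fn, args, kwargs, timeout):
        # With timeout=None, wait_for awaits without a deadline.
        return await asyncio.wait_for(fn(*args, **kwargs), timeout)

    tasks = [asyncio.create_task(run_one(*call)) for call in calls]
    # return_exceptions=True puts each exception in its call's slot;
    # otherwise the first failure propagates to the caller.
    return await asyncio.gather(*tasks, return_exceptions=return_exceptions)


calls = [
    (slow, (0.01, "fast"), {}, None),   # no timeout
    (slow, (0.5, "slow"), {}, 0.05),    # exceeds its 0.05s budget
]
results = asyncio.run(run_with_timeouts(calls, return_exceptions=True))
print(results)
```

With return_exceptions left at False, the same call list would raise the timeout to the caller instead of returning it in the second slot.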
Examples
1. Synchronous (plain‐old def) usage
import time

from synchronaut import parallel_map  # assumed top-level export, like call_any

def sync_greet(name):
    return f"Hello, {name}!"

def sync_add(a, b):
    return a + b

def sync_sleep_and_return(x):
    time.sleep(x)
    return f"Slept {x}s"

# We want:
# - sync_greet("Alice") with no timeout
# - sync_add(2, 3) with a 1.0s timeout
# - sync_sleep_and_return(2) with a 1.0s timeout (this one should “timeout”)
calls = [
    (sync_greet, ("Alice",), {}, None),
    (sync_add, (2, 3), {}, 1.0),
    (sync_sleep_and_return, (2,), {}, 1.0),
]

# Collect exceptions instead of letting the sleep call raise
results = parallel_map(calls, return_exceptions=True)

# `results` is a list, in the same order:
# [
#   "Hello, Alice!",        # from sync_greet
#   5,                      # from sync_add
#   CallAnyTimeout(...)     # from sync_sleep_and_return because it slept 2s > 1.0s timeout
# ]
print(results)
Possible Output:
['Hello, Alice!', 5, CallAnyTimeout('Function sync_sleep_and_return timed out after 1.0s')]
2. “Mixed” Sync + Async in an asyncio coroutine
import asyncio

from synchronaut import parallel_map  # assumed top-level export, like call_any

async def async_hello(name):
    await asyncio.sleep(0.1)
    return f"Hi, {name}!"

def sync_double(x):
    return x * 2

async def main():
    calls = [
        # run async_hello("Charlie") with a 0.5s timeout
        (async_hello, ("Charlie",), {}, 0.5),
        # run sync_double(21) with no timeout
        (sync_double, (21,), {}, None),
        # run async_hello but force it to “time out” in 0.01s
        (async_hello, ("TooSlow",), {}, 0.01),
    ]

    # We want exceptions captured so we can see who timed out
    results = await parallel_map(calls, return_exceptions=True)

    # results[0] → "Hi, Charlie!"
    # results[1] → 42
    # results[2] → CallAnyTimeout(...) because async_hello slept 0.1s > 0.01s
    print(results)

asyncio.run(main())
Possible Output:
['Hi, Charlie!', 42, CallAnyTimeout('Function async_hello timed out after 0.01s')]
3. Inside a Trio‐based function
import trio

from synchronaut import parallel_map  # assumed top-level export, like call_any

async def async_square(n):
    await trio.sleep(0.05)
    return n * n

def sync_subtract(a, b):
    return a - b

async def trio_main():
    calls = [
        (async_square, (5,), {}, 0.1),      # finishes in 0.05s < 0.1s
        (sync_subtract, (10, 3), {}, None),
        (async_square, (10,), {}, 0.01),    # will timeout
    ]
    results = await parallel_map(calls, return_exceptions=True)
    # → [25, 7, CallAnyTimeout(...)]
    print(results)

trio.run(trio_main)
Possible Output:
[25, 7, CallAnyTimeout('Function async_square timed out after 0.01s')]
⚙️ How It Works
- Signature
    def parallel_map(
        calls: list[tuple[Callable, tuple, dict, float | None]],
        *,
        executor: ThreadPoolExecutor | None = None,
        return_exceptions: bool = False,
    ) -> Any:
  - calls is a list of 4-tuples: (func, args_tuple, kwargs_dict, per_call_timeout), where per_call_timeout is a float or None.
  - executor (optional) lets you supply your own ThreadPoolExecutor for offloading in sync-land or asyncio-land.
  - return_exceptions controls whether exceptions get returned in the result list (instead of immediately propagating).
- Sync branch (plain “no async loop”)
    futures = [
        executor.submit(
            partial(
                call_any, fn, *args,
                timeout=per_timeout, executor=executor, **kwargs
            )
        )
        for (fn, args, kwargs, per_timeout) in calls
    ]
    for fut in futures:
        try:
            results.append(fut.result())
        except Exception as exc:
            if return_exceptions:
                results.append(exc)
            else:
                raise
  - Each call_any(...) may, in turn, spin up an asyncio.run_coroutine_threadsafe(...) (for async fns) or run your sync fn directly in that thread.
  - Because all calls are submitted at once, they run in parallel up to max_workers threads in your pool.
- Asyncio branch (mode == 'asyncio')
    async def _run_all_asyncio():
        tasks = []
        for (fn, args, kwargs, per_timeout) in calls:
            async def _run_one(fn=fn, args=args, kwargs=kwargs, per_timeout=per_timeout):
                try:
                    return await call_any(fn, *args, timeout=per_timeout, executor=_exec, **kwargs)
                except Exception as exc:
                    if return_exceptions:
                        return exc
                    raise
            tasks.append(asyncio.create_task(_run_one()))
        return await asyncio.gather(*tasks, return_exceptions=return_exceptions)
  - Each call is scheduled immediately with asyncio.create_task(...).
  - Then a single await asyncio.gather(...) waits for all to complete or timeout/raise.
- Trio branch (mode == 'trio')
    async def _run_all_trio():
        results = []
        for (fn, args, kwargs, per_timeout) in calls:
            try:
                val = await call_any(fn, *args, timeout=per_timeout, **kwargs)
                results.append(val)
            except Exception as exc:
                if return_exceptions:
                    results.append(exc)
                else:
                    raise
        return results
  - Calls are run sequentially under Trio’s task runner.
  - If any individual call raises, it’s either captured (if return_exceptions=True) or re-raised.
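The sync branch mentions asyncio.run_coroutine_threadsafe(...). As a rough sketch of that general technique (assuming one long-lived event loop in a daemon thread, with run_sync and _loop as hypothetical names rather than synchronaut internals), a sync caller can hand a coroutine to a background loop and block on the result:

```python
import asyncio
import threading

# One long-lived event loop running in a daemon thread.
_loop = asyncio.new_event_loop()
threading.Thread(target=_loop.run_forever, daemon=True).start()


async def async_add(a, b):
    await asyncio.sleep(0.01)
    return a + b


def run_sync(coro_fn, *args, timeout=None, **kwargs):
    # Hypothetical helper: submit the coroutine to the background loop and
    # block the calling (sync) thread until the result is ready.
    future = asyncio.run_coroutine_threadsafe(coro_fn(*args, **kwargs), _loop)
    # Raises concurrent.futures.TimeoutError if the deadline passes first.
    return future.result(timeout)


result = run_sync(async_add, 9, 10)
print(result)  # 19
```

Because the loop thread is a daemon, it does not keep the process alive; it simply runs until the interpreter exits.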
Advanced
All these options are available via call_any(...) or the @synchronaut(...) decorator:
- timeout=: raises CallAnyTimeout if the call exceeds N seconds
- force_offload=True: always run sync funcs in the background loop (enables timely cancellation)
- executor=: send offloaded sync work into a caller-provided ThreadPoolExecutor (instead of the default)
- call_map([...], *args): runs in parallel in async context, sequentially in sync context
- Context propagation:
  - set_request_ctx() / get_request_ctx() to set and read a global ContextVar
  - request_context({...}) context manager to temporarily override
  - spawn_thread_with_ctx(fn, *args) to ensure ContextVar state flows into threads
⚠️ Gotchas
- Decorator overhead: each call does an inspect/async-check (nanoseconds to microseconds). In ultra-hot loops, consider a bypass.
- Timeouts on sync code: pure-sync calls only respect timeout if offloaded; otherwise they block until completion.
- Background loop lifecycle: offloads and .sync bypass use our single background loop; it lives until process exit.
- Custom executor: if you pass executor=my_executor, that executor will actually be used for offloading. If you forget, all work goes into the built-in _SHARED_EXECUTOR.
- ContextVar propagation: manual threads must use our spawn_thread_with_ctx.
- Non-asyncio stacks: _in_async_context recognizes only asyncio and Trio. Other event loops may mis-route.
- Tracebacks: decorators + offloads can obscure original frames. Use logging or inspect.trace() for debugging.
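The point about timeouts on sync code follows from how Python threads work in general: a timeout can stop the caller from waiting, but it cannot interrupt a blocking function that is already running in a worker thread. The standard-library sketch below illustrates this; slow_sync and demo are hypothetical names, and asyncio.TimeoutError stands in for CallAnyTimeout.

```python
import asyncio
import threading
import time

finished = threading.Event()


def slow_sync():
    time.sleep(0.3)
    finished.set()  # still reached after the caller has given up waiting


async def demo():
    loop = asyncio.get_running_loop()
    try:
        # Offload to the default thread pool and stop waiting after 0.05s.
        await asyncio.wait_for(loop.run_in_executor(None, slow_sync), timeout=0.05)
    except asyncio.TimeoutError:
        timed_out = True
    else:
        timed_out = False
    # The worker thread is still sleeping at this point.
    still_running = not finished.is_set()
    return timed_out, still_running


timed_out, still_running = asyncio.run(demo())
completed_anyway = finished.wait(timeout=2)
print(timed_out, still_running, completed_anyway)
```

In practice this means a timed-out sync call may still hold a pool thread (and any resources it uses) until it returns on its own.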
✅ When to use synchronaut
- I/O-bound web services (DB calls, HTTP, file I/O)
- Mixed sync/async code-bases (one API, two contexts)
- FastAPI / DI: sync ORMs auto-offload under the hood
- Context-scoped resources: single “request context” across threads & coros
🚫 When not to use synchronaut
- CPU-bound tight loops where microseconds matter
- Pure-sync or pure-async projects (no context switching)
- Non-asyncio async frameworks (e.g. Curio)
- Strict loop-lifecycle environments that forbid background loops
By tuning timeout, force_offload, or using the .sync / .async_ bypasses, you get seamless sync↔️async interoperability without rewriting your core logic.