permchain
Get started
pip install permchain
Usage
from permchain import InMemoryPubSubConnection, PubSub, Topic

topic_one = Topic("one")  # intermediate topic that links the two chains

chain_one = Topic.IN.subscribe() | (lambda x: x + 'b') | topic_one.publish()  # input -> append 'b' -> topic "one"
chain_two = topic_one.subscribe() | (lambda x: x + 'c') | Topic.OUT.publish()  # topic "one" -> append 'c' -> output

conn = InMemoryPubSubConnection()
pubsub = PubSub(processes=(chain_one, chain_two), connection=conn)

assert pubsub.invoke('a') == ['abc']  # 'a' -> 'ab' -> 'abc'
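To make the message flow in the example above concrete, here is a rough stand-alone sketch of the same idea in plain Python, without permchain. Everything in it (the `run` function, the `"in"` and `"out"` topic names, the tuple layout for a process) is hypothetical and is not how permchain is implemented; it only illustrates messages moving from topic to topic through a queue.

```python
from collections import deque

# Hypothetical stand-in for a process:
# (topic it subscribes to, function to apply, topic it publishes to).
processes = [
    ("in", lambda x: x + "b", "one"),
    ("one", lambda x: x + "c", "out"),
]


def run(processes, value):
    """Deliver `value` to the "in" topic and collect everything published to "out"."""
    queue = deque([("in", value)])
    outputs = []
    while queue:
        topic, message = queue.popleft()
        if topic == "out":
            # Messages reaching the output topic are the final results.
            outputs.append(message)
            continue
        # Hand the message to every process subscribed to this topic.
        for source, func, target in processes:
            if source == topic:
                queue.append((target, func(message)))
    return outputs


result = run(processes, "a")
print(result)  # ['abc']
```

As in the permchain example, the input `'a'` passes through both steps and the result is a list, since more than one message could in principle reach the output topic.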
Check the tests and examples for more usage patterns.
Roadmap
- Add initial retry support (pending changes in langchain)
- Detect cycles (a.k.a. infinite loops) and throw an error
  - Allow user to catch that error (by subscribing to an error topic?)
- Replace Queue data structure with a Log data structure (this will enable checking the status of the readers, etc.)
  - e.g. https://anyio.readthedocs.io/en/3.x/streams.html
- Implement IN and OUT topics as regular topics
- Enable resuming PubSub from the "middle" of the computation
- Add Redis-backed Connection implementation
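
One possible approach to the cycle-detection item is to treat topics as nodes in a directed graph, with an edge from the topic a process subscribes to toward the topic it publishes to, and then look for a back edge with a depth-first search. The sketch below is plain Python and independent of permchain; the `has_cycle` name and the `(source, target)` edge format are assumptions made for illustration, not part of the library.

```python
def has_cycle(edges):
    """Return True if the directed graph given as (source, target) topic pairs has a cycle."""
    graph = {}
    for source, target in edges:
        graph.setdefault(source, []).append(target)
        graph.setdefault(target, [])

    # unvisited, on the current DFS path, fully explored
    WHITE, GREY, BLACK = 0, 1, 2
    color = {node: WHITE for node in graph}

    def visit(node):
        color[node] = GREY
        for neighbour in graph[node]:
            if color[neighbour] == GREY:
                return True  # back edge: neighbour is already on the current path
            if color[neighbour] == WHITE and visit(neighbour):
                return True
        color[node] = BLACK
        return False

    return any(color[node] == WHITE and visit(node) for node in graph)


acyclic = [("in", "one"), ("one", "out")]
cyclic = [("in", "one"), ("one", "two"), ("two", "one"), ("two", "out")]
print(has_cycle(acyclic), has_cycle(cyclic))  # False True
```

A static check like this is conservative: a cycle in the topic graph only becomes an infinite loop if some process keeps republishing on every pass, so a real implementation might prefer a runtime limit or an error topic instead.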