Skip to main content

the coding style of hook

Project description

lifespan event hook

  • Inspired by the event-hook(callback) in javascript.
  • Zero third-party dependencies, only requires python>=3.10
  • Run lifespan functions, without mental burden of nested async/await
  • Just define event hook functions with async or not, then run flow with await or not
  • Lifespan functions can access the Context variable during the whole lifespan
  • Enjoy typing hint/check support

usage:

1. (Highly recommended)define async lifespan functions, run flow with await flow.run():

from hookio import FuncArgs,Context,Flow
import asyncio

async def main_func(x,y, context:Context[dict]):
        context.data['params']={'x':x, 'y':y}
        result=x/y
        print('main get result, but block 2s')
        await asyncio.sleep(2)
        context.data['result']=result
        print('main finished')
        return result
    
async def on_start(msg:int, context:Context[dict]):
    print(f'start: time:{msg}; ')
    await asyncio.sleep(2)
    print(f'start: context:{context} after 2s')

async def on_error(msg:str, context:Context[dict]):
    print(f'error: {context.info["error"]}; msg from user: {msg} ')

async def on_end(context:Context[dict]):
    print(f'end: get result from context: {context.data.get("result", None)}')
    print(f'end: full context: {context}')
    
flow=Flow[dict](
    main_logic = FuncArgs(main_func,(1,2)),
    on_start = FuncArgs(on_start,(0,)),
    on_end = FuncArgs(on_end,()),
    on_error=FuncArgs(on_error,('red',))
)

await flow.run()

# start: time:0; 
# start: context:Context(data={}, info={}) after 2s
# main get result, but block 2s
# main finished
# end: get result from context: 0.5
# end: full context: Context(data={'params': {'x': 1, 'y': 2}, 'result': 0.5}, info={})

2. define normal sync lifespan functions, run flow with flow.safe_run():

from hookio import FuncArgs,Context,Flow
import time

def main_func(x,y, context:Context[dict]):
        context.data['params']={'x':x, 'y':y}
        result=x/y
        print('main get result, but block 2s')
        time.sleep(2)
        context.data['result']=result
        print('main finished')
        return result
    
def on_start(msg:int, context:Context[dict]):
    print(f'start: time:{msg}; ')
    time.sleep(2)
    print(f'start: context:{context} after 2s')

def on_error(msg:str, context:Context[dict]):
    print(f'error: {context.info["error"]}; msg from user: {msg} ')

def on_end(context:Context[dict]):
    print(f'end: get result from context: {context.data.get("result", None)}')
    print(f'end: full context: {context}')
    
flow=Flow[dict](
    main_logic = FuncArgs(main_func,(1,2)),
    on_start = FuncArgs(on_start,(0,)),
    on_end = FuncArgs(on_end,()),
    on_error=FuncArgs(on_error,('red',))
)

flow.safe_run()

# start: time:0; 
# start: context:Context(data={}, info={}) after 2s
# main get result, but block 2s
# main finished
# end: get result from context: 0.5
# end: full context: Context(data={'params': {'x': 1, 'y': 2}, 'result': 0.5}, info={})

3. Mixing async/sync functions lifespan functions, run flow with flow.safe_run():

from hookio import FuncArgs,Context,Flow
import time
import asyncio

async def main_func(x,y, context:Context[dict]):
        context.data['params']={'x':x, 'y':y}
        result=x/y
        print('main get result, but block 2s')
        await asyncio.sleep(2)
        context.data['result']=result
        print('main finished')
        return result
    
def on_start(msg:int, context:Context[dict]):
    print(f'start: time:{msg}; ')
    time.sleep(2)
    print(f'start: context:{context} after 2s')

def on_error(msg:str, context:Context[dict]):
    print(f'error: {context.info["error"]}; msg from user: {msg} ')

def on_end(context:Context[dict]):
    print(f'end: get result from context: {context.data.get("result", None)}')
    print(f'end: full context: {context}')
    
flow=Flow[dict](
    main_logic = FuncArgs(main_func,(1,2)),
    on_start = FuncArgs(on_start,(0,)),
    on_end = FuncArgs(on_end,()),
    on_error=FuncArgs(on_error,('red',))
)

flow.safe_run()

# start: time:0; 
# start: context:Context(data={}, info={}) after 2s
# main get result, but block 2s
# main finished
# end: get result from context: 0.5
# end: full context: Context(data={'params': {'x': 1, 'y': 2}, 'result': 0.5}, info={})

4. let's trigger the function of on_error():

from hookio import FuncArgs,Context,Flow
import asyncio

async def main_func(x,y, context:Context[dict]):
        context.data['params']={'x':x, 'y':y}
        result=x/y
        print('main get result, but block 2s')
        await asyncio.sleep(2)
        context.data['result']=result
        print('main finished')
        return result
    
async def on_start(msg:int, context:Context[dict]):
    print(f'start: time:{msg}; ')
    await asyncio.sleep(2)
    print(f'start: context:{context} after 2s')

async def on_error(msg:str, context:Context[dict]):
    print(f'error: {context.info["error"]}; msg from user: {msg} ')

async def on_end(context:Context[dict]):
    print(f'end: get result from context: {context.data.get("result", None)}')
    print(f'end: full context: {context}')
    
flow=Flow[dict](
    main_logic = FuncArgs(main_func,(1,0)), ## this will raise error
    on_start = FuncArgs(on_start,(0,)),
    on_end = FuncArgs(on_end,()),
    on_error=FuncArgs(on_error,('red',))
)

await flow.run()

# start: time:0; 
# start: context:Context(data={}, info={}) after 2s
# error: division by zero; msg from user: red 
# end: get result from context: None
# end: full context: Context(data={'params': {'x': 1, 'y': 0}}, info={'error': ZeroDivisionError('division by zero')})

typing support

  • Contextsupport Generic('T'), and shared by all event-hook functions during while lifespan:
    • Context have two properties:
      • data
        • for user to read/write, (eg: record state, share msg)
        • can specify context.data's type by Context[int] or Context[dict[str,Any]]
      • info
        • this is an internally reserved field attribute,
        • initial value is a empty dict {}
        • waring: just read but not change it
from hookio import Context

async def on_end(context:Context[dict]):
    print(f'end: get result from context: {context.data.get("result", None)}')
    print(f'end: full context: {context}')

## `Flow` can also specify Generic
flow = Flow[dict](
    ...
    on_error=FuncArgs(on_error,('find_error',))
)

lifespan function definition

  • FuncArgs() accept two params
    • func: function name
    • args: tuple without context. if func only accept context, should passing it with a empty tuple ()
from hookio import FuncArgs

async def on_start(msg:int, context:Context[dict]):
    print(f'start: time:{msg}; ')
    await asyncio.sleep(2)
    print(f'start: context:{context} after 2s')

on_start = FuncArgs(on_start, (0,))

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

hookio-0.0.6-py3-none-any.whl (4.2 kB view details)

Uploaded Python 3

File details

Details for the file hookio-0.0.6-py3-none-any.whl.

File metadata

  • Download URL: hookio-0.0.6-py3-none-any.whl
  • Upload date:
  • Size: 4.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.10.14

File hashes

Hashes for hookio-0.0.6-py3-none-any.whl
Algorithm Hash digest
SHA256 aafd83285502e4805ac8ff6ebe302f8f827cde31428374a4c98e16c1f424fd35
MD5 ae1b25e75225ecfa45a001ab4c567cfc
BLAKE2b-256 bd8e94eada183ed341742d1fb63949dbe6229e92ae04238bcbe70dfc6a684953

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page