
the coding style of hook

Project description

lifespan event hook

  • Inspired by event hooks (callbacks) in JavaScript.
  • Zero third-party dependencies; requires only Python >= 3.10.
  • Runs lifespan functions without the mental burden of nested async/await.
  • Define event hook functions as async or sync, then run the flow with or without await.
  • Lifespan functions can access the shared Context variable during the whole lifespan.
  • Supports type hints and static type checking.

usage:

1. Define async lifespan functions and run the flow with await flow.run() (highly recommended):

from hookio import FuncArgs,Context,Flow
import asyncio

async def main_func(x,y, context:Context[dict]):
    context.data['params']={'x':x, 'y':y}
    result=x/y
    print('main get result, but block 2s')
    await asyncio.sleep(2)
    context.data['result']=result
    print('main finished')
    return result
    
async def on_start(msg:int, context:Context[dict]):
    print(f'start: time:{msg}; ')
    await asyncio.sleep(2)
    print(f'start: context:{context} after 2s')

async def on_error(msg:str, context:Context[dict]):
    print(f'error: {context.info["error"]}; msg from user: {msg} ')

async def on_end(context:Context[dict]):
    print(f'end: get result from context: {context.data.get("result", None)}')
    print(f'end: full context: {context}')
    
flow=Flow[dict](
    main_logic = FuncArgs(main_func,(1,2)),
    on_start = FuncArgs(on_start,(0,)),
    on_end = FuncArgs(on_end,()),
    on_error=FuncArgs(on_error,('red',))
)

await flow.run()

# start: time:0; 
# start: context:Context(data={}, info={}) after 2s
# main get result, but block 2s
# main finished
# end: get result from context: 0.5
# end: full context: Context(data={'params': {'x': 1, 'y': 2}, 'result': 0.5}, info={})
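
The bare `await flow.run()` above only works where top-level `await` is allowed, such as a Jupyter cell or the `python -m asyncio` REPL. In a regular script the call would need to sit inside a coroutine driven by `asyncio.run()`. A minimal sketch, in which the hypothetical `run_flow()` stands in for `flow.run()` so that the snippet runs without hookio installed:

```python
import asyncio

async def run_flow() -> str:
    # Hypothetical stand-in for `flow.run()`; swap in the real flow here.
    await asyncio.sleep(0)
    return "finished"

async def main() -> str:
    # Inside a coroutine `await` is legal, so the flow can be awaited here.
    return await run_flow()

if __name__ == "__main__":
    # asyncio.run() creates the event loop, runs main(), then closes the loop.
    print(asyncio.run(main()))
```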

2. Define regular sync lifespan functions and run the flow with flow.safe_run():

from hookio import FuncArgs,Context,Flow
import time

def main_func(x,y, context:Context[dict]):
    context.data['params']={'x':x, 'y':y}
    result=x/y
    print('main get result, but block 2s')
    time.sleep(2)
    context.data['result']=result
    print('main finished')
    return result
    
def on_start(msg:int, context:Context[dict]):
    print(f'start: time:{msg}; ')
    time.sleep(2)
    print(f'start: context:{context} after 2s')

def on_error(msg:str, context:Context[dict]):
    print(f'error: {context.info["error"]}; msg from user: {msg} ')

def on_end(context:Context[dict]):
    print(f'end: get result from context: {context.data.get("result", None)}')
    print(f'end: full context: {context}')
    
flow=Flow[dict](
    main_logic = FuncArgs(main_func,(1,2)),
    on_start = FuncArgs(on_start,(0,)),
    on_end = FuncArgs(on_end,()),
    on_error=FuncArgs(on_error,('red',))
)

flow.safe_run()

# start: time:0; 
# start: context:Context(data={}, info={}) after 2s
# main get result, but block 2s
# main finished
# end: get result from context: 0.5
# end: full context: Context(data={'params': {'x': 1, 'y': 2}, 'result': 0.5}, info={})

3. Mix async and sync lifespan functions and run the flow with flow.safe_run():

from hookio import FuncArgs,Context,Flow
import time
import asyncio

async def main_func(x,y, context:Context[dict]):
    context.data['params']={'x':x, 'y':y}
    result=x/y
    print('main get result, but block 2s')
    await asyncio.sleep(2)
    context.data['result']=result
    print('main finished')
    return result
    
def on_start(msg:int, context:Context[dict]):
    print(f'start: time:{msg}; ')
    time.sleep(2)
    print(f'start: context:{context} after 2s')

def on_error(msg:str, context:Context[dict]):
    print(f'error: {context.info["error"]}; msg from user: {msg} ')

def on_end(context:Context[dict]):
    print(f'end: get result from context: {context.data.get("result", None)}')
    print(f'end: full context: {context}')
    
flow=Flow[dict](
    main_logic = FuncArgs(main_func,(1,2)),
    on_start = FuncArgs(on_start,(0,)),
    on_end = FuncArgs(on_end,()),
    on_error=FuncArgs(on_error,('red',))
)

flow.safe_run()

# start: time:0; 
# start: context:Context(data={}, info={}) after 2s
# main get result, but block 2s
# main finished
# end: get result from context: 0.5
# end: full context: Context(data={'params': {'x': 1, 'y': 2}, 'result': 0.5}, info={})
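
One common way for a runner to accept both kinds of function is to call the hook and await the result only when it is awaitable. The sketch below illustrates that general technique with the standard library; `call_hook` is a hypothetical helper, and nothing here is taken from hookio's actual implementation:

```python
import asyncio
import inspect
from typing import Any, Callable

async def call_hook(func: Callable[..., Any], *args: Any) -> Any:
    """Call `func`; if it returned an awaitable, await it before returning."""
    result = func(*args)
    if inspect.isawaitable(result):
        result = await result
    return result

def sync_hook(msg: str) -> str:
    return f"sync:{msg}"

async def async_hook(msg: str) -> str:
    await asyncio.sleep(0)
    return f"async:{msg}"

async def demo() -> list[str]:
    # Both hooks go through the same call path.
    return [await call_hook(sync_hook, "a"), await call_hook(async_hook, "b")]

results = asyncio.run(demo())
print(results)  # ['sync:a', 'async:b']
```

Note that a sync hook that calls `time.sleep()` blocks the event loop for as long as it runs, so long-running sync hooks delay any other tasks scheduled on the same loop.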

4. Trigger on_error() by making the main function raise an exception:

from hookio import FuncArgs,Context,Flow
import asyncio

async def main_func(x,y, context:Context[dict]):
    context.data['params']={'x':x, 'y':y}
    result=x/y
    print('main get result, but block 2s')
    await asyncio.sleep(2)
    context.data['result']=result
    print('main finished')
    return result
    
async def on_start(msg:int, context:Context[dict]):
    print(f'start: time:{msg}; ')
    await asyncio.sleep(2)
    print(f'start: context:{context} after 2s')

async def on_error(msg:str, context:Context[dict]):
    print(f'error: {context.info["error"]}; msg from user: {msg} ')

async def on_end(context:Context[dict]):
    print(f'end: get result from context: {context.data.get("result", None)}')
    print(f'end: full context: {context}')
    
flow=Flow[dict](
    main_logic = FuncArgs(main_func,(1,0)),  # y=0 makes x/y raise ZeroDivisionError
    on_start = FuncArgs(on_start,(0,)),
    on_end = FuncArgs(on_end,()),
    on_error=FuncArgs(on_error,('red',))
)

await flow.run()

# start: time:0; 
# start: context:Context(data={}, info={}) after 2s
# error: division by zero; msg from user: red 
# end: get result from context: None
# end: full context: Context(data={'params': {'x': 1, 'y': 0}}, info={'error': ZeroDivisionError('division by zero')})
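
The output above is consistent with a try/except/finally lifecycle: the start hook runs first, an exception from the main function is stored under `info["error"]` before the error hook runs, and the end hook runs either way. The following is a minimal sync sketch of that pattern, not hookio's code; `SketchContext` and `run_lifespan` are hypothetical names:

```python
from dataclasses import dataclass, field
from typing import Any, Callable

@dataclass
class SketchContext:
    data: dict[str, Any] = field(default_factory=dict)
    info: dict[str, Any] = field(default_factory=dict)

Hook = Callable[[SketchContext], None]

def run_lifespan(main: Hook, on_start: Hook, on_error: Hook, on_end: Hook) -> SketchContext:
    context = SketchContext()
    on_start(context)
    try:
        main(context)
    except Exception as exc:
        # Record the exception so the error hook can read it from the context.
        context.info["error"] = exc
        on_error(context)
    finally:
        # The end hook runs whether or not the main function failed.
        on_end(context)
    return context

calls: list[str] = []

def failing_main(context: SketchContext) -> None:
    context.data["params"] = {"x": 1, "y": 0}
    context.data["result"] = 1 / 0  # raises before "result" is stored

ctx = run_lifespan(
    main=failing_main,
    on_start=lambda c: calls.append("start"),
    on_error=lambda c: calls.append(f"error:{type(c.info['error']).__name__}"),
    on_end=lambda c: calls.append("end"),
)
print(calls)  # ['start', 'error:ZeroDivisionError', 'end']
```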

typing support

  • Context supports Generic[T] and is shared by all event hook functions during the whole lifespan:
    • Context has two properties:
      • data
        • for the user to read and write (e.g. to record state or share messages)
        • its type can be specified with Context[int] or Context[dict[str,Any]]
      • info
        • an internally reserved attribute
        • its initial value is an empty dict {}
        • warning: read it, but do not modify it
from hookio import FuncArgs,Context,Flow

async def on_end(context:Context[dict]):
    print(f'end: get result from context: {context.data.get("result", None)}')
    print(f'end: full context: {context}')

# `Flow` can be parameterized with the same type
flow = Flow[dict](
    ...
    on_error=FuncArgs(on_error,('find_error',))
)
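
To illustrate what the type parameter buys, here is a sketch of a generic context built with the standard `typing.Generic`. `TypedContext` is a hypothetical stand-in that mirrors the `data` and `info` fields described above; it is not hookio's class:

```python
from dataclasses import dataclass, field
from typing import Any, Generic, TypeVar

T = TypeVar("T")

@dataclass
class TypedContext(Generic[T]):
    data: T                                               # user-owned, typed by T
    info: dict[str, Any] = field(default_factory=dict)    # reserved, read-only by convention

def record_result(context: TypedContext[dict[str, float]], value: float) -> None:
    # A type checker knows context.data is dict[str, float] here,
    # so storing a non-float value would be reported as an error.
    context.data["result"] = value

ctx = TypedContext[dict[str, float]](data={})
record_result(ctx, 0.5)
print(ctx)  # TypedContext(data={'result': 0.5}, info={})
```

The subscript has no effect at runtime; it only informs static checkers such as mypy or pyright.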

lifespan function definition

  • FuncArgs() accepts two parameters:
    • func: the function to call
    • args: a tuple of the function's arguments, excluding context. If func accepts only context, pass an empty tuple ().
import asyncio
from hookio import FuncArgs,Context

async def on_start(msg:int, context:Context[dict]):
    print(f'start: time:{msg}; ')
    await asyncio.sleep(2)
    print(f'start: context:{context} after 2s')

on_start = FuncArgs(on_start, (0,))
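
The reason the tuple leaves out context is that the runner supplies it at call time. A sketch of that binding, assuming the context is passed as a keyword argument named `context` (the source does not say how hookio passes it); `SketchFuncArgs` and `invoke` are hypothetical names:

```python
from typing import Any, Callable, NamedTuple

class SketchFuncArgs(NamedTuple):
    func: Callable[..., Any]
    args: tuple[Any, ...]

def invoke(pair: SketchFuncArgs, context: dict[str, Any]) -> Any:
    # Assumption: user arguments come first, the shared context is added by the runner.
    return pair.func(*pair.args, context=context)

def on_start(msg: int, context: dict[str, Any]) -> None:
    context["started_at"] = msg

def on_end(context: dict[str, Any]) -> None:
    context["ended"] = True

shared: dict[str, Any] = {}
invoke(SketchFuncArgs(on_start, (0,)), shared)  # one user argument
invoke(SketchFuncArgs(on_end, ()), shared)      # context only, so an empty tuple
print(shared)  # {'started_at': 0, 'ended': True}
```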

