
Atila Framework

Atila

Atila is a simple and minimal framework integrated with the Skitai App Engine. It is the easiest way to build backend services.

# serve.py
from atila import Atila

app = Atila (__name__)

@app.route ("/")
def index (was):
    return "Hello, World"

if __name__ == "__main__":
    import skitai
    skitai.mount ("/", app)
    skitai.run (port = 5000)

And run,

python3 serve.py

Important Notice

CAUTION: Atila is based on WSGI but can run only with the Skitai App Engine.

This means that if you build your app with Atila, you have no choice but Skitai as the WSGI app server. Atila's unique and unconventional style may also make it hard to port your app to another framework.

Unconventional?

Atila is based on the WSGI specification but takes advantage of the asynchronous features provided by Skitai.

In functional and API aspects, Atila is mostly the same as other WSGI containers.

However, Atila's asynchronous features do not use async/await conventions. It sometimes uses generator based coroutines with yield.

Atila treats async jobs like the following as a Task:

  • HTTP Request
  • Database Query
  • Thread
  • Process
  • Subprocess

A Task has 3 major common methods: fetch(), one() and commit(). Multiple tasks can be bound into a Tasks object, which has the same 3 methods.

Tasks are passed from the current request thread over to the main event loop in the main thread. When all tasks have finished, the Tasks object is returned to the request thread. Also, by yielding, Tasks can transition into a completely non-blocking manner.
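For example, a minimal sketch of a Task created from a database query, assuming a hypothetical '@mydb' alias has been registered (aliases are covered in the Using Task chapter below):

@app.route ('/users')
def users (was):
    # '@mydb' is a hypothetical alias; see Resource Aliasing below
    task = was.db ('@mydb').execute ('select * from users')
    rows = task.fetch ()  # wait for the query and return an iterable of rows
    return was.API (result = rows)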

I'm not saying Atila is a better design. I'm just saying Atila is obviously NOT mainstream, and I enjoy this project.

Installation

Requirements

Python 3.6+

Installation

Atila and its core dependent libraries are developed on a single milestone, so install/upgrade them all at once. Otherwise it is highly likely you will run into errors.

With pip

pip3 install -U atila skitai rs4 aquests

With git

pip3 install -U skitai rs4 aquests sqlphile
git clone https://gitlab.com/hansroh/atila.git
cd atila
pip3 install -e .

Optional packages, install as you need:

pip3 install redis
pip3 install pymongo
pip3 install psycopg2-binary
pip3 install protobuf # for GRPC
pip3 install jsonrpclib-pelix
pip3 install jinja2

First App

Project Structure

A formal but minimal project structure for myapp:

myapp/
  static/
  templates/
  __init__.py
tests
serve.py
README.md

Launch Script

# serve.py
import skitai
import myapp

if __name__ == '__main__':
    with skitai.preference () as pref:
        skitai.mount ('/', myapp, pref)

pref is the runtime app preference. Its attributes are copied over to the app when it is mounted, and it provides helper methods like set_static().

Create App

# myapp/__init__.py
import skitai
from atila import Atila

def __config__ (pref):
    # mount / to myapp/static and app.STATIC_URL will be set to '/'
    # skitai.abspath () will join path with location of serve.py
    # DO NOT use os.path.abspath (__file__), it is not reliable
    pref.set_static ('/', skitai.abspath ('myapp/static'))

def __app__ ():
    return Atila (__name__)

def __mount__ (app, mntopt):
    @app.route ('/')
    def index (was):
        return 'Index Page'

    @app.route ('/sub')
    def sub (was):
        return 'Sub Page'

Make sure that myapp/static/index.html does not exist. If it does, the static file takes higher priority.

Then run.

python3 serve.py --devel # --devel switch means debug mode and auto reloading

Please see Skitai App Engine for more detailed usage.

Templates

pip3 install jinja2
<!-- myapp/templates/index.j2 -->
<html>
    <head><title>My App</title></head>
    <body>
        <h1>Index Page</h1>
    </body>
</html>
def __mount__ (app, mntopt):
    @app.route ('/')
    def index (was):
        return was.render ('index.j2')

Cascaded Service Mount

For larger app, make myapp/services directory.

myapp/
  static/
  templates/
  services/
  __init__.py
# myapp/__init__.py
# remove __mount__ ()
import skitai
from atila import Atila

def __config__ (pref):
    pref.set_static ('/', skitai.abspath ('myapp/static'))

def __app__ ():
    return Atila (__name__)

def __setup__ (app, mntopt):
    from . import services
    app.mount ('/', services)
# NEW myapp/services/__init__.py
def __mount__ (app, mntopt):
    @app.route ('/')
    def index (was):
        return 'Index Page'

    @app.route ('/sub')
    def sub (was):
        return 'Sub Page'

Separate '/sub' into a sub module.

# modify myapp/services/__init__.py
# add __setup__ hook
def __setup__ (app, mntopt):
    from . import sub
    app.mount ('/sub', sub)

def __mount__ (app, mntopt):
    @app.route ('/')
    def index (was):
        return 'Index Page'
# NEW myapp/services/sub.py
def __mount__ (app, mntopt):
    # make sure blank path for accessing '/sub'
    # if '/', accessable URL become '/sub/'
    @app.route ('')
    def sub (was):
        return 'Sub Page'

Add a new '/sub2' as a sub package

Assume /sub2 is larger and more complex, and you want to make it a sub package.

# modify __setup__ of myapp/services/__init__.py
def __setup__ (app, mntopt):
    from . import sub
    from . import sub2

    app.mount ('/sub', sub)
    app.mount ('/sub2', sub2)
# NEW myapp/services/sub2/__init__.py
def __setup__ (app, mntopt):
    from . import pages
    app.mount ("/sub2/pages", pages)
# NEW myapp/services/sub2/pages.py
def __mount__ (app, mntopt):
    @app.route ('') # /sub2/pages
    def pages (was):
        return 'Pages'

    @app.route ('/1') # /sub2/pages/1
    def page1 (was):
        return 'Page1'

Finally, the myapp structure is:

myapp/
  static/
  templates/
  services/
    sub.py
    sub2/
      __init__.py
      pages.py
  __init__.py

Other Hooks

Currently there is only one other hook.

def __umount__ (app): # NO mntopt argument
    print ('bye...')

More About Preference

with skitai.preference () as pref:
    pref.set_static ('/static', 'myapp/static')
    pref.set_media ('/media', '/mnt/data/media')
    pref.config.MAX_UPLOAD_SIZE = 256 * 1024 * 1024
    pref.config.MAINTAIN_INTERVAL = 60

You can add your own options to pref.config and attributes to pref, and you can be sure of accessing them later through app.config and app attributes.
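For example, a minimal sketch assuming a hypothetical custom option named GREETING:

# serve.py
with skitai.preference () as pref:
    pref.config.GREETING = 'Hello'  # GREETING is a hypothetical custom option
    skitai.mount ('/', myapp, pref)

# wherever in your app package
def __mount__ (app, mntopt):
    @app.route ('/greet')
    def greet (was):
        return app.config.GREETING  # readable via app.config after mounting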

Conclusion

  1. Mount sub packages in the __setup__ hook and create services in __mount__.
  2. DO NOT mess with sub packages in __mount__.
  3. __config__ and __app__ can be in the app initializing script ONLY.

Event Bus

Atila's event bus is based on event-bus.

# wherever in your app package,
def __setup__ (app, mntopt):
    @app.on ('custom-event')
    def on_custom_event (was, detail):
        print ('I got custom-event with detail: {}'.format (detail))
# wherever in your app package,
def __mount__ (app, mntopt):
    @app.route ('/make_event')
    def make_event (was):
        app.emit ('custom-event', 'some detail')
        return "OK"

App Life Cycle Events

There are several pre-defined events related to the app life cycle.

  • before_mount
  • mounted
  • before_reload
  • reloaded
  • mounted_or_reloaded
  • before_umount
  • umounted
# wherever in your app package,

def load_my_models ():
    ...

def __setup__ (app, mntopt):
    @app.on ('before_mount')
    def on_before_mount (was):
        load_my_models ()

Extending and Overriding

The big difference is which package owns the app and drives the services.

Extending

pip3 install atila_vue

atila_vue provides base project templates and no useful services of its own. The services must be implemented by your app.

import skitai
import atila_vue
import myapp

if __name__ == '__main__':
    with skitai.preference () as pref:
        pref.extends (atila_vue)
        skitai.mount ('/', myapp, pref)

myapp extends atila_vue and can use all of atila_vue's services, static files and templates which myapp doesn't have.

Overriding

pip3 install tfserver

tfserver has its own useful services, and you can add your custom services.

import skitai
import tfserver
import myapp

if __name__ == '__main__':
    with skitai.preference () as pref:
        pref.overrides (myapp)
        skitai.mount ('/', tfserver, pref)

myapp overrides tfserver's services, static files and templates when the resource paths are the same.

Conclusion

myapp, tfserver and atila_vue share the very same project structure described above. If your app is useful to others, please publish it to PyPI.

Working With Multiple Apps

Skitai can mount multiple apps, including Atila apps. Atila apps sometimes need to communicate with each other.

Event Subscription

# serve.py
import skitai
import myapp
import tfserver

if __name__ == '__main__':
    with skitai.preference () as pref:
        skitai.mount ('/tf', tfserver, pref, name = 'tfserver')
    with skitai.preference () as pref:
        skitai.mount ('/', myapp, pref, subscribe = ['tfserver'])

myapp can receive all events from tfserver. For example, suppose tfserver emits tfserver:model-reloaded.

# wherever in myapp,
def __setup__ (app, mntopt):
    @app.on ('tfserver:model-reloaded')
    def on_tfserver_model_reloaded (was, model_name):
        jobs_as_you_need ()

Data Exchanging

was.g is a dictionary-like object shared between multiple apps.

# wherever in tfserver app
@app.route ('/')
def index (was):
    was.g ['LOADED_MODELS'] = ['a', 'b']
# wherever in myapp
@app.route ('/')
def index (was):
    return '|'.join (was.g ['LOADED_MODELS'])

Accessing Other Apps Directly

@app.route ('/')
def index (was):
    other_app = was.apps ['tfserver']

Very First Argument 'was'

What is was, the first argument of most hooks, event handlers and services?

was contains request and response related objects and services.

It is created per worker thread and never destroyed in normal situations. Sometimes clones are created, cached and destroyed on a need-to basis.

Actually, it is just a bunch of shortcuts for using request processing resources.

In this chapter, we only need a brief and partial overview.

Request

  • was.request: current request object
  • was.app: current app which process request
  • was.g: multiple apps shared dictionary like object

Responses

  • was.render ()
  • was.API ()
  • was.Error ()
  • and lots of other responses

Coroutines

  • was.Task
  • was.Tasks ()

And several helper methods.

Processing Request

Request Life Cycle Hooks

  • before_request
  • finish_request
  • failed_request
  • teardown_request
def __setup__ (app, mntopt):
    @app.on ('before_mount')
    def on_before_mount (was):
        app.g ['TOTAL_REQUEST'] = 0
        app.g ['FAILED_REQUEST'] = 0

    @app.before_request
    def before_request (was):
        app.g ['TOTAL_REQUEST'] += 1

    @app.failed_request
    def failed_request (was, expt): # expt is sys.exc_info ()
        app.g ['FAILED_REQUEST'] += 1

Compose your own middlewares.

class JWTTokenChecker:
    def __call__ (self, was):
        if something_is_wrong ():
            raise was.Error ("400 Bad Request")

def __setup__ (app, mntopt):
    MIDDLEWARES = [
        JWTTokenChecker (),
        PermissionHandler ()
    ]
    @app.before_request
    def before_request (was):
        for middleware in MIDDLEWARES:
            middleware (was)

Request Life Cycle Events

  • before_request
  • finish_request
  • failed_request
  • teardown_request
def __setup__ (app, mntopt):
    @app.on ('before_request')
    def on_before_request (was):
        app.g ['TOTAL_REQUEST'] += 1

Hooks can return content or an error, but events cannot because they are async. If event handling fails, only the traceback is logged.

App & Request Global

  • app.g
  • was.request.g
@app.before_request
def before_request (was):
    app.g ['TOTAL_REQUEST'] = 0
    was.request.g ['START'] = time.time ()

@app.teardown_request
def teardown_request (was):
    app.g ['TOTAL_REQUEST'] += 1
    was.log ('processing time: {:.5f} seconds.'.format (
        time.time () - was.request.g ['START']
    ))

Request Object

was.request

Basic Members

  • was.request.method # upper case GET, POST, ...
  • was.request.uri
  • was.request.version # HTTP Version, 1.0, 1.1, 2.0, 3.0
  • was.request.scheme # http or https
  • was.request.ARGS
  • was.request.JWT
  • was.request.env
  • was.request.g
  • was.request.cookie
  • was.request.session
  • was.request.mbox

Basic Methods

  • was.request.split_uri () # (script, param, querystring, fragment)
  • was.request.get_header ("content-type") # case insensitive
  • was.request.get_headers () # return all headers as a list
  • was.request.get_body ()
  • was.request.get_remote_addr ()
  • was.request.get_user_agent ()
  • was.request.get_content_type ()
  • was.request.get_main_type ()
  • was.request.get_sub_type ()
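A short sketch combining a few of the members and methods listed above (illustrative only):

@app.route ('/inspect', methods = ['POST'])
def inspect_request (was):
    ua = was.request.get_header ('user-agent')  # case insensitive lookup
    return was.API (
        method = was.request.method,                    # e.g. POST
        content_type = was.request.get_content_type (),
        remote_addr = was.request.get_remote_addr (),
        user_agent = ua
    )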

Route Options

  • was.request.routed # routed function
  • was.request.routable # {'methods': ["POST", "OPTIONS"], 'content_types': ["text/xml"], 'options': {...}, 'mntopt': {...}}

Environment Variables

was.request.env (alias: was.env)

was.env.get ('HTTP_USER_AGENT', 'Unknown Agent')

Routing

@app.route ("/hello")
def hello_world (was):
    return was.render ("hello.j2")

Give some restrictions.

@app.route ("/hello", methods = ["GET"]) #if not match response 405
@app.route ("/hello", content_types = ["application/json"]) #if not match response 406

Request Parameters

@app.route ("/hello", methods = ["POST", "OPTIONS"])
def hello_world (was, name, birth_year, gender = None):
    return was.render ("hello.j2", name = name, birth_year = birth_year, gender = gender)
@app.route ("/hello", methods = ["POST", "OPTIONS"])
def hello_world (was, name, **DATA):
    return was.render ("hello.j2", name = name, DATA = DATA)
@app.route ("/profiles/<int:id>", methods = ["GET"])
def hello_world (was, id, detail = 'yes'):
    return was.render ("profile.j2", id = id, detail = detail)

These are valid:

  • /profiles/123?detail=no
  • /profiles/me?detail=no
  • /profiles/notme?detail=no
  • /profiles/new?detail=no
@app.route ("/profiles/<int:id>", methods = ["GET"])
def hello_world (was, id = None, detail = 'yes'):
    return was.render ("profile.j2", id = id, detail = detail)

Additionally, this is valid:

  • /profiles?detail=no

Parameter and Validation

@app.route ("/profiles/<int:id>", methods = ["GET"])
@app.inspect (ints = ['id'], booleans = ['detail'])
def hello_world (was, id = None, detail = False):
    return was.render ("profile.j2", id = id, detail = detail)
# MUST use with keyword argument
@app.inspect (
    ints = None, floats = None,
    strings = None, booleans = None,
    emails = None, uuids = None, nones = None, lists = None,
    dicts = None, oneof = None, manyof = None,
    notags = None, safes = None,
    **kargs
)

Examples:

- strings = ['name', 'gender']
- notags = ['resume'] # HTML tags are not allowed
- safes = ['resume'] # <script> or javascript-like content is not allowed
- nones = ['gender', 'birth_year'] # if the value is blank or zero-like, it is converted to None
- booleans = ['detail'] # valid values: True, False, true, false, yes, no, y, n, t and f
- oneof = ['email', 'mobile']

Keyword Argument Examples:

- birth_year = int # type
- mobile = re.compile ('[0-9]{3}-[0-9]{3}-[0-9]{4}') # an object has .search () method
- birth_year__between = (1900, dt.now ().year)
- gender__in = ['male', 'female', 'not-determine']
- name__len__lte = 64
- name__len__gt = 3
- name__len__between = (3, 64)

Available double-underscore operators (see the sketch after this list):

  • len__
  • between
  • in
  • notin
  • eq, exact
  • neq
  • lte
  • lt
  • gte
  • gt
  • contains
  • notcontain
  • startswith
  • notstartwith
  • endswith
  • notendwith
  • regex
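A minimal sketch combining several of the validators and operators above; the parameter names are hypothetical:

import re
from datetime import datetime as dt

@app.route ('/signup', methods = ['POST'])
@app.inspect (
    strings = ['name'],
    nones = ['gender'],
    oneof = ['email', 'mobile'],
    birth_year__between = (1900, dt.now ().year),
    name__len__between = (3, 64),
    mobile = re.compile ('[0-9]{3}-[0-9]{3}-[0-9]{4}')
)
def signup (was, name, birth_year, gender = None, email = None, mobile = None):
    return was.API (name = name, birth_year = birth_year)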

File Upload

@app.route ("/upload", methods = ['POST'])
def upload (was, file):
    file.save ("/var/tmp/upload", dup = "o")
  • file.path: temporary saved file full path
  • file.name: original file name posted
  • file.size
  • file.mimetype
  • file.ext: file extension
  • file.read ()
  • file.move (to)
  • file.as_flashfile ()
  • file.save (into, name = None, mkdir = False, dup = "u")
  • file.remove ()
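A small sketch using the attributes above to validate an upload before saving; the size limit and allowed mime types are arbitrary:

@app.route ("/upload-image", methods = ['POST'])
def upload_image (was, file):
    if file.size > 10 * 1024 * 1024:  # arbitrary 10 MB limit
        raise was.Error ("400 Bad Request", "file too large")
    if file.mimetype not in ("image/jpeg", "image/png"):
        raise was.Error ("400 Bad Request", "unsupported file type")
    file.save ("/var/tmp/upload", mkdir = True)  # default dup = "u" keeps a unique name
    return was.API (name = file.name, size = file.size)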

Streaming Request Data

@app.route ("/bistreaming", methods = ['POST'], input_stream = True)
def streaming (was):
    buf = []
    while 1:
        data = yield was.Input (4096)
        if not data:
            break
        buf.append (data)
    return b''.join (buf)

Caution: Be careful when using request streaming. Request streaming is needed only in a few specific situations.

  1. Small, chunked request data that is intermittent and needs a long-lived connection, like receiving GPS coordinate data from a client device
  2. Bidirectional streaming, like detecting silence in 10~30ms segments of an audio data stream. See the Bidirectional Streaming topic below.

If you just want to upload data, use the regular POST upload method. DO NOT use request streaming for that; it may block the event loop and is also very inefficient.

Cookie

was.request.cookie (alias: was.cookie): dictionary-like object

if "user_id" not in was.cookie:
    was.request.cookie.set ("user_id", "hansroh")
    # or
    was.request.cookie ["user_id"] = "hansroh"
was.cookie.set (
    key, val,
    expires = None,
    path = None, domain = None,
    secure = False, http_only = False
  )

The 'expires' argument is the number of seconds until expiry.

  • if None, this cookie valid until browser closed
  • if 0 or 'now', expired immediately
  • if 'never', expire date will be set to a hundred years from now

If 'secure' and 'http_only' options are set to True, 'Secure' and 'HttpOnly' parameters will be added to Set-Cookie header.

If 'path' is None, each app's cookie path is automatically set to its mount point.
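A short sketch of the options described above:

# session cookie, valid until the browser is closed (expires = None)
was.cookie.set ("theme", "dark")

# long-lived cookie sent over HTTPS only and hidden from scripts
was.cookie.set ("user_id", "hansroh", expires = 'never', secure = True, http_only = True)

# expire immediately
was.cookie.set ("user_id", "", expires = 'now')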

Session

was.request.session (alias: was.session): dictionary-like object

To enable sessions for an app, a random-string securekey should be set for encrypting/decrypting session values.

with skitai.preference () as pref:
    pref.securekey = "ds8fdsflksdjf9879dsf;?<>Asda"
    pref.session_timeout = 1200 # sec
@app.route ("/session")
def hello_world (was, **form):
    if "login" not in was.session:
        was.session.set ("user_id", form.get ("hansroh"))
        # or
        was.session ["user_id"] = form.get ("hansroh")

If you set, alter or remove a session value, the session expiry is automatically extended by app.session_timeout; just getting a value does not extend it. To extend explicitly without altering a value, use touch() or set_expiry(). session.touch() extends by app.session_timeout, and session.set_expiry(timeout) extends by the given timeout.

Once you set an expiry, session auto extension is disabled until the remaining expiry time becomes shorter than the new expiry time calculated from app.session_timeout.
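A short sketch of the expiry behavior described above:

@app.route ("/ping")
def ping (was):
    was.session ["user_id"] = "hansroh"  # setting a value extends expiry by app.session_timeout
    uid = was.session ["user_id"]        # getting a value does NOT extend expiry
    was.session.touch ()                 # extend explicitly by app.session_timeout
    was.session.set_expiry (3600)        # or extend by a specific timeout in seconds
    return was.API (user_id = uid)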

Namespaced Session

@app.route("/")
def index (was):
    was.session.mount ("ADMIN", path = '/admin')
    was.session.set ("login", True)
    was.session.mount () # restore to default session

Additional methods:

  • was.session.set_expiry (timeout)
  • was.session.touch ()
  • was.session.expire ()
  • was.session.use_time ()
  • was.session.impending (): if session timeout remains 20%
  • was.session.mount ( name = None, session_timeout = None, securekey = None, path = None, domain = None, secure = False, http_only = False, extend = True )

Message Box

was.request.mbox (alias: was.mbox)

@app.route ("/msg")
def msg (was):
    was.mbox.send ("This is Flash Message", "flash")
    was.mbox.send ("This is Alert Message Kept by 60 seconds on every request", "alram", valid = 60)
    return was.redirect (was.urlfor (showmsg, "Hans Roh"), status = "302 Object Moved")

@app.route ("/showmsg")
def showmsg (was, name):
    return was.render ("msg.htm", name=name)
<ul>
    {% for mid, category, created, valid, msg, extra in was.mbox.get ("notice", "news") %}
      <li class="{{category}}"> {{ msg }}</li>
    {% endfor %}
</ul>
  • was.mbox.send (msg, category, valid_seconds, **extra_dict)
  • was.mbox.get (): returns [(message_id, category, created_time, valid_seconds, msg, extra_dict)]
  • was.mbox.get (category): filtered by category
  • was.mbox.get (key, val): filtered by extra_dict key and value pair
  • was.mbox.search (key, val): finds in extra_dict; if val is not given or is None, compares with the category name. Returns [message_id, ...]
  • was.mbox.remove (message_id)

Piping

was.pipe () can call a function by its resource name. This makes it possible to call functions nested inside __mount__ (app) in another module.

# services/__init__.py
def __mount__ (app, mntopt):
    @app.route ("/1")
    @app.inspect (offset = int)
    def index (was, offset = 1):
        return was.API (result = offset)

    @app.route ("/2")
    def index2 (was):
        return was.pipe (index)

    @app.route ("/3")
    def index3 (was):
        return was.pipe (index, offset = 4)

    @app.route ("/4")
    def index4 (was):
        return was.pipe ('index', offset = 't')

    @app.route ("/5")
    def index5 (was):
        return was.pipe ('sub.pages.toc', chapter = 5)
# services/sub/pages.py
def __mount__ (app, mntopt):
    @app.route ("/pages/toc/<int:chapter>")
    def toc (was, chapter):
        return "Page tOC Chapter {}".format (chapter)

Making URL

was.static ('img/logo.png')
was.urlfor ("index")
was.urlfor ("sub.pages.toc", 1)
was.urlfor ("sub.pages.toc", chapter = 1)
was.urlfor ("/index.htm") # hard coded absolute URL, MUST start with /

Helpers

Testing

def is_superuser (was):
    if was.user.name not in ('admin', 'root'):
        return was.response ("403 Permission Denied")

@app.testpass_required (is_superuser)
def modify_profile (was):
    ...

Authorization & Authentication

Let's assume you manage permission by user levels: admin, staff, owner and user.

class User:
    def __init__ (self, uid, lev, nick_name):
        self.uid = uid
        self.lev = lev
        self.nick_name = nick_name
        self.tuid = None

    def __str__ (self):
        return self.uid

@app.permission_handler
def permission_handler (was, perms):
    if was.request.get_header ('authorization'):
        claims = was.dejwt ()
        if "err" in claims:
            raise was.Error ("401 Unauthorized", claims ["err"])
        was.request.user = User (claims ['uid'], claims ['lev'], claims ['nick_name'])

    if 'uid' in was.request.PARAMS:
        tuid = was.request.PARAMS ['uid']
        if 'owner' in perms and tuid != 'me':
            raise was.Error ("403 Permission Denied", "owners only operation")
        was.request.user.tuid = (tuid == 'me' and was.request.user.uid or (tuid != 'notme' and tuid or None))

    if not perms:
        return

    if was.request.user.lev == "staff":
        return # always valid
    if "staff" in perms:
        raise was.Error ("403 Permission Denied")
@app.route ("/animals/<id>")
@app.permission_required ([], id = ["staff"])
def animals (was, id = None):
    id = id or was.request.JWT ["userid"]

This resource requires no particular permission for "/animals/" or "/animals/me", but '/animals/100' requires 'staff' permission. This can make permission control simpler.

You can also specify permissions per request method.

@app.route ("/animals/<id>", methods = ["POST", "DELETE"])
@app.permission_required (['user'], id = ["staff"], DELETE = ["admin"])
def animals (was, id = None):
    id = id or was.request.JWT ["userid"]

This resource requires 'user' permission for "/animals/" or "/animals/me", '/animals/100' requires 'staff' permission, and DELETE requests require 'admin' permission.

Conditional Prework

def reload_config (was, path):
    ...

@app.if_file_modified ('/opt/myapp/config', reload_config, interval = 1)
def index (was):
    return was.render ('index.html')
def broadcast (was, name):
    ...

@app.if_updated ('STATE', broadcast, interval = 1)
def index (was):
    return was.render ('index.html')

Run Pre/Postworks Chain

You can automate preworks and postworks.

def pre1 (was):
    ...

def pre2 (was):
    ...

def post1 (was):
    ...

@app.route ('/')
@app.run_before (pre1, pre2)
@app.run_after (post1)
def index (was):
    return was.render ('index.html')
  • @app.run_before can return None, or return response content to abort all remaining run_before functions and the main request (see the sketch below)
  • @app.run_after return values are ignored
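A minimal sketch of a prework that aborts the chain as described above; has_valid_token () is a hypothetical helper:

def check_token (was):
    # returning response content here aborts the remaining preworks and the main handler
    if not has_valid_token (was):  # hypothetical helper
        return was.response ("401 Unauthorized")

@app.route ('/private')
@app.run_before (check_token)
def private (was):
    return was.render ('private.j2')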

CORS (Cross Origin Resource Sharing) and Preflight

with skitai.preference () as pref:
    pref.access_control_allow_origin = ["*"]
    # OR
    pref.access_control_allow_origin = ["http://www.skitai.com:5001"]
    pref.access_control_max_age = 3600

If you want function specific CORS,

@app.route (
    "/post", methods = ["POST", "OPTIONS"],
    access_control_allow_origin = ["*"], access_control_max_age = 3600
)
def post (was):
    return 'hello'

Using Task

A Task is a part of request processing and is also related to generating the response, so you should look into this first.

Resource Aliasing

with skitai.preference () as pref:
    skitai.alias ("@mypg", skitai.DB_PGSQL, ["user:pass@localhost/mydb"])
    skitai.alias ("@mylite", skitai.DB_SQLITE3, ["./sqlite3.db"])
    skitai.alias ("@mymongo", skitai.DB_MONGODB, ["localhost/mycollection"])
    skitai.alias ("@myredis", skitai.DB_REDIS, ["localhost/0"])
    skitai.alias ('@myapi', skitai.PROTO_HTTPS, ["s1.myserver.com"])

Task

Task is a sort of coroutine. A Task has these methods (a short sketch follows the list):

  • fetch (): returns an iterable
  • one (): expects exactly a single element
  • commit (): returns None, just waits for completion
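A short sketch contrasting the three methods, assuming the '@mypg' alias from Resource Aliasing above; table and column names are hypothetical:

with was.db ('@mypg') as db:
    task1 = db.execute ('select * from profiles')
    task2 = db.execute ('select * from profiles where id = 1')
    task3 = db.execute ("update profiles set name = 'Hans' where id = 1")

rows = task1.fetch ()  # iterable of rows
row = task2.one ()     # expects exactly a single row
task3.commit ()        # returns None, just waits for completion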

HTTP Based Protocols

with was.stub ('@myapi/api') as stub:
    task1 = stub.get ('/profiles/{}?limit={}', 100, 10)
    payload = dict (name = 'Hans Roh')
    task2 = stub.put ('/profiles/{}', 100, payload)

with was.xmlrpc ('@myapi/rpc2') as stub:
    task3 = stub.add (156, 952)
task1.fetch ()
task2.fetch ()
task3.fetch ()
pip3 install tfserver
from tfserver import cli
with was.grpc ("@myapi/tensorflow.serving.PredictionService") as stub:
    x = get_numpy_data ()
    request = cli.build_request ('mymodel', 'predict', x = x)
    task = stub.Predict (request, 10.0)

PostgreSQL / SQLite3

# PostgreSQL and SQLite3
with was.db ('@mypg') as db:
    task = (db.select ('my_table')
                .filter (name = 'Hans')
                .with_ ('a', db.insert ('my_table').set (name = 'Hans'))
            ).execute ()

MongoDB

with was.db('@mymongo') as db:
    task = db.find ({'city': 'New York'})

Redis

with was.db ("@myredis") as db:
    task1 = db.set('foo', 'bar')
    task2 = db.get('foo')

Thread / Process / Subprocess

task = was.Thread (my_func, arg1, arg2)
task = was.Process (my_func, arg1, arg2)
task = was.Subprocess ('top -n1', timeout = 3)

Mask

It is a fake Task (or Tasks).

You can make one by wrapping data with was.Mask (data) if you want to use the same consistent methods as a Task.

task = was.Mask (1)
result = task.fetch () # 1
tasks = was.Mask ([1, 2])
result1, result2 = tasks.fetch () # 1, 2

Tasks

with was.db('@mymongo') as db:
    task1 = db.find ({'city': 'New York'})
with was.stub ('@myapi/api') as stub:
    task2 = stub.get ('/profiles/{}?limit={}', 100, 10)
task3 = was.Process (my_func, arg1, arg2)

tasks = was.Tasks ([task1, task2, task3])
a, b, c = tasks.fetch () # got 3 results
tasks = was.Tasks (
    db.insert ('city').set (name = 'Hans').execute (),
    files = was.Subprocess ('ls /var/log', filter = lambda x: x.replace (' ', '_')),
    result = was.db ('@mydb', filter = hide_password).execute ('select * from user')
)
results = tasks.fetch ()
results ['files']
results ['result']

Generator Based Coroutine

New in version 0.8.6

If you create a single task and call fetch () on the very next line, it is almost the same as a synchronous blocking job. But by yielding the Task, it becomes a fully async task.

Add the coroutine = True parameter to @app.route to distinguish it from a native generator.

@app.route ("/coroutine/2", coroutine = True)
def coroutine (was):
    with was.stub ("http://example.com") as stub:
        task = yield stub.get ("/")
    return task.fetch ()

Note: DO NOT do blocking jobs after the first yield. If you MUST, yield was.Thread() or was.Process().

@app.route ("/coroutine", coroutine = True)
def coroutine (was):
    def wait_hello (timeout = 1.0):
        time.sleep (timeout)
        return 'mask'

    # do sync tasks in thread
    some_sync_task ()

    # switch to the async event loop in the main thread
    tasks = yield was.Tasks (
        a = was.Mask ("Example Domain"),
        b = was.Thread (wait_hello, args = (1.0,))
    )
    task3 = yield was.Thread (wait_hello, args = (1.0,))
    task4 = yield was.Subprocess ('ls')

    # finally
    return was.API (d = task4.fetch (), c = task3.fetch (), **tasks.fetch ())

Why not use await? To use await you must also use async def, but Atila doesn't want to disturb legacy sync code. So every request is initially passed over to the thread pool, and you can decide to keep the sync manner or switch to async.

Map / Load-Balancing

Set multiple members with weight.

with skitai.preference () as pref:
    members = ["user:pass@localhost/mydb 20", "user:pass@remote.com/mydb 10"]
    skitai.alias ("@mypg", skitai.DB_PGSQL, members)

There are 2 more methods for checking request status.

  • dispatch ()
  • wait ()
# map-merging
with was.stub.map ('@myapi') as stub:
    task = stub.get ('@myapi/profiles/{}?limit={}', 100, 10)
    all_results = []
    for resp in task.dispatch ():
        if resp.status != skitai.STA_NORMAL:
            continue
        all_results.extend (resp.data)

response.status:

  • STA_UNSENT
  • STA_REQFAIL
  • STA_TIMEOUT
  • STA_NETERR
  • STA_NORMAL
# load-balancing is the very same as single call
with was.stub.lb ('@myapi') as stub:
    task = stub.get ('@myapi/profiles/{}?limit={}', 100, 10)
    task.fetch ()

Cache Control

# myapp/__init__.py
import skitai
def __config__ (pref):
    skitai.register_g ('tables.users', 'table.photos')
@app.route ("/update")
def update (was):
    # update tables.users modification time
    was.db ('@mydb', rm_cache = 'tables.users').execute (...)

@app.route ("/query")
def query (was):
    # use cache if tables.users modification time is older
    was.db ('@mydb', use_cache = 'tables.users').execute (...)

@app.route ("/query2")
def query2 (was):
    # check all times are older
    was.db ('@mydb', use_cache = ['tables.users', 'table.photos']).execute (...)

This uses the skitai.g object, which is shared by all Skitai multiprocessing workers. So when one worker updates a table, all the others can know it.

Also, if rm_cache is used, it emits events which you can handle if you need. But please note that these events are notified only within their own worker process.

@app.route ("/update")
def update (was):
    was.db ('@mydb').execute (...)
    was.setlu ('tables.users', something...)

@app.on ("setlu:tables.users")
def table_updated (was, *args, **kargs):
    # your code

Adding Custom Database Interface For Atila Coroutine

You can override existing classes - RDBMS, NoSQL (Redis, MongoDB) styles.

import skitai
from aquests.dbi import asynredis

class MyDBI (asynredis.AsynConnect):
    def __init__ (self):
      ...

DB_MYDBI = '*mydbi'
skitai.add_database_interface (DB_MYDBI, MyDBI)
skitai.alias ('@mydbi', DB_MYDBI, 'localhost:9000')

Making Response

HTTP Error

raise was.Error ("400 Bad Request")
raise was.Error ("400 Bad Request", 40011)
raise was.Error ("400 Bad Request", 40011, "missing user ID")

Custom Error Handlers

Atila responds according to the Accept header; this is an example for HTML services.

<h1>{{ error.code }} {{ error.message }}</h1>
<p>{{ error.detail }}</p>
<hr>
<div>URL: {{ error.url }}</div>
<div>Time: {{ error.time }}</div>
@app.default_error_handler
def default_error_handler (was, error):
    return was.render ("errors/default.j2", error = error)

@app.error_handler (404)
def not_found (was, error):
    return was.render ('404.j2', error = error)

Redirecting

return was.redirect (
    was.urlfor ("showmsg", "Hans Roh"),
    status = "302 Object Moved"
)

return was.redirect (
    was.static ("index.html"),
    status = "302 Object Moved"
)

Primitive

String

return "hello"
return was.response (status = '200 OK', body = 'Hello', headers = [('X-Server', 'Skitai')])
was.response.set_status ('200 OK')
was.response.set_header ('X-Server', 'Skitai')
return "hello"

API Response

JSON or XML response as request's Accept header.

return was.API ({'name': 'Hans Roh'})
return was.API (name = 'Hans Roh')

Rendered Template

return was.render ("hello.j2", name = 'Hans Roh')
<!-- hello.j2 -->
<h1>Hello {{ context.name }}</h1>

render_or_API

Response by Accept header,

return was.render_or_API ("index.html", result = result)

File

return was.File ("/path/to/data.xlsx")
return was.File ("/path/to/data.xlsx", 'application/octet-stream', 'data.xlsx')

Static

# by static file URL
# resolved into was.static ("img/logo.png")
return was.Static ("img/logo.png")
@app.route ("/robots.txt")
def robots (was):
    if app.debug:
        was.response ['Content-Type'] = 'text/plain'
        return "User-Agent: *\nDisallow: /\n"
    return was.Static ('/robots.real.txt')

Generator

def build_csv (was):
  def generate():
    for row in iter_all_rows():
      yield ','.join(row) + '\n'
  return was.response ("200 OK", generate (), headers = [("Content-Type", "text/csv")])

RPC Response

Depending on the request's content type, Atila supports XMLRPC, JSONRPC and GRPC.

# for jsonrpclib
pip3 install jsonrpclib-pelix
# XML/JSONRPC
@app.route ('/add')
def add (was, a, b):
    return a + b # int is serializable

# GRPC
@app.route ("/GetFeature")
def GetFeature (was, point):
    feature = get_feature (db, point)
    if feature is None:
        return route_guide_pb2.Feature (name = "", location = point)
    else:
        return feature

You have nothing to do for this; just make sure the return type is serializable by each RPC protocol.

Coroutine

Task / Tasks

If you need post processing data,

import math

def respond (was, task):
    return was.API (result = task.fetch (), a = task.meta.get ('a'))

@app.route ('...')
def foo (was):
    task = was.db ("@sqlite3").execute ("select * from test")
    return task.then (respond)

@app.route ('...')
def foo (was):
    return was.Process (math.sqrt, args = (4.0,), meta = {'a': 1}).then (respond)

@app.route ('...')
def foo (was):
  def sqrt (a):
    return math.sqrt (a)
  return was.Thread (sqrt, args = (4.0,)).then (respond)

Map

No need post processing,

@app.route ('/datalist')
def datalist (was):
    task = was.db ("@sqlite3").execute ("select * from test")
    return was.API (result = task.fetch ())

# same result but transit to async
# return thread to pool early
@app.route ('/datalist')
def datalist (was):
    task = was.db ("@sqlite3").execute ("select * from test")
    return was.Map (result = task)
@app.route ("/bench/sp", methods = ['GET'])
def bench_sp (was):
    with was.db ('@mydb') as db:
        root = (db.select ("foo")
                    .order_by ("-created_at")
                    .limit (10)
                    .filter (Q (from_wallet_id = 8) | Q (detail = 'ReturnTx')))

        return was.Map (
            was.Thread (time.sleep, args = (0.3,)), # executed, but excluded from results (no map name)
            files = was.Subprocess ('ls /var/log'),
            result = root.clone ().execute (),
            record_count__one = root.clone ().aggregate ('count (id) as cnt').execute ()
        )
        # JSON response; the 1st arg was executed but is ignored in the results because it has no map name
        # >> { result: [...],  record_count: {cnt: 123}, files: 'syslog ...' }

But simple processing is possible with filter.

def hide_password (rows):
  for row in rows:
    row.password = '****'
  return rows

return was.Map (
  files = was.Subprocess ('ls /var/log', filter = lambda x: x.replace (' ', '_')),
  result = was.db ('@mydb', filter = hide_password).execute ('select * from user')
)

By Accept header,

@app.route ('/')
def index (was):
  return was.render_or_Map ("index.j2", result = db.execute ('...'))

Mapped

tasks = was.Tasks (
    files = was.Subprocess ('ls /var/log', filter = lambda x: x.replace (' ', '_')),
    result = was.db ('@mydb', filter = hide_password).execute ('select * from user')
)
return was.Mapped (tasks)

By Accept header,

@app.route ('/')
def index (was):
    tasks = was.Tasks (result = db.execute ('...'))
    return was.render_or_Mapped ("index.j2", tasks)

ThreadPass

@app.route ("/thread_future", methods = ['GET'])
def thread_future_respond (was):
    def respond (was, a):
        a = some_synchronous_task ()
        # you can make corequest
        was.db ('@mydb').execute (...).commit ()
        return was.API (
          result = a
        )
    return was.ThreadPass (respond, args = (4.0,))

Threaded Data Streaming

class Producer:
  def get (self, n):
    return [random.randrange (1000) for _ in range (n)]

def producing (producer):
    def produce (queue):
        while 1:
            items = producer.get (100)
            if not items:
                queue.put (None) # end of stream
                break
            queue.put (str (items))
    return produce

@app.route ("/threaproducer")
def threaproducer (was):
    return was.Queue (producing (Producer ()))

ProxyPass

@app.route ("/<path:path>")
def proxy (was, path = None):
    return was.ProxyPass ("@myupstream", path)

Coroutine Streaming

@app.route ("/download_csv", coroutine = True)
def download_csv (was):
    fetch_count = 100  # assumed page size per query
    yield "ID, NAME\n"
    current_id = 0
    while 1:
        task = yield (was.db ('@mydb').select ('tble')
                        .get ('id, name')
                        .filter (id__gt = current_id)
                        .limit (fetch_count)).execute ()
        rows = task.fetch ()
        if not rows:
            break
        current_id += fetch_count
        yield '\n'.join (['{}, "{}"'.format (row.id, row.name) for row in rows])

Bidirectional Streaming

@app.route ("/bistreaming", methods = ['POST'], coroutine = True, input_stream = True)
def coroutine_streaming (was):
    while 1:
        data = yield was.Input (16184)
        if not data:
            break
        yield b':' + data

Websocket

@app.websocket (spec, timeout = 60, onopen = None, onclose = None)

WS_COROUTINE (New in version 0.8.8)

@app.route ("/echo_coroutine")
@app.websocket (skitai.WS_COROUTINE, 60)
def echo_coroutine (was):
    while 1:
      msg = yield was.Input ()
      if not msg:
        break

      with was.stub ('http://example.com') as stub:
        task = yield stub.get ("/")
        yield task.fetch ()

WS_CHANNEL

  • simple request and response way like AJAX
  • with WS_THREAD, WS_THREADSAFE, WS_NOTHREAD and WS_SESSION options

WS_THREAD

  • the default: function based websocket message handling
  • it treats every single websocket message as a single request to a resource, just like a URL request
  • on receiving a message from a client, it calls the handling function via the queue and thread pool, which is basically the same as handling a request resource

WS_THREADSAFE (New in version Skitai 0.26)

  • Mostly same as WS_THREAD
  • Message sending is thread safe
  • In most cases you don't need this option, but if you create one or more threads yourself that use the websocket.send () method, you need this for safety
def onopen (was):
    was.request.session.set ("WS_ID", was.websocket.client_id)
    print ('websocket opened with', was.request.ARGS ["options"])

def onclose (was):
    was.request.session.remove ("WS_ID")

@app.route ("/websocket")
@app.websocket (skitai.WS_CHANNEL | skitai.WS_THREAD, 1200, onopen, onclose)
def websocket (was, message, **options):
    app.websocket_send (
        was.request.session.get ("WS_ID"),
        "Item In Stock!"
    )
    return 'you said: ' + message + str (options)

To push message to specific client.

@app.route ("/item_in_stock")
def item_in_stock (was):
    app.websocket_send (
        was.request.session.get ("WS_ID"),
        "Item In Stock!"
    )

Note: I'm not sure it works in all web browsers.

WS_NOTHREAD

  • non-threaded, function call based websocket message handling
  • it is faster than WS_THREAD

WS_NOTHREAD does not use the queue or thread pool. In this case, the response is faster, but if the handler includes an IO-blocking operation, the entire Skitai event loop will be blocked.

  @app.route ("/websocket")
  @app.websocket (skitai.WS_CHANNEL | skitai.WS_NOTHREAD, 60, onopen)
  def websocket (was, message):
        return 'you said: ' + message

Helpers of 'was'

Logging and Traceback

@app.route ("/")
def sum (was):
    was.log ("called index", "info")
    try:
        ...
    except:
        was.log ("exception occured", "error")
        was.traceback ()
    was.log ("done index", "info")
  • was.log (msg, category = "info")
  • was.traceback (id = "") # id is used for quickly finding the log line while debugging; if not given, it will be the Global transaction ID/Local transaction ID

Cross Site Request Forgery Token (CSRF Token)

New in skitai version 0.26.16

At template, insert CSRF Token,

  <form>
  {{ was.csrf_token_input }}
  ...
  </form>

then verify token like this,

@app.before_request
def before_request (was):
    if was.request.args.get ("username"):
        if not was.verify_csrf ():
            return was.response ("400 Bad Request")

Or use decorator,

@app.csrf_verification_required
def before_request (was):
    ...

JWT Token

@app.route ('/make_jwt')
def make_jwt (was):
    t = was.encode_jwt ({'iss': 'example.com', 'exp': time.time () + 3600})

@app.route ('/verify_token')
def verify_jwt (was):
    claims = was.request.JWT
    if "err" in claims:
        return claims ["err"]

One-Time Password

New in skitai version 0.35.0

def check_otp (was):
    if not was.verify_otp (was.request.get_header ('x-otp')):
        raise was.Error ('403 Permission Denied')

@app.route ('/admin-task')
@app.testpass_required (check_otp)
def task (was):
    ...

At your client,

from atila.was import generate_otp
generate_otp (secret_key)

One-Time Token

New in skitai version 0.26.17

To create a one-time link URL, you can convert your data into a signed token string.

Note: Like a JWT token, this token contains data and can be decoded easily, so you should not include important information like passwords or PINs. This token just makes sure the contained data has not been altered, by comparing the signature generated with your app secret key.

@app.route ('/password-reset')
def password_reset (was):
    if was.request.args ('username'):
        username = "hans"
        token = was.encode_ott (username, 3600, "pwrset") # valid within 1 hour
        pw_reset_url = was.urlfor ('reset_password', token)
        # send email
        return was.render ('done.html')

    if was.request.args ('token'):
        username = was.decode_ott (was.request.args ['token'])
        if not username:
            return was.response ('400 Bad Request')
        # processing password reset
        ...

If you want to expire the token explicitly, add a session token key.

# valid within 1 hour and create session token named '_reset_token'
token = was.encode_ott ("hans", 3600, 'rset')
>> kO6EYlNE2QLNnospJ+jjOMJjzbw?fXEAKFgGAAAAb2JqZWN0...

username = was.decode_ott (token)
>> "hans"

# if processing is done and for revoke token,
was.revoke_ott (token)

Interval Based App Maintenance

If you need interval based maintenance jobs,

with skitai.preference () as pref:
    pref.config.MAINTAIN_INTERVAL = 60  # seconds, default is 60
@app.maintain (10, threading = False) # executed every 10 maintain intervals (600 sec.)
def maintain_num_nodes (was, now, count):
    ...
    num_nodes = was.getlu ("cluster.num-nodes")
    if app.store ["num-nodes"] != num_nodes:
      app.store ["num-nodes"] = num_nodes
      app.broadcast ("cluster:num_nodes")

You can add multiple maintain jobs, but maintain function names SHOULD be unique.

Test Client

Only excellent tests can make a good app. Skitai provides test_client, which supports integration and E2E tests.

See Test Client chapter in Skitai App Engine.

Your Life Is So Precious

Integrating Django Models and Administrative Views

# atila_vue/entities/firebase_vue/models.py
from django.db import models
from django.contrib.postgres.fields import JSONField
from atila.patches.djangopatch import Model

class User (Model):
    uid = models.CharField (max_length = 32)
    email = models.EmailField (max_length = 64, null = True, blank = True)
    nick_name = models.CharField (max_length = 16, null = True, blank = True)

    lev = models.CharField (max_length = 16, default = 'user', choices = [('guest', 'guest'), ('user', 'user'), ('staff', 'staff')])
    status = models.CharField (max_length = 16, null = True, blank = True)
    created = models.DateTimeField (auto_now_add = True)
    last_updated = models.DateTimeField (auto_now = True)

    class Meta:
        proxy = False
        managed = True
        db_table = 'firebase_user'
        verbose_name = 'User'
        verbose_name_plural = 'Users'

    def __str__ (self):
        return '{}'.format (self.nick_name)
# atila_vue/services/firebase/services.py
from firebase_vue.models import User, UserLog

class UserService:
    @classmethod
    def _get_id (cls, uid):
        return User.get (uid = uid).get ('id')

    # basic ops ------------------------------
    @classmethod
    def get (cls, uid = None, nick_name = None):
        assert uid or nick_name, 'uid or nick_name required'
        return User.get (uid = uid, nick_name = nick_name).execute ()

    @classmethod
    def add (cls, uid, payload):
        payload ['uid'] = uid
        return User.add (payload).returning ("*").execute ()

    @classmethod
    def set (cls, uid, payload):
        return User.set (payload, uid = uid).returning ("*").execute ()

    @classmethod
    def delete (cls, uid):
        return (User.remove (uid = uid)
                    .with_ (UserLog.remove (user_id = cls._get_id (uid)))
               ).execute ()

All queries work as Atila native coroutines. More importantly, you can use Django migrations, field validation and even the Django administrative views by mounting the Django app at '/admin'.

These are just the core code lines. For more information, see my atila-vue.

Integrating pytest and API Documentation

python3 serve.py --devel
# tests/conftest.py
from atila.pytest_hooks import * # HERE
from functools import partial
import pytest
import skitai

@pytest.fixture
def launch ():
    return partial (skitai.test_client, port = 30371, silent = False)

@pytest.fixture (scope = "module")
def client ():
    _client = skitai.test_client (port = 5000)
    yield _client
    _client.stop ()
# tests/test_app.py
def test_app (client):
    resp = client.get ('/apis/tickers')
    assert resp.status_code == 200

def test_app (launch):
    # launch test server on 30371 with --devel option
    with launch ('../serve.py', devel = True) as client:
        resp = client.get ('/apis/tickers')
        assert resp.status_code == 200
cd tests && pytest --docs # HERE

As a result, your tests' parameters, data and errors will be logged and saved. If your tests finish successfully, docs/api/index.md will be created.

## Table of Content
1. [/apis/tickers/<int:product_code>](#tproduct_tickers) [**`GET`**](#get-apistickers)

## product.tickers
**URL Parameters**: `product_code`
**Authorization Required**: NO
#### `GET` `/apis/tickers`

**Success Response Example**
> **_URL_**: `/apis/tickers/123`
> **_Response_**: `200 OK`
> **_Response Data_** (application/json)

'''json
{
  "instock": 2
}
'''

If you separate your tests into subdirectories like 'account', 'products', ...

pytest account --docs

docs/api/account.md will be created.

I am happy, frontenders happy, everybody happy.

Note: It works only if pytest is run on the same machine; the request IP must be 127.0.0.1.
