Atila Framework
Project description
Atila
Atila is simple and minimal framework integrated with Skitai App Engine. It is the easiest way to make backend services.
# serve.py
from atila import Atila
app = Atila (__name__)
@app.route ("/")
def index (was):
return "Hello, World"
if __mame__ == "__main__":
import skitai
skitai.mount ("/", app)
skitai.run (port = 5000)
And run,
python3 serve.py
Important Notice
CAUTION: Atila is base on WSGI but can be run only with Skitai App Engine.
This means if you make your Atila app, you have no choice but Skitai as WSGI app server. And Atila's unique and unconventional style may become very hard work to port to other framework.
Unconventional?
Atila is based on WSGI specification but take some advantage of asynchronous features which are provided from Skitai.
Atila is mostly same as other WSGI containers for functional and API aspect.
Otherwise Atila has asynchronous features but it does not use
async/await
conventions. it sometime use generator base coroutine
with yeild
.
Atila treats async jobs like these as a Task
.
- HTTP Request
- Database Query
- Thread
- Process
- Subprocess
Task has 3 major common methods: fetch(), one(), commit(). Multiple tasks are
bind into Tasks
. Tasks also have the same 3 methods.
Tasks are passed over to main event loop of main thread from current request thread. If all tasks finished, Tasks will be returned to request thread. Also by yielding, Tasks can be completely transitted into non blocking manner.
I'm not telling Atila is more better design. I'm just telling Atila is obviously NOT main-stream and I enjoy this project.
Installation
Requirements
Python 3.6+
Installation
Atila and other core base dependent libraries is developing on single milestone, install/upgrade all at once. Otherwise it is highly possible to meet some errors.
With pip
pip3 install -U atila skitai rs4 aquests
With git
pip3 install -U skitai rs4 aquests sqlphile
git clone https://gitlab.com/hansroh/atila.git
cd atila
pip3 install -e .
Optional required as you need,
pip3 install redis
pip3 install pymongo
pip3 install psycopg2-binary
pip3 install protobuf # for GRPC
pip3 install jsonrpclib-pelix
pip3 install jinja2
First App
Project Structure
The very formal but minimal project structure for myapp
.
myapp/
static/
templates/
__init__.py
serve.py
README.md
Launch Script
# serve.py
import skitai
import myapp
if __name__ == '__main__':
with skitai.preference () as pref:
skitai.mount ('/', myapp, pref)
pref is runtime app preference. It will be overwrite its attributes to app on mounted. And have some methods like set_static().
Create App
# myapp/__init__.py
import skitai
from atila import Atila
def __config__ (pref):
# mount / to myapp/static and app.STATIC_URL will be set to '/'
# skitai.abspath () will join path with location of serve.py
# DO NOT use os.path.abspath (__file__), it is not reliable
pref.set_static ('/', skitai.abspath ('myapp/static'))
def __app__ ():
return Atila (__name__)
def __mount__ (app, mntopt):
@app.route ('/')
def index (was):
return 'Index Page'
@app.route ('/sub')
def sub (was):
return 'Sub Page'
Make sure that myapp/static/index.html
doses not exists. If so static file has more high priority.
Then run.
python3 serve.py --devel # --devel swiach mean debug mdde and auto reloading
Please see Skitai App Engine to more detail usage.
Templates
pip3 install jinja2
<!-- myapp/templates/index.j2 -->
<html>
<head><title>My App</title></head>
<body>
<h1>Index Page</h1>
</body>
</html>
def __mount__ (app, mntopt):
@app.route ('/')
def index (was):
return was.render ('index.j2')
Cascaded Service Mount
For larger app, make myapp/services
directory.
myapp/
static/
templates/
services/
__init__.py
# myapp/__init__.py
# remove __mount__ ()
import skitai
from atila import Atila
def __config (pref):
pref.set_static ('/', skitai.abspath ('myapp/static'))
def __app__ ():
return Atila (__name__)
def __setup__ (app, mntopt):
from . import services
app.mount ('/', services)
# NEW myapp/services/__init__.py
def __mount__ (app, mntopt):
@app.route ('/')
def index (was):
return 'Index Page'
@app.route ('/sub')
def sub (was):
return 'Sub Page'
Seperate '/sub' as sub module.
# modify myapp/services/__init__.py
# add __setup__ hook
def __setup__ (app, mntopt):
from . import sub
app.mount ('/sub', sub)
def __mount__ (app, mntopt):
@app.route ('/')
def index (was):
return 'Index Page'
# NEW myapp/services/sub.py
def __mount__ (app, mntopt):
# make sure blank path for accessing '/sub'
# if '/', accessable URL become '/sub/'
@app.route ('')
def sub (was):
return 'Sub Page'
Add new '/sub2' as sub package
Assume /sub2
is more large and complex, and you want to make as sub package.
# modify __setup__ of myapp/services/__init__.py
def __setup__ (app, mntopt):
from . import sub
from . import sub2
app.mount ('/sub', sub)
app.mount ('/sub2', sub2)
# NEW myapp/services/sub2/__init__.py
def __setup__ (app, mntopt):
from . import pages
app.mount ("/sub2/pages", pages)
# NEW myapp/services/sub2/pages.py
def __mount__ (app, mntopt):
@app.route ('') # /sub2/pages
def pages (was):
return 'Pages'
@app.route ('/1') # /sub2/pages/1
def page1 (was):
return 'Page1'
finally,
myapp/
static/
templates/
services/
sub.py
sub2/
__init__.py
pages.py
__init__.py
Other Hooks
Currently only one other hook.
def __umount__ (app): # NO mntopt argument
print ('bye...')
More About Preference
with skitai.preference () as pref:
pref.set_static ('/static', 'myapp/static')
pref.set_media ('/media', '/mnt/data/media')
pref.config.MAX_UPLOAD_SIZE = 256 * 1024 * 1024
pref.config.MAINTAIN_INTERVAL = 60
You can add your options to pref.config
and attibute to pref
and you can sure accessing from app.config
and app
attribute later.
Conclusion
- Mount sub package in __setup__ hook and make services at __mount__.
- DO NOT mess with sub package in __mount__.
- __config__ and __app__ can be in app initializing script ONLY.
Event Bus
Atila's event bus is based on event-bus.
# wherever in your app package,
def __setup__ (app, mntopt):
@app.on ('custom-event')
def on_cusom_event (was, detail):
print ('I got custom-event with detail: {}'.format (detail))
# wherever in your app package,
def __mount__ (app, mntopt):
@app.route ('/make_event')
def make_event (was):
app.emit ('custom-event')
return "OK"
App Life Cycle Events
There're serveral pre-defined events related app life cycle.
- before_mount
- mounted
- before_reload
- reloaded
- mounted_or_reloaded
- before_umount
- mounted
# wherever in your app package,
def load_my_models ():
...
def __setup__ (app, mntopt):
@app.on ('before_mount')
def on_before_mount (was):
load_my_models ()
Extending and Overriding
Big difference is which has app
and drive services.
Extending
pip3 install atila_vue
atila_vue is base project templates and no useful services. It must be implemented by your app.
import skitai
import atila_vue
import myapp
if __name__ == '__main__':
with sktai.oreference () as pref:
pref.extends (atila_vue)
skitai.mount ('/', myapp, pref)
myapp
extends atila_vue
and can use all atila_vue
's services, static files and templates whuch myapp
hasn't.
Overriding
pip3 install tfserver
tfserver has own useful services and you can add your custom services.
import skitai
import tfserver
import myapp
if __name__ == '__main__':
with sktai.oreference () as pref:
pref.overrides (myapp)
skitai.mount ('/', tfserver, pref)
myapp
overrides tfserver
's services, static files and templates if same resource paths.
Conclusion
myapp
, tfserver
and atila_vue
are very same project structure as I wrote above. If your app is useful to the others, please publish to PyPI.
Working With Multiple Apps
Skitai can mount multiple apps including Atila apps. Atila apps sometimes need cumminicate each others.
Event Subscription
# serve.py
import skitai
import myapp
import tfserver
if __name__ == '__main__':
with sktai.oreference () as pref:
skitai.mount ('/tf', tfserver, pref, name = 'tfserver')
with sktai.oreference () as pref:
skitai.mount ('/', myapp, pref, subscribe = ['tfserver'])
myapp
can receive all event from tfserver
.
For example, tfserver
emit tfserver:model-reloaded
.
# wherever in myapp,
def __setup__ (app, mntopt):
@app.on ('tfserver:model-reloaded')
def on_tfserver_model_reloaded (was, model_name):
jobs_as_you_need ()
Data Exchanging
was.g
is multiple apps shared dictionary like object.
# wherever in tfserver app
@route ('/')
def index (was):
was.g ['LOADED_MODELS'] = ['a', 'b']
# wherever in myapp
@route ('/')
def index (was):
return '|'.join (was.g ['LOADED_MODELS'])
Accesing Other App Directly
@route ('/')
def index (was):
other_app = was.apps ['tfserver']
Very First Argument 'was'
What is was
as first argument of mostly hooks, event_handlers and services?
was
contains request
and response
related objects and services.
It is created per worker threads and never destoyed in normal sutuation. Sometimes clones will be created, cached and destroyed by need-to base.
Actually, it is just bunch of short-cuts to use request processing resources.
In this chapter, we just need to know briefly and partially.
Request
- was.request: current request object
- was.app: current app which process request
- was.g: multiple apps shared dictionary like object
Responses
- was.render ()
- was.API ()
- was.Error ()
- and lost of responses
Coroutines
- was.Task
- was.Tasks ()
And several helper methods.
Processing Request
Request Life Cycle Hooks
- before_request
- finish_request
- failed_request
- teardown_request
def __setup__ (app, mntopt):
@app.on ('before_mount')
def on_before_mount (was):
app.g ['TOTAL_REQUEST'] = 0
app.g ['FAILED_REQUEST'] = 0
@app.before_request
def before_request (was):
app.g ['TOTAL_REQUEST'] += 1
@app.failed_request
def failed_request (was, expt): # expt is sys.exc_info ()
app.g ['FAILED_REQUEST'] += 1
Composit your own middlewares.
class JWTTokenChecker:
def __call__ (self, was):
if something_is_wrong ():
raise was.Error ("400 Bad Request")
def __setup__ (app, mntopt):
MIDDLEWARES = [
JWTTokenChecker (),
PermissionHandler ()
]
@app.before_request
def before_request (was):
for middleware in MIDDLEWARES:
middleware (was)
Request Life Cycle Events
- before_request
- finish_request
- failed_request
- teardown_request
def __setup__ (app, mntopt):
@app.on ('before_request')
def on_before_request (was):
app.g ['TOTAL_REQUEST'] += 1
Hooks can return content or error, but events cannot because they are async. If event handling is failed, just logged traceback information.
CORS (Cross Origin Resource Sharing) and Preflight
For allowing CORS, you should do 2 things:
with skitai.preference () as pref:
pref.access_control_allow_origin = ["*"]
# OR
pref.access_control_allow_origin = ["http://www.skitai.com:5001"]
pref.access_control_max_age = 3600
If you want function specific CORS,
@app.route ("/post", methods = ["POST", "OPTIONS"], access_control_allow_origin = ["*"], access_control_max_age = 3600)
def post (was):
args = was.request.json ()
return was.jstream ({...})
Environment Variables
was.env.get ('HTTP_USER_AGENT', 'Unknown Agent')
Routing
@app.route ("/hello")
def hello_world (was):
return was.render ("hello.j2")
Give sime restirction.
@app.route ("/hello", methods = ["GET"]) #if not match response 405
@app.route ("/hello", content_types = ["application/json"]) #if not match response 406
Request Parameters
@app.route ("/hello", methods = ["POST", "OPTIONS"])
def hello_world (was, name, birth_year, gender = None):
return was.render ("hello.j2", name = name, birth_year = birth_year, gender = gender)
@app.route ("/hello", methods = ["POST", "OPTIONS"])
def hello_world (was, name, **DATA):
return was.render ("hello.j2", name = name, DATA = DATA)
@app.route ("/profiles/<int:id>", methods = ["GET"])
def hello_world (was, id, detail = 'yes'):
return was.render ("profile.j2", id = id, detail = detail)
It is valid:
- /profiles/123?detail=no
- /profiles/me?detail=no
- /profiles/notme?detail=no
- /profiles/new?detail=no
@app.route ("/profiles/<int:id>", methods = ["GET"])
def hello_world (was, id = None, detail = 'yes'):
return was.render ("profile.j2", id = id, detail = detail)
It is valid additionaly:
- /profiles?detail=no
Parameter and Validation
@app.route ("/profiles/<int:id>", methods = ["GET"])
@app_inspect (ints = ['id'], booleans = ['detail'])
def hello_world (was, id = None, detail = False):
return was.render ("profile.j2", id = id, detail = detail)
# MUST use with keyword argument
@app.inspect (
ints = None, floats = None,
strings = None, booleans = None,
emails = None, uuids = None, nones = None, lists = None,
dicts = None, oneof = None, manyof = None,
notags = None, safes = None,
**kargs
)
Examples:
- strings = ['name', 'gender']
- notags = ['resume'] # not allow html tags
- safes = ['resume'] # not allow <script> or javascript things
- nones = ['gender', 'birth_year'] if value is blank or zero like, make to None
- booleans = ['detail'] # valid if True, False, true, false, yes, no, y, n, true, false, t and f
- oneof = ['email', 'mobile']
Keyword Argument Examples:
- birth_year = int # type
- mobile = re.compile ('[0-9]{3}-[0-9]{3}-[0-9]{4}')
- birth_year__between = (1900, dt.now ().year)
- gender__in = ['male', 'female', 'not-determine']
- name__len__lte = 64
- name__len__gt = 3
- name__len__between = (3, 64)
Avaliable double low-dash operator:
- len__
- between
- in
- notin
- eq, exact
- neq
- lte
- lt
- gte
- gt
- contains
- notcontain
- startswith
- notstartwith
- endswith
- notendwith
- regex
File Upload
@app.route ("/upload", methods = ['POST'])
def upload (was, file):
file.save ("/var/tmp/upload", dup = "o")
- file.path: temporary saved file full path
- file.name: original file name posted
- file.size
- file.mimetype
- file.ext: file extension
- file.read ()
- file.move (to)
- file.as_flashfile ()
- file.save (into, name = None, mkdir = False, dup = "u")
- file.remove ()
Streaming Request Data
@app.route ("/bistreaming", methods = ['POST'], input_stream = True)
def streaming (was):
buf = []
while 1:
data = yield was.Input (4096)
if not data:
break
buf.append (data)
return b''.join (buf)
Caution: Be careful to use request streaming. Request streaming need only a few specific conditions.
- Small chunked request data which is intermittent and need long terms connection like receiving GPS coordinate data from client device
- Bidirectional streaming like detectecting silence for 10~30ms segments
of audio data stream. See next
Bidirectional Streaming
topic.
If you just want upload data, just use regular POST upload method. DO NOT use request streaming which may cause event loop blocking and also is very inefficient.
Cookie
was.request.cookie (alias: was.cookie): dicionary like object
if "user_id" not in was.cookie:
was.request.cookie.set ("user_id", "hansroh")
# or
was.request.cookie ["user_id"] = "hansroh"
was.cookie.set (
key, val,
expires = None,
path = None, domain = None,
secure = False, http_only = False
)
'expires' args is seconds to expire.
- if None, this cookie valid until browser closed
- if 0 or 'now', expired immediately
- if 'never', expire date will be set to a hundred years from now
If 'secure' and 'http_only' options are set to True, 'Secure' and 'HttpOnly' parameters will be added to Set-Cookie header.
If 'path' is None, every app's cookie path will be automatically set to their mount point.
Session
was.request.session (alias: was.session): dicionary like object
To enable session for app, random string formatted securekey should be set for encrypt/decrypt session values.
with skitai.preference () as pref:
pref.securekey = "ds8fdsflksdjf9879dsf;?<>Asda"
pref.session_timeout = 1200 # sec
@app.route ("/session")
def hello_world (was, **form):
if "login" not in was.session:
was.session.set ("user_id", form.get ("hansroh"))
# or
was.session ["user_id"] = form.get ("hansroh")
If you set, alter or remove session value, session expiry is automatically extended by app.session_timeout. But just getting value will not be extended. If you extend explicit without altering value, you can use touch() or set_expiry(). session.touch() will extend by app.session_timeout. session.set_expiry (timeout) will extend by timeout value.
Once you set expiry, session auto extenstion will be disabled until expiry time become shoter than new expiry time is calculated by app.session_timeout.
Namespaced Session
@app.route("/")
def index (was):
was.session.mount ("ADMIN", path = '/admin')
was.session.set ("login", True)
was.session.mount () # resore to default session
Additional methods:
- was.session.set_expiry (timeout)
- was.session.touch ()
- was.session.expire ()
- was.session.use_time ()
- was.session.impending (): if session timeout remains 20%
- was.session.mount ( name = None, session_timeout = None, securekey = None, path = None, domain = None, secure = False, http_only = False, extend = True )
Message Box
was.request.mbox (alias: was.mbox)
@app.route ("/msg")
def msg (was):
was.mbox.send ("This is Flash Message", "flash")
was.mbox.send ("This is Alert Message Kept by 60 seconds on every request", "alram", valid = 60)
return was.redirect (was.urlfor (showmsg, "Hans Roh"), status = "302 Object Moved")
@app.route ("/showmsg")
def showmsg (was, name):
return was.render ("msg.htm", name=name)
<ul>
{% for mid, category, created, valid, msg, extra in was.mbox.get ("notice", "news") %}
<li class="{{category}}"> {{ msg }}</li>
{% endfor %}
</ul>
- was.mbox.send (msg, category, valid_seconds, **extra_dict)
- was.mbox.get () return [(message_id, category, created_time, valid_seconds, msg, extra_dict)]
- was.mbox.get (category) filtered by category
- was.mbox.get (key, val) filtered by extra_dict key and value pair
- was.mbox.search (key, val): find in extra_dict. if val is not given or given None, compare with category name. return [message_id, ...]
- was.mbox.remove (message_id)
Piping
was.pipe () can call function by resource names. This make call nested function within __mount__ (app) in another module.
# services/__init__.py
def __mount__ (app, mntopt):
@app.route ("/1")
@app.inspect (offset = int)
def index (was, offset = 1):
return was.API (result = offset)
@app.route ("/2")
def index2 (was):
return was.pipe (index)
@app.route ("/3")
def index3 (was):
return was.pipe (index, offset = 4)
@app.route ("/4")
def index4 (was):
return was.pipe ('index', offset = 't')
@app.route ("/5")
def index4 (was):
return was.pipe ('sub.pages.toc', chapter = 5)
# services/sub/pages.py
def __mount__ (app, mntopt):
@app.route ("/pages/toc/<int:chapter>")
def toc (was, chapter):
return "Page tOC Chapter {}".format (chapter)
Making URL
was.static ('img/logo.png')
was.urlfor ("index")
was.urlfor ("sub.pages.toc", 1)
was.urlfor ("sub.pages.toc", chapter = 1)
was.urlfor ("/index.htm") # hard coded absolute URL, MUST start with /
Helpers
Testpassing
def is_superuser (was):
if was.user.username not in ('admin', 'root'):
reutrn was.response ("403 Permission Denied")
@app.testpass_required (is_superuser)
def modify_profile (was):
...
Authorization & Authentification
Let's assume you manage permission by user levels: admin, staff, owner and user.
@app.permission_check_handler
def permission_check_handler (was, perms):
if was.request.get_header ('authorization'):
output = check_jwt (was)
else:
output = check_session (was)
if output:
return output
if 'uid' in was.request.PARAMS:
tuid = was.request.PARAMS ['uid']
if 'owner' in perms and tuid != 'me':
raise was.Error ("403 Permission Denied", "owners only operation")
was.request.user.tuid = (tuid == 'me' and was.request.user.uid or (tuid != 'notme' and tuid or None))
if not perms:
return
if was.request.user.lev == "staff":
return # always vaild
if "staff" in perms:
raise was.Error ("403 Permission Denied")
@app.route ("/animals/<id>")
@app.permission_required ([], id = ["staff"])
def animals (was, id = None):
id = id or was.request.JWT ["userid"]
This resources required any permission for "/animals/" or "/animals/me". But '/animals/100' is required 'staff' permission. It may make permission control more simpler.
Also you can specify premissions per request methods.
@app.route ("/animals/<id>", methods = ["POST", "DELETE"])
@app.permission_required (['user'], id = ["staff"], DELETE = ["admin"])
def animals (was, id = None):
id = id or was.request.JWT ["userid"]
This resources required 'user' permission for "/animals/" or "/animals/me". '/animals/100' is required 'staff' permission. It may make permission control more simpler.
Condition Based Preworks
def reload_config (was, path):
...
@app.if_file_modified ('/opt/myapp/config', reload_config, interval = 1)
def index (was):
return was.render ('index.html')
def broadcast (was, name):
...
@app.if_updated ('STATE', broadcast, interval = 1)
def index (was):
return was.render ('index.html')
Run Pre/Postworks Chain
You can make automation for preworks and postworks.
def pre1 (was):
...
def pre2 (was):
...
def post1 (was):
...
@app.route ('/')
@app.run_before (pre1, pre2)
@app.run_after (post1)
def index (was):
return was.render ('index.html')
@app.run_before can return None or responsable contents for aborting all next run_before and main request.
@app.run_after return will be ignored
Using Task
Resource Aliasing
Task
PostgreSQL
SQLite3
MongDB
Redis
Thread
Process
Subrocess
Tasks
Making Response
Primitive
str
render
render_or_Map
File
Static
Producer Objects
Generator
Content Negotiation
JSON API
JSONRPC
XMLRPC
GRPC
render_or_API
Coroutine
Map
Mapped
Content Negotiatiable Coroutine
render_or_Map
render_or_Mapped
Streaming Service
Websocket
Logging and Traceback
Interval Based App Mainternancing
Test Client
Make Your Life Easy
Integrating Django Models and Administrative View
Integrating pytest and API document generation
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.