Atila Framework
Project description
Atila
Atila is simple and minimal framework integrated with Skitai App Engine. It is the easiest way to make backend services.
# serve.py
from atila import Atila
app = Atila (__name__)
@app.route ("/")
def index (was):
return "Hello, World"
if __mame__ == "__main__":
import skitai
skitai.mount ("/", app)
skitai.run (port = 5000)
And run,
python3 serve.py
Important Notice
CAUTION: Atila is base on WSGI but can be run only with Skitai App Engine.
This means if you make your Atila app, you have no choice but Skitai as WSGI app server. And Atila's unique and unconventional style may become very hard work to port to other framework.
Table of Content
- Atila
- Important Notice
- Unconventional?
- Installation
- First App
- App Overview
- Processing Request
- Request Life Cycle Hooks
- Request Life Cycle Events
- App & Request Gloabal
- Request Object
- Environment Variables
- Routing
- Request Parameters
- Parameter and Validation
- File Upload
- Streaming Request Data
- Cookie
- Session
- Message Box
- Piping
- Making URL
- Helpers
- CORS (Cross Origin Resource Sharing) and Preflight
- Using Task
- Making Response
- Helpers of 'was'
- Websocket
- Test Client
- Your Life Is So Precious
- Change Log
Unconventional?
Atila is based on WSGI specification but take some advantage of asynchronous features which are provided from Skitai.
Atila is mostly same as other WSGI containers for functional and API aspect.
Otherwise Atila has asynchronous features but it does not use
async/await
conventions. it sometime use generator base coroutine
with yeild
.
Atila treats async jobs like these as a Task
.
- HTTP Request
- Database Query
- Thread
- Process
- Subprocess
Task has 3 major common methods: fetch(), one(), commit(). Multiple tasks are
bind into Tasks
. Tasks also have the same 3 methods.
Tasks are passed over to main event loop of main thread from current request thread. If all tasks finished, Tasks will be returned to request thread. Also by yielding, Tasks can be completely transitted into non blocking manner.
I'm not telling Atila is more better design. I'm just telling Atila is obviously NOT main-stream and I enjoy this project.
Installation
Requirements
Python 3.6+
Installation
Atila and other core base dependent libraries is developing on single milestone, install/upgrade all at once. Otherwise it is highly possible to meet some errors.
With pip
pip3 install -U atila skitai rs4 aquests
With git
pip3 install -U skitai rs4 aquests sqlphile
git clone https://gitlab.com/hansroh/atila.git
cd atila
pip3 install -e .
Optional required as you need,
pip3 install redis
pip3 install pymongo
pip3 install psycopg2-binary
pip3 install protobuf # for GRPC
pip3 install jsonrpclib-pelix
pip3 install jinja2
First App
Project Structure
The very formal but minimal project structure for myapp
.
myapp/
static/
templates/
__init__.py
tests
serve.py
README.md
Launch Script
# serve.py
import skitai
import myapp
if __name__ == '__main__':
with skitai.preference () as pref:
skitai.mount ('/', myapp, pref)
pref is runtime app preference. It will be overwrite its attributes to app on mounted. And have some methods like set_static().
Create App
# myapp/__init__.py
import skitai
from atila import Atila
def __config__ (pref):
# mount / to myapp/static and app.STATIC_URL will be set to '/'
# skitai.abspath () will join path with location of serve.py
# DO NOT use os.path.abspath (__file__), it is not reliable
pref.set_static ('/', skitai.abspath ('myapp/static'))
def __app__ ():
return Atila (__name__)
def __mount__ (app, mntopt):
@app.route ('/')
def index (was):
return 'Index Page'
@app.route ('/sub')
def sub (was):
return 'Sub Page'
Make sure that myapp/static/index.html
doses not exists. If so static file has more high priority.
Then run.
python3 serve.py --devel # --devel swiach mean debug mdde and auto reloading
Please see Skitai App Engine to more detail usage.
Templates
pip3 install jinja2
<!-- myapp/templates/index.j2 -->
<html>
<head><title>My App</title></head>
<body>
<h1>Index Page</h1>
</body>
</html>
def __mount__ (app, mntopt):
@app.route ('/')
def index (was):
return was.render ('index.j2')
Cascaded Service Mount
For larger app, make myapp/services
directory.
myapp/
static/
templates/
services/
__init__.py
# myapp/__init__.py
# remove __mount__ ()
import skitai
from atila import Atila
def __config (pref):
pref.set_static ('/', skitai.abspath ('myapp/static'))
def __app__ ():
return Atila (__name__)
def __setup__ (app, mntopt):
from . import services
app.mount ('/', services)
# NEW myapp/services/__init__.py
def __mount__ (app, mntopt):
@app.route ('/')
def index (was):
return 'Index Page'
@app.route ('/sub')
def sub (was):
return 'Sub Page'
Seperate '/sub' as sub module.
# modify myapp/services/__init__.py
# add __setup__ hook
def __setup__ (app, mntopt):
from . import sub
app.mount ('/sub', sub)
def __mount__ (app, mntopt):
@app.route ('/')
def index (was):
return 'Index Page'
# NEW myapp/services/sub.py
def __mount__ (app, mntopt):
# make sure blank path for accessing '/sub'
# if '/', accessable URL become '/sub/'
@app.route ('')
def sub (was):
return 'Sub Page'
Add new '/sub2' as sub package
Assume /sub2
is more large and complex, and you want to make as sub package.
# modify __setup__ of myapp/services/__init__.py
def __setup__ (app, mntopt):
from . import sub
from . import sub2
app.mount ('/sub', sub)
app.mount ('/sub2', sub2)
# NEW myapp/services/sub2/__init__.py
def __setup__ (app, mntopt):
from . import pages
app.mount ("/sub2/pages", pages)
# NEW myapp/services/sub2/pages.py
def __mount__ (app, mntopt):
@app.route ('') # /sub2/pages
def pages (was):
return 'Pages'
@app.route ('/1') # /sub2/pages/1
def page1 (was):
return 'Page1'
finally, mayapp
structure is:
myapp/
static/
templates/
services/
sub.py
sub2/
__init__.py
pages.py
__init__.py
Other Hooks
Currently only one other hook.
def __umount__ (app): # NO mntopt argument
print ('bye...')
More About Preference
with skitai.preference () as pref:
pref.set_static ('/static', 'myapp/static')
pref.set_media ('/media', '/mnt/data/media')
pref.config.MAX_UPLOAD_SIZE = 256 * 1024 * 1024
pref.config.MAINTAIN_INTERVAL = 60
You can add your options to pref.config
and attibute to pref
and you can sure accessing from app.config
and app
attribute later.
Conclusion
- Mount sub package in __setup__ hook and make services at __mount__.
- DO NOT mess with sub package in __mount__.
- __config__ and __app__ can be in app initializing script ONLY.
App Overview
Event Bus
Atila's event bus is based on event-bus.
# wherever in your app package,
def __setup__ (app, mntopt):
@app.on ('custom-event')
def on_cusom_event (was, detail):
print ('I got custom-event with detail: {}'.format (detail))
# wherever in your app package,
def __mount__ (app, mntopt):
@app.route ('/make_event')
def make_event (was):
app.emit ('custom-event')
return "OK"
App Life Cycle Events
There're serveral pre-defined events related app life cycle.
- before_mount
- mounted
- before_reload
- reloaded
- mounted_or_reloaded
- before_umount
- mounted
# wherever in your app package,
def load_my_models ():
...
def __setup__ (app, mntopt):
@app.on ('before_mount')
def on_before_mount (was):
load_my_models ()
Extending and Overriding
Big difference is which has app
and drive services.
Extending
pip3 install atila_vue
atila_vue is base project templates and no useful services. It must be implemented by your app.
import skitai
import atila_vue
import myapp
if __name__ == '__main__':
with sktai.oreference () as pref:
pref.extends (atila_vue)
skitai.mount ('/', myapp, pref)
myapp
extends atila_vue
and can use all atila_vue
's services, static files and templates whuch myapp
hasn't.
Overriding
pip3 install tfserver
tfserver has own useful services and you can add your custom services.
import skitai
import tfserver
import myapp
if __name__ == '__main__':
with sktai.oreference () as pref:
pref.overrides (myapp)
skitai.mount ('/', tfserver, pref)
myapp
overrides tfserver
's services, static files and templates if same resource paths.
Conclusion
myapp
, tfserver
and atila_vue
are very same project structure as I wrote above. If your app is useful to the others, please publish to PyPI.
Interval Base App Maintenancing
app.config.MAINTAIN_INTERVAL = 60 # seconds, default is 60
app.store.set ("num-nodes", 0) # thread safe store
@app.maintain (10, threading = False) # execute every 10 maintaining (500 sec.)
def maintain_num_nodes (was, now, count):
...
num_nodes = was.getlu ("cluster.num-nodes")
if app.g ["num-nodes"] != num_nodes:
app.g ["num-nodes"] = num_nodes
app.emit ("cluster:num_nodes", num_nodes)
You can add multiple maintain jobs but maintain function names is SHOULD be unique.
Working With Multiple Apps
Skitai can mount multiple apps including Atila apps. Atila apps sometimes need cumminicate each others.
Event Subscription
# serve.py
import skitai
import myapp
import tfserver
if __name__ == '__main__':
with sktai.oreference () as pref:
skitai.mount ('/tf', tfserver, pref, name = 'tfserver')
with sktai.oreference () as pref:
skitai.mount ('/', myapp, pref, subscribe = ['tfserver'])
myapp
can receive all event from tfserver
.
For example, tfserver
emit tfserver:model-reloaded
.
# wherever in myapp,
def __setup__ (app, mntopt):
@app.on ('tfserver:model-reloaded')
def on_tfserver_model_reloaded (was, model_name):
jobs_as_you_need ()
Data Exchanging
was.g
is multiple apps shared dictionary like object.
# wherever in tfserver app
@route ('/')
def index (was):
was.g ['LOADED_MODELS'] = ['a', 'b']
# wherever in myapp
@route ('/')
def index (was):
return '|'.join (was.g ['LOADED_MODELS'])
Accesing Other App Directly
@route ('/')
def index (was):
other_app = was.apps ['tfserver']
Very First Argument 'was'
What is was
as first argument of mostly hooks, event_handlers and services?
was
contains request
and response
related objects and services.
It is created per worker threads and never destoyed in normal sutuation. Sometimes clones will be created, cached and destroyed by need-to base.
Actually, it is just bunch of short-cuts to use request processing resources.
In this chapter, we just need to know briefly and partially.
Request
- was.request: current request object
- was.app: current app which process request
- was.g: multiple apps shared dictionary like object
Responses
- was.render ()
- was.API ()
- was.Error ()
- and lost of responses
Coroutines
- was.Task
- was.Tasks ()
And several helper methods.
Processing Request
Request Life Cycle Hooks
- before_request
- finish_request
- failed_request
- teardown_request
def __setup__ (app, mntopt):
@app.on ('before_mount')
def on_before_mount (was):
app.g ['TOTAL_REQUEST'] = 0
app.g ['FAILED_REQUEST'] = 0
app.g ['SUCCES_REQUEST'] = 0
@app.before_request
def before_request (was):
app.g ['TOTAL_REQUEST'] += 1
request.g ['RESOURCE'] = resource.open ()
@app.finish_request
def finish_request (was, response): # response
app.g ['SUCCES_REQUEST'] += 1
# return new response or None
@app.failed_request
def failed_request (was, expt): # expt is sys.exc_info ()
app.g ['FAILED_REQUEST'] += 1
# called wahterver success or fail
# clear resources
@app.teardown_request
def teardown_request (was): # no args
request.g ['RESOURCE'] = resource.close ()
Composit your own request/response processors.
class JWTTokenChecker:
def __call__ (self, was):
if something_is_wrong ():
raise was.Error ("400 Bad Request")
def __setup__ (app, mntopt):
PREPROCESSORS = [
JWTTokenChecker (),
PermissionHandler ()
]
@app.before_request
def before_request (was):
for p in PREPROCESSORS:
response = p (was)
if response:
return response
@app.finish_request
def finish_request (was, response):
was.response.add_header ('X-Custom', 'some-value')
return response + "--"
Request Life Cycle Events
- before_request
- finish_request
- failed_request
- teardown_request
def __setup__ (app, mntopt):
@app.on ('before_request')
def on_before_request (was):
...
@app.on ('finish_request')
def on_finish_request (was, response):
...
@app.on ('failed_request')
def on_failed_request (was, expt):
...
@app.on ('teardown_request')
def on_teardown_request (was):
...
Hooks can return content or error, but events cannot because they are async. If event handling is failed, just logged traceback information.
App & Request Gloabal
- app.g
- was.request.g
@app.before_request
def before_request (was):
app.g ['TOTAL_REQUEST'] = 0
was.request.g ['START'] = time.time ()
@app.teardown_request
def teardown_request (was):
app.g ['TOTAL_REQUEST'] += 1
was.log ('processing time: {:.5f} seconds.'.format (
time.time () - was.request.g ['START']
))
Request Object
was.request
Basic Members
- was.request.method # upper case GET, POST, ...
- was.request.uri
- was.request.version # HTTP Version, 1.0, 1.1, 2.0, 3.0
- was.request.scheme # http or https
- was.request.ARGS
- was.request.JWT
- was.request.env
- was.request.g
- was.request.cookie
- was.request.session
- was.request.mbox
Basic Methods
- was.request.split_uri () # (script, param, querystring, fragment)
- was.request.get_header ("content-type") # case insensitive
- was.request.get_headers () # retrun header all list
- was.request.get_body ()
- was.request.get_remote_addr ()
- was.request.get_user_agent ()
- was.request.get_content_type ()
- was.request.get_main_type ()
- was.request.get_sub_type ()
Route Options
- was.request.routed # routed function
- was.request.routable # {'methods': ["POST", "OPTIONS"], 'content_types': ["text/xml"], 'options': {...}, 'mntopt': {...}}
Environment Variables
was.request.env (alias: was.env)
was.env.get ('HTTP_USER_AGENT', 'Unknown Agent')
Routing
@app.route ("/hello")
def hello_world (was):
return was.render ("hello.j2")
Give sime restirction.
@app.route ("/hello", methods = ["GET"]) #if not match response 405
@app.route ("/hello", content_types = ["application/json"]) #if not match response 406
Request Parameters
@app.route ("/hello", methods = ["POST", "OPTIONS"])
def hello_world (was, name, birth_year, gender = None):
return was.render ("hello.j2", name = name, birth_year = birth_year, gender = gender)
@app.route ("/hello", methods = ["POST", "OPTIONS"])
def hello_world (was, name, **DATA):
return was.render ("hello.j2", name = name, DATA = DATA)
@app.route ("/profiles/<int:id>", methods = ["GET"])
def hello_world (was, id, detail = 'yes'):
return was.render ("profile.j2", id = id, detail = detail)
It is valid:
- /profiles/123?detail=no
- /profiles/me?detail=no
- /profiles/notme?detail=no
- /profiles/new?detail=no
@app.route ("/profiles/<int:id>", methods = ["GET"])
def hello_world (was, id = None, detail = 'yes'):
return was.render ("profile.j2", id = id, detail = detail)
It is valid additionaly:
- /profiles?detail=no
Parameter and Validation
@app.route ("/profiles/<int:id>", methods = ["GET"])
@app_inspect (ints = ['id'], booleans = ['detail'])
def hello_world (was, id = None, detail = False):
return was.render ("profile.j2", id = id, detail = detail)
# MUST use with keyword argument
@app.inspect (
ints = None, floats = None,
strings = None, booleans = None,
emails = None, uuids = None, nones = None, lists = None,
dicts = None, oneof = None, manyof = None,
notags = None, safes = None,
**kargs
)
Examples:
- strings = ['name', 'gender']
- notags = ['resume'] # not allow html tags
- safes = ['resume'] # not allow <script> or javascript things
- nones = ['gender', 'birth_year'] if value is blank or zero like, make to None
- booleans = ['detail'] # valid if True, False, true, false, yes, no, y, n, true, false, t and f
- oneof = ['email', 'mobile']
Keyword Argument Examples:
- birth_year = int # type
- mobile = re.compile ('[0-9]{3}-[0-9]{3}-[0-9]{4}') # an object has .search () method
- birth_year__between = (1900, dt.now ().year)
- gender__in = ['male', 'female', 'not-determine']
- name__len__lte = 64
- name__len__gt = 3
- name__len__between = (3, 64)
Avaliable double low-dash operator:
- len__
- between
- in
- notin
- eq, exact
- neq
- lte
- lt
- gte
- gt
- contains
- notcontain
- startswith
- notstartwith
- endswith
- notendwith
- regex
File Upload
@app.route ("/upload", methods = ['POST'])
def upload (was, file):
file.save ("/var/tmp/upload", dup = "o")
- file.path: temporary saved file full path
- file.name: original file name posted
- file.size
- file.mimetype
- file.ext: file extension
- file.read ()
- file.move (to)
- file.as_flashfile ()
- file.save (into, name = None, mkdir = False, dup = "u")
- file.remove ()
Streaming Request Data
@app.route ("/bistreaming", methods = ['POST'], input_stream = True)
def streaming (was):
buf = []
while 1:
data = yield was.Input (4096)
if not data:
break
buf.append (data)
return b''.join (buf)
Caution: Be careful to use request streaming. Request streaming need only a few specific conditions.
- Small chunked request data which is intermittent and need long terms connection like receiving GPS coordinate data from client device
- Bidirectional streaming like detectecting silence for 10~30ms segments
of audio data stream. See next
Bidirectional Streaming
topic.
If you just want upload data, just use regular POST upload method. DO NOT use request streaming which may cause event loop blocking and also is very inefficient.
Cookie
was.request.cookie (alias: was.cookie): dicionary like object
if "user_id" not in was.cookie:
was.request.cookie.set ("user_id", "hansroh")
# or
was.request.cookie ["user_id"] = "hansroh"
was.cookie.set (
key, val,
expires = None,
path = None, domain = None,
secure = False, http_only = False
)
'expires' args is seconds to expire.
- if None, this cookie valid until browser closed
- if 0 or 'now', expired immediately
- if 'never', expire date will be set to a hundred years from now
If 'secure' and 'http_only' options are set to True, 'Secure' and 'HttpOnly' parameters will be added to Set-Cookie header.
If 'path' is None, every app's cookie path will be automatically set to their mount point.
Session
was.request.session (alias: was.session): dicionary like object
To enable session for app, random string formatted securekey should be set for encrypt/decrypt session values.
with skitai.preference () as pref:
pref.securekey = "ds8fdsflksdjf9879dsf;?<>Asda"
pref.session_timeout = 1200 # sec
@app.route ("/session")
def hello_world (was, **form):
if "login" not in was.session:
was.session.set ("user_id", form.get ("hansroh"))
# or
was.session ["user_id"] = form.get ("hansroh")
If you set, alter or remove session value, session expiry is automatically extended by app.session_timeout. But just getting value will not be extended. If you extend explicit without altering value, you can use touch() or set_expiry(). session.touch() will extend by app.session_timeout. session.set_expiry (timeout) will extend by timeout value.
Once you set expiry, session auto extenstion will be disabled until expiry time become shoter than new expiry time is calculated by app.session_timeout.
Namespaced Session
@app.route("/")
def index (was):
was.session.mount ("ADMIN", path = '/admin')
was.session.set ("login", True)
was.session.mount () # resore to default session
Additional methods:
- was.session.set_expiry (timeout)
- was.session.touch ()
- was.session.expire ()
- was.session.use_time ()
- was.session.impending (): if session timeout remains 20%
- was.session.mount ( name = None, session_timeout = None, securekey = None, path = None, domain = None, secure = False, http_only = False, extend = True )
Message Box
was.request.mbox (alias: was.mbox)
@app.route ("/msg")
def msg (was):
was.mbox.send ("This is Flash Message", "flash")
was.mbox.send ("This is Alert Message Kept by 60 seconds on every request", "alram", valid = 60)
return was.redirect (was.urlfor (showmsg, "Hans Roh"), status = "302 Object Moved")
@app.route ("/showmsg")
def showmsg (was, name):
return was.render ("msg.htm", name=name)
<ul>
{% for mid, category, created, valid, msg, extra in was.mbox.get ("notice", "news") %}
<li class="{{category}}"> {{ msg }}</li>
{% endfor %}
</ul>
- was.mbox.send (msg, category, valid_seconds, **extra_dict)
- was.mbox.get () return [(message_id, category, created_time, valid_seconds, msg, extra_dict)]
- was.mbox.get (category) filtered by category
- was.mbox.get (key, val) filtered by extra_dict key and value pair
- was.mbox.search (key, val): find in extra_dict. if val is not given or given None, compare with category name. return [message_id, ...]
- was.mbox.remove (message_id)
Piping
was.pipe () can call function by resource names. This make call nested function within __mount__ (app) in another module.
# services/__init__.py
def __mount__ (app, mntopt):
@app.route ("/1")
@app.inspect (offset = int)
def index (was, offset = 1):
return was.API (result = offset)
@app.route ("/2")
def index2 (was):
return was.pipe (index)
@app.route ("/3")
def index3 (was):
return was.pipe (index, offset = 4)
@app.route ("/4")
def index4 (was):
return was.pipe ('index', offset = 't')
@app.route ("/5")
def index4 (was):
return was.pipe ('sub.pages.toc', chapter = 5)
# services/sub/pages.py
def __mount__ (app, mntopt):
@app.route ("/pages/toc/<int:chapter>")
def toc (was, chapter):
return "Page tOC Chapter {}".format (chapter)
Making URL
was.static ('img/logo.png')
was.urlfor ("index")
was.urlfor ("sub.pages.toc", 1)
was.urlfor ("sub.pages.toc", chapter = 1)
was.urlfor ("/index.htm") # hard coded absolute URL, MUST start with /
Helpers
Testing
def is_superuser (was):
if was.user.name not in ('admin', 'root'):
reutrn was.response ("403 Permission Denied")
@app.testpass_required (is_superuser)
def modify_profile (was):
...
Authorization & Authentification
Let's assume you manage permission by user levels: admin, staff, owner and user.
class User:
def __init__ (self, uid, lev, nick_name):
self.uid = uid
self.lev = lev
self.nick_name = nick_name
self.tuid = None
def __str__ (self):
return self.uid
@app.permission_handler
def permission_handler (was, perms):
if was.request.get_header ('authorization'):
claims = was.dejwt ()
if "err" in claims:
raise was.Error ("401 Unauthorized", claims ["err"])
was.request.user = User (claims ['uid'], claims ['lev'], claims ['nick_name'])
if 'uid' in was.request.PARAMS:
tuid = was.request.PARAMS ['uid']
if 'owner' in perms and tuid != 'me':
raise was.Error ("403 Permission Denied", "owners only operation")
was.request.user.tuid = (tuid == 'me' and was.request.user.uid or (tuid != 'notme' and tuid or None))
if not perms:
return
if was.request.user.lev == "staff":
return # always vaild
if "staff" in perms:
raise was.Error ("403 Permission Denied")
@app.route ("/animals/<id>")
@app.permission_required ([], id = ["staff"])
def animals (was, id = None):
id = id or was.request.JWT ["userid"]
This resources required any permission for "/animals/" or "/animals/me". But '/animals/100' is required 'staff' permission. It may make permission control more simpler.
Also you can specify premissions per request methods.
@app.route ("/animals/<id>", methods = ["POST", "DELETE"])
@app.permission_required (['user'], id = ["staff"], DELETE = ["admin"])
def animals (was, id = None):
id = id or was.request.JWT ["userid"]
This resources required 'user' permission for "/animals/" or "/animals/me". '/animals/100' is required 'staff' permission. It may make permission control more simpler.
Conditional Prework
def reload_config (was, path):
...
@app.if_file_modified ('/opt/myapp/config', reload_config, interval = 1)
def index (was):
return was.render ('index.html')
def broadcast (was, name):
...
@app.if_updated ('STATE', broadcast, interval = 1)
def index (was):
return was.render ('index.html')
Run Pre/Postworks Chain
You can make automation for preworks and postworks.
def pre1 (was):
...
def pre2 (was):
...
def post1 (was):
...
@app.route ('/')
@app.run_before (pre1, pre2)
@app.run_after (post1)
def index (was):
return was.render ('index.html')
- @app.run_before can return None or responsable contents for aborting all next run_before and main request.
- @app.run_after return will be ignored
CORS (Cross Origin Resource Sharing) and Preflight
with skitai.preference () as pref:
pref.access_control_allow_origin = ["*"]
# OR
pref.access_control_allow_origin = ["http://www.skitai.com:5001"]
pref.access_control_max_age = 3600
If you want function specific CORS,
@app.route (
"/post", methods = ["POST", "OPTIONS"],
access_control_allow_origin = ["*"], access_control_max_age = 3600
)
def post (was):
return 'hello'
Using Task
Task is a part of request processing and also related with generating response. So you should look into this first.
Resource Aliasing
with skitai.preference () as pref:
skitai.alias ("@mypg", skitai.DB_PGSQL, ["user:pass@localhost/mydb"])
skitai.alias ("@mylite", skitai.DB_SQLITE3, ["./sqlite3.db"])
skitai.alias ("@mymongo", skitai.DB_MONGODB, ["localhost/mycollection"])
skitai.alias ("@myredis", skitai.DB_REDIS, ["localhost/0"])
skitai.alias ('@myapi', skitai.PROTO_HTTPS, ["s1.myserver.com"])
Task
Task is sort of coroutine. Task has methods:
- fetch (): return iterable
- one (): should be single element
- commit (): return None, just wait to complete
HTTP Based Protocols
with was.stub ('@myapi/api') as stub:
task1 = stub.get ('/profiles/{}?limit={}', 100, 10)
payload = dict (name = 'Hans Roh')
task2 = stub.put ('/profiles/{}', 100, payload)
with was.xmlrpc ('@myapi/rpc2') as stub:
task3 = stub.add (156, 952)
task1.fetch ()
task2.fetch ()
task3.fetch ()
pip3 install tfserver
from tfserver import cli
with was.grpc ("@myapi/tensorflow.serving.PredictionService") as stub:
x = get_numpy_data ()
request = cli.build_request ('mymodel', 'predict', x = x)
task = stub.Predict (request, 10.0)
PostgreSQL / SQLite3
# PostgreSQL and SQLite3
with was.db ('@mypg') as db:
task = (db.select ('my_table').
.filter (name = 'Hans')
.with_ ('a', db.insert ('my_table').set (name = 'Hans'))
).execute ()
MongDB
with was.db('@mymongo') as db:
task = db.find ({'city': 'New York'})
Redis
with was.db ("@myredis") as db:
task1 = db.set('foo', 'bar')
task2 = db.get('foo')
Thread / Process / Subrocess
task = was.Thread (my_func, arg1, arg2)
task = was.Process (my_func, arg1, arg2)
task = was.Subprocess ('top -n1', timeout = 3)
Mask
It is fake of Task(s).
You can make it by wrapping was.Mask (data) if you want to use consistant methods as Task.
task = was.Mask (1)
result = task.fetch () # 1
tasks = was.Mask ([1, 2])
result1, result2 = tasks.fetch () # 1, 2
Tasks
with was.db('@mymongo') as db:
task1 = db.find ({'city': 'New York'})
with was.stub ('@myapi/api') as stub:
task2 = stub.get ('/profiles/{}?limit={}', 100, 10)
task3 = was.Process (my_func, arg1, arg2)
tasks = was.Tasks ([task1, task2, task3])
a, b, c = tasks.fetch () # got 3 results
tasks = was.Tasks (
db.insert ('city').set (name = 'Hans').execute (),
files = was.Subprocess ('ls /var/log', filter = lammda x: x.replace (' ', '_')),
result = was.db ('@mydb', filter = hide_password).execute ('select * from user')
)
results = tasks.fetch ()
results ['files']
results ['result']
Generator Based Coroutine
New in version 0.8.6
If you create single task and call fetch () at just next line, It is alomost same as synchronous blocking job.
But by yielding Task
, it becomes fully async task.
Add coroutine = True
parameter to @app.route
to distinguish native generator.
@app.route ("/coroutine/2", coroutine = True)
def coroutine (was):
with was.stub ("http://example.com") as stub:
task = yield stub.get ("/")
return task.fetch ()
Note: DO NOT make blocking jobs after first yield. If MUST do, yield was.Thread() or was.Process().
@app.route ("/coroutine", coroutine = True)
def coroutine (was):
def wait_hello (timeout = 1.0):
time.sleep (timeout)
return 'mask'
# do sync tasks in thread
some_sync_task ()
# swith to async event loop in main thread
tasks = yield was.Tasks (
a = was.Mask ("Example Domain"),
b = was.Thread (wait_hello, args = (1.0,))
)
task3 = yield was.Thread (wait_hello, args = (1.0,))
task4 = yield was.Subprocess ('ls')
# finally
return was.API (d = task4.fetch (), c = task3.fetch (), **tasks.fetch ())
Why does not use await? To use await
also use async def
. But Atila doesn't want to disturb legacy sync tasks. So every requests will be passed over to thread pool initially and you can decide to keep sync manner or switch to async.
Map / Load-Balancing
Set multiple members with weight.
with skitai.preference () as pref:
members = ["user:pass@localhost/mydb 20", "user:pass@remote.com/mydb 10"]
skitai.alias ("@mypg", skitai.DB_PGSQL, members)
There're 2 more methods for request status check.
- dispatch ()
- wait ()
# map-merging
with was.stub.map ('@myapi') as stub:
task = stub.get ('@myapi/profiles/{}?limit={}', 100, 10)
all_results = []
for resp in task.dispatch ():
if resp.status != skitai.STA_NORMAL:
continue
all_results.extend (resp.data)
response.status
:
- STA_UNSENT
- STA_REQFAIL
- STA_TIMEOUT
- STA_NETERR
- STA_NORMAL
# load-balancing is the very same as single call
with was.stub.lb ('@myapi') as stub:
task = stub.get ('@myapi/profiles/{}?limit={}', 100, 10)
task.fetch ()
Cache Control
# myapp/__init__.py
import skitai
def __config__ (pref):
skitai.register_g ('tables.users', 'table.photos')
@app.route ("/update")
def update (was):
# update tables.users modification time
was.db ('@mydb', rm_cache = 'tables.users').execute (...)
@app.route ("/query")
def query (was):
# use cache if tables.users modification time is older
was.db ('@mydb', use_cache = 'tables.users').execute (...)
@app.route ("/query2")
def query2 (was):
# check all times are older
was.db ('@mydb', use_cache = ['tables.users', 'table.photos']).execute (...)
This uses skitai.g object which is shared by all Skitai multiprocessing workers. So a worker update table, also all others can know that.
Also if rm_cache is used, it will emit events. You can handle event if you need. But this event system. But please note that this event notified within its worker process.
@app.route ("/update")
def update (was):
was.db ('@mydb').execute (...)
was.setlu ('tables.users', something...)
@app.on ("setlu:tables.users")
def table_updated (was, *args, **kargs):
# your code
Adding Custom Database Interface For Atila Coroutine
You can override existing classes - RDBMS, NoSQL (Redis, MongoDB) styles.
from aquests.dbi import asynredis
class MyDBI (asynredis.AsynConnect):
def __init__ (self):
...
DB_MYDBI = '*mydbi'
skitai.add_database_interface (DB_MYDBI, MyDBI)
skitai.alias ('@mydbi', DB_MYDBI, 'localhost:9000')
Making Response
was.response
was.response.set_status ("201 Created")
# add header even name is duplicated
was.response.add_header ('X-Server', "skitai")
# if exists old, remote it and update new
was.response.set_header ('X-Server', "skitai")
was.response.get_header ('Accept', '')
was.response.del_header ('Accept')
# current response code and msg
was.response.status_code
was.response.reason
Cache Control
# etag based cache control
# make sure CACHE_KEY is unique identifier for this resource
was.response.set_etag ('CACHE_KEY', max_age = 3600)
# content modification time based cache control
was.response.set_mtime (time.time (), max_age = 3600)
# set cache header as HTTP versions
was.response.set_max_age (3600)
HTTP Error
raise was.Error ("400 Bad Request")
raise was.Error ("400 Bad Request", "missing user ID")
raise was.Error ("400 Bad Request", "missing user ID", 40011)
Custom Error Handlers
Atila response as Accept
header, this is an example for HTML services.
<h1>{{ error.code }} {{ error.message }}</h1>
<p>{{ error.detail }}</p>
<hr>
<div>URL: {{ error.url }}</div>
<div>Time: {{ error.time }}</div>
@app.default_error_handler
def default_error_handler (was, error):
return was.render ("errors/default.j2", error = error)
@app.error_handler (404)
def not_found (was, error):
return was.render ('404.j2', error = error)
Primitive
String
return "hello"
was.response.set_status ('200 OK')
was.response.set_header ('X-Server', 'Skitai')
return "hello"
API Response
JSON or XML response as request's Accept
header.
return was.API ({'name': Hans Roh})
return was.API (name = Hans Roh)
Rendered Template
return was.render ("hello.j2", name = 'Hans Roh')
<!-- hello.j2 -->
<h1>Hello {{ context.name }}</h1>
render_or_API
Response by Accept
header,
return was.render_or_API ("index.html", result = result)
File
return was.File ("/path/to/data.xlsx")
return was.File ("/path/to/data.xlsx", 'application/octet-stream', 'data.xlsx')
Static
# by static file URL
# resolved into was.static (""img/logo.png")
return was.Static ("img/logo.png")
@app.route ("/robots.txt")
def robots (was):
if app.debug:
was.response ['Content-Type'] = 'text/plain'
return "User-Agent: *\nDisallow: /\n"
return was.Static ('/robots.real.txt')
Generator
def build_csv (was):
def generate():
for row in iter_all_rows():
yield ','.join(row) + '\n'
return was.response ("200 OK", generate (), headers = [("Content-Type", "text/csv")])
Redirecting
return was.redirect (
was.urlfor ("showmsg", "Hans Roh"),
status = "302 Object Moved"
)
return was.redirect (
was.static ("index.html"),
status = "302 Object Moved"
)
RPC Response
By request's content type, Atila suppert XMLRPC, JSONRPC, GRPC.
# for jsonrpclib
pip3 install jsonrpclib-pelix
# XML/JSONRPC
@app.route ('/add')
def add (was, a, b):
return a + b # int is serailizable
# GRPC
@app.route ("/GetFeature")
def GetFeature (was, point):
feature = get_feature(db, point)
if feature is None:
return route_guide_pb2.Feature(name="", location=point)
else:
return feature
You nothing to do for this, just return type is serailizable by each RPC protocol.
ProxyPass
@app.route ("/<path:path>")
def proxy (was, path = None):
return was.ProxyPass ("@myupstream", path)
ThreadPass
@app.route ("/thread_future", methods = ['GET'])
def thread_future_respond (was):
def respond (was, a):
a = some_synchronous_task ()
# you can make corequest
was.db ('@mydb').execute (...).commit ()
return was.API (
result = a
)
return was.ThreadPass (respond, args = (4.0,))
Threaded Data Streaming
class Producer:
def get (self, n):
return [random.randrange (1000) for _ in range (n)]
def producing (producer):
def produce (queue):
while 1:
items = producer.get (100)
if not items:
queue.put (None) # end of stream
break
queue.put (str (items))
return produce
@app.route ("/threaproducer")
def threaproducer (was):
return was.Queue (producing (Producer ()))
Coroutine
Task / Tasks
If you need post processing data,
def repond (was, task):
return was.API (result = task.fetch (), a = task.meta.get ('a'))
@app.route ('...')
def foo ():
task = was.db ("@sqlite3").execute ("select * from test")
return task.then (respond)
@app.route ('...')
def foo ():
return was.Process (math.sqrt, args = (4.0,), meta = {'a': 1}).then (respond)
@app.route ('...')
def foo ():
def sqrt (a):
return math.sqrt (a)
return was.Thread (sqrt, args = (4.0,)).then (respond)
Map
No need post processing,
@app.route ('/datalist')
def datalist (was):
task = was.db ("@sqlite3").execute ("select * from test")
return was.API (result = task.fetch ())
# same result but transit to async
# return thread to pool early
@app.route ('/datalist')
def datalist (was):
task = was.db ("@sqlite3").execute ("select * from test")
return was.Map (result = task)
@app.route ("/bench/sp", methods = ['GET'])
def bench_sp (was):
with was.db ('@mydb') as db:
root = (db.select ("foo")
.order_by ("-created_at")
.limit (10)
.filter (Q (from_wallet_id = 8) | Q (detail = 'ReturnTx')))
return was.Map (
was.Thread (time.sleep, args = (0.3,)), # no need map
files = was.Subprocess ('ls /var/log'),
result = root.clone ().execute (),
record_count__one = root.clone ().aggregate ('count (id) as cnt').execute ()
)
# JSON response, 1st args had been executed but ignored in results because no map name
# >> { result: [...], record_count: {cnt: 123}, ls: 'syslog ...' }
But simple processing is possible with filter.
def hide_password (rows):
for row in rows:
row.password = '****'
return rows
return was.Map (
files = was.Subprocess ('ls /var/log', filter = lammda x: x.replace (' ', '_')),
result = was.db ('@mydb', filter = hide_password).execute ('select * from user')
)
By Accept
header,
@app.route ('/')
def index (was, error):
return was.render_or_Map ("index.j2", result = db.execute ('...'))
Mapped
tasks = was.Tasks (
files = was.Subprocess ('ls /var/log', filter = lammda x: x.replace (' ', '_')),
result = was.db ('@mydb', filter = hide_password).execute ('select * from user')
)
return was.Mapped (tasks)
By Accept
header,
@app.route ('/')
def index (was, error):
tasks = was.Taks (result = db.execute ('...'))
return was.render_or_Mapped ("index.j2", tasks)
Couroutine Streaming
@app.route ("/download_csv", coroutine = True)
def download_csv (was):
yield "ID, NAME\n"
current_id = 0
while 1:
task = yield (was.db ('@mydb').select ('tble')
.get ('id, name')
.filter (id__gt = current_id).
limit (fetch_count)).execute ()
rows = task.fetch ()
if not rows:
break
current_id += fetch_count
yield '\n'.join (['{}, "{}"'.format (row.id, row.name) for row in rows])
Bidirectional Streaming
@app.route ("/bistreaming", methods = ['POST'], coroutine = True, input_stream = True)
def coroutine_streaming (was):
while 1:
data = yield was.Input (16184)
if not data:
break
yield b':' + data
Helpers of 'was'
Logging and Traceback
@app.route ("/")
def sum ():
was.log ("called index", "info")
try:
...
except:
was.log ("exception occured", "error")
was.traceback ()
was.log ("done index", "info")
- was.log (msg, category = "info")
- was.traceback (id = "") # id is used as fast searching log line for debug, if not given, id will be Global transaction ID/Local transaction ID
Cross Site Request Forgery Token (CSRF Token)
New in skitai version 0.26.16
At template, insert CSRF Token,
<form>
{{ was.csrf_token_input }}
...
</form>
then verify token like this,
@app.before_request
def before_request (was):
if was.request.args.get ("username"):
if not was.verify_csrf ():
return was.response ("400 Bad Request")
Or use decorator,
@app.csrf_verification_required
def before_request (was):
...
JWT Token
@app.route ('/make_jwt')
def make_jwt (was)
t = was.encode_jwt ({'iss': 'example.com', 'exp': time.time () + 3600})
@app.route ('/verify_token')
def verify_jwt (was)
calims = was.request.JWT
if "err" in claims:
return claims ["err"]
One-Time Password
New in skitai version 0.35.0
def check_otp (was):
if not was.verify_otp (was.request.get_header ('x-otp')):
raise was.Error ('403 Unauthorized')
@app.route ('/admin-task')
@app.testpass_required (check_otp)
def task (was)
...
At your client,
from atila.was import generate_otp
generate_otp (secret_key)
One-Time Token
New in skitai version 0.26.17
For creatiing onetime link url, you can convert your data to signatured token string.
Note: Like JWT token, this token contains data and decode easily, then you should not contain important information like password or PIN. This token just make sure contained data is not altered by comparing signature which is generated with your app scret key.
@app.route ('/password-reset')
def password_reset (was)
if was.request.args ('username'):
username = "hans"
token = was.encode_ott (username, 3600, "pwrset") # valid within 1 hour
pw_reset_url = was.urlfor ('reset_password', token)
# send email
return was.render ('done.html')
if was.request.args ('token'):
username = was.decode_ott (was.request.args ['token'])
if not username:
return was.response ('400 Bad Request')
# processing password reset
...
If you want to expire token explicit, add session token key
# valid within 1 hour and create session token named '_reset_token'
token = was.encode_ott ("hans", 3600, 'rset')
>> kO6EYlNE2QLNnospJ+jjOMJjzbw?fXEAKFgGAAAAb2JqZWN0...
username = was.decode_ott (token)
>> "hans"
# if processing is done and for revoke token,
was.revoke_ott (token)
Websocket
@app.websocket (spec, timeout = 60, onopen = None, onclose = None)
WS_COROUTINE
New in version 0.8.8
@app.route ("/echo_coroutine")
@app.websocket (skitai.WS_COROUTINE, 60)
def echo_coroutine (was):
while 1:
msg = yield was.Input ()
if not msg:
break
with was.stub ('http://example.com') as stub:
task = yield stub.get ("/")
yield task.fetch ()
WS_CHANNEL
- simple request and response way like AJAX
- with WS_THREAD, WS_SESSION, WS_NOTHREAD, WS_NOTHREAD options
WS_THREAD
- default, function base websocket message handling
- it treats every single websocket message as single request to resources like url requests.
- on receiving message from client, it will call function for handling with queue and thread pool, it is basically same as request resource
WS_THREADSAFE
New in version Skitai 0.26
- Mostly same as WS_THREAD
- Message sending is thread safe
- Most case you needn't this option, but you create yourself one or more threads using websocket.send () method you need this for your convinience
def onopen (was):
was.request.session.set ("WS_ID", was.websocket.client_id)
print ('websocket opened with', was.request.ARGS ["options"])
def onclose (was):
was.request.session.remove ("WS_ID")
@app.route ("/websocket")
@app.websocket (skitai.WS_CHANNEL | skitai.WS_THREAD, 1200, onopen, onclose)
def websocket (was, message, **options):
app.websocket_send (
was.request.session.get ("WS_ID"),
"Item In Stock!"
)
return 'you said: ' + message + str (options)
To push message to specific client.
@app.route ("/item_in_stock")
def item_in_stock (was):
app.websocket_send (
was.request.session.get ("WS_ID"),
"Item In Stock!"
)
Note:: I'm not sure it is works in all web browser.
WS_NOTHREAD
- non-threaded function call base websocket message handling
- it is faster than WS_THREAD
WS_NOTHREAD does not use queue or thread pool. In this case, response is more faster but if response includes IO blocking operation, entire Skitai event loop will be blocked.
@app.route ("/websocket")
@app.websocket (skitai.WS_CHANNEL | skitai.WS_NOTHREAD, 60, onopen)
def websocket (was, message):
return 'you said: ' + message
Test Client
Excellent tests can only make good app. Skitai provide test_client which can test integrating and E2E tests.
See Test Client
chapter in Skitai App Engine.
Your Life Is So Precious
Integrating Django Models and Administrative Views
# atila_vue/entities/firebase_vue/models.py
from django.db import models
from django.contrib.postgres.fields import JSONField
from atila.patches.djangopatch import Model
class User (Model):
uid = models.CharField (max_length = 32)
email = models.EmailField (max_length = 64, null = True, blank = True)
nick_name = models.CharField (max_length = 16, null = True, blank = True)
lev = models.CharField (max_length = 16, default = 'user', choices = [('guest', 'guest'), ('user', 'user'), ('staff', 'staff')])
status = models.CharField (max_length = 16, null = True, blank = True)
created = models.DateTimeField (auto_now_add = True)
last_updated = models.DateTimeField (auto_now = True)
class Meta:
proxy = False
managed = True
db_table = 'firebase_user'
verbose_name = 'User'
verbose_name_plural = 'Users'
def __str__ (self):
return '{}'.format (self.nick_name)
# atila_vue/services/firebase/services.py
from firebase_vue.models import User, UserLog
class UserService:
@classmethod
def _get_id (cls, uid):
return User.get (uid = uid).get ('id')
# basic ops ------------------------------
@classmethod
def get (cls, uid = None, nick_name = None):
assert uid or nick_name, 'uid or nick_name required'
return User.get (uid = uid, nick_name = nick_name).execute ()
@classmethod
def add (cls, uid, payload):
payload ['uid'] = uid
return User.add (payload).returning ("*").execute ()
@classmethod
def set (cls, uid, payload):
return User.set (payload, uid = uid).returning ("*").execute ()
@classmethod
def delete (cls, uid):
return (User.remove (uid = uid)
.with_ (UserLog.remove (user_id = cls._get_id (uid)))
).execute ()
All queries works as Atila native coroutine. More importantly, you can use Django migration, field validation and even Django adminsrative views by mounting Django app into '/admin'.
This is just core code lines. For more information, see my atila-vue.
Integrating pytest and API Documentation
python3 serve.py --devel
# tests/conftest.py
from atila.pytest_hooks import * # HERE
import skitai
@pytest.fixture
def launch ():
return partial (skitai.test_client, port = 30371, silent = False)
@pytest.fixture (scope = "module")
def client ():
_client = skitai.test_client (port = 5000)
yield _client
_client.stop ()
# tests/test_app.py
def test_app (client):
resp = client.get ('/apis/tickers')
assert resp.status_code == 200
def test_app (launch):
# launch test server on 30371 with --devel option
with launch ('../serve.py', devel = True) as client:
resp = client.get ('/apis/tickers')
assert resp.status_code == 200
cd tests && pytest --docs # HERE
As a result, your test with parameters, data and errors will be logged and saved.
If your test finished successfully, docs/api/index.md
will be created.
## Table of Content
1. [/apis/tickers/<int:prodict_code>](#tproduct_tickers) [**`GET`**](#get-apistickers)
## product.tickers
**URL Parameters**: `product_code`
**Authorization Required**: NO
#### `GET` `/apis/tickers`
**Success Response Example**
> **_URL_**: `/apis/tickers/123`
> **_Response_**: `200 OK`
> **_Response Data_** (application/json)
'''json
{
"instock": 2
}
'''
If you seperate your tests sub directories like 'account', 'products',...
pytest account --docs
docs/api/account.md
will be created.
I am happy, frontenders happy, everybody happy.
Note: It will works only if pytest is run on same machine - request IP must be 127.0.0.1.
Change Log
-
0.11 (May, 2021)
- update README
- event system refactoring
-
0.9 (Apr, 2021)
- deprecate app.test_client (), use skitai.test_client ()
- add app.run ()
- change app initializing hook name from __setup__ () to __config__ ()
- add app.unroute ()
- add app.extends ()
- add app.overrides ()
- multiple life-cycle hooks
-
0.8 (Feb, 2020)
- add request stream feature, @app.route (..., input_stream = True)
- add was.Queue
- add atila.service
- fix was.pipe ()
- add generator based coroutine and @app.coroutine
- was.g and app.r has benn deprecated, use app.g and app.r
- add app.r for current request context, was.g is changed to global app context with thread-safe
- add was.pipe ()
- @app.inspect can inspect JSON data
- @app.maintain can have threading option
- remove 400 Not My Fault with assert
- rename from @app.require to @app.inspect but still valid
- @app.maintain can have interval parameter
- add was.Media () and was.Mounted ()
- add was.render_or_Map ()
- add was.Map () and was.ThreadPass ()
- add was.static and was.media
- both __setup__ and __mount__ can have mntopt argument optionally
- add was.Static ()
- add 400 Not My Fault with assert
- add notags and safes arguments to @app.require
- now, csrf token uses cookie not session and kept with browser
- add remove_csrf ()
- fix corequest cache sync
- update, config.MINIFY_HTML = None (default) | 'strip' | 'minify'
- add @app.csrf_verification_required
- add '@app.clarify_permission' and '@app.clarify_login' decorators
- add __setup__ hook for service packages.__init__.py
-
0.7 (Dec, 2019)
- fix type routing
- change URL build alias from was.urlspec ()
- change URL build alias from was.ab () to was.urlfor ()
- add alias was.urlpatch () for was.partial () for clarity
- add session.impending () and session.use_time ()
- change default options for Jinja2
- change session key name
- fix session expireation
- add extend param to session.mount ()
- add was.render_or_API ()
- add was.request.acceptables and was.request.acceptable (media)
- fix @app.fix testpass_required when reloading
- change session.mount spec
- fix multiple mount bug related
enable_namespace
- fix websocket bug related
enable_namespace
app.auto_mount
was deprecated- default value of
app.enable_namespace
has been from False to True. ACTION REQUIRED, lower version incompatible
-
0.6 (Oct, 2019)
- fix query string exception handling
- readd Chameleon template engine chapter to README
- test on PyPy
-
0.5 (Sep, 2019)
- add app example
- update requirements
-
0.4 (Aug, 2019)
- now, modules within __mount__ are reloadable
- deprecated @app.test_params, use @app.require
- deprecated was.Future and was.Futures, it doesn't need.
-
0.3 (Mar 13, 2019)
- remove proxing django route
- remove login service with django
- remove django model signal redirecting
- add @app.require
- change mount handler: def mount (app) => def mount (app) but lower version compatible
- make available @app.route ("")
- add was.ProxyPass (alias, path, timeout = 3)
- add special pre-defined URL parameter value: me, notme, new
- add parameter validation, now response code 400, if validatiion if failed
- fix implicit routing
- add conditional permission control
-
0.2 (Feb 18, 2019)
- fix implicit routing for root
- remove jinja2 from requirements
- add app.websocket_send ()
- fix Futures respinse bugs
- add was.API (), was.Fault (), was.File and was.Futures ()
-
0.1 (Jan 17, 2019)
- was.promise () has been deprecated, use was.Futures ()
- add interval based maintain jobs executor
- change name from app.storage to app.store
- add default_bearer_handler
- fix routing bugs related fancy URL
- add was.request.URL, DEFAULT, FORM (former was.request.form ()), JSON (former was.request.json ()), DATA (former was.request.data), ARGS (former was.request.args)
- add @app.test_param (required = None, ints = None, floats = None)
- project has been seperated from skitai and rename from saddle to atila, because saddle project is already exist on PYPI
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.