Atila Framework
Project description
Atila
Atila is simple and minimal framework integrated with Skitai App Engine. It is the easiest way to make backend services.
# serve.py
from atila import Atila
app = Atila (__name__)
@app.route ("/")
def index (was):
return "Hello, World"
if __mame__ == "__main__":
import skitai
skitai.mount ("/", app)
skitai.run (port = 5000)
And run,
python3 serve.py
Important Notice
CAUTION: Atila is base on WSGI but can be run only with Skitai App Engine.
This means if you make your Atila app, you have no choice but Skitai as WSGI app server. And Atila's unique and unconventional style may become very hard work to port to other framework.
Table of Content
- Atila
- Important Notice
- Unconventional?
- Installation
- First App
- Project Structure
- Launch Script
- Create App
- Templates
- Cascaded Service Mount
- Seperate '/sub' as sub module.
- Add new '/sub2' as sub package
- Other Hooks
- More About Preference
- Conclusion
- App Overview
- Event Bus
- App Life Cycle Events
- Extending and Overriding
- Extending
- Overriding
- Conclusion
- Interval Base App Maintenancing
- Working With Multiple Apps
- Event Subscription
- Data Exchanging
- Accesing Other App Directly
- Very First Argument 'was'
- Request
- Responses
- Coroutines
- Event Bus
- Access Control and Authentication
- CORS (Cross Origin Resource Sharing) and Preflight
- Custom Authentication
- WWW-Authentication
- Authentication On Specific Methods
- Password Provider
- Authentication On Entire App
- Bearer Authentication
- Processing Request
- Request Life Cycle Hooks
- Request Life Cycle Events
- App & Request Gloabal
- Request Object
- Basic Members
- Basic Methods
- Route Options
- Environment Variables
- Routing
- Request Parameters
- In The Template Engine
- Parameter and Validation
- File Upload
- Streaming Request Data
- Cookie
- Session
- Namespaced Session
- Message Box
- Piping
- Making URL
- Helpers
- Testing
- Conditional Prework
- Run Pre/Postworks Chain
- Using Task
- Resource Aliasing
- Task
- HTTP Based Protocols
- PostgreSQL / SQLite3
- MongDB
- Redis
- Thread / Process / Subrocess
- Mask
- Tasks
- Generator Based Coroutine
- Map / Load-Balancing
- Cache Control
- Adding Custom Database Interface For Atila Coroutine
- Making Response
- Cache Control
- HTTP Error
- Primitive
- String
- API Response
- Rendered Template
- render_or_API
- File
- Static
- Generator
- Redirecting
- RPC Response
- ProxyPass
- ThreadPass
- Threaded Data Streaming
- Coroutine
- Task / Tasks
- Map
- Mapped
- Couroutine Streaming
- Bidirectional Streaming
- Template Engine
- Customizing Jinja2
- Using Task In Template
- Custom Error Templates
- Helpers of 'was'
- Logging and Traceback
- Cross Site Request Forgery Token (CSRF Token)
- JWT Token
- One-Time Password
- One-Time Token
- Websocket
- WS_COROUTINE
- WS_CHANNEL
- WS_THREAD
- WS_THREADSAFE
- WS_NOTHREAD
- Test Client
- Your Life Is So Precious
- Deployment
- Integrating Django Models and Administrative Views
- Integrating pytest and API Documentation
- Change Log
Unconventional?
Atila is based on WSGI specification but take some advantage of asynchronous features which are provided from Skitai.
Atila is mostly same as other WSGI containers for functional and API aspect.
Otherwise Atila has asynchronous features but it does not use
async/await
conventions. it sometime use generator base coroutine
with yeild
.
Atila treats async jobs like these as a Task
.
- HTTP Request
- Database Query
- Thread
- Process
- Subprocess
Task has 3 major common methods: fetch(), one(), commit(). Multiple tasks are
bind into Tasks
. Tasks also have the same 3 methods.
Tasks are passed over to main event loop of main thread from current request thread. If all tasks finished, Tasks will be returned to request thread. Also by yielding, Tasks can be completely transitted into non blocking manner.
I'm not telling Atila is more better design. I'm just telling Atila is obviously NOT main-stream and I enjoy this project.
Installation
Requirements
Python 3.6+
Installation
Atila and other core base dependent libraries is developing on single milestone, install/upgrade all at once. Otherwise it is highly possible to meet some errors.
With pip
pip3 install -U atila skitai rs4 aquests
With git
pip3 install -U skitai rs4 aquests sqlphile
git clone https://gitlab.com/hansroh/atila.git
cd atila
pip3 install -e .
Optional required as you need,
pip3 install redis
pip3 install pymongo
pip3 install psycopg2-binary
pip3 install protobuf # for GRPC
pip3 install jsonrpclib-pelix
pip3 install jinja2
First App
Project Structure
The very formal but minimal project structure for myapp
.
myapp/
static/
templates/
__init__.py
tests
serve.py
README.md
Launch Script
# serve.py
import skitai
import myapp
if __name__ == '__main__':
with skitai.preference () as pref:
skitai.mount ('/', myapp, pref)
skitai.run (port = 5000, name = 'mtapp')
pref is runtime app preference. It will be overwrite its attributes to app on mounted. And have some methods like set_static().
Create App
# myapp/__init__.py
import skitai
from atila import Atila
def __config__ (pref):
# mount / to myapp/static and app.STATIC_URL will be set to '/'
# skitai.abspath () will join path with location of serve.py
# DO NOT use os.path.abspath (__file__), it is not reliable
pref.set_static ('/', skitai.abspath ('myapp/static'))
def __app__ ():
return Atila (__name__)
def __mount__ (app, mntopt):
@app.route ('/')
def index (was):
return 'Index Page'
@app.route ('/sub')
def sub (was):
return 'Sub Page'
Make sure that myapp/static/index.html
doses not exists. If so static file has more high priority.
Then run.
python3 serve.py --devel # --devel swiach mean debug mdde and auto reloading
Please see Skitai App Engine to more detail usage.
Templates
pip3 install jinja2
<!-- myapp/templates/index.j2 -->
<html>
<head><title>My App</title></head>
<body>
<h1>Index Page</h1>
</body>
</html>
def __mount__ (app, mntopt):
@app.route ('/')
def index (was):
return was.render ('index.j2')
Cascaded Service Mount
For larger app, make myapp/services
directory.
myapp/
static/
templates/
services/
__init__.py
# myapp/__init__.py
# remove __mount__ ()
import skitai
from atila import Atila
def __config (pref):
pref.set_static ('/', skitai.abspath ('myapp/static'))
def __app__ ():
return Atila (__name__)
def __setup__ (app, mntopt):
from . import services
app.mount ('/', services)
# NEW myapp/services/__init__.py
def __mount__ (app, mntopt):
@app.route ('/')
def index (was):
return 'Index Page'
@app.route ('/sub')
def sub (was):
return 'Sub Page'
Seperate '/sub' as sub module.
# modify myapp/services/__init__.py
# add __setup__ hook
def __setup__ (app, mntopt):
from . import sub
app.mount ('/sub', sub)
def __mount__ (app, mntopt):
@app.route ('/')
def index (was):
return 'Index Page'
# NEW myapp/services/sub.py
def __mount__ (app, mntopt):
# make sure blank path for accessing '/sub'
# if '/', accessable URL become '/sub/'
@app.route ('')
def sub (was):
return 'Sub Page'
Add new '/sub2' as sub package
Assume /sub2
is more large and complex, and you want to make as sub package.
# modify __setup__ of myapp/services/__init__.py
def __setup__ (app, mntopt):
from . import sub
from . import sub2
app.mount ('/sub', sub)
app.mount ('/sub2', sub2)
# NEW myapp/services/sub2/__init__.py
def __setup__ (app, mntopt):
from . import pages
app.mount ("/sub2/pages", pages)
# NEW myapp/services/sub2/pages.py
def __mount__ (app, mntopt):
@app.route ('') # /sub2/pages
def pages (was):
return 'Pages'
@app.route ('/1') # /sub2/pages/1
def page1 (was):
return 'Page1'
finally, mayapp
structure is:
myapp/
static/
templates/
services/
sub.py
sub2/
__init__.py
pages.py
__init__.py
Other Hooks
Currently only one other hook.
def __umount__ (app): # NO mntopt argument
print ('bye...')
More About Preference
with skitai.preference () as pref:
pref.set_static ('/static', 'myapp/static')
pref.set_media ('/media', '/mnt/data/media')
pref.config.MAX_UPLOAD_SIZE = 256 * 1024 * 1024
pref.config.MAINTAIN_INTERVAL = 60
You can add your options to pref.config
and attibute to pref
and you can sure accessing from app.config
and app
attribute later.
Conclusion
- Mount sub package in __setup__ hook and make services at __mount__.
- DO NOT mess with sub package in __mount__.
- __config__ and __app__ can be in app initializing script ONLY.
App Overview
Event Bus
Atila's event bus is based on event-bus.
# wherever in your app package,
def __setup__ (app, mntopt):
@app.on ('custom-event')
def on_cusom_event (was, detail):
print ('I got custom-event with detail: {}'.format (detail))
# wherever in your app package,
def __mount__ (app, mntopt):
@app.route ('/make_event')
def make_event (was):
app.emit ('custom-event')
return "OK"
App Life Cycle Events
There're serveral pre-defined events related app life cycle.
- before_mount
- mounted
- before_reload
- reloaded
- mounted_or_reloaded
- before_umount
- mounted
# wherever in your app package,
def load_my_models ():
...
def __setup__ (app, mntopt):
@app.on ('before_mount')
def on_before_mount (was):
load_my_models ()
Extending and Overriding
Big difference is which has app
and drive services.
Extending
pip3 install atila_vue
atila_vue is base project templates and no useful services. It must be implemented by your app.
import skitai
import atila_vue
import myapp
if __name__ == '__main__':
with sktai.oreference () as pref:
pref.extends (atila_vue)
skitai.mount ('/', myapp, pref)
myapp
extends atila_vue
and can use all atila_vue
's services, static files and templates whuch myapp
hasn't.
Overriding
pip3 install tfserver
tfserver has own useful services and you can add your custom services.
import skitai
import tfserver
import myapp
if __name__ == '__main__':
with sktai.oreference () as pref:
pref.overrides (myapp)
skitai.mount ('/', tfserver, pref)
myapp
overrides tfserver
's services, static files and templates if same resource paths.
Conclusion
myapp
, tfserver
and atila_vue
are very same project structure as I wrote above. If your app is useful to the others, please publish to PyPI.
Interval Base App Maintenancing
app.config.MAINTAIN_INTERVAL = 60 # seconds, default is 60
app.store.set ("num-nodes", 0) # thread safe store
@app.maintain (10, threading = False) # execute every 10 maintaining (500 sec.)
def maintain_num_nodes (was, now, count):
...
num_nodes = was.getlu ("cluster.num-nodes")
if app.g ["num-nodes"] != num_nodes:
app.g ["num-nodes"] = num_nodes
app.emit ("cluster:num_nodes", num_nodes)
You can add multiple maintain jobs but maintain function names is SHOULD be unique.
Working With Multiple Apps
Skitai can mount multiple apps including Atila apps. Atila apps sometimes need cumminicate each others.
Event Subscription
# serve.py
import skitai
import myapp
import tfserver
if __name__ == '__main__':
with sktai.oreference () as pref:
skitai.mount ('/tf', tfserver, pref, name = 'tfserver')
with sktai.oreference () as pref:
skitai.mount ('/', myapp, pref, subscribe = ['tfserver'])
myapp
can receive all event from tfserver
.
For example, tfserver
emit tfserver:model-reloaded
.
# wherever in myapp,
def __setup__ (app, mntopt):
@app.on ('tfserver:model-reloaded')
def on_tfserver_model_reloaded (was, model_name):
jobs_as_you_need ()
Data Exchanging
was.g
is multiple apps shared dictionary like object.
# wherever in tfserver app
@route ('/')
def index (was):
was.g ['LOADED_MODELS'] = ['a', 'b']
# wherever in myapp
@route ('/')
def index (was):
return '|'.join (was.g ['LOADED_MODELS'])
Accesing Other App Directly
@route ('/')
def index (was):
other_app = was.apps ['tfserver']
Very First Argument 'was'
What is was
as first argument of mostly hooks, event_handlers and services?
was
contains request
and response
related objects and services.
It is created per worker threads and never destoyed in normal sutuation. Sometimes clones will be created, cached and destroyed by need-to base.
Actually, it is just bunch of short-cuts to use request processing resources.
In this chapter, we just need to know briefly and partially.
Request
- was.request: current request object
- was.app: current app which process request
- was.g: multiple apps shared dictionary like object
Responses
- was.render ()
- was.API ()
- was.Error ()
- and lost of responses
Coroutines
- was.Task
- was.Tasks ()
And several helper methods.
Access Control and Authentication
CORS (Cross Origin Resource Sharing) and Preflight
with skitai.preference () as pref:
pref.access_control_allow_origin = ["*"]
# OR
pref.access_control_allow_origin = ["http://www.skitai.com:5001"]
pref.access_control_max_age = 3600
If you want function specific CORS,
@app.route (
"/post", methods = ["POST", "OPTIONS"],
access_control_allow_origin = ["*"], access_control_max_age = 3600
)
def post (was):
return 'hello'
Custom Authentication
Let's assume you manage permission by user levels: admin, staff, owner and user.
class User:
def __init__ (self, uid, lev, nick_name):
self.uid = uid
self.lev = lev
self.nick_name = nick_name
self.tuid = None
def __str__ (self):
return self.uid
@app.permission_handler
def permission_handler (was, perms):
if was.request.get_header ('authorization'):
was.request.user = None
claims = was.dejwt ()
if "err" in claims:
raise was.Error ("401 Unauthorized", claims ["err"])
was.request.user = User (claims ['uid'], claims ['lev'], claims ['nick_name'])
if 'uid' in was.request.PARAMS:
tuid = was.request.PARAMS ['uid']
if 'owner' in perms and tuid != 'me':
raise was.Error ("403 Permission Denied", "owners only operation")
was.request.user.tuid = (tuid == 'me' and was.request.user.uid or (tuid != 'notme' and tuid or None))
if not perms:
return
if was.request.user.lev == "staff":
return # always vaild
if "staff" in perms:
raise was.Error ("403 Permission Denied")
@app.route ("/animals/<id>")
@app.identification_required
def animals (was, id = None):
if request.user is None:
...
else:
...
@app.identification_required
just process permission_handler but all error will be ignored.
@app.route ("/animals/<id>")
@app.login_required
def animals (was, id = None):
id = id or was.request.user...
@app.login_required
is same as @app.permission_required ([])
.
@app.route ("/animals/<id>")
@app.permission_required (["staff"])
def animals (was, id = None):
id = id or was.request.user...
staff
permission is required.
@app.route ("/animals/<id>")
@app.permission_required (id = ["staff"])
def animals (was, id = None):
id = id or was.request.user...
This resources required any permission for /animals/
or /animals/me
. But /animals/100
is required staff
permission.
Also you can specify premissions per request methods.
@app.route ("/animals/<id>", methods = ["POST", "DELETE"])
@app.permission_required (['user'], id = ["staff"], DELETE = ["admin"])
def animals (was, id = None):
id = id or was.request.user...
WWW-Authentication
Authentication On Specific Methods
@app.route ("/hello/<name>", authenticate = "digest")
def hello (was, name = "Hans Roh"):
return "Hello, %s" % name
@app.route ("/hello/<name>")
@app.authorization_required ("digest")
def hello (was, name = "Hans Roh"):
return "Hello, %s" % name
Available authorization methods are basic
, digest
and bearer
.
Password Provider
You can provide password and user information getter by 2 ways. First, users object
# users object shoukd have get(username) method
app.users = {"hansroh": ("1234", False)}
Second, use decorator
@app.authorization_handler
def auth_handler (was, username):
...
return ("1234", False)
The return object can be:
- (str password, boolean encrypted, obj userinfo)
- (str password, boolean encrypted)
- str password
- None if authorization failed
If you use encrypted password, you should use digest authorization and password should encrypt by this way:
from hashlib import md5
encrypted_password = md5 (
("%s:%s:%s" % (username, realm, password)).encode ("utf8")
).hexdigest ()
If authorization is successful, app can access username and userinfo from was.request.user.
- was.request.user.name
- was.request.user.realm
- was.request.user.info
If your server run with SSL, you can use app.authorization = "basic", otherwise recommend using "digest" for your password safety.
Authentication On Entire App
app = Atila (__name__)
app.authenticate = "digest"
app.realm = "Partner App Area of mysite.com"
app.users = {"app": ("iamyourpartnerapp", 0, {'role': 'root'})}
@app.route ("/hello/<name>")
def hello (was, name = "Hans Roh"):
return "Hello, %s" % name
Bearer Authentication
Currently, JWT token claims set to was.request.user
.
@app.route ("/api/v1/predict")
@app.authorization_required ("bearer")
def predict (was):
was.request.user...
But your token is not JWT, you should override.
@app.bearer_handler
def bearer_handler (was, token):
claims = handle_token_yourself (token)
if 'err' in claims:
raise was.Error ("401 Unauthorized", claims ["err"])
was.request.user = ...
Processing Request
Request Life Cycle Hooks
- before_request
- finish_request
- failed_request
- teardown_request
def __setup__ (app, mntopt):
@app.on ('before_mount')
def on_before_mount (was):
app.g ['TOTAL_REQUEST'] = 0
app.g ['FAILED_REQUEST'] = 0
app.g ['SUCCES_REQUEST'] = 0
@app.before_request
def before_request (was):
app.g ['TOTAL_REQUEST'] += 1
request.g ['RESOURCE'] = resource.open ()
@app.finish_request
def finish_request (was, response): # response
app.g ['SUCCES_REQUEST'] += 1
# return new response or None
@app.failed_request
def failed_request (was, expt): # expt is sys.exc_info ()
app.g ['FAILED_REQUEST'] += 1
# called wahterver success or fail
# clear resources
@app.teardown_request
def teardown_request (was): # no args
request.g ['RESOURCE'] = resource.close ()
Composit your own request/response processors.
class JWTTokenChecker:
def __call__ (self, was):
if something_is_wrong ():
raise was.Error ("400 Bad Request")
def __setup__ (app, mntopt):
PREPROCESSORS = [
JWTTokenChecker (),
PermissionHandler ()
]
@app.before_request
def before_request (was):
for p in PREPROCESSORS:
response = p (was)
if response:
return response
@app.finish_request
def finish_request (was, response):
was.response.add_header ('X-Custom', 'some-value')
return response + "--"
Request Life Cycle Events
- before_request
- finish_request
- failed_request
- teardown_request
def __setup__ (app, mntopt):
@app.on ('before_request')
def on_before_request (was):
...
@app.on ('finish_request')
def on_finish_request (was, response):
...
@app.on ('failed_request')
def on_failed_request (was, expt):
...
@app.on ('teardown_request')
def on_teardown_request (was):
...
Hooks can return content or error, but events cannot because they are async. If event handling is failed, just logged traceback information.
App & Request Gloabal
- app.g
- was.request.g
@app.before_request
def before_request (was):
app.g ['TOTAL_REQUEST'] = 0
was.request.g ['START'] = time.time ()
@app.teardown_request
def teardown_request (was):
app.g ['TOTAL_REQUEST'] += 1
was.log ('processing time: {:.5f} seconds.'.format (
time.time () - was.request.g ['START']
))
Request Object
was.request
Basic Members
- was.request.method # upper case GET, POST, ...
- was.request.uri
- was.request.version # HTTP Version, 1.0, 1.1, 2.0, 3.0
- was.request.scheme # http or https
- was.request.args: all parameters including URL, Query String, form data even JSON Data
- was.request.env
- was.request.g
- was.request.cookie
- was.request.session
- was.request.mbox
Basic Methods
- was.request.split_uri () # (script, param, querystring, fragment)
- was.request.get_header ("content-type") # case insensitive
- was.request.get_headers () # retrun header all list
- was.request.get_body ()
- was.request.get_remote_addr ()
- was.request.get_user_agent ()
- was.request.get_content_type ()
- was.request.get_main_type ()
- was.request.get_sub_type ()
Route Options
- was.request.routed # routed function
- was.request.routable # {'methods': ["POST", "OPTIONS"], 'content_types': ["text/xml"], 'options': {...}, 'mntopt': {...}}
Environment Variables
was.request.env (alias: was.env)
was.env.get ('HTTP_USER_AGENT', 'Unknown Agent')
Routing
@app.route ("/hello")
def hello_world (was):
return was.render ("hello.j2")
Give sime restirction.
@app.route ("/hello", methods = ["GET"]) #if not match response 405
@app.route ("/hello", content_types = ["application/json"]) #if not match response 406
Request Parameters
@app.route ("/hello", methods = ["POST", "OPTIONS"])
def hello_world (was, name, birth_year, gender = None):
return was.render ("hello.j2", name = name, birth_year = birth_year, gender = gender)
@app.route ("/hello", methods = ["POST", "OPTIONS"])
def hello_world (was, name, **DATA):
return was.render ("hello.j2", name = name, DATA = DATA)
@app.route ("/profiles/<int:id>", methods = ["GET"])
def hello_world (was, id, detail = 'yes'):
return was.render ("profile.j2", id = id, detail = detail)
It is valid:
- /profiles/123?detail=no
- /profiles/me?detail=no
- /profiles/notme?detail=no
- /profiles/new?detail=no
@app.route ("/profiles/<int:id>", methods = ["GET"])
def hello_world (was, id = None, detail = 'yes'):
return was.render ("profile.j2", id = id, detail = detail)
It is valid additionaly:
- /profiles?detail=no
To access all parameters as dictionary, use was.request.args
. It will be useful to use in cycle hooks or templates.
@app.route ("/profiles/<int:id>", methods = ["GET"])
def hello_world (was, id = None, detail = 'yes'):
assert was.request.args ['id'] == id
assert was.request.args ['detail'] == detail
If there are URL and Form/JSON parameters conflicts, the latter has high priority.
In The Template Engine
pip3 install jinja2
return was.render ("profile.j2", id = id, detail = detail)
<li>{{ context.id }} is {{ request.args.id }}</li>
About template engine, I will explain detail later.
Parameter and Validation
@app.route ("/profiles/<int:id>", methods = ["GET"])
@app_inspect (ints = ['id'], booleans = ['detail'])
def hello_world (was, id = None, detail = False):
return was.render ("profile.j2", id = id, detail = detail)
# MUST use with keyword argument
@app.inspect (
ints = None, floats = None,
strings = None, booleans = None,
emails = None, uuids = None, nones = None, lists = None,
dicts = None, oneof = None, manyof = None,
notags = None, safes = None,
**kargs
)
Examples:
- strings = ['name', 'gender']
- notags = ['resume'] # not allow html tags
- safes = ['resume'] # not allow <script> or javascript things
- nones = ['gender', 'birth_year'] if value is blank or zero like, make to None
- booleans = ['detail'] # valid if True, False, true, false, yes, no, y, n, true, false, t and f
- oneof = ['email', 'mobile']
Keyword Argument Examples:
- birth_year = int # type
- mobile = re.compile ('[0-9]{3}-[0-9]{3}-[0-9]{4}') # an object has .search () method
- birth_year__between = (1900, dt.now ().year)
- gender__in = ['male', 'female', 'not-determine']
- name__len__lte = 64
- name__len__gt = 3
- name__len__between = (3, 64)
Avaliable double low-dash operator:
- len__
- between
- in
- notin
- eq, exact
- neq
- lte
- lt
- gte
- gt
- contains
- notcontain
- startswith
- notstartwith
- endswith
- notendwith
- regex
File Upload
@app.route ("/upload", methods = ['POST'])
def upload (was, file):
file.save ("/var/tmp/upload", dup = "o")
- file.path: temporary saved file full path
- file.name: original file name posted
- file.size
- file.mimetype
- file.ext: file extension
- file.read ()
- file.move (to)
- file.as_flashfile ()
- file.save (into, name = None, mkdir = False, dup = "u")
- file.remove ()
Streaming Request Data
@app.route ("/bistreaming", methods = ['POST'], input_stream = True)
def streaming (was):
buf = []
while 1:
data = yield was.Input (4096)
if not data:
break
buf.append (data)
return b''.join (buf)
Caution: Be careful to use request streaming. Request streaming need only a few specific conditions.
- Small chunked request data which is intermittent and need long terms connection like receiving GPS coordinate data from client device
- Bidirectional streaming like detectecting silence for 10~30ms segments
of audio data stream. See next
Bidirectional Streaming
topic.
If you just want upload data, just use regular POST upload method. DO NOT use request streaming which may cause event loop blocking and also is very inefficient.
Cookie
was.request.cookie (alias: was.cookie): dicionary like object
if "user_id" not in was.cookie:
was.request.cookie.set ("user_id", "hansroh")
# or
was.request.cookie ["user_id"] = "hansroh"
was.cookie.set (
key, val,
expires = None,
path = None, domain = None,
secure = False, http_only = False
)
'expires' args is seconds to expire.
- if None, this cookie valid until browser closed
- if 0 or 'now', expired immediately
- if 'never', expire date will be set to a hundred years from now
If 'secure' and 'http_only' options are set to True, 'Secure' and 'HttpOnly' parameters will be added to Set-Cookie header.
If 'path' is None, every app's cookie path will be automatically set to their mount point.
Session
was.request.session (alias: was.session): dicionary like object
To enable session for app, random string formatted securekey should be set for encrypt/decrypt session values.
with skitai.preference () as pref:
pref.securekey = "ds8fdsflksdjf9879dsf;?<>Asda"
pref.session_timeout = 1200 # sec
@app.route ("/session")
def hello_world (was, **form):
if "login" not in was.session:
was.session.set ("user_id", form.get ("hansroh"))
# or
was.session ["user_id"] = form.get ("hansroh")
If you set, alter or remove session value, session expiry is automatically extended by app.session_timeout. But just getting value will not be extended. If you extend explicit without altering value, you can use touch() or set_expiry(). session.touch() will extend by app.session_timeout. session.set_expiry (timeout) will extend by timeout value.
Once you set expiry, session auto extenstion will be disabled until expiry time become shoter than new expiry time is calculated by app.session_timeout.
Namespaced Session
@app.route("/")
def index (was):
was.session.mount ("ADMIN", path = '/admin')
was.session.set ("login", True)
was.session.mount () # resore to default session
Additional methods:
- was.session.set_expiry (timeout)
- was.session.touch ()
- was.session.expire ()
- was.session.use_time ()
- was.session.impending (): if session timeout remains 20%
- was.session.mount ( name = None, session_timeout = None, securekey = None, path = None, domain = None, secure = False, http_only = False, extend = True )
Message Box
was.request.mbox (alias: was.mbox)
@app.route ("/msg")
def msg (was):
was.mbox.send ("This is Flash Message", "flash")
was.mbox.send ("This is Alert Message Kept by 60 seconds on every request", "alram", valid = 60)
return was.redirect (was.urlfor (showmsg, "Hans Roh"), status = "302 Object Moved")
@app.route ("/showmsg")
def showmsg (was, name):
return was.render ("msg.htm", name=name)
<ul>
{% for mid, category, created, valid, msg, extra in was.mbox.get ("notice", "news") %}
<li class="{{category}}"> {{ msg }}</li>
{% endfor %}
</ul>
- was.mbox.send (msg, category, valid_seconds, **extra_dict)
- was.mbox.get () return [(message_id, category, created_time, valid_seconds, msg, extra_dict)]
- was.mbox.get (category) filtered by category
- was.mbox.get (key, val) filtered by extra_dict key and value pair
- was.mbox.search (key, val): find in extra_dict. if val is not given or given None, compare with category name. return [message_id, ...]
- was.mbox.remove (message_id)
Piping
was.pipe () can call function by resource names. This make call nested function within __mount__ (app) in another module.
# services/__init__.py
def __mount__ (app, mntopt):
@app.route ("/1")
@app.inspect (offset = int)
def index (was, offset = 1):
return was.API (result = offset)
@app.route ("/2")
def index2 (was):
return was.pipe (index)
@app.route ("/3")
def index3 (was):
return was.pipe (index, offset = 4)
@app.route ("/4")
def index4 (was):
return was.pipe ('index', offset = 't')
@app.route ("/5")
def index4 (was):
return was.pipe ('sub.pages.toc', chapter = 5)
# services/sub/pages.py
def __mount__ (app, mntopt):
@app.route ("/pages/toc/<int:chapter>")
def toc (was, chapter):
return "Page tOC Chapter {}".format (chapter)
Making URL
was.static ('img/logo.png')
was.urlfor ("index")
was.urlfor ("sub.pages.toc", 1)
was.urlfor ("sub.pages.toc", chapter = 1)
was.urlfor ("/index.htm") # hard coded absolute URL, MUST start with /
Helpers
Testing
def is_superuser (was):
if was.user.name not in ('admin', 'root'):
reutrn was.response ("403 Permission Denied")
@app.testpass_required (is_superuser)
def modify_profile (was):
...
Conditional Prework
def reload_config (was, path):
...
@app.if_file_modified ('/opt/myapp/config', reload_config, interval = 1)
def index (was):
return was.render ('index.html')
Run Pre/Postworks Chain
You can make automation for preworks and postworks.
def pre1 (was):
...
def pre2 (was):
...
def post1 (was):
...
@app.route ('/')
@app.run_before (pre1, pre2)
@app.run_after (post1)
def index (was):
return was.render ('index.html')
- @app.run_before can return None or responsable contents for aborting all next run_before and main request.
- @app.run_after return will be ignored
Using Task
Task is a part of request processing and also related with generating response. So you should look into this first.
pip3 install psycopg2-binary
pip3 install pymongo
pip3 install redis
Resource Aliasing
with skitai.preference () as pref:
skitai.alias ("@mypg", skitai.DB_PGSQL, ["user:pass@localhost/mydb"])
skitai.alias ("@mylite", skitai.DB_SQLITE3, ["./sqlite3.db"])
skitai.alias ("@mymongo", skitai.DB_MONGODB, ["localhost/mycollection"])
skitai.alias ("@myredis", skitai.DB_REDIS, ["localhost/0"])
skitai.alias ('@myapi', skitai.PROTO_HTTPS, ["s1.myserver.com"])
Task
Task is sort of coroutine. Task has methods:
- fetch (): return iterable
- one (): should be single element
- commit (): return None, just wait to complete
HTTP Based Protocols
with was.stub ('@myapi/api') as stub:
task1 = stub.get ('/profiles/{}?limit={}', 100, 10)
payload = dict (name = 'Hans Roh')
task2 = stub.put ('/profiles/{}', 100, payload)
with was.xmlrpc ('@myapi/rpc2') as stub:
task3 = stub.add (156, 952)
task1.fetch ()
task2.fetch ()
task3.fetch ()
pip3 install tfserver
from tfserver import cli
with was.grpc ("@myapi/tensorflow.serving.PredictionService") as stub:
x = get_numpy_data ()
request = cli.build_request ('mymodel', 'predict', x = x)
task = stub.Predict (request, 10.0)
PostgreSQL / SQLite3
# PostgreSQL and SQLite3
with was.db ('@mypg') as db:
task = (db.select ('my_table').
.filter (name = 'Hans')
.with_ ('a', db.insert ('my_table').set (name = 'Hans'))
).execute ()
MongDB
with was.db('@mymongo') as db:
task = db.find ({'city': 'New York'})
Redis
with was.db ("@myredis") as db:
task1 = db.set('foo', 'bar')
task2 = db.get('foo')
Thread / Process / Subrocess
task = was.Thread (my_func, arg1, arg2)
task = was.Process (my_func, arg1, arg2)
task = was.Subprocess ('top -n1', timeout = 3)
Mask
It is fake of Task(s).
You can make it by wrapping was.Mask (data) if you want to use consistant methods as Task.
task = was.Mask (1)
result = task.fetch () # 1
tasks = was.Mask ([1, 2])
result1, result2 = tasks.fetch () # 1, 2
Tasks
with was.db('@mymongo') as db:
task1 = db.find ({'city': 'New York'})
with was.stub ('@myapi/api') as stub:
task2 = stub.get ('/profiles/{}?limit={}', 100, 10)
task3 = was.Process (my_func, arg1, arg2)
tasks = was.Tasks ([task1, task2, task3])
a, b, c = tasks.fetch () # got 3 results
tasks = was.Tasks (
db.insert ('city').set (name = 'Hans').execute (),
files = was.Subprocess ('ls /var/log', filter = lammda x: x.replace (' ', '_')),
result = was.db ('@mydb', filter = hide_password).execute ('select * from user')
)
results = tasks.fetch ()
results ['files']
results ['result']
Generator Based Coroutine
New in version 0.8.6
If you create single task and call fetch () at just next line, It is alomost same as synchronous blocking job.
But by yielding Task
, it becomes fully async task.
Add coroutine = True
parameter to @app.route
to distinguish native generator.
@app.route ("/coroutine/2", coroutine = True)
def coroutine (was):
with was.stub ("http://example.com") as stub:
task = yield stub.get ("/")
return task.fetch ()
Note: DO NOT make blocking jobs after first yield. If MUST do, yield was.Thread() or was.Process().
@app.route ("/coroutine", coroutine = True)
def coroutine (was):
def wait_hello (timeout = 1.0):
time.sleep (timeout)
return 'mask'
# do sync tasks in thread
some_sync_task ()
# swith to async event loop in main thread
tasks = yield was.Tasks (
a = was.Mask ("Example Domain"),
b = was.Thread (wait_hello, args = (1.0,))
)
task3 = yield was.Thread (wait_hello, args = (1.0,))
task4 = yield was.Subprocess ('ls')
# finally
return was.API (d = task4.fetch (), c = task3.fetch (), **tasks.fetch ())
Why does not use await? To use await
also use async def
. But Atila doesn't want to disturb legacy sync tasks. So every requests will be passed over to thread pool initially and you can decide to keep sync manner or switch to async.
Map / Load-Balancing
Set multiple members with weight.
with skitai.preference () as pref:
members = ["user:pass@localhost/mydb 20", "user:pass@remote.com/mydb 10"]
skitai.alias ("@mypg", skitai.DB_PGSQL, members)
There're 2 more methods for request status check.
- dispatch ()
- wait ()
# map-merging
with was.stub.map ('@myapi') as stub:
task = stub.get ('@myapi/profiles/{}?limit={}', 100, 10)
all_results = []
for resp in task.dispatch ():
if resp.status != skitai.STA_NORMAL:
continue
all_results.extend (resp.data)
response.status
:
- STA_UNSENT
- STA_REQFAIL
- STA_TIMEOUT
- STA_NETERR
- STA_NORMAL
# load-balancing is the very same as single call
with was.stub.lb ('@myapi') as stub:
task = stub.get ('@myapi/profiles/{}?limit={}', 100, 10)
task.fetch ()
Cache Control
# myapp/__init__.py
import skitai
def __config__ (pref):
skitai.register_g ('tables.users', 'table.photos')
@app.route ("/update")
def update (was):
# update tables.users modification time
was.db ('@mydb', rm_cache = 'tables.users').execute (...)
@app.route ("/query")
def query (was):
# use cache if tables.users modification time is older
was.db ('@mydb', use_cache = 'tables.users').execute (...)
@app.route ("/query2")
def query2 (was):
# check all times are older
was.db ('@mydb', use_cache = ['tables.users', 'table.photos']).execute (...)
This uses skitai.g object which is shared by all Skitai multiprocessing workers. So a worker update table, also all others can know that.
Also if rm_cache is used, it will emit events. You can handle event if you need. But this event system. But please note that this event notified within its worker process.
@app.route ("/update")
def update (was):
was.db ('@mydb').execute (...)
was.setlu ('tables.users', something...)
This makes event.
@app.on ("setlu:tables.users")
def table_updated (was, *args, **kargs):
# your code
But this event notification will be limited with current worker process. If every workers need to know that,
def clear_memory (was, name):
...
@app.route ('/query')
@app.if_cache_updated (['tables.users', 'table.photos'], clear_memory, interval = 1)
def query (was, q):
return was.render ('index.html')
Every workers will check cache updating when called query
.
Adding Custom Database Interface For Atila Coroutine
You can override existing classes - RDBMS, NoSQL (Redis, MongoDB) styles.
from aquests.dbi import asynredis
class MyDBI (asynredis.AsynConnect):
def __init__ (self):
...
DB_MYDBI = '*mydbi'
skitai.add_database_interface (DB_MYDBI, MyDBI)
skitai.alias ('@mydbi', DB_MYDBI, 'localhost:9000')
Making Response
was.response
was.response.set_status ("201 Created")
# add header even name is duplicated
was.response.add_header ('X-Server', "skitai")
# if exists old, remote it and update new
was.response.set_header ('X-Server', "skitai")
was.response.get_header ('Accept', '')
was.response.del_header ('Accept')
# current response code and msg
was.response.status_code
was.response.reason
Cache Control
# etag based cache control
# make sure CACHE_KEY is unique identifier for this resource
was.response.set_etag ('CACHE_KEY', max_age = 3600)
# content modification time based cache control
was.response.set_mtime (time.time (), max_age = 3600)
# set cache header as HTTP versions
was.response.set_max_age (3600)
HTTP Error
raise was.Error ("400 Bad Request")
raise was.Error ("400 Bad Request", "missing user ID")
raise was.Error ("400 Bad Request", "missing user ID", 40011)
Primitive
String
return "hello"
was.response.set_status ('200 OK')
was.response.set_header ('X-Server', 'Skitai')
return "hello"
API Response
JSON or XML response as request's Accept
header.
return was.API ({'name': Hans Roh})
return was.API (name = Hans Roh)
Rendered Template
return was.render ("hello.j2", name = 'Hans Roh')
<!-- hello.j2 -->
<h1>Hello {{ context.name }}</h1>
render_or_API
Response by Accept
header,
return was.render_or_API ("index.html", result = result)
File
return was.File ("/path/to/data.xlsx")
return was.File ("/path/to/data.xlsx", 'application/octet-stream', 'data.xlsx')
Static
# by static file URL
# resolved into was.static (""img/logo.png")
return was.Static ("img/logo.png")
@app.route ("/robots.txt")
def robots (was):
if app.debug:
was.response ['Content-Type'] = 'text/plain'
return "User-Agent: *\nDisallow: /\n"
return was.Static ('/robots.real.txt')
Generator
def build_csv (was):
def generate():
for row in iter_all_rows():
yield ','.join(row) + '\n'
return was.response ("200 OK", generate (), headers = [("Content-Type", "text/csv")])
Redirecting
return was.redirect (
was.urlfor ("showmsg", "Hans Roh"),
status = "302 Object Moved"
)
return was.redirect (
was.static ("index.html"),
status = "302 Object Moved"
)
RPC Response
By request's content type, Atila suppert XMLRPC, JSONRPC, GRPC.
# for jsonrpclib
pip3 install jsonrpclib-pelix
# XML/JSONRPC
@app.route ('/add')
def add (was, a, b):
return a + b # int is serailizable
# GRPC
@app.route ("/GetFeature")
def GetFeature (was, point):
feature = get_feature(db, point)
if feature is None:
return route_guide_pb2.Feature(name="", location=point)
else:
return feature
You nothing to do for this, just return type is serailizable by each RPC protocol.
ProxyPass
@app.route ("/<path:path>")
def proxy (was, path = None):
return was.ProxyPass ("@myupstream", path)
ThreadPass
@app.route ("/thread_future", methods = ['GET'])
def thread_future_respond (was):
def respond (was, a):
a = some_synchronous_task ()
# you can make corequest
was.db ('@mydb').execute (...).commit ()
return was.API (
result = a
)
return was.ThreadPass (respond, args = (4.0,))
Threaded Data Streaming
class Producer:
def get (self, n):
return [random.randrange (1000) for _ in range (n)]
def producing (producer):
def produce (queue):
while 1:
items = producer.get (100)
if not items:
queue.put (None) # end of stream
break
queue.put (str (items))
return produce
@app.route ("/threaproducer")
def threaproducer (was):
return was.Queue (producing (Producer ()))
Coroutine
Task / Tasks
If you need post processing data,
def repond (was, task):
return was.API (result = task.fetch (), a = task.meta.get ('a'))
@app.route ('...')
def foo ():
task = was.db ("@sqlite3").execute ("select * from test")
return task.then (respond)
@app.route ('...')
def foo ():
return was.Process (math.sqrt, args = (4.0,), meta = {'a': 1}).then (respond)
@app.route ('...')
def foo ():
def sqrt (a):
return math.sqrt (a)
return was.Thread (sqrt, args = (4.0,)).then (respond)
Map
No need post processing,
@app.route ('/datalist')
def datalist (was):
task = was.db ("@sqlite3").execute ("select * from test")
return was.API (result = task.fetch ())
# same result but transit to async
# return thread to pool early
@app.route ('/datalist')
def datalist (was):
task = was.db ("@sqlite3").execute ("select * from test")
return was.Map (result = task)
@app.route ("/bench/sp", methods = ['GET'])
def bench_sp (was):
with was.db ('@mydb') as db:
root = (db.select ("foo")
.order_by ("-created_at")
.limit (10)
.filter (Q (from_wallet_id = 8) | Q (detail = 'ReturnTx')))
return was.Map (
was.Thread (time.sleep, args = (0.3,)), # no need map
files = was.Subprocess ('ls /var/log'),
result = root.clone ().execute (),
record_count__one = root.clone ().aggregate ('count (id) as cnt').execute ()
)
# JSON response, 1st args had been executed but ignored in results because no map name
# >> { result: [...], record_count: {cnt: 123}, ls: 'syslog ...' }
But simple processing is possible with filter.
def hide_password (rows):
for row in rows:
row.password = '****'
return rows
return was.Map (
files = was.Subprocess ('ls /var/log', filter = lammda x: x.replace (' ', '_')),
result = was.db ('@mydb', filter = hide_password).execute ('select * from user')
)
By Accept
header,
@app.route ('/')
def index (was, error):
return was.render_or_Map ("index.j2", result = db.execute ('...'))
Mapped
tasks = was.Tasks (
files = was.Subprocess ('ls /var/log', filter = lammda x: x.replace (' ', '_')),
result = was.db ('@mydb', filter = hide_password).execute ('select * from user')
)
return was.Mapped (tasks)
By Accept
header,
@app.route ('/')
def index (was, error):
tasks = was.Taks (result = db.execute ('...'))
return was.render_or_Mapped ("index.j2", tasks)
Couroutine Streaming
@app.route ("/download_csv", coroutine = True)
def download_csv (was):
yield "ID, NAME\n"
current_id = 0
while 1:
task = yield (was.db ('@mydb').select ('tble')
.get ('id, name')
.filter (id__gt = current_id).
limit (fetch_count)).execute ()
rows = task.fetch ()
if not rows:
break
current_id += fetch_count
yield '\n'.join (['{}, "{}"'.format (row.id, row.name) for row in rows])
Bidirectional Streaming
@app.route ("/bistreaming", methods = ['POST'], coroutine = True, input_stream = True)
def coroutine_streaming (was):
while 1:
data = yield was.Input (16184)
if not data:
break
yield b':' + data
Template Engine
pip3 install jinja2
return was.render ("index.html", choice = 2, product = "Apples")
- myapp/__init__.py
- myapp/templates/index.html
Within template, you can access was
and aliases for your convinient.
- context: namespace for given keyword arguments (or dictionary keys)
- response: alias for
was.response
- request: alias for
was.request
- app
- was
Note that these names cannot ne used as context variable name.
{{ request.cookie.username }} choices item {{ request.args.get ("choice", "N/A") }}.
<a href="{{ was.urlfor ('checkout', context.choice) }}">Proceed</a>
Customizing Jinja2
If you want modify Jinja2 envrionment, can through app.jinja_env
object.
To register global template function,
@app.template_global ('form_token')
def generate_form_token ():
...
{{ test_global () }}
To register custom filter,
@app.template_filter ("reverse")
def reverse_filter (s):
return s [::-1]
{{ "Hello" | reverse }}
To add Jinja2 extensions,
app.add_jinja_ext ('jinja2.ext.i18n')
Note that Atila registers jinja2.ext.do
and jinja2.ext.loopcontrols
automatically.
Using Task In Template
@app.route ("/")
def intro (was):
return was.render (
'template.html',
task1 = was.get ("https://example.com/blur/blar"),
task2 = was.get ("https://example.com/blur/blar/foo"),
)
{% set rows = context.task1.fetch () %}
{% for row in rows %}
...
{% endfor %}
{% set profile = context.task2.one () %}
Custom Error Templates
This works only if client's Accept
header contains text/html
.
<!-- errors/default.j2 -->
<h1>{{ error.code }} {{ error.message }}</h1>
<p>{{ error.detail }}</p>
<hr>
<div>URL: {{ error.url }}</div>
<div>Time: {{ error.time }}</div>
@app.default_error_handler
def default_error_handler (was, error):
return was.render ("errors/default.j2", error = error)
@app.error_handler (404)
def not_found (was, error):
return was.render ('404.j2', error = error)
Helpers of 'was'
Logging and Traceback
@app.route ("/")
def sum ():
was.log ("called index", "info")
try:
...
except:
was.log ("exception occured", "error")
was.traceback ()
was.log ("done index", "info")
- was.log (msg, category = "info")
- was.traceback (id = "") # id is used as fast searching log line for debug, if not given, id will be Global transaction ID/Local transaction ID
Cross Site Request Forgery Token (CSRF Token)
New in skitai version 0.26.16
At template, insert CSRF Token,
<form>
{{ was.csrf_token_input }}
...
</form>
then verify token like this,
@app.before_request
def before_request (was):
if was.request.args.get ("username"):
if not was.verify_csrf ():
return was.response ("400 Bad Request")
Or use decorator,
@app.csrf_verification_required
def before_request (was):
...
JWT Token
@app.route ('/make_jwt')
def make_jwt (was)
t = was.encode_jwt ({'iss': 'example.com', 'exp': time.time () + 3600})
@app.route ('/verify_token')
def verify_jwt (was)
calims = was.request.JWT
if "err" in claims:
return claims ["err"]
One-Time Password
New in skitai version 0.35.0
def check_otp (was):
if not was.verify_otp (was.request.get_header ('x-otp')):
raise was.Error ('403 Unauthorized')
@app.route ('/admin-task')
@app.testpass_required (check_otp)
def task (was)
...
At your client,
from atila.was import generate_otp
generate_otp (secret_key)
One-Time Token
New in skitai version 0.26.17
For creatiing onetime link url, you can convert your data to signatured token string.
Note: Like JWT token, this token contains data and decode easily, then you should not contain important information like password or PIN. This token just make sure contained data is not altered by comparing signature which is generated with your app scret key.
@app.route ('/password-reset')
def password_reset (was)
if was.request.args ('username'):
username = "hans"
token = was.encode_ott (username, 3600, "pwrset") # valid within 1 hour
pw_reset_url = was.urlfor ('reset_password', token)
# send email
return was.render ('done.html')
if was.request.args ('token'):
username = was.decode_ott (was.request.args ['token'])
if not username:
return was.response ('400 Bad Request')
# processing password reset
...
If you want to expire token explicit, add session token key
# valid within 1 hour and create session token named '_reset_token'
token = was.encode_ott ("hans", 3600, 'rset')
>> kO6EYlNE2QLNnospJ+jjOMJjzbw?fXEAKFgGAAAAb2JqZWN0...
username = was.decode_ott (token)
>> "hans"
# if processing is done and for revoke token,
was.revoke_ott (token)
Websocket
@app.websocket (spec, timeout = 60, onopen = None, onclose = None)
WS_COROUTINE
New in version 0.8.8
@app.route ("/echo_coroutine")
@app.websocket (skitai.WS_COROUTINE, 60)
def echo_coroutine (was):
while 1:
msg = yield was.Input ()
if not msg:
break
with was.stub ('http://example.com') as stub:
task = yield stub.get ("/")
yield task.fetch ()
WS_CHANNEL
- simple request and response way like AJAX
- with WS_THREAD, WS_SESSION, WS_NOTHREAD, WS_NOTHREAD options
WS_THREAD
- default, function base websocket message handling
- it treats every single websocket message as single request to resources like url requests.
- on receiving message from client, it will call function for handling with queue and thread pool, it is basically same as request resource
WS_THREADSAFE
New in version Skitai 0.26
- Mostly same as WS_THREAD
- Message sending is thread safe
- Most case you needn't this option, but you create yourself one or more threads using websocket.send () method you need this for your convinience
def onopen (was):
was.request.session.set ("WS_ID", was.websocket.client_id)
print ('websocket opened with', was.request.ARGS ["options"])
def onclose (was):
was.request.session.remove ("WS_ID")
@app.route ("/websocket")
@app.websocket (skitai.WS_CHANNEL | skitai.WS_THREAD, 1200, onopen, onclose)
def websocket (was, message, **options):
app.websocket_send (
was.request.session.get ("WS_ID"),
"Item In Stock!"
)
return 'you said: ' + message + str (options)
To push message to specific client.
@app.route ("/item_in_stock")
def item_in_stock (was):
app.websocket_send (
was.request.session.get ("WS_ID"),
"Item In Stock!"
)
Note:: I'm not sure it is works in all web browser.
WS_NOTHREAD
- non-threaded function call base websocket message handling
- it is faster than WS_THREAD
WS_NOTHREAD does not use queue or thread pool. In this case, response is more faster but if response includes IO blocking operation, entire Skitai event loop will be blocked.
@app.route ("/websocket")
@app.websocket (skitai.WS_CHANNEL | skitai.WS_NOTHREAD, 60, onopen)
def websocket (was, message):
return 'you said: ' + message
Test Client
Excellent tests can only make good app. Skitai provide test_client which can test integrating and E2E tests.
See Test Client
chapter in Skitai App Engine.
Your Life Is So Precious
Deployment
Give a name to yoyr service.
skitai.run (port = '5000', name = "myapp")
python3 serve.py install
sudo systemctl myapp start
Additionally, Nginx is good idea for more efficient service.
# collect static files and generate base Nginx config files
python3 serve.py --nginx-conf=myapp.com
python3 serve.py --disable-static update
sudo systemctl myapp restart
See Skitai App Engine.
Integrating Django Models and Administrative Views
# atila_vue/entities/firebase_vue/models.py
from django.db import models
from django.contrib.postgres.fields import JSONField
from atila.patches.djangopatch import Model
class User (Model):
uid = models.CharField (max_length = 32)
email = models.EmailField (max_length = 64, null = True, blank = True)
nick_name = models.CharField (max_length = 16, null = True, blank = True)
lev = models.CharField (max_length = 16, default = 'user', choices = [('guest', 'guest'), ('user', 'user'), ('staff', 'staff')])
status = models.CharField (max_length = 16, null = True, blank = True)
created = models.DateTimeField (auto_now_add = True)
last_updated = models.DateTimeField (auto_now = True)
class Meta:
proxy = False
managed = True
db_table = 'firebase_user'
verbose_name = 'User'
verbose_name_plural = 'Users'
def __str__ (self):
return '{}'.format (self.nick_name)
# atila_vue/services/firebase/services.py
from firebase_vue.models import User, UserLog
class UserService:
@classmethod
def _get_id (cls, uid):
return User.get (uid = uid).get ('id')
# basic ops ------------------------------
@classmethod
def get (cls, uid = None, nick_name = None):
assert uid or nick_name, 'uid or nick_name required'
return User.get (uid = uid, nick_name = nick_name).execute ()
@classmethod
def add (cls, uid, payload):
payload ['uid'] = uid
return User.add (payload).returning ("*").execute ()
@classmethod
def set (cls, uid, payload):
return User.set (payload, uid = uid).returning ("*").execute ()
@classmethod
def delete (cls, uid):
return (User.remove (uid = uid)
.with_ (UserLog.remove (user_id = cls._get_id (uid)))
).execute ()
All queries works as Atila native coroutine. More importantly, you can use Django migration, field validation and even Django adminsrative views by mounting Django app into '/admin'.
This is just core code lines. For more information, see my atila-vue.
Integrating pytest and API Documentation
python3 serve.py --devel
# tests/conftest.py
from atila.pytest_hooks import * # HERE
import skitai
@pytest.fixture
def launch ():
return partial (skitai.test_client, port = 30371, silent = False)
@pytest.fixture (scope = "module")
def client ():
_client = skitai.test_client (port = 5000)
yield _client
_client.stop ()
# tests/test_app.py
def test_app (client):
resp = client.get ('/apis/tickers')
assert resp.status_code == 200
def test_app (launch):
# launch test server on 30371 with --devel option
with launch ('../serve.py', devel = True) as client:
resp = client.get ('/apis/tickers')
assert resp.status_code == 200
cd tests && pytest --docs # HERE
As a result, your test with parameters, data and errors will be logged and saved.
If your test finished successfully, docs/api/index.md
will be created.
## Table of Content
1. [/apis/tickers/<int:prodict_code>](#tproduct_tickers) [**`GET`**](#get-apistickers)
## product.tickers
**URL Parameters**: `product_code`
**Authorization Required**: NO
#### `GET` `/apis/tickers`
**Success Response Example**
> **_URL_**: `/apis/tickers/123`
> **_Response_**: `200 OK`
> **_Response Data_** (application/json)
'''json
{
"instock": 2
}
'''
If you seperate your tests sub directories like 'account', 'products',...
pytest account --docs
docs/api/account.md
will be created.
I am happy, frontenders happy, everybody happy.
Note: It will works only if pytest is run on same machine - request IP must be 127.0.0.1.
Change Log
-
0.11 (May, 2021)
- deprecating unused or duplicated decorators
- update README
- event system refactoring
- @app.login_required and @app.permission_required are integrated
-
0.9 (Apr, 2021)
- deprecate app.test_client (), use skitai.test_client ()
- add app.run ()
- change app initializing hook name from __setup__ () to __config__ ()
- add app.unroute ()
- add app.extends ()
- add app.overrides ()
- multiple life-cycle hooks
-
0.8 (Feb, 2020)
- add request stream feature, @app.route (..., input_stream = True)
- add was.Queue
- add atila.service
- fix was.pipe ()
- add generator based coroutine and @app.coroutine
- was.g and app.r has benn deprecated, use app.g and app.r
- add app.r for current request context, was.g is changed to global app context with thread-safe
- add was.pipe ()
- @app.inspect can inspect JSON data
- @app.maintain can have threading option
- remove 400 Not My Fault with assert
- rename from @app.require to @app.inspect but still valid
- @app.maintain can have interval parameter
- add was.Media () and was.Mounted ()
- add was.render_or_Map ()
- add was.Map () and was.ThreadPass ()
- add was.static and was.media
- both __setup__ and __mount__ can have mntopt argument optionally
- add was.Static ()
- add 400 Not My Fault with assert
- add notags and safes arguments to @app.require
- now, csrf token uses cookie not session and kept with browser
- add remove_csrf ()
- fix corequest cache sync
- update, config.MINIFY_HTML = None (default) | 'strip' | 'minify'
- add @app.csrf_verification_required
- add '@app.clarify_permission' and '@app.clarify_login' decorators
- add __setup__ hook for service packages.__init__.py
-
0.7 (Dec, 2019)
- fix type routing
- change URL build alias from was.urlspec ()
- change URL build alias from was.ab () to was.urlfor ()
- add alias was.urlpatch () for was.partial () for clarity
- add session.impending () and session.use_time ()
- change default options for Jinja2
- change session key name
- fix session expireation
- add extend param to session.mount ()
- add was.render_or_API ()
- add was.request.acceptables and was.request.acceptable (media)
- fix @app.fix testpass_required when reloading
- change session.mount spec
- fix multiple mount bug related
enable_namespace
- fix websocket bug related
enable_namespace
app.auto_mount
was deprecated- default value of
app.enable_namespace
has been from False to True. ACTION REQUIRED, lower version incompatible
-
0.6 (Oct, 2019)
- fix query string exception handling
- readd Chameleon template engine chapter to README
- test on PyPy
-
0.5 (Sep, 2019)
- add app example
- update requirements
-
0.4 (Aug, 2019)
- now, modules within __mount__ are reloadable
- deprecated @app.test_params, use @app.require
- deprecated was.Future and was.Futures, it doesn't need.
-
0.3 (Mar 13, 2019)
- remove proxing django route
- remove login service with django
- remove django model signal redirecting
- add @app.require
- change mount handler: def mount (app) => def mount (app) but lower version compatible
- make available @app.route ("")
- add was.ProxyPass (alias, path, timeout = 3)
- add special pre-defined URL parameter value: me, notme, new
- add parameter validation, now response code 400, if validatiion if failed
- fix implicit routing
- add conditional permission control
-
0.2 (Feb 18, 2019)
- fix implicit routing for root
- remove jinja2 from requirements
- add app.websocket_send ()
- fix Futures respinse bugs
- add was.API (), was.Fault (), was.File and was.Futures ()
-
0.1 (Jan 17, 2019)
- was.promise () has been deprecated, use was.Futures ()
- add interval based maintain jobs executor
- change name from app.storage to app.store
- add default_bearer_handler
- fix routing bugs related fancy URL
- add was.request.URL, DEFAULT, FORM (former was.request.form ()), JSON (former was.request.json ()), DATA (former was.request.data), ARGS (former was.request.args)
- add @app.test_param (required = None, ints = None, floats = None)
- project has been seperated from skitai and rename from saddle to atila, because saddle project is already exist on PYPI
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.