A rapid application development library for interfacing with data storage
Velocity
A rapid application development library for Python that eliminates boilerplate across the entire backend stack. Velocity provides a Pythonic database ORM where rows behave like dictionaries, a serverless handler framework for AWS Lambda, processor-agnostic payment integration, and a collection of everyday utilities — all designed so you write business logic instead of infrastructure code.
Table of Contents
- Why Velocity?
- Installation
- Quick Start
- Velocity.DB — Database ORM
- Velocity.AWS — Serverless Framework
- Velocity.Payment — Payment Processing
- Velocity.App — Business Domain Objects
- Velocity.Misc — Utilities
- Project Structure
- Development
- License
Why Velocity?
Most Python backend projects cobble together an ORM, a web/serverless framework, a payment library, spreadsheet tooling, and assorted glue code. Velocity replaces all of that with one cohesive library built around two principles:
- Convention over configuration — sensible defaults everywhere. Tables auto-create columns, transactions auto-commit, Lambda handlers auto-route actions.
- Python-native interfaces: database rows are real MutableMapping objects you can pass to dict(), unpack with **, iterate, and compare. No new query language to learn.
What you get:
| Layer | What it does | Without Velocity |
|---|---|---|
| velocity.db | Multi-database ORM with dict-like rows, auto-schema, query builder | SQLAlchemy + Alembic + custom code |
| velocity.aws | Lambda handler framework with auth, routing, SQS | Custom event parsing per function |
| velocity.payment | Processor-agnostic payments (Stripe, Braintree) | Per-processor integration code |
| velocity.misc | Excel export, email parsing, data conversion, formatting | openpyxl + email libs + helpers |
| velocity.app | Order / invoice / payment domain objects | Custom per-project |
Installation
pip install velocity-python
With database-specific drivers:
pip install "velocity-python[postgres]"   # PostgreSQL (psycopg2)
pip install "velocity-python[mysql]"      # MySQL (mysql-connector-python)
pip install "velocity-python[sqlserver]"  # SQL Server (python-tds)
pip install "velocity-python[payment]"    # Stripe + Braintree
pip install "velocity-python[all]"        # Everything (quotes keep shells such as zsh from expanding the brackets)
SQLite support is built-in (uses Python's sqlite3 module).
Requirements: Python 3.7+
Quick Start
import velocity.db
# 1. Connect
engine = velocity.db.postgres.initialize() # reads DB* env vars
# 2. Write business logic — the decorator handles transactions
@engine.transaction
def onboard_customer(tx, name, email):
    customer = tx.table('customers').new()  # table auto-created if missing
    customer['name'] = name                 # column auto-created if missing
    customer['email'] = email
    customer['status'] = 'active'
    return customer['sys_id']
# 3. Call it — tx is injected automatically
customer_id = onboard_customer('Acme Corp', 'hello@acme.com')
That's it. No model definitions, no migration files, no manual commits.
Velocity.DB — Database ORM
Connecting to a Database
Each database engine has an initialize() function that returns a configured Engine object:
import velocity.db
# PostgreSQL — reads from environment variables by default:
# DBHost, DBPort, DBDatabase, DBUser, DBPassword
engine = velocity.db.postgres.initialize()
# Or pass config explicitly:
engine = velocity.db.postgres.initialize(config={
'host': 'localhost',
'port': 5432,
'database': 'mydb',
'user': 'myuser',
'password': 'secret',
})
# MySQL
engine = velocity.db.mysql.initialize(config={...})
# SQLite
engine = velocity.db.sqlite.initialize(config={'database': '/path/to/db.sqlite'})
# SQL Server
engine = velocity.db.sqlserver.initialize(config={...})
The Engine object exposes metadata about the server:
engine.version # Server version string
engine.current_database # Active database name
engine.current_schema # Active schema
engine.tables # List of table names
engine.views # List of view names
engine.databases # List of all databases
Transactions
Every database operation runs inside a transaction. The @engine.transaction decorator manages the full lifecycle — connect, begin, commit on success, rollback on exception, close.
Basic Usage
Declare a tx parameter and the engine provides it automatically:
@engine.transaction
def create_user(tx, name, email):
    user = tx.table('users').new()
    user['name'] = name
    user['email'] = email
    return user['sys_id']
# Call without passing tx — the decorator injects it:
user_id = create_user('Alice', 'alice@example.com')
Transaction Reuse
Pass tx explicitly to share a single transaction across multiple decorated functions:
@engine.transaction
def create_user(tx, name, email):
    user = tx.table('users').new()
    user['name'] = name
    user['email'] = email
    return user['sys_id']

@engine.transaction
def create_profile(tx, user_id, bio):
    profile = tx.table('profiles').new()
    profile['user_id'] = user_id
    profile['bio'] = bio

@engine.transaction
def onboard(tx, name, email, bio):
    # Both calls share the same transaction, so the work is atomic
    uid = create_user(tx, name, email)
    create_profile(tx, uid, bio)
    return uid
onboard('Alice', 'alice@example.com', 'Engineer')
If you omit tx when calling a decorated function, it gets its own independent transaction.
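The inject-or-reuse behaviour described above can be sketched with the standard library alone. This is not Velocity's implementation: the `transaction` decorator below is a hypothetical stand-in that uses an in-memory SQLite connection as the `tx` object, and it detects a shared transaction simply by checking whether the first positional argument is a connection.

```python
import functools
import sqlite3


def transaction(func):
    """Hypothetical stand-in for @engine.transaction (in-memory SQLite)."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # A caller that passes a connection first shares its transaction.
        if args and isinstance(args[0], sqlite3.Connection):
            return func(*args, **kwargs)
        # Otherwise open a fresh connection and own the full lifecycle.
        conn = sqlite3.connect(":memory:")
        try:
            result = func(conn, *args, **kwargs)
            conn.commit()      # commit on success
            return result
        except Exception:
            conn.rollback()    # roll back on any exception
            raise
        finally:
            conn.close()

    return wrapper


@transaction
def create_user(tx, name):
    tx.execute("CREATE TABLE IF NOT EXISTS users (name TEXT)")
    tx.execute("INSERT INTO users (name) VALUES (?)", (name,))
    return tx.execute("SELECT COUNT(*) FROM users").fetchone()[0]


@transaction
def onboard(tx, first, second):
    # Passing tx keeps both inserts in one transaction.
    create_user(tx, first)
    return create_user(tx, second)


independent = create_user("Alice")  # own transaction, fresh database
shared = onboard("Alice", "Bob")    # two inserts, one transaction
```

Because each call without `tx` opens its own in-memory database here, `independent` only ever sees one row, while `shared` sees both rows written through the shared connection.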
Class-Level Decoration
Apply @engine.transaction to an entire class to auto-wrap every method that has a tx parameter:
@engine.transaction
class UserService:
    def create(self, tx, name, email):
        user = tx.table('users').new()
        user['name'] = name
        user['email'] = email
        return user['sys_id']

    def deactivate(self, tx, user_id):
        user = tx.table('users').find(user_id)
        user['status'] = 'inactive'

    def validate_email(self, email):
        # No tx parameter, so this method is not wrapped
        return '@' in email
service = UserService()
uid = service.create('Alice', 'alice@example.com')
service.deactivate(uid)
Savepoints
For partial rollback within a transaction:
@engine.transaction
def risky_operation(tx):
    sp = tx.create_savepoint()
    try:
        tx.table('ledger').insert({'amount': 100})
    except Exception:
        tx.rollback_savepoint(sp)  # undo just this part
    else:
        tx.release_savepoint(sp)
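For readers unfamiliar with savepoints, the same create / rollback / release pattern can be shown in plain SQL through the standard sqlite3 module. This sketch does not use Velocity; the savepoint name `risky` and the `ledger` schema are illustrative assumptions.

```python
import sqlite3

# isolation_level=None puts sqlite3 in autocommit mode, so BEGIN/COMMIT are explicit.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE ledger (amount INTEGER NOT NULL)")

conn.execute("BEGIN")
conn.execute("INSERT INTO ledger VALUES (100)")       # kept
conn.execute("SAVEPOINT risky")
try:
    conn.execute("INSERT INTO ledger VALUES (200)")   # undone below
    conn.execute("INSERT INTO ledger VALUES (NULL)")  # violates NOT NULL
except sqlite3.IntegrityError:
    conn.execute("ROLLBACK TO SAVEPOINT risky")       # undo just this part
else:
    conn.execute("RELEASE SAVEPOINT risky")
conn.execute("COMMIT")

amounts = [row[0] for row in conn.execute("SELECT amount FROM ledger")]
```

Only the work done after the savepoint is discarded; the first insert survives and is committed with the outer transaction.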
Automatic Retries
The engine automatically retries on transient errors:
- DbRetryTransaction: signals an explicit retry
- DbLockTimeoutError: lock contention
- DbConnectionError: transient connection drops
Up to 100 retries with backoff.
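A retry loop with capped exponential backoff might look like the sketch below. The exception class, function name, and delay values are all assumptions made for illustration; the source only states that up to 100 retries happen with backoff, not how the delay is computed.

```python
import time


class RetryableError(Exception):
    """Hypothetical stand-in for the transient errors listed above."""


def run_with_retries(operation, max_retries=100, base_delay=0.001, max_delay=0.05):
    attempt = 0
    while True:
        try:
            return operation()
        except RetryableError:
            attempt += 1
            if attempt > max_retries:
                raise  # give up and surface the last error
            # Exponential backoff, capped so late attempts do not wait forever.
            time.sleep(min(max_delay, base_delay * 2 ** (attempt - 1)))


calls = {"count": 0}


def flaky():
    # Fails twice, then succeeds, to exercise the retry path.
    calls["count"] += 1
    if calls["count"] < 3:
        raise RetryableError("lock timeout")
    return "committed"


outcome = run_with_retries(flaky)
```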
Key Rules
| Rule | Detail |
|---|---|
| Declare tx | Your function signature must include a tx parameter |
| Don't pass tx from outside | Let the decorator create it for new transactions |
| Pass tx to share | Explicitly pass tx to keep calls in the same transaction |
| _tx is reserved | Do not use _tx as a parameter name |
Rows as Dictionaries
Row implements collections.abc.MutableMapping, so a row retrieved from the database supports the same mapping operations as a Python dictionary.
@engine.transaction
def demonstrate_row(tx):
    user = tx.table('users').find(42)

    # Dictionary access
    name = user['name']
    user['status'] = 'active'          # writes through to DB immediately

    # Attribute access
    email = user.email
    user.phone = '+1-555-1234'

    # Standard dict operations
    data = dict(user)                  # full shallow copy
    merged = {**user, 'extra': True}   # unpack into new dict
    keys = list(user)                  # column names
    length = len(user)                 # number of columns
    has_phone = 'phone' in user        # membership test (case-insensitive)

    # .get() with default
    bio = user.get('bio', 'No bio')

    # Iteration
    for key in user:
        print(key, user[key])

    # Equality and hashing (by table name + primary key)
    other = tx.table('users').find(42)
    assert user == other
    assert hash(user) == hash(other)
    s = {user, other}                  # set with one element
Caching Behaviour
Row data is lazy-loaded on first access and cached locally. Subsequent reads hit the cache — not the database.
user['name'] # fetches all columns from DB, caches result
user['email'] # served from cache — no DB call
user.invalidate() # clear cache; next access re-fetches
user.refresh() # re-fetch immediately
Writes are write-through: user['name'] = 'New' issues an UPDATE and refreshes the cache.
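To make the lazy-load and write-through behaviour concrete, here is a minimal sketch built on collections.abc.MutableMapping and sqlite3. `CachedRow`, its `fetches` counter, and the `id` primary key are hypothetical names for illustration only; Velocity's Row has a richer interface, and this sketch trusts table and column names rather than validating them.

```python
import sqlite3
from collections.abc import MutableMapping


class CachedRow(MutableMapping):
    """Hypothetical lazy-loading, write-through row keyed by an integer id."""

    def __init__(self, conn, table, pk):
        self.conn, self.table, self.pk = conn, table, pk
        self._cache = None
        self.fetches = 0  # counts SELECTs so the caching is observable

    def _load(self):
        if self._cache is None:
            cur = self.conn.execute(
                f"SELECT * FROM {self.table} WHERE id = ?", (self.pk,)
            )
            names = [d[0] for d in cur.description]
            self._cache = dict(zip(names, cur.fetchone()))
            self.fetches += 1
        return self._cache

    def __getitem__(self, key):
        return self._load()[key]

    def __setitem__(self, key, value):
        # Write-through: UPDATE first, then keep the cache in step.
        self.conn.execute(
            f"UPDATE {self.table} SET {key} = ? WHERE id = ?", (value, self.pk)
        )
        self._load()[key] = value

    def __delitem__(self, key):
        raise TypeError("columns cannot be deleted through a row")

    def __iter__(self):
        return iter(self._load())

    def __len__(self):
        return len(self._load())

    def invalidate(self):
        self._cache = None  # next access re-fetches


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'Alice', 'alice@example.com')")

user = CachedRow(conn, "users", 1)
first_name = user["name"]            # one SELECT loads every column
first_email = user["email"]          # served from the cache
fetches_after_reads = user.fetches
user["name"] = "Alicia"              # UPDATE plus cache update
user.invalidate()
name_after_refetch = user["name"]    # second SELECT
```

Implementing the five abstract methods is enough for `dict(user)`, `**user`, `len(user)`, and `.get()` to work, since MutableMapping supplies the rest.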
Row API Reference
| Method / Property | Description |
|---|---|
| user['col'] | Get column value (lazy-loads from DB on first access) |
| user['col'] = val | Set column value (writes through to DB + updates cache) |
| user.col / user.col = val | Attribute-style access (same as bracket notation) |
| dict(user) / user.to_dict() | Full row as a plain dict |
| user.extract('a', 'b') | Subset of columns as a dict |
| user.update({'a': 1, 'b': 2}) | Bulk update multiple columns |
| user.get(key, default) | Safe get with default |
| 'col' in user | Case-insensitive membership test |
| len(user) | Number of columns |
| list(user) | Column names |
| user.refresh() | Re-fetch all data from DB |
| user.invalidate() | Clear cache (next access re-fetches) |
| user.copy() | Clone row with a new sys_id (strips system columns) |
| user.delete() / user.clear() | Delete the row from the database |
| user.lock() | SELECT ... FOR UPDATE |
| user.touch() | Update sys_modified timestamp |
| user.split() | Returns (data_without_sys_columns, pk) |
| user.match(other_dict) | True if other dict's keys all match this row's values |
| user.row('fk_column') | Follow a foreign key to get the referenced Row |
| user.sys_id | Primary key value |
Tables — CRUD Operations
tx.table('name') returns a Table object with a full CRUD and DDL interface.
Insert
@engine.transaction
def insert_examples(tx):
    users = tx.table('users')

    # Dict-style: create + populate a Row
    user = users.new()
    user['name'] = 'Alice'
    user['email'] = 'alice@example.com'
    # Row is inserted when created; updates write through

    # Direct insert from a dict
    users.insert({'name': 'Bob', 'email': 'bob@example.com'})

    # Insert-if-not-exists
    users.insert_if_not_exists(
        {'name': 'Carol', 'email': 'carol@example.com'},
        where={'email': 'carol@example.com'}
    )
Select
@engine.transaction
def select_examples(tx):
    users = tx.table('users')

    # All rows (returns list of dicts via Result.all())
    all_users = users.select().all()

    # Filtered + ordered + limited
    recent = users.select(
        columns=['name', 'email'],
        where={'status': 'active'},
        orderby='sys_created DESC',
        qty=10
    ).all()

    # Shorthand: .list() is select().all()
    active = users.list(where={'status': 'active'})

    # Single row lookups return Row objects
    user = users.find(42)                      # by primary key
    user = users.find({'email': 'a@b.c'})      # by condition
    user = users.one({'email': 'a@b.c'})       # alias for find()
    user = users.first(where={'status': 'active'}, orderby='name')

    # get(): find-or-create
    user = users.get({'email': 'a@b.c'})       # creates row if not found

    # Iterate Row objects
    for row in users.rows(where={'status': 'active'}):
        print(row['name'])

    # Iterate primary keys only
    for sys_id in users.ids(where={'status': 'active'}):
        print(sys_id)

    # Batched iteration
    for batch in users.batch(size=100, where={'status': 'active'}):
        process(batch)

    # Aggregates
    total = users.count()
    total_active = users.count(where={'status': 'active'})
    revenue = tx.table('orders').sum('amount', where={'status': 'paid'})
Update
@engine.transaction
def update_examples(tx):
    users = tx.table('users')

    # Row-level (write-through)
    user = users.find(42)
    user['name'] = 'New Name'

    # Bulk update
    users.update(
        {'status': 'inactive'},
        where={'<last_login': '2024-01-01'}
    )

    # Upsert (INSERT ON CONFLICT UPDATE)
    users.upsert({'name': 'Alice', 'status': 'active'}, pk='email')
    # or equivalently:
    users.merge({'name': 'Alice', 'status': 'active'}, pk='email')

    # Update-or-insert (try UPDATE first, fallback to INSERT IF NOT EXISTS)
    users.update_or_insert(
        {'status': 'active'},
        insert_data={'name': 'Alice', 'email': 'a@b.c', 'status': 'active'},
        where={'email': 'a@b.c'}
    )
Delete
@engine.transaction
def delete_examples(tx):
    users = tx.table('users')

    # Single row
    user = users.find(42)
    user.delete()

    # Bulk delete (where is required, which prevents accidental full deletes)
    users.delete(where={'status': 'inactive'})

    # Truncate (remove all rows)
    users.truncate()

    # Drop table entirely
    users.drop()
DDL (Schema Management)
@engine.transaction
def schema_examples(tx):
    users = tx.table('users')

    # Create table (with velocity system columns auto-added)
    users.create({'name': str, 'email': str, 'age': int})

    # Check existence
    if users.exists():
        print('Table exists')

    # Add columns (smart mode: skips existing columns)
    users.alter({'phone': str, 'bio': str})

    # Change column type
    users.alter_type('age', 'BIGINT')

    # Indexes
    users.create_index(['email'], unique=True)
    users.create_index(['name', 'status'])

    # Foreign keys
    users.create_foreign_key(['department_id'], 'departments')

    # Rename
    users.rename('app_users')

    # Column inspection
    cols = users.columns()          # non-system columns
    all_cols = users.sys_columns()  # all columns including sys_*
    pks = users.primary_keys()
    fks = users.foreign_keys()

    # Column object
    col = users.column('email')
    col.py_type      # Python type
    col.sql_type     # SQL type
    col.is_nullable  # nullable?
    col.distinct()   # distinct values
    col.max()        # max value
    col.rename('contact_email')
System Columns
Every table managed by Velocity automatically includes:
| Column | Type | Description |
|---|---|---|
| sys_id | BIGINT | Auto-incrementing primary key |
| sys_created | TIMESTAMP | Row creation time |
| sys_modified | TIMESTAMP | Last modification time (trigger-maintained) |
| sys_modified_by | TEXT | Last modifier identifier |
| sys_modified_row | TEXT | Session tracking |
| sys_modified_count | BIGINT | Modification counter (trigger-maintained) |
| sys_dirty | BOOLEAN | Dirty flag for sync workflows |
| sys_table | TEXT | Table name (self-referential) |
| sys_keywords | TEXT | Full-text search keywords |
Advanced Queries
Where Clause Formats
Velocity supports three where clause formats:
1. Dictionary with operator prefixes (most common):
users.select(where={
'status': 'active', # = (default)
'>age': 18, # >
'<=score': 100, # <=
'>=created_at': '2024-01-01', # >=
'!status': 'deleted', # <> (not equal)
'%email': '@company.com', # LIKE
'!%name': 'test%', # NOT LIKE
'><age': [18, 65], # BETWEEN
'!><score': [0, 50], # NOT BETWEEN
}).all()
| Prefix | SQL | Description |
|---|---|---|
| (none) | = | Equals (default) |
| > | > | Greater than |
| < | < | Less than |
| >= | >= | Greater than or equal |
| <= | <= | Less than or equal |
| ! or != or <> | <> | Not equal |
| % | LIKE | Pattern match |
| !% | NOT LIKE | Negated pattern match |
| >< | BETWEEN | Inclusive range (value must be [low, high]) |
| !>< | NOT BETWEEN | Negated range |
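One way to picture how an operator-prefixed dictionary becomes SQL is the small translator below. It is a sketch under assumptions, not Velocity's query builder: `build_where` and `OPERATORS` are hypothetical names, conditions are joined with AND, `%s` placeholders are used as in the examples above, and column names are not validated or quoted.

```python
# Longest prefixes first so '!><' is not mistaken for '!' and '>=' not for '>'.
OPERATORS = [
    ("!><", "NOT BETWEEN"), ("><", "BETWEEN"), ("!%", "NOT LIKE"),
    (">=", ">="), ("<=", "<="), ("!=", "<>"), ("<>", "<>"),
    ("%", "LIKE"), (">", ">"), ("<", "<"), ("!", "<>"),
]


def build_where(conditions):
    """Translate {'>age': 18} style keys into (sql, params)."""
    clauses, params = [], []
    for key, value in conditions.items():
        op, column = "=", key  # no prefix means equality
        for prefix, sql_op in OPERATORS:
            if key.startswith(prefix):
                op, column = sql_op, key[len(prefix):]
                break
        if op.endswith("BETWEEN"):
            low, high = value  # range operators expect [low, high]
            clauses.append(f"{column} {op} %s AND %s")
            params.extend([low, high])
        else:
            clauses.append(f"{column} {op} %s")
            params.append(value)
    return " AND ".join(clauses), params


sql, params = build_where({
    "status": "active",
    ">age": 18,
    "><score": [0, 50],
    "!%name": "test%",
})
```

Values travel separately from the SQL text, which is what lets the database driver bind them safely.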
2. List of tuples (for complex predicates):
users.select(where=[
('status = %s', 'active'),
('priority = %s OR urgency = %s', ('high', 'critical'))
]).all()
3. Raw string (use with care):
users.select(where="status = 'active' AND age >= 18").all()
Foreign Key Pointer Syntax
The > pointer syntax automatically resolves foreign keys into JOINs:
# local_column>foreign_column auto-joins through the FK relationship
orders = tx.table('orders')
result = orders.select(
columns=['order_number', 'customer_id>name', 'customer_id>email'],
where={'status': 'pending'}
).all()
# Generates: SELECT A.order_number, B.name, B.email
# FROM orders A JOIN customers B ON A.customer_id = B.sys_id
# WHERE A.status = 'pending'
Multi-level pointers chain through multiple tables:
# order → customer → region
orders.select(columns=[
'order_number',
'customer_id>name',
'customer_id>region_id>region_name'
]).all()
Aggregations
stats = tx.table('orders').select(
columns=['customer_id', 'COUNT(*) as total', 'SUM(amount) as revenue'],
where={'status': 'completed'},
groupby='customer_id',
having='COUNT(*) > 5'
).all()
Server-Side Cursors
For very large result sets, use server-side cursors to avoid loading everything into memory:
result = users.server_select(where={'status': 'active'})
for row in result:
    process(row)
result.close()
Raw SQL
@engine.transaction
def raw_query(tx):
    result = tx.execute("""
        SELECT u.name, COUNT(o.sys_id) as order_count
        FROM users u
        LEFT JOIN orders o ON u.sys_id = o.user_id
        WHERE u.status = %s
        GROUP BY u.name
        HAVING COUNT(o.sys_id) > %s
    """, ['active', 5])

    for row in result:
        print(row['name'], row['order_count'])

    # Single value
    total = tx.execute("SELECT COUNT(*) FROM users").scalar()
Result Objects
Every select(), execute(), or query method returns a Result object with chainable data transformations.
Output Formats
result = users.select(columns=['name', 'email'])
result.as_dict().all() # [{'name': 'Alice', 'email': 'a@b'}, ...] (default)
result.as_tuple().all() # [('Alice', 'a@b'), ...]
result.as_list().all() # [['Alice', 'a@b'], ...]
result.as_json().all() # ['{"name":"Alice","email":"a@b"}', ...]
result.as_pairs().all() # [[('name','Alice'),('email','a@b')], ...]
result.as_simple_list().all() # ['Alice', ...] (first column only)
result.as_simple_list(1).all() # ['a@b', ...] (column at position 1)
result.strings().all() # all values coerced to strings
Retrieval Methods
| Method | Returns | Use case |
|---|---|---|
| .all() | list | Get all rows at once |
| .one(default=None) | single row or default | Expect exactly one result |
| .scalar(default=None) | single value or default | SELECT COUNT(*), SELECT MAX(...) |
| .batch(qty=100) | generator of lists | Process in chunks |
| .enum() | (index, row) tuples | Numbered iteration |
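The "generator of lists" shape returned by .batch() can be illustrated with a few lines of standard-library code. The function below is a hypothetical equivalent over any iterable, not the Result method itself.

```python
from itertools import islice


def batch(rows, qty=100):
    """Yield successive lists of at most qty items from any iterable."""
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, qty))
        if not chunk:
            return  # source exhausted
        yield chunk


chunks = list(batch(range(7), qty=3))
```

Only one chunk is held in memory at a time, which is the point of batching large result sets.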
Inspection
result.headers # ['name', 'email'] — column names
result.columns # detailed column metadata from cursor.description
bool(result) # True if there are rows (pre-fetches first row)
result.has_results() # same as bool()
result.is_empty() # opposite of has_results()
Table Data Export
# 2D list with header row — useful for spreadsheet export
table_data = result.get_table_data(headers=True)
# [['name', 'email'], ['Alice', 'a@b'], ['Bob', 'c@d']]
Automatic Schema Evolution
Velocity automatically creates missing tables and columns when you write data. This is powered by the @create_missing decorator on CRUD methods.
@engine.transaction
def evolve_schema(tx):
    users = tx.table('users')  # table may or may not exist

    # First run: creates table with name, email columns + system columns
    users.insert({'name': 'Alice', 'email': 'alice@a.com'})

    # Later: phone column doesn't exist yet, so it is auto-added
    users.insert({'name': 'Bob', 'email': 'bob@b.com', 'phone': '555-1234'})
How It Works
- Operation attempted (e.g., INSERT)
- If DbTableMissingError → create the table from the data's keys/types, retry
- If DbColumnMissingError → ALTER TABLE ADD COLUMN for missing columns, retry
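The attempt, repair, retry loop above can be sketched against SQLite. Everything here is an assumption for illustration: `insert_evolving` and `SQL_TYPES` are hypothetical names, the sketch recognises missing tables and columns by matching SQLite's error text (a fragile approach that a real library would replace with typed exceptions), and identifiers are trusted rather than quoted.

```python
import sqlite3

SQL_TYPES = {str: "TEXT", int: "INTEGER", float: "REAL", bool: "INTEGER"}


def insert_evolving(conn, table, data, max_attempts=10):
    """INSERT, creating the table or adding columns when they are missing."""
    columns = ", ".join(data)
    marks = ", ".join("?" for _ in data)
    for _ in range(max_attempts):
        try:
            conn.execute(
                f"INSERT INTO {table} ({columns}) VALUES ({marks})",
                tuple(data.values()),
            )
            return
        except sqlite3.OperationalError as exc:
            message = str(exc)
            if message.startswith("no such table"):
                # Build the table from the data's keys and inferred types.
                cols = ", ".join(f"{k} {SQL_TYPES[type(v)]}" for k, v in data.items())
                conn.execute(f"CREATE TABLE {table} ({cols})")
            elif "has no column named" in message:
                # Add the one column SQLite complained about, then retry.
                missing = message.rsplit(" ", 1)[-1]
                sql_type = SQL_TYPES[type(data[missing])]
                conn.execute(f"ALTER TABLE {table} ADD COLUMN {missing} {sql_type}")
            else:
                raise
    raise RuntimeError("schema did not converge")


conn = sqlite3.connect(":memory:")
insert_evolving(conn, "users", {"name": "Alice", "email": "alice@a.com"})
insert_evolving(conn, "users", {"name": "Bob", "email": "bob@b.com", "phone": "555-1234"})
rows = conn.execute("SELECT name, phone FROM users ORDER BY name").fetchall()
```

Rows inserted before a column existed read back as NULL for that column, which is why Alice has no phone value.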
Type Inference
| Python Type | PostgreSQL | MySQL | SQLite |
|---|---|---|---|
| str | TEXT | TEXT | TEXT |
| int | BIGINT | BIGINT | INTEGER |
| float | NUMERIC(19,6) | DECIMAL(19,6) | REAL |
| bool | BOOLEAN | BOOLEAN | INTEGER |
| datetime | TIMESTAMP | DATETIME | TEXT |
| date | DATE | DATE | TEXT |
Protected Operations
Auto-creation applies to: insert(), update(), merge() / upsert(), alter(), alter_type().
Preview Without Executing
sql, vals = users.insert({'name': 'Test', 'new_col': 'val'}, sql_only=True)
print(sql) # Shows the ALTER + INSERT that would run
Views, Sequences, and Databases
Views
@engine.transaction
def view_examples(tx):
    v = tx.view('active_users')
    v.create_or_replace("SELECT * FROM users WHERE status = 'active'")
    v.grant('SELECT', 'readonly_role')

    # Idempotent ensure: creates if missing, applies grants
    v.ensure(
        "SELECT * FROM users WHERE status = 'active'",
        grants=[('SELECT', 'readonly_role')],
        grant_public_select=True
    )

    if v.exists():
        print('View exists')
Sequences (PostgreSQL)
@engine.transaction
def sequence_examples(tx):
    seq = tx.sequence('invoice_number')
    seq.create(start=1000)

    next_val = seq.next()          # nextval()
    curr_val = seq.current()       # currval()
    safe_val = seq.safe_current()  # returns None if uninitialized (no exception)

    seq.set_value(5000)            # ALTER SEQUENCE RESTART WITH 5000
    seq.configure(increment=10, minvalue=1, maxvalue=999999, cycle=True)
    info = seq.info()              # metadata from pg_sequences
    seq.rename('order_number')
    seq.drop()
Database Management
@engine.transaction
def database_examples(tx):
    db = tx.database('analytics')
    if not db.exists():
        db.create()
    db.tables  # list of schema.table strings
    db.vacuum(analyze=True)
    db.reindex()
    db.switch()  # switch active database
Schema Locking
In production, disable auto-creation to prevent accidental schema changes:
engine = velocity.db.postgres.initialize(schema_locked=True)
# Or toggle at runtime:
engine.lock_schema()
engine.unlock_schema()
# Temporary unlock for migrations:
with engine.unlocked_schema():
    migrate(engine)
When schema is locked, @create_missing raises DbSchemaLockedError instead of auto-creating.
Error Handling
Transactions automatically roll back on any exception:
@engine.transaction
def safe_transfer(tx, from_id, to_id, amount):
    from_acct = tx.table('accounts').find(from_id)
    to_acct = tx.table('accounts').find(to_id)

    if from_acct['balance'] < amount:
        raise ValueError("Insufficient funds")

    from_acct['balance'] -= amount  # both changes are atomic
    to_acct['balance'] += amount
    # auto-committed on success; auto-rolled-back on exception
The @return_default decorator on internal methods swallows specified exceptions and logs them:
# Example: table.count() returns 0 if the table doesn't exist
# because count() is decorated with @return_default(0, (DbTableMissingError,))
user_count = users.count() # returns 0 for missing table, not an exception
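A decorator with the behaviour described for @return_default could be written roughly as follows. This is a sketch under assumptions, not the library's source: the `TableMissing` exception and the `count_rows` function are hypothetical, and the logging call is a guess at what "logs them" means.

```python
import functools
import logging

logger = logging.getLogger("return_default_sketch")


class TableMissing(Exception):
    """Hypothetical stand-in for DbTableMissingError."""


def return_default(default, exceptions):
    """Return `default` instead of raising when one of `exceptions` occurs."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except exceptions as exc:
                # Swallow the error but leave a trace for debugging.
                logger.warning("%s failed, returning default: %r", func.__name__, exc)
                return default

        return wrapper

    return decorator


@return_default(0, (TableMissing,))
def count_rows(existing_tables, table):
    if table not in existing_tables:
        raise TableMissing(table)
    return existing_tables[table]


present = count_rows({"users": 3}, "users")
absent = count_rows({"users": 3}, "orders")
```

Exceptions outside the listed tuple still propagate, so unexpected failures are not hidden.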
The @reset_id_on_dup_key decorator handles primary key collisions by bumping the sequence and retrying (up to 3 times).
Exception Hierarchy
All database exceptions inherit from DbException:
| Exception | When |
|---|---|
| DbConnectionError | Connection failure (transient, auto-retried) |
| DbDatabaseMissingError | Database doesn't exist |
| DbTableMissingError | Table doesn't exist (triggers auto-create) |
| DbColumnMissingError | Column doesn't exist (triggers auto-add) |
| DbDuplicateKeyError | Unique constraint violation |
| DbForeignKeyMissingError | FK constraint violation |
| DbDataIntegrityError | General integrity violation |
| DbTruncationError | Data too long for column |
| DbLockTimeoutError | Lock wait timeout (auto-retried) |
| DbSchemaLockedError | Schema modification blocked |
| DbObjectExistsError | Object already exists |
| DbQueryError | SQL syntax or execution error |
| DbTransactionError | Transaction state error |
| DbRetryTransaction | Signals transaction retry |
| DuplicateRowsFoundError | Multiple rows found when single expected |
Utility Functions
velocity.db.utils provides helpers for working with row data:
from velocity.db.utils import safe_sort_rows, group_by_fields
# Sort rows with None-safe handling
sorted_rows = safe_sort_rows(rows, 'last_name', none_handling='last')
# Group rows by field values
groups = group_by_fields(rows, 'department', 'status')
# {('Engineering', 'active'): [...], ('Marketing', 'active'): [...]}
| Function | Description |
|---|---|
| safe_sort_rows(rows, field, none_handling='last') | Sort dicts by field, placing None values first or last |
| group_by_fields(rows, *fields) | Group rows into {tuple: [rows]} by field values |
| mask_config_for_display(config) | Redact passwords/secrets in config dicts for logging |
Velocity.AWS — Serverless Framework
A complete framework for building AWS Lambda functions with action-based routing, Cognito authentication, and SQS message processing.
Lambda Handlers
Subclass LambdaHandler to create API Gateway-backed Lambda functions:
from velocity.aws.handlers import LambdaHandler
class MyHandler(LambdaHandler):
    auth_mode = 'required'           # 'required' | 'optional' | 'none'
    user_table = 'app_users'         # table for DB user lookup
    public_actions = ['get-status']  # actions that skip auth

    def OnActionGetUsers(self, tx, context):
        users = tx.table('users').list(where={'status': 'active'})
        context.response().load_object({'users': users})

    def OnActionCreateUser(self, tx, context):
        data = context.payload()
        user = tx.table('users').new()
        user['name'] = data['name']
        user['email'] = data['email']
        context.response().toast('User created', 'success')

    def OnActionGetStatus(self, tx, context):
        context.response().load_object({'status': 'ok'})

handler = MyHandler()

def lambda_handler(event, context):
    return handler.serve(event, context)
Action routing: A POST with {"action": "get-users"} calls OnActionGetUsers. The action name is converted to PascalCase with OnAction prefix.
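The name conversion behind action routing is simple enough to sketch. The helper and the `Handler` class below are hypothetical and independent of LambdaHandler; they assume action names are hyphen-separated lowercase words, as in the example above.

```python
def action_to_method(action):
    """'get-users' -> 'OnActionGetUsers' (assumes hyphen-separated words)."""
    return "OnAction" + "".join(part.capitalize() for part in action.split("-"))


class Handler:
    def OnActionGetUsers(self, payload):
        return {"users": []}

    def dispatch(self, body):
        # Look the method up by its derived name; unknown actions fail loudly.
        method = getattr(self, action_to_method(body["action"]), None)
        if method is None:
            raise LookupError(f"no handler for action {body['action']!r}")
        return method(body.get("payload"))


method_name = action_to_method("get-users")
response = Handler().dispatch({"action": "get-users", "payload": {}})
```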
Authentication flow (beforeAction):
- Extract Cognito user from API Gateway event
- Look up user in user_table by Cognito subject
- Attach current_user to context
- Log activity to aws_api_activity table
Error handling: Exceptions in handlers are caught by onError, logged, and returned as structured error responses. AlertError surfaces user-facing messages.
SQS Handlers
For Lambda functions triggered by SQS queues:
from velocity.aws.handlers import SqsHandler
class MyQueueHandler(SqsHandler):
    def OnActionProcessOrder(self, tx, context):
        data = context.payload()
        order = tx.table('orders').find(data['order_id'])
        order['status'] = 'processed'

    def OnActionSendNotification(self, tx, context):
        data = context.payload()
        # send notification logic...

handler = MyQueueHandler()

def lambda_handler(event, context):
    return handler.serve(event, context)
Each SQS record's message body is parsed and routed by its action field.
Context and Response Objects
Context — request data accessor:
def OnActionExample(self, tx, context):
    action = context.action()            # 'example'
    body = context.postdata()            # full parsed request body
    payload = context.payload()          # body['payload'] shortcut
    name = context.payload('name')       # body['payload']['name']
    nested = context.payload('a', 'b')   # body['payload']['a']['b']
Response — action-based response builder:
def OnActionExample(self, tx, context):
    resp = context.response()
    resp.load_object({'users': [...]})      # send data to frontend store
    resp.update_store({'count': 42})        # partial store update
    resp.toast('Saved!', 'success')         # toast notification
    resp.alert('Are you sure?', 'Confirm')  # alert dialog
    resp.file_download({'url': '...', 'filename': 'report.xlsx'})
    resp.set_status(201)
    resp.set_headers({'X-Custom': 'value'})
AlertError — surface errors as UI notifications:
from velocity.aws.handlers import AlertError
raise AlertError('Email already exists', title='Duplicate', toast=True, variant='warning')
PerfTimer — optional request timing:
def OnActionHeavyOperation(self, tx, context):
    context.configure_perf(context.postdata())  # enables if perf=True in request
    context.perf.start('fetch')
    data = tx.table('big_table').list()
    context.perf.time('fetch')
    context.perf.start('process')
    result = process(data)
    context.perf.time('process')
    # Timing logged automatically
Amplify Integration
AmplifyProject wraps AWS Amplify, Lambda, and SQS APIs:
from velocity.aws.amplify import AmplifyProject
app = AmplifyProject('d1234abcde')
app.get_app_name() # Amplify app name
app.list_backend_branches() # ['main', 'staging']
app.get_merged_env_vars('main') # env vars (global + branch)
app.set_environment_variable('KEY', 'val') # set env var
# Update Lambda functions
for fn in app.list_lambda_functions_filtered('main'):
    app.update_lambda_function(fn, env_vars={'NEW_KEY': 'val'})
amplify_build provides full deployment automation:
from velocity.aws.amplify_build import run_backend_deployment, BackendDeploymentConfig
config = BackendDeploymentConfig(queue_names=['notifications'])
run_backend_deployment(app, 'main', config)
Velocity.Payment — Payment Processing
Processor-agnostic payment system using the Adapter pattern. Currently supports Stripe (Connect with Express accounts) and Braintree (sub-merchant accounts).
Adapter Pattern
All adapters implement PaymentProcessorAdapter:
from velocity.payment.base_adapter import PaymentProcessorAdapter
# Adapters provide:
adapter.create_account(tx, client_data) # create merchant account
adapter.get_account_status(tx, account_id) # account status
adapter.create_onboarding_link(tx, id, return_url, refresh_url)
adapter.authorize_payment(tx, payment_data) # pre-authorize (hold)
adapter.capture_payment(tx, transaction_id, amount) # capture authorized payment
adapter.cancel_payment(tx, transaction_id, reason) # void authorization
adapter.charge_stored_payment_method(tx, payment_data) # charge vaulted method
Payment Routing
The router selects the correct adapter using a 3-level hierarchy:
- Form-level override (specific donation form)
- Client-level setting (organization preference)
- Platform default (system-wide feature flag)
from velocity.payment.router import get_processor_adapter, charge_stored_payment_method
# Get the right adapter for a client
adapter = get_processor_adapter(tx, client_id=42, form_sys_id=100)
# Or use the convenience function
charge_stored_payment_method(
tx,
client_id=42,
payment_profile=profile_data,
amount=50.00,
description='Monthly donation'
)
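The three-level lookup (form, then client, then platform) amounts to "first configured value wins". A minimal sketch, assuming each level is either a processor name or None when unset; `resolve_processor` is a hypothetical helper, not part of velocity.payment.router:

```python
def resolve_processor(form_setting, client_setting, platform_default):
    """Return the most specific processor that is configured."""
    for candidate in (form_setting, client_setting, platform_default):
        if candidate is not None:
            return candidate
    raise LookupError("no payment processor configured at any level")


form_wins = resolve_processor("braintree", "stripe", "stripe")
client_wins = resolve_processor(None, "braintree", "stripe")
platform_wins = resolve_processor(None, None, "stripe")
```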
Revenue split percentages follow a 4-level hierarchy:
from velocity.payment.router import get_revenue_split_percentage
split = get_revenue_split_percentage(tx, client_id=42) # e.g., 15.00
Stripe and Braintree
Stripe — Express Connected Accounts with destination charges:
- Hosted onboarding via AccountLinks
- Destination charges with application_fee_amount
- Two-phase auth (manual capture)
- Full customer profile management (create, attach/detach payment methods, delete)
Braintree — Sub-merchant accounts:
- Platform-centric model (all payments to master account)
- Manual settlement to clients
- Sub-merchant account management
Customer Profiles
from velocity.payment.router import (
get_or_create_customer_profile,
attach_payment_method,
detach_payment_method,
)
profile = get_or_create_customer_profile(tx, client_id=42, customer_data={...})
attach_payment_method(tx, client_id=42, customer_id=cid, payment_method_id=pm_id)
detach_payment_method(tx, client_id=42, payment_method_id=pm_id)
from velocity.payment.profiles import money_to_cents, build_stripe_payment_profile
cents = money_to_cents(49.99) # 4999
profile = build_stripe_payment_profile(customer_id, payment_method, email)
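Converting a currency amount to integer cents is easy to get subtly wrong with binary floats, since multiplying by 100 can land a hair away from the intended whole number. The sketch below shows one common way to do it with Decimal; `to_cents` is a hypothetical helper and makes no claim about how money_to_cents is implemented.

```python
from decimal import Decimal, ROUND_HALF_UP


def to_cents(amount):
    """Convert a str, int, float, or Decimal amount to whole cents."""
    # str() keeps the value as written, e.g. 49.99 rather than its binary expansion.
    exact = Decimal(str(amount)) * 100
    return int(exact.quantize(Decimal("1"), rounding=ROUND_HALF_UP))


cents = to_cents(49.99)
```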
Velocity.App — Business Domain Objects
Pre-built, transaction-aware domain objects for common business operations.
Orders
from velocity.app.orders import Order
@engine.transaction
class OrderService:
    def create_order(self, tx, customer_id, items):
        order = Order()
        order.update_header('customer_id', customer_id)
        order.update_header('status', 'pending')
        for item in items:
            order.add_lineitem(
                {'product_id': item['id'], 'qty': item['qty'], 'price': item['price']},
                supp_data={'notes': item.get('notes')}
            )
        order.persist(tx)
        return order.to_dict()
Order manages headers, line items, and supplemental data with schema validation (SCHEMA), defaults (DEFAULTS), and full CRUD (load, persist, add_lineitem, update_lineitem, delete_lineitem).
Invoices, Payments, and Purchase Orders modules are available as extension points.
Velocity.Misc — Utilities
Data Conversion (iconv / oconv)
Two-way data conversion for transforming between frontend/API formats and database storage:
iconv — Input Conversion (frontend → database):
from velocity.misc.conv import iconv
iconv.phone('5551234567') # '5551234567' (normalized to 10 digits)
iconv.email('John@Example.COM') # 'john@example.com'
iconv.date_conv('01/15/2024') # datetime.date(2024, 1, 15)
iconv.boolean('yes') # True
iconv.integer('42') # 42
iconv.none('null') # None
iconv.pointer('123') # 123
oconv — Output Conversion (database → frontend):
from datetime import date
from velocity.misc.conv import oconv
oconv.phone('5551234567') # '(555) 123-4567'
oconv.money(1234.5) # '$1,234.50'
oconv.date_conv(date(2024, 1, 15)) # '01/15/2024'
oconv.day_of_week(1) # 'Monday'
oconv.ein('123456789') # '12-3456789'
oconv.boolean(1) # True
oconv.title('john doe') # 'John Doe'
oconv.padding(10, '0') # returns a right-pad function
oconv.pprint('{"a":1}') # formatted JSON string
Spreadsheet Export
Generate Excel files with headers, styles, merged cells, and auto-sized columns:
from velocity.misc.export import create_spreadsheet
headers = ['Name', 'Email', 'Amount']
rows = [
['Alice', 'alice@a.com', 150.00],
['Bob', 'bob@b.com', 275.50],
]
create_spreadsheet(
headers=headers,
rows=rows,
fileorbuffer='report.xlsx',
styles={
'col_header': True, # bold header row
'align_right': [2], # right-align Amount column
},
freeze_panes='A2', # freeze header row
auto_size=True # auto-fit column widths
)
Available styles: col_header, bold, sum_total, sub_total, align_right. Named styles, merged cells, number formats, and dimension overrides are also supported.
Email Parsing
Parse MIME email content into structured data:
from velocity.misc.mail import parse
result = parse(raw_email_content)
result['body'] # plain text body
result['html'] # HTML body
result['attachments'] # [Attachment(name, data, ctype, size, hash), ...]
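For readers curious what this kind of parsing involves, the following is a rough sketch using only the standard library `email` package. It is not Velocity's `parse` function: `parse_email` and `build_sample` are hypothetical names, and attachments are returned as plain dicts rather than the `Attachment` objects shown above.

```python
from email import message_from_bytes, policy
from email.message import EmailMessage


def parse_email(raw: bytes) -> dict:
    """Split a raw MIME message into plain body, HTML body, and attachments."""
    msg = message_from_bytes(raw, policy=policy.default)
    result = {"body": None, "html": None, "attachments": []}
    for part in msg.walk():
        if part.is_multipart():
            continue  # containers hold no content of their own
        if part.is_attachment():
            data = part.get_payload(decode=True) or b""
            result["attachments"].append({
                "name": part.get_filename(),
                "ctype": part.get_content_type(),
                "size": len(data),
                "data": data,
            })
        elif part.get_content_type() == "text/plain" and result["body"] is None:
            result["body"] = part.get_content()
        elif part.get_content_type() == "text/html" and result["html"] is None:
            result["html"] = part.get_content()
    return result


def build_sample() -> bytes:
    """Build a small multipart message to exercise the parser (demo only)."""
    msg = EmailMessage()
    msg["Subject"] = "Report"
    msg.set_content("Hello")
    msg.add_alternative("<p>Hello</p>", subtype="html")
    msg.add_attachment(b"abc", maintype="application",
                       subtype="octet-stream", filename="a.bin")
    return msg.as_bytes()


parsed = parse_email(build_sample())
```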
Formatting and Serialization
from velocity.misc.format import currency, human_delta, to_json
from datetime import datetime, timedelta
from decimal import Decimal
currency(1234.5) # '1,234.50'
human_delta(timedelta(hours=3, minutes=2)) # '3 hr(s) 2 min'
# JSON serializer that handles Decimal, datetime, date, time, timedelta
to_json({'amount': Decimal('19.99'), 'created': datetime.now()})
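A serializer like this is commonly built on the `default` hook of the standard `json` module. The sketch below shows that approach; the names `json_default` and `dumps_extended` are hypothetical, and the chosen encodings (Decimal as a string, dates and times as ISO 8601, timedelta as seconds) are assumptions, since the output format of `to_json` is not specified above.

```python
import json
from datetime import date, datetime, time, timedelta
from decimal import Decimal


def json_default(obj):
    """Fallback encoder for types the json module cannot serialize natively."""
    if isinstance(obj, Decimal):
        return str(obj)  # Assumption: preserve precision by emitting a string.
    if isinstance(obj, (datetime, date, time)):
        return obj.isoformat()
    if isinstance(obj, timedelta):
        return obj.total_seconds()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def dumps_extended(data):
    """Serialize data to JSON, routing unsupported types through json_default."""
    return json.dumps(data, default=json_default)


payload = dumps_extended({
    "amount": Decimal("19.99"),
    "created": datetime(2024, 1, 15, 9, 30),
    "ttl": timedelta(hours=1),
})
```

Raising `TypeError` for anything unrecognized keeps the standard library's behavior for types the hook does not handle.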
Deep Merge
Recursively merge dictionaries:
from velocity.misc.merge import deep_merge
base = {'a': 1, 'nested': {'x': 10, 'y': 20}}
override = {'b': 2, 'nested': {'y': 99, 'z': 30}}
result = deep_merge(base, override)
# {'a': 1, 'b': 2, 'nested': {'x': 10, 'y': 99, 'z': 30}}
# Mutate first dict in-place:
deep_merge(base, override, update=True)
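The merge semantics shown above can be approximated with a short recursive function. This is an illustrative sketch, not the code in `velocity.misc.merge`: the name `deep_merge_sketch` is hypothetical, and the handling of non-dict values (the override simply wins) is an assumption.

```python
from copy import deepcopy


def deep_merge_sketch(base, override, update=False):
    """Recursively merge override into base.

    With update=False a new dict is returned and base is left untouched;
    with update=True base is mutated in place and returned.
    """
    target = base if update else deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(target.get(key), dict):
            # Both sides are dicts: merge them key by key.
            deep_merge_sketch(target[key], value, update=True)
        else:
            # Assumption: any other value from override replaces the original.
            target[key] = deepcopy(value)
    return target


base = {"a": 1, "nested": {"x": 10, "y": 20}}
override = {"b": 2, "nested": {"y": 99, "z": 30}}
merged = deep_merge_sketch(base, override)
```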
Timer
Simple stopwatch for profiling:
from velocity.misc.timer import Timer
t = Timer('DB query')
# ... do work ...
elapsed = t.stop()
print(t) # "DB query: 0.1234 s"
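For comparison, a stopwatch with a similar interface can be written in a few lines with `time.perf_counter`. The class name `Stopwatch` and its attributes are hypothetical; this is a sketch of the general pattern, not the `Timer` class itself.

```python
import time


class Stopwatch:
    """Minimal labelled stopwatch that starts when it is created."""

    def __init__(self, label="Timer"):
        self.label = label
        self._start = time.perf_counter()
        self._elapsed = None

    def stop(self):
        """Freeze the elapsed time and return it in seconds."""
        self._elapsed = time.perf_counter() - self._start
        return self._elapsed

    @property
    def elapsed(self):
        """Frozen time if stopped, otherwise the time since creation."""
        if self._elapsed is not None:
            return self._elapsed
        return time.perf_counter() - self._start

    def __str__(self):
        return f"{self.label}: {self.elapsed:.4f} s"


sw = Stopwatch("DB query")
seconds = sw.stop()
```

`perf_counter` is a monotonic, high-resolution clock, which makes it a better fit for measuring short intervals than `time.time()`.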
Project Structure
velocity-python/
├── src/velocity/
│ ├── db/ # Database ORM
│ │ ├── core/
│ │ │ ├── engine.py # Engine — connection + transaction decorator
│ │ │ ├── transaction.py # Transaction — connection lifecycle
│ │ │ ├── table.py # Table — full CRUD + DDL
│ │ │ ├── row.py # Row — MutableMapping dict-like row
│ │ │ ├── result.py # Result — cursor wrapper + transforms
│ │ │ ├── view.py # View — DB view management
│ │ │ ├── column.py # Column — column metadata + operations
│ │ │ ├── database.py # Database — DB-level operations
│ │ │ ├── sequence.py # Sequence — PostgreSQL sequences
│ │ │ └── decorators.py # @create_missing, @return_default, etc.
│ │ ├── servers/
│ │ │ ├── postgres/ # PostgreSQL dialect
│ │ │ ├── mysql/ # MySQL dialect
│ │ │ ├── sqlite/ # SQLite dialect
│ │ │ ├── sqlserver/ # SQL Server dialect
│ │ │ ├── base/ # Base dialect (abstract)
│ │ │ └── tablehelper.py # Query builder + FK pointer resolution
│ │ ├── exceptions.py # Exception hierarchy
│ │ └── utils.py # Sorting, grouping, masking utilities
│ ├── aws/ # AWS integrations
│ │ ├── handlers/ # Lambda handler framework
│ │ │ ├── base_handler.py # BaseHandler — action routing
│ │ │ ├── lambda_handler.py # LambdaHandler — API Gateway
│ │ │ ├── sqs_handler.py # SqsHandler — SQS events
│ │ │ ├── context.py # Context — request accessor
│ │ │ ├── response.py # Response — action-based response builder
│ │ │ ├── data_service.py # DataServiceMixin — generic CRUD + export
│ │ │ ├── web_handler.py # WebHandler — activity tracking mixin
│ │ │ └── perf.py # PerfTimer — request timing
│ │ ├── amplify.py # AmplifyProject — Amplify/Lambda/SQS API
│ │ └── amplify_build.py # Deployment automation
│ ├── payment/ # Payment processing
│ │ ├── base_adapter.py # PaymentProcessorAdapter (ABC)
│ │ ├── stripe_adapter.py # Stripe Connect adapter
│ │ ├── braintree_adapter.py # Braintree adapter
│ │ ├── router.py # Processor routing + convenience functions
│ │ └── profiles.py # Payment profile utilities
│ ├── app/ # Business domain objects
│ │ ├── orders.py # Order management
│ │ ├── invoices.py # Invoice management
│ │ ├── payments.py # Payment records
│ │ └── purchase_orders.py # Purchase order management
│ ├── misc/ # Utilities
│ │ ├── conv/
│ │ │ ├── iconv.py # Input conversion (frontend → DB)
│ │ │ └── oconv.py # Output conversion (DB → frontend)
│ │ ├── export.py # Excel spreadsheet generation
│ │ ├── mail.py # Email MIME parsing
│ │ ├── format.py # Currency, JSON, timedelta formatting
│ │ ├── merge.py # Deep dict merge
│ │ ├── timer.py # Stopwatch utility
│ │ └── tools.py # run_once and misc helpers
│ └── logging.py # Structured logging (CloudWatch JSON)
├── tests/ # Test suite
├── scripts/ # Utility and demo scripts
├── docs/ # Additional documentation
├── pyproject.toml # Package configuration
├── Makefile # Development commands
└── README.md # This file
Development
Setup
git clone <repository-url>
cd velocity-python
pip install -e ".[dev]"
Running Tests
# All tests
pytest
# With coverage
pytest --cov=velocity
# Unit tests only (no database required)
make test-unit
# Integration tests (requires database)
make test-integration
# Clean caches
make clean
Code Quality
black src/ # format
mypy src/ # type check
flake8 src/ # lint
Version Management
python scripts/bump.py
License
MIT License — see LICENSE for details.