
Velocity

A rapid application development library for Python that eliminates boilerplate across the entire backend stack. Velocity provides a Pythonic database ORM where rows behave like dictionaries, a serverless handler framework for AWS Lambda, processor-agnostic payment integration, and a collection of everyday utilities — all designed so you write business logic instead of infrastructure code.


Why Velocity?

Most Python backend projects cobble together an ORM, a web/serverless framework, a payment library, spreadsheet tooling, and assorted glue code. Velocity replaces all of that with one cohesive library built around two principles:

  1. Convention over configuration — sensible defaults everywhere. Tables auto-create columns, transactions auto-commit, Lambda handlers auto-route actions.
  2. Python-native interfaces — database rows are real MutableMapping objects you can dict(), **unpack, iterate, and compare. No new query language to learn.

What you get:

| Layer | What it does | Without Velocity |
|---|---|---|
| velocity.db | Multi-database ORM with dict-like rows, auto-schema, query builder | SQLAlchemy + Alembic + custom code |
| velocity.aws | Lambda handler framework with auth, routing, SQS | Custom event parsing per function |
| velocity.payment | Processor-agnostic payments (Stripe, Braintree) | Per-processor integration code |
| velocity.misc | Excel export, email parsing, data conversion, formatting | openpyxl + email libs + helpers |
| velocity.app | Order / invoice / payment domain objects | Custom per-project |

Installation

pip install velocity-python

With database-specific drivers:

pip install velocity-python[postgres]    # PostgreSQL (psycopg2)
pip install velocity-python[mysql]       # MySQL (mysql-connector-python)
pip install velocity-python[sqlserver]   # SQL Server (python-tds)
pip install velocity-python[payment]     # Stripe + Braintree
pip install velocity-python[all]         # Everything

SQLite support is built-in (uses Python's sqlite3 module).

Requirements: Python 3.7+


Quick Start

import velocity.db

# 1. Connect
engine = velocity.db.postgres.initialize()  # reads DB* env vars

# 2. Write business logic — the decorator handles transactions
@engine.transaction
def onboard_customer(tx, name, email):
    customer = tx.table('customers').new()  # table auto-created if missing
    customer['name'] = name                 # column auto-created if missing
    customer['email'] = email
    customer['status'] = 'active'
    return customer['sys_id']

# 3. Call it — tx is injected automatically
customer_id = onboard_customer('Acme Corp', 'hello@acme.com')

That's it. No model definitions, no migration files, no manual commits.


Velocity.DB — Database ORM

Connecting to a Database

Each database engine has an initialize() function that returns a configured Engine object:

import velocity.db

# PostgreSQL — reads from environment variables by default:
#   DBHost, DBPort, DBDatabase, DBUser, DBPassword
engine = velocity.db.postgres.initialize()

# Or pass config explicitly:
engine = velocity.db.postgres.initialize(config={
    'host': 'localhost',
    'port': 5432,
    'database': 'mydb',
    'user': 'myuser',
    'password': 'secret',
})

# MySQL
engine = velocity.db.mysql.initialize(config={...})

# SQLite
engine = velocity.db.sqlite.initialize(config={'database': '/path/to/db.sqlite'})

# SQL Server
engine = velocity.db.sqlserver.initialize(config={...})

The Engine object exposes metadata about the server:

engine.version          # Server version string
engine.current_database # Active database name
engine.current_schema   # Active schema
engine.tables           # List of table names
engine.views            # List of view names
engine.databases        # List of all databases

Transactions

Every database operation runs inside a transaction. The @engine.transaction decorator manages the full lifecycle — connect, begin, commit on success, rollback on exception, close.

Basic Usage

Declare a tx parameter and the engine provides it automatically:

@engine.transaction
def create_user(tx, name, email):
    user = tx.table('users').new()
    user['name'] = name
    user['email'] = email
    return user['sys_id']

# Call without passing tx — the decorator injects it:
user_id = create_user('Alice', 'alice@example.com')

Transaction Reuse

Pass tx explicitly to share a single transaction across multiple decorated functions:

@engine.transaction
def create_user(tx, name, email):
    user = tx.table('users').new()
    user['name'] = name
    user['email'] = email
    return user['sys_id']

@engine.transaction
def create_profile(tx, user_id, bio):
    profile = tx.table('profiles').new()
    profile['user_id'] = user_id
    profile['bio'] = bio

@engine.transaction
def onboard(tx, name, email, bio):
    # Both calls share the same transaction — atomic
    uid = create_user(tx, name, email)
    create_profile(tx, uid, bio)
    return uid

onboard('Alice', 'alice@example.com', 'Engineer')

If you omit tx when calling a decorated function, it gets its own independent transaction.
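The inject-or-reuse behaviour above can be sketched with a plain decorator. This is an illustration of the pattern, not Velocity's actual implementation; the `Tx` class here is a stand-in for a real transaction object.

```python
import functools

class Tx:
    """Toy transaction object standing in for a real database transaction."""
    def __init__(self):
        self.committed = False
        self.rolled_back = False
    def commit(self):
        self.committed = True
    def rollback(self):
        self.rolled_back = True

def transaction(fn):
    """If the caller passes a Tx as the first argument, reuse it (shared
    transaction); otherwise create one, commit on success, roll back on error."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        if args and isinstance(args[0], Tx):
            return fn(*args, **kwargs)      # reuse the caller's transaction
        tx = Tx()                           # fresh transaction for this call
        try:
            result = fn(tx, *args, **kwargs)
            tx.commit()
            return result
        except Exception:
            tx.rollback()
            raise
    return wrapper

@transaction
def inner(tx, x):
    return x * 2

@transaction
def outer(tx, x):
    return inner(tx, x) + 1   # inner shares outer's tx — one atomic unit
```

Calling `outer(3)` opens a single transaction and passes it down, while `inner(3)` on its own gets an independent one — the same rule described above.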

Class-Level Decoration

Apply @engine.transaction to an entire class to auto-wrap every method that has a tx parameter:

@engine.transaction
class UserService:
    def create(self, tx, name, email):
        user = tx.table('users').new()
        user['name'] = name
        user['email'] = email
        return user['sys_id']

    def deactivate(self, tx, user_id):
        user = tx.table('users').find(user_id)
        user['status'] = 'inactive'

    def validate_email(self, email):
        # No tx parameter — not wrapped
        return '@' in email

service = UserService()
uid = service.create('Alice', 'alice@example.com')
service.deactivate(uid)

Savepoints

For partial rollback within a transaction:

@engine.transaction
def risky_operation(tx):
    sp = tx.create_savepoint()
    try:
        tx.table('ledger').insert({'amount': 100})
    except Exception:
        tx.rollback_savepoint(sp)   # undo just this part
    else:
        tx.release_savepoint(sp)

Automatic Retries

The engine automatically retries on transient errors:

  • DbRetryTransaction — signals explicit retry
  • DbLockTimeoutError — lock contention
  • DbConnectionError — transient connection drops

The engine retries up to 100 times, backing off between attempts.
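The retry loop can be sketched as follows. This is an illustrative re-creation using the exception names above; the retry count and backoff schedule are assumptions, not Velocity's exact values.

```python
import time

class DbRetryTransaction(Exception): pass
class DbLockTimeoutError(Exception): pass

RETRYABLE = (DbRetryTransaction, DbLockTimeoutError)

def run_with_retries(fn, attempts=100, base_delay=0.01):
    """Call fn(), retrying on transient errors with a growing delay."""
    for attempt in range(attempts):
        try:
            return fn()
        except RETRYABLE:
            if attempt == attempts - 1:
                raise                          # retries exhausted — re-raise
            time.sleep(base_delay * attempt)   # simple linear backoff
```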

Key Rules

| Rule | Detail |
|---|---|
| Declare tx | Your function signature must include a tx parameter |
| Don't pass tx from outside | Let the decorator create it for new transactions |
| Pass tx to share | Explicitly pass tx to keep calls in the same transaction |
| _tx is reserved | Do not use _tx as a parameter name |

Rows as Dictionaries

Row implements collections.abc.MutableMapping. A row retrieved from the database behaves exactly like a Python dictionary — because it is one.

@engine.transaction
def demonstrate_row(tx):
    user = tx.table('users').find(42)

    # Dictionary access
    name = user['name']
    user['status'] = 'active'          # writes through to DB immediately

    # Attribute access
    email = user.email
    user.phone = '+1-555-1234'

    # Standard dict operations
    data = dict(user)                  # full shallow copy
    merged = {**user, 'extra': True}   # unpack into new dict
    keys = list(user)                  # column names
    length = len(user)                 # number of columns
    has_phone = 'phone' in user        # membership test (case-insensitive)

    # .get() with default
    bio = user.get('bio', 'No bio')

    # Iteration
    for key in user:
        print(key, user[key])

    # Equality and hashing (by table name + primary key)
    other = tx.table('users').find(42)
    assert user == other
    assert hash(user) == hash(other)
    s = {user, other}                  # set with one element

Caching Behaviour

Row data is lazy-loaded on first access and cached locally. Subsequent reads hit the cache — not the database.

user['name']          # fetches all columns from DB, caches result
user['email']         # served from cache — no DB call
user.invalidate()     # clear cache; next access re-fetches
user.refresh()        # re-fetch immediately

Writes are write-through: user['name'] = 'New' issues an UPDATE and refreshes the cache.
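The lazy-load / write-through combination can be modelled with a small `MutableMapping`. This is a self-contained sketch of the caching behaviour described above, with a plain dict standing in for the database row; it is not Velocity's `Row` class.

```python
from collections.abc import MutableMapping

class CachedRow(MutableMapping):
    """Lazy read-through on first access, write-through on assignment."""
    def __init__(self, db_row):
        self._db = db_row      # stands in for the database row
        self._cache = None     # not loaded yet
    def _load(self):
        if self._cache is None:
            self._cache = dict(self._db)   # first access fetches all columns
        return self._cache
    def __getitem__(self, key):
        return self._load()[key]
    def __setitem__(self, key, value):
        self._db[key] = value              # write through to the "database"
        self._load()[key] = value          # keep the cache in sync
    def __delitem__(self, key):
        del self._db[key]
        self._load().pop(key, None)
    def __iter__(self):
        return iter(self._load())
    def __len__(self):
        return len(self._load())
    def invalidate(self):
        self._cache = None                 # next access re-fetches
```

Reads after the first are served from `_cache`; `invalidate()` forces the next read back to the source, matching the behaviour table above.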

Row API Reference

| Method / Property | Description |
|---|---|
| user['col'] | Get column value (lazy-loads from DB on first access) |
| user['col'] = val | Set column value (writes through to DB + updates cache) |
| user.col / user.col = val | Attribute-style access (same as bracket notation) |
| dict(user) / user.to_dict() | Full row as a plain dict |
| user.extract('a', 'b') | Subset of columns as a dict |
| user.update({'a': 1, 'b': 2}) | Bulk update multiple columns |
| user.get(key, default) | Safe get with default |
| 'col' in user | Case-insensitive membership test |
| len(user) | Number of columns |
| list(user) | Column names |
| user.refresh() | Re-fetch all data from DB |
| user.invalidate() | Clear cache (next access re-fetches) |
| user.copy() | Clone row with a new sys_id (strips system columns) |
| user.delete() / user.clear() | Delete the row from the database |
| user.lock() | SELECT ... FOR UPDATE |
| user.touch() | Update sys_modified timestamp |
| user.split() | Returns (data_without_sys_columns, pk) |
| user.match(other_dict) | True if the other dict's keys all match this row's values |
| user.row('fk_column') | Follow a foreign key to get the referenced Row |
| user.sys_id | Primary key value |

Tables — CRUD Operations

tx.table('name') returns a Table object with a full CRUD and DDL interface.

Insert

@engine.transaction
def insert_examples(tx):
    users = tx.table('users')

    # Dict-style: create + populate a Row
    user = users.new()
    user['name'] = 'Alice'
    user['email'] = 'alice@example.com'
    # Row is inserted when created; updates write through

    # Direct insert from a dict
    users.insert({'name': 'Bob', 'email': 'bob@example.com'})

    # Insert-if-not-exists
    users.insert_if_not_exists(
        {'name': 'Carol', 'email': 'carol@example.com'},
        where={'email': 'carol@example.com'}
    )

Select

@engine.transaction
def select_examples(tx):
    users = tx.table('users')

    # All rows (returns list of dicts via Result.all())
    all_users = users.select().all()

    # Filtered + ordered + limited
    recent = users.select(
        columns=['name', 'email'],
        where={'status': 'active'},
        orderby='sys_created DESC',
        qty=10
    ).all()

    # Shorthand — .list() is select().all()
    active = users.list(where={'status': 'active'})

    # Single row lookups return Row objects
    user = users.find(42)                  # by primary key
    user = users.find({'email': 'a@b.c'}) # by condition
    user = users.one({'email': 'a@b.c'})  # alias for find()
    user = users.first(where={'status': 'active'}, orderby='name')

    # get() — find-or-create
    user = users.get({'email': 'a@b.c'})  # creates row if not found

    # Iterate Row objects
    for row in users.rows(where={'status': 'active'}):
        print(row['name'])

    # Iterate primary keys only
    for sys_id in users.ids(where={'status': 'active'}):
        print(sys_id)

    # Batched iteration
    for batch in users.batch(size=100, where={'status': 'active'}):
        process(batch)

    # Aggregates
    total = users.count()
    total_active = users.count(where={'status': 'active'})
    revenue = tx.table('orders').sum('amount', where={'status': 'paid'})

Update

@engine.transaction
def update_examples(tx):
    users = tx.table('users')

    # Row-level (write-through)
    user = users.find(42)
    user['name'] = 'New Name'

    # Bulk update
    users.update(
        {'status': 'inactive'},
        where={'<last_login': '2024-01-01'}
    )

    # Upsert (INSERT ON CONFLICT UPDATE)
    users.upsert({'name': 'Alice', 'status': 'active'}, pk='email')
    # or equivalently:
    users.merge({'name': 'Alice', 'status': 'active'}, pk='email')

    # Update-or-insert (try UPDATE first, fallback to INSERT IF NOT EXISTS)
    users.update_or_insert(
        {'status': 'active'},
        insert_data={'name': 'Alice', 'email': 'a@b.c', 'status': 'active'},
        where={'email': 'a@b.c'}
    )

Delete

@engine.transaction
def delete_examples(tx):
    users = tx.table('users')

    # Single row
    user = users.find(42)
    user.delete()

    # Bulk delete (where is required — prevents accidental full deletes)
    users.delete(where={'status': 'inactive'})

    # Truncate (remove all rows)
    users.truncate()

    # Drop table entirely
    users.drop()

DDL (Schema Management)

@engine.transaction
def schema_examples(tx):
    users = tx.table('users')

    # Create table (with velocity system columns auto-added)
    users.create({'name': str, 'email': str, 'age': int})

    # Check existence
    if users.exists():
        print('Table exists')

    # Add columns (smart mode: skips existing columns)
    users.alter({'phone': str, 'bio': str})

    # Change column type
    users.alter_type('age', 'BIGINT')

    # Indexes
    users.create_index(['email'], unique=True)
    users.create_index(['name', 'status'])

    # Foreign keys
    users.create_foreign_key(['department_id'], 'departments')

    # Rename
    users.rename('app_users')

    # Column inspection
    cols = users.columns()           # non-system columns
    all_cols = users.sys_columns()   # all columns including sys_*
    pks = users.primary_keys()
    fks = users.foreign_keys()

    # Column object
    col = users.column('email')
    col.py_type       # Python type
    col.sql_type      # SQL type
    col.is_nullable   # nullable?
    col.distinct()    # distinct values
    col.max()         # max value
    col.rename('contact_email')

System Columns

Every table managed by Velocity automatically includes:

| Column | Type | Description |
|---|---|---|
| sys_id | BIGINT | Auto-incrementing primary key |
| sys_created | TIMESTAMP | Row creation time |
| sys_modified | TIMESTAMP | Last modification time (trigger-maintained) |
| sys_modified_by | TEXT | Last modifier identifier |
| sys_modified_row | TEXT | Session tracking |
| sys_modified_count | BIGINT | Modification counter (trigger-maintained) |
| sys_dirty | BOOLEAN | Dirty flag for sync workflows |
| sys_table | TEXT | Table name (self-referential) |
| sys_keywords | TEXT | Full-text search keywords |

Advanced Queries

Where Clause Formats

Velocity supports three where clause formats:

1. Dictionary with operator prefixes (most common):

users.select(where={
    'status': 'active',             # = (default)
    '>age': 18,                     # >
    '<=score': 100,                 # <=
    '>=created_at': '2024-01-01',   # >=
    '!status': 'deleted',           # <> (not equal)
    '%email': '@company.com',       # LIKE
    '!%name': 'test%',             # NOT LIKE
    '><age': [18, 65],             # BETWEEN
    '!><score': [0, 50],           # NOT BETWEEN
}).all()

| Prefix | SQL | Description |
|---|---|---|
| (none) | = | Equals (default) |
| > | > | Greater than |
| < | < | Less than |
| >= | >= | Greater than or equal |
| <= | <= | Less than or equal |
| ! or != or <> | <> | Not equal |
| % | LIKE | Pattern match |
| !% | NOT LIKE | Negated pattern match |
| >< | BETWEEN | Inclusive range (value must be [low, high]) |
| !>< | NOT BETWEEN | Negated range |
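Parsing these prefixed keys is a matter of checking the longest prefixes first so that `>=` is not mistaken for `>`. A plausible sketch (not Velocity's internal parser):

```python
# Longest prefixes first so '>=' wins over '>' and '!><' over '!'
PREFIX_OPS = [
    ('!><', 'NOT BETWEEN'), ('><', 'BETWEEN'),
    ('!%', 'NOT LIKE'), ('%', 'LIKE'),
    ('>=', '>='), ('<=', '<='),
    ('!=', '<>'), ('<>', '<>'), ('!', '<>'),
    ('>', '>'), ('<', '<'),
]

def parse_key(key):
    """Split a where-dict key like '>=age' into (column, SQL operator)."""
    for prefix, op in PREFIX_OPS:
        if key.startswith(prefix):
            return key[len(prefix):], op
    return key, '='            # bare key defaults to equality
```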

2. List of tuples (for complex predicates):

users.select(where=[
    ('status = %s', 'active'),
    ('priority = %s OR urgency = %s', ('high', 'critical'))
]).all()

3. Raw string (use with care):

users.select(where="status = 'active' AND age >= 18").all()

Foreign Key Pointer Syntax

The > pointer syntax automatically resolves foreign keys into JOINs:

# local_column>foreign_column auto-joins through the FK relationship
orders = tx.table('orders')
result = orders.select(
    columns=['order_number', 'customer_id>name', 'customer_id>email'],
    where={'status': 'pending'}
).all()
# Generates: SELECT A.order_number, B.name, B.email
#            FROM orders A JOIN customers B ON A.customer_id = B.sys_id
#            WHERE A.status = 'pending'

Multi-level pointers chain through multiple tables:

# order → customer → region
orders.select(columns=[
    'order_number',
    'customer_id>name',
    'customer_id>region_id>region_name'
]).all()
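Conceptually, each `>` in a pointer column marks one foreign-key hop, and the final segment is the column to select from the last joined table. A minimal sketch of that split (illustrative only):

```python
def split_pointer(col):
    """Break 'customer_id>region_id>region_name' into the FK hops and the
    final target column — each hop becomes one JOIN in the generated SQL."""
    *hops, target = col.split('>')
    return hops, target
```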

Aggregations

stats = tx.table('orders').select(
    columns=['customer_id', 'COUNT(*) as total', 'SUM(amount) as revenue'],
    where={'status': 'completed'},
    groupby='customer_id',
    having='COUNT(*) > 5'
).all()

Server-Side Cursors

For very large result sets, use server-side cursors to avoid loading everything into memory:

result = users.server_select(where={'status': 'active'})
for row in result:
    process(row)
result.close()

Raw SQL

@engine.transaction
def raw_query(tx):
    result = tx.execute("""
        SELECT u.name, COUNT(o.sys_id) as order_count
        FROM users u
        LEFT JOIN orders o ON u.sys_id = o.user_id
        WHERE u.status = %s
        GROUP BY u.name
        HAVING COUNT(o.sys_id) > %s
    """, ['active', 5])

    for row in result:
        print(row['name'], row['order_count'])

    # Single value
    total = tx.execute("SELECT COUNT(*) FROM users").scalar()

Result Objects

Every select(), execute(), or query method returns a Result object with chainable data transformations.

Output Formats

result = users.select(columns=['name', 'email'])

result.as_dict().all()         # [{'name': 'Alice', 'email': 'a@b'}, ...]  (default)
result.as_tuple().all()        # [('Alice', 'a@b'), ...]
result.as_list().all()         # [['Alice', 'a@b'], ...]
result.as_json().all()         # ['{"name":"Alice","email":"a@b"}', ...]
result.as_pairs().all()        # [[('name','Alice'),('email','a@b')], ...]
result.as_simple_list().all()  # ['Alice', ...]  (first column only)
result.as_simple_list(1).all() # ['a@b', ...]    (column at position 1)
result.strings().all()         # all values coerced to strings

Retrieval Methods

| Method | Returns | Use case |
|---|---|---|
| .all() | list | Get all rows at once |
| .one(default=None) | single row or default | Expect exactly one result |
| .scalar(default=None) | single value or default | SELECT COUNT(*), SELECT MAX(...) |
| .batch(qty=100) | generator of lists | Process in chunks |
| .enum() | (index, row) tuples | Numbered iteration |

Inspection

result.headers       # ['name', 'email'] — column names
result.columns       # detailed column metadata from cursor.description
bool(result)         # True if there are rows (pre-fetches first row)
result.has_results() # same as bool()
result.is_empty()    # opposite of has_results()

Table Data Export

# 2D list with header row — useful for spreadsheet export
table_data = result.get_table_data(headers=True)
# [['name', 'email'], ['Alice', 'a@b'], ['Bob', 'c@d']]

Automatic Schema Evolution

Velocity automatically creates missing tables and columns when you write data. This is powered by the @create_missing decorator on CRUD methods.

@engine.transaction
def evolve_schema(tx):
    users = tx.table('users')  # table may or may not exist

    # First run: creates table with name, email columns + system columns
    users.insert({'name': 'Alice', 'email': 'alice@a.com'})

    # Later: phone column doesn't exist yet — auto-added
    users.insert({'name': 'Bob', 'email': 'bob@b.com', 'phone': '555-1234'})

How It Works

  1. Operation attempted (e.g., INSERT)
  2. If DbTableMissingError → create the table from the data's keys/types, retry
  3. If DbColumnMissingError → ALTER TABLE ADD COLUMN for each missing column, retry
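The catch-fix-retry loop above can be sketched as a decorator. This is an illustration of the mechanism, not Velocity's `@create_missing` source; the `create_table` / `add_columns` callables are hypothetical hooks.

```python
class DbTableMissingError(Exception): pass
class DbColumnMissingError(Exception): pass

def create_missing(create_table, add_columns):
    """Run the operation; on a missing-table or missing-column error,
    repair the schema from the data's keys and retry."""
    def deco(fn):
        def wrapper(data):
            for _ in range(3):             # original attempt + schema fixes
                try:
                    return fn(data)
                except DbTableMissingError:
                    create_table(data)     # build table from data's keys/types
                except DbColumnMissingError:
                    add_columns(data)      # add the missing columns
        return wrapper
    return deco
```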

Type Inference

| Python Type | PostgreSQL | MySQL | SQLite |
|---|---|---|---|
| str | TEXT | TEXT | TEXT |
| int | BIGINT | BIGINT | INTEGER |
| float | NUMERIC(19,6) | DECIMAL(19,6) | REAL |
| bool | BOOLEAN | BOOLEAN | INTEGER |
| datetime | TIMESTAMP | DATETIME | TEXT |
| date | DATE | DATE | TEXT |
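As a sketch, the PostgreSQL column of that table amounts to a type-keyed lookup. `type()` returns the exact class, so `bool` (a subclass of `int`) and `datetime` (a subclass of `date`) resolve correctly:

```python
import datetime

# Illustrative PostgreSQL mapping from the table above
PG_TYPES = {
    str: 'TEXT',
    int: 'BIGINT',
    float: 'NUMERIC(19,6)',
    bool: 'BOOLEAN',
    datetime.datetime: 'TIMESTAMP',
    datetime.date: 'DATE',
}

def infer_columns(data):
    """Map a row dict to SQL column types by each value's Python type."""
    return {col: PG_TYPES[type(val)] for col, val in data.items()}
```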

Protected Operations

Auto-creation applies to: insert(), update(), merge() / upsert(), alter(), alter_type().

Preview Without Executing

sql, vals = users.insert({'name': 'Test', 'new_col': 'val'}, sql_only=True)
print(sql)  # Shows the ALTER + INSERT that would run

Views, Sequences, and Databases

Views

@engine.transaction
def view_examples(tx):
    v = tx.view('active_users')

    v.create_or_replace("SELECT * FROM users WHERE status = 'active'")
    v.grant('SELECT', 'readonly_role')

    # Idempotent ensure — creates if missing, applies grants
    v.ensure(
        "SELECT * FROM users WHERE status = 'active'",
        grants=[('SELECT', 'readonly_role')],
        grant_public_select=True
    )

    if v.exists():
        print('View exists')

Sequences (PostgreSQL)

@engine.transaction
def sequence_examples(tx):
    seq = tx.sequence('invoice_number')

    seq.create(start=1000)
    next_val = seq.next()          # nextval()
    curr_val = seq.current()       # currval()
    safe_val = seq.safe_current()  # returns None if uninitialized (no exception)

    seq.set_value(5000)            # ALTER SEQUENCE RESTART WITH 5000
    seq.configure(increment=10, minvalue=1, maxvalue=999999, cycle=True)

    info = seq.info()              # metadata from pg_sequences
    seq.rename('order_number')
    seq.drop()

Database Management

@engine.transaction
def database_examples(tx):
    db = tx.database('analytics')

    if not db.exists():
        db.create()

    db.tables        # list of schema.table strings
    db.vacuum(analyze=True)
    db.reindex()
    db.switch()      # switch active database

Schema Locking

In production, disable auto-creation to prevent accidental schema changes:

engine = velocity.db.postgres.initialize(schema_locked=True)

# Or toggle at runtime:
engine.lock_schema()
engine.unlock_schema()

# Temporary unlock for migrations:
with engine.unlocked_schema():
    migrate(engine)

When schema is locked, @create_missing raises DbSchemaLockedError instead of auto-creating.


Error Handling

Transactions automatically roll back on any exception:

@engine.transaction
def safe_transfer(tx, from_id, to_id, amount):
    from_acct = tx.table('accounts').find(from_id)
    to_acct = tx.table('accounts').find(to_id)

    if from_acct['balance'] < amount:
        raise ValueError("Insufficient funds")

    from_acct['balance'] -= amount  # both changes are atomic
    to_acct['balance'] += amount
    # auto-committed on success; auto-rolled-back on exception

The @return_default decorator on internal methods swallows specified exceptions and logs them:

# Example: table.count() returns 0 if the table doesn't exist
# because count() is decorated with @return_default(0, (DbTableMissingError,))
user_count = users.count()  # returns 0 for missing table, not an exception

The @reset_id_on_dup_key decorator handles primary key collisions by bumping the sequence and retrying (up to 3 times).


Exception Hierarchy

All database exceptions inherit from DbException:

| Exception | When |
|---|---|
| DbConnectionError | Connection failure (transient — auto-retried) |
| DbDatabaseMissingError | Database doesn't exist |
| DbTableMissingError | Table doesn't exist (triggers auto-create) |
| DbColumnMissingError | Column doesn't exist (triggers auto-add) |
| DbDuplicateKeyError | Unique constraint violation |
| DbForeignKeyMissingError | FK constraint violation |
| DbDataIntegrityError | General integrity violation |
| DbTruncationError | Data too long for column |
| DbLockTimeoutError | Lock wait timeout (auto-retried) |
| DbSchemaLockedError | Schema modification blocked |
| DbObjectExistsError | Object already exists |
| DbQueryError | SQL syntax or execution error |
| DbTransactionError | Transaction state error |
| DbRetryTransaction | Signals transaction retry |
| DuplicateRowsFoundError | Multiple rows found when single expected |

Utility Functions

velocity.db.utils provides helpers for working with row data:

from velocity.db.utils import safe_sort_rows, group_by_fields

# Sort rows with None-safe handling
sorted_rows = safe_sort_rows(rows, 'last_name', none_handling='last')

# Group rows by field values
groups = group_by_fields(rows, 'department', 'status')
# {('Engineering', 'active'): [...], ('Marketing', 'active'): [...]}

| Function | Description |
|---|---|
| safe_sort_rows(rows, field, none_handling='last') | Sort dicts by field, placing None values first or last |
| group_by_fields(rows, *fields) | Group rows into {tuple: [rows]} by field values |
| mask_config_for_display(config) | Redact passwords/secrets in config dicts for logging |
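The grouping behaviour shown above is straightforward to reproduce with the standard library. A stdlib sketch of the same idea (not the velocity.db.utils source):

```python
from collections import defaultdict

def group_by_fields(rows, *fields):
    """Group row dicts into {field-value tuple: [rows]}."""
    groups = defaultdict(list)
    for row in rows:
        groups[tuple(row.get(f) for f in fields)].append(row)
    return dict(groups)
```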

Velocity.AWS — Serverless Framework

A complete framework for building AWS Lambda functions with action-based routing, Cognito authentication, and SQS message processing.

Lambda Handlers

Subclass LambdaHandler to create API Gateway-backed Lambda functions:

from velocity.aws.handlers import LambdaHandler

class MyHandler(LambdaHandler):
    auth_mode = 'required'           # 'required' | 'optional' | 'none'
    user_table = 'app_users'         # table for DB user lookup
    public_actions = ['get-status']  # actions that skip auth

    def OnActionGetUsers(self, tx, context):
        users = tx.table('users').list(where={'status': 'active'})
        context.response().load_object({'users': users})

    def OnActionCreateUser(self, tx, context):
        data = context.payload()
        user = tx.table('users').new()
        user['name'] = data['name']
        user['email'] = data['email']
        context.response().toast('User created', 'success')

    def OnActionGetStatus(self, tx, context):
        context.response().load_object({'status': 'ok'})

handler = MyHandler()

def lambda_handler(event, context):
    return handler.serve(event, context)

Action routing: A POST with {"action": "get-users"} calls OnActionGetUsers. The action name is converted to PascalCase with OnAction prefix.
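The kebab-case-to-method conversion can be sketched in a couple of lines; dispatch is then a `getattr` on the handler instance. Illustrative only, not the framework's internal router:

```python
def action_to_method(action):
    """Convert a kebab-case action name to its handler method name,
    e.g. 'get-users' -> 'OnActionGetUsers'."""
    return 'OnAction' + ''.join(part.capitalize() for part in action.split('-'))
```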

Authentication flow (beforeAction):

  1. Extract Cognito user from API Gateway event
  2. Look up user in user_table by Cognito subject
  3. Attach current_user to context
  4. Log activity to aws_api_activity table

Error handling: Exceptions in handlers are caught by onError, logged, and returned as structured error responses. AlertError surfaces user-facing messages.

SQS Handlers

For Lambda functions triggered by SQS queues:

from velocity.aws.handlers import SqsHandler

class MyQueueHandler(SqsHandler):
    def OnActionProcessOrder(self, tx, context):
        data = context.payload()
        order = tx.table('orders').find(data['order_id'])
        order['status'] = 'processed'

    def OnActionSendNotification(self, tx, context):
        data = context.payload()
        # send notification logic...

handler = MyQueueHandler()

def lambda_handler(event, context):
    return handler.serve(event, context)

Each SQS record's message body is parsed and routed by its action field.

Context and Response Objects

Context — request data accessor:

def OnActionExample(self, tx, context):
    action = context.action()           # 'example'
    body = context.postdata()           # full parsed request body
    payload = context.payload()         # body['payload'] shortcut
    name = context.payload('name')      # body['payload']['name']
    nested = context.payload('a', 'b')  # body['payload']['a']['b']

Response — action-based response builder:

def OnActionExample(self, tx, context):
    resp = context.response()

    resp.load_object({'users': [...]})        # send data to frontend store
    resp.update_store({'count': 42})          # partial store update
    resp.toast('Saved!', 'success')           # toast notification
    resp.alert('Are you sure?', 'Confirm')    # alert dialog
    resp.file_download({'url': '...', 'filename': 'report.xlsx'})

    resp.set_status(201)
    resp.set_headers({'X-Custom': 'value'})

AlertError — surface errors as UI notifications:

from velocity.aws.handlers import AlertError

raise AlertError('Email already exists', title='Duplicate', toast=True, variant='warning')

PerfTimer — optional request timing:

def OnActionHeavyOperation(self, tx, context):
    context.configure_perf(context.postdata())  # enables if perf=True in request

    context.perf.start('fetch')
    data = tx.table('big_table').list()
    context.perf.time('fetch')

    context.perf.start('process')
    result = process(data)
    context.perf.time('process')
    # Timing logged automatically

Amplify Integration

AmplifyProject wraps AWS Amplify, Lambda, and SQS APIs:

from velocity.aws.amplify import AmplifyProject

app = AmplifyProject('d1234abcde')

app.get_app_name()                              # Amplify app name
app.list_backend_branches()                     # ['main', 'staging']
app.get_merged_env_vars('main')                 # env vars (global + branch)
app.set_environment_variable('KEY', 'val')      # set env var

# Update Lambda functions
for fn in app.list_lambda_functions_filtered('main'):
    app.update_lambda_function(fn, env_vars={'NEW_KEY': 'val'})

amplify_build provides full deployment automation:

from velocity.aws.amplify_build import run_backend_deployment, BackendDeploymentConfig

config = BackendDeploymentConfig(queue_names=['notifications'])
run_backend_deployment(app, 'main', config)

Velocity.Payment — Payment Processing

Processor-agnostic payment system using the Adapter pattern. Currently supports Stripe (Connect with Express accounts) and Braintree (sub-merchant accounts).

Adapter Pattern

All adapters implement PaymentProcessorAdapter:

from velocity.payment.base_adapter import PaymentProcessorAdapter

# Adapters provide:
adapter.create_account(tx, client_data)              # create merchant account
adapter.get_account_status(tx, account_id)            # account status
adapter.create_onboarding_link(tx, id, return_url, refresh_url)
adapter.authorize_payment(tx, payment_data)           # pre-authorize (hold)
adapter.capture_payment(tx, transaction_id, amount)   # capture authorized payment
adapter.cancel_payment(tx, transaction_id, reason)    # void authorization
adapter.charge_stored_payment_method(tx, payment_data) # charge vaulted method

Payment Routing

The router selects the correct adapter using a 3-level hierarchy:

  1. Form-level override (specific donation form)
  2. Client-level setting (organization preference)
  3. Platform default (system-wide feature flag)

from velocity.payment.router import get_processor_adapter, charge_stored_payment_method

# Get the right adapter for a client
adapter = get_processor_adapter(tx, client_id=42, form_sys_id=100)

# Or use the convenience function
charge_stored_payment_method(
    tx,
    client_id=42,
    payment_profile=profile_data,
    amount=50.00,
    description='Monthly donation'
)
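The three-level hierarchy is a first-non-None resolution: the form override wins over the client setting, which wins over the platform default. A minimal sketch of that rule (illustrative, not the router's source):

```python
def resolve_processor(form_override, client_setting, platform_default):
    """Return the first configured value in priority order."""
    for choice in (form_override, client_setting, platform_default):
        if choice is not None:
            return choice
```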

Revenue split percentages follow a 4-level hierarchy:

from velocity.payment.router import get_revenue_split_percentage

split = get_revenue_split_percentage(tx, client_id=42)  # e.g., 15.00

Stripe and Braintree

Stripe — Express Connected Accounts with destination charges:

  • Hosted onboarding via AccountLinks
  • Destination charges with application_fee_amount
  • Two-phase auth (manual capture)
  • Full customer profile management (create, attach/detach payment methods, delete)

Braintree — Sub-merchant accounts:

  • Platform-centric model (all payments to master account)
  • Manual settlement to clients
  • Sub-merchant account management

Customer Profiles

from velocity.payment.router import (
    get_or_create_customer_profile,
    attach_payment_method,
    detach_payment_method,
)

profile = get_or_create_customer_profile(tx, client_id=42, customer_data={...})
attach_payment_method(tx, client_id=42, customer_id=cid, payment_method_id=pm_id)
detach_payment_method(tx, client_id=42, payment_method_id=pm_id)

from velocity.payment.profiles import money_to_cents, build_stripe_payment_profile

cents = money_to_cents(49.99)  # 4999
profile = build_stripe_payment_profile(customer_id, payment_method, email)
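Converting dollars to integer cents is a classic float trap (`49.99 * 100` is `4998.999…` in binary floating point). A safe sketch of a `money_to_cents`-style helper using `Decimal`; this is an illustration of the idea, not necessarily how velocity.payment.profiles implements it:

```python
from decimal import Decimal, ROUND_HALF_UP

def money_to_cents(amount):
    """Convert a dollar amount to integer cents, rounding half-up via
    Decimal so binary-float artifacts can't shave off a cent."""
    quantized = Decimal(str(amount)).quantize(Decimal('0.01'), rounding=ROUND_HALF_UP)
    return int(quantized * 100)
```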

Velocity.App — Business Domain Objects

Pre-built, transaction-aware domain objects for common business operations.

Orders

from velocity.app.orders import Order

@engine.transaction
class OrderService:
    def create_order(self, tx, customer_id, items):
        order = Order()
        order.update_header('customer_id', customer_id)
        order.update_header('status', 'pending')

        for item in items:
            order.add_lineitem(
                {'product_id': item['id'], 'qty': item['qty'], 'price': item['price']},
                supp_data={'notes': item.get('notes')}
            )

        order.persist(tx)
        return order.to_dict()

Order manages headers, line items, and supplemental data with schema validation (SCHEMA), defaults (DEFAULTS), and full CRUD (load, persist, add_lineitem, update_lineitem, delete_lineitem).

The Invoices, Payments, and Purchase Orders modules are available as extension points.


Velocity.Misc — Utilities

Data Conversion (iconv / oconv)

Two-way data conversion for transforming between frontend/API formats and database storage:

iconv — Input Conversion (frontend → database):

from velocity.misc.conv import iconv

iconv.phone('5551234567')      # '5551234567' (normalized to 10 digits)
iconv.email('John@Example.COM') # 'john@example.com'
iconv.date_conv('01/15/2024')  # datetime.date(2024, 1, 15)
iconv.boolean('yes')           # True
iconv.integer('42')            # 42
iconv.none('null')             # None
iconv.pointer('123')           # 123

oconv — Output Conversion (database → frontend):

from velocity.misc.conv import oconv

oconv.phone('5551234567')       # '(555) 123-4567'
oconv.money(1234.5)             # '$1,234.50'
oconv.date_conv(date.today())   # '01/15/2024'
oconv.day_of_week(1)            # 'Monday'
oconv.ein('123456789')          # '12-3456789'
oconv.boolean(1)                # True
oconv.title('john doe')         # 'John Doe'
oconv.padding(10, '0')          # returns a right-pad function
oconv.pprint('{"a":1}')        # formatted JSON string
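The two halves are designed as a round trip: iconv normalizes messy input for storage, oconv formats the stored value for display. A stdlib sketch of that pattern for one field (simplified re-implementations for illustration, not velocity's code):

```python
# Stdlib sketch of the iconv/oconv round trip for a phone field.
# Simplified re-implementations for illustration only.
import re

def phone_in(raw):                 # frontend -> database: digits only
    digits = re.sub(r"\D", "", raw or "")
    return digits[-10:]            # keep the last 10 digits

def phone_out(stored):             # database -> frontend: display format
    return f"({stored[:3]}) {stored[3:6]}-{stored[6:]}"

stored = phone_in("+1 (555) 123-4567")
print(stored)             # 5551234567
print(phone_out(stored))  # (555) 123-4567
```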

Spreadsheet Export

Generate Excel files with headers, styles, merged cells, and auto-sized columns:

from velocity.misc.export import create_spreadsheet

headers = ['Name', 'Email', 'Amount']
rows = [
    ['Alice', 'alice@a.com', 150.00],
    ['Bob', 'bob@b.com', 275.50],
]

create_spreadsheet(
    headers=headers,
    rows=rows,
    fileorbuffer='report.xlsx',
    styles={
        'col_header': True,     # bold header row
        'align_right': [2],     # right-align Amount column
    },
    freeze_panes='A2',          # freeze header row
    auto_size=True              # auto-fit column widths
)

Available styles: col_header, bold, sum_total, sub_total, align_right. Supports named styles, merged cells, number formats, and dimension overrides.

Email Parsing

Parse MIME email content into structured data:

from velocity.misc.mail import parse

result = parse(raw_email_content)
result['body']         # plain text body
result['html']         # HTML body
result['attachments']  # [Attachment(name, data, ctype, size, hash), ...]
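Under the hood this kind of parsing walks the MIME tree. A stdlib sketch of the idea using the email module (illustrative only; the real function's return shape is shown above):

```python
# Stdlib sketch of MIME parsing comparable to velocity.misc.mail.parse.
# Illustrative only -- it extracts just the plain-text body.
from email import message_from_string

raw = (
    "From: a@example.com\r\n"
    "Subject: hi\r\n"
    "Content-Type: text/plain\r\n\r\n"
    "hello world\r\n"
)

msg = message_from_string(raw)
body = ""
for part in msg.walk():
    # Skip attachments; keep the first inline text/plain part.
    if part.get_content_type() == "text/plain" and not part.get_filename():
        body = part.get_payload(decode=True).decode(
            part.get_content_charset() or "utf-8")
print(body.strip())  # hello world
```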

Formatting and Serialization

from velocity.misc.format import currency, human_delta, to_json
from datetime import timedelta
from decimal import Decimal

currency(1234.5)                    # '1,234.50'
human_delta(timedelta(hours=3, minutes=2))  # '3 hr(s) 2 min'

# JSON serializer that handles Decimal, datetime, date, time, timedelta
to_json({'amount': Decimal('19.99'), 'created': datetime.now()})

Deep Merge

Recursively merge dictionaries:

from velocity.misc.merge import deep_merge

base = {'a': 1, 'nested': {'x': 10, 'y': 20}}
override = {'b': 2, 'nested': {'y': 99, 'z': 30}}

result = deep_merge(base, override)
# {'a': 1, 'b': 2, 'nested': {'x': 10, 'y': 99, 'z': 30}}

# Mutate first dict in-place:
deep_merge(base, override, update=True)
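The recursion behind a deep merge is short enough to sketch in stdlib Python (illustrative only; velocity's deep_merge also supports the in-place update=True mode shown above):

```python
# Minimal sketch of deep-merge recursion. Illustrative only -- not
# velocity's source.
def merge(base, override):
    out = dict(base)
    for key, value in override.items():
        if isinstance(out.get(key), dict) and isinstance(value, dict):
            out[key] = merge(out[key], value)     # recurse into nested dicts
        else:
            out[key] = value                      # override wins
    return out

print(merge({'a': 1, 'n': {'x': 10}}, {'n': {'y': 2}}))
# {'a': 1, 'n': {'x': 10, 'y': 2}}
```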

Timer

Simple stopwatch for profiling:

from velocity.misc.timer import Timer

t = Timer('DB query')
# ... do work ...
elapsed = t.stop()
print(t)  # "DB query: 0.1234 s"
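The same pattern is a few lines over time.perf_counter(). A stdlib sketch (illustrative; the real Timer class may differ in details):

```python
# Stdlib sketch of a stopwatch like velocity.misc.timer.Timer.
# Illustrative only.
import time

class Stopwatch:
    def __init__(self, label):
        self.label = label
        self.start = time.perf_counter()   # monotonic, high-resolution
        self.elapsed = None

    def stop(self):
        self.elapsed = time.perf_counter() - self.start
        return self.elapsed

    def __str__(self):
        return f"{self.label}: {self.elapsed:.4f} s"

t = Stopwatch("DB query")
time.sleep(0.01)
t.stop()
print(t)  # e.g. "DB query: 0.0103 s"
```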

Project Structure

velocity-python/
├── src/velocity/
│   ├── db/                          # Database ORM
│   │   ├── core/
│   │   │   ├── engine.py            # Engine — connection + transaction decorator
│   │   │   ├── transaction.py       # Transaction — connection lifecycle
│   │   │   ├── table.py             # Table — full CRUD + DDL
│   │   │   ├── row.py               # Row — MutableMapping dict-like row
│   │   │   ├── result.py            # Result — cursor wrapper + transforms
│   │   │   ├── view.py              # View — DB view management
│   │   │   ├── column.py            # Column — column metadata + operations
│   │   │   ├── database.py          # Database — DB-level operations
│   │   │   ├── sequence.py          # Sequence — PostgreSQL sequences
│   │   │   └── decorators.py        # @create_missing, @return_default, etc.
│   │   ├── servers/
│   │   │   ├── postgres/            # PostgreSQL dialect
│   │   │   ├── mysql/               # MySQL dialect
│   │   │   ├── sqlite/              # SQLite dialect
│   │   │   ├── sqlserver/           # SQL Server dialect
│   │   │   ├── base/                # Base dialect (abstract)
│   │   │   └── tablehelper.py       # Query builder + FK pointer resolution
│   │   ├── exceptions.py            # Exception hierarchy
│   │   └── utils.py                 # Sorting, grouping, masking utilities
│   ├── aws/                         # AWS integrations
│   │   ├── handlers/                # Lambda handler framework
│   │   │   ├── base_handler.py      # BaseHandler — action routing
│   │   │   ├── lambda_handler.py    # LambdaHandler — API Gateway
│   │   │   ├── sqs_handler.py       # SqsHandler — SQS events
│   │   │   ├── context.py           # Context — request accessor
│   │   │   ├── response.py          # Response — action-based response builder
│   │   │   ├── data_service.py      # DataServiceMixin — generic CRUD + export
│   │   │   ├── web_handler.py       # WebHandler — activity tracking mixin
│   │   │   └── perf.py              # PerfTimer — request timing
│   │   ├── amplify.py               # AmplifyProject — Amplify/Lambda/SQS API
│   │   └── amplify_build.py         # Deployment automation
│   ├── payment/                     # Payment processing
│   │   ├── base_adapter.py          # PaymentProcessorAdapter (ABC)
│   │   ├── stripe_adapter.py        # Stripe Connect adapter
│   │   ├── braintree_adapter.py     # Braintree adapter
│   │   ├── router.py                # Processor routing + convenience functions
│   │   └── profiles.py              # Payment profile utilities
│   ├── app/                         # Business domain objects
│   │   ├── orders.py                # Order management
│   │   ├── invoices.py              # Invoice management
│   │   ├── payments.py              # Payment records
│   │   └── purchase_orders.py       # Purchase order management
│   ├── misc/                        # Utilities
│   │   ├── conv/
│   │   │   ├── iconv.py             # Input conversion (frontend → DB)
│   │   │   └── oconv.py             # Output conversion (DB → frontend)
│   │   ├── export.py                # Excel spreadsheet generation
│   │   ├── mail.py                  # Email MIME parsing
│   │   ├── format.py                # Currency, JSON, timedelta formatting
│   │   ├── merge.py                 # Deep dict merge
│   │   ├── timer.py                 # Stopwatch utility
│   │   └── tools.py                 # run_once and misc helpers
│   └── logging.py                   # Structured logging (CloudWatch JSON)
├── tests/                           # Test suite
├── scripts/                         # Utility and demo scripts
├── docs/                            # Additional documentation
├── pyproject.toml                   # Package configuration
├── Makefile                         # Development commands
└── README.md                        # This file

Development

Setup

git clone <repository-url>
cd velocity-python
pip install -e ".[dev]"

Running Tests

# All tests
pytest

# With coverage
pytest --cov=velocity

# Unit tests only (no database required)
make test-unit

# Integration tests (requires database)
make test-integration

# Clean caches
make clean

Code Quality

black src/          # format
mypy src/           # type check
flake8 src/         # lint

Version Management

python scripts/bump.py

License

MIT License — see LICENSE for details.
