MySQL toolkit
CommonDAO
A powerful, type-safe, and Pydantic-integrated async MySQL toolkit for Python.
CommonDAO is a lightweight, type-safe async MySQL toolkit designed to simplify database operations with a clean, intuitive API. It integrates seamlessly with Pydantic for robust data validation while providing a comprehensive set of tools for common database tasks.
✨ Features
- Async/Await Support: Built on aiomysql for non-blocking database operations
- Type Safety: Strong typing with Python's type hints and runtime type checking
- Pydantic Integration: Seamless validation and transformation of database records to Pydantic models
- SQL Injection Protection: Parameterized queries for secure database access
- Comprehensive CRUD Operations: Simple methods for common database tasks
- Raw SQL Support: Full control when you need it with parameterized raw SQL
- Connection Pooling: Efficient database connection management
- Minimal Boilerplate: Write less code while maintaining readability and control
🚀 Installation
pip install commondao
🔍 Quick Start
import asyncio
from typing import Annotated, Optional

from commondao import connect
from commondao.annotation import TableId
from pydantic import BaseModel

# Define your Pydantic models with TableId annotation
class User(BaseModel):
    id: Annotated[int, TableId('users')]  # First field with TableId is the primary key
    name: str
    email: str

class UserInsert(BaseModel):
    id: Annotated[Optional[int], TableId('users')] = None  # Optional for auto-increment
    name: str
    email: str

async def main():
    # Connect to database
    config = {
        'host': 'localhost',
        'port': 3306,
        'user': 'root',
        'password': 'password',
        'db': 'testdb',
        'autocommit': True,
    }
    async with connect(**config) as db:
        # Insert a new user using Pydantic model
        user = UserInsert(name='John Doe', email='john@example.com')
        await db.insert(user)
        # Query the user by key with Pydantic model validation
        result = await db.get_by_key(User, key={'email': 'john@example.com'})
        if result:
            print(f"User: {result.name} ({result.email})")  # Output => User: John Doe (john@example.com)

if __name__ == "__main__":
    asyncio.run(main())
📊 Core Operations
Connection
from commondao import connect
async with connect(
host='localhost',
port=3306,
user='root',
password='password',
db='testdb'
) as db:
# Your database operations here
pass
Insert Data (with Pydantic Models)
from pydantic import BaseModel
from commondao.annotation import TableId
from typing import Annotated, Optional
class UserInsert(BaseModel):
id: Annotated[Optional[int], TableId('users')] = None # Auto-increment primary key
name: str
email: str
# Insert using Pydantic model (id will be auto-generated)
user = UserInsert(name='John', email='john@example.com')
await db.insert(user)
print(f"New user id: {db.lastrowid()}") # Get the auto-generated id
# Insert with ignore option (skips duplicate key errors)
user2 = UserInsert(name='Jane', email='jane@example.com')
await db.insert(user2, ignore=True)
# Insert with custom field handling
# exclude_unset=False: includes all fields, even those not explicitly set
# exclude_none=True: excludes fields with None values
user3 = UserInsert(name='Bob', email='bob@example.com')
await db.insert(user3, exclude_unset=False, exclude_none=True)
Update Data (with Pydantic Models)
class UserUpdate(BaseModel):
    id: Annotated[Optional[int], TableId('users')] = None  # TableId is required on update models too
    name: Optional[str] = None
    email: Optional[str] = None
# Update by primary key (id must be provided)
user = UserUpdate(id=1, name='John Smith', email='john.smith@example.com')
await db.update_by_id(user)
# Update by custom key (partial update - only specified fields)
user_update = UserUpdate(name='Jane Doe', email='jane.doe@example.com')
await db.update_by_key(user_update, key={'email': 'john.smith@example.com'})
# Update with field handling options
# exclude_unset=True (default): only update explicitly set fields
# exclude_none=False (default): include None values (set column to NULL)
user = UserUpdate(id=2, name='Alice', email=None)
await db.update_by_id(user, exclude_unset=True, exclude_none=False) # Sets email to NULL
# Update excluding None values
user = UserUpdate(id=3, name='Bob', email=None)
await db.update_by_id(user, exclude_unset=True, exclude_none=True) # Won't update email column
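The effect of the two flags on the set of written columns can be sketched without a database. The helper below is a hypothetical, stdlib-only illustration of the filtering rule described in the comments above; `columns_to_write` and its `fields_set` argument are assumptions made for this example and are not part of CommonDAO. It also does not model how the primary key is moved into the WHERE clause.

```python
from typing import AbstractSet, Any, Dict, Mapping


def columns_to_write(
    values: Mapping[str, Any],
    fields_set: AbstractSet[str],
    *,
    exclude_unset: bool = True,
    exclude_none: bool = False,
) -> Dict[str, Any]:
    """Return the column/value pairs that would survive the two filters.

    values: every field of the model, including defaults.
    fields_set: names the caller passed explicitly when building the model.
    """
    result: Dict[str, Any] = {}
    for name, value in values.items():
        if exclude_unset and name not in fields_set:
            continue  # never set by the caller, leave the column alone
        if exclude_none and value is None:
            continue  # None is dropped instead of being written as NULL
        result[name] = value
    return result


# UserUpdate(id=2, name='Alice', email=None): email was set explicitly to None
explicit_none = {"id": 2, "name": "Alice", "email": None}
all_set = {"id", "name", "email"}
print(columns_to_write(explicit_none, all_set))                     # email kept, written as NULL
print(columns_to_write(explicit_none, all_set, exclude_none=True))  # email dropped
```

With the defaults, a field that was explicitly set to None is kept, which is what allows an update to write NULL; a field that was never passed is skipped, which is what makes partial updates possible.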
Delete Data
# Delete by primary key
await db.delete_by_id(User, 1)
# Delete by custom key
await db.delete_by_key(User, key={'email': 'john@example.com'})
Query Data
# Get a single row by primary key
user = await db.get_by_id(User, 1)
# Get a row by primary key or raise NotFoundError if not found
user = await db.get_by_id_or_fail(User, 1)
# Get by custom key
user = await db.get_by_key(User, key={'email': 'john@example.com'})
# Get by key or raise NotFoundError if not found
user = await db.get_by_key_or_fail(User, key={'email': 'john@example.com'})
# Use with Pydantic models
from pydantic import BaseModel
from commondao.annotation import RawSql
from typing import Annotated
class UserModel(BaseModel):
id: int
name: str
email: str
full_name: Annotated[str, RawSql("CONCAT(first_name, ' ', last_name)")]
# Query with model validation
user = await db.select_one(
"from users where id = :id",
UserModel,
{"id": 1}
)
# Query multiple rows
users = await db.select_all(
"from users where status = :status",
UserModel,
{"status": "active"}
)
# Paginated queries
from commondao import Paged
result: Paged[UserModel] = await db.select_paged(
"from users where status = :status",
UserModel,
{"status": "active"},
size=10,
offset=0
)
print(f"Total users: {result.total}")
print(f"Current page: {len(result.items)} users")
Raw SQL Execution
CommonDAO supports parameterized SQL queries using named parameters with the :parameter_name format for secure and readable queries.
execute_query - For SELECT operations
# Simple query without parameters
rows = await db.execute_query("SELECT * FROM users")
# Query with single parameter
user_rows = await db.execute_query(
"SELECT * FROM users WHERE id = :user_id",
{"user_id": 123}
)
# Query with multiple parameters
filtered_rows = await db.execute_query(
"SELECT * FROM users WHERE name = :name AND age > :min_age",
{"name": "John", "min_age": 18}
)
# Query with IN clause (using list parameter)
users_in_group = await db.execute_query(
"SELECT * FROM users WHERE id IN :user_ids",
{"user_ids": [1, 2, 3, 4]}
)
# Complex query with date filtering
recent_users = await db.execute_query(
"SELECT * FROM users WHERE created_at > :date AND status = :status",
{"date": "2023-01-01", "status": "active"}
)
execute_mutation - For INSERT, UPDATE, DELETE operations
# INSERT statement
affected = await db.execute_mutation(
"INSERT INTO users (name, email, age) VALUES (:name, :email, :age)",
{"name": "John", "email": "john@example.com", "age": 25}
)
print(f"Inserted {affected} rows")
# UPDATE statement
affected = await db.execute_mutation(
"UPDATE users SET email = :new_email WHERE id = :user_id",
{"new_email": "newemail@example.com", "user_id": 123}
)
print(f"Updated {affected} rows")
# DELETE statement
affected = await db.execute_mutation(
"DELETE FROM users WHERE age < :min_age",
{"min_age": 18}
)
print(f"Deleted {affected} rows")
# Multiple parameter UPDATE
affected = await db.execute_mutation(
"UPDATE users SET name = :name, age = :age WHERE id = :id",
{"name": "Jane", "age": 30, "id": 456}
)
# Bulk operations with loop
user_list = [
{"name": "Alice", "email": "alice@example.com"},
{"name": "Bob", "email": "bob@example.com"},
]
for user_data in user_list:
affected = await db.execute_mutation(
"INSERT INTO users (name, email) VALUES (:name, :email)",
user_data
)
Parameter Format Rules
- Named Parameters: Use :parameter_name format in SQL
- Dictionary Keys: Match parameter names without the colon prefix
- Supported Types: str, int, float, bytes, datetime, date, time, timedelta, Decimal
- Lists/Tuples: Supported for IN clauses in queries
- None Values: Properly handled as SQL NULL
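To make the rules above concrete, here is a minimal sketch of how `:name` placeholders could be rewritten into the positional `%s` style that MySQL drivers commonly accept, with sequences expanded for IN clauses. This is an illustration only, not CommonDAO's implementation: `to_positional` is a hypothetical helper, it raises `KeyError` rather than the library's own error type, and it does not try to skip placeholders that appear inside quoted SQL strings.

```python
import re
from typing import Any, List, Mapping, Tuple

# Matches :name, but not "::" casts or a colon glued to a preceding word character.
_PARAM = re.compile(r"(?<![:\w]):([A-Za-z_]\w*)")


def to_positional(sql: str, data: Mapping[str, Any]) -> Tuple[str, List[Any]]:
    """Rewrite :name placeholders as %s and collect the values in order."""
    args: List[Any] = []

    def replace(match: "re.Match") -> str:
        name = match.group(1)
        if name not in data:
            raise KeyError(f"missing parameter: {name}")
        value = data[name]
        if isinstance(value, (list, tuple)):
            if not value:
                raise ValueError(f"empty sequence for parameter: {name}")
            # One placeholder per element, wrapped in parentheses for IN (...)
            args.extend(value)
            return "(" + ", ".join(["%s"] * len(value)) + ")"
        args.append(value)
        return "%s"

    return _PARAM.sub(replace, sql), args


sql, args = to_positional(
    "SELECT * FROM users WHERE id IN :user_ids AND status = :status",
    {"user_ids": [1, 2, 3], "status": "active"},
)
print(sql)
print(args)
```

The values never get interpolated into the SQL text; they travel separately as the argument list, which is the property that parameterized queries rely on for injection safety.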
# Example with various data types
from datetime import datetime, date
from decimal import Decimal
result = await db.execute_query(
"""
SELECT * FROM orders
WHERE customer_id = :customer_id
AND total >= :min_total
AND created_date = :order_date
AND status IN :valid_statuses
""",
{
"customer_id": 123,
"min_total": Decimal("99.99"),
"order_date": date(2023, 12, 25),
"valid_statuses": ["pending", "confirmed", "shipped"]
}
)
Transactions
from commondao import connect
from commondao.annotation import TableId
from pydantic import BaseModel
from typing import Annotated, Optional
class OrderInsert(BaseModel):
id: Annotated[Optional[int], TableId('orders')] = None
customer_id: int
total: float
class OrderItemInsert(BaseModel):
id: Annotated[Optional[int], TableId('order_items')] = None
order_id: int
product_id: int
async with connect(host='localhost', user='root', db='testdb', autocommit=False) as db:
    # autocommit is disabled explicitly, so the inserts below run in one transaction
    order = OrderInsert(customer_id=1, total=99.99)
    await db.insert(order)
    order_id = db.lastrowid()  # Get the auto-generated order id
    item = OrderItemInsert(order_id=order_id, product_id=42)
    await db.insert(item)
    # Commit the transaction
    await db.commit()
🔐 Type Safety
CommonDAO provides robust type checking to help prevent errors:
from commondao import is_row_dict, is_query_dict
from typing import Dict, Any
from datetime import datetime
# Valid row dict (for updates/inserts)
valid_data: Dict[str, Any] = {
"id": 1,
"name": "John",
"created_at": datetime.now(),
}
# Check type safety
assert is_row_dict(valid_data) # Type check passes
# Valid query dict (can contain lists/tuples for IN clauses)
valid_query: Dict[str, Any] = {
"id": 1,
"status": "active",
"tags": ["admin", "user"], # Lists are valid for query dicts
"codes": (200, 201) # Tuples are also valid
}
assert is_query_dict(valid_query) # Type check passes
# Invalid row dict (contains a list)
invalid_data: Dict[str, Any] = {
"id": 1,
"tags": ["admin", "user"] # Lists are not valid row values
}
assert not is_row_dict(invalid_data) # Type check fails
📖 API Documentation
For complete API documentation, please see the docstrings in the code or visit our documentation website.
🧪 Testing
CommonDAO comes with comprehensive tests to ensure reliability:
# Install test dependencies
pip install -e ".[test]"
# Run tests
pytest tests
🤝 Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
📄 License
This project is licensed under the Apache License 2.0.
CommonDAO API Reference
This document provides comprehensive API reference for CommonDAO, a powerful async MySQL toolkit with Pydantic integration.
Table of Contents
- Connection Management
- Core CRUD Operations
- Query Methods
- Raw SQL Execution
- Transaction Management
- Type Safety Utilities
- Annotations
- Error Classes
- Data Types
Connection Management
connect(**config) -> AsyncContextManager[Commondao]
Creates an async database connection context manager.
Parameters:
- host (str): Database host
- port (int): Database port (default: 3306)
- user (str): Database username
- password (str): Database password
- db (str): Database name
- autocommit (bool): Enable autocommit mode (default: True)
- Additional aiomysql connection parameters
Returns: AsyncContextManager yielding a Commondao instance
Example:
async with connect(host='localhost', user='root', password='pwd', db='testdb') as db:
# Database operations here
pass
Core CRUD Operations
insert(entity: BaseModel, *, ignore: bool = False, exclude_unset: bool = True, exclude_none: bool = False) -> int
Insert a Pydantic model instance into the database.
Parameters:
- entity (BaseModel): Pydantic model instance to insert
- ignore (bool): Use INSERT IGNORE to skip duplicate key errors
- exclude_unset (bool): If True (default), excludes fields not explicitly set. Allows database defaults for unset fields
- exclude_none (bool): If False (default), includes None values. If True, excludes None values from INSERT
Returns: int - Number of affected rows (1 on success, 0 if ignored)
Example:
user = UserInsert(name='John', email='john@example.com')
affected_rows = await db.insert(user)
update_by_id(entity: BaseModel, *, exclude_unset: bool = True, exclude_none: bool = False) -> int
Update a record by its primary key using a Pydantic model.
Parameters:
- entity (BaseModel): Pydantic model with primary key value set
- exclude_unset (bool): If True (default), only updates explicitly set fields. Enables partial updates
- exclude_none (bool): If False (default), includes None values (sets to NULL). If True, excludes None values
Returns: int - Number of affected rows
Raises:
EmptyPrimaryKeyError: If primary key is None or empty
Example:
user = UserUpdate(id=1, name='John Updated', email='john.new@example.com')
affected_rows = await db.update_by_id(user)
update_by_key(entity: BaseModel, *, key: QueryDict, exclude_unset: bool = True, exclude_none: bool = False) -> int
Update records matching the specified key conditions.
Parameters:
- entity (BaseModel): Pydantic model with update values
- key (QueryDict): Dictionary of key-value pairs for WHERE conditions
- exclude_unset (bool): If True (default), only updates explicitly set fields. Enables partial updates
- exclude_none (bool): If False (default), includes None values (sets to NULL). If True, excludes None values
Returns: int - Number of affected rows
Example:
user_update = UserUpdate(name='Jane', email='jane@example.com')
affected_rows = await db.update_by_key(user_update, key={'id': 1})
delete_by_id(entity_class: Type[BaseModel], entity_id: Union[int, str]) -> int
Delete a record by its primary key value.
Parameters:
- entity_class (Type[BaseModel]): Pydantic model class
- entity_id (Union[int, str]): The primary key value
Returns: int - Number of affected rows
Raises:
AssertionError: If entity_id is None
Example:
affected_rows = await db.delete_by_id(User, 1)
delete_by_key(entity_class: Type[BaseModel], *, key: QueryDict) -> int
Delete records matching the specified key conditions.
Parameters:
- entity_class (Type[BaseModel]): Pydantic model class
- key (QueryDict): Dictionary of key-value pairs for WHERE conditions
Returns: int - Number of affected rows
Example:
affected_rows = await db.delete_by_key(User, key={'id': 1})
Query Methods
get_by_id(entity_class: Type[M], entity_id: Union[int, str]) -> Optional[M]
Get a single record by its primary key value.
Parameters:
- entity_class (Type[M]): Pydantic model class
- entity_id (Union[int, str]): The primary key value
Returns: Optional[M] - Model instance or None if not found
Raises:
AssertionError: If entity_id is None
Example:
user = await db.get_by_id(User, 1)
if user:
print(f"Found user: {user.name}")
get_by_id_or_fail(entity_class: Type[M], entity_id: Union[int, str]) -> M
Get a single record by its primary key value or raise an error if not found.
Parameters:
- entity_class (Type[M]): Pydantic model class
- entity_id (Union[int, str]): The primary key value
Returns: M - Model instance
Raises:
- NotFoundError: If no record matches the primary key
- AssertionError: If entity_id is None
Example:
try:
user = await db.get_by_id_or_fail(User, 1)
print(f"User: {user.name}")
except NotFoundError:
print("User not found")
get_by_key(entity_class: Type[M], *, key: QueryDict) -> Optional[M]
Get a single record matching the specified key conditions.
Parameters:
- entity_class (Type[M]): Pydantic model class
- key (QueryDict): Dictionary of key-value pairs for WHERE conditions
Returns: Optional[M] - Model instance or None if not found
Example:
user = await db.get_by_key(User, key={'email': 'john@example.com'})
get_by_key_or_fail(entity_class: Type[M], *, key: QueryDict) -> M
Get a single record matching the key conditions or raise an error if not found.
Parameters:
- entity_class (Type[M]): Pydantic model class
- key (QueryDict): Dictionary of key-value pairs for WHERE conditions
Returns: M - Model instance
Raises:
NotFoundError: If no record matches the conditions
Example:
user = await db.get_by_key_or_fail(User, key={'email': 'john@example.com'})
select_one(headless_sql: str, select: Type[M], data: QueryDict = {}) -> Optional[M]
Execute a SELECT query and return the first row as a validated model instance.
Parameters:
- headless_sql (str): SQL query starting with 'from' (without SELECT clause). Examples: "from users where age > :min_age", "from `users`", "from (subquery) as t"
- select (Type[M]): Pydantic model class for result validation
- data (QueryDict): Parameters for the query
Returns: Optional[M] - Model instance or None if no results
Example:
user = await db.select_one(
"from users where age > :min_age",
User,
{"min_age": 18}
)
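Because the SQL passed to the select methods omits the SELECT clause, the column list has to come from the model. The sketch below shows one way such a statement could be assembled from field names and optional raw expressions. It is a guess at the general idea for illustration, not CommonDAO's actual code: `build_select` and its `columns` mapping are hypothetical names.

```python
from typing import Mapping, Optional


def build_select(headless_sql: str, columns: Mapping[str, Optional[str]]) -> str:
    """Prefix a headless 'from ...' fragment with a generated SELECT clause.

    columns maps each output field name to a raw SQL expression,
    or to None when the field is a plain column of the same name.
    """
    fragment = headless_sql.strip()
    if not fragment.lower().startswith("from"):
        raise ValueError("headless SQL must start with 'from'")
    parts = [
        name if expression is None else f"{expression} as {name}"
        for name, expression in columns.items()
    ]
    return "select " + ", ".join(parts) + " " + fragment


print(build_select(
    "from users where id = :id",
    {"id": None, "full_name": "CONCAT(first_name, ' ', last_name)"},
))
```

Deriving the column list from the model keeps the selected columns and the validated fields in sync, so a field added to the model does not also need to be added to every query string.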
select_one_or_fail(headless_sql: str, select: Type[M], data: QueryDict = {}) -> M
Execute a SELECT query and return the first row or raise an error if not found.
Parameters:
- headless_sql (str): SQL query starting with 'from' (without SELECT clause). Examples: "from users where email = :email", "from `users`", "from (subquery) as t"
- select (Type[M]): Pydantic model class for result validation
- data (QueryDict): Parameters for the query
Returns: M - Model instance
Raises:
NotFoundError: If no results found
Example:
user = await db.select_one_or_fail(
"from users where email = :email",
User,
{"email": "john@example.com"}
)
select_all(headless_sql: str, select: Type[M], data: QueryDict = {}) -> list[M]
Execute a SELECT query and return all matching rows as validated model instances.
Parameters:
- headless_sql (str): SQL query starting with 'from' (without SELECT clause). Examples: "from users where status = :status", "from `users`", "from (subquery) as t"
- select (Type[M]): Pydantic model class for result validation
- data (QueryDict): Parameters for the query
Returns: list[M] - List of model instances
Example:
active_users = await db.select_all(
"from users where status = :status",
User,
{"status": "active"}
)
select_paged(headless_sql: str, select: Type[M], data: QueryDict = {}, *, size: int, offset: int = 0) -> Paged[M]
Execute a paginated SELECT query with total count.
Parameters:
- headless_sql (str): SQL query starting with 'from' (without SELECT clause). Examples: "from users where status = :status", "from `users`", "from (subquery) as t"
- select (Type[M]): Pydantic model class for result validation
- data (QueryDict): Parameters for the query
- size (int): Number of items per page
- offset (int): Number of items to skip
Returns: Paged[M] - Paginated result with items and total count
Example:
result = await db.select_paged(
"from users where status = :status",
User,
{"status": "active"},
size=10,
offset=20
)
print(f"Total: {result.total}, Page items: {len(result.items)}")
Raw SQL Execution
execute_query(sql: str, data: Mapping[str, Any] = {}) -> list
Execute a parameterized SQL query and return all results.
Parameters:
- sql (str): SQL query with named parameter placeholders (:param_name)
- data (Mapping[str, Any]): Dictionary mapping parameter names to values
Returns: list - List of result rows (dictionaries)
Supported Parameter Types:
- str, int, float, bytes, datetime, date, time, timedelta, Decimal
- Lists/tuples (for IN clauses)
- None (converted to SQL NULL)
Examples:
# Simple query
rows = await db.execute_query("SELECT * FROM users")
# Parameterized query
rows = await db.execute_query(
"SELECT * FROM users WHERE age > :min_age AND status = :status",
{"min_age": 18, "status": "active"}
)
# IN clause with list
rows = await db.execute_query(
"SELECT * FROM users WHERE id IN :user_ids",
{"user_ids": [1, 2, 3, 4]}
)
execute_mutation(sql: str, data: Mapping[str, Any] = {}) -> int
Execute a parameterized SQL mutation (INSERT, UPDATE, DELETE) and return affected row count.
Parameters:
- sql (str): SQL mutation statement with named parameter placeholders (:param_name)
- data (Mapping[str, Any]): Dictionary mapping parameter names to values
Returns: int - Number of affected rows
Examples:
# INSERT
affected = await db.execute_mutation(
"INSERT INTO users (name, email, age) VALUES (:name, :email, :age)",
{"name": "John", "email": "john@example.com", "age": 25}
)
# UPDATE
affected = await db.execute_mutation(
"UPDATE users SET email = :email WHERE id = :id",
{"email": "newemail@example.com", "id": 123}
)
# DELETE
affected = await db.execute_mutation(
"DELETE FROM users WHERE age < :min_age",
{"min_age": 18}
)
Transaction Management
commit() -> None
Commit the current transaction.
Example:
async with connect(host='localhost', autocommit=False) as db:
await db.insert(user)
await db.commit() # Explicitly commit
rollback() -> None
Rollback the current transaction.
Example:
try:
await db.insert(user)
await db.insert(order)
await db.commit()
except Exception:
await db.rollback()
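The commit-or-rollback pattern above can be wrapped in a small helper so that each unit of work does not repeat the try/except. The sketch below runs against a stand-in object rather than a real connection: `FakeDb` and `in_transaction` are hypothetical names used only for this illustration, and the only assumption about the real object is that it exposes awaitable `commit()` and `rollback()` methods as documented here. Unlike the example above, the helper re-raises the error after rolling back.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


class FakeDb:
    """Stand-in that records calls instead of talking to MySQL."""

    def __init__(self) -> None:
        self.log: List[str] = []

    async def commit(self) -> None:
        self.log.append("commit")

    async def rollback(self) -> None:
        self.log.append("rollback")


async def in_transaction(db: Any, work: Callable[[], Awaitable[Any]]) -> Any:
    """Run work(), commit on success, roll back and re-raise on failure."""
    try:
        result = await work()
        await db.commit()
        return result
    except Exception:
        await db.rollback()
        raise


async def demo() -> List[str]:
    db = FakeDb()

    async def ok() -> int:
        return 42

    async def boom() -> None:
        raise RuntimeError("simulated failure")

    await in_transaction(db, ok)  # succeeds, so commit is recorded
    try:
        await in_transaction(db, boom)  # fails, so rollback is recorded
    except RuntimeError:
        pass
    return db.log


print(asyncio.run(demo()))
```

Re-raising keeps the failure visible to the caller; swallowing it, as a bare rollback would, is only appropriate when the caller has another way to learn that the work did not happen.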
lastrowid() -> int
Get the auto-generated ID of the last inserted row.
Returns: int - The last inserted row ID
Example:
await db.insert(user)
user_id = db.lastrowid()
print(f"New user ID: {user_id}")
Type Safety Utilities
is_row_dict(data: Mapping) -> TypeGuard[RowDict]
Check if a mapping is valid for database row operations (INSERT/UPDATE).
Parameters:
- data (Mapping): The mapping to check
Returns: bool - True if valid for row operations
Valid Types: str, int, float, bytes, datetime, date, time, timedelta, Decimal, None
Example:
from commondao import is_row_dict
data = {"id": 1, "name": "John", "age": 25}
assert is_row_dict(data)
# Now the type checker knows data is a valid RowDict
is_query_dict(data: Mapping) -> TypeGuard[QueryDict]
Check if a mapping is valid for query operations (WHERE clauses).
Parameters:
- data (Mapping): The mapping to check
Returns: bool - True if valid for query operations
Valid Types: All RowDict types plus lists and tuples (for IN clauses)
Example:
from commondao import is_query_dict
query_data = {"id": 1, "status": "active", "tags": ["admin", "user"]}
assert is_query_dict(query_data)
# Now the type checker knows query_data is a valid QueryDict
Annotations
TableId(table_name: str)
Annotation to mark a field as the primary key and specify the table name.
Parameters:
- table_name (str): Name of the database table
Usage:
from commondao.annotation import TableId
from typing import Annotated, Optional
from pydantic import BaseModel
class User(BaseModel):
id: Annotated[Optional[int], TableId('users')] = None
name: str
email: str
RawSql(expression: str)
Annotation to include raw SQL expressions in SELECT queries.
Parameters:
- expression (str): SQL expression to include
Usage:
from commondao.annotation import RawSql
from typing import Annotated
class UserWithFullName(BaseModel):
id: int
first_name: str
last_name: str
full_name: Annotated[str, RawSql("CONCAT(first_name, ' ', last_name)")]
Error Classes
NotFoundError(ValueError)
Raised when a record is not found in operations that expect a result.
Example:
try:
user = await db.get_by_id_or_fail(User, 999)
except NotFoundError as e:
print(f"User not found: {e}")
EmptyPrimaryKeyError(ValueError)
Raised when attempting operations with empty or None primary key values.
Example:
try:
user = UserUpdate(id=None, name="John")
await db.update_by_id(user)
except EmptyPrimaryKeyError as e:
print(f"Primary key error: {e}")
NotTableError(ValueError)
Raised when a Pydantic model doesn't have proper table annotation.
MissingParamError(ValueError)
Raised when required parameters are missing from SQL queries.
TooManyResultError(ValueError)
Raised when operations expecting single results return multiple rows.
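Since every error class listed above derives from ValueError, the order of except clauses matters: the specific class has to come before a general ValueError handler. The sketch below shows this with local stand-in classes that mirror the documented hierarchy; they are defined here only so the example runs without a database, and `exactly_one` and `describe` are hypothetical helpers.

```python
from typing import Dict, List


# Local stand-ins mirroring the documented base class (ValueError).
class NotFoundError(ValueError):
    pass


class TooManyResultError(ValueError):
    pass


def exactly_one(rows: List[Dict]) -> Dict:
    """Return the single row, or raise when there are zero or several."""
    if not rows:
        raise NotFoundError("no matching row")
    if len(rows) > 1:
        raise TooManyResultError(f"expected 1 row, got {len(rows)}")
    return rows[0]


def describe(rows: List[Dict]) -> str:
    try:
        return f"found id={exactly_one(rows)['id']}"
    except NotFoundError:
        # Specific handler first, otherwise the ValueError clause would win.
        return "not found"
    except ValueError as exc:
        return f"other lookup error: {exc}"


print(describe([]))
print(describe([{"id": 7}]))
print(describe([{"id": 1}, {"id": 2}]))
```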
Data Types
RowDict
Type alias for mappings valid in database row operations.
RowDict = Mapping[str, Union[str, int, float, bytes, datetime, date, time, timedelta, Decimal, None]]
QueryDict
Type alias for mappings valid in query operations (extends RowDict with list/tuple support).
QueryDict = Mapping[str, Union[RowValueType, list, tuple]]
Paged[T]
Generic container for paginated query results.
Attributes:
- items (list[T]): List of result items
- total (int): Total count of all matching records
Example:
result: Paged[User] = await db.select_paged(
"from users",
User,
size=10
)
print(f"Page has {len(result.items)} items out of {result.total} total")
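Since select_paged takes an offset rather than a page number, callers usually convert between the two and derive the page count from the total. The arithmetic is sketched below with a local stand-in for the result container; `PageStandIn`, `page_offset`, and `page_count` are hypothetical names for this example and are not provided by CommonDAO.

```python
import math
from dataclasses import dataclass
from typing import Generic, List, TypeVar

T = TypeVar("T")


@dataclass
class PageStandIn(Generic[T]):
    """Local stand-in with the same two attributes as the documented result."""
    items: List[T]
    total: int


def page_offset(page: int, size: int) -> int:
    """Offset for a 1-based page number."""
    if page < 1 or size < 1:
        raise ValueError("page and size must be positive")
    return (page - 1) * size


def page_count(total: int, size: int) -> int:
    """Number of pages needed to show `total` rows, `size` rows at a time."""
    if size < 1:
        raise ValueError("size must be positive")
    return math.ceil(total / size)


# Third page of 10 rows out of 95 matching rows: rows 21 to 30.
result = PageStandIn(items=list(range(21, 31)), total=95)
print(page_offset(3, 10), page_count(result.total, 10))
```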
Type Safety with TypeGuard
CommonDAO provides the TypeGuard functions is_row_dict() and is_query_dict() for runtime type checking. Important: the project recommends calling them only inside assert statements, so that invalid data fails immediately and the checked value stays narrowed for the rest of the function.
Correct Usage Pattern
from commondao import is_row_dict, is_query_dict
# ✅ Correct: Use with assert
async def process_database_row(data: dict):
    assert is_row_dict(data)
    # Type checker now knows data is RowDict
    # Safe to use for database operations
    user = UserInsert(**data)
    await db.insert(user)

async def process_query_parameters(params: dict):
    assert is_query_dict(params)
    # Type checker now knows params is QueryDict
    # Safe to use in queries
    result = await db.execute_query(
        "SELECT * FROM users WHERE status = :status",
        params
    )
# ✅ Correct: In test validation
async def test_query_result():
rows = await db.execute_query("SELECT * FROM users")
assert len(rows) > 0
row = rows[0]
assert is_row_dict(row) # Validates row format
# Now safely access row data
user_id = row['id']
What NOT to do
# ❌ Wrong: Don't use in if statements
if is_row_dict(data):
    # Narrowing applies only inside this branch, and invalid data is silently skipped
    pass
# ❌ Wrong: Don't use in boolean expressions
valid = is_row_dict(data) and data['id'] > 0
# ❌ Wrong: Don't use with or/and logic
assert is_row_dict(data) or True # Defeats the purpose
Why Use Assert
TypeGuard functions with assert provide both runtime validation and compile-time type narrowing:
async def example_function(unknown_data: dict):
    # Before assert: unknown_data is just dict
    assert is_row_dict(unknown_data)
    # After assert: type checker knows unknown_data is RowDict
    # This will have proper type hints and validation
    await db.execute_mutation(
        "INSERT INTO users (name, email) VALUES (:name, :email)",
        unknown_data  # Type checker knows this is safe
    )
Best Practices
Entity Class Naming Conventions
When designing Pydantic models for database operations, follow these naming conventions to improve code clarity and maintainability:
- Naming Convention: Name entity classes used for dao.insert() with an Insert suffix; name entity classes used for dao.update() with an Update suffix; keep query entity class names unchanged.
- Field Optionality Rules:
  - For query entity classes: If a field's DDL is NOT NULL, the field should not be optional
  - For insert entity classes: If a field's DDL is nullable or has a default value, the field can be optional
  - For update entity classes: All fields should be optional
- Field Inclusion:
  - Insert entity classes should only include fields that may need to be inserted
  - Update entity classes should only include fields that may need to be modified
- TableId Annotation: Entity classes used for insert/update operations must always include the TableId annotation, even if the primary key field is optional