Pure Python MySQL ORM
Project description
PyMyORM
Table of Contents
This package contains a pure-Python MySQL Object Relational Mapping
client library.
Requirements
- Python:
- MySQL server -- one of the following:
Installation
Package is uploaded on PyPI.
You can install it with pip:
$ python3 -m pip install PyMyORM
Documentation
Documentation is coming soon.
Example
The following examples make use of a simple table
create table `t_user` (
`id` int unsigned not null auto_increment,
`name` varchar(16) not null default '',
`phone` varchar(16) not null default '',
`money` decimal(10,2) not null default 0,
`gender` tinyint unsigned not null default 0,
`status` tinyint unsigned not null default 0,
`time` timestamp not null default current_timestamp,
primary key(`id`),
unique key `idx_name` (`name`),
key `idx_phone` (`phone`),
key `idx_status` (`status`),
key `idx_time` (`time`)
) engine=InnoDB auto_increment=1 default charset=utf8mb4;
Model definition models/user.py
from pymyorm.model import Model
class User(Model):
tablename = 't_user'
Connect to database
from pymyorm.database import Database
Database.connect(host='127.0.0.1',
port=3306,
user='root',
password='password',
database='test',
charset='utf8')
raw sql
Sometimes we need to run raw SQL, for example to create tables. In that case we can call the Database class's execute method:
from pymyorm.database import Database
fp = open('sql/t_user.sql', 'r', encoding='utf-8')
sql = fp.read()
fp.close()
Database.connect(**db)
Database.execute(sql)
select
# case 1
from models.user import User
one = User.find().where(name='ping').one()
print(one.id, one.name)
# case 2
from models.user import User
one = User.find().select('name').where(name='ping').where(phone='18976641111').one()
print(one)
# case 3
from models.user import User
one = User.find().where(name='ping', phone='18976641111').one()
print(one)
# case 4
from models.user import User
one = User.find().where('money', '!=', 200).order('id desc').one()
print(one)
# case 5
from models.user import User
all = User.find().order('id desc').offset(0).limit(5).all()
for one in all:
print(one)
where
We can filter the data set by passing conditions to where, as shown below:
from models.user import User
model = User.find()
if id:
model.where(id=id)
if name:
model.where(name=name)
if phone:
model.where(phone=phone)
if status:
model.where(status=status)
As you can see, this needs one if
clause per condition, which looks ugly.
To make the code more readable, we can rewrite it like this:
from models.user import User
model = User.find().where(id=id, name=name, phone=phone, status=status)
where
function automatically ignores conditions whose value is empty or None.
update
# case 1
from models.user import User
one = User.find().where(name='lily').one()
one.money = 500
one.phone = '18976642222'
one.save()
# case 2
from models.user import User
User.find().where(name='lily').update(money=500, phone='18976642222')
insert
# case 1
from models.user import User
user = User(name='rose', phone='18976643333', money=100)
user.save()
# case 2
from models.user import User
user = User()
user.name = 'vera'
user.phone = '18976644444'
user.money = 100
user.save()
delete
# case 1
from models.user import User
one = User.find().where(name='lily').one()
one.delete()
# case 2
from models.user import User
User.find().where(money=100).delete()
# case 3
from models.user import User
all = User.find().select('name', 'phone').where('money', '>', 100).all()
for one in all:
one.delete()
# case 4
from models.user import User
User.find().delete() # delete all users
batch insert
from models.user import User
fields = ('name', 'phone', 'money')
values = [
('jack', '18976643333', 120),
('sean', '18976654444', 160),
('vera', '18976645555', 180),
]
User.insert(fields, values)
exists
from models.user import User
exists = User.find().where(name='ping').exists()
count
from models.user import User
count = User.find().where(status='0').count()
min
from models.user import User
money = User.find().where(status=0).min('money')
max
from models.user import User
money = User.find().where(status=0).max('money')
print(money)
average
from models.user import User
money = User.find().where(status=0).average('money')
scalar
from models.user import User
money = User.find().where(id=1).scalar('money')
column
from models.user import User
names = User.find().column('name')
group by
from models.user import User
all = User.find() \
.select('gender', 'count(*) as count', 'avg(money) as avg', 'sum(money) as total') \
.group('gender') \
.having('avg', '>', 220) \
.all()
for one in all:
print(one)
truncate
from models.user import User
User.truncate()
join
# case 1: inner join
from models.admin import Admin
from models.admin_role import AdminRole
all = Admin.find().select('a.*').alias('a') \
.join(table=AdminRole.tablename, alias='r', on='a.role = r.id') \
.where('r.name', '=', 'role1') \
.where('a.lock', '=', 0) \
.all()
for one in all:
print(one)
# case 2: left join
from models.admin import Admin
from models.admin_role import AdminRole
all = Admin.find().alias('a') \
.join(table=AdminRole.tablename, alias='r', on='a.role=r.id', type='left') \
.where('a.lock', '=', 0) \
.all()
for one in all:
print(one)
# case 3
from models.admin import Admin
from models.admin_role import AdminRole
all = Admin.find().select('a.*').alias('a') \
.join(table=AdminRole.tablename, alias='r', on='a.role=r.id') \
.where('a.lock', '=', 0) \
.all()
for one in all:
print(one)
# case 4: join more than one table
from models.admin import Admin
from models.admin_role import AdminRole
from models.admin_auth import AdminAuth
all = Admin.find().select('username', 'a.role').alias('a') \
.join(table=AdminRole.tablename, alias='r', on='a.role=r.id') \
.join(table=AdminAuth.tablename, alias='t', on='t.role=r.id') \
.where('t.action', '=', 300) \
.all()
for one in all:
print(one)
transaction
# case 1
from pymyorm.transaction import Transaction as t
from models.user import User
try:
t.begin()
model = User(name='ping', phone='18976641111', money=100)
model.save()
t.commit()
except Exception as e:
t.rollback()
raise e
# case 2 : nested transaction
from pymyorm.transaction import Transaction as t
try:
t.begin()
# ... your code
try:
t.begin()
# ... your code
t.commit()
except Exception as e:
t.rollback()
raise e
t.commit()
except Exception as e:
t.rollback()
raise e
connection pool
By default PyMyORM works in a single thread. However, when we develop a web application based on Flask, we would like PyMyORM to support multi-threading.
So PyMyORM provides a connection pool component, and it is thread-safe.
In this kind of scenario, we should use ConnectionPool instead of Database to initialize the MySQL connection.
import functools
from flask import Flask
from pymyorm.local import local
from pymyorm.connection_pool import ConnectionPool
from models.user import User
app = Flask(__name__)
config = dict(host=host, port=port, user=user, password=password, database=database)
pool = ConnectionPool()
pool.size(size=10)
pool.debug(debug=True)
pool.create(**config)
# assign one connection to the request
def assign_connection(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
pool = ConnectionPool()
local.conn = pool.get()
resp = func(*args, **kwargs)
# don't forget to put connection into pool
pool.put(local.conn)
return resp
return wrapper
@app.route('/')
@assign_connection
def index():
one = User.find().where(name='ping').one()
print(one)
return 'index'
@app.route('/hello')
@assign_connection
def hello():
one = User.find().where(name='ping').one()
print(one)
return 'hello'
As the code snippet above shows, PyMyORM assigns one MySQL connection to each HTTP request, so MySQL transactions will work properly.
Resource
- MySQL Reference Manuals: https://dev.mysql.com/doc/
- PyMySQL: https://pymysql.readthedocs.io/
License
PyMyORM is released under the MIT License. See LICENSE for more information.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.