lesscode_database是数据库连接工具包
Project description
lesscode_database
数据库连接工具包
示例代码
import asyncio
from sqlalchemy import MetaData, Table, Column, VARCHAR, INTEGER, select
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import sessionmaker
from lesscode_database.connection_info import ConnectionInfo
from lesscode_database.options import options
options.conn_list = [
ConnectionInfo(dialect="mysql", name="mysql", host="127.0.0.1", port=3306, user="root",
password="root", db_name="test", enable=True, params={"pool_recycle": 3600},
async_enable=True),
ConnectionInfo(dialect="elasticsearch", name="es", host="127.0.0.1", port=9210, user="root",
password="root", enable=True, async_enable=True),
ConnectionInfo(dialect="mongo", name="mongo", host="127.0.0.1", port=27027, user="root",
password="root", enable=True, async_enable=True),
ConnectionInfo(dialect="nebula", name="nebula", host="127.0.0.1", port=9669, user="root",
password="nebula", db_name="nebula", enable=True),
ConnectionInfo(dialect="postgresql", host="127.0.0.1", port=5454, user="root", password="root",
db_name="root", enable=True, async_enable=True),
ConnectionInfo(dialect="redis", name="redis", host="127.0.0.1", port=6379, user=None,
password=None, db_name=1, enable=True, async_enable=True),
ConnectionInfo(dialect="sqlalchemy", name="sqlalchemy", host="127.0.0.1", port=3306, user="root",
password="root", db_name="test", enable=True, async_enable=True),
ConnectionInfo(dialect="neo4j", name="neo4j", host="127.0.0.1", port=7687, user="neo4j",
password="neo4j", db_name=None, enable=True, async_enable=True),
ConnectionInfo(dialect="clickhouse", name="clickhouse", dsn="clickhouse://localhost", host="127.0.0.1", port=9000,
user="default", password="", db_name='', enable=True, async_enable=False),
]
# 获取数据库连接池的方式
## options.name name是ConnectionInfo的name,options.conn_list里所有的name不可重复
# mysql 同步测试,async_enable=False
with options.mysql.dedicated_connection() as conn:
conn.ping(reconnect=True)
with conn.cursor() as cursor:
cursor.execute("select 1")
description = cursor.description
rs = cursor.fetchone()
print(rs)
# mysql 异步测试,async_enable=True
async def async_mysql_test():
async with options.mysql.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("select 1")
rs = await cursor.fetchone()
print(rs)
loop = asyncio.get_event_loop()
loop.run_until_complete(async_mysql_test())
# es同步测试,async_enable=False
body = {
"query": {
"bool": {
"must": []
}
},
"size": 1
}
resp = options.es.search(
index="test",
body=body
)
print(resp)
# es异步测试,async_enable=True
async def async_es_test():
resp = await options.es.search(
index="test",
body={"query": {"match_all": {}}},
size=1,
)
print(resp)
loop = asyncio.get_event_loop()
loop.run_until_complete(async_es_test())
# mongo同步测试,async_enable=False
print(options.mongo.test.test.find_one())
# mongo异步测试,async_enable=True
async def async_mongo_test():
resp = await options.mongo.test.test.find_one()
print(resp)
loop = asyncio.get_event_loop()
loop.run_until_complete(async_mongo_test())
# nebula同步测试,async_enable=False
with options.nebula.session_context("root", "nebula") as session:
session.execute(f'USE test')
result = session.execute("match (t:Test) return t limit 1")
print(result)
# nebula暂不支持异步
# postgresql同步测试,async_enable=False
with options.postgresql.connection() as conn:
with conn.cursor() as cursor:
cursor.execute("select 1")
rs = cursor.fetchone()
print(rs)
# postgresql异步测试,async_enable=True
async def async_pg_test():
async with options.postgresql.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("select 1")
rs = await cur.fetchone()
print(rs)
loop = asyncio.get_event_loop()
loop.run_until_complete(async_pg_test())
# redis 同步测试,async_enable=False
print(options.redis.exists("test"))
# redis 异步测试,async_enable=True
async def async_redis_test():
rs = await options.redis.exists("test")
print(rs)
loop = asyncio.get_event_loop()
loop.run_until_complete(async_redis_test())
# sqlalchemy table
meta = MetaData()
t1 = Table("test", meta, Column("name", VARCHAR(collation='utf8mb3_bin', length=255)),
Column(name='id', comment=None, nullable=False, autoincrement=False, primary_key=True, type_=INTEGER(),
server_default=None))
# sqlalchemy同步测试,async_enable=False
with sessionmaker(bind=options.sqlalchemy)() as session:
cur = session.execute(select(t1))
print(cur.fetchall())
# sqlalchemy异步测试,async_enable=True
async def async_sqlalchemy_test():
async with async_sessionmaker(options.sqlalchemy)() as session:
cur = await session.execute(select(t1))
print(cur.fetchall())
loop = asyncio.get_event_loop()
loop.run_until_complete(async_sqlalchemy_test())
# neo4j同步测试
def query(tx):
nql = "MATCH (t:Test) return t limit 1"
for record in tx.run(nql):
print(record)
with options.neo4j.session(database="test") as session:
session.execute_read(query)
# neo4j异步测试
async def query(tx):
nql = "MATCH (t:Test) return t limit 1"
res = await tx.run(nql)
async for record in res:
print(record)
async def async_neo4j_test():
async with options.neo4j.session(database="test") as session:
await session.execute_read(query)
loop = asyncio.get_event_loop()
loop.run_until_complete(async_neo4j_test())
# clickhouse 同步测试
with options.clickhouse as conn:
with conn.cursor() as cursor:
cursor.execute('SELECT 1')
print(cursor.fetchall())
# clickhouse 异步测试测试
async def async_clickhouse_test():
async with options.clickhouse.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT 1")
ret = await cursor.fetchone()
print(ret)
loop = asyncio.get_event_loop()
loop.run_until_complete(async_clickhouse_test())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
lesscode_database-0.0.3.tar.gz
(12.7 kB
view hashes)
Built Distribution
Close
Hashes for lesscode_database-0.0.3-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | e180404fa628345db197fdc91e16de8c122113b6eee9898be608029f395db5e5 |
|
MD5 | 29eadba14ac5a3fba4d4afe3014e9e40 |
|
BLAKE2b-256 | 42e390a17cecea05b1a1320caaf50715fdbc29b7cb095d1f65cf7639aa1bf2df |