lesscode_database是数据库连接工具包
Project description
lesscode_database
数据库连接工具包
示例代码
import asyncio
from sqlalchemy import MetaData, Table, Column, VARCHAR, INTEGER, select
from sqlalchemy.orm import scoped_session, sessionmaker
from lesscode_database.connection_info import ConnectionInfo
from lesscode_database.options import options
options.conn_list = [
ConnectionInfo(dialect="mysql", name="mysql", host="127.0.0.1", port=3306, user="root",
password="root", db_name="test", enable=True, params={"pool_recycle": 3600},
async_enable=True),
ConnectionInfo(dialect="elasticsearch", name="es", host="127.0.0.1", port=9210, user="root",
password="root", enable=True, async_enable=True),
ConnectionInfo(dialect="mongo", name="mongo", host="127.0.0.1", port=27027, user="root",
password="root", enable=True, async_enable=True),
ConnectionInfo(dialect="nebula3", name="nebula", host="127.0.0.1", port=9669, user="root",
password="nebula", db_name="nebula", enable=True),
ConnectionInfo(dialect="postgresql", host="127.0.0.1", port=5454, user="root", password="root",
db_name="root", enable=True, async_enable=True),
ConnectionInfo(dialect="redis", name="redis", host="127.0.0.1", port=6379, user=None,
password=None, db_name=1, enable=True, async_enable=True),
ConnectionInfo(dialect="sqlalchemy", name="sqlalchemy", host="127.0.0.1", port=3306, user="root",
password="root", db_name="test", enable=True, async_enable=True),
ConnectionInfo(dialect="neo4j", name="neo4j", host="127.0.0.1", port=7687, user="neo4j",
password="neo4j", db_name=None, enable=True, async_enable=True),
]
# mysql 同步测试,async_enable=False
with options.mysql.dedicated_connection() as conn:
conn.ping(reconnect=True)
with conn.cursor() as cursor:
cursor.execute("select 1")
description = cursor.description
rs = cursor.fetchone()
print(rs)
# mysql 异步测试,async_enable=True
async def async_mysql_test():
async with options.mysql.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("select 1")
rs = await cursor.fetchone()
print(rs)
loop = asyncio.get_event_loop()
loop.run_until_complete(async_mysql_test())
# es同步测试,async_enable=False
body = {
"query": {
"bool": {
"must": []
}
},
"size": 1
}
resp = options.es.search(
index="test",
body=body
)
print(resp)
# es异步测试,async_enable=True
async def async_es_test():
resp = await options.es.search(
index="test",
body={"query": {"match_all": {}}},
size=1,
)
print(resp)
loop = asyncio.get_event_loop()
loop.run_until_complete(async_es_test())
# mongo同步测试,async_enable=False
print(options.mongo.test.test.find_one())
# mongo异步测试,async_enable=True
async def async_mongo_test():
resp = await options.mongo.test.test.find_one()
print(resp)
loop = asyncio.get_event_loop()
loop.run_until_complete(async_mongo_test())
# nebula3同步测试,async_enable=False
with options.nebula.session_context("root", "nebula") as session:
session.execute(f'USE test')
result = session.execute("match (t:Test) return t limit 1")
print(result)
# nebula3暂不支持异步
# postgresql同步测试,async_enable=False
with options.postgresql.connection() as conn:
with conn.cursor() as cursor:
cursor.execute("select 1")
rs = cursor.fetchone()
print(rs)
# postgresql异步测试,async_enable=True
async def async_pg_test():
async with options.postgresql.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("select 1")
rs = await cur.fetchone()
print(rs)
loop = asyncio.get_event_loop()
loop.run_until_complete(async_pg_test())
# redis 同步测试,async_enable=False
print(options.redis.exists("test"))
# redis 异步测试,async_enable=True
async def async_redis_test():
rs = await options.redis.exists("test")
print(rs)
loop = asyncio.get_event_loop()
loop.run_until_complete(async_redis_test())
# sqlalchemy table
meta = MetaData()
t1 = Table("test", meta, Column("name", VARCHAR(collation='utf8mb3_bin', length=255)),
Column(name='id', comment=None, nullable=False, autoincrement=False, primary_key=True, type_=INTEGER(),
server_default=None))
# sqlalchemy同步测试,async_enable=False
db_session = scoped_session(sessionmaker(bind=options.internal_capability))
session = db_session()
cur = session.execute(select(t1))
print(cur.fetchall())
# sqlalchemy异步测试,async_enable=True
async def async_sqlalchemy_test():
async with options.internal_capability.connect() as conn:
result = await conn.execute(select(t1))
print(result.fetchall())
loop = asyncio.get_event_loop()
loop.run_until_complete(async_sqlalchemy_test())
# neo4j同步测试
def query(tx):
nql = "MATCH (t:Test) return t limit 1"
for record in tx.run(nql):
print(record)
with options.neo4j.session(database="test") as session:
session.execute_read(query)
# neo4j异步测试
async def query(tx):
nql = "MATCH (t:Test) return t limit 1"
res = await tx.run(nql)
async for record in res:
print(record)
async def async_neo4j_test():
async with options.neo4j.session(database="test") as session:
await session.execute_read(query)
loop = asyncio.get_event_loop()
loop.run_until_complete(async_neo4j_test())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
lesscode_database-0.0.2.tar.gz
(12.1 kB
view hashes)
Built Distribution
Close
Hashes for lesscode_database-0.0.2-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | a3cb4853fa3ece88491baad7a9811d428bc5a8fd9ceee19caf4076ac839beecb |
|
MD5 | 6d0ebf6c877e3d1b6ec17c56e8256890 |
|
BLAKE2b-256 | 2377e814f361a2d5b3e97f4f410b90c588a6dcdf8b33e4aac6364e824f4e043e |