Skip to main content

lesscode_database是数据库连接工具包

Project description

lesscode_database

数据库连接工具包

示例代码

import asyncio

from sqlalchemy import MetaData, Table, Column, VARCHAR, INTEGER, select
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import sessionmaker

from lesscode_database.connection_info import ConnectionInfo
from lesscode_database.options import options

options.conn_list = [
    ConnectionInfo(dialect="mysql", name="mysql", host="127.0.0.1", port=3306, user="root",
                   password="root", db_name="test", enable=True, params={"pool_recycle": 3600},
                   async_enable=True),
    ConnectionInfo(dialect="elasticsearch", name="es", host="127.0.0.1", port=9210, user="root",
                   password="root", enable=True, async_enable=True),
    ConnectionInfo(dialect="mongo", name="mongo", host="127.0.0.1", port=27027, user="root",
                   password="root", enable=True, async_enable=True),
    ConnectionInfo(dialect="nebula", name="nebula", host="127.0.0.1", port=9669, user="root",
                   password="nebula", db_name="nebula", enable=True),
    ConnectionInfo(dialect="postgresql", host="127.0.0.1", port=5454, user="root", password="root",
                   db_name="root", enable=True, async_enable=True),
    ConnectionInfo(dialect="redis", name="redis", host="127.0.0.1", port=6379, user=None,
                   password=None, db_name=1, enable=True, async_enable=True),
    ConnectionInfo(dialect="sqlalchemy", name="sqlalchemy", host="127.0.0.1", port=3306, user="root",
                   password="root", db_name="test", enable=True, async_enable=True),
    ConnectionInfo(dialect="neo4j", name="neo4j", host="127.0.0.1", port=7687, user="neo4j",
                   password="neo4j", db_name=None, enable=True, async_enable=True),
    ConnectionInfo(dialect="clickhouse", name="clickhouse", dsn="clickhouse://localhost", host="127.0.0.1", port=9000,
                   user="default", password="", db_name='', enable=True, async_enable=False),

]
# 获取数据库连接池的方式
## options.name    name是ConnectionInfo的name,options.conn_list里所有的name不可重复

# mysql 同步测试,async_enable=False
with options.mysql.dedicated_connection() as conn:
    conn.ping(reconnect=True)
    with conn.cursor() as cursor:
        cursor.execute("select 1")
        description = cursor.description
        rs = cursor.fetchone()
        print(rs)


# mysql 异步测试,async_enable=True
async def async_mysql_test():
    async with options.mysql.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("select 1")
            rs = await cursor.fetchone()
            print(rs)


loop = asyncio.get_event_loop()
loop.run_until_complete(async_mysql_test())

# es同步测试,async_enable=False
body = {
    "query": {
        "bool": {
            "must": []
        }
    },
    "size": 1
}
resp = options.es.search(
    index="test",
    body=body
)
print(resp)


# es异步测试,async_enable=True
async def async_es_test():
    resp = await options.es.search(
        index="test",
        body={"query": {"match_all": {}}},
        size=1,
    )
    print(resp)


loop = asyncio.get_event_loop()
loop.run_until_complete(async_es_test())

# mongo同步测试,async_enable=False
print(options.mongo.test.test.find_one())


# mongo异步测试,async_enable=True
async def async_mongo_test():
    resp = await options.mongo.test.test.find_one()
    print(resp)


loop = asyncio.get_event_loop()
loop.run_until_complete(async_mongo_test())

# nebula同步测试,async_enable=False
with options.nebula.session_context("root", "nebula") as session:
    session.execute(f'USE test')
    result = session.execute("match (t:Test) return t limit 1")
    print(result)

# nebula暂不支持异步

# postgresql同步测试,async_enable=False
with options.postgresql.connection() as conn:
    with conn.cursor() as cursor:
        cursor.execute("select 1")
        rs = cursor.fetchone()
        print(rs)


# postgresql异步测试,async_enable=True
async def async_pg_test():
    async with options.postgresql.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("select 1")
            rs = await cur.fetchone()
            print(rs)


loop = asyncio.get_event_loop()
loop.run_until_complete(async_pg_test())

# redis 同步测试,async_enable=False
print(options.redis.exists("test"))


# redis 异步测试,async_enable=True
async def async_redis_test():
    rs = await options.redis.exists("test")
    print(rs)


loop = asyncio.get_event_loop()
loop.run_until_complete(async_redis_test())

# sqlalchemy table
meta = MetaData()
t1 = Table("test", meta, Column("name", VARCHAR(collation='utf8mb3_bin', length=255)),
           Column(name='id', comment=None, nullable=False, autoincrement=False, primary_key=True, type_=INTEGER(),
                  server_default=None))

# sqlalchemy同步测试,async_enable=False
with sessionmaker(bind=options.sqlalchemy)() as session:
    cur = session.execute(select(t1))
    print(cur.fetchall())


# sqlalchemy异步测试,async_enable=True
async def async_sqlalchemy_test():
    async with async_sessionmaker(options.sqlalchemy)() as session:
        cur = await session.execute(select(t1))

        print(cur.fetchall())


loop = asyncio.get_event_loop()
loop.run_until_complete(async_sqlalchemy_test())


# neo4j同步测试
def query(tx):
    nql = "MATCH (t:Test) return t limit 1"
    for record in tx.run(nql):
        print(record)


with options.neo4j.session(database="test") as session:
    session.execute_read(query)


# neo4j异步测试
async def query(tx):
    nql = "MATCH (t:Test) return t limit 1"
    res = await tx.run(nql)
    async for record in res:
        print(record)


async def async_neo4j_test():
    async with options.neo4j.session(database="test") as session:
        await session.execute_read(query)


loop = asyncio.get_event_loop()
loop.run_until_complete(async_neo4j_test())

# clickhouse 同步测试
with options.clickhouse as conn:
    with conn.cursor() as cursor:
        cursor.execute('SELECT 1')
        print(cursor.fetchall())


# clickhouse 异步测试测试
async def async_clickhouse_test():
    async with options.clickhouse.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT 1")
            ret = await cursor.fetchone()
            print(ret)


loop = asyncio.get_event_loop()
loop.run_until_complete(async_clickhouse_test())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

lesscode_database-0.0.3.tar.gz (12.7 kB view hashes)

Uploaded Source

Built Distribution

lesscode_database-0.0.3-py3-none-any.whl (12.3 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page