Common Database Connector
Project description
Common Database Connector
Cache / Redis
from cdbc.cache import Redis
redis_host = 'redis'
redis_port = 16382
redis_db = 1
c = Redis(redis_host=redis_host, redis_port=redis_port, redis_db=redis_db)
c.create_redis_conn()
c.kvs()
c.set('a', 'b')
c.get('a')
kvs = {
'a1': 1,
'b1': 2,
'c1': 3
}
c.set(batch=True, kvs=kvs)
c.kvs()
SQL / PostgreSQL
import random
import string
import time
import json
from cdbc.postgresql import PG
host = 'freedbs'
port = 15432
database = 'testdb02'
user = 'postgresql-user'
password = 'postgresql-Passw0rd'
p = PG(host=host, port=port, database=database, user=user, password=password)
p.create_pg_conn()
p.select('select version();')
p.select('select count(1) from table1;')
# pool mode
p.create_pg_pool()
p.select_from_pool('select version();')
p.select_from_pool('select count(1) from table1;')
### Select
p = PG(host=host, port=port, database=database, user=user, password=password)
p.create_pg_conn()
for _ in range(5):
sql = 'select count(1) from table1'
p.select(sql=sql)
time.sleep(1)
for _ in range(5):
sql = 'select count(1) from table1'
p.select_from_pool(sql=sql)
time.sleep(1)
### Insert
p = PG(host=host, port=port, database=database, user=user, password=password)
table = 'table1'
columns = 'col1, col2, col3'
p.create_pg_conn()
for _ in range(10):
value1 = ''.join([random.choice(string.ascii_lowercase + string.digits) for _n in range(10)])
value2 = ''.join([random.choice(string.ascii_lowercase + string.digits) for _n in range(10)])
k = ''.join([random.choice(string.ascii_lowercase + string.digits) for _n in range(5)])
v = ''.join([random.choice(string.ascii_lowercase + string.digits) for _n in range(5)])
values = "'" + value1 + "'" + ', '\
+ "'" + value2 + "'" + ', '\
+ "'" + json.dumps({k:v}) + "'"
cvs = [columns, values]
p.insert(table=table, cvs=cvs)
# insert via pg pool
p.create_pg_pool()
for _ in range(10):
value1 = ''.join([random.choice(string.ascii_lowercase + string.digits) for _n in range(10)])
value2 = ''.join([random.choice(string.ascii_lowercase + string.digits) for _n in range(10)])
k = ''.join([random.choice(string.ascii_lowercase + string.digits) for _n in range(5)])
v = ''.join([random.choice(string.ascii_lowercase + string.digits) for _n in range(5)])
values = "'" + value1 + "'" + ', '\
+ "'" + value2 + "'" + ', '\
+ "'" + json.dumps({k:v}) + "'"
cvs = [columns, values]
p.insert_with_pool(table=table, cvs=cvs)
MessageQueue/RabbitMQ
Consumer
from cdbc.message import RabbitMQ
import json
host = 'freedbs'
port = 15603
user = 'rabbitmq-user'
password = 'rabbitmq-user-Passw0rd'
exchange = 'exchange-test'
queue = 'queue-test'
def call_back(ch, method, properties, body):
message = json.loads(body.decode())
print(message)
ch.basic_ack(delivery_tag=method.delivery_tag)
print("%r" % body)
# if __name__ == '__main__':
q = RabbitMQ(
user = user,
password = password,
enable_ssl = False,
host = host,
port = port,
v_host = '/',
exchange = exchange,
queue= queue,
)
q.create_channel()
q.create_queue()
q.connect_exchange()
q.run_consumer(call_back)
Producer
from cdbc.message import RabbitMQ
import json
host = 'freedbs'
port = 15603
user = 'rabbitmq-user'
password = 'rabbitmq-user-Passw0rd'
exchange = 'exchange-test'
queue = 'queue-test'
q = RabbitMQ(
user = user,
password = password,
enable_ssl = False,
host = host,
port = port,
v_host = '/',
exchange = exchange,
queue= queue,
)
q.create_channel()
q.send_msg("""{"abc":"123"}""")
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
cdbc-0.0.15.tar.gz
(4.0 kB
view hashes)
Built Distribution
cdbc-0.0.15-py3-none-any.whl
(4.8 kB
view hashes)