Simple control 'psycopg2'(PostgreSQL).
Project description
threadingpg
Control PostgreSQL using thread(s).
Initialize Connector
import threadingpg
connector = threadingpg.Connector(dbname='database_name', user='user_name', password='password', port=5432)
# ...
connector.close()
Drop, Create Table
mytable = MyTable()
connector.drop_table(mytable)
connector.create_table(mytable)
Create Table and Row Class
Table Class
import threadingpg
from threadingpg import datatype
class MyTable(threadingpg.data.Table):
table_name="mytable"
index = threadingpg.data.Column(data_type=datatype.serial)
name = threadingpg.data.Column(data_type=datatype.varchar())
# or
class MyTable(threadingpg.data.Table):
def __init__(self) -> None:
self.index = threadingpg.data.Column(data_type=datatype.serial)
self.name = threadingpg.data.Column(data_type=datatype.varchar())
super().__init__("mytable") # important position
Row Class
equal name of columns.
class MyRow(threadingpg.data.Row):
def __init__(self,
name:str=None) -> None:
self.name = name
Insert
mytable = MyTable()
myrow = MyRow("my_row")
connector.insert_row(mytable, myrow)
# or
connector.insert_dict(mytable, {"name":"my_row"})
Select
mytable = MyTable()
column_name_list, rows = connector.select(mytable)
for row in rows:
myrow = MyRow()
myrow.set_data(column_name_list, row)
print(f"output: {myrow.name}") # output: my_row
Condition - Where
mytable = MyTable()
condition_equal_1 = threadingpg.condition.Equal(mytable.index, 1)
condition_equal_2 = threadingpg.condition.Equal(mytable.index, 2)
condition_equal_3 = threadingpg.condition.Equal(mytable.index, 3)
conditions = threadingpg.condition.Or(condition_equal_1, condition_equal_2, condition_equal_3)
column_name_list, rows = connector.select(mytable, where=conditions)
Condition - OrderBy
mytable = MyTable()
orderby_index = threadingpg.condition.OrderBy(mytable.index)
orderby_name = threadingpg.condition.OrderBy(mytable.name, True)
orderby_conditions = threadingpg.condition.And(orderby_index, orderby_name)
column_name_list, rows = connector.select(mytable, order_by=orderby_conditions)
Update
mytable = MyTable()
myrow = MyRow("update_my_row")
condition_equal_0 = threadingpg.condition.Equal(mytable.index, 0)
connector.update_row(mytable, myrow, condition_equal_0)
Delete
mytable = MyTable()
delete_condition = threadingpg.condition.Equal(mytable.index, 5)
connector.delete_row(mytable, delete_condition)
Simple Trigger
Need delay each function.
mytable = MyTable()
channel_name = "mych"
trigger_name = "mytr"
function_name = "myfn"
listner = threadingpg.TriggerListner()
# implement 'notify = listner.notify_queue.get()'
listner.connect(dbname=dbname, user=user, password=password, port=5432)
listner.create_function(function_name, channel_name)
listner.create_trigger(mytable, trigger_name, function_name)
listner.start_listening()
listner.listen_channel(channel_name)
# ...
listner.unlisten_channel(channel_name)
listner.stop_listening()
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
threadingpg-0.0.5.tar.gz
(12.7 kB
view hashes)
Built Distribution
Close
Hashes for threadingpg-0.0.5-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 00328522d83853ea7a1261a1895b712c01c43736ec2bc69022ed0bf5c8e5bce0 |
|
MD5 | 040a7711a88d468d61a062106728084c |
|
BLAKE2b-256 | 4a9d949441825fcef454ec73757aec643600f81ad30c81c6f8be455bbd41f57d |