Simple control of PostgreSQL through 'psycopg2'.
Project description
threadingpg
Control PostgreSQL using thread(s).
Initialize Connector
import threadingpg
connector = threadingpg.Connector(dbname='database_name', user='user_name', password='password', port=5432)
# ...
connector.close()
Drop, Create Table
mytable = MyTable()
connector.drop_table(mytable)
connector.create_table(mytable)
Create Table and Row Class
Table Class
import threadingpg
from threadingpg import datatype
class MyTable(threadingpg.data.Table):
table_name="mytable"
index = threadingpg.data.Column(data_type=datatype.serial)
name = threadingpg.data.Column(data_type=datatype.varchar())
# or
class MyTable(threadingpg.data.Table):
def __init__(self) -> None:
self.index = threadingpg.data.Column(data_type=datatype.serial)
self.name = threadingpg.data.Column(data_type=datatype.varchar())
super().__init__("mytable") # important position
Row Class
The attribute names of the Row class must equal the column names of the table.
class MyRow(threadingpg.data.Row):
def __init__(self,
name:str=None) -> None:
self.name = name
Insert Row
mytable = MyTable()
myrow = MyRow("my_row")
connector.insert_row(mytable, myrow)
# or
connector.insert_dict(mytable, {"name":"my_row"})
Select Row
mytable = MyTable()
column_name_list, rows = connector.select(mytable)
for row in rows:
myrow = MyRow()
myrow.set_data(column_name_list, row)
print(f"output: {myrow.name}") # output: my_row
Select Where Conditions
mytable = MyTable()
condition_equal_1 = threadingpg.condition.Equal(mytable.index, 1)
condition_equal_2 = threadingpg.condition.Equal(mytable.index, 2)
condition_equal_3 = threadingpg.condition.Equal(mytable.index, 3)
conditions = threadingpg.condition.Or(condition_equal_1, condition_equal_2, condition_equal_3)
column_name_list, rows = connector.select(mytable, where=conditions)
Update Row
mytable = MyTable()
myrow = MyRow("update_my_row")
condition_equal_0 = threadingpg.condition.Equal(mytable.index, 0)
connector.update_row(mytable, myrow, condition_equal_0)
Simple Trigger
mytable = MyTable()
channel_name = "mych"
trigger_name = "mytr"
function_name = "myfn"
connector.create_trigger_function(function_name, channel_name)
connector.create_trigger(mytable, trigger_name, function_name)
notify_queue = queue.Queue()
# implement notify_queue.get()
connector.start_channel_listener(notify_queue)
connector.listen_channel(channel_name)
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
threadingpg-0.0.3.tar.gz (12.6 kB, view hashes)
Built Distribution
Close
Hashes for threadingpg-0.0.3-py3-none-any.whl
| Algorithm | Hash digest |
|---|---|
| SHA256 | 45e79dcbb2d0c97e2b7abd8a012e73fcdcdae0b58ec9955369cc18a25a56a482 |
| MD5 | dc203ae889412dffc0b3d91a27f4986a |
| BLAKE2b-256 | 24f6e4f07f6b256eb99fe541425f2b72134fc90aaf8f8d35a899b5b2294088d4 |