Skip to main content

Simple control of PostgreSQL through 'psycopg2'.

Project description

threadingpg

Control PostgreSQL using thread(s).

Initialize Connector

# Create the connector for the target database with the given credentials and port.
import threadingpg
connector = threadingpg.Connector(dbname='database_name', user='user_name', password='password', port=5432)
# ... (use the connector here)
# Close the connector once it is no longer needed.
connector.close()

Drop, Create Table

# MyTable is the table class shown under "Create Table and Row Class".
mytable = MyTable()
# Drop the table, then create it again from the same table object.
connector.drop_table(mytable)
connector.create_table(mytable)

Create Table and Row Class

Table Class

import threadingpg
from threadingpg import datatype

# Table definition that declares the table name and its columns as class attributes.
class MyTable(threadingpg.data.Table):
    table_name="mytable"
    # One Column per table column; the data type comes from threadingpg.datatype.
    index = threadingpg.data.Column(data_type=datatype.serial)
    name = threadingpg.data.Column(data_type=datatype.varchar())
# or 
# Table definition that declares its columns as instance attributes inside __init__.
class MyTable(threadingpg.data.Table):
    def __init__(self) -> None:
        # Assign the columns before calling the base initializer.
        self.index = threadingpg.data.Column(data_type=datatype.serial)
        self.name = threadingpg.data.Column(data_type=datatype.varchar())
        # NOTE(review): presumably Table.__init__ reads the Column attributes set above - confirm.
        super().__init__("mytable") # important position: keep this call after the column assignments

Row Class

The attribute names of the row class must equal the column names of the table.

# Row class for MyTable; its 'name' attribute has the same name as the table's 'name' column.
class MyRow(threadingpg.data.Row):
    def __init__(self,
                 name:str=None) -> None:
        # Defaults to None so that MyRow() can be created empty and filled later.
        self.name = name

Insert

mytable = MyTable()
# Insert using a row object ...
myrow = MyRow("my_row")
connector.insert_row(mytable, myrow)
# ... or insert using a dict keyed by column name.
connector.insert_dict(mytable, {"name":"my_row"})

Select

mytable = MyTable()
# select() returns two values: the column names and the rows.
column_name_list, rows = connector.select(mytable)
for row in rows:
    # Fill an empty MyRow from the column names and the raw row.
    myrow = MyRow()
    myrow.set_data(column_name_list, row)
    print(f"output: {myrow.name}") # output: my_row

Condition - Where

mytable = MyTable()
# Build one Equal condition per wanted index value.
condition_equal_1 = threadingpg.condition.Equal(mytable.index, 1)
condition_equal_2 = threadingpg.condition.Equal(mytable.index, 2)
condition_equal_3 = threadingpg.condition.Equal(mytable.index, 3)
# Combine them with Or and pass the result as the 'where' argument of select().
conditions = threadingpg.condition.Or(condition_equal_1, condition_equal_2, condition_equal_3)
column_name_list, rows = connector.select(mytable, where=conditions)

Condition - OrderBy

mytable = MyTable()
# One OrderBy per column to sort on.
orderby_index = threadingpg.condition.OrderBy(mytable.index)
# NOTE(review): the second argument (True) presumably selects descending order - confirm against the library.
orderby_name = threadingpg.condition.OrderBy(mytable.name, True)
# Combine the OrderBy items with And and pass the result as the 'order_by' argument of select().
orderby_conditions = threadingpg.condition.And(orderby_index, orderby_name)
column_name_list, rows = connector.select(mytable, order_by=orderby_conditions)

Update

mytable = MyTable()
# The row object carries the new values.
myrow = MyRow("update_my_row")
# The condition is passed to update_row() together with the table and the row.
condition_equal_0 = threadingpg.condition.Equal(mytable.index, 0)
connector.update_row(mytable, myrow, condition_equal_0)

Delete

mytable = MyTable()
# The condition is passed to delete_row() together with the table.
delete_condition = threadingpg.condition.Equal(mytable.index, 5)
connector.delete_row(mytable, delete_condition)

Simple Trigger

# The standard-library queue module is needed for the notification queue below.
import queue

mytable = MyTable()
channel_name = "mych"
trigger_name = "mytr"
function_name = "myfn"
# Create the trigger function for the channel, then the trigger on the table that uses it.
connector.create_trigger_function(function_name, channel_name)
connector.create_trigger(mytable, trigger_name, function_name)
# Queue handed to the channel listener.
notify_queue = queue.Queue()
# Read notifications in your own code with 'notify = notify_queue.get()'.
connector.start_channel_listener(notify_queue)
connector.listen_channel(channel_name)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

threadingpg-0.0.4.tar.gz (12.5 kB view hashes)

Uploaded Source

Built Distribution

threadingpg-0.0.4-py3-none-any.whl (13.0 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page