pg-force-execute
Utility function to run a PostgreSQL query with SQLAlchemy, terminating any other clients that continue to block it after a configurable delay.
Using this function is something of a last resort, but it is useful in certain Extract, Transform, Load (ETL) pipelines. For example, if replacing one table with another matters more than letting queries already running against the table complete, this function can be used to run the relevant ALTER TABLE RENAME TO query.
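As a rough sketch of the table swap described above, the helpers below build the two rename statements and run them through pg_force_execute in one transaction. The names quote_ident, build_swap_statements, swap_tables and the example table names are hypothetical and not part of this package; only pg_force_execute and its delay argument come from this README.

```python
import datetime


def quote_ident(name):
    # PostgreSQL identifiers are quoted with double quotes; embedded double quotes are doubled
    return '"' + name.replace('"', '""') + '"'


def build_swap_statements(live_table, staging_table, retired_table):
    # Hypothetical helper: move the live table aside, then promote the staging table
    return [
        f'ALTER TABLE {quote_ident(live_table)} RENAME TO {quote_ident(retired_table)}',
        f'ALTER TABLE {quote_ident(staging_table)} RENAME TO {quote_ident(live_table)}',
    ]


def swap_tables(engine, live_table, staging_table, retired_table,
                delay=datetime.timedelta(minutes=5)):
    # Imported here so build_swap_statements can be used without a database driver installed
    import sqlalchemy as sa
    from pg_force_execute import pg_force_execute

    # Both renames run in one transaction, so they commit or roll back together
    with engine.begin() as conn:
        for sql in build_swap_statements(live_table, staging_table, retired_table):
            pg_force_execute(sa.text(sql), conn, engine, delay=delay)


# Illustrative table names only
print(build_swap_statements('reports', 'reports_new', 'reports_old'))
```

With a real engine, swap_tables(engine, 'reports', 'reports_new', 'reports_old') would attempt the swap. Any client still blocking a rename after the delay may be terminated, so this only suits tables where that trade-off is acceptable.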
Installation
pip install pg-force-execute
Example usage
import datetime
import sqlalchemy as sa
from pg_force_execute import pg_force_execute

# Running PostgreSQL locally with the command below should allow this example to run
# docker run --rm -it -e POSTGRES_HOST_AUTH_METHOD=trust -p 5432:5432 postgres
engine = sa.create_engine('postgresql://postgres@127.0.0.1:5432/')
query = 'SELECT 1'  # A more realistic example would be something that needs an exclusive lock on a table
with engine.begin() as conn:
    results = pg_force_execute(
        sa.text(query),  # SQLAlchemy statement to execute
        conn,  # SQLAlchemy connection to run the query
        engine,  # SQLAlchemy engine that will create new connections to cancel blocking queries
        delay=datetime.timedelta(minutes=5),  # Amount of time to wait before cancelling queries
    )
    print(results.fetchall())
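To see any messages the function emits while an example like the one above runs, the default logger can be configured with the standard library. This is a minimal sketch: the logger name "pg_force_execute" is taken from the default in the function signature documented under API, while the helper name configure_force_execute_logging and the format string are assumptions for illustration. Which messages are logged, and at what level, is not stated in this README.

```python
import logging
import sys


def configure_force_execute_logging(level=logging.INFO, stream=None):
    # "pg_force_execute" is the default logger name shown in the function signature
    logger = logging.getLogger("pg_force_execute")
    handler = logging.StreamHandler(stream or sys.stderr)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
    )
    logger.addHandler(handler)
    logger.setLevel(level)
    return logger


# Call once at start-up; calling it again would attach a second handler
logger = configure_force_execute_logging()
```

A different logger can instead be passed explicitly through the logger argument of pg_force_execute.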
API
The API is a single function, pg_force_execute.
pg_force_execute(statement, conn, engine, parameters=None, execution_options=None, delay=datetime.timedelta(minutes=5), check_interval=datetime.timedelta(seconds=1), termination_thread_timeout=datetime.timedelta(seconds=10), logger=logging.getLogger("pg_force_execute"))
- statement - A SQLAlchemy statement to be executed, passed to Connection.execute
- conn - A SQLAlchemy connection to run statement on
- engine - A SQLAlchemy engine to create a new connection that will be used to terminate backends blocking statement
- parameters (optional) - SQLAlchemy parameters to be bound to statement, passed to Connection.execute
- execution_options (optional) - Dictionary of execution options associated with the statement execution, passed to Connection.execute
- delay (optional) - How long to wait before attempting to terminate backends blocking statement
- check_interval (optional) - The interval between repeated attempts to terminate backends blocking statement
- termination_thread_timeout (optional) - How long to wait for the termination to complete
- logger (optional) - The Python logger instance through which to log
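To make the relationship between delay and check_interval concrete, the sketch below models when termination attempts would be due, going only by the parameter descriptions above. It is not the library's implementation: the function termination_attempt_offsets is hypothetical, and it assumes offsets are measured from the moment the function starts waiting and that statement stays blocked throughout.

```python
import datetime


def termination_attempt_offsets(delay=datetime.timedelta(minutes=5),
                                check_interval=datetime.timedelta(seconds=1),
                                attempts=3):
    # First attempt once `delay` has elapsed, then one more every `check_interval`
    return [delay + i * check_interval for i in range(attempts)]


# Print the first few offsets for the documented default values
for offset in termination_attempt_offsets():
    print(offset)
```

Under these assumptions, a shorter delay makes the function take over sooner, while a shorter check_interval catches newly arrived blocking clients more quickly at the cost of more frequent checks.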
Running tests locally
pip install -e ".[dev]" # Only needed once
./start-services.sh # Only needed once
pytest