generic connection pool
Project description
generic-connection-pool is a connection pool that can be used for TCP, HTTP, and database connections.
Features:
generic nature: can be used for any connection you desire (TCP, HTTP, database)
runtime agnostic: both synchronous and asynchronous pools are supported
flexibility: flexible connection retention and recycling policy
fully-typed: mypy type-checker compatible
Installation
You can install generic-connection-pool with pip:
$ pip install generic-connection-pool
Quickstart
The following example illustrates how to create an asynchronous SSL socket pool:
import asyncio
from typing import Tuple
from generic_connection_pool.asyncio import ConnectionPool
from generic_connection_pool.contrib.socket_async import TcpStreamConnectionManager
# Type aliases used to parameterize the pool: an endpoint is a (hostname, port)
# pair and a connection is an asyncio (reader, writer) stream pair.
Hostname = str
Port = int
Endpoint = Tuple[Hostname, Port]
Connection = Tuple[asyncio.StreamReader, asyncio.StreamWriter]
async def main() -> None:
    """Fetch a web page over a pooled asynchronous SSL stream connection.

    Creates a pool keyed by ``(hostname, port)`` endpoints, borrows one
    connection, sends a minimal HTTP/1.0 request, prints the response and
    closes the pool afterwards, even if the request fails.
    """
    pool = ConnectionPool[Endpoint, Connection](
        TcpStreamConnectionManager(ssl=True),
        idle_timeout=30.0,
        max_lifetime=600.0,
        min_idle=3,
        max_size=20,
        total_max_size=100,
        background_collector=True,
    )

    try:
        async with pool.connection(endpoint=('www.wikipedia.org', 443), timeout=5.0) as (reader, writer):
            # HTTP lines are terminated by CRLF; a single empty line ends the headers.
            request = (
                'GET / HTTP/1.0\r\n'
                'Host: www.wikipedia.org\r\n'
                '\r\n'
            )
            writer.write(request.encode())
            await writer.drain()

            # read() without a size reads until EOF.
            response = await reader.read()
            print(response.decode())
    finally:
        # NOTE(review): assumes the asyncio pool exposes an awaitable close(),
        # mirroring the pool.close() call of the synchronous examples - confirm
        # against the library API.
        await pool.close()


asyncio.run(main())
Configuration
Synchronous and asynchronous pools support the following parameters:
connection_manager: connection manager instance
acquire_timeout: connection acquiring default timeout
dispose_batch_size: number of connections to be disposed at once (if background collector is started the parameter is ignored)
dispose_timeout: connection disposal timeout
background_collector: if enabled, starts a background worker that disposes timed-out connections and maintains the configured pool state; otherwise timed-out connections are disposed of on each connection release
idle_timeout: number of seconds after which a connection will be closed respecting min_idle parameter (the connection will be closed only if the connection number exceeds min_idle)
max_lifetime: number of seconds after which a connection will be closed (min_idle parameter will be ignored)
min_idle: minimum number of connections the pool tries to hold (for each endpoint)
max_size: maximum number of connections (for each endpoint)
total_max_size: maximum number of connections (for all endpoints)
Generic nature
Since the pool is generic, it can be used for database connections as well:
import psycopg2.extensions
from generic_connection_pool.contrib.psycopg2 import DbConnectionManager
from generic_connection_pool.threding import ConnectionPool
# Type aliases used to parameterize the pool: endpoints are string names
# (the keys of the dsn_params mapping passed to DbConnectionManager below)
# and connections are psycopg2 connection objects.
Endpoint = str
Connection = psycopg2.extensions.connection
def main() -> None:
    """Query a master and a replica database through one shared pool.

    Builds a pool whose endpoints are the names of the configured databases,
    runs the same query against ``master`` and ``replica-1`` and closes the
    pool afterwards, even if one of the queries fails.
    """
    dsn_params = dict(dbname='postgres', user='postgres', password='secret')

    pool = ConnectionPool[Endpoint, Connection](
        DbConnectionManager(
            dsn_params={
                'master': dict(dsn_params, host='db-master.local'),
                'replica-1': dict(dsn_params, host='db-replica-1.local'),
                'replica-2': dict(dsn_params, host='db-replica-2.local'),
            },
        ),
        acquire_timeout=2.0,
        idle_timeout=60.0,
        max_lifetime=600.0,
        min_idle=3,
        max_size=10,
        total_max_size=15,
        background_collector=True,
    )

    try:
        with pool.connection(endpoint='master') as conn:
            # The cursor context manager closes the cursor when the block exits.
            with conn.cursor() as cur:
                cur.execute("SELECT * FROM pg_stats;")
                print(cur.fetchone())

        with pool.connection(endpoint='replica-1') as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT * FROM pg_stats;")
                print(cur.fetchone())
    finally:
        # Close the pool on the error path too, not only after success.
        pool.close()


main()
Extendability
If the built-in connection managers are not suitable for your task, you can easily create your own:
import socket
from ssl import SSLContext, SSLSocket
from typing import Optional, Tuple
from generic_connection_pool.threding import BaseConnectionManager, ConnectionPool
# Type aliases used to parameterize the pool: an endpoint is a (hostname, port)
# pair and a connection is an SSL-wrapped socket.
Hostname = str
Port = int
SslEndpoint = Tuple[Hostname, Port]
Connection = SSLSocket
class SslSocketConnectionManager(BaseConnectionManager[SslEndpoint, Connection]):
    """
    SSL socket connection manager.

    Creates SSL-wrapped TCP sockets for ``(hostname, port)`` endpoints and
    shuts them down and closes them on disposal.
    """

    def __init__(self, ssl: SSLContext):
        """
        :param ssl: SSL context used to wrap every newly created socket
        """
        self._ssl = ssl

    def create(self, endpoint: SslEndpoint, timeout: Optional[float] = None) -> Connection:
        """
        Open an SSL connection to the endpoint.

        :param endpoint: ``(hostname, port)`` pair to connect to
        :param timeout: socket timeout in seconds (``None`` means blocking mode)
        :return: connected SSL socket
        """
        hostname, port = endpoint

        sock = self._ssl.wrap_socket(socket.socket(type=socket.SOCK_STREAM), server_hostname=hostname)
        try:
            sock.settimeout(timeout)
            sock.connect((hostname, port))
        except BaseException:
            # Do not leak the socket when connecting fails; the error is re-raised unchanged.
            sock.close()
            raise

        return sock

    def dispose(self, endpoint: SslEndpoint, conn: Connection, timeout: Optional[float] = None) -> None:
        """
        Shut down and close the connection.

        :param endpoint: endpoint the connection belongs to (unused here)
        :param conn: SSL socket to dispose of
        :param timeout: socket timeout in seconds applied before shutting down
        """
        conn.settimeout(timeout)

        try:
            conn.shutdown(socket.SHUT_RDWR)
        except OSError:
            # Best effort: a failed shutdown must not prevent closing the socket.
            pass
        finally:
            conn.close()
def main() -> None:
    """Fetch a web page over a pooled SSL socket using the custom manager.

    Builds a pool backed by ``SslSocketConnectionManager``, borrows one
    connection, sends a minimal HTTP/1.0 request, prints the response and
    closes the pool afterwards, even if the request fails.
    """
    # Imported locally so the example does not depend on changes to the
    # import lines above.
    from ssl import create_default_context

    pool = ConnectionPool[SslEndpoint, Connection](
        # create_default_context() enables certificate and hostname
        # verification, which a bare SSLContext() does not.
        SslSocketConnectionManager(ssl=create_default_context()),
        idle_timeout=30.0,
        max_lifetime=600.0,
        min_idle=3,
        max_size=20,
        total_max_size=100,
        background_collector=True,
    )

    try:
        with pool.connection(endpoint=('www.wikipedia.org', 443), timeout=5.0) as sock:
            # HTTP lines are terminated by CRLF; a single empty line ends the headers.
            request = (
                'GET / HTTP/1.0\r\n'
                'Host: www.wikipedia.org\r\n'
                '\r\n'
            )
            # sendall() keeps sending until the whole request has been written.
            sock.sendall(request.encode())

            # recv() returns an empty bytes object once the peer closes the connection.
            response = []
            while chunk := sock.recv(4096):
                response.append(chunk)

            print(b''.join(response).decode())
    finally:
        # Close the pool on the error path too, not only after success.
        pool.close()


main()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for generic_connection_pool-0.1.1.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | 80f0e3f1ca07f982ca0868672f05af5190df7fe2c49d4713e75414222cc333d2 |
|
MD5 | ca868a1ee300908d17a77381b45cf6e5 |
|
BLAKE2b-256 | d5dfa0d7fb65b04b468103c46c5c259365ad8bfc5c19103e0e275b5aca02cf91 |
Hashes for generic_connection_pool-0.1.1-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 026b12f99c0aaaa77b8fecad55350f22bc4ec5a51b556116b68c36573193c0fe |
|
MD5 | f0a4a11458915f6505b0e3aa925e8c92 |
|
BLAKE2b-256 | c1e6ec9095ed6a1a1e504e87d2e4c24a698c2152a56af0c3792f512312640052 |