A package to query Postgres databases in parallel
This class manages multiple database connections, handles parallel access to them, and hides the complexity this entails. Each query is executed on every connection in parallel. The result (as retrieved by fetchall() and fetchone()) is the union of the query results from each connection.
The use case we had in mind when we created this class was sharded tables (distributed across n database instances) that we needed to query concurrently, merging the results.
See below for an architecture overview for possible inspiration.
The package is in principle database independent, as long as the connection objects you pass in respond to (part of) these methods:
cursor
cursor.execute
cursor.fetchone
cursor.fetchall
cursor.mogrify
cursor.commit
cursor.close
putconn
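To make the fan-out idea concrete, here is a minimal sketch of how one statement could be run on several connections in parallel and the rows merged. It is not the package's actual implementation: `FanOutCursor` and `make_shard` are hypothetical names, and in-memory sqlite3 databases stand in for the Postgres shards so the example runs without a server.

```python
import sqlite3
from concurrent.futures import ThreadPoolExecutor


class FanOutCursor:
    """Hypothetical stand-in: one cursor per connection, executed in parallel."""

    def __init__(self, connections):
        self.cursors = [conn.cursor() for conn in connections]
        self.executor = ThreadPoolExecutor(max_workers=len(self.cursors))

    def execute(self, query, parameters=()):
        # Submit the same statement to every cursor, then wait for all of them.
        futures = [self.executor.submit(cur.execute, query, parameters)
                   for cur in self.cursors]
        for future in futures:
            future.result()  # re-raises any exception from a worker thread

    def fetchall(self):
        # The merged result is the concatenation of each cursor's rows.
        rows = []
        for cur in self.cursors:
            rows.extend(cur.fetchall())
        return rows

    def close(self):
        for cur in self.cursors:
            cur.close()
        self.executor.shutdown()


def make_shard(rows):
    # check_same_thread=False lets worker threads use this connection.
    conn = sqlite3.connect(":memory:", check_same_thread=False)
    conn.execute("CREATE TABLE my_shrd_tbl (shrd_column INTEGER, value TEXT)")
    conn.executemany("INSERT INTO my_shrd_tbl VALUES (?, ?)", rows)
    return conn


shards = [make_shard([(1, "a"), (3, "c")]), make_shard([(2, "b")])]
cursor = FanOutCursor(shards)
cursor.execute(
    "SELECT shrd_column, value FROM my_shrd_tbl WHERE shrd_column = ?", (2,)
)
merged = cursor.fetchall()  # only the second shard contributes a row
cursor.close()
```

Any object exposing the same handful of cursor methods could be substituted for the sqlite3 cursors here, which is the sense in which the approach is database independent.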
In its simplest form, usage looks like this (example using psycopg2, assuming that dsns is a list containing your databases’ connection strings):

    import psycopg2.pool

    n = 20  # maximum connections per pool
    pools = [psycopg2.pool.ThreadedConnectionPool(1, n, dsn=d) for d in dsns]
    connections = [p.getconn() for p in pools]
    pdb = ParallelConnection(connections)
For the rest, the pdb object works like a normal, single database connection object, but it merges the results returned by each database. You can therefore use it like so:

    import psycopg2.extras

    c = pdb.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    c.execute("SELECT * FROM my_shrd_tbl WHERE shrd_column = %s", (1543,))
    results = c.fetchall()
    c.close()
fetchall() returns the rows from all databases. If your query has a WHERE on the sharded column (shrd_column), the results from all but one database will be empty. This is fine, as the package handles it for you. It gets more interesting with a query like

    c = pdb.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    c.execute("SELECT * FROM my_shrd_tbl WHERE not_shrd_column = %s", (543,))
    results = c.fetchall()
    c.close()

In this case the query does not have a WHERE on a sharded column, so the package will fetch results from each database and merge them. Why that may be of interest to you is shown below.
If you are executing a query on a non-sharded table you should use a normal connection object.
We found ourselves having long-running queries that were aggregating records on a particular column (let’s call it shrd_column). To reduce the run-time we decided to split only the table(s) containing shrd_column between multiple databases, and have each database hold a copy of all non-sharded tables.
Each query grouping on shrd_column can then be executed independently in each database. The results still need to be merged though, so that’s why we built this package (we call it a package even if Niels insists on calling it “just a class”).
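The following sketch illustrates why grouping on the shard key lets each database work independently. It is an illustration under assumptions, not our production setup: in-memory sqlite3 databases stand in for the Postgres instances, the `amount` column and the modulo routing are made up for the example, and `make_db` is a hypothetical helper.

```python
import sqlite3

# (shrd_column, amount) pairs; the values are invented for illustration.
rows = [(1, 10), (2, 5), (1, 7), (3, 2), (2, 1), (4, 8)]
n_shards = 2


def make_db(db_rows):
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE my_shrd_tbl (shrd_column INTEGER, amount INTEGER)")
    conn.executemany("INSERT INTO my_shrd_tbl VALUES (?, ?)", db_rows)
    return conn


# Route each row by its shard key, so a given key lives in exactly one database.
shards = [
    make_db([r for r in rows if r[0] % n_shards == i]) for i in range(n_shards)
]

query = "SELECT shrd_column, SUM(amount) FROM my_shrd_tbl GROUP BY shrd_column"

# Each shard aggregates only its own keys, so merging is plain concatenation.
merged = []
for conn in shards:
    merged.extend(conn.execute(query).fetchall())

# Reference: the same query against a single, unsharded database.
reference = make_db(rows).execute(query).fetchall()
```

Up to row order, `merged` and `reference` contain the same groups, because no key is split across shards. If the query grouped on a column other than the shard key, the same group could appear in several shards and the concatenated rows would be partial aggregates that still need combining.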
Q. Why don’t you use something like pg_shard?
A. Because pg_shard doesn’t handle JOIN on the distributed query, which we want to do. Our package has the additional advantage that all the databases are completely unaware of each other. It all happens on the application layer and on the ingestion layer.
Q. What about if a machine goes down, etc.?
A. Just use two machines with a load balancer in front of them.
Q. What about INSERT?
A. Yeah, we don’t do that sort of thing (it’s a read-only application for exploratory purposes). But feel free to see if it works, and to fix it and submit a PR if it doesn’t.
Q. What about how to shard the data? This package does nothing, Niels is right!
A. We trust you are savvy enough to do that by yourself before ingestion. We could help you with that though, just drop us a line.
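As a starting point, one possible way to route records before ingestion is a stable hash of the shard key. This is only a sketch of one option, not something the package provides: `shard_index`, the placeholder DSNs, and the sample records are all hypothetical.

```python
import zlib


def shard_index(key, n_shards):
    """Map a shard-key value to a database index in a stable way."""
    # crc32 gives the same value in every process, unlike the built-in
    # hash() for strings, which is randomized per interpreter run.
    return zlib.crc32(str(key).encode("utf-8")) % n_shards


dsns = ["dbname=shard0", "dbname=shard1", "dbname=shard2"]  # placeholder DSNs
records = [
    {"shrd_column": 1543, "value": "a"},
    {"shrd_column": 1544, "value": "b"},
    {"shrd_column": 1543, "value": "c"},
]

# Group records per target database; each batch would then be ingested
# into the database at the matching position in dsns.
batches = {i: [] for i in range(len(dsns))}
for record in records:
    batches[shard_index(record["shrd_column"], len(dsns))].append(record)
```

Records with the same shrd_column always land in the same batch, which is the property the parallel queries rely on. Note that changing the number of databases changes the mapping, so the data would need to be redistributed.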
Q. I want to know more!
A. That’s technically not a question, but you can begin by watching Niels present the project at PyData Paris 2015. It’s on YouTube.