sql-scheduler
sql-scheduler allows you to easily run a suite of SQL scripts against a Postgres/Redshift database.
sql-scheduler works with pairs of ddl and insert scripts.
The ddl script takes the form:
DROP TABLE IF EXISTS "schema"."table";
CREATE TABLE "schema"."table" (
    column_a INT,
    column_b VARCHAR
);
The insert script takes the form:
INSERT INTO "schema"."table" (
    SELECT 1, ''
);
These scripts should be put into the corresponding ddl/insert folders and should have identical names following the convention schema.table.sql (I recommend using all lowercase).
In order for the dev schema replacement to work without issue, your table names should be unique. When run in the dev stage, the schemas are replaced with the given dev schema.
Features:
- Automatic inter-script dependency management: ensuring that script B, which selects from the table created in script A, runs after script A.
- Automatic schema replacement for development/staging workflows
- Concurrency: scripts are run concurrently if they don't depend on each other.
- Easy unit testing for table granularity, column nullability, and table relationships
- Incremental table inserts.
Quickstart:
Installation
pip install sql-scheduler
Configuration
Environment Variables
- SQL_SCHEDULER_DDL_DIRECTORY: An absolute path to the ddl directory. EX: /home/ubuntu/sql/ddl/
- SQL_SCHEDULER_INSERT_DIRECTORY: An absolute path to the insert directory. EX: /home/ubuntu/sql/insert/
- SQL_SCHEDULER_DSN: A DSN for connecting to your database in the form: postgres://user:password@host:port/database?option=value
- SQL_SCHEDULER_STAGE: The default stage (prod, dev) to run in. Can be overridden by the CLI flag --dev or --prod. When running in the dev stage a dev schema must be provided, either through an environment variable or a CLI argument.
- SQL_SCHEDULER_DEV_SCHEMA: The schema to replace with when run in the dev stage. Can be overridden by the CLI argument --dev-schema.
- SQL_SCHEDULER_SIMPLE_OUTPUT: Simplify the output of this program by removing the status message. (If you are running sql-scheduler outside of the CLI you probably want to set this to 1.)
- SQL_SCHEDULER_CACHE_DURATION: The length of time for development cache runs to be valid (specified in seconds). Defaults to 6 hours.
- SQL_SCHEDULER_INCREMENTAL_INTERVAL: The number of days for the default incremental window (defaults to 14). The incremental window starts at 00:00:00 14 days ago and ends at 23:59:59.999 on the current day. The interval can be overridden by setting the --start and --end CLI values.
- SQL_SCHEDULER_CONCURRENCY: The maximum number of concurrent tasks to be run.
Common Commands
Run all scripts.
sql-scheduler
Run a specific script.
sql-scheduler -t schema.table
Run multiple specific scripts.
sql-scheduler -t schema.table -t schema.table2
Run a specific script and all of its upstream dependencies.
sql-scheduler -t schema.table --dependencies
Run a specific script in the dev stage.
sql-scheduler -t schema.table --dev
Check for circular dependencies
sql-scheduler --check
Non-CLI Usage
You may want to utilize sql-scheduler from within a Python script instead of from the command line, for instance if you want to run it from within an Airflow task. To do this, simply import the sql_scheduler function, which can be used as the entrypoint.
from sql_scheduler.sql_scheduler import sql_scheduler
if __name__ == "__main__":
    sql_scheduler(
        stage="dev",
        dev_schema="dev_schema",
        targets=["prod_schema.table_a", "prod_schema.table_b"],
        dependencies=False,
        check=False,
    )
To remove the in-progress status message, which doesn't always play nice with Python logging, set the environment variable SQL_SCHEDULER_SIMPLE_OUTPUT to 1.
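For instance, here is a minimal sketch of wrapping that same call in an Airflow task; the DAG id, schedule, and targets are placeholder values, and only the sql_scheduler arguments shown above are assumed.
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from sql_scheduler.sql_scheduler import sql_scheduler

# Simplify output so the in-progress status message doesn't clutter the task logs.
os.environ["SQL_SCHEDULER_SIMPLE_OUTPUT"] = "1"


def run_suite():
    # Same arguments as the example above; the stage/schema/targets are placeholders.
    sql_scheduler(
        stage="dev",
        dev_schema="dev_schema",
        targets=["prod_schema.table_a", "prod_schema.table_b"],
        dependencies=False,
        check=False,
    )


with DAG(
    dag_id="sql_scheduler_run",  # hypothetical DAG id
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    PythonOperator(task_id="run_sql_scheduler", python_callable=run_suite)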
Tests
You can add tests to insert scripts which will make certain assertions about the data. Currently there are five options for tests: granularity, not_null, relationship, upstream_count, and upstream_granularity. To specify a test in a script, simply add the test into a comment contained within the insert script. A failure of a test will stop downstream tasks from running.
granularity
This test will assert that the granularity of the table is as expected. For example, let's say we have a table with three columns: column_a, column_b, and column_c, and we expect that there should only be one row per unique combination of column_a and column_b. We can add this test assertion with the following:
/*
granularity: column_a, column_b
*/
After populating this table, sql-scheduler will query the table and ensure that no more than one row exists for each unique combination of column_a and column_b.
not_null
This test will assert that the given columns contain no null values. We can add this test assertion with the following:
/*
not_null: column_a, column_b
*/
relationship
This test will assert that all of the values in a given column are found within another column in another table. Keep in mind that this test is run after insertion, but before any downstream tasks are run, so make sure to only reference upstream tables (or tables populated via other means). When running in the dev stage, any tables referenced will have their schemas swapped if they are upstream of the given table. Multiple relationships can be set.
/*
relationship: column_a = schema.table.column_a
relationship: column_b = schema.table.column_b
*/
upstream_count
This test will assert that a given table has at least X number of rows. This is useful when verifying that an upstream source not updated by sql-scheduler is populated before executing a given script. For example, if we wanted to assert that raw_data_schema.table_a has at least 1000 rows before we execute this script:
/*
upstream_count: raw_data_schema.table_a 1000
*/
upstream_granularity
This test will assert the granularity of a given table. This is useful when working with upstream tables not populated by sql-scheduler. The same syntax is used as the granularity test. For example, if we wanted to assert that the granularity of raw_data_schema.table_a is column_a, column_b before we execute this script:
/*
upstream_granularity: raw_data_schema.table_a column_a, column_b
*/
Automatic inter-script dependency management
Before execution of a run, sql-scheduler parses all of the scripts found in the ddl and insert folders and identifies dependencies between scripts. It is able to do this by identifying tables referenced in FROM and JOIN statements within the insert query. During the execution of a run, sql-scheduler ensures that any upstream dependencies have completed successfully before executing a script.
sql-scheduler will notify you of any circular dependencies found and exit. This can be checked without initiating a run with the flag --check.
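To make the idea concrete, here is a rough sketch (not sql-scheduler's actual parser; the regex and function name are hypothetical) of how FROM/JOIN references can be turned into a dependency graph and checked for cycles:
import re
from graphlib import TopologicalSorter  # Python 3.9+
from pathlib import Path

# Capture schema.table identifiers that follow FROM or JOIN.
TABLE_REF = re.compile(r'\b(?:from|join)\s+("?\w+"?\."?\w+"?)', re.IGNORECASE)


def build_dependency_graph(insert_dir):
    # Each file is named schema.table.sql, so the stem is the table the script populates.
    scripts = {p.stem.lower(): p.read_text() for p in Path(insert_dir).glob("*.sql")}
    graph = {}
    for target, sql in scripts.items():
        refs = {m.replace('"', "").lower() for m in TABLE_REF.findall(sql)}
        # Only references that another script in the suite populates become dependencies;
        # everything else is an external upstream table.
        graph[target] = {ref for ref in refs if ref in scripts}
    return graph


if __name__ == "__main__":
    graph = build_dependency_graph("/home/ubuntu/sql/insert/")
    # static_order() raises CycleError on a circular dependency,
    # analogous to what the --check flag reports.
    print(list(TopologicalSorter(graph).static_order()))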
Development script caching
sql-scheduler keeps track of scripts that are run in the dev stage with a cache; this is then leveraged so that you don't have to re-run upstream dependency scripts if they haven't changed. To disable this functionality, use the --no-cache flag. No caching is done in the prod stage.
When a script is run in the dev stage, a cache record is made with the current time and a hash of the ddl and insert scripts. This hash and time are used to evaluate whether the script needs to be run.
Scripts explicitly targeted (-t) are always run.
To clear the local cache, run:
sql-scheduler --clear-cache
The caches are held in the $HOME/.sql-scheduler/cache directory.
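As a rough illustration of how such a check can work (the cache record layout below is hypothetical, not sql-scheduler's actual on-disk format), a script only needs to re-run when its hash has changed or the record is older than SQL_SCHEDULER_CACHE_DURATION:
import hashlib
import os
import time


def script_hash(ddl_path, insert_path):
    # Hash the ddl and insert scripts together; any edit to either invalidates the cache.
    digest = hashlib.sha256()
    for path in (ddl_path, insert_path):
        with open(path, "rb") as f:
            digest.update(f.read())
    return digest.hexdigest()


def is_cache_valid(record, current_hash):
    # record is a hypothetical {"hash": ..., "run_at": ...} entry read from the cache.
    max_age = int(os.environ.get("SQL_SCHEDULER_CACHE_DURATION", 6 * 60 * 60))
    if record is None or record["hash"] != current_hash:
        return False
    return (time.time() - record["run_at"]) < max_age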
Automatic dev-schema replacement
A key feature of sql-scheduler is the ability to write scripts targeting a prod schema (or schemas) and then test those scripts by inserting into a development schema. Combined with the dependency inference, this can make your development experience much smoother. Not only will the insert/ddl schema change, but any references (FROM, JOIN) inside the insert script will be switched to point to the dev schema IF that table will be populated by the current run of sql-scheduler.
I'll give you a few examples to better understand this functionality. For these examples I will use the following insert scripts (I will omit the ddl statements as they aren't relevant here).
INSERT INTO prod_schema.table_a (
select column_a
, column_b
, column_c
, column_d
from raw_data_schema.table_x
);
INSERT INTO prod_schema.table_b (
select column_a
, column_b
, column_e
, column_f
from raw_data_schema.table_y
);
INSERT INTO prod_schema.table_c (
select column_a
, column_b
, sum(column_c) as s_column_c
from prod_schema.table_a
group by 1, 2
);
INSERT INTO prod_schema.table_d (
select c.column_a
, c.column_b
, (c.s_column_c + d.column_e) / d.column_f as a_new_metric
, z.column_z
from prod_schema.table_c c
inner join prod_schema.table_b d on
c.column_a = d.column_a
and c.column_b = d.column_b
inner join raw_data_schema.table_z z on
c.column_a = z.column_a
group by 1, 2
);
These four scripts will make up a dependency graph of:
table_a ─────► table_c ─────► table_d
                                 ▲
table_b ─────────────────────────┘
Example #1
sql-scheduler -t prod_schema.table_a --dev --dev-schema dev_schema
This will run just the prod_schema.table_a script with dev replacement, which would execute the following statement:
INSERT INTO dev_schema.table_a (
select column_a
, column_b
, column_c
, column_d
from raw_data_schema.table_x
);
raw_data_schema.table_x is not changed at all, because it is not a table that is being modified by the current run.
Example #2
sql-scheduler -t prod_schema.table_c --dev --dev-schema dev_schema --dependencies
This will run prod_schema.table_c and its upstream dependencies (prod_schema.table_a).
First, prod_schema.table_a runs with dev schema replacement:
INSERT INTO dev_schema.table_a (
select column_a
, column_b
, column_c
, column_d
from raw_data_schema.table_x
);
Then prod_schema.table_c will run, replacing both its own schema and the schema of its reference to prod_schema.table_a:
INSERT INTO dev_schema.table_c (
select column_a
, column_b
, sum(column_c) as s_column_c
from dev_schema.table_a
group by 1, 2
);
Example #3
sql-scheduler -t prod_schema.table_c --dev --dev-schema dev_schema
This will run only prod_schema.table_c.
Because this run doesn't modify the table it references (prod_schema.table_a), that reference will be left unchanged.
INSERT INTO dev_schema.table_c (
select column_a
, column_b
, sum(column_c) as s_column_c
from prod_schema.table_a
group by 1, 2
);
Example #4
sql-scheduler -t prod_schema.table_d --dev --dev-schema dev_schema --dependencies
All upstream dependencies of prod_schema.table_d will be run, resulting in prod_schema.table_a and prod_schema.table_b running first, concurrently:
INSERT INTO dev_schema.table_a (
select column_a
, column_b
, column_c
, column_d
from raw_data_schema.table_x
);
INSERT INTO dev_schema.table_b (
select column_a
, column_b
, column_e
, column_f
from raw_data_schema.table_y
);
Then prod_schema.table_c will be run with its reference to prod_schema.table_a replaced with a reference to dev_schema.table_a:
INSERT INTO dev_schema.table_c (
select column_a
, column_b
, sum(column_c) as s_column_c
from dev_schema.table_a
group by 1, 2
);
Then, finally, prod_schema.table_d will run with its references to prod_schema.table_c and prod_schema.table_b replaced with dev_schema.table_c and dev_schema.table_b, because they were both modified in this run. The reference to raw_data_schema.table_z is untouched because it was not modified by this run.
INSERT INTO dev_schema.table_d (
select c.column_a
, c.column_b
, (c.s_column_c + d.column_e) / d.column_f as a_new_metric
, z.column_z
from dev_schema.table_c c
inner join dev_schema.table_b d on
c.column_a = d.column_a
and c.column_b = d.column_b
inner join raw_data_schema.table_z z on
c.column_a = z.column_a
group by 1, 2
);
Incremental table inserts
Sometimes you may not want to drop and recreate each table every time, but instead want to update only the most recent data. To utilize this functionality within sql-scheduler, you must enable it with the --sql-scheduler-incremental comment within the insert script, as well as implement a delete statement. When the --sql-scheduler-incremental flag is found within an insert script, $1 and $2 will be replaced with the start and end of the current interval. By default the interval starts at (today - 14 days) 00:00:00 and ends at today 23:59:59.999; you can change the number of days in the rolling interval by setting the SQL_SCHEDULER_INCREMENTAL_INTERVAL environment variable. You can also override the start and end with the --start and --end CLI arguments (when overriding, a wide variety of date/time formats are allowed; you must include both start and end). When sql-scheduler detects an incremental table, it will log the interval that it is running the table for.
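For illustration, the default interval described above can be computed along these lines (the variable names are mine, not the library's):
import os
from datetime import datetime, time, timedelta

# SQL_SCHEDULER_INCREMENTAL_INTERVAL days back at 00:00:00, up to 23:59:59.999 today.
interval_days = int(os.environ.get("SQL_SCHEDULER_INCREMENTAL_INTERVAL", 14))
today = datetime.now().date()

start = datetime.combine(today - timedelta(days=interval_days), time.min)
end = datetime.combine(today, time(23, 59, 59, 999000))

print(start, end)  # these values stand in for $1 and $2 in the insert script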
Incrementally adding to tables presents a problem when the DDL of the table changes, or when the table doesn't exist the first time the script is run.
If the table doesn't exist, sql-scheduler will run the DDL script to create the table.
If the table's DDL changes, it is up to you to either use the --refill flag, which will run the DDL script (dropping and recreating the table), or manually drop the table yourself.
Here is an example of a simple insert script which uses this functionality:
--sql-scheduler-incremental
DELETE FROM prod_schema.table_e WHERE column_e BETWEEN $1 AND $2;
INSERT INTO prod_schema.table_e (
SELECT column_e
, column_f
, column_g
FROM raw_data_schema.table_h
WHERE column_e BETWEEN $1 and $2
);
How to organize/name scripts
sql-scheduler only cares about a few conventions when it comes to organizing and naming scripts:
- A suite of scripts consists of two folders: a ddl folder and an insert folder. The actual names of these directories and their placement relative to each other are not prescribed.
- Any script in the insert folder must have an identically named script in the ddl folder.
- Scripts must only insert into/create one table.
- The names of scripts need to follow the convention schema.table.sql. While lowercase is not required, I HIGHLY RECOMMEND that you use it. If you do make these names case sensitive, then you will have to follow that casing when referencing the table in the CLI.
An example directory structure.
In your $HOME directory, create a directory sql. In that directory, create the directories ddl and insert. To add a script to populate the table schema.table_a, we will create the file schema.table_a.sql in both the ~/sql/ddl and ~/sql/insert directories.
~/sql/
-- ddl/
---- schema.table_a.sql
-- insert/
---- schema.table_a.sql
Now, to point sql-scheduler to these directories, we will set the following environment variables.
export SQL_SCHEDULER_DDL_DIRECTORY="~/sql/ddl/"
export SQL_SCHEDULER_INSERT_DIRECTORY="~/sql/insert/"
If you are only working with one suite of SQL scripts or one database, you may find it useful to add these export statements to your .bashrc/.zshrc:
# .zshrc
echo 'export SQL_SCHEDULER_DDL_DIRECTORY="~/sql/ddl/"' >> ~/.zshrc
echo 'export SQL_SCHEDULER_INSERT_DIRECTORY="~/sql/insert/"' >> ~/.zshrc
echo 'export SQL_SCHEDULER_DSN="postgres://user:password@host:port/database?option=value"' >> ~/.zshrc
source ~/.zshrc
# .bashrc
echo 'export SQL_SCHEDULER_DDL_DIRECTORY="~/sql/ddl/"' >> ~/.bashrc
echo 'export SQL_SCHEDULER_INSERT_DIRECTORY="~/sql/insert/"' >> ~/.bashrc
echo 'export SQL_SCHEDULER_DSN="postgres://user:password@host:port/database?option=value"' >> ~/.bashrc
source ~/.bashrc