Build a Lambda function to remove duplicate data from Redshift in minutes.
Welcome to dupe_remove Documentation
Why is there duplicate data in the database?
Redshift is an OLAP database, so its primary_key column does not enforce any uniqueness constraint, for performance reasons. What happens, then, if your ETL pipeline loads the same data multiple times during a retry?
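For example, after a retried COPY loads the same batch twice, a query like the one below will show the damage. This is an illustrative check (it assumes you already have a SQLAlchemy engine connected to Redshift), not part of the dupe_remove API:

import sqlalchemy as sa

# list every id that appears more than once in the events table
sql = """
SELECT id, COUNT(*) AS n_rows
FROM events
GROUP BY id
HAVING COUNT(*) > 1
"""
with engine.connect() as conn:
    for row in conn.execute(sa.text(sql)):
        print(row)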
How does dupe_remove solve the problem?
dupe_remove uses an optimized strategy to remove duplicates precisely and quickly. You only need to specify:
database connection
table name, id column, sort key column
dupe_remove will then:
remove duplicate data within a specified sort key range
deploy as a cron job on AWS Lambda to automatically remove all duplicate data in a table (a conceptual sketch of the removal strategy follows)
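Conceptually, the range-scoped removal can follow a common Redshift pattern: stage one copy of every row in the range, delete the range from the target table, and write the de-duplicated rows back, all in one transaction. The sketch below illustrates that pattern only; the function name, the staging table, and the exact SQL are illustrative, and it assumes duplicates are exact row copies (the retried-load scenario above). It is not dupe_remove's actual implementation.

import sqlalchemy as sa

def remove_duplicate_in_range(engine, table, sort_col, lower, upper):
    # illustrative sketch, not dupe_remove's real SQL
    params = {"lower": lower, "upper": upper}
    where = f"{sort_col} >= :lower AND {sort_col} < :upper"
    with engine.begin() as conn:  # one transaction: all-or-nothing
        # 1. stage exactly one copy of every row in the range
        conn.execute(
            sa.text(
                f"CREATE TEMP TABLE staging AS "
                f"SELECT DISTINCT * FROM {table} WHERE {where}"
            ),
            params,
        )
        # 2. wipe the range in the target table
        conn.execute(sa.text(f"DELETE FROM {table} WHERE {where}"), params)
        # 3. write the de-duplicated rows back
        conn.execute(sa.text(f"INSERT INTO {table} SELECT * FROM staging"))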
Usage Example
Our database:
table.events
|-- column(id, type=string)         # id column
|-- column(time, type=timestamp)    # sort key column
|-- other columns ...
On Local Machine
from datetime import datetime
from sqlalchemy_mate import EngineCreator
from dupe_remove import Worker

table_name = "events"
id_col_name = "id"
sort_col_name = "time"

# read database credentials from a local json file and build a Redshift engine
credential_file = "/Users/admin/db.json"
engine_creator = EngineCreator.from_json(credential_file)
engine = engine_creator.create_redshift()

worker = Worker(
    engine=engine,
    table_name=table_name,
    id_col_name=id_col_name,
    sort_col_name=sort_col_name,
)

# remove duplicates whose sort key falls between 2018-01-01 and 2018-02-01
worker.remove_duplicate(
    lower=datetime(2018, 1, 1),
    upper=datetime(2018, 2, 1),
)
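The same worker can sweep a longer period one range at a time. The loop below is a usage sketch built only on the remove_duplicate call shown above:

# sweep all of 2018, one calendar month per call (usage sketch)
month_starts = [datetime(2018, m, 1) for m in range(1, 13)] + [datetime(2019, 1, 1)]
for lower, upper in zip(month_starts[:-1], month_starts[1:]):
    worker.remove_duplicate(lower=lower, upper=upper)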
On AWS Lambda Cron Job
def handler(event, context):
    from datetime import datetime, timedelta
    from sqlalchemy_mate import EngineCreator, test_connection
    from dupe_remove import Scheduler, Worker, Handler

    table_name = "events"
    id_col_name = "id"
    sort_col_name = "time"

    # read database credentials from environment variables (KMS-encrypted)
    engine_creator = EngineCreator.from_env(prefix="DEV_DB", kms_decrypt=True)
    engine = engine_creator.create_redshift()
    test_connection(engine, 6)  # fail fast if the database is unreachable

    worker = Worker(
        engine=engine,
        table_name=table_name,
        id_col_name=id_col_name,
        sort_col_name=sort_col_name,
    )

    # run every 5 minutes, clean 31 days of data per invocation starting
    # from 2018-01-01, and start over after 12 cycles
    cron_freq_in_seconds = 300
    start = datetime(2018, 1, 1)
    delta = timedelta(days=31)
    bin_size = 12

    scheduler = Scheduler(
        cron_freq_in_seconds=cron_freq_in_seconds,
        start=start,
        delta=delta,
        bin_size=bin_size,
    )

    real_handler = Handler(worker=worker, scheduler=scheduler)
    real_handler.handler(event, context)
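How the Scheduler maps each cron invocation to a time window is internal to dupe_remove; one plausible reading of the parameters above is round-robin windowing, sketched here with a hypothetical helper current_window:

import time
from datetime import datetime, timedelta

def current_window(start, delta, bin_size, cron_freq_in_seconds):
    # hypothetical: rotate through bin_size windows of width delta,
    # advancing to the next window on every cron tick
    i = int(time.time() // cron_freq_in_seconds) % bin_size
    lower = start + i * delta
    return lower, lower + delta

# with the settings above: 12 windows x 31 days cover just over a year
# starting at 2018-01-01, and each window is revisited once per hour
lower, upper = current_window(datetime(2018, 1, 1), timedelta(days=31), 12, 300)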
Install
dupe_remove is released on PyPI, so all you need is:
$ pip install dupe_remove
To upgrade to the latest version:
$ pip install --upgrade dupe_remove
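To verify that the package imports correctly:
$ python -c "import dupe_remove"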