Build a Lambda function that removes duplicate data from Redshift in minutes.
Project description
Welcome to dupe_remove Documentation
Why is there duplicate data in the database?
In an OLAP database like Redshift, primary key constraints are declared but not enforced, for performance reasons. So what happens if our ETL pipeline loads the same data more than once during a retry?
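Redshift will happily accept the second copy of every row. A minimal in-memory illustration of the failure mode (no Redshift required; the batch contents are made up for the example):

```python
# Simulate an ETL load that is blindly retried after a failure.
# Redshift would not reject the second copy, because its primary
# key constraint is informational only.
rows = []
batch = [
    {"id": "e-1", "time": "2018-01-01"},
    {"id": "e-2", "time": "2018-01-02"},
]

rows.extend(batch)  # first load attempt
rows.extend(batch)  # retry loads the exact same batch again

ids = [r["id"] for r in rows]
duplicates = len(ids) - len(set(ids))
print(duplicates)  # 2 duplicate rows now live in the table
```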
How does dupe_remove solve the problem?
dupe_remove uses an optimized strategy to remove duplicates precisely and quickly. You only need to specify:
database connection
table name, id column, sort key column
dupe_remove can then:
remove duplicate data within a specified sort key range
be deployed as a cron job on AWS Lambda to automatically remove all duplicate data from a table
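dupe_remove's internal SQL is not reproduced in this document, but the classic Redshift dedup pattern for one sort key window looks roughly like the statement below. The helper function and temp table name are hypothetical, for illustration only:

```python
def build_dedup_sql(table, sort_col, lower, upper):
    """Sketch of a range-scoped Redshift dedup transaction.

    Hypothetical helper, not dupe_remove's actual code. It removes
    full-row duplicates within [lower, upper) of the sort key.
    """
    where = f"{sort_col} >= '{lower}' AND {sort_col} < '{upper}'"
    return "\n".join([
        "BEGIN;",
        # keep exactly one instance of each row in the window
        f"CREATE TEMP TABLE tmp_dedup AS SELECT DISTINCT * FROM {table} WHERE {where};",
        # drop every row in the window, then restore the deduplicated copy
        f"DELETE FROM {table} WHERE {where};",
        f"INSERT INTO {table} SELECT * FROM tmp_dedup;",
        "DROP TABLE tmp_dedup;",
        "COMMIT;",
    ])

print(build_dedup_sql("events", "time", "2018-01-01", "2018-02-01"))
```

Note that SELECT DISTINCT * only collapses rows that are identical in every column; deduping on the id column alone (when other columns may differ) would need a ROW_NUMBER() window function instead.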
Usage Example
Our database:
table.events
|-- column(id, type=string)      # id column
|-- column(time, type=timestamp) # sort key column
|-- other columns ...
On a Local Machine
from datetime import datetime, timedelta
from sqlalchemy_mate import EngineCreator
from dupe_remove import Worker
table_name = "events"
id_col_name = "id"
sort_col_name = "time"
credential_file = "/Users/admin/db.json"
engine_creator = EngineCreator.from_json(credential_file)
engine = engine_creator.create_redshift()
worker = Worker(
    engine=engine,
    table_name=table_name,
    id_col_name=id_col_name,
    sort_col_name=sort_col_name,
)

worker.remove_duplicate(
    lower=datetime(2018, 1, 1),
    upper=datetime(2018, 2, 1),
)
On AWS Lambda as a Cron Job
def handler(event, context):
    from datetime import datetime, timedelta
    from sqlalchemy_mate import EngineCreator, test_connection
    from dupe_remove import Scheduler, Worker, Handler

    table_name = "events"
    id_col_name = "id"
    sort_col_name = "time"

    # read connection info from environment variables, decrypting with KMS
    engine_creator = EngineCreator.from_env(prefix="DEV_DB", kms_decrypt=True)
    engine = engine_creator.create_redshift()
    test_connection(engine, 6)

    worker = Worker(
        engine=engine,
        table_name=table_name,
        id_col_name=id_col_name,
        sort_col_name=sort_col_name,
    )

    # run every 5 min, clean 31 days of data at a time starting from
    # 2018-01-01, and start over after 12 cycles
    cron_freq_in_seconds = 300
    start = datetime(2018, 1, 1)
    delta = timedelta(days=31)
    bin_size = 12
    scheduler = Scheduler(
        cron_freq_in_seconds=cron_freq_in_seconds,
        start=start,
        delta=delta,
        bin_size=bin_size,
    )

    real_handler = Handler(worker=worker, scheduler=scheduler)
    real_handler.handler(event, context)
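The Scheduler's internals are not shown above. One plausible way to map a cron tick to a (lower, upper) cleaning window, assuming the tick counter simply wraps around bin_size, is sketched below (a hypothetical helper, not dupe_remove's actual code):

```python
from datetime import datetime, timedelta

def window_for_tick(tick, start, delta, bin_size):
    """Map a cron tick number to the sort-key window it should clean.

    Hypothetical sketch: ticks wrap around bin_size, so after 12
    invocations the schedule starts over from `start`.
    """
    i = tick % bin_size
    lower = start + i * delta
    upper = lower + delta
    return lower, upper

# tick 0 cleans [2018-01-01, 2018-02-01); tick 12 wraps back to the same window
print(window_for_tick(0, datetime(2018, 1, 1), timedelta(days=31), 12))
print(window_for_tick(12, datetime(2018, 1, 1), timedelta(days=31), 12))
```

With cron_freq_in_seconds = 300 and bin_size = 12, the whole table is swept once per hour, one 31-day slice at a time.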
Install
dupe_remove is released on PyPI, so all you need is:
$ pip install dupe_remove
To upgrade to the latest version:
$ pip install --upgrade dupe_remove