
Lightweight DataPipeline Documentation


dbpipe

dbpipe is a lightweight and simple way to manage data pipelines.
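dbpipe is published on PyPI, so a standard pip install dbpipe should be all the setup the examples below require.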

The pieces fit together as sketched below: endpoints feed pipes, pipes roll up into jobs, schedules trigger jobs, and jobs can be mapped onto clusters:

graph LR
A(Endpoints)-->B(Pipes)
B-->C(Jobs)
D(Schedules)-->C
C-.->E(Clusters)

Creating Endpoints

>>> from dbpipe import EndPoint

>>> facebook = EndPoint('Facebook', 'API', 'https://facebook.com/Posts')
>>> facebook
{'name': 'Facebook', 'type': 'API', 'location': 'https://facebook.com/Posts'}
>>> facebook.save()

>>> posttable = EndPoint('DW.Facebook.Posts', 'Database', 'ServerName')
>>> posttable
{'name': 'DW.Facebook.Posts', 'type': 'Database', 'location': 'ServerName'}
>>> posttable.save()
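save() persists each definition to disk as JSON (the "Reading a Pipe" section below reads pipes/DW.json back). As a quick sanity check you can open the file directly; this sketch assumes endpoints land in an endpoints/ folder named after the endpoint, mirroring that layout:

import json

# Assumed path: mirrors the pipes/<name>.json layout that "Reading a Pipe"
# below relies on; the endpoints/ folder name is a guess, not documented API.
with open('endpoints/Facebook.json') as f:
    print(json.load(f))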

Creating a Pipe

>>> from dbpipe import Pipe

>>> pipe = Pipe(
...     name='DW',
...     sources=[facebook],
...     destination=posttable,
...     processfile="Test.py"
... )
>>> pipe
{'name': 'DW', 'sources': [{'name': 'Facebook', 'type': 'API', 'location': 'https://facebook.com/Posts'}], 'destination': {'name': 'DW.Facebook.Posts', 'type': 'Database', 'location': 'ServerName'}, 'logfile': None, 'processfile': 'Test.py'}
>>> pipe.to_dict()
{'name': 'DW',
 'sources': [{'name': 'Facebook',
   'type': 'API',
   'location': 'https://facebook.com/Posts'}],
 'destination': {'name': 'DW.Facebook.Posts',
  'type': 'Database',
  'location': 'ServerName'},
 'logfile': None,
 'processfile': 'Test.py'}
>>> pipe.save()
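Note that processfile is stored as a plain path: dbpipe records which script moves the data, while the extract/load logic lives in the script itself. A hypothetical Test.py, just to make the example concrete (the URL handling and output file are placeholders, not dbpipe API):

# Test.py -- hypothetical process script referenced by the pipe above.
import json
import urllib.request

def run():
    # Extract from the source endpoint (placeholder URL from the example).
    with urllib.request.urlopen('https://facebook.com/Posts') as resp:
        posts = json.load(resp)
    # Load into the destination (stubbed as a local file for illustration).
    with open('posts_staging.json', 'w') as f:
        json.dump(posts, f)

if __name__ == '__main__':
    run()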

Creating a Schedule

>>> from dbpipe import Schedule

>>> schedule = Schedule(frequency="Daily", start_time="8:00AM")
>>> schedule
{'frequency': 'Daily', 'start_time': '8:00AM', 'end_time': None, 'time_zone': 'UTC'}
>>> schedule.to_dict()
{'frequency': 'Daily',
 'start_time': '8:00AM',
 'end_time': None,
 'time_zone': 'UTC'}
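The repr shows end_time and time_zone fields defaulting to None and 'UTC'. Presumably they can be supplied the same way; this is inferred from the keys above rather than documented API:

from dbpipe import Schedule

# Assumption: end_time and time_zone are constructor arguments, inferred
# from the dictionary keys shown in the repr above.
window = Schedule(frequency="Hourly", start_time="8:00AM",
                  end_time="5:00PM", time_zone="US/Eastern")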

Creating a Pipe Cluster

>>> from dbpipe.core.pipes import Cluster

>>> clstr = Cluster([pipe, pipe])
>>> clstr
[{'name': 'DW', 'sources': [{'name': 'Facebook', 'type': 'API', 'location': 'https://facebook.com/Posts'}], 'destination': {'name': 'DW.Facebook.Posts', 'type': 'Database', 'location': 'ServerName'}, 'logfile': None, 'processfile': 'Test.py'}, {'name': 'DW', 'sources': [{'name': 'Facebook', 'type': 'API', 'location': 'https://facebook.com/Posts'}], 'destination': {'name': 'DW.Facebook.Posts', 'type': 'Database', 'location': 'ServerName'}, 'logfile': None, 'processfile': 'Test.py'}]
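The same pipe is repeated here only for brevity; in practice a cluster groups distinct pipes that should run together, e.g.:

from dbpipe import EndPoint, Pipe
from dbpipe.core.pipes import Cluster

# A second, distinct pipe so the cluster is more than the same pipe twice.
# The AdSpend name and URL are illustrative, not part of the example above.
adspend = EndPoint('AdSpend', 'API', 'https://ads.example.com/spend')
adspend_pipe = Pipe(
    name='AdSpendDW',
    sources=[adspend],
    destination=posttable,
    processfile="Test.py"
)
clstr = Cluster([pipe, adspend_pipe])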

Creating a Job

>>> from dbpipe import Job

>>> job = Job('My Job', schedule=schedule, jobs=clstr)
>>> job
{'name': 'My Job', 'schedule': {'frequency': 'Daily', 'start_time': '8:00AM', 'end_time': None, 'time_zone': 'UTC'}, 'jobs': [{'name': 'DW', 'sources': [{'name': 'Facebook', 'type': 'API', 'location': 'https://facebook.com/Posts'}], 'destination': {'name': 'DW.Facebook.Posts', 'type': 'Database', 'location': 'ServerName'}, 'logfile': None, 'processfile': 'Test.py'}, {'name': 'DW', 'sources': [{'name': 'Facebook', 'type': 'API', 'location': 'https://facebook.com/Posts'}], 'destination': {'name': 'DW.Facebook.Posts', 'type': 'Database', 'location': 'ServerName'}, 'logfile': None, 'processfile': 'Test.py'}]}
>>> job.save()

Reading a Pipe

>>> from dbpipe import read_pipe

>>> pipe = read_pipe('pipes/DW.json')
>>> pipe
{'name': 'DW', 'sources': [{'name': 'Facebook', 'type': 'API', 'location': 'https://facebook.com/Posts'}], 'destination': {'name': 'DW.Facebook.Posts', 'type': 'Database', 'location': 'ServerName'}, 'logfile': None, 'processfile': 'Test.py'}
>>> pipe.to_dict()
{'name': 'DW',
 'sources': [{'name': 'Facebook',
   'type': 'API',
   'location': 'https://facebook.com/Posts'}],
 'destination': {'name': 'DW.Facebook.Posts',
  'type': 'Database',
  'location': 'ServerName'},
 'logfile': None,
 'processfile': 'Test.py'}
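Because each pipe is saved as its own JSON file under pipes/, the whole directory can be loaded with a glob; only read_pipe itself is dbpipe API here:

from glob import glob

from dbpipe import read_pipe

# Load every saved pipe definition in the pipes/ directory at once.
pipes = [read_pipe(path) for path in glob('pipes/*.json')]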

Reading a Job

>>> from dbpipe import read_job

>>> job = read_job('jobs/My Job.json')
>>> job
{'name': 'My Job', 'schedule': {'frequency': 'Daily', 'start_time': '8:00AM', 'end_time': None, 'time_zone': 'UTC'}, 'jobs': [{'name': 'DW', 'sources': [{'name': 'Facebook', 'type': 'API', 'location': 'https://facebook.com/Posts'}], 'destination': {'name': 'DW.Facebook.Posts', 'type': 'Database', 'location': 'ServerName'}, 'logfile': None, 'processfile': 'Test.py'}, {'name': 'DW', 'sources': [{'name': 'Facebook', 'type': 'API', 'location': 'https://facebook.com/Posts'}], 'destination': {'name': 'DW.Facebook.Posts', 'type': 'Database', 'location': 'ServerName'}, 'logfile': None, 'processfile': 'Test.py'}]}
>>> job.to_dict()
{'name': 'My Job',
 'schedule': {'frequency': 'Daily',
  'start_time': '8:00AM',
  'end_time': None,
  'time_zone': 'UTC'},
 'jobs': [{'name': 'DW',
   'sources': [{'name': 'Facebook',
     'type': 'API',
     'location': 'https://facebook.com/Posts'}],
   'destination': {'name': 'DW.Facebook.Posts',
    'type': 'Database',
    'location': 'ServerName'},
   'logfile': None,
   'processfile': 'Test.py'},
  {'name': 'DW',
   'sources': [{'name': 'Facebook',
     'type': 'API',
     'location': 'https://facebook.com/Posts'}],
   'destination': {'name': 'DW.Facebook.Posts',
    'type': 'Database',
    'location': 'ServerName'},
   'logfile': None,
   'processfile': 'Test.py'}]}

Lineage

>>> from dbpipe.lineage.mermaid import generate_mermaid_markdown_file

>>> generate_mermaid_markdown_file('pipes', 'test.md')
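This appears to scan the saved pipe definitions under pipes and write a Mermaid flowchart of the lineage to test.md. A plain-Python way to inspect the result:

# Print the generated Mermaid markdown (nothing dbpipe-specific here).
with open('test.md') as f:
    print(f.read())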
