Lightweight DataPipeline Documentation

dbpipe

dbpipe is a lightweight way to define and manage data pipelines. Its building blocks relate to each other as follows:

graph LR
A(Endpoints)-->B(Pipes)
B-->C(Jobs)
D(Schedules)-->C
C-.->E(Clusters)

Creating Endpoints

from dbpipe import EndPoint


facebook = EndPoint('Facebook','API','https://facebook.com/Posts')
facebook
{'name': 'Facebook', 'type': 'API', 'location': 'https://facebook.com/Posts'}
facebook.save()
posttable = EndPoint('DW.Facebook.Posts','Database','ServerName')
posttable
{'name': 'DW.Facebook.Posts', 'type': 'Database', 'location': 'ServerName'}
posttable.save()

Creating a Pipe

from dbpipe import Pipe


pipe = Pipe(
    name='DW',
    sources=[facebook],
    destination=posttable,
    processfile="Test.py",
)

pipe
{'name': 'DW', 'sources': [{'name': 'Facebook', 'type': 'API', 'location': 'https://facebook.com/Posts'}], 'destination': {'name': 'DW.Facebook.Posts', 'type': 'Database', 'location': 'ServerName'}, 'logfile': None, 'processfile': 'Test.py'}
pipe.to_dict()
{'name': 'DW',
 'sources': [{'name': 'Facebook',
   'type': 'API',
   'location': 'https://facebook.com/Posts'}],
 'destination': {'name': 'DW.Facebook.Posts',
  'type': 'Database',
  'location': 'ServerName'},
 'logfile': None,
 'processfile': 'Test.py'}
pipe.save()
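
The path passed to read_pipe in the "Reading a Pipe" section ('pipes/DW.json') suggests that saved pipes are stored as JSON. As a minimal sketch under that assumption, the dictionary shown by pipe.to_dict() can be round-tripped through the standard json module. This uses only the standard library and is not dbpipe's own save logic; dump_pipe and load_pipe are hypothetical helper names.

```python
import json

# Dictionary copied from the pipe.to_dict() output above.
pipe_dict = {
    "name": "DW",
    "sources": [
        {"name": "Facebook", "type": "API", "location": "https://facebook.com/Posts"}
    ],
    "destination": {
        "name": "DW.Facebook.Posts",
        "type": "Database",
        "location": "ServerName",
    },
    "logfile": None,
    "processfile": "Test.py",
}


def dump_pipe(d: dict) -> str:
    """Serialize a pipe dictionary to indented JSON text (hypothetical helper)."""
    return json.dumps(d, indent=2)


def load_pipe(text: str) -> dict:
    """Parse JSON text back into a pipe dictionary (hypothetical helper)."""
    return json.loads(text)


text = dump_pipe(pipe_dict)
restored = load_pipe(text)

# Python's None is written as JSON null and read back as None,
# so the round trip preserves the dictionary exactly.
print(restored == pipe_dict)  # True
```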

Creating a Schedule

from dbpipe import Schedule

schedule = Schedule(frequency="Daily", start_time="8:00AM")

schedule
{'frequency': 'Daily', 'start_time': '8:00AM', 'end_time': None, 'time_zone': 'UTC'}
schedule.to_dict()
{'frequency': 'Daily',
 'start_time': '8:00AM',
 'end_time': None,
 'time_zone': 'UTC'}
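
To illustrate how the fields of a schedule dictionary could be interpreted, here is a minimal sketch that computes the next run time for a 'Daily' schedule. It assumes that start_time uses a 12-hour format such as '8:00AM' and that time_zone is 'UTC', as in the output above. The function next_daily_run is hypothetical and is not part of dbpipe; how dbpipe itself evaluates schedules is not described here.

```python
from datetime import datetime, timedelta, timezone

# Dictionary copied from the schedule.to_dict() output above.
schedule = {
    "frequency": "Daily",
    "start_time": "8:00AM",
    "end_time": None,
    "time_zone": "UTC",
}


def next_daily_run(schedule: dict, now: datetime) -> datetime:
    """Return the first daily run at or after `now` (hypothetical helper).

    Assumes a 'Daily' frequency, a UTC time zone, and a timezone-aware `now`.
    """
    if schedule["frequency"] != "Daily":
        raise ValueError("this sketch only handles the 'Daily' frequency")
    if schedule["time_zone"] != "UTC":
        raise ValueError("this sketch only handles the 'UTC' time zone")
    # Parse a 12-hour clock time such as '8:00AM'.
    start = datetime.strptime(schedule["start_time"], "%I:%M%p").time()
    candidate = datetime.combine(now.date(), start, tzinfo=timezone.utc)
    if candidate < now:
        # Today's start time has already passed, so run tomorrow.
        candidate += timedelta(days=1)
    return candidate


now = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)
print(next_daily_run(schedule, now))  # 2024-01-02 08:00:00+00:00
```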

Creating a Pipe Cluster

from dbpipe.core.pipes import Cluster


clstr = Cluster([pipe,pipe])
clstr
[{'name': 'DW', 'sources': ['AdSpend', 'SocialStats'], 'destination': 'DW', 'logfile': None, 'processfile': 'Test.py'}, {'name': 'DW', 'sources': ['AdSpend', 'SocialStats'], 'destination': 'DW', 'logfile': None, 'processfile': 'Test.py'}]

Creating a Job

from dbpipe import Job


job = Job('My Job',schedule=schedule,jobs=clstr)
job
{'name': 'My Job', 'schedule': {'frequency': 'Daily', 'start_time': '8:00AM', 'end_time': None, 'time_zone': 'UTC'}, 'jobs': [{'name': 'DW', 'sources': ['AdSpend', 'SocialStats'], 'destination': 'DW', 'logfile': None, 'processfile': 'Test.py'}, {'name': 'DW', 'sources': ['AdSpend', 'SocialStats'], 'destination': 'DW', 'logfile': None, 'processfile': 'Test.py'}]}
job.save()

Reading a Pipe

from dbpipe import read_pipe


pipe = read_pipe('pipes/DW.json')
pipe
{'name': 'DW', 'sources': ['AdSpend', 'SocialStats'], 'destination': 'DW', 'logfile': None, 'processfile': 'Test.py'}
pipe.to_dict()
{'name': 'DW',
 'sources': ['AdSpend', 'SocialStats'],
 'destination': 'DW',
 'logfile': None,
 'processfile': 'Test.py'}

Reading a Job

from dbpipe import read_job

job = read_job('jobs/My Job.json')
job
{'name': 'My Job', 'schedule': {'frequency': 'Daily', 'start_time': '8:00AM', 'end_time': None, 'time_zone': 'UTC'}, 'jobs': [{'name': 'DW', 'sources': ['AdSpend', 'SocialStats'], 'destination': 'DW', 'logfile': None, 'processfile': 'Test.py'}, {'name': 'DW', 'sources': ['AdSpend', 'SocialStats'], 'destination': 'DW', 'logfile': None, 'processfile': 'Test.py'}]}
job.to_dict()
{'name': 'My Job',
 'schedule': {'frequency': 'Daily',
  'start_time': '8:00AM',
  'end_time': None,
  'time_zone': 'UTC'},
 'jobs': [{'name': 'DW',
   'sources': ['AdSpend', 'SocialStats'],
   'destination': 'DW',
   'logfile': None,
   'processfile': 'Test.py'},
  {'name': 'DW',
   'sources': ['AdSpend', 'SocialStats'],
   'destination': 'DW',
   'logfile': None,
   'processfile': 'Test.py'}]}
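
Because job.to_dict() yields plain nested dictionaries, a job can be inspected with ordinary Python. The sketch below lists each pipe in a job with its sources and destination. It accepts endpoints either as plain names (as in the output above) or as endpoint dictionaries with a 'name' key (as in the "Creating a Pipe" output). The helpers endpoint_name and summarize_job are hypothetical and not part of dbpipe.

```python
# Dictionary shaped like the job.to_dict() output above.
pipe_dict = {
    "name": "DW",
    "sources": ["AdSpend", "SocialStats"],
    "destination": "DW",
    "logfile": None,
    "processfile": "Test.py",
}
job_dict = {
    "name": "My Job",
    "schedule": {
        "frequency": "Daily",
        "start_time": "8:00AM",
        "end_time": None,
        "time_zone": "UTC",
    },
    "jobs": [pipe_dict, dict(pipe_dict)],
}


def endpoint_name(endpoint) -> str:
    """Return the name of an endpoint given as a dict or as a plain string."""
    return endpoint["name"] if isinstance(endpoint, dict) else str(endpoint)


def summarize_job(job: dict) -> list:
    """Return one (pipe name, source names, destination name) tuple per pipe."""
    return [
        (
            pipe["name"],
            [endpoint_name(source) for source in pipe["sources"]],
            endpoint_name(pipe["destination"]),
        )
        for pipe in job["jobs"]
    ]


for name, sources, destination in summarize_job(job_dict):
    print(f"{name}: {', '.join(sources)} -> {destination}")
```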

Lineage

from dbpipe.lineage.mermaid import generate_mermaid_markdown_file


generate_mermaid_markdown_file('pipes','test.md')
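
The exact output of generate_mermaid_markdown_file is not shown here. To give a rough idea of what a lineage graph might contain, the following sketch turns pipe dictionaries into Mermaid "graph LR" text, with one edge from each source to the pipe's destination. This is an illustration under stated assumptions, not dbpipe's implementation; pipes_to_mermaid and endpoint_name are hypothetical names, and the node ids (n0, n1, ...) are an arbitrary choice made to keep names with dots or spaces out of Mermaid identifiers.

```python
def endpoint_name(endpoint) -> str:
    """Return the name of an endpoint given as a dict or as a plain string."""
    return endpoint["name"] if isinstance(endpoint, dict) else str(endpoint)


def pipes_to_mermaid(pipes: list) -> str:
    """Build Mermaid 'graph LR' text with a source-->destination edge per source."""
    ids = {}

    def node(name: str) -> str:
        # Assign a stable short id the first time a name is seen.
        if name not in ids:
            ids[name] = f"n{len(ids)}"
        return f'{ids[name]}("{name}")'

    lines = ["graph LR"]
    for pipe in pipes:
        destination = endpoint_name(pipe["destination"])
        for source in pipe["sources"]:
            lines.append(f"{node(endpoint_name(source))}-->{node(destination)}")
    return "\n".join(lines)


# Pipe dictionary shaped like the read_pipe output shown earlier.
pipes = [
    {
        "name": "DW",
        "sources": ["AdSpend", "SocialStats"],
        "destination": "DW",
        "logfile": None,
        "processfile": "Test.py",
    }
]

print(pipes_to_mermaid(pipes))
# graph LR
# n0("AdSpend")-->n1("DW")
# n2("SocialStats")-->n1("DW")
```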
