Project description
CloudSDP Library

The CloudSDP library is designed to simplify the creation and management of serverless data pipelines between Google Cloud Run and Google BigQuery. It provides a developer-friendly interface to extract data from various sources, transform it, and seamlessly load it into BigQuery tables, all while leveraging the power of serverless architecture.

Features

WIP:

  • Data Extraction and Ingestion: Extract data from various sources, convert it into a common format, and ingest it into BigQuery tables.

TODO:

  • Data Transformation: Perform data transformations, such as cleaning, enrichment, and normalization, before loading into BigQuery.
  • Scheduled Jobs and Triggers: Schedule data pipeline jobs based on time triggers using Cloud Scheduler.
  • Data Pipeline Workflow: Define and orchestrate data pipeline workflows with configurable execution order and dependencies.
  • Conflict Resolution and Error Handling: Implement conflict resolution strategies and error handling mechanisms for reliable data processing.
  • Monitoring and Logging: Monitor job progress, resource utilization, and performance metrics using integrated logging and monitoring tools.
  • Documentation and Examples: Comprehensive documentation and code examples to guide developers in using the library effectively.

Installation

Install the library using pip:

pip install cloudsdp

Or, install the library using poetry:

poetry add cloudsdp
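
Installing adds the library only; the QuickStart examples below also assume that your environment is authenticated to Google Cloud (for example via gcloud auth application-default login or a service account key) and that the account is allowed to create datasets and tables in the target project.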

QuickStart

Data Ingestion

Each example below creates a dataset and a table, ingests data, and cleans up afterwards.

Ingest data from a pandas DataFrame:

import pandas as pd

from cloudsdp.api.bigquery import BigQuery, WriteDisposition


PROJECT_NAME = "project_name"


def main():
    bq = BigQuery(PROJECT_NAME)
    dataset_name = "dataset_1"
    table_name = "table_1"

    data = {
        "name": [f"Name{el}" for el in range(10000)],
        "score": list(range(10000)),
    }
    df = pd.DataFrame(data)
    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "score", "field_type": "NUMERIC", "mode": "REQUIRED"},
    ]

    bq.create_dataset(dataset_name)
    bq.create_table(table_name, data_schema, dataset_name)

    # Only write if the target table is empty
    bq.ingest_from_dataframe(
        df, dataset_name, table_name, write_disposition=WriteDisposition.WRITE_IF_TABLE_EMPTY
    )

    # Clean up: delete the dataset and everything in it
    bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)


if __name__ == "__main__":
    main()
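
To sanity-check the load before deleting the dataset, you can query the table with the official google-cloud-bigquery client. This is a minimal sketch, not part of CloudSDP's API; it assumes the google-cloud-bigquery package and default credentials are available:

from google.cloud import bigquery


def count_rows(project, dataset_name, table_name):
    # Run a COUNT(*) against the freshly loaded table and return the count
    client = bigquery.Client(project=project)
    query = f"SELECT COUNT(*) AS n FROM `{project}.{dataset_name}.{table_name}`"
    rows = client.query(query).result()
    return next(iter(rows))["n"]


print(count_rows(PROJECT_NAME, "dataset_1", "table_1"))  # expected: 10000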

From a list of Python dicts:

from cloudsdp.api.bigquery import BigQuery

PROJECT_NAME = "project_name"


def main():
    bq = BigQuery(PROJECT_NAME)
    dataset_name = "dataset_1"
    table_name = "table_1"

    data = [{"name": "Someone", "age": 29}, {"name": "Something", "age": 22}]

    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "age", "field_type": "INTEGER", "mode": "REQUIRED"},
    ]

    bq.create_dataset(dataset_name)

    bq.create_table(table_name, data_schema, dataset_name)

    # Collect any ingestion errors reported for the rows (empty on success)
    errors = bq.ingest_rows_json(data, dataset_name, table_name)
    if errors:
        print("Errors:", "; ".join(str(error) for error in errors))

    bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)


if __name__ == "__main__":
    main()
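
Streaming inserts can fail transiently, so you may want to retry. A simple wrapper around ingest_rows_json could look like the sketch below; the retry policy and the helper name ingest_with_retry are illustrative only and not part of CloudSDP:

import time


def ingest_with_retry(bq, data, dataset_name, table_name, attempts=3, delay=2.0):
    # Retry the ingestion a few times with a fixed pause between attempts
    for attempt in range(1, attempts + 1):
        errors = bq.ingest_rows_json(data, dataset_name, table_name)
        if not errors:
            return []
        print(f"Attempt {attempt} returned errors: {errors}")
        if attempt < attempts:
            time.sleep(delay)
    return errors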

From CSV files stored in GCS:

from cloudsdp.api.bigquery import BigQuery


PROJECT_NAME = "project_name"


def main():
    bq = BigQuery(PROJECT_NAME)
    dataset_name = "dataset_1"
    table_name = "table_1"

    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "age", "field_type": "INTEGER", "mode": "REQUIRED"},
    ]

    bq.create_dataset(dataset_name)

    bq.create_table(table_name, data_schema, dataset_name)

    csv_uris = ["gs://mybucket/name_age_data_1.csv", "gs://mybucket/name_age_data_2.csv"]

    # Load both CSV files in a single job; skip_leading_rows=1 skips each file's header row
    result = bq.ingest_csvs_from_cloud_bucket(
        csv_uris, dataset_name, table_name, skip_leading_rows=1, autodetect_schema=False, timeout=120
    )
    print(result)

    bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)


if __name__ == "__main__":
    main()
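
If the CSV files are still on local disk, they have to be uploaded to a GCS bucket first; that step is outside CloudSDP's scope. A sketch using the official google-cloud-storage client, assuming the package is installed and the bucket already exists:

from google.cloud import storage


def upload_csvs(bucket_name, local_paths):
    # Upload each local CSV and return the resulting gs:// URIs
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    uris = []
    for path in local_paths:
        blob = bucket.blob(path.split("/")[-1])
        blob.upload_from_filename(path)
        uris.append(f"gs://{bucket_name}/{blob.name}")
    return uris


csv_uris = upload_csvs("mybucket", ["./name_age_data_1.csv", "./name_age_data_2.csv"])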



Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

cloudsdp-0.1.11.tar.gz (6.3 kB)


Built Distribution

cloudsdp-0.1.11-py3-none-any.whl (7.5 kB)


File details

Details for the file cloudsdp-0.1.11.tar.gz.

File metadata

  • Download URL: cloudsdp-0.1.11.tar.gz
  • Upload date:
  • Size: 6.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.5.1 CPython/3.10.12 Linux/5.15.0-1042-azure

File hashes

Hashes for cloudsdp-0.1.11.tar.gz:

  • SHA256: cb1c5bb83a73c986898f3e12ea6938dc591cfa74015f6ba600e0b4c706cd783c
  • MD5: 3001cc7dee58b39f05d5a85d14f5cb28
  • BLAKE2b-256: 94acd928e9897cbc623b154bc9e416b28056e7ee97447ce8c7c60311e0161b92

See more details on using hashes here.

File details

Details for the file cloudsdp-0.1.11-py3-none-any.whl.

File metadata

  • Download URL: cloudsdp-0.1.11-py3-none-any.whl
  • Upload date:
  • Size: 7.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.5.1 CPython/3.10.12 Linux/5.15.0-1042-azure

File hashes

Hashes for cloudsdp-0.1.11-py3-none-any.whl:

  • SHA256: 25f26768501fe00a93534c6a8c968efd0bae365375e8ce21451725003b0df38f
  • MD5: e43c437e712339c17ced42cabe603cf6
  • BLAKE2b-256: 0ba6b8da92fa8ead1f5fa57d2ad0280f63979f681c0e139b05f65d67a4fd5e49

See more details on using hashes here.
