CloudSDP Library

The CloudSDP library is designed to simplify the creation and management of serverless data pipelines between Google Cloud Run and Google BigQuery. It provides a developer-friendly interface to extract data from various sources, transform it, and seamlessly load it into BigQuery tables, all while leveraging the power of serverless architecture.

Features

WIP:

  • Data Extraction and Ingestion: Extract data from various sources, convert it into a common format, and ingest it into BigQuery tables.

TODO:

  • Data Transformation: Perform data transformations, such as cleaning, enrichment, and normalization, before loading into BigQuery.
  • Scheduled Jobs and Triggers: Schedule data pipeline jobs based on time triggers using Cloud Scheduler.
  • Data Pipeline Workflow: Define and orchestrate data pipeline workflows with configurable execution order and dependencies.
  • Conflict Resolution and Error Handling: Implement conflict resolution strategies and error handling mechanisms for reliable data processing.
  • Monitoring and Logging: Monitor job progress, resource utilization, and performance metrics using integrated logging and monitoring tools.
  • Documentation and Examples: Comprehensive documentation and code examples to guide developers in using the library effectively.

Installation

Install the library using pip:

pip install cloudsdp

Or, install the library using poetry:

poetry add cloudsdp

QuickStart

Data Ingestion

Create a dataset, ingest data, and clean up.

Ingest data from a pandas DataFrame:

import pandas as pd

from cloudsdp.api.bigquery import BigQuery, WriteDisposition

PROJECT_NAME = "project_name"


def main():
    bq = BigQuery(PROJECT_NAME)
    dataset_name = "dataset_1"
    table_name = "table_1"

    data = {
        "name": [f"Name{el}" for el in range(10000)],
        "score": list(range(10000)),
    }
    df = pd.DataFrame(data)
    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "score", "field_type": "NUMERIC", "mode": "REQUIRED"},
    ]

    bq.create_dataset(dataset_name)
    bq.create_table(table_name, data_schema, dataset_name)

    # Only writes if the target table is empty
    bq.ingest_from_dataframe(df, dataset_name, table_name, write_disposition=WriteDisposition.WRITE_IF_TABLE_EMPTY)

    # Clean up: drop the dataset and everything in it
    bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)


if __name__ == "__main__":
    main()
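
Because every field above is marked `REQUIRED`, a DataFrame containing nulls in those columns will be rejected at load time. It can help to sanity-check the frame against the schema dicts before calling `ingest_from_dataframe`. A minimal sketch, runnable without GCP credentials; `validate_against_schema` is an illustrative helper, not part of CloudSDP:

```python
import pandas as pd


def validate_against_schema(df: pd.DataFrame, schema: list) -> list:
    """Return a list of problems found; an empty list means the frame looks ingestible."""
    problems = []
    for field in schema:
        name = field["name"]
        if name not in df.columns:
            problems.append(f"missing column: {name}")
            continue
        # REQUIRED fields must not contain nulls
        if field.get("mode") == "REQUIRED" and df[name].isna().any():
            problems.append(f"null values in REQUIRED column: {name}")
    return problems


schema = [
    {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
    {"name": "score", "field_type": "NUMERIC", "mode": "REQUIRED"},
]
df = pd.DataFrame({"name": ["Name0", None], "score": [0, 1]})
print(validate_against_schema(df, schema))
```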

From a list of Python dicts:

from cloudsdp.api.bigquery import BigQuery

PROJECT_NAME = "project_name"


def main():
    bq = BigQuery(PROJECT_NAME)
    dataset_name = "dataset_1"
    table_name = "table_1"

    data = [{"name": "Someone", "age": 29}, {"name": "Something", "age": 22}]

    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "age", "field_type": "INTEGER", "mode": "REQUIRED"},
    ]

    bq.create_dataset(dataset_name)
    bq.create_table(table_name, data_schema, dataset_name)

    errors = bq.ingest_rows_json(data, dataset_name, table_name)
    if errors:
        print("Errors:", errors)

    bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)


if __name__ == "__main__":
    main()
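
BigQuery's streaming API caps the size of a single insert request, so a very large row list is usually sent in batches rather than in one call. A sketch of a batching helper (`chunked` is illustrative, not part of CloudSDP); each batch would then be passed to `ingest_rows_json` in turn:

```python
from itertools import islice


def chunked(rows, size):
    """Yield successive batches of at most `size` rows."""
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch


rows = [{"name": f"Person{i}", "age": 20 + i} for i in range(7)]
batches = list(chunked(rows, 3))
print([len(b) for b in batches])
```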

From CSV files stored in Google Cloud Storage (GCS):

from cloudsdp.api.bigquery import BigQuery

PROJECT_NAME = "project_name"


def main():
    bq = BigQuery(PROJECT_NAME)
    dataset_name = "dataset_1"
    table_name = "table_1"

    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "age", "field_type": "INTEGER", "mode": "REQUIRED"},
    ]

    bq.create_dataset(dataset_name)
    bq.create_table(table_name, data_schema, dataset_name)

    csv_uris = ["gs://mybucket/name_age_data_1.csv", "gs://mybucket/name_age_data_2.csv"]

    # skip_leading_rows=1 skips the header row in each CSV
    result = bq.ingest_csvs_from_cloud_bucket(
        csv_uris, dataset_name, table_name, skip_leading_rows=1, autodetect_schema=False, timeout=120
    )
    print(result)

    bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)


if __name__ == "__main__":
    main()
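
Since `skip_leading_rows=1` is passed and `autodetect_schema` is off, the files behind `csv_uris` are assumed to start with a header row and to have columns matching the declared schema. A sketch of producing such a file locally with the standard library (uploading it to `gs://mybucket/...`, e.g. with google-cloud-storage, is not shown):

```python
import csv
import io

rows = [("Someone", 29), ("Something", 22)]

buf = io.StringIO()
writer = csv.writer(buf, lineterminator="\n")
writer.writerow(["name", "age"])  # header row, skipped via skip_leading_rows=1
writer.writerows(rows)
print(buf.getvalue())
```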
