CloudSDP Library
The CloudSDP library is designed to simplify the creation and management of serverless data pipelines between Google Cloud Run and Google BigQuery. It provides a developer-friendly interface to extract data from various sources, transform it, and seamlessly load it into BigQuery tables, all while leveraging the power of serverless architecture.
Features
WIP:
- Data Extraction and Ingestion: Extract data from various sources, convert it into a common format, and ingest it into BigQuery tables.
TODO:
- Data Transformation: Perform data transformations, such as cleaning, enrichment, and normalization, before loading into BigQuery.
- Scheduled Jobs and Triggers: Schedule data pipeline jobs based on time triggers using Cloud Scheduler.
- Data Pipeline Workflow: Define and orchestrate data pipeline workflows with configurable execution order and dependencies.
- Conflict Resolution and Error Handling: Implement conflict resolution strategies and error handling mechanisms for reliable data processing.
- Monitoring and Logging: Monitor job progress, resource utilization, and performance metrics using integrated logging and monitoring tools.
- Documentation and Examples: Comprehensive documentation and code examples to guide developers in using the library effectively.
Installation
Install the library using pip:

```bash
pip install cloudsdp
```

Or, install it using Poetry:

```bash
poetry add cloudsdp
```
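The library talks to Google Cloud, so Application Default Credentials must be available before running any of the examples below. A minimal sketch, assuming you authenticate with a service-account key file (the path is only a placeholder; `gcloud auth application-default login` works just as well):

```python
import os

# Point the Google Cloud client libraries at a service-account key file.
# Any other Application Default Credentials setup also works; this path is hypothetical.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/service-account.json"
```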
QuickStart
Data Ingestion
Create a dataset, ingest data, and clean up.
Ingest data from a pandas DataFrame:
```python
import pandas as pd

from cloudsdp.api.bigquery import BigQuery, WriteDisposition

PROJECT_NAME = "project_name"


def main():
    bq = BigQuery(PROJECT_NAME)

    dataset_name = "dataset_1"
    table_name = "table_1"

    data = {
        "name": [f"Name{el}" for el in range(0, 10000)],
        "score": [num for num in range(0, 10000)],
    }
    df = pd.DataFrame(data)

    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "score", "field_type": "NUMERIC", "mode": "REQUIRED"},
    ]

    bq.create_dataset(dataset_name)
    bq.create_table(table_name, data_schema, dataset_name)

    bq.ingest_from_dataframe(df, dataset_name, table_name, write_disposition=WriteDisposition.WRITE_IF_TABLE_EMPTY)

    bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)
```
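To sanity-check what was loaded, you can query the table directly with the official google-cloud-bigquery client, independently of CloudSDP. A minimal sketch, assuming google-cloud-bigquery is installed and that it runs before the `delete_dataset` cleanup call:

```python
from google.cloud import bigquery

PROJECT_NAME = "project_name"
dataset_name = "dataset_1"
table_name = "table_1"

client = bigquery.Client(project=PROJECT_NAME)

# Count the rows that landed in the freshly ingested table.
query = f"SELECT COUNT(*) AS row_count FROM `{PROJECT_NAME}.{dataset_name}.{table_name}`"
for row in client.query(query).result():
    print("Rows ingested:", row.row_count)
```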
From a list of Python dicts:
```python
from cloudsdp.api.bigquery import BigQuery

PROJECT_NAME = "project_name"


def main():
    bq = BigQuery(PROJECT_NAME)

    dataset_name = "dataset_1"
    table_name = "table_1"

    data = [{"name": "Someone", "age": 29}, {"name": "Something", "age": 22}]
    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "age", "field_type": "INTEGER", "mode": "REQUIRED"},
    ]

    bq.create_dataset(dataset_name)
    bq.create_table(table_name, data_schema, dataset_name)

    errors = bq.ingest_rows_json(data, dataset_name, table_name)
    if errors:
        print("Errors", ";".join(errors))

    bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)
```
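The schema dictionaries use the same field names as the constructor arguments of google.cloud.bigquery.SchemaField, so they can be read as shorthand for the official client types. The translation below is an illustrative assumption, not a documented guarantee of how CloudSDP builds the schema internally:

```python
from google.cloud import bigquery

# Rough equivalent of the data_schema dictionaries used above (assumed mapping).
schema = [
    bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
]
```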
From CSV files stored in GCS:
```python
from cloudsdp.api.bigquery import BigQuery

PROJECT_NAME = "project_name"


def main():
    bq = BigQuery(PROJECT_NAME)

    dataset_name = "dataset_1"
    table_name = "table_1"

    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "age", "field_type": "INTEGER", "mode": "REQUIRED"},
    ]

    bq.create_dataset(dataset_name)
    bq.create_table(table_name, data_schema, dataset_name)

    csv_uris = ["gs://mybucket/name_age_data_1.csv", "gs://mybucket/name_age_data_2.csv"]
    result = bq.ingest_csvs_from_cloud_bucket(
        csv_uris, dataset_name, table_name, skip_leading_rows=1, autodetect_schema=False, timeout=120
    )
    print(result)

    bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)
```
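The gs:// URIs must point at CSV files that already exist in a Cloud Storage bucket you control. A minimal sketch for uploading a matching file with the google-cloud-storage client, assuming a bucket named mybucket (bucket name and file contents are placeholders):

```python
from google.cloud import storage

client = storage.Client(project="project_name")
bucket = client.bucket("mybucket")

# Upload a small CSV whose header row matches the schema above
# (skip_leading_rows=1 tells BigQuery to ignore that header).
blob = bucket.blob("name_age_data_1.csv")
blob.upload_from_string("name,age\nSomeone,29\nSomething,22\n", content_type="text/csv")
```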