
What is tapsh

tapsh is a newly launched programming API framework for the TapData Live Data Platform. It allows developers and data engineers to build data pipelines and models through a simple yet powerful programmatic interface.

This release includes a Python SDK. To operate, tapsh requires a connection to a TapData cluster, which can be the Enterprise, Cloud, or Community edition.

Tapdata Live Data Platform

Why Is a Programmatic Approach Needed?

TapData currently provides a visual drag-and-drop UI for building data pipelines and workflows. This interface offers significant advantages in terms of ease of use and maintenance, but it also has limitations in certain areas. tapsh aims to effectively address these limitations by offering a programmatic approach as a valuable complement.

  1. Supporting Developers with Custom Logic: When Complex Processing Needs Code

For complex tasks like data masking or custom field value standardization, a significant amount of code is often required. When frequent changes are needed, relying on UI-based operations becomes less convenient.
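As a sketch of the kind of logic involved (the field name and masking rule here are illustrative, not part of tapsh), a data-masking function might look like this; the record-as-dict convention matches the py() examples later in this document:

```python
def mask_email(record):
    """Mask the local part of an email address, keeping the domain.

    `record` is assumed to be a plain dict of field names to values.
    The field name 'customer_email' is a hypothetical example.
    """
    email = record.get("customer_email")
    if email and "@" in email:
        local, domain = email.split("@", 1)
        # Keep the first character, mask the rest of the local part
        record["customer_email"] = local[0] + "***@" + domain
    return record

print(mask_email({"customer_email": "alice@example.com"}))
```

Expressing this kind of rule in code makes frequent changes a matter of editing and redeploying a script rather than reworking a UI configuration.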

  2. Better Support for CI/CD and Automation

Teams often need to deploy and update data integration tasks across multiple environments (such as development, testing, and production). With a programmatic API, tasks can be automatically generated and migrated through scripts, making it easy to integrate with version control tools like Git, without the need for manual operations in the GUI.
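As a minimal sketch of what this enables (the environment names and settings below are assumptions, not part of tapsh), connection settings can be selected by an environment variable so the same pipeline script runs unchanged in development, testing, and production:

```python
import os

# Hypothetical per-environment connection settings
ENVIRONMENTS = {
    "dev":  {"host": "mysql.dev.internal",  "database": "ecommerce_dev"},
    "test": {"host": "mysql.test.internal", "database": "ecommerce_test"},
    "prod": {"host": "mysql.prod.internal", "database": "ecommerce"},
}

def connection_settings(env=None):
    """Return the settings for the requested environment.

    Defaults to the (hypothetical) TAP_ENV environment variable,
    falling back to 'dev'.
    """
    env = env or os.environ.get("TAP_ENV", "dev")
    if env not in ENVIRONMENTS:
        raise ValueError("unknown environment: " + env)
    return ENVIRONMENTS[env]

print(connection_settings("prod"))
```

A script structured this way can live in Git alongside the rest of the codebase, and a CI job can run it against each environment in turn.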

  3. Reducing Operational Costs in Complex Scenarios

Users often need to synchronize 100+ database tables, each with different field mapping rules. Manually setting these rules in the GUI is not only time-consuming but also prone to errors. With a programmatic API, these rules can be automatically generated, streamlining the process and reducing the chance of mistakes.
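As an illustration of what "automatically generated" can mean here (the tables, field names, and prefix convention are invented), per-table field-rename rules can be built in an ordinary loop and then fed to the flow API instead of being entered by hand:

```python
def build_rename_rules(tables, prefix="src_"):
    """For each table, build a field-rename map that strips a legacy
    prefix from every field name.

    `tables` maps table names to their field lists; the prefix
    convention is purely illustrative.
    """
    rules = {}
    for table, fields in tables.items():
        rules[table] = {
            f: f[len(prefix):] for f in fields if f.startswith(prefix)
        }
    return rules

tables = {
    "ecom_orders":    ["src_order_id", "src_order_status", "created_at"],
    "ecom_customers": ["src_customer_id", "src_customer_city"],
}
rules = build_rename_rules(tables)
# Each per-table map could then be passed to rename_fields() when the
# corresponding flow is created.
print(rules["ecom_orders"])
```

With 100+ tables, generating the maps this way both saves time and removes a whole class of copy-paste mistakes.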

  4. Developer-Friendly and Easier Workflow Integration

Providing tools designed for developers aligns more closely with their workflows, making it easier to integrate with other business modules. Open source code capabilities also make the product more extensible, allowing for the addition of reusable internal components and other custom features.

tapsh Usage Example

Let's assume we have a CRM application running on a MySQL database. Below is the schema for this MySQL database:

MySQL Schema

For performance reasons and to meet wide-table requirements, we need to replicate order data from MySQL to MongoDB in order to publish an order query API. We will use tapsh to copy the data from MySQL to MongoDB, performing some data transformations and merge operations along the way.

Installing tapsh

# pip3 install tapsh

The tapsh Python SDK supports two modes: programmatic execution or interactive mode. In the following example, we'll demonstrate how to use the tapsh API in interactive mode.

# tap

--- Please configure TapData cluster ---

Tap Flow requires a TapData Live Data Platform (LDP) cluster to run.
If you would like to use tapsh with TapData Enterprise or TapData Community, type L to continue.
If you would like to use TapData Cloud, or you are new to TapData, type C or press ENTER to continue.
Please type L or C (L/[C]):

Once you enter the relevant TapData connection information or keys, you can start using tapsh in interactive mode.

For more installation details, please refer to the Quick Start documentation: https://docs.tapdata.net/tapsh/quick-start

Create Source and Target Database Connections

tap> mysql_conn = DataSource('mysql', 'MySQL_ECommerce',
                    {
                    'database': 'ECommerceData',
                    'port': 3306,
                    'host': 'demo.tapdata.io',
                    'username': 'demouser',
                    'password': 'demopass'
                    }) \
                    .type('source') \
                    .save()

tap> mongodb_conn = DataSource("mongodb", "MongoDB_ECommerce",
                    {
                        "uri": "mongodb://your_username:your_passwd@192.168.1.18:27017/ecommerce?authSource=admin"
                    }) \
                    .type("target") \
                    .save()

Create a Simple Order Table Replication Task (Data Flow)

Objective: Sync order data from MySQL to MongoDB for unified order queries.

tap> myflow = Flow("simple_data_replication_flow") \
       .read_from("MySQL_ECommerce.ecom_orders") \
       .write_to("MongoDB_ECommerce.orders_replica_collection") \
       .save()
tap> myflow.start()

Exclude Fields

Objective: The target table does not require all fields from the source table.

tap> myflow = Flow("order_fields_exclude_flow") \
       .read_from("MySQL_ECommerce.ecom_orders") \
       .exclude("order_delivered_carrier_date", "order_delivered_customer_date")  \
       .write_to("MongoDB_ECommerce.orders_exclude_fields_collection") \
       .save()
tap> myflow.start()

Field Rename

tap> myflow = Flow("order_fields_rename_flow") \
       .read_from("MySQL_ECommerce.ecom_orders") \
       .rename_fields({'order_purchase_timestamp': 'purchase_timestamp'})  \
       .write_to("MongoDB_ECommerce.orders_renamed_fields_collection") \
       .save()
tap> myflow.start()

Perform Complex Custom Processing with Python Script

# Standardize the order_status field value
tap> def pyfunc(record):
    if not record['order_status']:
        record['order_status'] = 'invalid'
    if record['order_status'] == 'SendError':
        record['order_status'] = 'undeliverable'
    return record  # Return the processed record

# Create a data flow task, apply the Python function, and write the data to the target database
tap> myflow = Flow("orders_complex_data_processing_flow") \
       .read_from("MySQL_ECommerce.ecom_orders") \
       .py(pyfunc) \
       .write_to("MongoDB_ECommerce.orders_processed_collection") \
       .save()
tap> myflow.start()
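Because the function passed to py() is ordinary Python, its logic can be unit-tested locally before the flow is deployed. A minimal sketch (the sample status values beyond those in the function are invented):

```python
def pyfunc(record):
    # Same standardization logic as in the flow above
    if not record['order_status']:
        record['order_status'] = 'invalid'
    if record['order_status'] == 'SendError':
        record['order_status'] = 'undeliverable'
    return record

# Exercise the three branches: empty, remapped, and passed-through
assert pyfunc({'order_status': ''})['order_status'] == 'invalid'
assert pyfunc({'order_status': 'SendError'})['order_status'] == 'undeliverable'
assert pyfunc({'order_status': 'delivered'})['order_status'] == 'delivered'
print("pyfunc checks passed")
```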

Use Lookup to Enrich Customer Information in the Order Table

tap> myflow = Flow("orders_lookup_flow") \
       .read_from("MySQL_ECommerce.ecom_orders") \
       .lookup("MySQL_ECommerce.ecom_customers", relation=[["customer_id", "customer_id"]]) \
       .write_to("MongoDB_ECommerce.wide_orders_collection") \
       .save()
tap> myflow.start()
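Conceptually, the lookup() step behaves like a left join on the relation keys: each order record is enriched with the fields of the matching customer. A plain-Python sketch of that semantics (the helper function and sample data are illustrative, not part of the tapsh API):

```python
def lookup(orders, customers, key="customer_id"):
    """Enrich each order with the fields of the matching customer,
    mirroring what the flow's lookup() step does on the relation key."""
    by_key = {c[key]: c for c in customers}
    enriched = []
    for order in orders:
        row = dict(order)
        # Orders with no matching customer pass through unchanged
        row.update(by_key.get(order[key], {}))
        enriched.append(row)
    return enriched

orders = [{"order_id": "o1", "customer_id": "c1"}]
customers = [{"customer_id": "c1", "customer_city": "sao paulo"}]
print(lookup(orders, customers))
```

In the real flow this join runs continuously, so the wide collection stays up to date as either source table changes.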

Final Result in MongoDB

tap> use MongoDB_ECommerce
datasource switch to: MongoDB_ECommerce

tap> peek wide_orders_collection
table wide_orders_collection has 12641 records
{
  'order_status': 'unavailable',
  'order_purchase_timestamp': '2017-10-21 19:32:06',
  'customer_state': 'SP',
  'customer_unique_id': 'a77550dd00887c5bb24100ccbd08cbe9',
  'order_estimated_delivery_date': '2017-11-03 00:00:00',
  '_id': '676391b15338b293cc8525f1',
  'customer_id': '3a92efdb6e6163dc1734d44f2f5f6d04',
  'order_id': '0010dedd556712d7bb69a19cb7bbd37a',
  'order_approved_at': '2017-10-24 03:25:32',
  'customer_city': 'sao paulo',
  'customer_zip_code_prefix': '04851'
}
{
  'order_status': 'unavailable',
  'order_purchase_timestamp': '2017-02-14 14:49:29',
  'customer_state': 'SP',
  'customer_unique_id': '6968d01009b952ed987de42a239d7fa0',
  'order_estimated_delivery_date': '2017-03-13 00:00:00',
  '_id': '676391b15338b293cc8525f3',
  'customer_id': '1541ebabf956d17f3afe883790bd7dd3',
  'order_id': '00bca4adac549020c1273714d04d0208',
  'order_approved_at': '2017-02-14 15:03:38',
  'customer_city': 'campinas',
  'customer_zip_code_prefix': '13090'
}
{
  'order_status': 'canceled',
  'order_purchase_timestamp': '2017-04-10 00:45:56',
  'customer_state': 'SC',
  'customer_unique_id': 'b9badb100ff8ecc16a403111209e3a06',
  'order_estimated_delivery_date': '2017-05-04 00:00:00',
  '_id': '676391b15338b293cc8525f4',
  'customer_id': 'e3626ed5d1f2e3a02d692ec948b25eeb',
  'order_id': '00ff0cf5583758e6964723e42f111bf4',
  'order_approved_at': '2017-04-10 01:03:29',
  'customer_city': 'florianopolis',
  'customer_zip_code_prefix': '88025'
}
{
  'order_status': 'canceled',
  'order_purchase_timestamp': '2018-02-08 21:09:18',
  'customer_state': 'SP',
  'customer_unique_id': '2fbe21ee78c83f908ed47ba9f4a74121',
  'order_estimated_delivery_date': '2018-02-22 00:00:00',
  '_id': '676391b15338b293cc8525f6',
  'customer_id': '19e9dac43eee8df98092482fdce676da',
  'order_id': '0166cc8756d58b4a30fb49de83527120',
  'order_delivered_carrier_date': '2018-02-09 23:12:30',
  'order_approved_at': '2018-02-08 21:27:56',
  'customer_city': 'jundiai',
  'customer_zip_code_prefix': '13203'
}
{
  'order_status': 'unavailable',
  'order_purchase_timestamp': '2017-01-31 17:48:07',
  'customer_state': 'MA',
  'customer_unique_id': '859801343e938e403027c0668f64f037',
  'order_estimated_delivery_date': '2017-03-10 00:00:00',
  '_id': '676391b15338b293cc8525f7',
  'customer_id': 'cf52d3f2132b17f52fdf4b40c135163e',
  'order_id': '01cb6d702e5233235f4125309d184bf4',
  'order_approved_at': '2017-02-01 02:45:37',
  'customer_city': 'sao luis',
  'customer_zip_code_prefix': '65054'
}

For more command references, please refer to this documentation: https://docs.tapdata.net/tapsh/tapcli-reference

Run Python Code Using the tapsh Command

# cat order_mview.py
from tapsh.lib import *
from tapsh.cli.cli import init
                    
mysql_conn = DataSource('mysql', 'MySQL_ECommerce',
                    {
                    'database': 'ECommerceData',
                    'port': 3306,
                    'host': 'demo.tapdata.io',
                    'username': 'demouser',
                    'password': 'demopass'
                    }) \
                    .type('source') \
                    .save()

mongodb_conn = DataSource("mongodb", "MongoDB_ECommerce",
                    {
                        "uri": "mongodb://your_username:your_passwd@192.168.1.18:27017/ecommerce?authSource=admin"
                    }) \
                    .type("target") \
                    .save()

def pyfunc(record):
    # Standardize the order_status field value
    if not record['order_status']:
        record['order_status'] = 'invalid'
    if record['order_status'] == 'SendError':
        record['order_status'] = 'undeliverable'
    return record

myflow = Flow("mysql_order_flow") \
       .read_from("MySQL_ECommerce.ecom_orders") \
       .exclude("order_delivered_carrier_date", "order_delivered_customer_date") \
       .rename_fields({'order_purchase_timestamp': 'purchase_timestamp'}) \
       .py(pyfunc) \
       .lookup("MySQL_ECommerce.ecom_customers", relation=[["customer_id", "customer_id"]]) \
       .write_to("MongoDB_ECommerce.wide_orders_collection") \
       .save()

myflow.start()  # Start the flow
       
# python order_mview.py

Known Issue

  • Currently, Lookup can only be used with MongoDB as the target.

tapsh Roadmap

We will continue to enhance tapsh's capabilities. Here are some medium- to long-term features on the roadmap:

  • Lookup support for more target databases beyond MongoDB
  • Support for Project management
  • Support for inner joins
  • Java SDK and RESTful API availability

About TapData Live Data Platform

If you're not yet familiar with it, TapData is a real-time data platform designed for enterprise-grade real-time data integration and data services. It has the following features:

  1. Framework Designed for Real-Time Data Pipelines: Based on CDC (Change Data Capture) technology, data collection and processing latency can be sub-second.
  2. High Performance: Each node can process hundreds of thousands of records per second.
  3. Built-in Rich CDC Connectors: Quickly integrate with Oracle, DB2, Sybase, MySQL, PostgreSQL, MSSQL, and more.
  4. Rich Real-Time Data Processing Functions: Including filtering, renaming, and adding/removing fields.
  5. Multi-Table Associations: Build continuously updated materialized views and aggregate data.
  6. UDF Support: Custom JavaScript or Python functions to handle complex logic.
  7. At Least Once and Exactly Once Consistency Guarantees.
  8. Comprehensive Data Validation: Full and incremental validation, hash validation, secondary checks, etc.
  9. Support for Domestic (Mainland China) Databases: Dameng, Kingbase, GaussDB, OceanBase, GBase, VastBase.
  10. Kafka Support: Act as a producer to push database events directly to Kafka or consume events from Kafka queues.
  11. Private and Fully Managed Deployment Options: Can be deployed offline with the open-source version or use the fully managed service on TapData Cloud.

Common Use Cases of TapData

  1. Real-Time Database Synchronization (Replacing Oracle GoldenGate)

Traditional database replication tools are often expensive and complex. tapsh offers a lightweight, easy-to-use alternative that efficiently syncs data across different databases, supporting everything from MySQL to PostgreSQL and even NoSQL databases.

  2. Real-Time Data Pipeline (Replacing Kafka)

For scenarios requiring real-time data transmission, tapsh is a powerful alternative to Kafka. It does not require deploying a complex Kafka cluster, yet builds equally efficient (or more efficient) data pipelines in a lightweight manner.

  3. Creating Continuously Refreshed Materialized Views for Query Acceleration and Read-Write Separation

When real-time query results are needed, materialized views offer an efficient solution. tapsh can continuously refresh materialized views (sub-second latency), ensuring data freshness and supporting real-time analysis and decision-making.

  4. Real-Time Data Ingestion into Data Warehouses or Data Lakes

The trend in modern data analytics is real-time processing. tapsh can write data in real-time into data warehouses or data lakes (e.g., Apache Doris, ClickHouse, or cloud data warehouses like Ali Cloud ADB, SelectDB, BigQuery).

  5. General-Purpose Streaming ETL Data Processing

tapsh also supports complex ETL (Extract, Transform, Load) tasks. With the flexibility of Python and built-in processing capabilities, developers can easily handle complex data transformation needs.

Join Our Community

We welcome you to join our community and interact with us.


Download files

Download the file for your platform.

Source Distribution

tapsh-0.2.83a1.tar.gz (126.7 kB)

Uploaded Source

Built Distribution


tapsh-0.2.83a1-py3-none-any.whl (167.3 kB)

Uploaded Python 3

File details

Details for the file tapsh-0.2.83a1.tar.gz.

File metadata

  • Download URL: tapsh-0.2.83a1.tar.gz
  • Upload date:
  • Size: 126.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.2

File hashes

Hashes for tapsh-0.2.83a1.tar.gz:

  • SHA256: c6aced161eff06d0734130f917ede9929664b0cbed7477c6b54a792cb3559621
  • MD5: a3cdf4c56fdb77708a53e2795b2d400a
  • BLAKE2b-256: 40043b896a3d9b89b64f657bb76d6b6a2e528e73ca0c05b1c8fcd8ff6d0d7d20


File details

Details for the file tapsh-0.2.83a1-py3-none-any.whl.

File metadata

  • Download URL: tapsh-0.2.83a1-py3-none-any.whl
  • Upload date:
  • Size: 167.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.2

File hashes

Hashes for tapsh-0.2.83a1-py3-none-any.whl:

  • SHA256: 0efd147c936ddd69e05638fce0bcd12e95c3d024c278cba9af904f7e7fd97bfe
  • MD5: 79008aeec647e733de29c2fc6d41fa7b
  • BLAKE2b-256: 56699e98f4b1469d823904945992d07b7ecf11bf747f93077e3c5bb9b96d3c71

