Skip to main content

Tapdata Python Sdk

Project description

Tapdata Python Sdk

中文文档地址

Install

  1. Install python 3.7, pip By Yourself.
  2. Run pip install tapdata_cli to install sdk.
  3. If you use poetry, please run poetry add tapdata_cli to install sdk.

Initial

server = "127.0.0.1:3000"
access_code = "3324cfdf-7d3e-4792-bd32-571638d4562f"
from tapdata_cli import cli
cli.init(server, access_code)

Multi-thread concurrency is not supported

It will send a request to the server to obtain the identity information and save it as a global variable. Therefore, after multiple init the 'server' and 'access_code' variable will be overwritten.

For situations where you need to use different servers and access_codes concurrently, use Python's multiprocess.

DataSource

Create DataSource

The SDK supports the following data source operations:

  • Mongo
  • Mysql
  • Postgres
  • Oracle
  • Kafka

To create MySQL/Mongo:

from tapdata_cli import cli

connector = "mongodb"  # datasource type,mongodb mysql postgres
mongo = cli.DataSource("mongodb", name="mongo")
mongo.uri("mongodb://localhost:8080")  # datasource uri
mongo.save()

or:

from tapdata_cli import cli

mongo = cli.DataSource("mongodb", name="mongo")
mongo.host("localhost:27017").db("source").username("user").password("password").props("")
mongo.type("source")  # datasource type,source -> only source,target -> only target,source_and_target -> target and source (default)
mongo.save()  # success -> True, Failure -> False

To Create Oracle database:

from tapdata_cli import cli

datasource_name = "ds_name"  # datasource name
oracle = cli.Oracle(datasource_name)
oracle.thinType("SERVICE_NAME")  # connect type SID/SERVER_NAME (database name/service name)
oracle.host("106.55.169.3").password("Gotapd8!").port("3521").schema("TAPDATA").db("TAPDATA").username("tapdata")
oracle.save()

To create Kafka datasource:

from tapdata_cli import cli

database_name = "kafka_name"
kafka = cli.Kafka(database_name)
kafka.host("106.xx.xx.x").port("9092")
kafka.save()

To create Postgres datasource:

from tapdata_cli import cli

pg = cli.Postgres("jack_postgre") 
pg.host("106.55.169.3").port(5496).db("insurance").username("postgres").password("tapdata").type("source").schema("insurance")
pg.validate()
pg.save()

As for Kafka/Oracle/Postgres, the creation mode is heterogeneous. In the future, a unified interface will be provided in the form of datasource, which is backward compatible and will not affect the existing version.

DataSource List

from tapdata_cli import cli

cli.DataSource().list()

# return struct

{
    "total": 94,
    "items": [{
        "id": "",
        "lastUpdBy": "",
        "name": "",
        "config": {},
        "connection_type": "",
        "database_type": "",
        "definitionScope": "",
        "definitionVersion": "",
        "definitionGroup": "",
        "definitionPdkId": "",
        ...
    }]
}

Get datasource according to ID/name

from tapdata_cli import cli

cli.DataSource(id="")  # by id
cli.DataSource(name="")  # by name

Pipeline

A simple data migration Job

from tapdata_cli import cli

# Create datasource first
source = cli.DataSource("mongodb", name="source").uri("").save()
target = cli.DataSource("mongodb", name="target").uri("").save()
# create Pipeline
p = cli.Pipeline(name="example_job")
p.readFrom("source").writeTo("target")
# start
p.start()
# stop
p.stop()
# delete
p.delete()
# status
p.status()
# get job list
cli.Job.list()

Job is the underlying implementation of pipeline, so you can use job.start() like pipeline.start().

# init job (get job info) by id
from tapdata_cli import cli
job = cli.Job(id="some id string")
job.save() # success -> True, failure -> False
job.start() # success -> True, failure -> False

Data development job

Before performing data development tasks, you need to change the task type to Sync:

from tapdata_cli import cli

source = cli.DataSource("mongodb", name="source").uri("").save()
target = cli.DataSource("mongodb", name="target").uri("").save()
p = cli.Pipeline(name="")
p = p.readFrom("source.player") # source is db, player is table
p.dag.jobType = cli.JobType.sync

Then perform specific operations:

# filter cli.FilterType.keep (keep data) / cli.FilterType.delete (delete data)
p = p.filter("id > 2", cli.FilterType.keep)

# filerColumn cli.FilterType.keep (keep column) / cli.FilterType.delete (delete column)
p = p.filterColumn(["name"], cli.FilterType.delete)

# rename
p = p.rename("name", "player_name")

# valueMap
p = p.valueMap("position", 1) 

# js
p = p.js("return record;")

p.writeTo("target.player")  # target is db, player is table

master slave merge:

# merge
p2 = cli.Pipeline(name="source_2")  # Create merged pipeline
p3 = p.merge(p2, [('id', 'id')]).writeTo("target")  # Merge pipeline

p3.writeTo("target.player")  # target is db, player is table

Create initial_sync/cdc job

By default, all tasks created through pipeline are "full + incremental" job.

You can create a initial_sync job by:

from tapdata_cli import cli

p = cli.Pipeline(name="")
p.readFrom("source").writeTo("target")
config = {"type": "initial_sync"}  # initial_sync
p1 = p.config(config=config)
p1.start()

As above, changing config to {"type": "cdc"} can create an incremental task.

All pipeline configuration modification operations are passed in through the pipeline.config method through the config default parameters, and the parameters are verified.

For more configuration modification items, please see this file, get more configuration items.

API Operation

Update/Create ApiServer

from tapdata_cli import cli

# create
cli.ApiServer(name="test", uri="http://127.0.0.1:3000/").save()

# update
# 1.get ApiServer id
api_server_id = cli.ApiServer.list()["id"]
# 2.update ApiServer
cli.ApiServer(id=api_server_id, name="test_2", uri="http://127.0.0.1:3000/").save()

# delete
cli.ApiServer(id=api_server_id).delete()

Publish Api

from tapdata_cli import cli
cli.Api(name="test", table="source.player").publish() # source is db, player is table

Unpublish APi

from tapdata_cli import cli
cli.Api(name="test").unpublish()

Delete Api

from tapdata_cli import cli
cli.Api(name="test").delete()

Api Status

from tapdata_cli import cli
cli.Api().status("test")  # success -> "pending" or "active" / failure -> None

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

tapdata_cli-2.2.32.tar.gz (71.8 kB view details)

Uploaded Source

Built Distribution

tapdata_cli-2.2.32-py3-none-any.whl (72.5 kB view details)

Uploaded Python 3

File details

Details for the file tapdata_cli-2.2.32.tar.gz.

File metadata

  • Download URL: tapdata_cli-2.2.32.tar.gz
  • Upload date:
  • Size: 71.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.2.1 CPython/3.10.6 Darwin/21.5.0

File hashes

Hashes for tapdata_cli-2.2.32.tar.gz
Algorithm Hash digest
SHA256 79d9ee1c93ccb3e036f3f8fffcb36ab645539a7c4ec5565e9d3455f69aba0fdb
MD5 066a55320515c6fee87e12c3f86c9d95
BLAKE2b-256 319aaf5f5a5c8d794383bf9351967a81a812632703916479bd0096b278492c87

See more details on using hashes here.

Provenance

File details

Details for the file tapdata_cli-2.2.32-py3-none-any.whl.

File metadata

  • Download URL: tapdata_cli-2.2.32-py3-none-any.whl
  • Upload date:
  • Size: 72.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.2.1 CPython/3.10.6 Darwin/21.5.0

File hashes

Hashes for tapdata_cli-2.2.32-py3-none-any.whl
Algorithm Hash digest
SHA256 c39a80175f4e585f2e3bfe333e45944d6eaa6e5a3d2498a8e62520310ce307d6
MD5 21b317ec86e9b81b89f429147f85a250
BLAKE2b-256 ef8f220c83cc1c08b6dd46906e7b47d41fa501c983d38a579d3b38a461704c8d

See more details on using hashes here.

Provenance

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page