For AFS developers to access data sources
AFS2-DataSource SDK
The AFS2-DataSource SDK package allows developers to easily access PostgreSQL, MongoDB, InfluxDB, and S3 data sources.
Installation
Supports Python version 3.6 or later
pip install afs2-datasource
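To verify the installation, the package should import cleanly (module name afs2datasource, as used throughout the examples below):

from afs2datasource import DBManager, constant
print(constant.DB_TYPE)  # mapping of supported database types: 'POSTGRES', 'MONGODB', 'INFLUXDB', 'S3'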
API
DBManager
Init DBManager
DBManager.connect()
DBManager.disconnect()
DBManager.is_connected()
DBManager.is_connecting()
DBManager.get_dbtype()
DBManager.execute_query()
DBManager.create_table(table_name, columns)
DBManager.is_table_exist(table_name)
DBManager.insert(table_name, columns, records, source, destination)
Init DBManager
With Database Config
Pass the database config directly when constructing DBManager in Python.
from afs2datasource import DBManager, constant
# For PostgreSQL
manager = DBManager(db_type=constant.DB_TYPE['POSTGRES'],
username=username,
password=password,
host=host,
port=port,
database=database,
querySql=querySql
)
# For MongoDB
manager = DBManager(db_type=constant.DB_TYPE['MONGODB'],
username=username,
password=password,
host=host,
port=port,
database=database,
collection=collection,
querySql=querySql
)
# For InfluxDB
manager = DBManager(db_type=constant.DB_TYPE['INFLUXDB'],
username=username,
password=password,
host=host,
port=port,
database=database,
querySql=querySql
)
# For S3
manager = DBManager(db_type=constant.DB_TYPE['S3'],
endpoint=endpoint,
access_key=access_key,
secret_key=secret_key,
bucket_name=bucket_name,
blob_list=[file_name]
# file_name can be directory `models/` or file `test.csv`
)
DBManager.connect()
Connect to PostgreSQL, MongoDB, InfluxDB, or S3 as specified by the given config.
manager.connect()
DBManager.disconnect()
Close the connection. Note: the S3 data source does not support this function.
manager.disconnect()
DBManager.is_connected()
Return whether the connection is established.
manager.is_connected()
DBManager.is_connecting()
Return whether the connection is still being established.
manager.is_connecting()
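If connect() returns before the connection has settled in a given driver, is_connecting() can be polled; a minimal sketch (the polling loop is an assumption, not documented behavior):

import time
manager.connect()
while manager.is_connecting():  # assumption: poll until the connection attempt settles
    time.sleep(0.1)
print(manager.is_connected())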
DBManager.get_dbtype()
Return the database type of the connection.
manager.get_dbtype()
DBManager.execute_query()
For PostgreSQL, MongoDB, or InfluxDB, execute the querySql in the config and return the result. For S3, download the files specified in blob_list in the config and return whether all files were downloaded successfully.
# For Postgres, MongoDB and InfluxDB
df = manager.execute_query()
# Return type: DataFrame
"""
Age Cabin Embarked Fare ... Sex Survived Ticket_info Title2
0 22.0 7.0 2.0 7.2500 ... 1.0 0.0 2.0 2.0
1 38.0 2.0 0.0 71.2833 ... 0.0 1.0 14.0 3.0
2 26.0 7.0 2.0 7.9250 ... 0.0 1.0 31.0 1.0
3 35.0 2.0 2.0 53.1000 ... 0.0 1.0 36.0 3.0
4 35.0 7.0 2.0 8.0500 ... 1.0 0.0 36.0 2.0
...
"""
# For S3
is_success = manager.execute_query()
# Return Boolean
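Since the non-S3 result is a DataFrame, ordinary DataFrame operations apply to it; a minimal follow-up sketch (assuming the return value is a pandas DataFrame):

df = manager.execute_query()
print(df.shape)                       # (rows, columns) of the query result
df.to_csv('result.csv', index=False)  # persist the result locally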
DBManager.create_table(table_name, columns=[])
Create a table in Postgres, MongoDB, or InfluxDB.
Create a bucket in S3.
Note: for PostgreSQL, table_name must use the format schema.table.
# For Postgres, MongoDB and InfluxDB
table_name = 'titanic'
columns = [
{'name': 'index', 'type': 'INTEGER', 'is_primary': True},
{'name': 'survived', 'type': 'FLOAT', 'is_not_null': True},
{'name': 'age', 'type': 'FLOAT'},
{'name': 'embarked', 'type': 'INTEGER'}
]
manager.create_table(table_name=table_name, columns=columns)
# For S3
bucket_name = 'bucket'
manager.create_table(table_name=bucket_name)
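Per the note above, a PostgreSQL table_name is schema-qualified; a minimal sketch (the schema name public is an assumption for illustration):

# For PostgreSQL only: table_name includes the schema
table_name = 'public.titanic'  # 'public' is a hypothetical schema name
manager.create_table(table_name=table_name, columns=columns)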
DBManager.is_table_exist(table_name)
Return whether the table exists in Postgres, MongoDB, or InfluxDB.
Return whether the bucket exists in S3.
# For Postgres, MongoDB and InfluxDB
table_name = 'titanic'
manager.is_table_exist(table_name=table_name)
# For S3
bucket_name = 'bucket'
manager.is_table_exist(table_name=bucket_name)
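A common guard is to create a table only when it does not already exist; a minimal sketch combining the two calls documented above:

if not manager.is_table_exist(table_name=table_name):
    manager.create_table(table_name=table_name, columns=columns)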
DBManager.insert(table_name, columns=[], records=[], source='', destination='')
Insert records into a table in Postgres, MongoDB, or InfluxDB.
Upload a file to S3.
# For Postgres, MongoDB and InfluxDB
table_name = 'titanic'
columns = ['index', 'survived', 'age', 'embarked']
records = [
[0, 1, 22.0, 7.0],
[1, 1, 2.0, 0.0],
[2, 0, 26.0, 7.0]
]
manager.insert(table_name=table_name, columns=columns, records=records)
# For S3
bucket_name = 'bucket'
source = 'test.csv'  # local file path
destination = 'test_s3.csv'  # the file path and name in S3
manager.insert(table_name=bucket_name, source=source, destination=destination)
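If the data already lives in a DataFrame (the return type of execute_query), it can be converted into the columns/records form that insert expects; a minimal sketch assuming pandas:

import pandas as pd

df = pd.DataFrame(records, columns=columns)
manager.insert(table_name=table_name,
               columns=list(df.columns),
               records=df.values.tolist())  # rows as plain Python lists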
Example
MongoDB Example
from afs2datasource import DBManager, constant
# Init DBManager
manager = DBManager(
db_type=constant.DB_TYPE['MONGODB'],
username={USERNAME},
password={PASSWORD},
host={HOST},
port={PORT},
database={DATABASE},
collection={COLLECTION},
querySql={QUERYSQL}
)
# Connect DB
manager.connect()
# Check the connection status
is_connected = manager.is_connected()
# Return type: boolean
# Check if the table exists
table_name = 'titanic'
manager.is_table_exist(table_name)
# Return type: boolean
# Create Table
columns = [
{'name': 'index', 'type': 'INTEGER', 'is_not_null': True},
{'name': 'survived', 'type': 'INTEGER'},
{'name': 'age', 'type': 'FLOAT'},
{'name': 'embarked', 'type': 'INTEGER'}
]
manager.create_table(table_name=table_name, columns=columns)
# Insert Record
columns = ['index', 'survived', 'age', 'embarked']
records = [
[0, 1, 22.0, 7.0],
[1, 1, 2.0, 0.0],
[2, 0, 26.0, 7.0]
]
manager.insert(table_name=table_name, columns=columns, records=records)
# Execute querySql in DB config
data = manager.execute_query()
# Return type: DataFrame
"""
index survived age embarked
0 0 1 22.0 7.0
1 1 1 2.0 0.0
2 2 0 26.0 7.0
...
"""
# Disconnect from DB
manager.disconnect()
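Because disconnect() is a separate call, wrapping the session in try/finally ensures the connection is closed even if the query raises; a minimal sketch reusing the manager above:

manager.connect()
try:
    data = manager.execute_query()
finally:
    manager.disconnect()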
S3 Example
from afs2datasource import DBManager, constant
# Init DBManager
manager = DBManager(
db_type=constant.DB_TYPE['S3'],
endpoint={ENDPOINT},
access_key={ACCESSKEY},
secret_key={SECRETKEY},
bucket_name={BUCKET_NAME},
blob_list=['models/', 'dataset/train.csv']
)
# Connect S3
manager.connect()
# Check if the bucket exists
bucket_name = 'titanic'
manager.is_table_exist(table_name=bucket_name)
# Return type: boolean
# Create Bucket
manager.create_table(table_name=bucket_name)
# Upload File to S3
local_file = '../test.csv'
s3_file = 'dataset/test.csv'
manager.insert(table_name=bucket_name, source=local_file, destination=s3_file)
# Download the files in blob_list
# A directory entry (e.g. 'models/') downloads all files under it
is_success = manager.execute_query()
# Return type: Boolean
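Since the S3 variant of execute_query() signals failure through its boolean return value (as documented above), it is worth checking the flag explicitly; a minimal follow-up to the call above:

if not is_success:
    raise RuntimeError('Some files in blob_list failed to download')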