For AFS developer to access Datasource
Project description
AFS2-DataSource SDK
The AFS2-DataSource SDK package allows developers to easily access PostgreSQL, MongoDB, InfluxDB, S3 and APM.
Installation
Support Python version 3.6 or later
pip install afs2-datasource
API
DBManager
Init DBManager
DBManager.connect()
DBManager.disconnect()
DBManager.is_connected()
DBManager.is_connecting()
DBManager.get_dbtype()
DBManager.execute_query()
DBManager.create_table(table_name, columns)
DBManager.is_table_exist(table_name)
DBManager.is_file_exist(table_name, file_name)
DBManager.insert(table_name, columns, records, source, destination)
DBManager.delete_file(table_name, file_name)
Init DBManager
With Database Config
Import database config via Python.
from afs2datasource import DBManager, constant
# For PostgreSQL
manager = DBManager(db_type=constant.DB_TYPE['POSTGRES'],
username=username,
password=password,
host=host,
port=port,
database=database,
querySql=querySql
)
# For MongoDB
manager = DBManager(db_type=constant.DB_TYPE['MONGODB'],
username=username,
password=password,
host=host,
port=port,
database=database,
collection=collection,
querySql=querySql
)
# For InfluxDB
manager = DBManager(db_type=constant.DB_TYPE['INFLUXDB'],
username=username,
password=password,
host=host,
port=port,
database=database,
querySql=querySql
)
# For S3
manager = DBManager(db_type=constant.DB_TYPE['S3'],
endpoint=endpoint,
access_key=access_key,
secret_key=secret_key,
bucket_name=bucket_name,
blob_list=[file_name]
# file_name can be directory `models/` or file `test.csv`
)
# For APM
manager = DBManager(db_type=constant.DB_TYPE['APM'],
username=username, # sso username
password=password, # sso password
apmUrl=apmUrl,
machineIdList=[machineId], # APM Machine Id
parameterList=[parameter], # APM Parameter
mongouri=mongouri,
# timeRange or timeLast
timeRange=[{'start': start_ts, 'end': end_ts}],
timeLast={'lastDays:' lastDay, 'lastHours': lastHour, 'lastMins': lastMin}
)
# For Azure Blob
manager = DBManager(db_type=constant.DB_TYPE['AZUREBLOB'],
access_key=access_key,
account_key=account_key,
containers=[{
'container': container_name,
'blobs': {
'files': ['file_name']
'folders': ['folder_name']
}
}]
)
DBManager.connect()
Connect to PostgreSQL, MongoDB, InfluxDB, S3, APM with specified by the given config.
manager.connect()
DBManager.disconnect()
Close the connection. Note S3 datasource not support this function.
manager.disconnect()
DBManager.is_connected()
Return if the connection is connected.
manager.is_connected()
DBManager.is_connecting()
Return if the connection is connecting.
manager.is_connecting()
DBManager.get_dbtype()
Return database type of the connection.
manager.get_dbtype()
DBManager.execute_query()
Return the result in PostgreSQL, MongoDB or InfluxDB after executing the querySql
in config.
Download files which is specified in blob_list
in S3 and Azure Blob config, and return if all files downloaded is successfully.
Return data of Machine
and Parameter
in timeRange
or timeLast
from APM.
# For Postgres, MongoDB, InfluxDB and APM
df = manager.execute_query()
# Return type: DataFrame
"""
Age Cabin Embarked Fare ... Sex Survived Ticket_info Title2
0 22.0 7.0 2.0 7.2500 ... 1.0 0.0 2.0 2.0
1 38.0 2.0 0.0 71.2833 ... 0.0 1.0 14.0 3.0
2 26.0 7.0 2.0 7.9250 ... 0.0 1.0 31.0 1.0
3 35.0 2.0 2.0 53.1000 ... 0.0 1.0 36.0 3.0
4 35.0 7.0 2.0 8.0500 ... 1.0 0.0 36.0 2.0
...
"""
# For S3 and Azure Blob
is_success = manager.execute_query()
# Return Boolean
DBManager.create_table(table_name, columns=[])
Create table in database for Postgres, MongoDB and InfluxDB.
Create Bucket/Container in S3/Azure Blob.
Note: PostgreSQL table_name format schema.table
# For Postgres, MongoDB and InfluxDB
table_name = 'titanic'
columns = [
{'name': 'index', 'type': 'INTEGER', 'is_primary': True},
{'name': 'survived', 'type': 'FLOAT', 'is_not_null': True},
{'name': 'age', 'type': 'FLOAT'},
{'name': 'embarked', 'type': 'INTEGER'}
]
manager.create_table(table_name=table_name, columns=columns)
# For S3 and Azure Blob
bucket_name = 'bucket'
manager.create_table(table_name=bucket_name)
DBManager.is_table_exist(table_name)
Return if the table is exist in Postgres, MongoDB or Influxdb.
Return if the bucket is exist in S3 and Azure Blob.
# For Postgres, MongoDB and InfluxDB
table_name = 'titanic'
manager.is_table_exist(table_name=table_name)
# For S3 and Azure Blob
bucket_name = 'bucket'
manager.is_table_exist(table_name=bucket_name)
DBManager.is_file_exist(table_name, file_name)
Return if the file is exist in Bucket in S3 and Azure Blob.
Note this function only support S3 and Azure Blob.
# For S3 and Azure Blob
bucket_name = 'bucket'
file_name = 'test.csv
manager.is_file_exist(table_name=bucket_name, file_name=file_name)
# Return: Boolean
DBManager.insert(table_name, columns=[], records=[], source='', destination='')
Insert records into table in Postgres, MongoDB or InfluxDB.
Upload file to S3 and Azure Blob.
# For Postgres, MongoDB and InfluxDB
table_name = 'titanic'
columns = ['index', 'survived', 'age', 'embarked']
records = [
[0, 1, 22.0, 7.0],
[1, 1, 2.0, 0.0],
[2, 0, 26.0, 7.0]
]
manager.insert(table_name=table_name, columns=columns, records=records)
# For S3
bucket_name = 'bucket'
source='test.csv' # local file path
destination='test_s3.csv' # the file path and name in s3
manager.insert(table_name=bucket_name, source=source, destination=destination)
# For Azure Blob
container_name = 'container'
source='test.csv' # local file path
destination='test_s3.csv' # the file path and name in s3
manager.insert(table_name=container_name, source=source, destination=destination)
Use APM data source
- Get Hist Raw data from SCADA Mongo data base
- Required
- username: APM SSO username
- password: APM SSO password
- uri: mongo data base uri
- apmurl: APM api url
- machineIdList: APM machine Id list (type:Array)
- parameterList: APM parameter name list (type:Array)
- time range: Training date range
- example:
[{'start':'2019-05-01', 'end':'2019-05-31'}]
DBManager.delete_file(table_name, file_name)
Delete file in bucket in S3 and return if the file is deleted successfully.
Note this function only support S3.
# For S3
bucket_name = 'bucket'
file_name = 'test_s3.csv'
manager.delete_file(table_name=bucket_name, file_name=file_name)
# Return: Boolean
Example
MongoDB Example
from afs2datasource import DBManager, constant
# Init DBManager
manager = DBManager(
db_type=constant.DB_TYPE['MONGODB'],
username={USERNAME},
password={PASSWORD},
host={HOST},
port={PORT},
database={DATABASE},
collection={COLLECTION},
querySql={QUERYSQL}
)
# Connect DB
manager.connect()
# Check the status of connection
is_connected = manager.is_connected()
# Return type: boolean
# Check is the table is exist
table_name = 'titanic'
manager.is_table_exist(table_name)
# Return type: boolean
# Create Table
columns = [
{'name': 'index', 'type': 'INTEGER', 'is_not_null': True},
{'name': 'survived', 'type': 'INTEGER'},
{'name': 'age', 'type': 'FLOAT'},
{'name': 'embarked', 'type': 'INTEGER'}
]
manager.create_table(table_name=table_name, columns=columns)
# Insert Record
columns = ['index', 'survived', 'age', 'embarked']
records = [
[0, 1, 22.0, 7.0],
[1, 1, 2.0, 0.0],
[2, 0, 26.0, 7.0]
]
manager.insert(table_name=table_name, columns=columns, records=records)
# Execute querySql in DB config
data = manager.execute_query()
# Return type: DataFrame
"""
index survived age embarked
0 0 1 22.0 7.0
1 1 1 2.0 0.0
2 2 0 26.0 7.0
...
"""
# Disconnect to DB
manager.disconnect()
S3 Example
from afs2datasource import DBManager, constant
# Init DBManager
manager = DBManager(
db_type = constant.DB_TYPE['S3'],
endpoint={ENDPOINT},
access_key={ACCESSKEY},
secret_key={SECRETKEY},
bucket_name={BUCKET_NAME},
blob_list=['models/', 'dataset/train.csv']
)
# Connect S3
manager.connect()
# Check is the table is exist
bucket_name = 'titanic'
manager.is_table_exist(table_name=bucket_name)
# Return type: boolean
# Create Bucket
manager.create_table(table_name=bucket_name)
# Upload File to S3
local_file = '../test.csv'
s3_file = 'dataset/test.csv'
manager.insert(table_name=bucket_name, source=local_file, destination=s3_file)
# Download files in blob_list
# Download all files in directory
is_success = manager.execute_query()
# Return type: Boolean
# Check if file is exist or not
is_exist = manager.is_file_exist(table_name=bucket_name, file_name=s3_file)
# Return type: Boolean
# Delete the file in Bucket and return if the file is deleted successfully
is_success = manager.delete_file(table_name=bucket_name, file_name=s3_file)
# Return type: Boolean
APM Data source example
APMDSHelper(
username,
password,
apmurl,
machineIdList,
parameterList,
mongouri,
timeRange)
APMDSHelper.execute()
Azure Blob Example
from afs2datasource import DBManager, constant
# Init DBManager
manager = DBManager(
db_type=constant.DB_TYPE['AZUREBLOB'],
access_key={ACCESS_KEY},
access_name={ACCESS_NAME}
containers=[{
'container': {CONTAINER_NAME},
'blobs': {
'files': ['titanic.csv', 'models/train.csv'],
'folders': ['test/']
}
}]
)
# Connect Azure Blob
manager.connect()
# Check is the container is exist
container_name = 'container'
manager.is_table_exist(table_name=container_name)
# Return type: boolean
# Create container
manager.create_table(table_name=container_name)
# Upload File to Azure Blob
local_file = '../test.csv'
azure_file = 'dataset/test.csv'
manager.insert(table_name=container_name, source=local_file, destination=azure_file)
# Download files in `containers`
# Download all files in directory
is_success = manager.execute_query()
# Return type: Boolean
# Check if file is exist in container or not
is_exist = manager.is_file_exist(table_name=container_name, file_name=azure_file)
# Return type: Boolean
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distribution
Hashes for afs2_datasource-2.1.23-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | fe467992444cccdf3480f11dd86ea1327b8cb669cc7a412516893dd5cce4c712 |
|
MD5 | 005aa375fb527c9a63a5578c00327f79 |
|
BLAKE2b-256 | dc27eaea7a5c8847e38ec987336f1caa47e3619b4ab1518ed63746c6ff9d78bf |