Skip to main content

Connector Factory;

Project description

connector-factory

License: MIT PyPI

Connector factory is used to manage/create connection with execute queries using the connection. The concept of having single source to connect various sources and perform operations on top of it.

User need not to worry on the crafting the connection string and to identify the methods for the basic operations. Connector factory supports DML / DDL executions and have support of Pandas DataFrame to create or replace existing tables.

Connector factory is wrapper on various sources and for database source it uses sqlalchemy for crafting the connection and supports below databases:

* Sqlite3
* PostgreSQl
* Snowflake
* MariaDB
* MySQL
* Redshift
* Salesforce
* S3Select
* Databricks
* Synapse
* DynamoDB
* IBM DB2
  • Note: Only select operations are supported for Salesforce and S3Select and limited functionality. These are not managed by the usage of sqlalchemy.

  • Note: Connector factory can be enhanced for all the sqlalchemy supported database.

  • Note: DynamoDB is supported via PyDynao. Please check the features and limitations at PyDynamoDB.

Connector factory also provide the facility to create the connections for Salesforce using simple-saleforce python package. Only Select query is supported in Salesforce. For advance usage it exposes the Salesforce object (simple-saleforce python package) which can be used as per the requirements. Refer simple-saleforce for more detail.

Getting Started

pip install connector-factory

By default connector-factory is installed with only SQlite3 support only. For other support it can be installed using combinations of below flags

  • postgres
  • redshift
  • snowflake
  • mysql
  • mariadb
  • salesforce
  • s3select
  • databricks
  • synapse
  • db2
  • dynamodb
  • all (For all supported types)
pip install connector-factory["postgres"]

Using connector-factory


from connector_factory import ConnectorFactory
import tempfile

temp_dir = tempfile.gettempdir()
config = {
  "path": temp_dir,
  "database": "site.db"
  }
db = ConnectorFactory(connector_type="sqlite", config=config)

db.execute_sql(sql="create table test (id int PRIMARY KEY)")
db.execute_sql(sql="insert into test values (1)")
db.execute_sql(sql="insert into test values (2)")

rows = db.execute_sql(sql="select * from test")
if rows:
  print(rows)


df = db.get_df(sql="select * from test")
print(df)

db.execute_df(panda_df=df, table_name="copy_test", exist_action="replace")
# db.execute_df(panda_df=df, table_name=copy_test, exist_action="replace", chunk_size=100)
db.execute_sql(sql="insert into copy_test values (3)")
rows_copy = db.execute_sql(sql="select * from copy_test")
if rows_copy:
  print(rows_copy)

Appendix

Supported database type:


*   sqlite (default)
*   postgres
*   mysql
*   mariadb
*   snowflake
*   redshift
*   salesforce
*   s3select
*   databricks
*   synapse
*   db2
*   dynamodb

Configuration parameters for sqlite:



* connector_type: sqlite
* config = {
    "path": "<folder_path_where_sqlite_database_file_exist>",
    "database": "<name_of_sqlite_database_file>"
}

Details:

  • path: (Optional)=> Default user home directory. Path to folder where flat sqlite database file is present.
  • database (Required)=> .db is optional, if not present .db will attach to look for database flat file. If file not present will create the file.

Connection parameters for postgres:



* connector_type: postgres
* config = {
    "username": "<postgres_user>",
    "password": "<user_password>",
    "host": "<host_of_postgres_service>",
    "port": "<port_of_postgres_service>",
    "database": "<name_of_database>"
}

Details:

  • username: (Required)=> Name of user for connection.
  • password (Required)=> Password of user.
  • host: (Required)=> Host of PostgreSql service.
  • port: (Optional)=> Default 5432. Port of PostgreSql service.
  • database: (Optional)=> If provided will be used as default database in connection. If not then query should comply with fully qualified path to table.

Connection parameters for mysql:



* connector_type: mysql
* config = {
    "username": "<mysql_user>",
    "password": "<user_password>",
    "host": "<host_of_mysql_service>",
    "port": "<port_of_mysql_service>",
    "database": "<name_of_database>"
}

Details:

  • username: (Required)=> Name of user for connection.
  • password (Required)=> Password of user.
  • host: (Required)=> Host of MySQL service.
  • port: (Optional)=> Default 3306. Port of MySQL service.
  • database: (Optional)=> If provided will be used as default database in connection. If not then query should comply with fully qualified path to table.

Connection parameters for mariadb:



* connector_type: mariadb
* config = {
    "username": "<mariadb_user>",
    "password": "<user_password>",
    "host": "<host_of_mariadb_service>",
    "port": "<port_of_mariadb_service>",
    "database": "<name_of_database>"
}

Details:

  • username: (Required)=> Name of user for connection.
  • password (Required)=> Password of user.
  • host: (Required)=> Host of MariaDB service.
  • port: (Optional)=> Default 3306. Port of MariaDB service.
  • database: (Optional)=> If provided will be used as default database in connection. If not then query should comply with fully qualified path to table.

Connection parameters for snowflake:



* connector_type: snowflake
* config = {
    "username": "<snowflake_user>",
    "password": "<user_password>",
    "role": "<snowflake_role>",
    "account": "<snowflake_account>",
    "warehouse": "<snowflake_warehouse>",
    "schema": "<snowflake_schema>",
    "database": "<name_of_database>",
    "key": "<private_key_path>"
}

Details:

  • username: (Required)=> Name of user for connection.
  • account: (Required)=> Snowflake account name.
  • password (Optional)=> Password of user/private key. Required if private key is not provided.
  • role: (Optional)=> If not provided user default role will be used. Consider USE statement to switch from default role.
  • database: (Optional)=> If provided will be used as default database in connection. If not then query should comply with fully qualified path to table.
  • warehouse: (Optional)=> If not provided user default warehouse will be used. Consider USE statement to switch from default warehouse.
  • schema: (Optional)=> If not provided default public schema will be used. Consider USE statement to switch from default schema or fully qualified path to table. Ignored if database is not proivided.
  • key: (Optional)=> Either Key or Password is required. If key is present and password is also present then password will be used to decrypt the key. If password is not given then consider the unencrypted key. We strongly recommended to use only encrypted key.

Connection parameters for redshift:



* connector_type: redshift
* config = {
    "username": "<redshift_user>",
    "password": "<user_password>",
    "host": "<host_of_redshift_service>",
    "port": "<port_of_redshift_service>",
    "database": "<name_of_database>"
}

Details:

  • username: (Required)=> Name of user for connection.
  • password (Required)=> Password of user.
  • host: (Required)=> Host of Redshift service.
  • port: (Optional)=> Default 5469. Port of Redshift service.
  • database: (Required)=> Name of database for connection.

Connection parameters for Salesforce:



* connector_type: salesforce
* config = {
    "username": "<salesforce_user>",
    "password": "<user_password>",
    "domain": "<domain_of_salesforce_service>",
    "token": "<user_token_for_salesforce_service>"
}

Details:

  • username: (Required)=> Name of user for connection.
  • password (Required)=> Password of user.
  • domain: (Required)=> Host of Salesforce service.
  • token: (Required)=> User token for Salesforce service.

Connection parameters for S3select:



* connector_type: s3select
* config = {
    "bucket": "<AWS_S3_bucket>",
    "file": "<Path_of_file_in_bucket>",
    "type": "<file_type>",
    "limit": "<limit_of_record_to_fetch>",
    "compression": "<compression_type>"
}

Details:

  • bucket: (Required)=> Name of AWS S3 bucket to look for files for queries.
  • type: (Required)=> Supported types is one of csv, json or parquet.
  • file (Required)=> Full file path without bucket or prefix key to serach for similar files as per the type of files provided. First line is considered as header of file for csv. Example: if only prefix is provided like mypath/mydatafile and type is csv then all files named as mypath/mydatafile*.csv will be read.
  • limit: (Optional)=> Commulative number of records to read from the file (multiple files).
  • compression: (Optional)=> Supported types is one of GZIP, BZIP2 or NONE.

Connection parameters for Synapse:



* connector_type: synapse
* config = {
    "username": "<synapse_user>",
    "password": "<user_password>",
    "host": "<hostname_of_synapse_service>",
    "port": "<port_of_synapse_service>",
    "database": "<database_name>",
    "driver": "<driver_to_connect_synapse_service>",
    "trust_certificate": "<trust_the_synapse_connection>",
    "authentication_with": "<authentication_type_of_synapse_service>",
    "connection_timeout": "<connection_timeout_to_synapse_service>"
}

Details:

  • username: (Required)=> Name of user for connection.
  • password (Required)=> Password of user.
  • host: (Required)=> Host of Synapse service.
  • port: (Optional)=> Port of Synapse service. Default to 1433
  • database: (Required)=> Database name.
  • driver: (Optional)=> Name of driver to use. Default to ODBC Driver 17 for SQL Server.
  • trust_certificate: (Optional)=> Trust the connection with certificate. Default to no.
  • authentication_with: (Optional)=> Default to None and not in use to authenticate with. If service principal or AD authenitication is used then pass appropriate value.
  • connection_timeout: (Optional)=> Connection timeout. Default to 30.

Connection parameters for Databricks:



* connector_type: databricks
* config = {
    "hostname": "<hostname_of_databricks_service>",
    "token": "<pat_token_of_databricks_as_password>",
    "http_path": "<path_of_warehouse_or_cluster>",
    "catalog": "<catalog_name_of_databricks>",
    "schema": "<schema_name_of_databricks>",
}

Details:

  • hostname: (Required)=> Databricks hostname.
  • token (Required)=> Pat token of user.
  • http_path: (Required)=> Path of warehouse or cluster.
  • catalog: (Required)=> Unity catalog name
  • schema: (Required)=> Schema name under catalog.

Connection parameters for IBM DB2:



* connector_type: db2
* config = {
    "username": "<hostname_of_databricks_service>",
    "password": "<pat_token_of_databricks_as_password>",
    "host": "<path_of_warehouse_or_cluster>",
    "port": "<catalog_name_of_databricks>",
    "database": "<schema_name_of_databricks>",
}

Details:

  • username: (Required)=> Name of user for connection.
  • password (Required)=> Password of user.
  • host: (Required)=> Host of IBM DB2 Server.
  • port: (Optional)=> Port of IBM DB2 Server. Default to 50000
  • database: (Required)=> Database name.

Connection parameters for DynamoDB:



* connector_type: dynamodb
* config = {
    "region": "<aws_region_of_dynamodb>"
}

Details:

  • region: (Optional)=> AWS Region. Default is us-east-1

Development Setup

Using virtualenv

python3 -m venv venv
source env/bin/activate
pip install .["all"]

Contributing

  1. Fork repo- https://github.com/shrivastava-v-ankit/connector-factory.git

  2. Create your feature branch - git checkout -b feature/name

  3. Install Python packages under all virtual environments of Python 3.9, 3.10, 3.11, 3.12

    • pip install .["all"]
    • pip install coverage==7.2.3
    • pip install exceptiongroup==1.1.1
    • pip install pluggy==1.0.0
    • pip install pytest==7.3.0
    • pip install pytest-cov==4.0.0
    • pip install tomli==2.0.1
  4. Run Python test (pytest)

    • For Python 3.9:
      pytest -v --cov --cov-report=html:test-results/connection_factory_test/htmlcov.3.9 --cov-report=xml:test-results/connection_factory_test/coverage.3.9.xml --junitxml=test-results/connection_factory_test/results.3.9.xml
      
    • For Python 3.10:
      pytest -v --cov --cov-report=html:test-results/connection_factory_test/htmlcov.3.10 --cov-report=xml:test-results/connection_factory_test/coverage.3.10.xml --junitxml=test-results/connection_factory_test/results.3.10.xml
      
    • For Python 3.11:
      pytest -v --cov --cov-report=html:test-results/connection_factory_test/htmlcov.3.11 --cov-report=xml:test-results/connection_factory_test/coverage.3.11.xml --junitxml=test-results/connection_factory_test/results.3.11.xml
      
    • For Python 3.12:
      pytest -v --cov --cov-report=html:test-results/connection_factory_test/htmlcov.3.12 --cov-report=xml:test-results/connection_factory_test/coverage.3.12.xml --junitxml=test-results/connection_factory_test/results.3.12.xml
      
  5. Add Python test (pytest) and covrage report for new/changed feature generated under test-results/connection_factory_test/

  6. Commit your changes - git commit -am "Added name"

  7. Push to the branch - git push origin feature/name

  8. Create a new pull request

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

connector_factory-0.0.5.tar.gz (27.0 kB view details)

Uploaded Source

Built Distribution

connector_factory-0.0.5-py3-none-any.whl (29.8 kB view details)

Uploaded Python 3

File details

Details for the file connector_factory-0.0.5.tar.gz.

File metadata

  • Download URL: connector_factory-0.0.5.tar.gz
  • Upload date:
  • Size: 27.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.10.12

File hashes

Hashes for connector_factory-0.0.5.tar.gz
Algorithm Hash digest
SHA256 b639014628719ae4c473ebb1e9df764ce0f49fae4f89105d1f95411f61d5c56e
MD5 3ece221cfee5d0da8afc3bc98a1b7473
BLAKE2b-256 5a03fdc8bde231f4a25ff2cf85ea454f5158ba224196e082bb37e90f1feacd04

See more details on using hashes here.

File details

Details for the file connector_factory-0.0.5-py3-none-any.whl.

File metadata

File hashes

Hashes for connector_factory-0.0.5-py3-none-any.whl
Algorithm Hash digest
SHA256 30286f5a21662fdbc7d1e2bfe60714811cd38822d940bf8672d851329c6b5292
MD5 3d3073e1192fb8e393a283a736aba2e4
BLAKE2b-256 a741398e53b06e963ea6b68daa8815d205208bda11420a0fe9a9ae6878348cc5

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page