The HiDALGO Data Transfer library provides methods to transfer data between different data providers and consumers using NIFI pipelines.
Hidalgo2 Data Transfer Lib
This repository contains the implementation of the Hidalgo2 data transfer library. It uses Apache NIFI to transfer data from different data sources to specified targets.
Features
The library is planned to support the following features:
- transfer datasets from Cloud Providers to HDFS
- transfer datasets from Cloud Providers to CKAN
- transfer datasets from/to Hadoop HDFS to/from HPC
- transfer datasets from/to Hadoop HDFS to/from CKAN
- transfer datasets from/to a CKAN to/from HPC
- transfer datasets from/to local filesystem to/from HPC
- transfer datasets from/to local filesystem to/from CKAN
Current version
The current version of the library supports the following features:
- transfer datasets from/to Hadoop HDFS to/from HPC
- transfer datasets from/to Hadoop HDFS to/from CKAN
- transfer datasets from/to a CKAN to/from HPC
- transfer datasets from/to local filesystem to/from CKAN
Implementation
This is a Python library that offers specialized API methods to transfer data from data sources to targets. Each API method launches a NIFI pipeline by instantiating a NIFI process group from its workflow definition, which is registered in the NIFI registry. The parameters given in the library method invocation populate a NIFI parameter context that is associated with the process group. The processors in the process group are then executed one after another, following the group's sequence flow, until the flow completes; each processor runs once (or repeatedly, until its incoming flowfile queue is empty) and starts only after the previous one has terminated. To check the status of a transfer command, the library offers a separate check-status command. Upon termination, the NIFI environment is cleaned up by removing the created entities (i.e., the process group and its parameter context). The Data Transfer library sends requests to NIFI through its REST API.
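The lifecycle described above (create the process group and its parameter context, run the processors in order, then always clean up) can be pictured with a small in-memory sketch. Nothing below talks to NIFI or uses this library: FakeNifi, pipeline and run_transfer are hypothetical names invented for illustration, and the real library performs these steps through the NIFI REST API.

```python
from contextlib import contextmanager


class FakeNifi:
    """Hypothetical in-memory stand-in for a NIFI service (not the real REST API)."""

    def __init__(self):
        self.entities = set()  # names of the entities that currently exist
        self.log = []          # ordered record of what happened

    def create(self, name):
        self.entities.add(name)
        self.log.append(f"create {name}")

    def remove(self, name):
        self.entities.discard(name)
        self.log.append(f"remove {name}")


@contextmanager
def pipeline(nifi, name, parameters):
    """Create a parameter context and a process group; remove both on exit."""
    group, context = f"{name}-group", f"{name}-context"
    nifi.create(context)
    nifi.create(group)
    try:
        yield group, dict(parameters)
    finally:
        # Cleanup runs on success and on failure alike.
        nifi.remove(group)
        nifi.remove(context)


def run_transfer(nifi, name, parameters, processors):
    """Run processors in order; each one starts after the previous has returned."""
    with pipeline(nifi, name, parameters) as (group, params):
        for processor in processors:
            processor(params)
            nifi.log.append(f"ran {processor.__name__} in {group}")
```

The try/finally inside the context manager is the point of the sketch: the created entities are removed whether the transfer succeeds or a processor fails midway.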
Requirements
To use the Data Transfer library, the following are required:
- Python3 execution environment
- Poetry Python package management tool (optional)
- NIFI instance, with a NIFI server SSH account (for key transfer)
- Keycloak instance, with a Keycloak user account
- HDFS instance, with a user's Kerberos principal account
- CKAN instance, with a user API key
Python3 should be installed on the computer where the Data Transfer CLI will be used. To install Poetry, follow the Poetry installation instructions.
Data Transfer lib configuration
Configuration file
Before using the Data Transfer library, you should configure it to point at the target NIFI and Keycloak services. The default library configuration is located in the data_transfer_cli/conf/hid_dt.cfg file. This default configuration should be complemented (and optionally overridden) with user-specific settings, placed in a separate configuration file (e.g., ~/.dtcli/dtcli.cfg). Settings in the user's configuration override those in the default library configuration. Do not modify the default library configuration; instead, put any additional required settings (see below) or changes to the defaults in the user's configuration file. The location of the user's configuration file is passed as a parameter to the library when it is set up programmatically.
[Nifi]
nifi_endpoint=https://nifi.hidalgo2.eu:9443
nifi_upload_folder=/opt/nifi/data/upload
nifi_download_folder=/opt/nifi/data/download
nifi_secure_connection=True
[Keycloak]
keycloak_endpoint=https://idm.hidalgo2.eu
keycloak_client_id=nifi
keycloak_client_secret=<keycloak_nifi_client_secret>
[Logging]
logging_level=INFO
[Network]
check_status_sleep_lapse=5
Under the Nifi section, you can configure:
- The URL of the NIFI service (nifi_endpoint).
- A folder on the NIFI server to which files are uploaded (nifi_upload_folder).
- A folder on the NIFI server from which files are downloaded (nifi_download_folder). Both folders must be accessible by the NIFI service (ask the NIFI administrator for details).
- Whether the NIFI server listens on a secure HTTPS connection (nifi_secure_connection=True) or on a non-secure HTTP connection (nifi_secure_connection=False).
These default library settings work with the HiDALGO2 NIFI, so no additional modifications are required.
Under the Keycloak section, you can configure the Keycloak instance integrated with NIFI, specifying:
- The Keycloak service endpoint (keycloak_endpoint)
- The NIFI client in Keycloak (keycloak_client_id)
- The NIFI secret in Keycloak (keycloak_client_secret). This setting must be set in the user's configuration (e.g., ~/.dtcli/dtcli.cfg).
These default library settings work with the HiDALGO2 Keycloak, so no additional modifications are required, except for the NIFI client secret.
Under the Logging section, you can configure the logging level. The log file dtcli.log is located in the working directory of the process that executes the library.
Under the Network section, you can configure the interval (in seconds) at which each processor in the NIFI pipeline is checked for completion. Most users should leave the default value.
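To make the effect of check_status_sleep_lapse concrete, here is a minimal sketch of a generic polling loop. It is not the library's code: wait_until_done and its parameters are hypothetical, and is_done stands in for whatever status check is made against NIFI.

```python
import time


def wait_until_done(is_done, sleep_lapse=5.0, timeout=None,
                    sleep=time.sleep, clock=time.monotonic):
    """Call is_done() every sleep_lapse seconds until it returns True.

    Returns the number of status checks performed. Raises TimeoutError
    if a timeout (in seconds) is given and is exceeded.
    """
    start = clock()
    checks = 0
    while True:
        checks += 1
        if is_done():
            return checks
        if timeout is not None and clock() - start >= timeout:
            raise TimeoutError(f"not finished after {timeout} seconds")
        # Wait before asking again; this is the role of the sleep lapse.
        sleep(sleep_lapse)
```

A shorter lapse detects completion sooner at the cost of more status requests, which is why the default suits most users.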
Remember that any modification of, or addition to, the default settings must be placed in the user's configuration file (e.g., ~/.dtcli/dtcli.cfg).
HiDALGO2 developers can contact the Keycloak administrator to obtain the keycloak_client_secret.
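The override rule between the default and the user's configuration can be illustrated with Python's standard configparser, where values read later replace values read earlier. This is a sketch of the layering idea only; whether the library uses configparser internally is an assumption, and merge_settings, DEFAULTS and USER are illustrative names.

```python
import configparser

# A trimmed copy of the default settings shown above.
DEFAULTS = """
[Nifi]
nifi_endpoint=https://nifi.hidalgo2.eu:9443
nifi_secure_connection=True
[Logging]
logging_level=INFO
[Network]
check_status_sleep_lapse=5
"""

# What a user's file such as ~/.dtcli/dtcli.cfg might add or override.
USER = """
[Keycloak]
keycloak_client_secret=<keycloak_nifi_client_secret>
[Logging]
logging_level=DEBUG
"""


def merge_settings(default_text, user_text):
    """Layer the user's settings on top of the defaults."""
    parser = configparser.ConfigParser()
    parser.read_string(default_text)
    parser.read_string(user_text)  # later reads override earlier values
    return parser


merged = merge_settings(DEFAULTS, USER)
```

In the merged result, logging_level is DEBUG (overridden by the user), check_status_sleep_lapse is still 5 (inherited from the defaults), and the Keycloak secret exists only because the user's layer supplied it.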
User's accounts
Additional user accounts are specified in the user's configuration file (e.g., ~/.dtcli/dtcli.cfg):
[Nifi]
nifi_server_username=<user_name>
nifi_server_private_key=<path/to/private/key>
[Keycloak]
keycloak_login=<user_name>
keycloak_password=<password>
Under the Nifi section, you must also specify a user account (username and private key) that is allowed to upload files to and download files from the NIFI server (as required to upload temporary HPC keys or to support local file transfer). This account is provided by the HiDALGO2 infrastructure provider and is specific to a user or service.
Under the Keycloak section, you must specify your Keycloak account (username and password). This account must be granted access to the NIFI service.
For HiDALGO2 developers, NIFI (Service, Server) and Keycloak accounts are provided by the HiDALGO2 administrator.
Usage
The data transfer library can be invoked in two ways:
Using user's configuration (e.g., ~/.dtcli/dtcli.cfg)
In this case, the user's configuration is read from a given file, such as ~/.dtcli/dtcli.cfg, whose location is passed programmatically as a parameter upon the setup of the library (see procedure below).
Providing configuration in a dictionary
In this case, the user's configuration is provided in a dictionary, with this structure:
{
    'Nifi': {
        'nifi_server_username': '<username>',
        'nifi_server_private_key': '<path/to/private/key>'
    },
    'Keycloak': {
        'keycloak_login': '<username>',
        'keycloak_password': '<password>',
        'keycloak_client_secret': '<client_secret>'
    },
    'Logging': {
        'logging_level': 'DEBUG'
    },
    'Network': {
        'check_status_sleep_lapse': '2'
    }
}
In this settings dictionary you should add your user-specific accounts for NIFI and Keycloak and, optionally, other settings, as shown for the logging level and for the sleep lapse used when checking processor status in the NIFI pipeline. This dictionary is passed programmatically as a parameter to the library upon its setup (see procedure below).
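Since this dictionary carries credentials, one option is to assemble it at run time rather than hard-coding it. The sketch below builds the structure shown above from environment variables; the function user_config_from_env and the DT_* variable names are hypothetical and are not defined by the library.

```python
import os


def user_config_from_env(env=None):
    """Build the user's configuration dictionary from environment variables.

    The DT_* names are illustrative assumptions, not library conventions.
    A KeyError is raised if a mandatory variable is missing.
    """
    env = os.environ if env is None else env
    config = {
        "Nifi": {
            "nifi_server_username": env["DT_NIFI_USERNAME"],
            "nifi_server_private_key": os.path.expanduser(env["DT_NIFI_PRIVATE_KEY"]),
        },
        "Keycloak": {
            "keycloak_login": env["DT_KEYCLOAK_LOGIN"],
            "keycloak_password": env["DT_KEYCLOAK_PASSWORD"],
            "keycloak_client_secret": env["DT_KEYCLOAK_CLIENT_SECRET"],
        },
    }
    # Optional settings are added only when the variable is present.
    if "DT_LOGGING_LEVEL" in env:
        config["Logging"] = {"logging_level": env["DT_LOGGING_LEVEL"]}
    return config
```

The returned dictionary has the same section and key names as the example above, so it can be passed wherever that dictionary is expected.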
The remaining procedure goes as follows:
- In your Python code, instantiate a HidDataTransferConfiguration object and a HIDDataTransfer object. By default, the HIDDataTransfer object is created using the configuration read from the user's configuration file (or from the provided configuration dictionary). Alternatively, it can be created by also providing a dictionary with the Keycloak token, the refresh token, and the expiration time.
Example with user's configuration file
import logging
import os

from hid_data_transfer_lib.hid_dt_lib import HIDDataTransfer
from hid_data_transfer_lib.conf.hid_dt_configuration import (
    HidDataTransferConfiguration
)

# Using Keycloak configuration from the user's file (if it exists)
user_config_file = None
if os.path.exists(os.path.expanduser("~/.dtcli/dtcli.cfg")):
    user_config_file = os.path.expanduser("~/.dtcli/dtcli.cfg")
config = HidDataTransferConfiguration().configure(
    user_config_file=user_config_file,
    logging_level=logging.DEBUG)
# Create a HIDDataTransfer object that uses the Keycloak user's configuration
dt_client = HIDDataTransfer(conf=config, secure=True)
# OR
user_config_file = None
if os.path.exists(os.path.expanduser("~/.dtcli/dtcli.cfg")):
    user_config_file = os.path.expanduser("~/.dtcli/dtcli.cfg")
config = HidDataTransferConfiguration().configure(
    user_config_file=user_config_file,
    logging_level=logging.DEBUG)
# Create a HIDDataTransfer object that uses the provided Keycloak token dictionary
keycloak_token = {
    "username": <keycloak_username>,
    "token": <keycloak_token>,
    "expires_in": <keycloak_token_expires_in>,
    "refresh_token": <keycloak_refresh_token>
}
dt_client = HIDDataTransfer(
    conf=config,
    secure=True,
    keycloak_token=keycloak_token
)
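If the token was obtained directly from Keycloak, the dictionary above can be derived from the token endpoint's JSON response. The sketch below assumes a standard OAuth 2.0 token response with access_token, expires_in and refresh_token fields; the helper name to_keycloak_token_dict is hypothetical and not part of the library.

```python
def to_keycloak_token_dict(username, token_response):
    """Map an OAuth 2.0 token response onto the dictionary layout shown above.

    token_response is assumed to be the parsed JSON body returned by the
    Keycloak token endpoint; a KeyError signals a missing field.
    """
    return {
        "username": username,
        "token": token_response["access_token"],
        "expires_in": token_response["expires_in"],
        "refresh_token": token_response["refresh_token"],
    }
```

Extra fields in the response, such as token_type, are ignored.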
Example with user's configuration dictionary:
user_config_dict = {...}  # See example of dictionary given above
config = HidDataTransferConfiguration().configure(
    user_config_dict=user_config_dict,
    logging_level=logging.DEBUG)
dt_client = HIDDataTransfer(conf=config, secure=True)
- Invoke any data transfer library method on the created object to transfer data
pipeline_id, accounting = dt_client.hpc2ckan(
    hpc_host=<hpc_endpoint>,
    hpc_username=<hpc_username>,
    hpc_secret_key_path=<hpc_secret_key>,
    ckan_host=<ckan_endpoint>,
    ckan_api_key=<ckan_apikey>,
    ckan_organization=<ckan_organization>,
    ckan_dataset=<ckan_dataset>,
    ckan_resource=<ckan_resource>,
    data_source=<hpc_source_folder>,
    concurrent_tasks=10,
    recursive=False,
)
This method returns the id of the data transfer pipeline that NIFI executed and an AccountingInfo object that contains statistics of data transfer, including the pipeline_timespan (or total transfer time) and the flowfiles_sizes, a dictionary whose keys are the names of the transferred files and the values are their lengths.
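As an illustration of how these statistics might be used, the sketch below derives a total size and an average transfer rate. It takes plain values rather than the AccountingInfo object itself, and it assumes that pipeline_timespan is a number of seconds and that the file lengths are in bytes; both units are assumptions, and summarize_transfer is a hypothetical helper.

```python
def summarize_transfer(pipeline_timespan_seconds, flowfiles_sizes):
    """Summarize a transfer from its timespan and per-file sizes.

    Assumes seconds and bytes; check the units the library actually
    reports before relying on the computed rate.
    """
    total_bytes = sum(flowfiles_sizes.values())
    if pipeline_timespan_seconds > 0:
        rate = total_bytes / pipeline_timespan_seconds
    else:
        rate = 0.0  # avoid dividing by zero for an instantaneous transfer
    return {
        "files": len(flowfiles_sizes),
        "total_bytes": total_bytes,
        "bytes_per_second": rate,
    }
```

Under the stated assumptions, a call could look like summarize_transfer(accounting.pipeline_timespan, accounting.flowfiles_sizes).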
Data Transfer optimization
You can improve the data transfer rate by setting the optional parameter concurrent_tasks (integer) to the number of concurrent tasks used in the NIFI pipeline (default is 1). The number of tasks beyond which throughput stops improving depends on the physical resources of the NIFI server (consult its administrator). Parallel transfer is currently supported to/from HPC and HDFS data servers, but not to/from CKAN (under development).
Support for HPC clusters that require a 2FA token
This library includes methods (suffixed with _2fa) to transfer data to/from HPC clusters that require a 2FA token. These methods offer an optional parameter, callback_2fa, that points to a function which returns the 2FA token (as a str) when invoked by the library. If the caller does not set it, these methods use a default implementation that prompts the user for the token on standard input.
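A custom callback can be as small as the sketch below, which reads the token from an environment variable and falls back to an interactive prompt. The zero-argument signature, the function name token_from_env_or_prompt and the HPC_2FA_TOKEN variable are assumptions; the text above only states that the callback must return the token as a str.

```python
import getpass
import os


def token_from_env_or_prompt() -> str:
    """Return the 2FA token as a string.

    Looks for the hypothetical HPC_2FA_TOKEN environment variable first
    and prompts on the terminal (without echo) when it is not set.
    """
    token = os.environ.get("HPC_2FA_TOKEN")
    if not token:
        token = getpass.getpass("2FA token: ")
    return token.strip()
```

Such a function would be passed as callback_2fa=token_from_env_or_prompt to one of the _2fa methods. Because 2FA tokens are typically short-lived, the environment variable route only makes sense when the token is set just before the call.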
Data transfer process with NIFI
The following UML Sequence Diagram describes the data transfer process for each command, for instance ckan2hpc, leveraging the associated NIFI pipeline. The Data Transfer (DT) Consumer (a client of this library) invokes a ckan2hpc command by following these steps:
- Creates an instance of HidDataTransferConfiguration, which reads the file and environment configuration (see Installation Instructions).
- Creates an instance of HIDDataTransfer with that configuration object, with secure mode activated, and by passing a dictionary with Keycloak token details. A renewable Keycloak token is required to invoke the remote NIFI REST APIs.
This HIDDataTransfer instance acts as the proxy to trigger one or more data transfer requests, by selecting the corresponding data transfer method. In the following, we explain the internal process to trigger a data transfer from CKAN to HPC; the internal process is the same for any other data transfer command.
- The DT consumer invokes the HIDDataTransfer ckan2hpc command, passing the required information to identify the CKAN resource to transfer and the destination HPC, including the HPC user's account and data destination path.
- The HIDDataTransfer proxy leverages the NIFIClient class to run the NIFI pipeline for ckan2hpc. In turn, this NIFIClient:
  - Instantiates the ckan2hpc pipeline in the NIFI service, taking it from the NIFI registry.
  - Uploads the user's HPC keys (if provided) to the NIFI server, for later HPC SSH access. These keys are kept in a temporary folder accessible only by the user and the NIFI service.
  - Starts the pipeline in the NIFI service. This concrete pipeline retrieves the source resource from CKAN, keeps it in the NIFI queue and transfers it to the target HPC location using SFTP.
- If the Keycloak token expires during the data transfer process and additional requests to the REST API of the NIFI service are required, the NIFIClient proxy requests Keycloak to renew the token.
- Once the data transfer process terminates (or in case of failure), the pipeline is cleaned up in the NIFI service, and the keys uploaded to the NIFI server are deleted.
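The token renewal step can be sketched as a small holder that refreshes the token shortly before it expires. This is an illustration of the idea only, not the NIFIClient implementation: RenewableToken, its margin parameter and the refresh callable are hypothetical, and the dictionary keys follow the Keycloak token dictionary shown in the Usage section.

```python
import time


class RenewableToken:
    """Hypothetical holder that renews a token before it expires."""

    def __init__(self, token_dict, refresh, clock=time.monotonic, margin=30):
        self._refresh = refresh  # callable: refresh_token -> new token dict
        self._clock = clock      # injectable clock, useful for testing
        self._margin = margin    # renew this many seconds before expiry
        self._store(token_dict)

    def _store(self, token_dict):
        self._token = token_dict["token"]
        self._refresh_token = token_dict["refresh_token"]
        self._expires_at = self._clock() + token_dict["expires_in"]

    def get(self):
        """Return a valid token, renewing it first if it is about to expire."""
        if self._clock() >= self._expires_at - self._margin:
            self._store(self._refresh(self._refresh_token))
        return self._token
```

Calling get() before every REST request keeps long-running transfers authenticated without the caller tracking expiry times.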