DataHex AWS Utilities
This Python package provides commonly used functions that rely on AWS features. It is separated from dhx-utils because it requires boto3 to be installed.
The primary purpose of this package is to be used as an import module. However, a simple CLI interface has been introduced for some of the functions so that they can be called from a non-Python app (via a container).
You can install the library from RZT Nexus using the command:
pip install dhx-utils-aws
Loading Params File
This command is useful for an ingestion job/task to load the params.json file created
by the DataHex Ingestion pipeline.
dhx-utils-aws load-params-file --params-file-path s3://bucket/params.json
or for ECS tasks where the params file is supplied via env variable:
PARAMS_FILE=s3://datahex-dlau-rozettatech/params.json dhx-utils-aws load-params-file
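Under the hood, loading a params file amounts to fetching a JSON object from S3. A minimal Python sketch of that behaviour, assuming boto3 credentials are configured (the function names here are illustrative, not the package's actual API):

```python
import json
from urllib.parse import urlparse


def parse_s3_uri(uri: str) -> tuple:
    """Split an s3://bucket/key URI into (bucket, key)."""
    parsed = urlparse(uri)
    return parsed.netloc, parsed.path.lstrip("/")


def load_params_file(uri: str) -> dict:
    """Download the params JSON from S3 and return it as a dict."""
    import boto3  # deferred so parse_s3_uri stays testable offline

    bucket, key = parse_s3_uri(uri)
    body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"].read()
    return json.loads(body)
```

The actual CLI also honours the PARAMS_FILE environment variable when the flag is omitted, as shown above.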
Putting DataHex Events
The DataHex Event Bus has undergone a revamp, and there is now a new approach to emitting events onto it. For details on the new approach, please refer to the page: https://rozetta.atlassian.net/wiki/spaces/DHX/pages/1663238237/Proposal+-+Changing+the+way+we+use+AWS+Events+in+DataHex
There are several ways to put an event onto the DataHex Event Bus.
Simple Use Case
This is the simplest use case, whereby the caller defines the necessary environment variables as part of the
CloudFormation template and calls the put_datahex_event() function to put the event.
The environment variables to define are:
- ${EVENT_BUS_NAME} - The name of the DataHex Event Bus, generally taken from the parameter store path /${PROJECT}/${ENVIRONMENT}/coreinfra/datahex-event-bus/name. See the datahex-core-infra GitHub stack for more details. A typical value is dhx-dev-DataHexEventBus. Note: in version 1 we had multiple event buses (e.g. DataEventBus, OrgEventBus, DataShopEventBus), but in version 2 we decided to consolidate them all into a single DataHex event bus to reduce complexity.
- ${SERVICE} - The name of the microservice that is emitting the event. This is generally already defined in your stack. Typical values are oms, catalog, etc. Don't reinvent them, as they almost certainly already exist, unless you are creating a brand new suite of microservices.
- ${COMPONENT} - An optional environment variable to zone in on the exact component that is emitting the event. You can also append sub-components to this variable, for example backend or backend.api.
If you don't provide EVENT_BUS_NAME, you can instead provide PROJECT (or PRODUCT) alongside ENVIRONMENT, and
the module will attempt to derive the event bus name using the default pattern
${PROJECT}-${ENVIRONMENT}-DataHexEventBus. Note that this library treats PROJECT and PRODUCT the same way,
as there was some confusion over which to use in the earlier part of this project.
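The fallback described above can be sketched as a small resolver (illustrative only; the package's internal logic may differ):

```python
def resolve_event_bus_name(env: dict) -> str:
    """Mirror the documented fallback: an explicit EVENT_BUS_NAME wins;
    otherwise build ${PROJECT}-${ENVIRONMENT}-DataHexEventBus, treating
    PROJECT and PRODUCT interchangeably."""
    if env.get("EVENT_BUS_NAME"):
        return env["EVENT_BUS_NAME"]
    project = env.get("PROJECT") or env.get("PRODUCT")
    environment = env.get("ENVIRONMENT")
    if not (project and environment):
        raise ValueError(
            "Need EVENT_BUS_NAME, or PROJECT (or PRODUCT) plus ENVIRONMENT"
        )
    return f"{project}-{environment}-DataHexEventBus"
```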
Once these environment variables are defined, from within your module, you can call:
from dhx_utils_aws import put_datahex_event
put_datahex_event(detail_type="DataAssetStaged", detail={"data_asset_id": "1234-1234"})
Alternative Use Case
If for any reason you prefer to specify the parameters from your own code instead of relying on the environment
variables, you can construct your own DataHexEventBusClient object and call the put_event() method directly:
from dhx_utils_aws import DataHexEventBusClient
client = DataHexEventBusClient(project="dhx", env="dev", service="oms", component="backend.api")
client.put_event(detail_type="DataAssetStaged", detail={"data_asset_id": "1234-1234"})
Putting onto another Event Bus
If you have another event bus that you want to put events onto (i.e. not the default DataHex event bus), you can
use the EventBusClient class directly.
from dhx_utils_aws import EventBusClient
client = EventBusClient(event_bus_name="dhx-dev-MyEventBus", source="datahex.oms.api")
client.put_event(detail_type="MyEventType", detail={"key": "value1"})
client.put_event(detail_type="MyEventType", detail={"key": "value2"})
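Behind the scenes, clients like these ultimately call the EventBridge PutEvents API via boto3. A hedged sketch of what such a client might do (build_entry and put_event are illustrative names, not the package's API):

```python
import json


def build_entry(bus_name: str, source: str, detail_type: str, detail: dict) -> dict:
    """Build a single PutEvents entry in the shape EventBridge expects.
    Note that Detail must be a JSON string, not a dict."""
    return {
        "EventBusName": bus_name,
        "Source": source,
        "DetailType": detail_type,
        "Detail": json.dumps(detail),
    }


def put_event(bus_name: str, source: str, detail_type: str, detail: dict) -> None:
    """Send one event via the real EventBridge PutEvents call."""
    import boto3  # deferred so build_entry stays testable offline

    boto3.client("events").put_events(
        Entries=[build_entry(bus_name, source, detail_type, detail)]
    )
```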
Sending Bus Events using CLI
Bulk sending data staged events
Send a "DataAssetStaged" event for each data asset in a list.
dhx-utils-aws send-data-staged --assets-file-path assets.json --env dev
The content of assets.json in the above example MUST comply with the following format:
{
    "period": "2021-08-01",
    "data_assets": [
        {
            "data_asset_id": "xxxx-id1",
            "data_location": "s3://rawdata-storage-dhx-dev-rozettatech/path/path/part/etc/"
        },
        {
            "data_asset_id": "xxxx-id2",
            "data_location": "s3://rawdata-storage-dhx-dev-rozettatech/path/path/part/etc/"
        }
    ]
}
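As a sketch of what the bulk command presumably does with this file, the structure can be flattened into one event detail per asset (staged_event_details is an illustrative helper, not part of the package):

```python
def staged_event_details(assets: dict) -> list:
    """Turn the assets.json structure into one event detail per asset,
    carrying the shared period alongside each asset's id and location."""
    period = assets["period"]
    return [{"period": period, **asset} for asset in assets["data_assets"]]
```

Each resulting dict would then be sent as the detail of a "DataAssetStaged" event.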
Sending ad hoc bus events
Send an eventbridge event to event buses in the AWS account (assuming your IAM role has the permission).
dhx-utils-aws send-event --bus-name <BUS_NAME> \
    --source <SOURCE> \
    --detail-type <DETAIL_TYPE> \
    --detail <DETAIL>
- BUS_NAME is the name of the event bus to send the event to.
- SOURCE is the known name of the source that this event is emitted from (e.g. datahex.oms).
- DETAIL_TYPE provides the type of the payload that is provided in detail.
- DETAIL is the JSON payload to be sent with the event.
If DETAIL is too big or complex to type in, you can always create a JSON file and use the --detail-file
switch instead.
For example, to trigger a pipeline given its ID, you can send a RunPipelineRequested event:
dhx-utils-aws send-event \
--bus-name dhx-qa-DataHexEventBus \
--source datahex.user \
--detail-type RunPipelineRequested \
--detail '{"pipeline_id": "ae446cca19dd11ec9a3f6a23280ecd04", "parameters": { "period": "2021-08"}}'
or
cat > payload.json << EOF
{
    "pipeline_id": "ae446cca19dd11ec9a3f6a23280ecd04",
    "parameters": {
        "period": "2021-08"
    }
}
EOF
dhx-utils-aws send-event \
  --bus-name dhx-qa-DataHexEventBus \
--source datahex.user \
--detail-type RunPipelineRequested \
--detail-file payload.json
Triggering an async processing job callback
The async-job-completed command can be used to notify an ingestion state machine that the async processing
job is now completed.
Sending a SUCCESS async callback using the command line
dhx-utils-aws async-job-completed --status success --task-token ${TASK_TOKEN_FROM_STEP_FUNCTION}
or you can use the TASK_TOKEN environment variable:
TASK_TOKEN=${TASK_TOKEN_FROM_STEP_FUNCTION} dhx-utils-aws async-job-completed --status success
Sending a FAILURE async callback
When sending a failed status, you should also provide the reason it failed via the error code and cause:
dhx-utils-aws async-job-completed --status failed --task-token ${TASK_TOKEN_FROM_STEP_FUNCTION} \
--error DataError --cause "Invalid exchange code found in data"
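These callbacks correspond to the Step Functions SendTaskSuccess and SendTaskFailure APIs. A minimal sketch of how such a command might map onto boto3 (build_callback and async_job_completed are illustrative names, not the package's actual internals):

```python
import json
import os


def build_callback(status: str, task_token: str = None,
                   error: str = None, cause: str = None) -> tuple:
    """Decide which Step Functions callback to make. Returns the boto3
    SFN client method name and its keyword arguments."""
    token = task_token or os.environ.get("TASK_TOKEN")
    if not token:
        raise ValueError("task token required (--task-token flag or TASK_TOKEN env var)")
    if status == "success":
        return "send_task_success", {
            "taskToken": token,
            "output": json.dumps({"status": "success"}),
        }
    return "send_task_failure", {
        "taskToken": token,
        "error": error or "JobFailed",
        "cause": cause or "",
    }


def async_job_completed(**kwargs) -> None:
    """Perform the actual callback against Step Functions."""
    import boto3  # deferred so build_callback stays testable offline

    method, args = build_callback(**kwargs)
    getattr(boto3.client("stepfunctions"), method)(**args)
```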