DataHex AWS Utilities

This Python package provides commonly used helper functions built on AWS features. It is kept separate from dhx-utils because it requires boto3 to be installed.

The primary purpose of this package is to be used as an import module. However, a simple CLI interface has been introduced for some of the functions so that they can be called from a non-Python app (via a container).

You can install the library from RZT Nexus using the command:

pip install dhx-utils-aws

Loading Params File

This command is useful for an ingestion job/task to load the params.json file created by the DataHex Ingestion pipeline.

dhx-utils-aws load-params-file --params-file-path s3://bucket/params.json

or for ECS tasks where the params file is supplied via env variable:

PARAMS_FILE=s3://datahex-dlau-rozettatech/params.json dhx-utils-aws load-params-file
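Conceptually, loading a params file amounts to fetching the JSON object from S3 and parsing it. The sketch below illustrates this with plain boto3; parse_s3_url and load_params are illustrative helpers, not this library's internals.

```python
import json
import os
from urllib.parse import urlparse


def parse_s3_url(url):
    """Split an s3://bucket/key URL into (bucket, key)."""
    parts = urlparse(url)
    return parts.netloc, parts.path.lstrip("/")


def load_params(params_file_path=None):
    """Load params.json from S3, falling back to the PARAMS_FILE env var,
    mirroring the CLI behaviour described above."""
    path = params_file_path or os.environ["PARAMS_FILE"]
    bucket, key = parse_s3_url(path)
    import boto3  # deferred so the URL parsing works without AWS credentials
    body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"].read()
    return json.loads(body)
```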

Putting DataHex Events

The DataHex Event Bus has undergone a revamp, and there is now a new approach to emitting events onto it. For details on the new approach, please refer to the page: https://rozetta.atlassian.net/wiki/spaces/DHX/pages/1663238237/Proposal+-+Changing+the+way+we+use+AWS+Events+in+DataHex

There are several ways to put an event onto the DataHex Event Bus.

Simple Use Case

This is the simplest use case: the caller defines the necessary environment variables as part of the CloudFormation template and calls the put_datahex_event() function to put the event.

The environment variables to define are:

  • ${EVENT_BUS_NAME} - The name of the DataHex Event Bus, generally taken from the parameter store path /${PROJECT}/${ENVIRONMENT}/coreinfra/datahex-event-bus/name (see the datahex-core-infra GitHub stack for more details). A typical value is dhx-dev-DataHexEventBus. Note: in version 1 we had multiple event buses (e.g. DataEventBus, OrgEventBus, DataShopEventBus), but in version 2 we consolidated them all into a single DataHex event bus to reduce complexity.
  • ${SERVICE} - The name of the microservice that is emitting the event. This is generally already defined in your stack. Typical values are oms, catalog, etc. Don't reinvent them, as they almost certainly already exist unless you are creating a brand new suite of microservices.
  • ${COMPONENT} - An optional environment variable to pinpoint the exact component that is emitting the event. You can also append sub-components to this variable, for example backend or backend.api.

If you don't provide EVENT_BUS_NAME, you can instead provide PROJECT or PRODUCT alongside ENVIRONMENT, and the module will derive the event bus name using the default pattern ${PROJECT}-${ENVIRONMENT}-DataHexEventBus. Note that this library treats PROJECT and PRODUCT the same way, as there was some confusion over which to use in the earlier part of this project.
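The fallback logic just described can be sketched as follows; resolve_event_bus_name is an illustrative helper written from the description above, not the library's actual function.

```python
import os


def resolve_event_bus_name(env=None):
    """Resolve the DataHex event bus name: EVENT_BUS_NAME wins; otherwise
    fall back to ${PROJECT}-${ENVIRONMENT}-DataHexEventBus, treating
    PROJECT and PRODUCT interchangeably."""
    env = os.environ if env is None else env
    if env.get("EVENT_BUS_NAME"):
        return env["EVENT_BUS_NAME"]
    project = env.get("PROJECT") or env.get("PRODUCT")
    environment = env.get("ENVIRONMENT")
    if not (project and environment):
        raise ValueError("Need EVENT_BUS_NAME, or PROJECT/PRODUCT plus ENVIRONMENT")
    return f"{project}-{environment}-DataHexEventBus"
```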

Once these environment variables are defined, from within your module, you can call:

from dhx_utils_aws import put_datahex_event

put_datahex_event(detail_type="DataAssetStaged", detail={"data_asset_id": "1234-1234"})

Alternative Use Case

If for any reason you prefer to specify the parameters from your own code instead of relying on environment variables, you can construct your own DataHexEventBusClient object and call its put_event() method directly:

from dhx_utils_aws import DataHexEventBusClient

client = DataHexEventBusClient(project="dhx", env="dev", service="oms", component="backend.api")
client.put_event(detail_type="DataAssetStaged", detail={"data_asset_id": "1234-1234"})

Putting onto other Event Bus

If you have another event bus that you want to put events onto (i.e. not the default DataHex event bus), you can use the EventBusClient class directly.

from dhx_utils_aws import EventBusClient

client = EventBusClient(event_bus_name="dhx-dev-MyEventBus", source="datahex.oms.api")
client.put_event(detail_type="MyEventType", detail={"key": "value1"})
client.put_event(detail_type="MyEventType", detail={"key": "value2"})
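Under the hood, putting an event is an EventBridge put_events call. The sketch below shows roughly the entry shape that call expects; build_event_entry is an illustrative helper, not part of this library.

```python
import json


def build_event_entry(event_bus_name, source, detail_type, detail):
    """Build the entry dict that EventBridge's put_events API expects;
    roughly what an event-bus client hands to boto3."""
    return {
        "EventBusName": event_bus_name,
        "Source": source,
        "DetailType": detail_type,
        "Detail": json.dumps(detail),  # Detail must be a JSON string, not a dict
    }


# Sending it requires AWS credentials, e.g.:
# import boto3
# boto3.client("events").put_events(Entries=[build_event_entry(...)])
```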

Sending Bus Events using CLI

Bulk sending data staged events

Sends a "DataAssetStaged" event for each data asset in a list.

dhx-utils-aws send-data-staged --assets-file-path assets.json --env dev

The content of assets.json in the above example MUST comply with the following format:

{
  "period": "2021-08-01",
  "data_assets": [
    {
      "data_asset_id": "xxxx-id1",
      "data_location": "s3://rawdata-storage-dhx-dev-rozettatech/path/path/part/etc/"
    },
    {
      "data_asset_id": "xxxx-id2",
      "data_location": "s3://rawdata-storage-dhx-dev-rozettatech/path/path/part/etc/"
    }
  ]
}
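A quick way to sanity-check an assets file against this format before sending is a small validator like the one below; validate_assets_doc is an illustrative helper, not part of this library.

```python
def validate_assets_doc(doc):
    """Check a parsed assets.json document against the format above:
    a string "period" plus a non-empty "data_assets" list whose items
    each carry data_asset_id and data_location."""
    assert isinstance(doc.get("period"), str), "period must be a string"
    assets = doc.get("data_assets")
    assert isinstance(assets, list) and assets, "data_assets must be a non-empty list"
    for asset in assets:
        assert "data_asset_id" in asset, "each asset needs a data_asset_id"
        assert "data_location" in asset, "each asset needs a data_location"
    return True
```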

Sending ad hoc bus events

Sends an EventBridge event to an event bus in the AWS account (assuming your IAM role has the required permission).

dhx-utils-aws send-event --bus-name <BUS_NAME>
                         --source <SOURCE>
                         --detail-type <DETAIL_TYPE>
                         --detail <DETAIL>

  • BUS_NAME is the name of the event bus to send the event to.
  • SOURCE is the known name of the source that this event is emitted from (e.g. datahex.oms).
  • DETAIL_TYPE describes the type of the payload provided in DETAIL.
  • DETAIL is the JSON payload to be sent with the event.

If DETAIL is too big or complex to be typed in, you can always create a JSON file and use the --detail-file switch instead.

For example, to trigger a pipeline given its ID, you can send an event to the event bus like this:

dhx-utils-aws send-event \
    --bus-name dhx-qa-DataHexEventBus \
    --source datahex.user \
    --detail-type RunPipelineRequested \
    --detail '{"pipeline_id": "ae446cca19dd11ec9a3f6a23280ecd04", "parameters": { "period": "2021-08"}}'

or

cat > payload.json << EOF
{
  "pipeline_id": "ae446cca19dd11ec9a3f6a23280ecd04",
  "parameters":
  {
    "period": "2021-08"
  }
}
EOF

dhx-utils-aws send-event \
    --bus-name dhx-qa-DataHexEventBus \
    --source datahex.user \
    --detail-type RunPipelineRequested \
    --detail-file payload.json

Triggering an async processing job callback

The async-job-completed command notifies an ingestion state machine that an async processing job has completed.

Sending a SUCCESS async callback using the command line

dhx-utils-aws async-job-completed --status success --task-token ${TASK_TOKEN_FROM_STEP_FUNCTION}

or you can use the TASK_TOKEN environment variable:

TASK_TOKEN=${TASK_TOKEN_FROM_STEP_FUNCTION} dhx-utils-aws async-job-completed --status success

Sending a FAILURE async callback

When sending a failed status, you should also provide the reason it failed via the error code and cause:

dhx-utils-aws async-job-completed --status failed --task-token ${TASK_TOKEN_FROM_STEP_FUNCTION} \
  --error DataError --cause "Invalid exchange code found in data"
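These callbacks map onto the Step Functions task-token API (send_task_success / send_task_failure in boto3). The sketch below shows which call and arguments each status translates to; build_callback_kwargs is an illustrative helper, not the library's code.

```python
def build_callback_kwargs(status, task_token, error=None, cause=None):
    """Map a success/failed status onto the Step Functions callback call
    and its kwargs (the boto3 'stepfunctions' client methods)."""
    if status == "success":
        # send_task_success requires an output payload; "{}" is a minimal one
        return "send_task_success", {"taskToken": task_token, "output": "{}"}
    return "send_task_failure", {
        "taskToken": task_token,
        "error": error or "JobFailed",
        "cause": cause or "",
    }


# Usage (requires AWS credentials):
# import boto3
# method, kwargs = build_callback_kwargs("failed", token, "DataError", "Invalid exchange code found in data")
# getattr(boto3.client("stepfunctions"), method)(**kwargs)
```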
