Skip to main content

Utilities for working with StreamZero Executor Framework

Project description

STREAMZERO FX Executor Helper

This package can be used for logging and accessing service configuration, parameters, secrets and state through it's context.

from fx_ef import context

Accessing package configuration

from fx_ef import context

context.config.get('some_configuration_key')

Accessing execution parameters

from fx_ef import context

context.params.get('param_name')

Accessing secrets

With fx_ef.context.secrets you can access secrets stored on platform, project or package level.

from fx_ef import context

context.secrets.get('secret_name')

This command will first lookup for secret named secret_name within package secrets (defined in secrets.json file of the package). If such key doesn't exist it will lookup for it within project secrets, and finally within platform's secrets. If secret with such name doesn't exist None will be returned.

Setting secrets

Using fx_ef.context.secrets.set(name, value, context) method you can set secrets on project and platform level.

from fx_ef import context

context.secrets.set(name="platform_secret", value={"somekey":"someval"}, context="platform")
Parameter Description
name Name of the secret to be set. If secret with the same name already exist it will be updated
value Value of the secret that should be set
context Context of the secret. Possible values are platform and project

Accessing package id and name

from fx_ef import context

context.package.name
context.package.id

Accessing and updating package state

from fx_ef import context

context.state.get()
context.state.put("some_key", "some_value")

Logging

Available levels: DEBUG, INFO (default), ERROR, WARNING, CRITICAL

from fx_ef import context

context.logging.setLevel('INFO')

context.logging.debug("debug msg")
context.logging.info("info msg")
context.logging.error("error msg")
context.logging.warning("warning msg")
context.logging.critical("critical msg")

Sending events

Used for sending events to the system. Those events can be used to trigger another service execution.

from fx_ef import context

print("sending event with some data")

context.events.send(
    event_type="sample_event_type",
    data={"some_key": "some_val"}
)
Parameter Description
event_type Type of the event (string)
event_source Source of the event (if not provided it default to service name of current execution.
data Event data (dict)
topic Name of the topic that event should be sent to. If not provider it will be set to system default.

Scheduling retry of service execution

Used for scheduling next execution of the service from within that service script.

from fx_ef import context

context.retry(minutes=0, hours=0, days=0, cron_expression=None, parameters={}):
Parameter Description
minutes Number of minutes until next execution
hours Number of hours until next execution
days Number of days until next execution
cron_expression Cron like expression when next execution should occur. It is not allowed to have / in the cron expression (e.g. */2 3 * * *) If cron_expression is set minutes, hours and days will be skipped
parameters Parameters that will be passed to next execution
from fx_ef import context

# retry in 3 minutes
jobid = context.scheduler.retry(minutes=3)

# retry in 3 hours
jobid = context.scheduler.retry(hours=3)

# retry in 3 days
jobid = context.scheduler.retry(days=3)

# retry on 56th minute of next hour
jobid = context.scheduler.retry(cron_expression="56 * * * *")

Using fx_ef for local development

To use fx_ef for local development and testing without need to run scripts through Executor EF_ENV=local env variable must be set. When it is set fx_ef.context will be read from local file ef_env.json that must be created within same directory as the script that is accessing fx_ef.context.
ef_env.json must have following structure:

{
  "params": {
    "package_name": "some_package_name",
    "package_id": "some_package_id",
    "optional_param_1": "param_1_value",
    "optional_param_2": "param_2_value"
  },
  "secrets": {
    "secret_param_1": "secret_1_value",
    "secret_param_2": "secret_2_value"
  },
  "config": {
    "config_param_1": "config_1_value",
    "config_param_2": "config_2_value"
  }
}

NOTE: params, package_name and package_id are mandatory.

When EF_ENV=local is set, package state is also stored and fetched from the local file ef_package_state.json within the same directory. If file does not exist it will be created on the fly.

By default context.events.send(...) will be skipped when local env is set. If event should be sent anyway following should be added in config section of the ef_env.json file:

{
  "params": {...},
  "secrets": {...},
  "config": {
    "KAFKA_BOOTSTRAP_SERVER": "kafka",
    "KAFKA_PORT": "9092",
    "DEFAULT_TOPIC": "your_topic_name"
  }
}
Parameter Description
KAFKA_BOOTSTRAP_SERVER address of the kafka server
KAFKA_PORT kafka port
DEFAULT_TOPIC name of the default topic which will be used if it is not passed via send() method

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fx_ef-1.5.1.tar.gz (15.3 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page