Project description
Jobsworth
A set of utility functions and classes to aid in building Spark jobs on Azure Databricks.
Job Configuration
Spark Job Module
Job provides a decorator which wraps the execution of a Spark job. You use the decorator at the entry point for the job. At the moment it performs one function: calling all the registered initialisers.
from jobsworthy import spark_job
# monad and value ship with jobsworthy; these import paths are assumptions.
from jobsworthy.util import monad
from jobsworthy.spark_job import value

@spark_job.job()
def execute(args=None) -> monad.EitherMonad[value.JobState]:
    pass
To register initialisers (to be run just before the job function is called) do the following.
@spark_job.register()
def some_initialiser():
    ...
The initialisers must be imported before the job function is called to ensure they are registered. To do that, either import them directly in the job module, or add them to a module __init__.py and import the module.
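As a minimal sketch, with hypothetical package and module names, that pattern looks like this:

# initialisers/__init__.py -- hypothetical package holding the initialisers.
# Importing the submodule runs its @spark_job.register() decorators.
from . import db_initialiser

# main.py -- the job's entry point module.
from jobsworthy import spark_job
import initialisers  # imported only for its registration side effect

@spark_job.job()
def execute(args=None):
    ...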
Model Library
Streamer
The Streamer module provides a fluent streaming abstraction on top of two Hive repos (the from and to repos). The Streamer runs a pipeline as follows:
- Read from the source repo.
- Perform a transformation.
- Write to the target repo. This is where the stream actually starts; pyspark streaming performs the read, transform and write.
- Wait for the stream to finish.
To set up a stream:
from jobsworthy import model
# from_table and to_table are Hive repo instances (see the Repository Module).
streamer = (model.Streamer()
            .stream_from(from_table)
            .stream_to(to_table)
            .with_transformer(transform_fn))

# Execute the stream
result = streamer.run()

# On success, run() returns the Streamer wrapped in a Right.
# On failure, it returns the error (a subtype of JobError) wrapped in a Left.
assert result.is_right()
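The transformer itself is just a function that takes the streaming DataFrame and returns the transformed DataFrame. A minimal sketch of transform_fn (the added column is purely illustrative):

from pyspark.sql import functions as F

def transform_fn(df):
    # df is the streaming DataFrame read from the from-repo; the returned
    # DataFrame is what the streamer writes to the to-repo.
    return df.withColumn("processed_at", F.current_timestamp())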
Some transformation functions require data from outside the input table. You can configure the streamer with additional transformation context by passing kwargs to with_transformer.
from dataclasses import dataclass
from jobsworthy import model

@dataclass
class TransformContext:
    run_id: int

def transform_fn_with_ctx(df, **kwargs):
    ...

streamer = (model.Streamer()
            .stream_from(from_table)
            .stream_to(to_table)
            .with_transformer(transform_fn_with_ctx, run=TransformContext(run_id=1)))
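Inside the transformer, the kwargs arrive exactly as they were passed to with_transformer. A sketch of consuming the context above (the run_id column is illustrative):

from pyspark.sql import functions as F

def transform_fn_with_ctx(df, **kwargs):
    # 'run' is the keyword used on the with_transformer call above.
    ctx = kwargs['run']
    return df.withColumn("run_id", F.lit(ctx.run_id))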
When configuring the stream_to table, you can provide partition columns when writing the stream. Provide a tuple of column names.
streamer = model.Streamer().stream_to(to_table, ('name',))
Repository Module
The repo library offers a number of simple abstractions for managing Databricks/Spark databases and tables. It is by no means an object-mapper. Rather, it's a few classes with some simple functions we have found useful when working with Hive tables.
from jobsworthy import repo
SparkDB
Db is the base class representing a Hive database. Once constructed, it is provided to the Hive table classes when they are constructed. Db takes a Spark session and a job config.
# spark_test_session and job_config are helpers used throughout these examples.
db = repo.Db(session=spark_test_session.create_session(), config=job_config())
When initialised, it checks that the database (defined in the config) exists and creates it if it doesn't.
Hive Table
Util Module
Spark Session
Secrets
The Secrets module obtains secrets using the Databricks DBUtils secrets utility. The module acts as a wrapper for DBUtils. This allows secrets to be mocked in tests without needing DBUtils. The CosmosDB repository is injected with the secrets provider to enable secured access to CosmosDB.
The provider requires access to the Spark session when running on Databricks. However, this is not required in test. You provide Secrets with a wrapper for DBUtils which also, optionally, takes a session. Both test and production wrappers are available in the util.databricks module.
from jobsworthy.util import secrets, databricks

provider = secrets.Secrets(session=di_container.session,
                           config=job_config(),
                           secrets_provider=databricks.DatabricksUtilsWrapper())
The default secret scope name is defined from the JobConfig properties domain_name and data_product_name, separated by a ".". This can be overridden by defining the scope on the Secrets constructor, or on the call to get_secret. It looks like this on the constructor:
provider = secrets.Secrets(session=di_container.session,
                           config=job_config(),
                           secrets_provider=databricks.DatabricksUtilsWrapper(),
                           default_scope_name="custom-scope-name")
Getting a secret.
provider.get_secret(secret_name="name-of-secret") # returns an Either[secret-key]
Secrets is also able to return a ClientCredential using an Azure AD client credentials grant. The grant requires that the client id and secret are obtainable via DBUtils through the key vault, with the key names defined in the JobConfig properties client_id_key and client_secret_key.
provider.client_credential_grant()  # returns an Either[ClientCredential]
Testing using secrets: DBUtils is not available as an open-source project, so when creating the secrets provider you can pass in the DBUtils mock class that is available in the library. On this class you can also construct valid keys for tests (if required; the mock returns a dummy key response to any generic lookup). The example below also shows how to use a non-default scope on the get_secret function.
from jobsworthy.util import secrets, databricks
test_secrets = {"my_domain.my_data_product_name": {'my_secret': 'a-secret'},
                "alt_scope": {'my_secret': 'b-secret'}}

provider = secrets.Secrets(
    session=di_container.session,
    config=job_config(),
    secrets_provider=databricks.DatabricksUtilMockWrapper(spark_test_session.MockPySparkSession, test_secrets))
provider.get_secret(non_default_scope_name="alt_scope", secret_name="my_secret")
Project details
File details
Details for the file jobsworthy-0.5.1.tar.gz.
File metadata
- Download URL: jobsworthy-0.5.1.tar.gz
- Upload date:
- Size: 32.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.2.1 CPython/3.9.7 Darwin/21.6.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | f0c2a231f81307ad8287cbfb6f16d08b6b773f288d4ba8ffbb8d14a40981ccc1
MD5 | 30e3028a0e8c376d5ccd862f9f745af9
BLAKE2b-256 | 5108d11790b8593e394340bff2bd458eebc50a83ba5f532e775712cc3ea48e86
File details
Details for the file jobsworthy-0.5.1-py3-none-any.whl.
File metadata
- Download URL: jobsworthy-0.5.1-py3-none-any.whl
- Upload date:
- Size: 39.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.2.1 CPython/3.9.7 Darwin/21.6.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | 5ed67248b7111668a95aa2b086f570947c0734a91b3fb34109ab4816c078cee7
MD5 | e8fcbeeef3b866a90ac1c9c6de1ca3be
BLAKE2b-256 | 4b16fefacf39481c25b57cacc43790a4f7294fdac0e335c54e9391fbf1408332