Distributed Spark and Trino/Starburst adapter with SQL/JSON DAG execution and Spark writes to Trino, Iceberg and Hive.

trino-spark-adapter

trino-spark-adapter is a Python package for building hybrid Spark and Trino/Starburst data pipelines.

It provides:

  • distributed Trino/Starburst reads executed from Spark executors;
  • Spark writes to Trino through distributed INSERT batches;
  • Spark writes to Iceberg tables with Iceberg partition transforms and table properties;
  • Spark writes to Hive-compatible tables or paths with Hive-style partitioning and bucketing;
  • a DAG runner that executes .sql and .json files in alphanumeric order;
  • AES-CBC helpers and Spark SQL UDF registration;
  • class-based logging with one logger per component.

Installation

pip install trino-spark-adapter==1.1.2

Core concepts

A DAG is a folder containing SQL and JSON files. Files are executed in alphanumeric order. The suffix determines the execution mode.

Suffix                    Action
.trino.sql                Execute SQL statements on Trino/Starburst.
.spark.sql                Execute SQL statements on Spark.
.trino_to_spark.json      Load a Trino table or query into a Spark temporary view.
.spark_to_trino.json      Write a Spark view to Trino through distributed inserts.
.spark_to_iceberg.json    Write a Spark view to an Iceberg table with Spark Writer V2.
.spark_to_hive.json       Write a Spark view to a Hive-compatible table or path.
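
To make the suffix rule concrete, here is a minimal sketch of how a folder listing could be ordered and mapped to task types. It is not the package's implementation: the names SUFFIX_TO_TASK, classify and plan are hypothetical, and plain lexicographic sorting is an assumption about what "alphanumeric order" means.

```python
from typing import List, Optional, Tuple

# Suffix -> task type, following the table above. The dispatch logic itself
# is an assumption made for illustration.
SUFFIX_TO_TASK = {
    ".trino.sql": "trino_sql",
    ".spark.sql": "spark_sql",
    ".trino_to_spark.json": "trino_to_spark",
    ".spark_to_trino.json": "spark_to_trino",
    ".spark_to_iceberg.json": "spark_to_iceberg",
    ".spark_to_hive.json": "spark_to_hive",
}


def classify(file_name: str) -> Optional[str]:
    """Return the task type for a DAG file, or None if the suffix is unknown."""
    for suffix, task in SUFFIX_TO_TASK.items():
        if file_name.endswith(suffix):
            return task
    return None


def plan(file_names: List[str]) -> List[Tuple[str, str]]:
    """Sort files lexicographically and keep those with a known suffix."""
    ordered = []
    for name in sorted(file_names):
        task = classify(name)
        if task is not None:
            ordered.append((name, task))
    return ordered


files = [
    "2.prepare.spark.sql",
    "1.source.trino_to_spark.json",
    "3.out.spark_to_trino.json",
    "README.md",
]
for name, task in plan(files):
    print(f"{task:16} {name}")
```

Under a plain lexicographic sort, 10.x.spark.sql comes before 2.y.spark.sql, so zero-padded prefixes such as 02 and 10 are a safe choice whichever ordering the runner applies.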

Minimal DAG runner

from trino_spark_adapter import DagRunner, SparkAESHelper, TrinoConnectionConfig

spark = SparkAESHelper.get_spark(
    app_name="trino_spark_adapter_job",
    register_aes=True,
    fail_if_missing_aes=False,
)

trino_config = TrinoConnectionConfig.from_env()

runner = DagRunner(
    spark=spark,
    trino_config=trino_config,
    params={"calculation_date": "2026-01-05"},
    reader_defaults={"fetch_size": 100_000, "num_ranges": 20, "num_partitions": 20},
)

results = runner.run_folder("dag")
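
The params mapping is presumably used to fill {name} placeholders in the DAG files. The sketch below shows one way such substitution could work without tripping over the literal braces in JSON files. The render function is hypothetical and is not part of the package.

```python
import re

# Matches {identifier} only, so JSON structure such as {"key": ...} is ignored.
_PLACEHOLDER = re.compile(r"\{([A-Za-z_][A-Za-z0-9_]*)\}")


def render(text: str, params: dict) -> str:
    """Replace {name} with params[name]; unknown placeholders are left as-is."""
    def _substitute(match):
        key = match.group(1)
        return str(params[key]) if key in params else match.group(0)

    return _PLACEHOLDER.sub(_substitute, text)


sql = "SELECT * FROM t WHERE event_date = DATE '{calculation_date}'"
print(render(sql, {"calculation_date": "2026-01-05"}))

task = '{"colname_start_value": "{start_date}", "num_ranges": 20}'
print(render(task, {"start_date": "2026-01-01"}))
```

A regular expression is used instead of str.format because str.format would treat the structural braces of a JSON document as replacement fields and raise an error.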

Trino configuration

TrinoConnectionConfig.from_env() reads:

export TRINO_HOST="starburst.example.com"
export TRINO_USER="user"
export TRINO_PASSWORD="password"
export TRINO_ROLES="role"
export TRINO_VERIFY="false"
export TRINO_HTTP_SCHEME="https"
export TRINO_PORT="443"
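
For readers who want to see how these variables could map onto typed settings, here is a hedged sketch. TrinoSettings and settings_from_env are hypothetical stand-ins, not the package's TrinoConnectionConfig, and the defaults and parsing rules are assumptions.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class TrinoSettings:
    """Hypothetical container for the TRINO_* variables listed above."""
    host: str
    user: str
    password: Optional[str]
    roles: Optional[str]
    verify: bool
    http_scheme: str
    port: int


def settings_from_env(env: Mapping[str, str] = os.environ) -> TrinoSettings:
    """Parse TRINO_* variables; defaults here are illustrative assumptions."""
    verify_raw = env.get("TRINO_VERIFY", "true").strip().lower()
    return TrinoSettings(
        host=env["TRINO_HOST"],
        user=env["TRINO_USER"],
        password=env.get("TRINO_PASSWORD"),
        roles=env.get("TRINO_ROLES"),
        verify=verify_raw in {"1", "true", "yes"},
        http_scheme=env.get("TRINO_HTTP_SCHEME", "https"),
        port=int(env.get("TRINO_PORT", "443")),
    )


example_env = {
    "TRINO_HOST": "starburst.example.com",
    "TRINO_USER": "user",
    "TRINO_VERIFY": "false",
    "TRINO_PORT": "443",
}
print(settings_from_env(example_env))
```

Environment values are always strings, so the port and the verify flag need explicit conversion. A bare bool("false") would evaluate to True.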

Spark and AES helper

SparkAESHelper creates a Spark session from environment variables whose names start with the prefix "spark." and can optionally register AES UDFs.

export spark.app.name="trino_spark_adapter_job"
export spark.sql.shuffle.partitions="200"
export aes_key_str="...base64..."
export aes_iv_str="...base64..."

from trino_spark_adapter import SparkAESHelper

spark = SparkAESHelper.get_spark(register_aes=True)

spark.sql("SELECT aes_encrypt('abc') AS encrypted_value").show()

Registered functions:

  • aes_encrypt(value)
  • aes_decrypt(value)
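
The prefix rule for Spark settings can be sketched as follows. The function spark_conf_from_env is hypothetical, and the selection logic is an assumption based on the description above. The sketch takes the environment as a mapping so it can be tried without touching the real process environment.

```python
import os
from typing import Dict, Mapping


def spark_conf_from_env(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Collect every variable whose name starts with "spark." (assumed rule)."""
    return {key: value for key, value in env.items() if key.startswith("spark.")}


# An in-memory stand-in for the process environment.
fake_env = {
    "spark.app.name": "trino_spark_adapter_job",
    "spark.sql.shuffle.partitions": "200",
    "aes_key_str": "...base64...",
    "PATH": "/usr/bin",
}

conf = spark_conf_from_env(fake_env)
for key in sorted(conf):
    print(f"{key}={conf[key]}")
```

Dotted names are not valid identifiers in POSIX shells, so bash rejects export on a name such as spark.app.name. Such variables can instead be passed with the env utility (env NAME=VALUE command) or set from Python through os.environ before the session is created.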

Date parameters

DateUtils computes the date placeholders that DAG files reference.

from trino_spark_adapter import DateUtils

weekday_dates = DateUtils.generate_weekday_dates_between_start_stop(
    start_dt="2026-01-01",
    stop_dt="2026-02-01",
    weekday=0,
)

du = DateUtils(weekday_dates[0])
params = du.to_params()
params.update({"calculation_date": du.today_tiret[:10]})

Typical generated keys include today_tiret, today_slash, last_day_tiret, last_week_tiret, last_month_tiret, last_quarter_tiret, last_semester_tiret, and last_year_tiret.
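
As an illustration of the weekday generation step, the standalone sketch below lists the dates in a range that fall on a given weekday. It is not the DateUtils implementation. The Monday=0 numbering follows Python's datetime convention, and both that convention and the inclusive end date are assumptions about the package.

```python
from datetime import date, timedelta
from typing import List


def weekday_dates_between(start_dt: str, stop_dt: str, weekday: int) -> List[str]:
    """Return ISO dates in [start_dt, stop_dt] that fall on `weekday` (Monday=0)."""
    start = date.fromisoformat(start_dt)
    stop = date.fromisoformat(stop_dt)
    # Move forward to the first matching weekday on or after the start date.
    current = start + timedelta(days=(weekday - start.weekday()) % 7)
    dates = []
    while current <= stop:
        dates.append(current.isoformat())
        current += timedelta(days=7)
    return dates


print(weekday_dates_between("2026-01-01", "2026-02-01", weekday=0))
# ['2026-01-05', '2026-01-12', '2026-01-19', '2026-01-26']
```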

.trino_to_spark.json

Load a complete table:

{
  "table_fullname": "catalog.schema.source_table",
  "target_view": "source_table"
}

Load a partitioned date range with distributed Trino queries:

{
  "table_fullname": "catalog.schema.source_table",
  "target_view": "source_view",
  "colname": "event_date",
  "coltype": "DATE",
  "format": "%Y-%m-%d",
  "rounding": "D",
  "colname_start_value": "{start_date}",
  "colname_stop_value": "{end_date}",
  "num_ranges": 20
}
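
The idea behind num_ranges can be sketched as splitting the date interval into contiguous sub-ranges, each of which becomes an independent Trino query that a Spark task can run. The functions split_date_range and range_queries are hypothetical. The half-open boundaries and the day granularity are assumptions, and the package may treat the stop value differently.

```python
from datetime import date, timedelta
from typing import List, Tuple


def split_date_range(start: str, stop: str, num_ranges: int) -> List[Tuple[str, str]]:
    """Split [start, stop) into at most num_ranges contiguous day ranges."""
    lo, hi = date.fromisoformat(start), date.fromisoformat(stop)
    total_days = (hi - lo).days
    if total_days <= 0 or num_ranges <= 0:
        return []
    count = min(num_ranges, total_days)
    base, extra = divmod(total_days, count)
    ranges, cursor = [], lo
    for index in range(count):
        # The first `extra` ranges take one additional day.
        step = base + (1 if index < extra else 0)
        upper = cursor + timedelta(days=step)
        ranges.append((cursor.isoformat(), upper.isoformat()))
        cursor = upper
    return ranges


def range_queries(table: str, column: str, ranges: List[Tuple[str, str]]) -> List[str]:
    """Build one bounded SELECT per range."""
    return [
        f"SELECT * FROM {table} WHERE {column} >= DATE '{a}' AND {column} < DATE '{b}'"
        for a, b in ranges
    ]


parts = split_date_range("2026-01-01", "2026-01-11", num_ranges=4)
for query in range_queries("catalog.schema.source_table", "event_date", parts):
    print(query)
```

Half-open ranges keep the sub-queries disjoint, so no row is read twice at a boundary.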

The runner creates or replaces the Spark temporary view named by target_view. When target_view is omitted, the view name is derived from the file name. For example, 3.source.trino_to_spark.json creates the view source.
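
The file-name rule can be approximated with the sketch below. derive_view_name is a hypothetical helper. It assumes the ordering prefix is purely numeric and followed by a dot, which matches the example but may not cover every naming scheme the runner accepts.

```python
import re


def derive_view_name(file_name: str, suffix: str = ".trino_to_spark.json") -> str:
    """Strip the task suffix and a leading numeric ordering prefix such as '3.'."""
    stem = file_name[: -len(suffix)] if file_name.endswith(suffix) else file_name
    return re.sub(r"^\d+\.", "", stem)


print(derive_view_name("3.source.trino_to_spark.json"))
# source
```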

.spark_to_trino.json

Write a Spark view to a Trino table. The target table is created automatically when it does not exist, based on the Spark schema.

{
  "source_view": "prepared_view",
  "target_table": "catalog.schema.target_table",
  "repartition_by": ["entity_id"],
  "num_partitions": 40,
  "sort_by": ["entity_id"]
}
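
The following sketch illustrates what a distributed INSERT batch could look like: rows are grouped into chunks and each chunk becomes one multi-row INSERT statement. In Spark, a function like this would typically run once per partition, for example through DataFrame.foreachPartition. All function names here are hypothetical, the literal rendering covers only a few simple types, and the package's actual batching strategy is not documented in this text.

```python
from itertools import islice
from typing import Iterable, Iterator, List, Sequence


def sql_literal(value) -> str:
    """Render None, bool, finite numbers and strings as SQL literals."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):
        return "TRUE" if value else "FALSE"
    if isinstance(value, (int, float)):
        return repr(value)
    # Escape single quotes by doubling them.
    return "'" + str(value).replace("'", "''") + "'"


def batched(rows: Iterable[Sequence], size: int) -> Iterator[List[Sequence]]:
    """Yield successive lists of at most `size` rows."""
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def insert_statements(table: str, columns: Sequence[str],
                      rows: Iterable[Sequence], batch_size: int) -> Iterator[str]:
    """Build one multi-row INSERT statement per batch."""
    column_list = ", ".join(columns)
    for chunk in batched(rows, batch_size):
        values = ", ".join(
            "(" + ", ".join(sql_literal(v) for v in row) + ")" for row in chunk
        )
        yield f"INSERT INTO {table} ({column_list}) VALUES {values}"


rows = [(1, "a"), (2, "O'Brien"), (3, None)]
statements = list(insert_statements(
    "catalog.schema.target_table", ["entity_id", "name"], rows, batch_size=2))
for statement in statements:
    print(statement)
```

Repartitioning by a key before writing, as repartition_by and num_partitions suggest, determines how many such insert streams run in parallel.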

.spark_to_iceberg.json

Write a Spark view to an Iceberg table through the configured Spark Iceberg catalog.

{
  "source_view": "prepared_view",
  "catalog": "iceberg_catalog",
  "schema": "analytics",
  "table": "target_table",
  "mode": "append",
  "partition_spec": [
    {"transform": "day", "column": "event_ts"},
    {"transform": "bucket", "column": "entity_id", "num_buckets": 32}
  ],
  "distribution_mode": "hash",
  "format_version": 2,
  "file_format": "PARQUET",
  "repartition_by": ["entity_id"],
  "num_partitions": 200
}

Common modes are create, replace, append, and overwrite_partitions.
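
To show how such a configuration could translate into Iceberg terms, the sketch below renders partition_spec entries as Iceberg Spark DDL partition expressions and maps three options onto standard Iceberg table properties. The helper names are hypothetical, and the mapping from JSON keys to expressions and properties is an assumption rather than the package's documented behavior.

```python
from typing import Dict

# JSON transform name -> Iceberg Spark DDL function name.
_TIME_TRANSFORMS = {"year": "years", "month": "months", "day": "days", "hour": "hours"}


def partition_expression(item: dict) -> str:
    """Translate one partition_spec entry into a DDL partition expression."""
    transform, column = item["transform"], item["column"]
    if transform in _TIME_TRANSFORMS:
        return f"{_TIME_TRANSFORMS[transform]}({column})"
    if transform == "bucket":
        return f"bucket({int(item['num_buckets'])}, {column})"
    if transform == "identity":
        return column
    raise ValueError(f"unsupported transform: {transform!r}")


def table_properties(config: dict) -> Dict[str, str]:
    """Map selected config keys onto Iceberg table property names (assumed mapping)."""
    props = {}
    if "distribution_mode" in config:
        props["write.distribution-mode"] = str(config["distribution_mode"])
    if "format_version" in config:
        props["format-version"] = str(config["format_version"])
    if "file_format" in config:
        props["write.format.default"] = str(config["file_format"]).lower()
    return props


config = {
    "partition_spec": [
        {"transform": "day", "column": "event_ts"},
        {"transform": "bucket", "column": "entity_id", "num_buckets": 32},
    ],
    "distribution_mode": "hash",
    "format_version": 2,
    "file_format": "PARQUET",
}
clause = ", ".join(partition_expression(p) for p in config["partition_spec"])
print(f"PARTITIONED BY ({clause})")
print(table_properties(config))
```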

.spark_to_hive.json

Write a Spark view to a Hive table:

{
  "source_view": "prepared_view",
  "table": "analytics.target_table",
  "mode": "overwrite",
  "format": "parquet",
  "partition_by": ["event_date"],
  "repartition_by": ["event_date"],
  "num_partitions": 40
}

Write a Spark view to a path such as S3A:

{
  "source_view": "prepared_view",
  "path": "s3a://bucket/path/target_table",
  "mode": "overwrite",
  "format": "parquet",
  "partition_by": ["event_date"]
}

Hive bucketing is supported only when writing to a table (Spark's saveAsTable), not to a path:

{
  "source_view": "prepared_view",
  "table": "analytics.bucketed_table",
  "mode": "overwrite",
  "format": "parquet",
  "bucket_by": ["entity_id"],
  "num_buckets": 32,
  "sort_by": ["entity_id"]
}
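
The table-only restriction on bucketing can be checked before any Spark job starts. The sketch below is a hypothetical validation helper, not part of the package. It encodes the rule stated above and returns the name of the Spark writer method that a configuration would presumably lead to.

```python
def choose_hive_write(config: dict) -> str:
    """Return 'saveAsTable' or 'save' for a config, rejecting invalid bucketing."""
    has_table = bool(config.get("table"))
    has_path = bool(config.get("path"))
    if not has_table and not has_path:
        raise ValueError("config needs either 'table' or 'path'")
    if config.get("bucket_by"):
        if not has_table:
            raise ValueError("bucket_by requires 'table'; path writes cannot be bucketed")
        if int(config.get("num_buckets", 0)) <= 0:
            raise ValueError("bucket_by requires a positive 'num_buckets'")
    return "saveAsTable" if has_table else "save"


bucketed = {
    "source_view": "prepared_view",
    "table": "analytics.bucketed_table",
    "bucket_by": ["entity_id"],
    "num_buckets": 32,
}
print(choose_hive_write(bucketed))

path_only = {"source_view": "prepared_view", "path": "s3a://bucket/path/target_table"}
print(choose_hive_write(path_only))
```

Failing fast on an invalid combination avoids discovering the problem only after the upstream stages of the DAG have already run.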

Logging

Every main component inherits from LogBase and exposes a class logger.

import logging
from trino_spark_adapter import DagRunner, DistributedTrinoSparkReader, SparkHiveWriter

DagRunner.logger().setLevel(logging.INFO)
DistributedTrinoSparkReader.logger().setLevel(logging.DEBUG)
SparkHiveWriter.logger().setLevel(logging.INFO)

The default formatter includes timestamp, class name and level:

[2026-01-05 09:15:12] [DagRunner] [INFO] START task type=trino_to_spark file=1.source.trino_to_spark.json
[2026-01-05 09:17:03] [DagRunner] [INFO] SUCCESS task type=trino_to_spark file=1.source.trino_to_spark.json elapsed=0:01:51
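
A class-level logger with this line format can be reproduced with the standard logging module. The sketch below is a hypothetical re-implementation for illustration only. The package's own LogBase may differ, and DemoRunner is a stand-in class.

```python
import logging


class LogBase:
    """Hypothetical base class exposing one logger per subclass."""

    @classmethod
    def logger(cls) -> logging.Logger:
        log = logging.getLogger(cls.__name__)
        if not log.handlers:
            # Attach the handler once; later calls reuse the same logger.
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter(
                "[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S",
            ))
            log.addHandler(handler)
            log.propagate = False
        return log


class DemoRunner(LogBase):
    """Stand-in component used only to demonstrate the logger."""


DemoRunner.logger().setLevel(logging.INFO)
DemoRunner.logger().info(
    "START task type=%s file=%s", "trino_to_spark", "1.source.trino_to_spark.json")
```

Because logging.getLogger returns the same object for the same name, setting a level on one component's logger leaves the other components unaffected.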

Debug-only expensive Spark actions should be guarded explicitly:

logger = DagRunner.logger()
if logger.isEnabledFor(logging.DEBUG):
    logger.debug("row_count=%s", df.count())

Publishing

The source archive includes a publishing script, .pypi.sh.

chmod +x .pypi.sh
./.pypi.sh
