Distributed Spark and Trino/Starburst adapter with SQL/JSON DAG execution and Spark writes to Trino, Iceberg and Hive.

trino-spark-adapter

trino-spark-adapter is a Python package for building hybrid Spark and Trino/Starburst data pipelines.

It provides:

  • distributed Trino/Starburst reads executed from Spark executors;
  • Spark writes to Trino through distributed INSERT batches;
  • Spark writes to Iceberg tables with Iceberg partition transforms and table properties;
  • Spark writes to Hive-compatible tables or paths with Hive-style partitioning and bucketing;
  • a DAG runner that executes .sql and .json files in alphanumeric order;
  • AES-CBC helpers and Spark SQL UDF registration;
  • class-based logging with one logger per component.

Installation

pip install trino-spark-adapter==1.1.3

Core concepts

A DAG is a folder containing SQL and JSON files. Files are executed in alphanumeric order. The suffix determines the execution mode.

Suffix                   Action
.trino.sql               Execute SQL statements on Trino/Starburst.
.spark.sql               Execute SQL statements on Spark.
.trino_to_spark.json     Load a Trino table or query into a Spark temporary view.
.spark_to_trino.json     Write a Spark view to Trino through distributed inserts.
.spark_to_iceberg.json   Write a Spark view to an Iceberg table with Spark Writer V2.
.spark_to_hive.json      Write a Spark view to a Hive-compatible table or path.
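
The suffix dispatch and ordering described above can be sketched in plain Python. This is an illustration, not the package's implementation: SUFFIX_TO_TASK, classify and natural_key are hypothetical names, only the trino_to_spark label is confirmed by the log sample in this README, and the README does not say whether "alphanumeric order" means natural or plain lexicographic sorting.

```python
import re

# Suffix -> task type. Only "trino_to_spark" appears in this README's log
# sample; the other labels are illustrative assumptions.
SUFFIX_TO_TASK = {
    ".trino.sql": "trino_sql",
    ".spark.sql": "spark_sql",
    ".trino_to_spark.json": "trino_to_spark",
    ".spark_to_trino.json": "spark_to_trino",
    ".spark_to_iceberg.json": "spark_to_iceberg",
    ".spark_to_hive.json": "spark_to_hive",
}


def classify(file_name: str) -> str:
    """Return the task type for a DAG file name, based on its suffix."""
    for suffix, task in SUFFIX_TO_TASK.items():
        if file_name.endswith(suffix):
            return task
    raise ValueError(f"unsupported DAG file suffix: {file_name}")


def natural_key(file_name: str) -> list:
    """Sort key that compares digit runs as integers (an assumed ordering)."""
    parts = re.split(r"(\d+)", file_name)
    # With a capturing group, odd positions hold the digit runs.
    return [int(part) if i % 2 else part for i, part in enumerate(parts)]


files = [
    "10.export.spark_to_trino.json",
    "2.clean.spark.sql",
    "1.source.trino_to_spark.json",
]
natural_order = sorted(files, key=natural_key)
lexicographic_order = sorted(files)
plan = [(name, classify(name)) for name in natural_order]
```

The two orderings disagree as soon as a prefix reaches two digits. Zero-padding the prefixes (01., 02., ..., 10.) makes both orderings agree, so the execution order does not depend on which one the runner uses.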

Minimal DAG runner

from trino_spark_adapter import DagRunner, SparkAESHelper, TrinoConnectionConfig

spark = SparkAESHelper.get_spark(
    app_name="trino_spark_adapter_job",
    register_aes=True,
    fail_if_missing_aes=False,
)

trino_config = TrinoConnectionConfig.from_env()

runner = DagRunner(
    spark=spark,
    trino_config=trino_config,
    params={"calculation_date": "2026-01-05"},
    reader_defaults={"fetch_size": 100_000, "num_ranges": 20, "num_partitions": 20},
)

results = runner.run_folder("dag")

Trino configuration

TrinoConnectionConfig.from_env() reads:

export TRINO_HOST="starburst.example.com"
export TRINO_USER="user"
export TRINO_PASSWORD="password"
export TRINO_ROLES="role"
export TRINO_VERIFY="false"
export TRINO_HTTP_SCHEME="https"
export TRINO_PORT="443"
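
As a rough picture of what reading these variables involves, the sketch below parses them into typed values. TrinoEnvSettings and read_trino_env are hypothetical names rather than part of trino_spark_adapter, and the defaults and accepted boolean spellings are assumptions.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class TrinoEnvSettings:
    """Hypothetical stand-in for the values read from the environment."""
    host: str
    user: str
    password: Optional[str]
    roles: Optional[str]
    verify: bool
    http_scheme: str
    port: int


def read_trino_env(env: Mapping[str, str] = os.environ) -> TrinoEnvSettings:
    """Parse the TRINO_* variables; the defaults used here are assumptions."""
    verify_raw = env.get("TRINO_VERIFY", "true").strip().lower()
    return TrinoEnvSettings(
        host=env["TRINO_HOST"],  # required: KeyError if missing
        user=env["TRINO_USER"],  # required: KeyError if missing
        password=env.get("TRINO_PASSWORD"),
        roles=env.get("TRINO_ROLES"),
        verify=verify_raw not in {"false", "0", "no"},
        http_scheme=env.get("TRINO_HTTP_SCHEME", "https"),
        port=int(env.get("TRINO_PORT", "443")),
    )


example = read_trino_env(
    {
        "TRINO_HOST": "starburst.example.com",
        "TRINO_USER": "user",
        "TRINO_PASSWORD": "password",
        "TRINO_ROLES": "role",
        "TRINO_VERIFY": "false",
        "TRINO_HTTP_SCHEME": "https",
        "TRINO_PORT": "443",
    }
)
```

Passing the mapping explicitly keeps the parsing testable without touching the real process environment.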

Spark and AES helper

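SparkAESHelper creates a Spark session from environment variables whose names start with the prefix "spark." (for example spark.app.name) and optionally registers AES UDFs. The AES key and IV are supplied as base64 strings in aes_key_str and aes_iv_str.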

export spark.app.name="trino_spark_adapter_job"
export spark.sql.shuffle.partitions="200"
export aes_key_str="...base64..."
export aes_iv_str="...base64..."

from trino_spark_adapter import SparkAESHelper

spark = SparkAESHelper.get_spark(register_aes=True)

spark.sql("SELECT aes_encrypt('abc') AS encrypted_value").show()

Registered functions:

  • aes_encrypt(value)
  • aes_decrypt(value)
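
One practical note on the spark.* variables: bash rejects export for names that contain dots ("not a valid identifier"), so such entries may need to be set another way. The sketch below sets them from Python and collects them into a configuration dictionary. collect_spark_conf is a hypothetical helper, and it is an assumption that SparkAESHelper reads the process environment at the time get_spark is called.

```python
import os
from typing import Dict, Mapping


def collect_spark_conf(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return every environment entry whose name starts with 'spark.'."""
    return {name: value for name, value in env.items() if name.startswith("spark.")}


# Dotted names can be set from Python even where the shell's export cannot.
os.environ["spark.app.name"] = "trino_spark_adapter_job"
os.environ["spark.sql.shuffle.partitions"] = "200"

spark_conf = collect_spark_conf()
```

Under that assumption, setting the entries in os.environ before calling SparkAESHelper.get_spark would have the same effect as exporting them.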

Date parameters

DateUtils can be used to compute placeholders used in DAG files.

from trino_spark_adapter import DateUtils

weekday_dates = DateUtils.generate_weekday_dates_between_start_stop(
    start_dt="2026-01-01",
    stop_dt="2026-02-01",
    weekday=0,
)

du = DateUtils(weekday_dates[0])
params = du.to_params()
params.update({"calculation_date": du.today_tiret[:10]})

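Typical generated keys include today_tiret, today_slash, last_day_tiret, last_week_tiret, last_month_tiret, last_quarter_tiret, last_semester_tiret, and last_year_tiret. The _tiret and _slash suffixes appear to denote hyphen-separated and slash-separated date strings ("tiret" is French for hyphen).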
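
DAG files reference parameters with {name} tokens, as in the .trino_to_spark.json example in this README. A minimal sketch of how such tokens could be substituted in a parsed JSON task follows. The render function is hypothetical, and the package's actual substitution rules (escaping, behaviour on unknown names) are not documented here.

```python
import json
import re
from typing import Any, Mapping

PLACEHOLDER = re.compile(r"\{(\w+)\}")


def render(value: Any, params: Mapping[str, str]) -> Any:
    """Recursively replace {name} tokens in the string values of parsed JSON."""
    if isinstance(value, str):
        # Unknown names raise KeyError so that a typo fails loudly.
        return PLACEHOLDER.sub(lambda match: str(params[match.group(1)]), value)
    if isinstance(value, list):
        return [render(item, params) for item in value]
    if isinstance(value, dict):
        return {key: render(item, params) for key, item in value.items()}
    return value  # numbers, booleans and null pass through unchanged


task = json.loads(
    '{"colname_start_value": "{start_date}",'
    ' "colname_stop_value": "{end_date}",'
    ' "format": "%Y-%m-%d",'
    ' "num_ranges": 20}'
)
rendered = render(task, {"start_date": "2026-01-01", "end_date": "2026-02-01"})
```

Substituting after parsing, rather than calling str.format on the raw file, avoids clashes with the braces that JSON itself uses.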

.trino_to_spark.json

Load a complete table:

{
  "table_fullname": "catalog.schema.source_table",
  "target_view": "source_table"
}

Load a partitioned date range with distributed Trino queries:

{
  "table_fullname": "catalog.schema.source_table",
  "target_view": "source_view",
  "colname": "event_date",
  "coltype": "DATE",
  "format": "%Y-%m-%d",
  "rounding": "D",
  "colname_start_value": "{start_date}",
  "colname_stop_value": "{end_date}",
  "num_ranges": 20
}
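
The idea behind num_ranges can be sketched as follows: the date interval is cut into contiguous sub-ranges, and each sub-range becomes the predicate of one Trino query that a Spark task can run independently. split_date_range and range_predicate are hypothetical helpers. Treating the stop value as exclusive is an assumption, since the README does not say whether the bound is inclusive.

```python
from datetime import date, timedelta
from typing import List, Tuple


def split_date_range(start: date, stop: date, num_ranges: int) -> List[Tuple[date, date]]:
    """Split [start, stop) into at most num_ranges contiguous day-aligned ranges."""
    total_days = (stop - start).days
    if total_days <= 0 or num_ranges <= 0:
        raise ValueError("need start < stop and num_ranges > 0")
    count = min(num_ranges, total_days)
    base, extra = divmod(total_days, count)
    ranges = []
    lower = start
    for i in range(count):
        # The first `extra` ranges take one additional day each.
        upper = lower + timedelta(days=base + (1 if i < extra else 0))
        ranges.append((lower, upper))
        lower = upper
    return ranges


def range_predicate(colname: str, lower: date, upper: date) -> str:
    """Build a half-open predicate using Trino DATE literals."""
    return (
        f"{colname} >= DATE '{lower.isoformat()}' "
        f"AND {colname} < DATE '{upper.isoformat()}'"
    )


ranges = split_date_range(date(2026, 1, 1), date(2026, 2, 1), 20)
predicates = [range_predicate("event_date", lower, upper) for lower, upper in ranges]
```

Half-open ranges guarantee that every row falls into exactly one query, so no deduplication is needed when the partial results are combined.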

The runner creates or replaces the Spark temporary view named by target_view. When target_view is omitted, the view name is derived from the file name. For example, 3.source.trino_to_spark.json creates a view named source.
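
One way to read the file-name rule is sketched below. derive_view_name is a hypothetical function that reproduces the single documented example. How the package treats names without a numeric prefix, or with extra dots, is an assumption.

```python
SUFFIX = ".trino_to_spark.json"


def derive_view_name(file_name: str) -> str:
    """Strip the task suffix and a leading numeric order prefix, if any."""
    if not file_name.endswith(SUFFIX):
        raise ValueError(f"not a trino_to_spark task file: {file_name}")
    stem = file_name[: -len(SUFFIX)]
    prefix, separator, rest = stem.partition(".")
    # Drop the prefix only when it is an ordering token such as "3".
    if separator and prefix.isdigit() and rest:
        return rest
    return stem


view_name = derive_view_name("3.source.trino_to_spark.json")
```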

.spark_to_trino.json

Write a Spark view to a Trino table. The target table is created automatically when it does not exist, based on the Spark schema.

{
  "source_view": "prepared_view",
  "target_table": "catalog.schema.target_table",
  "repartition_by": ["entity_id"],
  "num_partitions": 40,
  "sort_by": ["entity_id"]
}
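
The phrase "distributed INSERT batches" suggests that each Spark partition groups its rows into batches and sends one multi-row INSERT per batch. The sketch below shows only the batching and statement building, in plain Python. batched and build_insert are hypothetical helpers, the label column is invented for the example, and the "?" placeholders follow the qmark parameter style common to DB-API drivers. The package's real batching and quoting logic may differ.

```python
from itertools import islice
from typing import Iterable, Iterator, List, Sequence, Tuple


def batched(rows: Iterable[Sequence], size: int) -> Iterator[List[Sequence]]:
    """Yield consecutive lists of at most `size` rows."""
    if size <= 0:
        raise ValueError("size must be positive")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def build_insert(
    table: str, columns: Sequence[str], batch: Sequence[Sequence]
) -> Tuple[str, list]:
    """Return a multi-row INSERT with `?` placeholders and its flat parameters."""
    row_sql = "(" + ", ".join("?" for _ in columns) + ")"
    sql = (
        f"INSERT INTO {table} ({', '.join(columns)}) VALUES "
        + ", ".join(row_sql for _ in batch)
    )
    params = [value for row in batch for value in row]
    return sql, params


rows = [(1, "a"), (2, "b"), (3, "c")]
statements = [
    build_insert("catalog.schema.target_table", ["entity_id", "label"], batch)
    for batch in batched(rows, 2)
]
```

In a Spark job, a loop like this would typically run once per partition, which is why repartition_by and num_partitions influence how many concurrent INSERT streams reach Trino.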

.spark_to_iceberg.json

Write a Spark view to an Iceberg table through the configured Spark Iceberg catalog.

{
  "source_view": "prepared_view",
  "catalog": "iceberg_catalog",
  "schema": "analytics",
  "table": "target_table",
  "mode": "append",
  "partition_spec": [
    {"transform": "day", "column": "event_ts"},
    {"transform": "bucket", "column": "entity_id", "num_buckets": 32}
  ],
  "distribution_mode": "hash",
  "format_version": 2,
  "file_format": "PARQUET",
  "repartition_by": ["entity_id"],
  "num_partitions": 200
}

Common modes are create, replace, append, and overwrite_partitions.
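
To show what the partition_spec and property keys correspond to on the Iceberg side, the sketch below renders them as the PARTITIONED BY clause and table properties used by Iceberg's Spark DDL. partition_clause and table_properties are hypothetical helpers. The mapping from config keys to the properties format-version, write.format.default and write.distribution-mode is an assumption about what the package does with them.

```python
from typing import Dict, List, Mapping

# Iceberg Spark DDL transform functions, keyed by the config's transform name.
_TIME_TRANSFORMS = {"year": "years", "month": "months", "day": "days", "hour": "hours"}


def partition_clause(partition_spec: List[Mapping]) -> str:
    """Render a partition_spec list as an Iceberg PARTITIONED BY clause."""
    parts = []
    for item in partition_spec:
        transform, column = item["transform"], item["column"]
        if transform in _TIME_TRANSFORMS:
            parts.append(f"{_TIME_TRANSFORMS[transform]}({column})")
        elif transform == "bucket":
            parts.append(f"bucket({int(item['num_buckets'])}, {column})")
        else:
            raise ValueError(f"unsupported transform in this sketch: {transform}")
    return "PARTITIONED BY (" + ", ".join(parts) + ")"


def table_properties(config: Mapping) -> Dict[str, str]:
    """Map config keys to the Iceberg table properties they plausibly set."""
    return {
        "format-version": str(config["format_version"]),
        "write.format.default": str(config["file_format"]).lower(),
        "write.distribution-mode": str(config["distribution_mode"]),
    }


config = {
    "partition_spec": [
        {"transform": "day", "column": "event_ts"},
        {"transform": "bucket", "column": "entity_id", "num_buckets": 32},
    ],
    "distribution_mode": "hash",
    "format_version": 2,
    "file_format": "PARQUET",
}
clause = partition_clause(config["partition_spec"])
properties = table_properties(config)
```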

.spark_to_hive.json

Write a Spark view to a Hive table:

{
  "source_view": "prepared_view",
  "table": "analytics.target_table",
  "mode": "overwrite",
  "format": "parquet",
  "partition_by": ["event_date"],
  "repartition_by": ["event_date"],
  "num_partitions": 40
}

Write a Spark view to a path such as S3A:

{
  "source_view": "prepared_view",
  "path": "s3a://bucket/path/target_table",
  "mode": "overwrite",
  "format": "parquet",
  "partition_by": ["event_date"]
}

Hive bucketing is supported only when writing to a table (Spark's saveAsTable), not to a path:

{
  "source_view": "prepared_view",
  "table": "analytics.bucketed_table",
  "mode": "overwrite",
  "format": "parquet",
  "bucket_by": ["entity_id"],
  "num_buckets": 32,
  "sort_by": ["entity_id"]
}
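
The table-versus-path constraint can be checked before any Spark work starts. The sketch below is a hypothetical validator, not the package's own. The rule that bucket_by needs a table mirrors Spark, where bucketBy works only with saveAsTable. The requirement of a positive num_buckets and the preference for table when both keys are present are assumptions.

```python
from typing import Mapping


def validate_hive_task(config: Mapping) -> str:
    """Return 'table' or 'path' for a task config, rejecting invalid combinations."""
    has_table = "table" in config
    has_path = "path" in config
    if not (has_table or has_path):
        raise ValueError("a spark_to_hive task needs 'table' or 'path'")
    if "bucket_by" in config:
        if not has_table:
            raise ValueError("bucket_by requires 'table' (saveAsTable), not 'path'")
        if int(config.get("num_buckets", 0)) <= 0:
            raise ValueError("bucket_by requires a positive num_buckets")
    return "table" if has_table else "path"


bucketed = {
    "source_view": "prepared_view",
    "table": "analytics.bucketed_table",
    "mode": "overwrite",
    "format": "parquet",
    "bucket_by": ["entity_id"],
    "num_buckets": 32,
    "sort_by": ["entity_id"],
}
target_kind = validate_hive_task(bucketed)
```

Failing at validation time is cheaper than discovering the problem after the source view has already been computed.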

Logging

Every main component inherits from LogBase and exposes a class logger.

import logging
from trino_spark_adapter import DagRunner, DistributedTrinoSparkReader, SparkHiveWriter

DagRunner.logger().setLevel(logging.INFO)
DistributedTrinoSparkReader.logger().setLevel(logging.DEBUG)
SparkHiveWriter.logger().setLevel(logging.INFO)

The default formatter includes timestamp, class name and level:

[2026-01-05 09:15:12] [DagRunner] [INFO] START task type=trino_to_spark file=1.source.trino_to_spark.json
[2026-01-05 09:17:03] [DagRunner] [INFO] SUCCESS task type=trino_to_spark file=1.source.trino_to_spark.json elapsed=0:01:51
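
A class-based logger of this kind can be built with the standard logging module alone. The sketch below is a guess at the pattern, not the package's LogBase. LogBaseSketch, DemoRunner and DemoReader are hypothetical, and the format string is chosen only to match the sample lines above.

```python
import logging


class LogBaseSketch:
    """Hypothetical mixin giving each subclass its own named logger."""

    _FORMAT = "[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s"
    _DATEFMT = "%Y-%m-%d %H:%M:%S"

    @classmethod
    def logger(cls) -> logging.Logger:
        log = logging.getLogger(cls.__name__)
        if not log.handlers:
            # Attach the handler once; later calls reuse the same logger.
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter(cls._FORMAT, datefmt=cls._DATEFMT))
            log.addHandler(handler)
            log.propagate = False  # avoid duplicate lines through the root logger
        return log


class DemoRunner(LogBaseSketch):
    pass


class DemoReader(LogBaseSketch):
    pass


DemoRunner.logger().setLevel(logging.INFO)
DemoReader.logger().setLevel(logging.DEBUG)
DemoRunner.logger().info(
    "START task type=%s file=%s", "trino_to_spark", "1.source.trino_to_spark.json"
)
```

Because logging.getLogger returns the same object for the same name, levels set on one class do not affect the others.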

Guard expensive Spark actions that exist only for debug output, so they do not run at higher log levels:

logger = DagRunner.logger()
if logger.isEnabledFor(logging.DEBUG):
    logger.debug("row_count=%s", df.count())

Publishing

The source archive includes a publishing script, .pypi.sh.

chmod +x .pypi.sh
./.pypi.sh

Resuming or partially executing a DAG

DagRunner.run_folder can execute only part of a DAG folder. This is useful when a long run fails after several successful files and you want to restart from the failing task.

runner.run_folder(
    "dag",
    start_from_file="6.transform.spark.sql",
)

You can also start from a zero-based index:

runner.run_folder(
    "dag",
    start_from_index=5,
)

For short validation runs, stop after a specific file or index:

runner.run_folder(
    "dag",
    stop_after_file="6.transform.spark.sql",
)

To run only a chosen subset, provide explicit file names. The files are still executed in the DAG folder's alphanumeric order:

runner.run_folder(
    "dag",
    files=[
        "6.transform.spark.sql",
        "7.export.spark_to_iceberg.json",
    ],
)
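
The selection options can be pictured as filters over the ordered file list. The sketch below is a hypothetical select_files function, not DagRunner's code. The file names other than the two shown above are invented, and rejecting start_from_file together with start_from_index is an assumption.

```python
from typing import List, Optional, Sequence


def select_files(
    ordered: Sequence[str],
    start_from_file: Optional[str] = None,
    start_from_index: Optional[int] = None,
    stop_after_file: Optional[str] = None,
    files: Optional[Sequence[str]] = None,
) -> List[str]:
    """Return the part of `ordered` selected by the options, keeping DAG order."""
    if start_from_file is not None and start_from_index is not None:
        # Assumption: the two start options are mutually exclusive.
        raise ValueError("use only one of start_from_file and start_from_index")
    start = 0
    if start_from_file is not None:
        start = ordered.index(start_from_file)  # ValueError if the file is unknown
    elif start_from_index is not None:
        start = start_from_index
    stop = len(ordered)
    if stop_after_file is not None:
        stop = ordered.index(stop_after_file) + 1
    selected = list(ordered[start:stop])
    if files is not None:
        wanted = set(files)
        selected = [name for name in selected if name in wanted]
    return selected


dag = [
    "1.source.trino_to_spark.json",
    "2.stage.spark.sql",
    "3.lookup.trino_to_spark.json",
    "4.join.spark.sql",
    "5.check.trino.sql",
    "6.transform.spark.sql",
    "7.export.spark_to_iceberg.json",
]
resumed = select_files(dag, start_from_file="6.transform.spark.sql")
```

With seven files numbered 1 to 7, start_from_file="6.transform.spark.sql" and start_from_index=5 select the same two tasks, because the index is zero-based.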
