Distributed Spark and Trino/Starburst adapter with SQL/JSON DAG execution and Spark writes to Trino, Iceberg and Hive.

trino-spark-adapter

trino-spark-adapter is a Python package for building hybrid Spark and Trino/Starburst data pipelines.

It provides:

  • distributed Trino/Starburst reads executed from Spark executors;
  • Spark writes to Trino through distributed INSERT batches;
  • Spark writes to Iceberg tables with Iceberg partition transforms and table properties;
  • Spark writes to Hive-compatible tables or paths with Hive-style partitioning and bucketing;
  • a DAG runner that executes .sql and .json files in alphanumeric order;
  • AES-CBC helpers and Spark SQL UDF registration;
  • class-based logging with one logger per component.

Installation

pip install trino-spark-adapter==1.1.4

Core concepts

A DAG is a folder containing SQL and JSON files. Files are executed in alphanumeric order. The suffix determines the execution mode.

| Suffix | Action |
| --- | --- |
| .trino.sql | Execute SQL statements on Trino/Starburst. |
| .spark.sql | Execute SQL statements on Spark. |
| .trino_to_spark.json | Load a Trino table or query into a Spark temporary view. |
| .spark_to_trino.json | Write a Spark view to Trino through distributed inserts. |
| .spark_to_iceberg.json | Write a Spark view to an Iceberg table with Spark Writer V2. |
| .spark_to_hive.json | Write a Spark view to a Hive-compatible table or path. |
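As a rough illustration of how the suffix table could drive execution, the sketch below sorts file names with Python's plain string ordering and maps each one to a task type by suffix. Both the sort rule and the helper names classify_task and plan_dag are assumptions for illustration, not part of trino-spark-adapter; the package's actual ordering may differ.

```python
# Hypothetical sketch: map DAG file names to task types by suffix.
# Assumes plain lexicographic (string) ordering, which is NOT confirmed for the package.
SUFFIXES = {
    ".trino_to_spark.json": "trino_to_spark",
    ".spark_to_trino.json": "spark_to_trino",
    ".spark_to_iceberg.json": "spark_to_iceberg",
    ".spark_to_hive.json": "spark_to_hive",
    ".trino.sql": "trino_sql",
    ".spark.sql": "spark_sql",
}


def classify_task(file_name: str) -> str:
    """Return the task type for a DAG file, or raise if the suffix is unknown."""
    for suffix, task_type in SUFFIXES.items():
        if file_name.endswith(suffix):
            return task_type
    raise ValueError(f"unsupported DAG file: {file_name}")


def plan_dag(file_names: list[str]) -> list[tuple[str, str]]:
    """Sort names as strings and pair each one with its task type."""
    return [(name, classify_task(name)) for name in sorted(file_names)]


plan = plan_dag(["2.load.trino_to_spark.json", "1.setup.trino.sql", "10.export.spark_to_hive.json"])
# With plain string ordering, "10.export..." sorts before "2.load...".
```

Under this assumption, zero-padded prefixes (01, 02, ..., 10) keep the intended order.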

Minimal DAG runner

from trino_spark_adapter import DagRunner, SparkAESHelper, TrinoConnectionConfig

spark = SparkAESHelper.get_spark(
    app_name="trino_spark_adapter_job",
    register_aes=True,
    fail_if_missing_aes=False,
)

trino_config = TrinoConnectionConfig.from_env()

runner = DagRunner(
    spark=spark,
    trino_config=trino_config,
    params={"calculation_date": "2026-01-05"},
    reader_defaults={"fetch_size": 100_000, "num_ranges": 20, "num_partitions": 20},
)

results = runner.run_folder("dag")
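The params dictionary supplies values for placeholders such as {start_date} in DAG files. The substitution mechanism is not documented here; the sketch below assumes Python str.format-style placeholders and applies them only to string values after parsing the JSON, because formatting the raw JSON text would trip over the object's own braces. render_task_config is a hypothetical helper.

```python
import json
from typing import Any


def render_value(value: Any, params: dict[str, str]) -> Any:
    """Recursively substitute {name} placeholders in string values."""
    if isinstance(value, str):
        return value.format_map(params)
    if isinstance(value, list):
        return [render_value(item, params) for item in value]
    if isinstance(value, dict):
        return {key: render_value(item, params) for key, item in value.items()}
    return value  # numbers, booleans and null are left untouched


def render_task_config(raw_json: str, params: dict[str, str]) -> dict[str, Any]:
    """Parse a task file first, then fill placeholders (hypothetical behavior)."""
    return render_value(json.loads(raw_json), params)


raw = '{"colname_start_value": "{start_date}", "colname_stop_value": "{end_date}", "num_ranges": 20}'
config = render_task_config(raw, {"start_date": "2026-01-01", "end_date": "2026-01-31"})
```

With this design, a placeholder missing from params raises KeyError from format_map, so undefined names surface before any query runs.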

Trino configuration

TrinoConnectionConfig.from_env() reads the following environment variables:

export TRINO_HOST="starburst.example.com"
export TRINO_USER="user"
export TRINO_PASSWORD="password"
export TRINO_ROLES="role"
export TRINO_VERIFY="false"
export TRINO_HTTP_SCHEME="https"
export TRINO_PORT="443"
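The exact parsing rules of TrinoConnectionConfig.from_env() are not documented here. As an illustration only, the hypothetical trino_env function below reads the same variables and converts TRINO_PORT to an integer and TRINO_VERIFY to a boolean; the defaults and the accepted boolean spellings are assumptions.

```python
import os
from collections.abc import Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class TrinoEnv:
    host: str
    user: str
    password: str
    roles: str
    verify: bool
    http_scheme: str
    port: int


def parse_bool(text: str) -> bool:
    """Interpret common spellings of true/false (assumed convention)."""
    lowered = text.strip().lower()
    if lowered in {"1", "true", "yes"}:
        return True
    if lowered in {"0", "false", "no"}:
        return False
    raise ValueError(f"not a boolean: {text!r}")


def trino_env(environ: Mapping[str, str] = os.environ) -> TrinoEnv:
    """Build a TrinoEnv from environment variables; defaults are hypothetical."""
    return TrinoEnv(
        host=environ["TRINO_HOST"],
        user=environ["TRINO_USER"],
        password=environ["TRINO_PASSWORD"],
        roles=environ.get("TRINO_ROLES", ""),
        verify=parse_bool(environ.get("TRINO_VERIFY", "true")),
        http_scheme=environ.get("TRINO_HTTP_SCHEME", "https"),
        port=int(environ.get("TRINO_PORT", "443")),
    )
```

Accepting any mapping instead of reading os.environ directly keeps the parser easy to test with a plain dict.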

Spark and AES helper

SparkAESHelper creates a Spark session from environment variables whose names start with "spark." and can optionally register the AES UDFs.

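# Names containing dots are not valid shell identifiers, so "export spark.app.name=..." fails in bash.
# Pass Spark settings through env instead (job.py is a placeholder for your driver script).
export aes_key_str="...base64..."
export aes_iv_str="...base64..."
env "spark.app.name=trino_spark_adapter_job" "spark.sql.shuffle.partitions=200" python job.py

# Inside job.py:
from trino_spark_adapter import SparkAESHelper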

spark = SparkAESHelper.get_spark(register_aes=True)

spark.sql("SELECT aes_encrypt('abc') AS encrypted_value").show()

Registered functions:

  • aes_encrypt(value)
  • aes_decrypt(value)
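The "spark." filtering step described above can be pictured with a few lines of standard Python. This is a minimal sketch assuming the variable name is used verbatim as the Spark configuration key; spark_conf_from_env is hypothetical and the package may apply additional rules.

```python
import os
from collections.abc import Mapping


def spark_conf_from_env(environ: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Keep only variables whose names start with 'spark.' (hypothetical helper)."""
    return {key: value for key, value in environ.items() if key.startswith("spark.")}


conf = spark_conf_from_env(
    {"spark.app.name": "trino_spark_adapter_job", "spark.sql.shuffle.partitions": "200", "aes_key_str": "..."}
)
# With PySpark installed, the pairs could then be applied to a session builder:
#   builder = SparkSession.builder
#   for key, value in conf.items():
#       builder = builder.config(key, value)
```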

Date parameters

DateUtils computes the date values that fill placeholders in DAG files.

from trino_spark_adapter import DateUtils

weekday_dates = DateUtils.generate_weekday_dates_between_start_stop(
    start_dt="2026-01-01",
    stop_dt="2026-02-01",
    weekday=0,
)

du = DateUtils(weekday_dates[0])
params = du.to_params()
params.update({"calculation_date": du.today_tiret[:10]})

Typical generated keys include today_tiret, today_slash, last_day_tiret, last_week_tiret, last_month_tiret, last_quarter_tiret, last_semester_tiret, and last_year_tiret.
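The behavior of DateUtils.generate_weekday_dates_between_start_stop is not specified in detail here. The sketch below shows one plausible reading, assuming weekday follows Python's date.weekday() convention (0 = Monday) and that both bounds are inclusive; weekday_dates is a hypothetical stand-in, so check the package before relying on either assumption.

```python
from datetime import date, timedelta


def weekday_dates(start_dt: str, stop_dt: str, weekday: int) -> list[str]:
    """Return ISO dates in [start_dt, stop_dt] that fall on the given weekday."""
    start = date.fromisoformat(start_dt)
    stop = date.fromisoformat(stop_dt)
    # Jump forward to the first matching day, then step one week at a time.
    current = start + timedelta(days=(weekday - start.weekday()) % 7)
    result = []
    while current <= stop:
        result.append(current.isoformat())
        current += timedelta(weeks=1)
    return result


mondays = weekday_dates("2026-01-01", "2026-02-01", weekday=0)
# 2026-01-01 is a Thursday, so the first Monday is 2026-01-05.
```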

.trino_to_spark.json

Load a complete table:

{
  "table_fullname": "catalog.schema.source_table",
  "target_view": "source_table"
}

Load a partitioned date range with distributed Trino queries:

{
  "table_fullname": "catalog.schema.source_table",
  "target_view": "source_view",
  "colname": "event_date",
  "coltype": "DATE",
  "format": "%Y-%m-%d",
  "rounding": "D",
  "colname_start_value": "{start_date}",
  "colname_stop_value": "{end_date}",
  "num_ranges": 20
}
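With num_ranges set, the reader issues distributed Trino queries over the date range. One way to picture this is splitting [start, stop] into contiguous sub-ranges, each becoming its own WHERE predicate. The sketch below does that for DATE columns; split_date_range and range_predicates are hypothetical, the inclusive stop bound is an assumption, and the package's handling of format and rounding is not modeled.

```python
from datetime import date, timedelta


def split_date_range(start: str, stop: str, num_ranges: int) -> list[tuple[date, date]]:
    """Split the inclusive range [start, stop] into at most num_ranges pieces."""
    first, last = date.fromisoformat(start), date.fromisoformat(stop)
    if last < first:
        raise ValueError("stop must not be before start")
    total_days = (last - first).days + 1
    num_ranges = max(1, min(num_ranges, total_days))
    base, extra = divmod(total_days, num_ranges)
    ranges, cursor = [], first
    for i in range(num_ranges):
        length = base + (1 if i < extra else 0)  # spread the remainder over the first ranges
        ranges.append((cursor, cursor + timedelta(days=length - 1)))
        cursor += timedelta(days=length)
    return ranges


def range_predicates(colname: str, ranges: list[tuple[date, date]]) -> list[str]:
    """Render one Trino predicate per sub-range using DATE literals."""
    return [f"{colname} BETWEEN DATE '{lo}' AND DATE '{hi}'" for lo, hi in ranges]


preds = range_predicates("event_date", split_date_range("2026-01-01", "2026-01-10", 3))
```

Equal-length date ranges only give each Spark task a similar share of rows when the data is spread evenly over dates.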

The runner creates or replaces the Spark temporary view named by target_view. When target_view is omitted, the view name is derived from the file name: for example, 3.source.trino_to_spark.json creates a view named source.
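The file-name example implies that the default view name is the file name minus its leading numeric prefix and its task suffix. The hypothetical helper below encodes that reading; how the package treats names with extra dots or no numeric prefix is not documented, so those cases are assumptions.

```python
import re

# Hypothetical rule: "<digits>.<name>.trino_to_spark.json" -> "<name>".
_PATTERN = re.compile(r"^(?:\d+\.)?(?P<name>.+)\.trino_to_spark\.json$")


def default_view_name(file_name: str) -> str:
    """Derive a temporary-view name when target_view is omitted."""
    match = _PATTERN.match(file_name)
    if match is None:
        raise ValueError(f"not a trino_to_spark file: {file_name}")
    return match.group("name")
```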

.spark_to_trino.json

Write a Spark view to a Trino table. The target table is created automatically when it does not exist, based on the Spark schema.

{
  "source_view": "prepared_view",
  "target_table": "catalog.schema.target_table",
  "repartition_by": ["entity_id"],
  "num_partitions": 40,
  "sort_by": ["entity_id"]
}
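Spark writes to Trino are described as distributed INSERT batches. The Spark-free sketch below shows what a batch could look like: rows are grouped into fixed-size chunks and rendered as multi-row INSERT ... VALUES statements. batch_insert_statements and its literal-quoting rules are hypothetical and deliberately minimal (strings, numbers, booleans, None); in a real job each Spark partition would run similar logic, for example inside foreachPartition.

```python
from typing import Any, Iterable, Iterator, Sequence


def sql_literal(value: Any) -> str:
    """Render a Python value as a Trino literal (minimal, hypothetical rules)."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return "TRUE" if value else "FALSE"
    if isinstance(value, (int, float)):
        return repr(value)  # non-finite floats are not handled
    return "'" + str(value).replace("'", "''") + "'"  # escape single quotes


def batch_insert_statements(
    table: str, columns: Sequence[str], rows: Iterable[Sequence[Any]], batch_size: int
) -> Iterator[str]:
    """Yield one multi-row INSERT per batch of at most batch_size rows."""
    header = f"INSERT INTO {table} ({', '.join(columns)}) VALUES "
    batch: list[str] = []
    for row in rows:
        batch.append("(" + ", ".join(sql_literal(v) for v in row) + ")")
        if len(batch) == batch_size:
            yield header + ", ".join(batch)
            batch = []
    if batch:  # flush the final, possibly smaller batch
        yield header + ", ".join(batch)


stmts = list(batch_insert_statements(
    "catalog.schema.target_table", ["entity_id", "label"],
    [(1, "a"), (2, "O'Brien"), (3, None)], batch_size=2,
))
```

Literal rendering is shown only to make the batching visible; production code should prefer the driver's parameter binding over hand-written quoting.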

.spark_to_iceberg.json

Write a Spark view to an Iceberg table through the configured Spark Iceberg catalog.

{
  "source_view": "prepared_view",
  "catalog": "iceberg_catalog",
  "schema": "analytics",
  "table": "target_table",
  "mode": "append",
  "partition_spec": [
    {"transform": "day", "column": "event_ts"},
    {"transform": "bucket", "column": "entity_id", "num_buckets": 32}
  ],
  "distribution_mode": "hash",
  "format_version": 2,
  "file_format": "PARQUET",
  "repartition_by": ["entity_id"],
  "num_partitions": 200
}

Common modes are create, replace, append, and overwrite_partitions.
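The package writes through Spark Writer V2. To make the partition_spec and property fields concrete, the sketch below renders the equivalent Iceberg Spark SQL DDL instead. The mapping from JSON fields to the Iceberg table properties format-version, write.format.default and write.distribution-mode, and from transform names to Iceberg's DDL functions, is an assumption about what the package does; iceberg_create_table_sql is hypothetical.

```python
from typing import Any

# Assumed mapping from JSON "transform" names to Iceberg Spark DDL functions.
_TRANSFORMS = {"year": "years", "month": "months", "day": "days", "hour": "hours"}


def partition_expr(spec: dict[str, Any]) -> str:
    """Render one partition_spec entry; unknown transforms raise KeyError."""
    transform, column = spec["transform"], spec["column"]
    if transform == "bucket":
        return f"bucket({spec['num_buckets']}, {column})"
    if transform == "identity":
        return column
    return f"{_TRANSFORMS[transform]}({column})"


def iceberg_create_table_sql(cfg: dict[str, Any], columns_ddl: str) -> str:
    """Render CREATE TABLE ... USING iceberg for a spark_to_iceberg config."""
    name = f"{cfg['catalog']}.{cfg['schema']}.{cfg['table']}"
    parts = ", ".join(partition_expr(p) for p in cfg.get("partition_spec", []))
    props = {
        "format-version": str(cfg.get("format_version", 2)),
        "write.format.default": cfg.get("file_format", "PARQUET").lower(),
        "write.distribution-mode": cfg.get("distribution_mode", "hash"),
    }
    props_sql = ", ".join(f"'{k}'='{v}'" for k, v in props.items())
    sql = f"CREATE TABLE {name} ({columns_ddl}) USING iceberg"
    if parts:
        sql += f" PARTITIONED BY ({parts})"
    return sql + f" TBLPROPERTIES ({props_sql})"


cfg = {
    "catalog": "iceberg_catalog", "schema": "analytics", "table": "target_table",
    "partition_spec": [
        {"transform": "day", "column": "event_ts"},
        {"transform": "bucket", "column": "entity_id", "num_buckets": 32},
    ],
    "distribution_mode": "hash", "format_version": 2, "file_format": "PARQUET",
}
ddl = iceberg_create_table_sql(cfg, "entity_id BIGINT, event_ts TIMESTAMP")
```

Judging by their names, the modes listed above likely correspond to the DataFrameWriterV2 methods create(), replace(), append() and overwritePartitions(); that correspondence is inferred, not documented.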

.spark_to_hive.json

Write a Spark view to a Hive table:

{
  "source_view": "prepared_view",
  "table": "analytics.target_table",
  "mode": "overwrite",
  "format": "parquet",
  "partition_by": ["event_date"],
  "repartition_by": ["event_date"],
  "num_partitions": 40
}

Write a Spark view to a storage path, for example an S3 location accessed through the s3a:// connector:

{
  "source_view": "prepared_view",
  "path": "s3a://bucket/path/target_table",
  "mode": "overwrite",
  "format": "parquet",
  "partition_by": ["event_date"]
}

Hive bucketing (bucket_by, num_buckets) is supported only when writing to a table through saveAsTable, not to a path:

{
  "source_view": "prepared_view",
  "table": "analytics.bucketed_table",
  "mode": "overwrite",
  "format": "parquet",
  "bucket_by": ["entity_id"],
  "num_buckets": 32,
  "sort_by": ["entity_id"]
}
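Spark's DataFrameWriter honors bucketBy only together with saveAsTable; a plain save(path) rejects it. A small pre-flight check can catch such configurations before a long job starts. validate_hive_config is a hypothetical helper whose key names mirror the JSON examples above; the rules beyond the bucketing restriction are assumptions.

```python
from typing import Any


def validate_hive_config(cfg: dict[str, Any]) -> list[str]:
    """Return a list of problems found in a spark_to_hive config (empty if none)."""
    problems = []
    has_table, has_path = "table" in cfg, "path" in cfg
    if has_table == has_path:  # both set, or neither set
        problems.append("set exactly one of 'table' or 'path'")
    if cfg.get("bucket_by"):
        if has_path:
            problems.append("bucket_by requires 'table' (saveAsTable), not 'path'")
        if not isinstance(cfg.get("num_buckets"), int) or cfg["num_buckets"] < 1:
            problems.append("bucket_by requires a positive integer 'num_buckets'")
    return problems
```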

Logging

Every main component inherits from LogBase and exposes a class logger.

import logging
from trino_spark_adapter import DagRunner, DistributedTrinoSparkReader, SparkHiveWriter

DagRunner.logger().setLevel(logging.INFO)
DistributedTrinoSparkReader.logger().setLevel(logging.DEBUG)
SparkHiveWriter.logger().setLevel(logging.INFO)

The default formatter includes timestamp, class name and level:

[2026-01-05 09:15:12] [DagRunner] [INFO] START task type=trino_to_spark file=1.source.trino_to_spark.json
[2026-01-05 09:17:03] [DagRunner] [INFO] SUCCESS task type=trino_to_spark file=1.source.trino_to_spark.json elapsed=0:01:51
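The package's LogBase is not shown here. As a sketch of the pattern, assuming the [timestamp] [class name] [level] message layout seen in the sample lines, the hypothetical LogMixin below gives each subclass its own logger named after the class and configures its handler once.

```python
import io
import logging


class LogMixin:
    """Hypothetical base class: one logger per subclass, named after the class."""

    FORMAT = "[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s"
    DATEFMT = "%Y-%m-%d %H:%M:%S"

    @classmethod
    def logger(cls, stream=None) -> logging.Logger:
        """Return the class logger; stream is used only when the handler is first created."""
        log = logging.getLogger(cls.__name__)
        if not log.handlers:  # configure each class logger only once
            handler = logging.StreamHandler(stream)  # None means sys.stderr
            handler.setFormatter(logging.Formatter(cls.FORMAT, cls.DATEFMT))
            log.addHandler(handler)
            log.propagate = False  # avoid duplicate lines via the root logger
        return log


class DemoRunner(LogMixin):
    pass


buffer = io.StringIO()
DemoRunner.logger(buffer).setLevel(logging.INFO)
DemoRunner.logger().info("START task type=%s", "trino_to_spark")
```

Naming loggers by bare class name keeps lines short, but it can collide with identically named classes elsewhere; a dotted module path avoids that.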

Debug-only expensive Spark actions should be guarded explicitly:

import logging

logger = DagRunner.logger()
# df is the Spark DataFrame being inspected; count() triggers a full Spark job.
if logger.isEnabledFor(logging.DEBUG):
    logger.debug("row_count=%s", df.count())
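The guard matters because Python evaluates call arguments before logger.debug runs, so passing df.count() as a lazy %s argument does not defer the count. The stdlib-only sketch below uses a counter in place of the Spark action (expensive_count is a stand-in, not a Spark API) to show the difference.

```python
import logging

logger = logging.getLogger("guard_demo")
logger.setLevel(logging.INFO)  # DEBUG is disabled
calls = 0


def expensive_count() -> int:
    """Stand-in for an expensive action such as df.count()."""
    global calls
    calls += 1
    return 42


logger.debug("row_count=%s", expensive_count())  # nothing is logged, but the argument is still evaluated

if logger.isEnabledFor(logging.DEBUG):  # guarded: expensive_count() is skipped entirely
    logger.debug("row_count=%s", expensive_count())
```

After both calls, calls is 1: only the unguarded line paid for the computation.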

Publishing

The source archive includes a .pypi.sh publishing script:

chmod +x .pypi.sh
./.pypi.sh

Resuming or partially executing a DAG

DagRunner.run_folder can execute only part of a DAG folder. This is useful when a long run fails after several successful files and you want to restart from the failing task.

runner.run_folder(
    "dag",
    start_from_file="6.transform.spark.sql",
)

You can also start from a zero-based index:

runner.run_folder(
    "dag",
    start_from_index=5,
)

For short validation runs, stop after a specific file or index:

runner.run_folder(
    "dag",
    stop_after_file="6.transform.spark.sql",
)

To run only a chosen subset, provide explicit file names. The files are still executed in the DAG folder's alphanumeric order:

runner.run_folder(
    "dag",
    files=[
        "6.transform.spark.sql",
        "7.export.spark_to_iceberg.json",
    ],
)
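A hypothetical sketch of how these selection options could combine over the sorted file list. select_files is not the package's implementation; in particular, how the options interact when several are passed, and the plain string sort, are assumptions here.

```python
from typing import Optional, Sequence


def select_files(
    all_files: Sequence[str],
    start_from_file: Optional[str] = None,
    start_from_index: Optional[int] = None,
    stop_after_file: Optional[str] = None,
    files: Optional[Sequence[str]] = None,
) -> list[str]:
    """Return the DAG files to run, always in sorted (string) order."""
    ordered = sorted(all_files)
    start = 0
    if start_from_file is not None:
        start = ordered.index(start_from_file)  # ValueError if the file is absent
    elif start_from_index is not None:
        start = start_from_index  # zero-based
    stop = len(ordered)
    if stop_after_file is not None:
        stop = ordered.index(stop_after_file) + 1  # inclusive
    selected = ordered[start:stop]
    if files is not None:
        wanted = set(files)
        missing = wanted.difference(ordered)
        if missing:
            raise ValueError(f"unknown DAG files: {sorted(missing)}")
        selected = [name for name in selected if name in wanted]
    return selected


dag = ["1.a.trino.sql", "2.b.spark.sql", "3.c.spark.sql", "4.d.spark_to_hive.json"]
```

Spark temporary views live only in the session that created them, so a resumed run in a new session may need to re-run the files that create the views later tasks read.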

Optional Spark and Trino dependencies

Install the core package:

pip install trino-spark-adapter==1.1.4

Optional extras:

pip install "trino-spark-adapter[spark]==1.1.4"
pip install "trino-spark-adapter[trino]==1.1.4"
pip install "trino-spark-adapter[all]==1.1.4"

Download files

Source Distribution

trino_spark_adapter-1.1.4.tar.gz (41.6 kB)

Built Distribution

trino_spark_adapter-1.1.4-py3-none-any.whl (41.7 kB)

File details

Details for the file trino_spark_adapter-1.1.4.tar.gz.

File metadata

  • Download URL: trino_spark_adapter-1.1.4.tar.gz
  • Upload date:
  • Size: 41.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.4

File hashes

Hashes for trino_spark_adapter-1.1.4.tar.gz
| Algorithm | Hash digest |
| --- | --- |
| SHA256 | c013f5aad772420c495a7bf3d654a9ba200f26ae317ec72f008d817139f80c2c |
| MD5 | 820d958fcdaefe9f296c3fc7672dd43e |
| BLAKE2b-256 | 68dc90d275840591128ebe5bb61e666076125b66281048b65108f666e388a7a5 |

File details

Details for the file trino_spark_adapter-1.1.4-py3-none-any.whl.

File metadata

File hashes

Hashes for trino_spark_adapter-1.1.4-py3-none-any.whl
| Algorithm | Hash digest |
| --- | --- |
| SHA256 | c613a44ea52fbbbe7997017f4d6ac1b89471a06be466191b048519c23e792557 |
| MD5 | 9bf6048db4da2271d43f4a258d7a1dbd |
| BLAKE2b-256 | 4995ae61115e6b2faae9dc13ec69835c0d814d933eb0da704147a76ab63410b3 |
