Distributed Spark and Trino/Starburst adapter with SQL/JSON DAG execution and Spark writes to Trino, Iceberg and Hive.

trino-spark-adapter

trino-spark-adapter is a Python package for building hybrid Spark and Trino/Starburst data pipelines.

It provides:

  • distributed Trino/Starburst reads executed from Spark executors;
  • Spark writes to Trino through distributed INSERT batches;
  • Spark writes to Iceberg tables with Iceberg partition transforms and table properties;
  • Spark writes to Hive-compatible tables or paths with Hive-style partitioning and bucketing;
  • a DAG runner that executes .sql and .json files in alphanumeric order;
  • AES-CBC helpers and Spark SQL UDF registration;
  • class-based logging with one logger per component.

Installation

pip install trino-spark-adapter==1.1.1

Core concepts

A DAG is a folder containing SQL and JSON files. Files are executed in alphanumeric order. The suffix determines the execution mode.

Suffix                    Action
.trino.sql                Execute SQL statements on Trino/Starburst.
.spark.sql                Execute SQL statements on Spark.
.trino_to_spark.json      Load a Trino table or query into a Spark temporary view.
.spark_to_trino.json      Write a Spark view to Trino through distributed inserts.
.spark_to_iceberg.json    Write a Spark view to an Iceberg table with Spark Writer V2.
.spark_to_hive.json       Write a Spark view to a Hive-compatible table or path.
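
As a rough illustration of the suffix rule, the sketch below sorts file names and maps each suffix to a task type. It is not the package's implementation: classify and natural_key are hypothetical helpers, the trino_sql and spark_sql task names are made up for the example, and reading "alphanumeric order" as a natural sort (so that 2.x runs before 10.x) is an assumption. Zero-padded prefixes such as 02 and 10 sidestep the question entirely.

```python
import re

# Suffix -> task type, following the table above (task names are illustrative).
SUFFIX_TO_TASK = {
    ".trino.sql": "trino_sql",
    ".spark.sql": "spark_sql",
    ".trino_to_spark.json": "trino_to_spark",
    ".spark_to_trino.json": "spark_to_trino",
    ".spark_to_iceberg.json": "spark_to_iceberg",
    ".spark_to_hive.json": "spark_to_hive",
}


def classify(file_name):
    """Return the task type for a DAG file name, or None for an unknown suffix."""
    for suffix, task in SUFFIX_TO_TASK.items():
        if file_name.endswith(suffix):
            return task
    return None


def natural_key(file_name):
    """Sort key that compares digit runs as numbers (assumed ordering)."""
    key = []
    for part in re.split(r"(\d+)", file_name):
        if not part:
            continue
        # Tuples keep numbers and text comparable without a TypeError.
        key.append((0, int(part), "") if part.isdigit() else (1, 0, part))
    return key


files = [
    "10.export.spark_to_trino.json",
    "2.clean.spark.sql",
    "1.source.trino_to_spark.json",
    "notes.txt",  # unknown suffix, skipped
]
plan = [(name, classify(name)) for name in sorted(files, key=natural_key) if classify(name)]
for name, task in plan:
    print(task, name)
```

With a plain sorted(files), 10.export would come before 2.clean, which is why the ordering assumption is worth checking against the real runner.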

Minimal DAG runner

from trino_spark_adapter import DagRunner, SparkAESHelper, TrinoConnectionConfig

spark = SparkAESHelper.get_spark(
    app_name="trino_spark_adapter_job",
    register_aes=True,
    fail_if_missing_aes=False,
)

trino_config = TrinoConnectionConfig.from_env()

runner = DagRunner(
    spark=spark,
    trino_config=trino_config,
    params={"calculation_date": "2026-01-05"},
    reader_defaults={"fetch_size": 100_000, "num_ranges": 20, "num_partitions": 20},
)

results = runner.run_folder("dag")
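
The params dictionary presumably supplies values for {name} placeholders in DAG files, such as the {start_date} token used in the .trino_to_spark.json example. How the runner performs the substitution is not documented here, so the following is only a minimal sketch of one way to do it; render is a hypothetical helper, and leaving unknown placeholders untouched is an assumption.

```python
import json
import re

# Matches {identifier} only, so the braces of JSON objects are never touched.
PLACEHOLDER = re.compile(r"\{([A-Za-z_][A-Za-z0-9_]*)\}")


def render(text, params):
    """Replace {name} tokens with params[name]; keep unknown tokens as-is."""
    return PLACEHOLDER.sub(lambda m: str(params.get(m.group(1), m.group(0))), text)


raw_json = (
    '{"colname_start_value": "{start_date}", '
    '"colname_stop_value": "{end_date}", "num_ranges": 20}'
)
config = json.loads(render(raw_json, {"start_date": "2026-01-01", "end_date": "2026-01-31"}))

sql = render(
    "SELECT * FROM t WHERE d = DATE '{calculation_date}'",
    {"calculation_date": "2026-01-05"},
)
print(config)
print(sql)
```

Substituting on the raw text before parsing keeps the same helper usable for both .sql and .json files.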

Trino configuration

TrinoConnectionConfig.from_env() reads the following environment variables:

export TRINO_HOST="starburst.example.com"
export TRINO_USER="user"
export TRINO_PASSWORD="password"
export TRINO_ROLES="role"
export TRINO_VERIFY="false"
export TRINO_HTTP_SCHEME="https"
export TRINO_PORT="443"
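
To make the mapping concrete, here is a small stand-alone sketch that collects the same variables into a typed object. TrinoEnvSettings and settings_from_env are hypothetical names, not part of the package, and the choice of required variables, default values, and accepted boolean spellings are all assumptions.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class TrinoEnvSettings:
    """Hypothetical stand-in for the values read by TrinoConnectionConfig.from_env()."""
    host: str
    user: str
    password: str
    roles: str
    verify: bool
    http_scheme: str
    port: int


def settings_from_env(environ=None):
    """Build settings from a mapping (defaults to os.environ)."""
    env = os.environ if environ is None else environ
    return TrinoEnvSettings(
        host=env["TRINO_HOST"],  # assumed required
        user=env["TRINO_USER"],  # assumed required
        password=env.get("TRINO_PASSWORD", ""),
        roles=env.get("TRINO_ROLES", ""),
        # Environment values are strings, so "false" must be parsed explicitly.
        verify=env.get("TRINO_VERIFY", "true").strip().lower() in {"1", "true", "yes"},
        http_scheme=env.get("TRINO_HTTP_SCHEME", "https"),
        port=int(env.get("TRINO_PORT", "443")),
    )


example = settings_from_env({
    "TRINO_HOST": "starburst.example.com",
    "TRINO_USER": "user",
    "TRINO_PASSWORD": "password",
    "TRINO_ROLES": "role",
    "TRINO_VERIFY": "false",
    "TRINO_HTTP_SCHEME": "https",
    "TRINO_PORT": "443",
})
print(example.host, example.port, example.verify)
```

Parsing TRINO_VERIFY explicitly matters because any non-empty string, including "false", is truthy in Python.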

Spark and AES helper

SparkAESHelper creates a Spark session from environment variables whose names start with spark. and optionally registers AES UDFs.

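# POSIX shells reject dotted names in export, so pass the spark.* variables
# through env when launching the job (your_job.py is a placeholder).
export aes_key_str="...base64..."
export aes_iv_str="...base64..."
env "spark.app.name=trino_spark_adapter_job" \
    "spark.sql.shuffle.partitions=200" \
    python your_job.py

from trino_spark_adapter import SparkAESHelper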

spark = SparkAESHelper.get_spark(register_aes=True)

spark.sql("SELECT aes_encrypt('abc') AS encrypted_value").show()

Registered functions:

  • aes_encrypt(value)
  • aes_decrypt(value)
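
Since the helpers use AES-CBC, the key must decode to 16, 24, or 32 bytes and the IV to exactly 16 bytes. The sketch below checks base64-encoded values before a job starts; decode_aes_material is a hypothetical helper, and it assumes both aes_key_str and aes_iv_str use standard base64, as the placeholders above suggest.

```python
import base64

AES_KEY_SIZES = (16, 24, 32)  # AES-128, AES-192, AES-256
AES_BLOCK_SIZE = 16           # CBC needs an IV of exactly one block


def decode_aes_material(key_b64, iv_b64):
    """Decode base64 key/IV strings and validate their lengths for AES-CBC."""
    # validate=True rejects characters outside the base64 alphabet.
    key = base64.b64decode(key_b64, validate=True)
    iv = base64.b64decode(iv_b64, validate=True)
    if len(key) not in AES_KEY_SIZES:
        raise ValueError(f"AES key must be 16, 24, or 32 bytes, got {len(key)}")
    if len(iv) != AES_BLOCK_SIZE:
        raise ValueError(f"AES-CBC IV must be {AES_BLOCK_SIZE} bytes, got {len(iv)}")
    return key, iv


# Demo values only: all-zero bytes are not suitable as real key material.
demo_key = base64.b64encode(bytes(32)).decode("ascii")
demo_iv = base64.b64encode(bytes(16)).decode("ascii")
key, iv = decode_aes_material(demo_key, demo_iv)
print(len(key), len(iv))
```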

Date parameters

DateUtils can be used to compute placeholders used in DAG files.

from trino_spark_adapter import DateUtils

weekday_dates = DateUtils.generate_weekday_dates_between_start_stop(
    start_dt="2026-01-01",
    stop_dt="2026-02-01",
    weekday=0,
)

du = DateUtils(weekday_dates[0])
params = du.to_params()
params.update({"calculation_date": du.today_tiret[:10]})

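Typical generated keys include today_tiret, today_slash, last_day_tiret, last_week_tiret, last_month_tiret, last_quarter_tiret, last_semester_tiret, and last_year_tiret. Tiret is French for hyphen, so the _tiret keys presumably hold hyphen-separated dates such as 2026-01-05 and the _slash keys their slash-separated counterparts.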
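
For readers who want to see what the weekday helper plausibly computes, here is a stdlib-only sketch. weekday_dates_between is a hypothetical function, not the package's code; it assumes weekday=0 means Monday (Python's date.weekday() convention), an inclusive stop date, and ISO-formatted string results.

```python
from datetime import date, timedelta


def weekday_dates_between(start_dt, stop_dt, weekday):
    """List every date with the given weekday in [start_dt, stop_dt] (inclusive)."""
    start = date.fromisoformat(start_dt)
    stop = date.fromisoformat(stop_dt)
    # Days to add to reach the first requested weekday on or after start.
    offset = (weekday - start.weekday()) % 7
    current = start + timedelta(days=offset)
    dates = []
    while current <= stop:
        dates.append(current.isoformat())
        current += timedelta(days=7)
    return dates


mondays = weekday_dates_between("2026-01-01", "2026-02-01", weekday=0)
print(mondays)
```

Under these assumptions the first Monday on or after 2026-01-01 is 2026-01-05, which matches the calculation_date used in the runner example.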

.trino_to_spark.json

Load a complete table:

{
  "table_fullname": "catalog.schema.source_table",
  "target_view": "source_table"
}

Load a partitioned date range with distributed Trino queries:

{
  "table_fullname": "catalog.schema.source_table",
  "target_view": "source_view",
  "colname": "event_date",
  "coltype": "DATE",
  "format": "%Y-%m-%d",
  "rounding": "D",
  "colname_start_value": "{start_date}",
  "colname_stop_value": "{end_date}",
  "num_ranges": 20
}

The runner creates or replaces the Spark temporary view named by target_view. When target_view is omitted, the view name is derived from the file name: for example, 3.source.trino_to_spark.json creates the view source.
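
The partitioned load above can be pictured as splitting the date interval into num_ranges slices and issuing one Trino query per slice. The following sketch shows that idea only; split_date_range and range_query are hypothetical helpers, and the inclusive bounds, the even day-based split, and the BETWEEN predicate are assumptions about behavior the package may implement differently.

```python
from datetime import date, timedelta


def split_date_range(start_value, stop_value, num_ranges):
    """Split [start, stop] (inclusive) into at most num_ranges contiguous day ranges."""
    start = date.fromisoformat(start_value)
    stop = date.fromisoformat(stop_value)
    total_days = (stop - start).days + 1
    if total_days <= 0 or num_ranges <= 0:
        return []
    count = min(num_ranges, total_days)  # never produce empty ranges
    base, extra = divmod(total_days, count)
    ranges = []
    lower = start
    for index in range(count):
        size = base + (1 if index < extra else 0)  # spread the remainder evenly
        upper = lower + timedelta(days=size - 1)
        ranges.append((lower, upper))
        lower = upper + timedelta(days=1)
    return ranges


def range_query(table_fullname, colname, lower, upper):
    """Build one per-range query using Trino DATE literals."""
    return (
        f"SELECT * FROM {table_fullname} "
        f"WHERE {colname} BETWEEN DATE '{lower.isoformat()}' AND DATE '{upper.isoformat()}'"
    )


ranges = split_date_range("2026-01-01", "2026-01-10", num_ranges=3)
queries = [
    range_query("catalog.schema.source_table", "event_date", lo, hi)
    for lo, hi in ranges
]
for query in queries:
    print(query)
```

Each query could then be run by a separate Spark task, which is one way to read "distributed Trino reads executed from Spark executors".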

.spark_to_trino.json

Write a Spark view to a Trino table. The target table is created automatically when it does not exist, based on the Spark schema.

{
  "source_view": "prepared_view",
  "target_table": "catalog.schema.target_table",
  "repartition_by": ["entity_id"],
  "num_partitions": 40,
  "sort_by": ["entity_id"]
}
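
A distributed INSERT write can be thought of as each Spark partition turning its rows into multi-row INSERT statements. The sketch below shows only the batching and literal rendering; sql_literal, insert_batches, and the batch_size parameter are hypothetical, the literal handling covers just None, booleans, finite numbers, and strings, and a real writer would more likely rely on the Trino client's own parameter handling.

```python
from itertools import islice


def sql_literal(value):
    """Render a Python value as a SQL literal (limited to a few simple types)."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):  # check bool before int: bool subclasses int
        return "TRUE" if value else "FALSE"
    if isinstance(value, (int, float)):
        return repr(value)
    # Strings: escape embedded single quotes by doubling them.
    return "'" + str(value).replace("'", "''") + "'"


def insert_batches(target_table, columns, rows, batch_size):
    """Yield one multi-row INSERT statement per batch of rows."""
    column_list = ", ".join(columns)
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        values = ", ".join(
            "(" + ", ".join(sql_literal(v) for v in row) + ")" for row in batch
        )
        yield f"INSERT INTO {target_table} ({column_list}) VALUES {values}"


rows = [(1, "a"), (2, "O'Neil"), (3, None)]
statements = list(
    insert_batches("catalog.schema.target_table", ["entity_id", "name"], rows, batch_size=2)
)
for statement in statements:
    print(statement)
```

In a Spark job, a function like this would typically be called inside foreachPartition so that every executor sends its own batches.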

.spark_to_iceberg.json

Write a Spark view to an Iceberg table through the configured Spark Iceberg catalog.

{
  "source_view": "prepared_view",
  "catalog": "iceberg_catalog",
  "schema": "analytics",
  "table": "target_table",
  "mode": "append",
  "partition_spec": [
    {"transform": "day", "column": "event_ts"},
    {"transform": "bucket", "column": "entity_id", "num_buckets": 32}
  ],
  "distribution_mode": "hash",
  "format_version": 2,
  "file_format": "PARQUET",
  "repartition_by": ["entity_id"],
  "num_partitions": 200
}

Common modes are create, replace, append, and overwrite_partitions.
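
To show how the configuration keys relate to Iceberg's Spark SQL, the sketch below renders the example config as a CREATE TABLE ... AS SELECT statement, which corresponds roughly to the create mode. partition_clause and create_table_sql are hypothetical helpers and the package may well use the DataFrame writer API instead; the days()/bucket() transform syntax and the format-version, write.format.default, and write.distribution-mode table properties are standard Iceberg names.

```python
# Config transform name -> Iceberg Spark DDL function name.
TIME_TRANSFORMS = {"year": "years", "month": "months", "day": "days", "hour": "hours"}


def partition_clause(partition_spec):
    """Render partition_spec entries as Iceberg Spark DDL transform expressions."""
    parts = []
    for field in partition_spec:
        transform = field.get("transform", "identity")
        column = field["column"]
        if transform == "identity":
            parts.append(column)
        elif transform in TIME_TRANSFORMS:
            parts.append(f"{TIME_TRANSFORMS[transform]}({column})")
        elif transform == "bucket":
            parts.append(f"bucket({field['num_buckets']}, {column})")
        else:
            raise ValueError(f"unsupported transform in this sketch: {transform}")
    return ", ".join(parts)


def create_table_sql(config):
    """Build a CTAS statement from a .spark_to_iceberg.json-style config."""
    target = f"{config['catalog']}.{config['schema']}.{config['table']}"
    properties = {
        "format-version": str(config["format_version"]),
        "write.format.default": config["file_format"].lower(),
        "write.distribution-mode": config["distribution_mode"],
    }
    props = ", ".join(f"'{key}'='{value}'" for key, value in properties.items())
    return (
        f"CREATE TABLE {target} USING iceberg "
        f"PARTITIONED BY ({partition_clause(config['partition_spec'])}) "
        f"TBLPROPERTIES ({props}) "
        f"AS SELECT * FROM {config['source_view']}"
    )


config = {
    "source_view": "prepared_view",
    "catalog": "iceberg_catalog",
    "schema": "analytics",
    "table": "target_table",
    "partition_spec": [
        {"transform": "day", "column": "event_ts"},
        {"transform": "bucket", "column": "entity_id", "num_buckets": 32},
    ],
    "distribution_mode": "hash",
    "format_version": 2,
    "file_format": "PARQUET",
}
ddl = create_table_sql(config)
print(ddl)
```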

.spark_to_hive.json

Write a Spark view to a Hive table:

{
  "source_view": "prepared_view",
  "table": "analytics.target_table",
  "mode": "overwrite",
  "format": "parquet",
  "partition_by": ["event_date"],
  "repartition_by": ["event_date"],
  "num_partitions": 40
}

Write a Spark view to a path such as S3A:

{
  "source_view": "prepared_view",
  "path": "s3a://bucket/path/target_table",
  "mode": "overwrite",
  "format": "parquet",
  "partition_by": ["event_date"]
}

Hive bucketing is supported only when writing to a table (Spark's saveAsTable), not to a path:

{
  "source_view": "prepared_view",
  "table": "analytics.bucketed_table",
  "mode": "overwrite",
  "format": "parquet",
  "bucket_by": ["entity_id"],
  "num_buckets": 32,
  "sort_by": ["entity_id"]
}
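
The three configurations above map naturally onto PySpark's DataFrameWriter methods (mode, format, partitionBy, bucketBy, sortBy, saveAsTable, save). The function below is a duck-typed sketch of that mapping, not SparkHiveWriter's actual code; write_hive is a hypothetical name, and it expects an already resolved DataFrame rather than a view name.

```python
def write_hive(df, config):
    """Apply a .spark_to_hive.json-style config to a Spark DataFrame (sketch)."""
    bucket_by = config.get("bucket_by")
    if bucket_by and "table" not in config:
        # Spark only supports bucketBy together with saveAsTable.
        raise ValueError("bucket_by requires a table target, not a path")

    repartition_by = config.get("repartition_by")
    if repartition_by:
        num_partitions = config.get("num_partitions")
        if num_partitions:
            df = df.repartition(num_partitions, *repartition_by)
        else:
            df = df.repartition(*repartition_by)

    writer = df.write.mode(config["mode"]).format(config["format"])
    if config.get("partition_by"):
        writer = writer.partitionBy(*config["partition_by"])
    if bucket_by:
        writer = writer.bucketBy(config["num_buckets"], *bucket_by)
        # sortBy is only valid on a bucketed write.
        if config.get("sort_by"):
            writer = writer.sortBy(*config["sort_by"])

    if "table" in config:
        writer.saveAsTable(config["table"])
    else:
        writer.save(config["path"])
```

A call such as write_hive(spark.table("prepared_view"), config) would then perform the write, assuming the view exists in the active Spark session.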

Logging

Every main component inherits from LogBase and exposes a class logger.

import logging
from trino_spark_adapter import DagRunner, DistributedTrinoSparkReader, SparkHiveWriter

DagRunner.logger().setLevel(logging.INFO)
DistributedTrinoSparkReader.logger().setLevel(logging.DEBUG)
SparkHiveWriter.logger().setLevel(logging.INFO)

The default formatter includes timestamp, class name and level:

[2026-01-05 09:15:12] [DagRunner] [INFO] START task type=trino_to_spark file=1.source.trino_to_spark.json
[2026-01-05 09:17:03] [DagRunner] [INFO] SUCCESS task type=trino_to_spark file=1.source.trino_to_spark.json elapsed=0:01:51
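
The per-class logger pattern can be reproduced with the standard logging module alone. The sketch below is a hypothetical re-creation (LogBaseSketch, DemoReader, and DemoWriter are invented names), with a format string chosen to resemble the sample lines above; the package's real LogBase may differ in handler setup and formatting.

```python
import logging


class LogBaseSketch:
    """Hypothetical LogBase-style mixin: one named logger per subclass."""

    @classmethod
    def logger(cls):
        log = logging.getLogger(cls.__name__)
        if not log.handlers:  # attach the handler only once per class logger
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter(
                "[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S",
            ))
            log.addHandler(handler)
            log.propagate = False  # avoid duplicate lines via the root logger
        return log


class DemoReader(LogBaseSketch):
    pass


class DemoWriter(LogBaseSketch):
    pass


# Levels are set per component, independently of each other.
DemoReader.logger().setLevel(logging.DEBUG)
DemoWriter.logger().setLevel(logging.WARNING)

DemoReader.logger().debug("fetching range %s", 3)  # emitted
DemoWriter.logger().debug("not shown")             # suppressed by WARNING level
```

Because logging.getLogger returns the same object for the same name, repeated calls to logger() on a class share one level and one handler.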

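Guard expensive debug-only Spark actions explicitly so that they run only when DEBUG logging is enabled:

logger = DagRunner.logger()
if logger.isEnabledFor(logging.DEBUG):
    # df.count() triggers a full Spark job, so evaluate it only for DEBUG output.
    logger.debug("row_count=%s", df.count())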

Publishing

The source archive includes a publishing script, .pypi.sh:

chmod +x .pypi.sh
./.pypi.sh
