Distributed Spark and Trino/Starburst adapter with SQL/JSON DAG execution and Spark writes to Trino, Iceberg and Hive.
trino-spark-adapter
trino-spark-adapter is a Python package for building hybrid Spark and Trino/Starburst data pipelines.
It provides:
- distributed Trino/Starburst reads executed from Spark executors;
- Spark writes to Trino through distributed `INSERT` batches;
- Spark writes to Iceberg tables with Iceberg partition transforms and table properties;
- Spark writes to Hive-compatible tables or paths with Hive-style partitioning and bucketing;
- a DAG runner that executes `.sql` and `.json` files in alphanumeric order;
- AES-CBC helpers and Spark SQL UDF registration;
- class-based logging with one logger per component.
Installation
```shell
pip install trino-spark-adapter==1.1.1
```
Core concepts
A DAG is a folder containing SQL and JSON files. Files are executed in alphanumeric order. The suffix determines the execution mode.
| Suffix | Action |
|---|---|
| `.trino.sql` | Execute SQL statements on Trino/Starburst. |
| `.spark.sql` | Execute SQL statements on Spark. |
| `.trino_to_spark.json` | Load a Trino table or query into a Spark temporary view. |
| `.spark_to_trino.json` | Write a Spark view to Trino through distributed inserts. |
| `.spark_to_iceberg.json` | Write a Spark view to an Iceberg table with the Spark DataFrameWriterV2 API. |
| `.spark_to_hive.json` | Write a Spark view to a Hive-compatible table or path. |
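The ordering and suffix rules above can be illustrated with a small planner. This is a minimal sketch, not the package's implementation: the helper names `task_type` and `plan_dag`, the `trino_sql`/`spark_sql` labels and the plain lexicographic sort are assumptions (it is not documented here whether `10.x` runs before or after `2.x`).

```python
from pathlib import Path
from typing import List, Optional, Tuple

# Suffix -> task label, following the suffix table (labels are hypothetical).
SUFFIXES = {
    ".trino.sql": "trino_sql",
    ".spark.sql": "spark_sql",
    ".trino_to_spark.json": "trino_to_spark",
    ".spark_to_trino.json": "spark_to_trino",
    ".spark_to_iceberg.json": "spark_to_iceberg",
    ".spark_to_hive.json": "spark_to_hive",
}


def task_type(file_name: str) -> Optional[str]:
    """Return the task label for a DAG file name, or None for unknown suffixes."""
    for suffix, label in SUFFIXES.items():
        if file_name.endswith(suffix):
            return label
    return None


def plan_dag(file_names: List[str]) -> List[Tuple[str, str]]:
    """Sort names lexicographically and keep only files with a known suffix."""
    plan = []
    for name in sorted(file_names):
        label = task_type(name)
        if label is not None:
            plan.append((name, label))
    return plan


if __name__ == "__main__":
    folder = Path("dag")
    if folder.is_dir():
        for name, label in plan_dag([p.name for p in folder.iterdir()]):
            print(f"{label:>16}  {name}")
```

Files with other suffixes (a `README.md`, for instance) are simply skipped in this sketch.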
Minimal DAG runner
```python
from trino_spark_adapter import DagRunner, SparkAESHelper, TrinoConnectionConfig

spark = SparkAESHelper.get_spark(
    app_name="trino_spark_adapter_job",
    register_aes=True,
    fail_if_missing_aes=False,
)

trino_config = TrinoConnectionConfig.from_env()

runner = DagRunner(
    spark=spark,
    trino_config=trino_config,
    params={"calculation_date": "2026-01-05"},
    reader_defaults={"fetch_size": 100_000, "num_ranges": 20, "num_partitions": 20},
)

results = runner.run_folder("dag")
```
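To give `runner.run_folder("dag")` something to run, a DAG folder can be generated from Python. The sketch below writes a hypothetical three-step DAG using the file formats documented in this README; the table and view names are placeholders, and the `{calculation_date}` placeholder in the SQL file assumes the runner substitutes `params` into SQL files the same way it does into JSON files.

```python
import json
from pathlib import Path
from typing import List

# Hypothetical example DAG. File contents are placeholders; "{calculation_date}"
# assumes params are substituted into SQL files as well as JSON files.
EXAMPLE_DAG = {
    "1.source.trino_to_spark.json": {
        "table_fullname": "catalog.schema.source_table",
        "target_view": "source_table",
    },
    "2.prepare.spark.sql": (
        "CREATE OR REPLACE TEMP VIEW prepared_view AS\n"
        "SELECT * FROM source_table WHERE event_date = DATE '{calculation_date}'\n"
    ),
    "3.target.spark_to_trino.json": {
        "source_view": "prepared_view",
        "target_table": "catalog.schema.target_table",
    },
}


def write_example_dag(folder: str) -> List[Path]:
    """Write EXAMPLE_DAG into `folder` and return the created files in name order."""
    root = Path(folder)
    root.mkdir(parents=True, exist_ok=True)
    for name, content in EXAMPLE_DAG.items():
        # SQL files are written verbatim; JSON task files are serialised.
        text = content if isinstance(content, str) else json.dumps(content, indent=2)
        (root / name).write_text(text, encoding="utf-8")
    return sorted(root / name for name in EXAMPLE_DAG)
```

Calling `write_example_dag("dag")` before `runner.run_folder("dag")` would load the source table, filter it in Spark and write the result back to Trino, assuming the placeholder tables exist.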
Trino configuration
TrinoConnectionConfig.from_env() reads the following environment variables:
```shell
export TRINO_HOST="starburst.example.com"
export TRINO_USER="user"
export TRINO_PASSWORD="password"
export TRINO_ROLES="role"
export TRINO_VERIFY="false"
export TRINO_HTTP_SCHEME="https"
export TRINO_PORT="443"
```
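The variables above map naturally onto typed connection settings. The helper below is a hypothetical stand-in for `TrinoConnectionConfig.from_env()`, shown only to make the string-to-type conversions explicit; the defaults (`443`, `https`, verification on) and the accepted "false" spellings are assumptions, not the package's documented behaviour.

```python
import os
from typing import Any, Dict, Mapping, Optional


def trino_settings_from_env(environ: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Hypothetical helper: read the TRINO_* variables into typed settings."""
    env = os.environ if environ is None else environ
    missing = [name for name in ("TRINO_HOST", "TRINO_USER") if not env.get(name)]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    return {
        "host": env["TRINO_HOST"],
        "port": int(env.get("TRINO_PORT", "443")),  # assumed default
        "user": env["TRINO_USER"],
        "password": env.get("TRINO_PASSWORD"),
        "roles": env.get("TRINO_ROLES"),
        "http_scheme": env.get("TRINO_HTTP_SCHEME", "https"),  # assumed default
        # "false", "0" and "no" (any case) turn TLS certificate verification off.
        "verify": env.get("TRINO_VERIFY", "true").strip().lower() not in ("false", "0", "no"),
    }
```

Passing an explicit mapping instead of `os.environ` keeps the helper easy to test.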
Spark and AES helper
SparkAESHelper creates a Spark session from the environment variables whose names start with `spark.` and can optionally register the AES UDFs.
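```shell
# Shell variable names cannot contain dots, so `export spark.app.name=...` is rejected.
# Pass spark.* settings through env(1) instead; job.py stands for your driver script.
export aes_key_str="...base64..."
export aes_iv_str="...base64..."
env "spark.app.name=trino_spark_adapter_job" \
    "spark.sql.shuffle.partitions=200" \
    python job.py
```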
```python
from trino_spark_adapter import SparkAESHelper

spark = SparkAESHelper.get_spark(register_aes=True)
spark.sql("SELECT aes_encrypt('abc') AS encrypted_value").show()
```
Registered functions:
- `aes_encrypt(value)`
- `aes_decrypt(value)`
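The `spark.*` convention amounts to a prefix filter over the environment. A minimal sketch with a hypothetical helper name; the package may also apply defaults or handle `app_name` differently:

```python
import os
from typing import Dict, Mapping, Optional


def spark_conf_from_env(environ: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Hypothetical helper: keep only variables whose names start with "spark."."""
    env = os.environ if environ is None else environ
    # The match is case-sensitive, so SPARK_HOME and similar variables are ignored.
    return {name: value for name, value in env.items() if name.startswith("spark.")}


# With PySpark installed, the entries could then be applied as:
#   builder = SparkSession.builder
#   for key, value in spark_conf_from_env().items():
#       builder = builder.config(key, value)
#   spark = builder.getOrCreate()
```

Variables such as `aes_key_str` do not match the prefix and stay out of the Spark configuration.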
Date parameters
DateUtils computes the values of the placeholders used in DAG files.
```python
from trino_spark_adapter import DateUtils

weekday_dates = DateUtils.generate_weekday_dates_between_start_stop(
    start_dt="2026-01-01",
    stop_dt="2026-02-01",
    weekday=0,
)

du = DateUtils(weekday_dates[0])
params = du.to_params()
params.update({"calculation_date": du.today_tiret[:10]})
```
Typical generated keys include today_tiret, today_slash, last_day_tiret, last_week_tiret, last_month_tiret, last_quarter_tiret, last_semester_tiret, and last_year_tiret.
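The weekday generator can be approximated with the standard library. This sketch assumes `weekday` follows Python's `date.weekday()` convention (0 = Monday), that both bounds are inclusive and that dates are ISO strings; `weekday_dates_between` is a hypothetical name, not the `DateUtils` method itself.

```python
from datetime import date, timedelta
from typing import List


def weekday_dates_between(start_dt: str, stop_dt: str, weekday: int) -> List[str]:
    """Return ISO dates in [start_dt, stop_dt] that fall on `weekday` (0 = Monday)."""
    start = date.fromisoformat(start_dt)
    stop = date.fromisoformat(stop_dt)
    # Jump forward to the first matching weekday, then step one week at a time.
    current = start + timedelta(days=(weekday - start.weekday()) % 7)
    dates = []
    while current <= stop:
        dates.append(current.isoformat())
        current += timedelta(days=7)
    return dates
```

With the bounds from the example above, this sketch returns the four Mondays of January 2026, the first being 2026-01-05, the `calculation_date` used in the minimal runner.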
.trino_to_spark.json
Load a complete table:
```json
{
  "table_fullname": "catalog.schema.source_table",
  "target_view": "source_table"
}
```
Load a partitioned date range with distributed Trino queries:
```json
{
  "table_fullname": "catalog.schema.source_table",
  "target_view": "source_view",
  "colname": "event_date",
  "coltype": "DATE",
  "format": "%Y-%m-%d",
  "rounding": "D",
  "colname_start_value": "{start_date}",
  "colname_stop_value": "{end_date}",
  "num_ranges": 20
}
```
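The date-range options suggest the read is split into `num_ranges` sub-ranges of `colname`, each fetched by its own Trino query. The sketch below shows one way to split a `DATE` range by whole days and build one query per chunk; the even split, the inclusive stop bound and the helper names are assumptions, not the package's documented algorithm.

```python
from datetime import date, timedelta
from typing import List, Sequence, Tuple


def split_date_range(start: date, stop: date, num_ranges: int) -> List[Tuple[date, date]]:
    """Split the inclusive day range [start, stop] into at most num_ranges contiguous chunks."""
    total_days = (stop - start).days + 1
    if total_days <= 0 or num_ranges <= 0:
        return []
    num_ranges = min(num_ranges, total_days)  # never produce empty chunks
    base, extra = divmod(total_days, num_ranges)
    ranges = []
    lo = start
    for i in range(num_ranges):
        size = base + (1 if i < extra else 0)  # spread leftover days over the first chunks
        hi = lo + timedelta(days=size - 1)
        ranges.append((lo, hi))
        lo = hi + timedelta(days=1)
    return ranges


def range_queries(table: str, column: str, ranges: Sequence[Tuple[date, date]]) -> List[str]:
    """Build one Trino query per chunk; each could run as an independent Spark task."""
    return [
        f"SELECT * FROM {table} "
        f"WHERE {column} BETWEEN DATE '{lo.isoformat()}' AND DATE '{hi.isoformat()}'"
        for lo, hi in ranges
    ]
```

The resulting query strings could then be distributed, for instance with `spark.sparkContext.parallelize(queries, len(queries))`, so that each executor fetches its own range.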
The runner creates or replaces the Spark temporary view named by `target_view`. When `target_view` is omitted, the view name is derived from the file name: for example, `3.source.trino_to_spark.json` creates the view `source`.
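The file-name rule can be sketched as follows, assuming names of the form `<order>.<view>.<suffix>` and that only a purely numeric first token is treated as the order prefix; `default_view_name` is a hypothetical helper.

```python
def default_view_name(file_name: str, suffix: str) -> str:
    """Hypothetical rule: drop the task suffix and a numeric order prefix."""
    if not file_name.endswith(suffix):
        raise ValueError(f"{file_name!r} does not end with {suffix!r}")
    stem = file_name[: -len(suffix)]  # e.g. "3.source"
    order, sep, rest = stem.partition(".")
    # Strip the first token only when it is the numeric execution-order prefix.
    return rest if sep and order.isdigit() else stem
```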
.spark_to_trino.json
Write a Spark view to a Trino table. If the target table does not exist, it is created automatically from the Spark schema.
```json
{
  "source_view": "prepared_view",
  "target_table": "catalog.schema.target_table",
  "repartition_by": ["entity_id"],
  "num_partitions": 40,
  "sort_by": ["entity_id"]
}
```
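Creating the table from the Spark schema and sending rows as `INSERT` batches both come down to rendering SQL from Spark metadata and values. The sketch below illustrates the two pieces in plain Python; the type mapping, the `CREATE TABLE IF NOT EXISTS` form and all helper names are assumptions for illustration, not the package's implementation.

```python
from datetime import date, datetime
from typing import Any, Iterable, Iterator, List, Sequence, Tuple

# Assumed mapping from Spark simpleString type names to Trino types.
SPARK_TO_TRINO = {
    "string": "VARCHAR",
    "int": "INTEGER",
    "bigint": "BIGINT",
    "double": "DOUBLE",
    "boolean": "BOOLEAN",
    "date": "DATE",
    "timestamp": "TIMESTAMP",
}


def trino_type(spark_type: str) -> str:
    # Spark renders decimals as "decimal(10,2)"; Trino accepts the same syntax.
    if spark_type.startswith("decimal("):
        return spark_type.upper()
    return SPARK_TO_TRINO[spark_type]


def create_table_ddl(table: str, schema: Sequence[Tuple[str, str]]) -> str:
    columns = ", ".join(f"{name} {trino_type(kind)}" for name, kind in schema)
    return f"CREATE TABLE IF NOT EXISTS {table} ({columns})"


def sql_literal(value: Any) -> str:
    if value is None:
        return "NULL"
    if isinstance(value, bool):  # check bool before int: bool is an int subclass
        return "TRUE" if value else "FALSE"
    if isinstance(value, (int, float)):
        return repr(value)
    if isinstance(value, datetime):  # check datetime before date: it is a date subclass
        return f"TIMESTAMP '{value.isoformat(sep=' ')}'"
    if isinstance(value, date):
        return f"DATE '{value.isoformat()}'"
    return "'" + str(value).replace("'", "''") + "'"  # escape embedded quotes


def insert_batches(table: str, rows: Iterable[Sequence[Any]], batch_size: int) -> Iterator[str]:
    """Yield one INSERT ... VALUES statement per batch of at most batch_size rows."""
    batch: List[str] = []
    for row in rows:
        batch.append("(" + ", ".join(sql_literal(v) for v in row) + ")")
        if len(batch) == batch_size:
            yield f"INSERT INTO {table} VALUES " + ", ".join(batch)
            batch = []
    if batch:
        yield f"INSERT INTO {table} VALUES " + ", ".join(batch)
```

On a cluster, a function like `insert_batches` would typically run once per partition (for example inside `DataFrame.foreachPartition`) with its own Trino connection. Where the client supports it, parameterized statements are a safer choice than string-built literals.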
.spark_to_iceberg.json
Write a Spark view to an Iceberg table through the configured Spark Iceberg catalog.
```json
{
  "source_view": "prepared_view",
  "catalog": "iceberg_catalog",
  "schema": "analytics",
  "table": "target_table",
  "mode": "append",
  "partition_spec": [
    {"transform": "day", "column": "event_ts"},
    {"transform": "bucket", "column": "entity_id", "num_buckets": 32}
  ],
  "distribution_mode": "hash",
  "format_version": 2,
  "file_format": "PARQUET",
  "repartition_by": ["entity_id"],
  "num_partitions": 200
}
```
Common `mode` values are `create`, `replace`, `append`, and `overwrite_partitions`.
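To see what such a configuration means in Iceberg terms, it can be rendered as the equivalent Iceberg Spark SQL DDL. This is an illustrative sketch only: the package is described as using DataFrameWriterV2 rather than SQL, and both the mode-to-method mapping and the property mapping below are assumptions. The property names (`format-version`, `write.format.default`, `write.distribution-mode`) are standard Iceberg table properties.

```python
from typing import Any, Dict

# Assumed correspondence between documented modes and DataFrameWriterV2 methods.
WRITER_V2_METHOD = {
    "create": "create",
    "replace": "replace",
    "append": "append",
    "overwrite_partitions": "overwritePartitions",
}


def partition_expr(spec: Dict[str, Any]) -> str:
    """Render one partition_spec entry as an Iceberg Spark SQL transform."""
    if spec["transform"] == "bucket":
        return f"bucket({spec['num_buckets']}, {spec['column']})"
    # year, month, day and hour take the column as their only argument.
    return f"{spec['transform']}({spec['column']})"


def iceberg_ddl(cfg: Dict[str, Any], columns_sql: str) -> str:
    """Build a CREATE TABLE statement roughly equivalent to the JSON config."""
    table = f"{cfg['catalog']}.{cfg['schema']}.{cfg['table']}"
    clauses = [f"CREATE TABLE {table} ({columns_sql}) USING iceberg"]
    parts = ", ".join(partition_expr(s) for s in cfg.get("partition_spec", []))
    if parts:
        clauses.append(f"PARTITIONED BY ({parts})")
    # Only emit properties that the config actually sets.
    props = {}
    if "format_version" in cfg:
        props["format-version"] = str(cfg["format_version"])
    if "file_format" in cfg:
        props["write.format.default"] = cfg["file_format"].lower()
    if "distribution_mode" in cfg:
        props["write.distribution-mode"] = cfg["distribution_mode"]
    if props:
        clauses.append("TBLPROPERTIES (" + ", ".join(f"'{k}'='{v}'" for k, v in props.items()) + ")")
    return " ".join(clauses)
```

In PySpark the same intent is usually expressed with `df.writeTo(table).partitionedBy(...).tableProperty("format-version", "2")` followed by the method for the chosen mode; which route the package takes internally is not documented here.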
.spark_to_hive.json
Write a Spark view to a Hive table:
```json
{
  "source_view": "prepared_view",
  "table": "analytics.target_table",
  "mode": "overwrite",
  "format": "parquet",
  "partition_by": ["event_date"],
  "repartition_by": ["event_date"],
  "num_partitions": 40
}
```
Write a Spark view to a path, for example on S3A:
```json
{
  "source_view": "prepared_view",
  "path": "s3a://bucket/path/target_table",
  "mode": "overwrite",
  "format": "parquet",
  "partition_by": ["event_date"]
}
```
Hive bucketing is supported only when writing to a table (`table`, written with `saveAsTable`), not to a `path`:
```json
{
  "source_view": "prepared_view",
  "table": "analytics.bucketed_table",
  "mode": "overwrite",
  "format": "parquet",
  "bucket_by": ["entity_id"],
  "num_buckets": 32,
  "sort_by": ["entity_id"]
}
```
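Because Spark only supports `bucketBy` together with `saveAsTable`, a bucketed config that targets a `path` cannot succeed. A small pre-flight check can catch that before a job starts. This sketch is hypothetical: the function name is invented, and the "exactly one of `table` or `path`" rule is an assumption about how the writer chooses its target.

```python
from typing import Any, Dict, List


def validate_hive_config(cfg: Dict[str, Any]) -> List[str]:
    """Return the problems found in a .spark_to_hive.json config (hypothetical checks)."""
    errors = []
    has_table, has_path = "table" in cfg, "path" in cfg
    if has_table == has_path:  # both present or both missing
        errors.append("set exactly one of 'table' or 'path'")
    if "bucket_by" in cfg:
        if has_path:
            errors.append("bucket_by requires 'table': Spark only supports bucketBy with saveAsTable")
        num_buckets = cfg.get("num_buckets")
        if not isinstance(num_buckets, int) or num_buckets <= 0:
            errors.append("bucket_by requires a positive integer 'num_buckets'")
    return errors
```

An empty list means the config passes these checks; it does not guarantee the write itself succeeds.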
Logging
Every main component inherits from LogBase and exposes a class logger.
```python
import logging

from trino_spark_adapter import DagRunner, DistributedTrinoSparkReader, SparkHiveWriter

DagRunner.logger().setLevel(logging.INFO)
DistributedTrinoSparkReader.logger().setLevel(logging.DEBUG)
SparkHiveWriter.logger().setLevel(logging.INFO)
```
The default formatter includes timestamp, class name and level:
```text
[2026-01-05 09:15:12] [DagRunner] [INFO] START task type=trino_to_spark file=1.source.trino_to_spark.json
[2026-01-05 09:17:03] [DagRunner] [INFO] SUCCESS task type=trino_to_spark file=1.source.trino_to_spark.json elapsed=0:01:51
```
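Guard expensive Spark actions that only feed debug output, so they do not run when DEBUG is disabled:
```python
logger = DagRunner.logger()
# df.count() runs a full Spark job; only pay for it when DEBUG is enabled.
if logger.isEnabledFor(logging.DEBUG):
    logger.debug("row_count=%s", df.count())  # df: the DataFrame being inspected
```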
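The one-logger-per-class pattern and the formatter shown above can be reproduced with the standard `logging` module. This is a hypothetical stand-in for `LogBase`, not its source: the format string is inferred from the sample output, and `DemoRunner` is an invented class used only for the demonstration.

```python
import logging
import sys


class LogBase:
    """Hypothetical stand-in for LogBase: one named logger per class."""

    FORMAT = "[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s"
    DATEFMT = "%Y-%m-%d %H:%M:%S"

    @classmethod
    def logger(cls) -> logging.Logger:
        log = logging.getLogger(cls.__name__)  # same name -> same logger object
        if not log.handlers:  # attach the handler only once per class
            handler = logging.StreamHandler(sys.stderr)
            handler.setFormatter(logging.Formatter(cls.FORMAT, cls.DATEFMT))
            log.addHandler(handler)
            log.propagate = False  # avoid duplicate lines via the root logger
        return log


class DemoRunner(LogBase):  # hypothetical component used for the demo
    pass


if __name__ == "__main__":
    DemoRunner.logger().setLevel(logging.INFO)
    DemoRunner.logger().info("START task type=%s file=%s", "trino_to_spark", "1.source.trino_to_spark.json")
```

Keying loggers by class name is what lets each component's level be tuned independently, as in the `setLevel` calls above.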
Publishing
The source archive includes .pypi.sh.
```shell
chmod +x .pypi.sh
./.pypi.sh
```
Download files
File details
Details for the file trino_spark_adapter-1.1.1.tar.gz.
File metadata
- Download URL: trino_spark_adapter-1.1.1.tar.gz
- Upload date:
- Size: 39.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.19
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `dcf26f9c6e8d2efbad47e5b9a01a3768ef6bf39a5943b5a787894cb55f43aaed` |
| MD5 | `87fcb816b7ac8b8fa1e674ab5c174e50` |
| BLAKE2b-256 | `c798b6690c6128e09aff09125d33cee482856bb34794516dfa3107fa5792fbfb` |
File details
Details for the file trino_spark_adapter-1.1.1-py3-none-any.whl.
File metadata
- Download URL: trino_spark_adapter-1.1.1-py3-none-any.whl
- Upload date:
- Size: 39.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.19
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `ab38ec641afcc7115dd3fce0974ab47107e84f0bd7fc6d7ada45bfe8ac8e5a12` |
| MD5 | `a4f53255a7062a6fa39962f8b913b5d7` |
| BLAKE2b-256 | `d8573d8be8e7f7288d527fe94034df700084fceaabf40f4ea3969677ec68a4c1` |
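A downloaded archive can be checked against the SHA256 digests listed above with `hashlib` (`pip hash <file>` reports the same SHA256 digest). A minimal sketch that streams each file in chunks and compares it with the published value:

```python
import hashlib
from pathlib import Path

# SHA256 digests copied from the file details above.
EXPECTED_SHA256 = {
    "trino_spark_adapter-1.1.1.tar.gz": "dcf26f9c6e8d2efbad47e5b9a01a3768ef6bf39a5943b5a787894cb55f43aaed",
    "trino_spark_adapter-1.1.1-py3-none-any.whl": "ab38ec641afcc7115dd3fce0974ab47107e84f0bd7fc6d7ada45bfe8ac8e5a12",
}


def sha256_of(path: Path, chunk_size: int = 1 << 20) -> str:
    """Stream a file through SHA-256 and return the hex digest."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


if __name__ == "__main__":
    for name, expected in EXPECTED_SHA256.items():
        path = Path(name)
        if path.exists():  # only check files present in the current directory
            status = "OK" if sha256_of(path) == expected else "MISMATCH"
            print(f"{name}: {status}")
```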