Type-friendly utilities for moving data between Python objects, Arrow, Polars, Pandas, Spark, and Databricks

ygg — Yggdrasil for Python

Schema-aware data interchange for Python teams that move data between Python types, Arrow, Polars, pandas, Spark, and Databricks. One conversion registry, one schema contract, optional dependencies.

pip install ygg

Why pick this up

  • Stop hand-writing brittle casting code between app models, dataframes, and warehouse schemas.
  • Treat Arrow schema as the contract surface so every tool agrees on field names, nullability, and metadata.
  • Use one conversion registry instead of separate utilities per engine.
  • Install only what you need beyond the core. Most integrations are optional extras.
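The single-registry idea can be sketched in a few lines of plain Python. This is an illustrative toy only: the names and structure below are invented for the sketch and are not ygg's implementation (see the real API in the examples further down).

```python
from typing import Any, Callable

# Toy registry keyed by (source type, target type). Illustrative only;
# not ygg's internals, just the shape of the pattern.
_CONVERTERS: dict[tuple[type, type], Callable[[Any], Any]] = {}

def register(src: type, dst: type):
    """Decorator that records a converter for one (src, dst) pair."""
    def wrap(fn: Callable[[Any], Any]) -> Callable[[Any], Any]:
        _CONVERTERS[(src, dst)] = fn
        return fn
    return wrap

@register(str, int)
def _str_to_int(value: str) -> int:
    return int(value)

@register(str, bool)
def _str_to_bool(value: str) -> bool:
    return value.strip().lower() in {"1", "true", "yes", "y"}

def convert(value: Any, target: type) -> Any:
    """Look up and apply the converter registered for this value's type."""
    return _CONVERTERS[(type(value), target)](value)

convert("42", int)     # 42
convert("yes", bool)   # True
```

One registry means adding an engine or a type is one registration, not a new utility module per pairing.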

Install with the right extras

pip install ygg                   # core: pyarrow + polars + yggrs
pip install "ygg[data]"           # pandas + numpy + sqlglot
pip install "ygg[bigdata]"        # pyspark + delta-spark
pip install "ygg[delta]"          # deltalake
pip install "ygg[databricks]"     # databricks-sdk
pip install "ygg[api]"            # fastapi + uvicorn + pydantic
pip install "ygg[http]"           # urllib3 + xxhash
pip install "ygg[pickle]"         # cloudpickle + dill + zstandard + xxhash + blake3
pip install "ygg[mongo]"          # mongoengine
pip install "ygg[postgres]"       # psycopg + adbc-driver-postgresql
pip install "ygg[kafka]"          # confluent-kafka
pip install "ygg[dev]"            # everything for local development

Editable dev install:

cd python
uv venv .venv && source .venv/bin/activate
uv pip install -e .[dev]

Progressive examples

1. Cast scalars

from yggdrasil.data.cast.registry import convert

convert("42", int)              # 42
convert("3.14", float)          # 3.14
convert("yes", bool)            # True
convert("2024-06-01", "date")   # datetime.date(2024, 6, 1)

2. Dict → typed dataclass

from dataclasses import dataclass
from yggdrasil.data.cast.registry import convert

@dataclass
class User:
    id: int
    email: str
    active: bool = True

convert({"id": "7", "email": "ada@example.com", "active": "false"}, User)
# User(id=7, email='ada@example.com', active=False)

3. Register a custom converter

from decimal import Decimal
from yggdrasil.data.cast.registry import convert, register_converter

@register_converter(str, Decimal)
def _str_to_decimal(value: str, options=None) -> Decimal:
    return Decimal(value.replace(",", "."))

convert("19,95", Decimal)   # Decimal('19.95')

4. Infer Arrow fields from Python type hints

import yggdrasil.arrow as pa
from yggdrasil.arrow import arrow_field_from_hint

pa.schema([
    arrow_field_from_hint(int,                 name="id"),
    arrow_field_from_hint(list[str],           name="tags"),
    arrow_field_from_hint(dict[str, float],    name="metrics"),
])

5. Cast an Arrow table to a target schema

import yggdrasil.arrow as pa
from yggdrasil.arrow.cast import cast_arrow_tabular
from yggdrasil.data.cast.options import CastOptions

raw = pa.table({"id": ["1", "2"], "score": ["9.1", "8.7"]})
target = pa.schema([
    pa.field("id",    pa.int64(),   nullable=False),
    pa.field("score", pa.float64(), nullable=False),
])

out = cast_arrow_tabular(raw, CastOptions(target_field=target, strict_match_names=True))
print(out.schema)

6. Convert across engines (Polars / pandas / Spark)

Always import optional engines through their lib.py guard:

from yggdrasil.polars.lib import polars
from yggdrasil.pandas.lib import pandas

Polars cast:

import yggdrasil.arrow as pa
from yggdrasil.data.cast.options import CastOptions
from yggdrasil.polars.cast import cast_polars_dataframe
from yggdrasil.polars.lib import polars

df = polars.DataFrame({"id": ["1", "2"], "value": ["4.2", "5.1"]})
target = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.float64())])
out = cast_polars_dataframe(df, CastOptions(target_field=target))

Arrow ↔ Polars round-trip:

from yggdrasil.polars.cast import (
    arrow_table_to_polars_dataframe,
    polars_dataframe_to_arrow_table,
)

pl_df = arrow_table_to_polars_dataframe(arrow_table)
roundtrip = polars_dataframe_to_arrow_table(pl_df)

7. Dataclass → Arrow struct field

from dataclasses import dataclass
from yggdrasil.dataclasses import dataclass_to_arrow_field

@dataclass
class Position:
    symbol: str
    quantity: float

field = dataclass_to_arrow_field(Position)
print(field)

8. HTTP: simple to advanced

from yggdrasil.io.http_ import HTTPSession

http = HTTPSession()
print(http.get("https://httpbin.org/get").json())
print(http.post("https://httpbin.org/post", json={"name": "alice"}).status)

Prepared request + send:

req = http.prepare_request("POST", "https://httpbin.org/post",
                           json={"event": "order_created", "id": 123})
resp = http.send(req)
print(resp.status, resp.json()["json"])

Parallel batch dispatch:

from yggdrasil.io import SendManyConfig

reqs = [http.prepare_request("GET", "https://httpbin.org/get", params={"page": i})
        for i in range(1, 11)]
responses = list(http.send_many(reqs, send_config=SendManyConfig(max_workers=5)))
print([r.status for r in responses])

Tabular response → engine of your choice:

resp = http.get("https://api.example.com/v1/orders?format=arrow")
table  = resp.to_arrow_table()
pdf    = resp.to_pandas()
plf    = resp.to_polars()

9. Buffers and URLs

from yggdrasil.io import BytesIO, URL

with BytesIO() as buf:           # spill-to-disk byte buffer with media detection
    buf.write(b"hello")
    buf.seek(0)
    print(buf.media_type, buf.compression)

u = URL.from_str("https://example.com/a/b?q=1")
print(u.host, u.path)
print(u.with_query_items({"q": 2, "lang": "en"}).to_string())

10. Databricks SQL: read/write across formats

from yggdrasil.databricks import DatabricksClient

c = DatabricksClient(host="https://<workspace>", token="<token>")

c.sql.execute("""
CREATE TABLE IF NOT EXISTS main.default.demo (id BIGINT, name STRING) USING DELTA
""")
c.sql.insert_into("main.default.demo",
                  [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}])

stmt = c.sql.execute("SELECT * FROM main.default.demo ORDER BY id")
print(stmt.to_arrow_table())
print(stmt.to_pandas())
print(stmt.to_polars())

DatabricksClient also covers:

  • Unity Catalog: c.catalogs["main"]["default"]["orders"]
  • Compute: c.compute.clusters.all_purpose_cluster(...)
  • DBFS/Volumes: c.dbfs_path("/Volumes/...").write_text(...)
  • Secrets: c.secrets["scope/key"] = "..."
  • IAM and Genie

See docs/guides/databricks.md.

11. Typed Databricks job widgets

from dataclasses import dataclass
from yggdrasil.databricks.jobs import NotebookConfig

@dataclass
class IngestConfig(NotebookConfig):
    catalog: str = "main"
    schema: str = "ingest"
    table: str = "events"
    dry_run: bool = False

cfg = IngestConfig.from_environment()   # in a job run
# cfg = IngestConfig.init_widgets()     # in a local notebook

12. Retries, parallelism, jobs

from yggdrasil.pyutils import retry, parallelize
from yggdrasil.concurrent import Job, JobPoolExecutor

@retry(tries=3, delay=0.2, backoff=2)
def flaky(x: int) -> int:
    return x

@parallelize(max_workers=4)
def square(x: int) -> int:
    return x * x

list(square(range(6)))   # [0, 1, 4, 9, 16, 25]

# Bounded streaming jobs
jobs = [Job.make(lambda x=x: x * x) for x in range(20)]
with JobPoolExecutor(max_workers=4, max_in_flight=8) as pool:
    for result in pool.as_completed(jobs):
        print(result.value)

13. Reuse CastOptions.check

from yggdrasil.data.cast.options import CastOptions

def normalize_options(options=None, *, target_field=None) -> CastOptions:
    return CastOptions.check(options, target_field=target_field, strict_match_names=True)

Modules at a glance

Module Purpose
yggdrasil.data Cast registry, CastOptions, DataType, Field/Schema, DataTable, normalized enums
yggdrasil.arrow Arrow type inference, casting helpers (cast_arrow_tabular, cast_arrow_record_batch_reader)
yggdrasil.dataclasses dataclass_to_arrow_field, WaitingConfig, Expiring, ExpiringDict
yggdrasil.polars / yggdrasil.pandas / yggdrasil.spark Engine bridges (cast.py, lib.py, tests.py TestCase bases)
yggdrasil.io BytesIO, URL, SendConfig/SendManyConfig, codecs, media types
yggdrasil.io.http_ HTTPSession (preferred), PreparedRequest, Response
yggdrasil.requests Legacy retry-only YGGSession + MSAL variant
yggdrasil.databricks DatabricksClient + sql/compute/workspaces/fs/iam/secrets/jobs/account/ai.genie
yggdrasil.fastapi FastAPI service powering the Power Query connector
yggdrasil.pyutils / yggdrasil.concurrent retry, parallelize, Job, JobPoolExecutor
yggdrasil.pickle / blake3 / xxhash Optional serialization + hashing
yggdrasil.mongo / mongoengine Mongo helpers
yggdrasil.fxrates FX-rate helpers
yggdrasil.rs Bridge to native yggrs kernels (with pure-Python fallback)

For per-module pages, see docs/modules/ and the navigable docs site.


Testing

Tests that touch a dataframe or Arrow object subclass the matching engine TestCase from yggdrasil.<engine>.tests:

from yggdrasil.arrow.tests import ArrowTestCase

class TestX(ArrowTestCase):
    def test_table(self):
        t = self.table({"id": [1, 2]})
        self.assertSchemaEqual(t.schema, self.pa.schema([self.pa.field("id", self.pa.int64())]))

This handles optional-dependency skipping, per-test tmp dirs, Arrow interop, and frame/schema assertions.

pytest                                                   # full suite
pytest tests/test_yggdrasil/test_data/                   # one area
pytest tests/test_yggdrasil/test_data/test_registry.py   # one file
ruff check
black .

pytest-asyncio runs in strict mode, so async tests must carry the explicit asyncio marker or they are not executed. Tests tagged with the integration marker are skipped unless DATABRICKS_HOST is set.
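A minimal example of the explicit marker (this assumes pytest-asyncio's standard pytest.mark.asyncio marker; the test body and name are invented for illustration):

```python
import asyncio
import pytest

# In strict mode, an "async def" test without this marker is not run.
@pytest.mark.asyncio
async def test_convert_roundtrip():
    await asyncio.sleep(0)  # stand-in for real async work
    assert 1 + 1 == 2
```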


Documentation locally

cd python
mkdocs serve     # http://127.0.0.1:8000
mkdocs build     # static site → python/site/

The published site is deployed by .github/workflows/docs.yml on every push to main that touches python/docs/**, python/src/**, mkdocs.yml, or the workflow itself.


License

Apache-2.0.
