
deltalake-redis-lock


A library providing a write-lock interface for delta-rs.

Library Usage

This client can be used concurrently from multiple hosts. Below is a minimal example that mimics this behaviour.

Redis Env Variables

Make sure to set these environment variables before executing the code:

REDIS_HOST=<host>
REDIS_PORT=<port>
REDIS_DB=<0>
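For local testing, the same variables can also be provided from Python before the library is used. A minimal sketch, assuming a Redis instance on the default local host and port (example values only; adjust to your setup):

```python
import os

# Example defaults for a local Redis instance (assumptions, not requirements).
os.environ.setdefault("REDIS_HOST", "localhost")
os.environ.setdefault("REDIS_PORT", "6379")
os.environ.setdefault("REDIS_DB", "0")

# The connection parameters a Redis client would be configured with.
redis_config = {
    "host": os.environ["REDIS_HOST"],
    "port": int(os.environ["REDIS_PORT"]),
    "db": int(os.environ["REDIS_DB"]),
}
print(redis_config)
```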

Concurrent Write Example

# run.py
import logging
import os
from multiprocessing import Pool

from deltalake import DeltaTable
from pandas import DataFrame

from deltalake_redis_lock import write_redis_lock_deltalake

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

def fake_worker(args) -> None:
    df, table_name = args
    table_path = f"{os.getcwd()}/{table_name}"

    write_redis_lock_deltalake(
        table_or_uri=table_path,
        lock_table_name=table_name,
        mode="append",
        data=df,
        overwrite_schema=True,
    )


def define_datasets(_table_name: str) -> None:
    df1 = DataFrame({"id": [1]})
    df2 = DataFrame({"id": [2]})
    df3 = DataFrame({"id": [3]})
    df4 = DataFrame({"id": [4]})

    datasets = [(df1, _table_name), (df2, _table_name), (df3, _table_name), (df4, _table_name)]

    with Pool() as pool:
        pool.map(fake_worker, datasets)


if __name__ == '__main__':
    table_name = "test_run"
    table_path = f"{os.getcwd()}/{table_name}"

    define_datasets(_table_name=table_name)

    df = DeltaTable(table_uri=table_path).to_pandas().to_string()
    logging.info(df)

Output

2023-07-18 21:28:47 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:28:47 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:28:47 [INFO] Acquired lock, blocking: True
2023-07-18 21:28:47 [INFO] Acquired Redis Lock...
2023-07-18 21:28:47 [INFO] Lock acquired. Executing function...
2023-07-18 21:28:47 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:28:47 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:28:47 [INFO] Releasing lock... 2023-07-18T20:28:47.378630
2023-07-18 21:28:47 [INFO] Acquired lock, blocking: True
2023-07-18 21:28:47 [INFO] Acquired Redis Lock...
2023-07-18 21:28:47 [INFO] Lock acquired. Executing function...
2023-07-18 21:28:47 [INFO] Releasing lock... 2023-07-18T20:28:47.419373
2023-07-18 21:28:47 [INFO] Acquired lock, blocking: True
2023-07-18 21:28:47 [INFO] Acquired Redis Lock...
2023-07-18 21:28:47 [INFO] Lock acquired. Executing function...
2023-07-18 21:28:47 [INFO] Releasing lock... 2023-07-18T20:28:47.476411
2023-07-18 21:28:47 [INFO] Acquired lock, blocking: True
2023-07-18 21:28:47 [INFO] Acquired Redis Lock...
2023-07-18 21:28:47 [INFO] Lock acquired. Executing function...
2023-07-18 21:28:47 [INFO] Releasing lock... 2023-07-18T20:28:47.517992
   id
0   1
1   3
2   2
3   4
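Note that the row order reflects the order in which the workers happened to acquire the lock, which varies between runs. When asserting on the table contents, sort first; a small sketch (the sample `id` values are illustrative):

```python
from pandas import DataFrame

# Rows as they might come back from DeltaTable(...).to_pandas();
# the append order is nondeterministic under concurrent writers.
df = DataFrame({"id": [1, 3, 2, 4]})

# Normalize the order before comparing contents.
ids = sorted(df["id"].tolist())
print(ids)
```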

Structure

test_run
├── 0-a2811af1-e9fa-4984-9824-3956acdbaba8-0.parquet
├── 1-87889b2d-1971-4e9b-8244-5e0d4a222458-0.parquet
├── 2-a2f0ac25-df02-43b7-945d-014db522b19f-0.parquet
├── 3-e57eae65-3cc7-4539-9eb6-b41ba52642bc-0.parquet
└── _delta_log
    ├── 00000000000000000000.json
    ├── 00000000000000000001.json
    ├── 00000000000000000002.json
    └── 00000000000000000003.json

1 directory, 8 files

Concurrent Write With Optimize Example

# run.py
import logging
import os
from multiprocessing import Pool

from deltalake import DeltaTable
from pandas import DataFrame

from deltalake_redis_lock import write_redis_lock_deltalake, optimize_redis_lock_deltalake

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

def fake_worker(args) -> None:
    df, table_name = args
    table_path = f"{os.getcwd()}/{table_name}"

    write_redis_lock_deltalake(
        table_or_uri=table_path,
        lock_table_name=table_name,
        mode="append",
        data=df,
        overwrite_schema=True,
    )

    optimize_redis_lock_deltalake(
        table_or_uri=table_path,
        lock_table_name=table_name,
        retention_hours=0,
        dry_run=False,
        enforce_retention_duration=False,
    )


def define_datasets(_table_name: str) -> None:
    df1 = DataFrame({"id": [1]})
    df2 = DataFrame({"id": [2]})
    df3 = DataFrame({"id": [3]})
    df4 = DataFrame({"id": [4]})

    datasets = [(df1, _table_name), (df2, _table_name), (df3, _table_name), (df4, _table_name)]

    with Pool() as pool:
        pool.map(fake_worker, datasets)


if __name__ == '__main__':
    table_name = "test_run"
    table_path = f"{os.getcwd()}/{table_name}"

    define_datasets(_table_name=table_name)

    df = DeltaTable(table_uri=table_path).to_pandas().to_string()
    logging.info(df)

Output

2023-07-18 21:26:42 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:26:42 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:26:42 [INFO] Acquired lock, blocking: True
2023-07-18 21:26:42 [INFO] Acquired Redis Lock...
2023-07-18 21:26:42 [INFO] Lock acquired. Executing function...
2023-07-18 21:26:42 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:26:42 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:26:42 [INFO] Releasing lock... 2023-07-18T20:26:42.681030
2023-07-18 21:26:42 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:26:42 [INFO] Acquired lock, blocking: True
2023-07-18 21:26:42 [INFO] Acquired Redis Lock...
2023-07-18 21:26:42 [INFO] Lock acquired. Executing function...
2023-07-18 21:26:42 [INFO] Releasing lock... 2023-07-18T20:26:42.689819
2023-07-18 21:26:42 [INFO] Acquired lock, blocking: True
2023-07-18 21:26:42 [INFO] Acquired Redis Lock...
2023-07-18 21:26:42 [INFO] Lock acquired. Executing function...
2023-07-18 21:26:42 [INFO] Releasing lock... 2023-07-18T20:26:42.750781
2023-07-18 21:26:42 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:26:42 [INFO] Acquired lock, blocking: True
2023-07-18 21:26:42 [INFO] Acquired Redis Lock...
2023-07-18 21:26:42 [INFO] Lock acquired. Executing function...
2023-07-18 21:26:42 [INFO] Releasing lock... 2023-07-18T20:26:42.760280
2023-07-18 21:26:42 [INFO] Acquired lock, blocking: True
2023-07-18 21:26:42 [INFO] Acquired Redis Lock...
2023-07-18 21:26:42 [INFO] Lock acquired. Executing function...
2023-07-18 21:26:42 [INFO] Releasing lock... 2023-07-18T20:26:42.866534
2023-07-18 21:26:42 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:26:42 [INFO] Acquired lock, blocking: True
2023-07-18 21:26:42 [INFO] Acquired Redis Lock...
2023-07-18 21:26:42 [INFO] Lock acquired. Executing function...
2023-07-18 21:26:42 [INFO] Releasing lock... 2023-07-18T20:26:42.882519
2023-07-18 21:26:42 [INFO] Acquired lock, blocking: True
2023-07-18 21:26:42 [INFO] Acquired Redis Lock...
2023-07-18 21:26:42 [INFO] Lock acquired. Executing function...
2023-07-18 21:26:42 [INFO] Releasing lock... 2023-07-18T20:26:42.985008
2023-07-18 21:26:42 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:26:42 [INFO] Acquired lock, blocking: True
2023-07-18 21:26:42 [INFO] Acquired Redis Lock...
2023-07-18 21:26:42 [INFO] Lock acquired. Executing function...
2023-07-18 21:26:43 [INFO] Releasing lock... 2023-07-18T20:26:43.000558
   id
0   4
1   3
2   1
3   2

Structure

test_run
├── _delta_log
│   ├── 00000000000000000000.json
│   ├── 00000000000000000001.json
│   ├── 00000000000000000002.json
│   ├── 00000000000000000003.json
│   ├── 00000000000000000004.json
│   ├── 00000000000000000005.json
│   └── 00000000000000000006.json
└── part-00001-a13ca1fe-0a52-44c2-b2ce-b7eb95704536-c000.zstd.parquet

1 directory, 8 files

To mimic multiple hosts, the script can be launched from several processes at once, with something like:

seq 2 | xargs -I{} -P 2 poetry run python run.py

Setup From Scratch

Requirement

  • python ^3.9
  • poetry 1.1.13
  • GNU Make 3.81

Setup

make setup-environment

Update package

make update

Test

export PYTHONPATH="${PYTHONPATH}:src"
make test type=unit

Docker

Docker is used in this codebase to build an encapsulated environment for running unit, integration, and load tests.

make build-container-image DOCKER_BUILD="buildx build --platform linux/amd64" CONTEXT=.
make run-container-tests type=unit
