deltalake-redis-lock
A library that provides a Redis-based write lock interface for delta-rs.
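Why a write lock matters: each Delta commit creates the next numbered JSON file in `_delta_log`. If two writers read the same latest version, both try to create the same next file. The toy sketch below is not this library's code. It serializes the "pick next version, write commit file" step with a lock, and it swaps in a process-local `threading.Lock` for Redis, so it only demonstrates the idea inside one process. `next_version` and `commit` are hypothetical names.

```python
import os
import tempfile
import threading
from concurrent.futures import ThreadPoolExecutor

# Stand-in for the Redis lock: a process-local threading.Lock.
# Writers on several hosts would need a shared lock service such as Redis.
commit_lock = threading.Lock()


def next_version(log_dir: str) -> int:
    """Toy rule: the next commit version equals the number of existing commit files."""
    return sum(1 for name in os.listdir(log_dir) if name.endswith(".json"))


def commit(log_dir: str, payload: str) -> int:
    """Pick the next version and write its commit file while holding the lock."""
    with commit_lock:
        version = next_version(log_dir)
        path = os.path.join(log_dir, f"{version:020d}.json")
        # Mode "x" fails if the file already exists, so a lost race would raise here.
        with open(path, "x", encoding="utf-8") as handle:
            handle.write(payload)
        return version


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as log_dir:
        with ThreadPoolExecutor(max_workers=4) as pool:
            versions = list(pool.map(lambda i: commit(log_dir, f'{{"id": {i}}}'), range(1, 5)))
        print(sorted(versions))  # [0, 1, 2, 3]
        print(sorted(os.listdir(log_dir)))
```

Holding the lock across both the version lookup and the file creation is the point: locking only the write would still let two writers compute the same version.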
Library Usage
The client can be used from multiple hosts at the same time. The minimal examples below mimic this behaviour by writing to one table from several processes.
Redis Env Variables
Make sure to set these environment variables before executing code:
REDIS_HOST=<host>
REDIS_PORT=<port>
REDIS_DB=<0>
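A fail-fast check on these variables avoids confusing connection errors later. The helper below is hypothetical (not part of deltalake-redis-lock) and uses only the standard library; the values it returns are what you would pass to a Redis client, for example `redis.Redis(host=..., port=..., db=...)` from redis-py.

```python
import os
from collections.abc import Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class RedisSettings:
    host: str
    port: int
    db: int


def load_redis_settings(env: Mapping[str, str] = os.environ) -> RedisSettings:
    """Read REDIS_HOST, REDIS_PORT and REDIS_DB, failing early if any is missing."""
    required = ("REDIS_HOST", "REDIS_PORT", "REDIS_DB")
    missing = [key for key in required if key not in env]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return RedisSettings(
        host=env["REDIS_HOST"],
        port=int(env["REDIS_PORT"]),  # int() raises ValueError on a non-numeric port
        db=int(env["REDIS_DB"]),
    )


settings = load_redis_settings({"REDIS_HOST": "localhost", "REDIS_PORT": "6379", "REDIS_DB": "0"})
print(settings)  # RedisSettings(host='localhost', port=6379, db=0)
```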
Concurrent Write Example
# run.py
import logging
import os
from multiprocessing import Pool

from deltalake import DeltaTable
from pandas import DataFrame

from deltalake_redis_lock import write_redis_lock_deltalake

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)


def fake_worker(args) -> None:
    # Each worker appends one single-row DataFrame while holding the Redis write lock.
    df, table_name = args
    table_path = f"{os.getcwd()}/{table_name}"
    write_redis_lock_deltalake(
        table_or_uri=table_path,
        lock_table_name=table_name,
        mode="append",
        data=df,
        overwrite_schema=True,
    )


def define_datasets(table_name: str) -> None:
    # Four one-row datasets, all targeting the same Delta table.
    datasets = [(DataFrame({"id": [i]}), table_name) for i in range(1, 5)]
    with Pool() as pool:
        pool.map(fake_worker, datasets)


if __name__ == "__main__":
    table_name = "test_run"
    table_path = f"{os.getcwd()}/{table_name}"
    define_datasets(table_name=table_name)
    df = DeltaTable(table_uri=table_path).to_pandas().to_string()
    logging.info(df)
Output
2023-07-18 21:28:47 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:28:47 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:28:47 [INFO] Acquired lock, blocking: True
2023-07-18 21:28:47 [INFO] Acquired Redis Lock...
2023-07-18 21:28:47 [INFO] Lock acquired. Executing function...
2023-07-18 21:28:47 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:28:47 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:28:47 [INFO] Releasing lock... 2023-07-18T20:28:47.378630
2023-07-18 21:28:47 [INFO] Acquired lock, blocking: True
2023-07-18 21:28:47 [INFO] Acquired Redis Lock...
2023-07-18 21:28:47 [INFO] Lock acquired. Executing function...
2023-07-18 21:28:47 [INFO] Releasing lock... 2023-07-18T20:28:47.419373
2023-07-18 21:28:47 [INFO] Acquired lock, blocking: True
2023-07-18 21:28:47 [INFO] Acquired Redis Lock...
2023-07-18 21:28:47 [INFO] Lock acquired. Executing function...
2023-07-18 21:28:47 [INFO] Releasing lock... 2023-07-18T20:28:47.476411
2023-07-18 21:28:47 [INFO] Acquired lock, blocking: True
2023-07-18 21:28:47 [INFO] Acquired Redis Lock...
2023-07-18 21:28:47 [INFO] Lock acquired. Executing function...
2023-07-18 21:28:47 [INFO] Releasing lock... 2023-07-18T20:28:47.517992
   id
0   1
1   3
2   2
3   4
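The log suggests that each call acquires the lock, runs the delta-rs operation and then releases it; the row order (1, 3, 2, 4) appears to follow the order in which workers got the lock, so it can differ between runs. That acquire/run/release pattern can be sketched as a decorator. `with_lock` and `append_row` are hypothetical names and this is not the library's implementation; it accepts any object with `acquire(blocking=...)` and `release()`, which both `threading.Lock` and the lock returned by redis-py's `Redis.lock(name)` provide.

```python
import functools
import logging
import threading
from typing import Any, Callable, TypeVar

F = TypeVar("F", bound=Callable[..., Any])


def with_lock(lock: Any) -> Callable[[F], F]:
    """Run the wrapped function only while `lock` is held."""

    def decorator(func: F) -> F:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            logging.info("acquiring lock")
            if not lock.acquire(blocking=True):
                raise RuntimeError("could not acquire lock")
            try:
                logging.info("lock acquired, running %s", func.__name__)
                return func(*args, **kwargs)
            finally:
                # Release even if the write raises, so other writers are not blocked forever.
                logging.info("releasing lock")
                lock.release()

        return wrapper  # type: ignore[return-value]

    return decorator


# Local stand-in for a Redis lock, used only to demonstrate the decorator.
local_lock = threading.Lock()


@with_lock(local_lock)
def append_row(rows: list, value: int) -> int:
    rows.append(value)
    return len(rows)
```

Releasing in `finally` matters for a shared lock: a failed write must not leave other hosts waiting.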
Structure
test_run
├── 0-a2811af1-e9fa-4984-9824-3956acdbaba8-0.parquet
├── 1-87889b2d-1971-4e9b-8244-5e0d4a222458-0.parquet
├── 2-a2f0ac25-df02-43b7-945d-014db522b19f-0.parquet
├── 3-e57eae65-3cc7-4539-9eb6-b41ba52642bc-0.parquet
└── _delta_log
    ├── 00000000000000000000.json
    ├── 00000000000000000001.json
    ├── 00000000000000000002.json
    └── 00000000000000000003.json
1 directory, 8 files
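Each JSON file under `_delta_log` is one commit, named after the table version zero-padded to 20 digits, so four appends produce versions 0 to 3. A small stdlib-only helper (hypothetical, not part of this library) can list the committed versions:

```python
import os
import tempfile


def commit_versions(log_dir: str) -> list[int]:
    """Return the table versions recorded in a _delta_log directory, oldest first.

    Commit files look like 00000000000000000003.json (version 3).
    """
    versions = []
    for name in os.listdir(log_dir):
        stem, ext = os.path.splitext(name)
        # Skip checkpoints (*.checkpoint.parquet), _last_checkpoint and other files.
        if ext == ".json" and stem.isdigit():
            versions.append(int(stem))
    return sorted(versions)


# Usage against the table above (requires it to exist):
# commit_versions(os.path.join(os.getcwd(), "test_run", "_delta_log"))
```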
Concurrent Write With Optimize Example
# run.py
import logging
import os
from multiprocessing import Pool

from deltalake import DeltaTable
from pandas import DataFrame

from deltalake_redis_lock import optimize_redis_lock_deltalake, write_redis_lock_deltalake

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)


def fake_worker(args) -> None:
    # Append one single-row DataFrame while holding the Redis write lock...
    df, table_name = args
    table_path = f"{os.getcwd()}/{table_name}"
    write_redis_lock_deltalake(
        table_or_uri=table_path,
        lock_table_name=table_name,
        mode="append",
        data=df,
        overwrite_schema=True,
    )
    # ...then optimize the table, again under the lock. retention_hours=0 and
    # enforce_retention_duration=False mirror DeltaTable.vacuum options and allow
    # files no longer referenced by the table to be deleted immediately.
    optimize_redis_lock_deltalake(
        table_or_uri=table_path,
        lock_table_name=table_name,
        retention_hours=0,
        dry_run=False,
        enforce_retention_duration=False,
    )


def define_datasets(table_name: str) -> None:
    # Four one-row datasets, all targeting the same Delta table.
    datasets = [(DataFrame({"id": [i]}), table_name) for i in range(1, 5)]
    with Pool() as pool:
        pool.map(fake_worker, datasets)


if __name__ == "__main__":
    table_name = "test_run"
    table_path = f"{os.getcwd()}/{table_name}"
    define_datasets(table_name=table_name)
    df = DeltaTable(table_uri=table_path).to_pandas().to_string()
    logging.info(df)
Output
2023-07-18 21:26:42 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:26:42 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:26:42 [INFO] Acquired lock, blocking: True
2023-07-18 21:26:42 [INFO] Acquired Redis Lock...
2023-07-18 21:26:42 [INFO] Lock acquired. Executing function...
2023-07-18 21:26:42 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:26:42 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:26:42 [INFO] Releasing lock... 2023-07-18T20:26:42.681030
2023-07-18 21:26:42 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:26:42 [INFO] Acquired lock, blocking: True
2023-07-18 21:26:42 [INFO] Acquired Redis Lock...
2023-07-18 21:26:42 [INFO] Lock acquired. Executing function...
2023-07-18 21:26:42 [INFO] Releasing lock... 2023-07-18T20:26:42.689819
2023-07-18 21:26:42 [INFO] Acquired lock, blocking: True
2023-07-18 21:26:42 [INFO] Acquired Redis Lock...
2023-07-18 21:26:42 [INFO] Lock acquired. Executing function...
2023-07-18 21:26:42 [INFO] Releasing lock... 2023-07-18T20:26:42.750781
2023-07-18 21:26:42 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:26:42 [INFO] Acquired lock, blocking: True
2023-07-18 21:26:42 [INFO] Acquired Redis Lock...
2023-07-18 21:26:42 [INFO] Lock acquired. Executing function...
2023-07-18 21:26:42 [INFO] Releasing lock... 2023-07-18T20:26:42.760280
2023-07-18 21:26:42 [INFO] Acquired lock, blocking: True
2023-07-18 21:26:42 [INFO] Acquired Redis Lock...
2023-07-18 21:26:42 [INFO] Lock acquired. Executing function...
2023-07-18 21:26:42 [INFO] Releasing lock... 2023-07-18T20:26:42.866534
2023-07-18 21:26:42 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:26:42 [INFO] Acquired lock, blocking: True
2023-07-18 21:26:42 [INFO] Acquired Redis Lock...
2023-07-18 21:26:42 [INFO] Lock acquired. Executing function...
2023-07-18 21:26:42 [INFO] Releasing lock... 2023-07-18T20:26:42.882519
2023-07-18 21:26:42 [INFO] Acquired lock, blocking: True
2023-07-18 21:26:42 [INFO] Acquired Redis Lock...
2023-07-18 21:26:42 [INFO] Lock acquired. Executing function...
2023-07-18 21:26:42 [INFO] Releasing lock... 2023-07-18T20:26:42.985008
2023-07-18 21:26:42 [INFO] Try to Acquire Redis Lock...
2023-07-18 21:26:42 [INFO] Acquired lock, blocking: True
2023-07-18 21:26:42 [INFO] Acquired Redis Lock...
2023-07-18 21:26:42 [INFO] Lock acquired. Executing function...
2023-07-18 21:26:43 [INFO] Releasing lock... 2023-07-18T20:26:43.000558
   id
0   4
1   3
2   1
3   2
Structure
test_run
├── _delta_log
│   ├── 00000000000000000000.json
│   ├── 00000000000000000001.json
│   ├── 00000000000000000002.json
│   ├── 00000000000000000003.json
│   ├── 00000000000000000004.json
│   ├── 00000000000000000005.json
│   └── 00000000000000000006.json
└── part-00001-a13ca1fe-0a52-44c2-b2ce-b7eb95704536-c000.zstd.parquet
1 directory, 8 files
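Only one data file remains because optimize rewrites the small files into a larger one: its commit adds the new file and records `remove` actions for the old ones, and the vacuum-style options (`retention_hours=0`, `enforce_retention_duration=False`) then appear to let the removed files be deleted from disk right away. The hypothetical stdlib-only helper below replays `add`/`remove` actions from the JSON commits to compute which files the latest version references. It ignores checkpoints, so it assumes the full JSON history is still present.

```python
import json
import os
import tempfile


def live_data_files(log_dir: str) -> set[str]:
    """Replay add/remove actions in version order and return the live file paths."""
    live: set[str] = set()
    # Zero-padded names sort lexicographically in version order.
    commits = sorted(
        name
        for name in os.listdir(log_dir)
        if name.endswith(".json") and name[: -len(".json")].isdigit()
    )
    for name in commits:
        with open(os.path.join(log_dir, name), encoding="utf-8") as handle:
            for line in handle:  # one JSON action per line
                if not line.strip():
                    continue
                action = json.loads(line)
                if "add" in action:
                    live.add(action["add"]["path"])
                elif "remove" in action:
                    live.discard(action["remove"]["path"])
    return live
```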
To mimic writers on several hosts, the script can be run in multiple parallel processes with something like:
seq 2 | xargs -I{} -P 2 poetry run python run.py
Setup From Scratch
Requirements
- Python ^3.9
- Poetry 1.1.13
- make (GNU Make 3.81)
Setup
make setup-environment
Update package
make update
Test
export PYTHONPATH="${PYTHONPATH}:src"
make test type=unit
Docker
Docker is used in this repository to build an encapsulated environment for the codebase and to run unit, integration and load tests.
make build-container-image DOCKER_BUILD="buildx build --platform linux/amd64" CONTEXT=.
make run-container-tests type=unit
File details
Details for the file deltalake-redis-lock-0.0.1a11.tar.gz.
File metadata
- Download URL: deltalake-redis-lock-0.0.1a11.tar.gz
- Upload date:
- Size: 7.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.1.13 CPython/3.9.2 Darwin/19.6.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | 06d505b46df0e1baf38897d8ae0c5538f4bc8cbed70b359fec7ce6a6ace49d2e
MD5 | d3e1e5c8b50b485dcf63c597736c0538
BLAKE2b-256 | abf7350bdd279a2ef687adfd767790d24be47341c884a589f95caaf01dc10f96
File details
Details for the file deltalake_redis_lock-0.0.1a11-py3-none-any.whl.
File metadata
- Download URL: deltalake_redis_lock-0.0.1a11-py3-none-any.whl
- Upload date:
- Size: 7.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.1.13 CPython/3.9.2 Darwin/19.6.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | bab19888179ae1827a582016fefb4456a6e747c9b0dc768c963407034c3c0532
MD5 | 7c306b00c8347dccf5022cd7065a4893
BLAKE2b-256 | 87b48a448035b695c3016fd9bf4e07b3c7d6742017c88715cb756248424ea4dd