Automatic DataHub lineage collector via SQLAlchemy and boto3 event hooks
Project description
aileron-meta-collector
DataHub 기반 메타데이터 자동 수집 라이브러리입니다.
Python 마이크로서비스에서 SQLAlchemy / boto3 이벤트 훅을 통해 소스코드 변경을 최소화하면서 데이터 lineage를 자동으로 수집합니다.
목적
사내 데이터 플랫폼에 DataHub를 도입할 때, 각 마이크로서비스(MS)에 acryl-datahub SDK를 직접 심으면 다음 문제가 발생합니다.
- MS마다 반복적인 emit 코드 작성
- DataHub 스펙 변경 시 모든 MS 수정 필요
- 비즈니스 로직과 메타데이터 수집 로직의 혼재
aileron-meta-collector는 이 문제를 라이브러리 계층에서 흡수합니다.
MS 개발자는 의존성 추가 후 함수에 데코레이터 한 줄만 추가하면 lineage 수집이 완료됩니다.
아키텍처
┌─────────────────────────────────────────────────────────────┐
│ Python Microservice │
│ │
│ @datahub_job_fn("process-orders", flow="etl-pipeline") │
│ def process_orders(): │
│ df = pd.read_sql("SELECT ...", engine) ─┐ │
│ resp = s3.get_object(Bucket="..", Key="..") ─┤ │
│ qid = athena.start_query_execution(...) ─┤ │
│ df.to_sql("output_table", engine) ─┘ │
│ │
└──────────────────────┬──────────────────────────────────────┘
│ I/O 발생 시 자동 감지
▼
┌─────────────────────────────────────────────────────────────┐
│ aileron-meta-collector │
│ │
│ ┌──────────────────┐ ┌───────────────┐ ┌─────────────┐ │
│ │ SQLAlchemy Hook │ │ S3 Hook │ │ Athena Hook │ │
│ │ │ │ │ │ │ │
│ │ after_cursor_ │ │ before-param │ │ before-param│ │
│ │ execute 이벤트 │ │ -build.s3.* │ │ -build / │ │
│ └────────┬─────────┘ └──────┬────────┘ │ after-call │ │
│ │ │ └──────┬──────┘ │
│ └─────────┬─────────┘ │ │
│ │ ┌───────────────────┘ │
│ ▼ ▼ │
│ ┌──────────────────────┐ │
│ │ SQL Parser │ │
│ │ SELECT → input │ │
│ │ INSERT → output │ │
│ │ CTAS → output │ │
│ │ VIEW → output │ │
│ │ UNLOAD → S3 output │ │
│ └──────────┬───────────┘ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ JobContext │ (Thread-local) │
│ │ inputs[] │ │
│ │ outputs[] │ │
│ │ upstream_job_ids[] │ │
│ └──────────┬───────────┘ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ Async Emitter │ (ThreadPoolExecutor) │
│ └──────────┬───────────┘ │
└───────────────────────┼─────────────────────────────────────┘
│ REST (비동기)
▼
┌────────────────────────┐
│ DataHub GMS │
└────────────────────────┘
핵심 설계 원칙
| 원칙 | 구현 방식 |
|---|---|
| MS 침투 최소화 | 함수 데코레이터 한 줄 — 비즈니스 로직 변경 없음 |
| 멀티스레드 안전 | threading.local() 로 job context 스레드 격리 |
| 성능 영향 없음 | ThreadPoolExecutor 비동기 emit — 메인 스레드 블로킹 없음 |
| 장애 전파 방지 | emit 실패 시 logger.warning 흡수 — MS 정상 동작 유지 |
| 플랫폼 자동 추론 | DB URL에서 postgres/mysql/redshift 등 자동 감지 |
프로젝트 구조
aileron-meta-collector/
├── pyproject.toml
├── aileron_meta_collector/
│ ├── __init__.py # 공개 API (install_all_hooks, datahub_job_fn, propagate_job 등)
│ ├── config.py # 환경변수 기반 설정
│ ├── context.py # Thread-local JobContext, 데코레이터, propagate_job
│ ├── emitter.py # 비동기 DataHub REST emitter
│ ├── hooks/
│ │ ├── sqlalchemy.py # SQLAlchemy Engine 전역 훅
│ │ └── boto3.py # boto3 S3 / Athena 이벤트 훅
│ └── parsers/
│ └── sql_parser.py # SQL → (input_tables, output_tables)
└── tests/
├── test_pipeline.py # E-commerce 파이프라인 통합 테스트 (실제 DataHub 전송)
├── test_manual_lineage.py # 수동 lineage API 테스트
├── parsers/
│ └── test_sql_parser.py # SQL 파서 단위 테스트
└── hooks/
└── test_boto3_hooks.py # boto3 훅 단위 테스트
주요 기능
1. SQLAlchemy 자동 lineage 수집
모든 SQLAlchemy Engine에 after_cursor_execute 이벤트 리스너를 전역 등록합니다.
쿼리 실행 시점에 SQL을 파싱하여 입출력 테이블을 자동으로 DataHub에 등록합니다.
지원 SQL 패턴
| SQL 패턴 | input | output |
|---|---|---|
SELECT ... FROM table |
table | — |
SELECT ... FROM a JOIN b |
a, b | — |
INSERT INTO out SELECT ... FROM in |
in | out |
UPDATE table SET ... |
— | table |
CREATE TABLE out AS SELECT ... FROM in (CTAS) |
in | out |
CREATE VIEW v AS SELECT ... FROM in |
in | v |
CREATE OR REPLACE VIEW v AS SELECT ... FROM in |
in | v |
지원 DB 플랫폼
postgresql, mysql, redshift, snowflake, bigquery, mssql, sqlite
→ DB URL에서 자동 추론하여 DataHub platform URN 생성
2. boto3 S3 자동 lineage 수집
boto3 default session의 이벤트 시스템을 통해 S3 작업을 자동 감지합니다.
파일 단위가 아닌 디렉토리(prefix) 단위로 lineage를 추적합니다.
| S3 작업 | 방향 |
|---|---|
GetObject, HeadObject, ListObjects*, SelectObjectContent |
input |
PutObject, CompleteMultipartUpload |
output |
CopyObject |
source → input, destination → output |
DeleteObject |
output |
3. boto3 Athena 자동 lineage 수집
StartQueryExecution / GetQueryExecution 이벤트를 조합하여
쿼리가 SUCCEEDED 상태가 된 시점에만 lineage를 emit합니다.
Athena 훅 동작 흐름
before-parameter-build.StartQueryExecution → SQL + JobContext → thread-local 저장
after-call.StartQueryExecution → execution_id 확보 → _pending_athena에 이관
after-call.GetQueryExecution → SUCCEEDED 시 emit / FAILED·CANCELLED 시 정리
참고:
before-parameter-build이벤트를 사용합니다. botocore 1.40+ 에서before-call시점에는 요청이 이미 HTTP 직렬화된 이후라 원본QueryString접근이 불가합니다.
수집 가능한 Athena 쿼리 유형
| 쿼리 유형 | 예시 | input | output |
|---|---|---|---|
| SELECT | SELECT * FROM orders |
Glue 테이블 | — |
| SELECT + JOIN | SELECT ... FROM a JOIN b ON ... |
Glue 테이블 복수 | — |
| CTAS | CREATE TABLE summary AS SELECT ... FROM orders |
Glue 테이블 | Glue 테이블 |
| INSERT INTO | INSERT INTO out SELECT ... FROM in |
Glue 테이블 | Glue 테이블 |
| CREATE VIEW | CREATE VIEW v AS SELECT ... FROM orders |
Glue 테이블 | Glue VIEW |
| CREATE OR REPLACE VIEW | CREATE OR REPLACE VIEW v AS SELECT ... FROM orders |
Glue 테이블 | Glue VIEW |
| UNLOAD | UNLOAD (SELECT ... FROM orders) TO 's3://...' |
Glue 테이블 | S3 prefix |
수집 불가능한 경우
| 케이스 | 이유 |
|---|---|
GetQueryExecution 미호출 (fire-and-forget) |
완료 훅이 트리거되지 않음 |
| Athena Federated Query (외부 데이터소스) | SQL 파서가 외부 커넥터 테이블명 미인식 |
| 복잡한 CTE 중첩 | SQL 파서의 파싱 한계 |
4. DataFlow / DataJob / DataProcessInstance 자동 등록
@datahub_job_fn 데코레이터 진입/종료 시점에 파이프라인 실행 이력을 자동으로 등록합니다.
함수 진입 → DataFlow upsert (파이프라인 단위)
→ DataJob upsert (태스크 단위)
→ DataProcessInstance STARTED emit
실행 중 → I/O 훅이 inputs / outputs 누적
함수 종료 → DataProcessInstance COMPLETE / FAILED emit
→ DataJob inlet/outlet 최신 상태 업데이트
→ DataJob 간 의존 관계(inputDatajobs) 업데이트
DataHub에 등록되는 엔티티
| 엔티티 | URN 형식 | 설명 |
|---|---|---|
| DataFlow | urn:li:dataFlow:(pythonSdk,{flow},{env}) |
파이프라인 (최초 1회 upsert) |
| DataJob | urn:li:dataJob:(urn:li:dataFlow:(...),{job_id}) |
태스크 (최초 1회 upsert) |
| DataProcessInstance | urn:li:dataProcessInstance:{run_id} |
실행마다 신규 생성 |
DataHub UI Run History 예시
DataFlow: daily-etl-pipeline
└── DataJob: create-order-summary
inlet: glue.sales_db.orders
outlet: glue.sales_db.order_summary
│
└── Run History
┌──────────────────────────────────────┐
│ run-a3f2b1c0 COMPLETE 13.4s │
│ inputs: sales_db.orders │
│ outputs: sales_db.order_summary │
├──────────────────────────────────────┤
│ run-9d4e7f2a FAILED 2.1s │
│ inputs: sales_db.orders │
│ outputs: — │
└──────────────────────────────────────┘
5. DataJob 간 리니지 (upstream_jobs)
upstream_jobs 파라미터로 같은 flow 내 DataJob 간 의존 관계를 명시합니다.
DataHub UI의 Lineage 탭에서 Job 실행 순서와 흐름을 시각화할 수 있습니다.
@datahub_job_fn("step1-extract", flow="order-etl-pipeline")
def step1_extract(): ...
@datahub_job_fn("step2-transform", flow="order-etl-pipeline",
upstream_jobs=["step1-extract"])
def step2_transform(): ...
@datahub_job_fn("step3-load", flow="order-etl-pipeline",
upstream_jobs=["step2-transform"])
def step3_load(): ...
DataHub에서 보이는 DataJob 간 리니지:
step1-extract
↓
step2-transform (raw.* → staging.*)
↓
step3-load (staging.* → mart.*)
upstream_jobs에는 같은 flow 내의 job_id 를 지정합니다.
다른 flow의 job을 참조할 경우 URN 구성이 달라지므로 직접 URN을 사용하세요.
6. 단일 Job 내 다수 Athena 쿼리 — inlets/outlets 누적 방식
@datahub_job_fn 함수 안에서 Athena 쿼리를 여러 번 실행하면
각 쿼리의 input/output이 중복 없이 누적(union) 되어 하나의 DataJob에 등록됩니다.
@datahub_job_fn("multi-step-etl", flow="pipeline")
def run():
# Query 1: raw.orders → staging.orders
athena.start_query_execution(QueryString="INSERT INTO staging.orders SELECT * FROM raw.orders", ...)
wait_for_query(...)
# Query 2: staging.orders + raw.products → dw.order_items
athena.start_query_execution(QueryString="CREATE TABLE dw.order_items AS SELECT ... FROM staging.orders JOIN raw.products ...", ...)
wait_for_query(...)
# Query 3: dw.order_items → s3://export/
athena.start_query_execution(QueryString="UNLOAD (SELECT * FROM dw.order_items) TO 's3://export/'", ...)
wait_for_query(...)
최종적으로 DataHub DataJob에 등록되는 lineage:
inputs = [raw.orders, staging.orders, raw.products, dw.order_items]
outputs = [staging.orders, dw.order_items, s3://export/]
중간 테이블(
staging.orders,dw.order_items)은 inputs·outputs 양쪽에 동시에 나타납니다.
Dataset 수준 lineage(UpstreamLineageClass)는 쿼리마다 별도로 emit되므로
DataHub의 Dataset Lineage 탭에서는 쿼리별 정확한 연결을 확인할 수 있습니다.
7. ThreadPoolExecutor 병렬 Athena 작업 — propagate_job
threading.local() 특성상 새로 생성된 worker 스레드는 부모의 job context를 상속하지 않습니다.
ThreadPoolExecutor로 Athena 쿼리를 병렬 실행할 때 lineage를 캡처하려면
propagate_job 으로 submit할 함수를 감싸야 합니다.
with datahub_job() context manager와 함께 사용
from concurrent.futures import ThreadPoolExecutor
from aileron_meta_collector import datahub_job, propagate_job
def export_orders():
@propagate_job # with 블록 안 — 부모 스레드의 job context 캡처
def task(sql, database):
qid = athena.start_query_execution(
QueryString=sql,
QueryExecutionContext={"Database": database},
ResultConfiguration={"OutputLocation": "s3://results/"},
)["QueryExecutionId"]
wait_for_query(qid)
sqls = [
("INSERT INTO staging.orders SELECT * FROM raw.orders", "staging"),
("CREATE TABLE dw.items AS SELECT * FROM staging.orders JOIN raw.products ...", "dw"),
("UNLOAD (SELECT * FROM dw.items) TO 's3://export/'", "dw"),
]
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(task, sql, db) for sql, db in sqls]
for f in futures:
f.result()
# 호출부
with datahub_job("export-orders-parquet", flow="daily-etl-pipeline"):
export_orders() # 내부 병렬 작업도 lineage 캡처됨
@datahub_job_fn 데코레이터와 함께 사용
from aileron_meta_collector import datahub_job_fn, propagate_job
@datahub_job_fn("export-orders-parquet", flow="daily-etl-pipeline")
def run():
@propagate_job
def task(sql, database):
qid = athena.start_query_execution(
QueryString=sql,
QueryExecutionContext={"Database": database},
ResultConfiguration={"OutputLocation": "s3://results/"},
)["QueryExecutionId"]
wait_for_query(qid)
sqls = [
("INSERT INTO staging.orders SELECT * FROM raw.orders", "staging"),
("CREATE TABLE dw.items AS SELECT * FROM staging.orders JOIN raw.products ...", "dw"),
("UNLOAD (SELECT * FROM dw.items) TO 's3://export/'", "dw"),
]
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(task, sql, db) for sql, db in sqls]
for f in futures:
f.result()
propagate_job 동작:
| 시점 | 동작 |
|---|---|
@propagate_job 선언 시 (부모 스레드) |
get_job()으로 현재 job context 캡처 |
| worker 스레드 실행 시작 | 캡처한 job을 _local.job에 주입 |
| worker 스레드 실행 종료 | finally로 _local.job = None 정리 |
8. Thread-safe Job Context
threading.local() 기반으로 스레드별 독립적인 job context를 관리합니다.
멀티스레드 환경(FastAPI, Celery 등)에서 각 요청/태스크가 서로 간섭하지 않습니다.
9. 비동기 emit
DataHub GMS로의 HTTP 요청은 ThreadPoolExecutor로 비동기 처리됩니다.
emit 실패는 DATAHUB_SILENT_FAIL 설정에 따라 무시하거나 예외를 발생시킵니다.
설치
pip install aileron-meta-collector
개발 환경:
pip install "aileron-meta-collector[dev]"
지원 Python 버전: 3.9, 3.12 (테스트 완료)
환경변수
| 변수명 | 기본값 | 설명 |
|---|---|---|
DATAHUB_GMS_URL |
http://localhost:8080 |
DataHub GMS REST 엔드포인트 |
DATAHUB_ENV |
PROD |
DataHub 환경 (PROD / DEV / STAGING) |
DATAHUB_SILENT_FAIL |
true |
emit 실패 시 예외 전파 여부 (false로 설정 시 예외 발생) |
사용 방법
기본 설정 (앱 시작 시 1회)
# main.py 또는 settings.py
import os
from aileron_meta_collector import install_all_hooks
os.environ["DATAHUB_GMS_URL"] = "http://datahub-gms.internal:8080"
os.environ["DATAHUB_ENV"] = "PROD"
install_all_hooks() # SQLAlchemy + boto3(S3 + Athena) 훅 일괄 등록
훅을 개별 등록하려면:
from aileron_meta_collector import install_sqlalchemy_hooks, install_boto3_hooks
install_sqlalchemy_hooks(env="PROD")
install_boto3_hooks(env="PROD")
함수 데코레이터 — 기본 사용법
기존 함수에 @datahub_job_fn 한 줄만 추가합니다.
함수 내부의 SQLAlchemy / S3 / Athena 호출이 자동으로 감지됩니다.
from aileron_meta_collector import datahub_job_fn
from sqlalchemy import create_engine
import pandas as pd
engine = create_engine("postgresql://user:pass@host/db")
@datahub_job_fn("order-daily-aggregation", flow="order-processing-service")
def aggregate_orders():
# SELECT → orders가 input으로 자동 등록
df = pd.read_sql("SELECT * FROM orders WHERE status = 'done'", engine)
df_agg = df.groupby("user_id").size().reset_index(name="count")
# INSERT → order_summary가 output으로 자동 등록
df_agg.to_sql("order_summary", engine, if_exists="replace", index=False)
함수 데코레이터 — S3
import boto3
from aileron_meta_collector import datahub_job_fn
s3 = boto3.client("s3")
@datahub_job_fn("user-event-etl", flow="user-event-service")
def process_user_events():
# GetObject → s3://raw-data/events/2024/01 이 input으로 자동 등록
response = s3.get_object(Bucket="raw-data", Key="events/2024/01/data.parquet")
processed = transform(response["Body"].read())
# PutObject → s3://processed/user-events/2024/01 이 output으로 자동 등록
s3.put_object(
Bucket="processed",
Key="user-events/2024/01/result.parquet",
Body=processed,
)
함수 데코레이터 — Athena CTAS
import time
import boto3
from aileron_meta_collector import datahub_job_fn
athena = boto3.client("athena", region_name="ap-northeast-2")
def wait_for_query(execution_id: str) -> str:
while True:
result = athena.get_query_execution(QueryExecutionId=execution_id)
state = result["QueryExecution"]["Status"]["State"]
if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
return state
time.sleep(2)
@datahub_job_fn("create-order-summary", flow="daily-etl-pipeline")
def create_order_summary():
# input: sales_db.orders / output: sales_db.order_summary
qid = athena.start_query_execution(
QueryString="""
CREATE TABLE order_summary
WITH (
format = 'PARQUET',
external_location = 's3://data-lake/order_summary/'
)
AS SELECT user_id, COUNT(*) AS cnt, SUM(amount) AS total
FROM orders
GROUP BY user_id
""",
QueryExecutionContext={"Database": "sales_db"},
ResultConfiguration={"OutputLocation": "s3://athena-results/tmp/"},
)["QueryExecutionId"]
wait_for_query(qid) # GetQueryExecution 호출 시점에 SUCCEEDED 감지 → lineage emit
함수 데코레이터 — Athena UNLOAD
@datahub_job_fn("export-orders-parquet", flow="daily-etl-pipeline")
def export_orders():
# input: sales_db.orders + sales_db.users / output: s3://data-lake/exports/orders/
qid = athena.start_query_execution(
QueryString="""
UNLOAD (
SELECT o.id, o.amount, u.name
FROM orders o
JOIN users u ON o.user_id = u.id
)
TO 's3://data-lake/exports/orders/'
WITH (format = 'PARQUET', compression = 'SNAPPY')
""",
QueryExecutionContext={"Database": "sales_db"},
ResultConfiguration={"OutputLocation": "s3://athena-results/tmp/"},
)["QueryExecutionId"]
wait_for_query(qid)
함수 데코레이터 — DataJob 간 리니지
upstream_jobs 파라미터로 이전 스텝의 job_id를 지정하면
DataHub UI에서 DataJob 간의 실행 흐름을 lineage 그래프로 확인할 수 있습니다.
from aileron_meta_collector import datahub_job_fn
@datahub_job_fn("step1-create-raw-tables", flow="order-etl-pipeline")
def step1():
# CREATE EXTERNAL TABLE → raw layer 등록
...
@datahub_job_fn("step2-staging-ctas", flow="order-etl-pipeline",
upstream_jobs=["step1-create-raw-tables"])
def step2():
# CTAS → raw.* 를 읽어 staging.* 생성
# input: [raw.orders, raw.order_items, raw.products]
# output: [staging.order_line_items]
...
@datahub_job_fn("step3-mart-insert", flow="order-etl-pipeline",
upstream_jobs=["step2-staging-ctas"])
def step3():
# INSERT INTO → staging.* 집계 후 mart.* 적재
# input: [staging.order_line_items]
# output: [mart.product_sales_daily]
...
@datahub_job_fn("step4-export-unload", flow="order-etl-pipeline",
upstream_jobs=["step3-mart-insert"])
def step4():
# UNLOAD → mart.* 를 S3로 내보내기
# input: [mart.product_sales_daily]
# output: [s3://bucket/export/...]
...
DataHub UI에서 Pipelines → order-etl-pipeline 진입 후 Lineage 탭에서 확인:
step1-create-raw-tables
↓
step2-staging-ctas [raw.orders, raw.order_items → staging.order_line_items]
↓
step3-mart-insert [staging.order_line_items → mart.product_sales_daily]
↓
step4-export-unload [mart.product_sales_daily → s3://export/...]
함수 데코레이터 — 혼합 사용 (SQLAlchemy + Athena + S3)
비즈니스 로직과 여러 종류의 I/O가 혼재된 경우, 데코레이터를 상위 함수에 적용하면
내부에서 호출되는 모든 함수의 I/O가 같은 job context로 수집됩니다.
@datahub_job_fn("daily-pipeline", flow="order-service")
def run_daily_pipeline():
fetch_from_rds() # 내부에서 pd.read_sql() → 자동 감지
run_athena_summary() # 내부에서 start/get_query_execution() → 자동 감지
export_to_s3() # 내부에서 s3.put_object() → 자동 감지
def fetch_from_rds():
pd.read_sql("SELECT * FROM orders", engine)
def run_athena_summary():
qid = athena.start_query_execution(...)["QueryExecutionId"]
wait_for_query(qid)
def export_to_s3():
s3.put_object(Bucket="output", Key="result/data.csv", Body=...)
함수 데코레이터 — 예외 처리
함수 실행 중 예외가 발생하면 DataProcessInstance가 자동으로 FAILED로 등록됩니다.
@datahub_job_fn("risky-job", flow="daily-etl-pipeline")
def risky_job():
raise ValueError("something went wrong")
# → DataProcessInstance: result=FAILURE
수동 lineage 주입
자동 훅이 감지하지 못하는 경우 또는 명시적으로 lineage를 지정하고 싶을 때 사용합니다.
job context 내에서 dataset 추가 (add_input / add_output)
from aileron_meta_collector import datahub_job_fn, add_input, add_output
@datahub_job_fn("my-etl-job", flow="daily-pipeline")
def run():
# 자동 훅과 병행 — 훅이 감지 못한 외부 시스템 데이터셋을 수동으로 추가
add_input("external_db.raw_events", platform="postgres",
description="외부 시스템 원시 이벤트 테이블")
add_input("s3://raw-bucket/feeds/", platform="s3")
add_output("sales_db.processed_events", platform="glue",
description="처리된 이벤트 집계 테이블")
# URN을 직접 지정할 수도 있음
add_input(urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,prod.sales.orders,PROD)")
job context 없이 즉시 emit (emit_lineage)
job context(@datahub_job_fn) 없이 dataset 간 lineage를 one-shot으로 전송합니다.
from aileron_meta_collector import emit_lineage
# 테이블명 + platform 방식
emit_lineage(
inputs=["sales_db.orders", "sales_db.customers"],
outputs=["sales_db.order_summary"],
platform="glue",
)
# URN 직접 지정 방식
emit_lineage(
inputs=["urn:li:dataset:(urn:li:dataPlatform:glue,sales_db.orders,PROD)"],
outputs=["urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/out,PROD)"],
)
# dataset description 추가 (key: 테이블명 또는 URN)
emit_lineage(
inputs=["sales_db.orders"],
outputs=["sales_db.order_summary"],
platform="glue",
descriptions={
"sales_db.orders": "원시 주문 테이블",
"sales_db.order_summary": "일별 주문 집계 결과",
},
)
URN 직접 생성 (build_dataset_urn)
platform + table 조합으로 URN만 생성하고 싶을 때 사용합니다.
from aileron_meta_collector import build_dataset_urn
urn = build_dataset_urn("sales_db.orders", "glue")
# → "urn:li:dataset:(urn:li:dataPlatform:glue,sales_db.orders,PROD)"
urn = build_dataset_urn("s3://my-bucket/data/events/", "s3", env="DEV")
# → "urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/data/events,DEV)"
| API | 용도 |
|---|---|
add_input(table, platform, description) |
job context 내에서 input dataset 추가 |
add_output(table, platform, description) |
job context 내에서 output dataset 추가 |
emit_lineage(inputs, outputs, platform, descriptions) |
job context 없이 즉시 lineage emit |
build_dataset_urn(table, platform, env) |
URN 문자열만 생성 |
FastAPI 미들웨어 연동 (코드 변경 0줄)
엔드포인트 함수에 데코레이터 없이, 미들웨어에서 job context를 자동으로 설정합니다.
from fastapi import FastAPI, Request
from aileron_meta_collector.context import set_job, clear_job
import uuid
app = FastAPI()
@app.middleware("http")
async def datahub_lineage_middleware(request: Request, call_next):
set_job(
job_id=f"{request.method}:{request.url.path}:{uuid.uuid4().hex[:8]}",
flow="order-processing-service",
)
try:
return await call_next(request)
finally:
clear_job()
# 비즈니스 코드 변경 없음
@app.post("/orders/summarize")
def summarize_orders():
df = pd.read_sql("SELECT * FROM orders", engine) # 자동 감지
df.to_sql("order_summary", engine) # 자동 감지
Celery 태스크 연동 (코드 변경 0줄)
from celery import signals
from aileron_meta_collector.context import set_job, clear_job
@signals.task_prerun.connect
def on_task_start(task_id, task, **kwargs):
set_job(job_id=task.name, flow="celery-workers")
@signals.task_postrun.connect
def on_task_end(**kwargs):
clear_job()
DataHub URN 형식
수집된 lineage는 아래 형식으로 DataHub에 등록됩니다.
DB 테이블 (SQLAlchemy)
urn:li:dataset:(urn:li:dataPlatform:{platform},{schema}.{table},{env})
# 예시
urn:li:dataset:(urn:li:dataPlatform:postgres,public.orders,PROD)
urn:li:dataset:(urn:li:dataPlatform:mysql,shop.users,PROD)
Athena 테이블 (Glue Data Catalog)
urn:li:dataset:(urn:li:dataPlatform:glue,{database}.{table},{env})
# 예시
urn:li:dataset:(urn:li:dataPlatform:glue,sales_db.orders,PROD)
urn:li:dataset:(urn:li:dataPlatform:glue,sales_db.order_summary,PROD)
S3 (boto3 직접 호출 / Athena UNLOAD)
urn:li:dataset:(urn:li:dataPlatform:s3,{bucket}/{prefix},{env})
# 예시
urn:li:dataset:(urn:li:dataPlatform:s3,raw-data/events/2024/01,PROD)
urn:li:dataset:(urn:li:dataPlatform:s3,data-lake/exports/orders,PROD)
DataJob
urn:li:dataJob:(urn:li:dataFlow:(pythonSdk,{flow},{env}),{job_id})
# 예시
urn:li:dataJob:(urn:li:dataFlow:(pythonSdk,order-etl-pipeline,PROD),step2-staging-ctas)
테스트 실행
# 전체 테스트 (DataHub 실행 중일 때)
pytest tests/ -v -s
# 모듈별 실행
pytest tests/parsers/ # SQL 파서 단위 테스트
pytest tests/hooks/ # 훅 단위 테스트
pytest tests/test_manual_lineage.py # 수동 lineage API 테스트
pytest tests/test_pipeline.py # E-commerce 파이프라인 통합 테스트 (DataHub 필요)
제약 사항 및 주의점
- SQLAlchemy 훅은
install_all_hooks()호출 이후 생성되는 모든 Engine에 적용됩니다. - boto3
resourceAPI는 내부적으로client를 사용하므로 동일하게 감지됩니다. - ThreadPoolExecutor 병렬 작업 시, worker 스레드는 부모의 job context를 상속하지 않습니다.
propagate_job데코레이터로 submit할 함수를 감싸면 lineage를 정상적으로 캡처할 수 있습니다. - Athena fire-and-forget 패턴에서는
GetQueryExecution을 호출하지 않으면 lineage가 emit되지 않습니다. - 복잡한 CTE / 중첩 서브쿼리는 SQL 파서의 한계로 일부 테이블이 누락될 수 있습니다. 이 경우
get_job().inputs/get_job().outputs에 직접 URN을 추가하는 방식을 병행하세요. upstream_jobs는 같은flow내 job_id를 지정해야 합니다. 다른 flow의 job을 참조할 경우 URN이 달라지므로 직접 지정이 필요합니다.DATAHUB_SILENT_FAIL=true(기본값) 상태에서는 DataHub GMS가 내려가 있어도 MS 서비스에 영향을 주지 않습니다.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file aileron_meta_collector-0.1.4.tar.gz.
File metadata
- Download URL: aileron_meta_collector-0.1.4.tar.gz
- Upload date:
- Size: 248.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
18c571ce50c2504b65e5bbfbf9b0f28f634c8e958f917aea5629f0c895de82d3
|
|
| MD5 |
adf18e1eb6c7ce4d52f3127f25937525
|
|
| BLAKE2b-256 |
876fc859bed1a5c56eab3c6d3d5ddec2aa364933dbb77107ce0c54ace5246075
|
File details
Details for the file aileron_meta_collector-0.1.4-py3-none-any.whl.
File metadata
- Download URL: aileron_meta_collector-0.1.4-py3-none-any.whl
- Upload date:
- Size: 26.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4e1d7f955c1db921340be5ee289983f29d655990bfbb55945ab28fa24e6eeedf
|
|
| MD5 |
560c1c1d43a94c1de1b9db8b17b74eb7
|
|
| BLAKE2b-256 |
8297ddde6986d5aa6be109dfc931a825cb331918f8954697d26f95c3fb4cc178
|