e-Stat APIを使ってデータを取得し、dltを使ってデータをロードするためのヘルパーライブラリ
Project description
estat_api_dlt_helper
e-Stat APIからデータを取得しロードするhelper
ドキュメントはこちら
概要
e-Stat APIを利用してデータを取得し、DWHなどのデータ基盤にロードするシーンでの活用を想定しています。
Pythonのライブラリとして動作し、以下の3つの機能を提供します。
parse_response- APIのレスポンスをパースし、データとメタデータを結合させたArrow Tableを作成します。
estat_table/estat_source- dltのsource/resourceデコレータを活用した宣言的APIです。
- 単一の統計表には
estat_table()、複数の統計表をまとめて扱う場合はestat_source()を使います。
load_estat_data- dlt(data load tool)のラッパーとして動作し、 統計表IDとテーブル名などを設定するだけで、簡単にDWHなどにロード可能です。
- paginationや複数の統計表IDを同じテーブルにロードしたいケースなどを内部でいい感じに処理します。
create_estat_resource/create_estat_pipeline/create_estat_source- dltのresource・pipeline・sourceをそれぞれ個別に生成し、dltの細かい設定や機能を使いこなしたい場合に使用します。
create_estat_sourceは複数のEstatDltConfigをまとめてdlt sourceを生成できます。
モチベーションとコンセプト
それなりの数の政府統計の統計表を効率よくデータ基盤に抽出・ロードしたいというニーズをもとに生まれました。 e-Stat APIのレスポンスはある程度抽象化されているため、メタデータを本体データに結合するパーサーと、 データロードスクリプトを非常に抽象化・量産化できるdlt(data load tool)を組み合わせることで、上記を達成できると感じて実装を始めました。
コンセプト:
- なるべく統計表IDとテーブル名を記述するだけで動くものがいい
- 複数の統計表IDのロードや、マージ戦略などの設定にも対応したい
- 何のデータソース(統計表ID)を、どこに(DWH|データセット|テーブル)、ロードするか、という設定をなるべく同じところで記述したい
- どの統計表のレスポンスにも対応できるパーサーが欲しい
インストール
pip install estat_api_dlt_helper
# BigQuery
pip install "estat_api_dlt_helper[bigquery]"
# Snowflake
pip install "estat_api_dlt_helper[snowflake]"
# duckdb
pip install "estat_api_dlt_helper[duckdb]"
使用方法
e-Stat APIに関して、ユーザー登録やアプリケーションIDの取得が完了している前提です。 取得したアプリケーションIDは環境変数に入れておいてください。
export ESTAT_API_KEY=YOUR_APP_ID
Win:
$env:ESTAT_API_KEY = "YOUR_APP_ID"
estat_table / estat_source を使う場合は、dltのsecrets管理によりapp_idを自動解決できます。
secrets.toml:
[sources.estat]
app_id = "YOUR_APP_ID"
環境変数:
export SOURCES__ESTAT__APP_ID=YOUR_APP_ID
parse_responseの使い方
e-Stat APIの/rest/3.0/app/json/getStatsDataのレスポンスをparse_response()に渡すことで、
responseのTABLE_INF.VALUEの中身をテーブルとして、CLASS_INF.CLASS_OBJの中身をメタデータとして名寄せさせたArrow Tableを生成することができます。
処理イメージ:
| response | 加工後 |
|---|---|
see: examples
import os
import pandas as pd
import requests
from estat_api_dlt_helper import parse_response
# API endpoint
url = "https://api.e-stat.go.jp/rest/3.0/app/json/getStatsData"
# Parameters for the API request
params = {
"appId": os.getenv("ESTAT_API_KEY"),
"statsDataId": "0000020201", # 社会人口統計 市区町村データ 基礎データ
"cdCat01": "A2101", # 住民基本台帳人口(日本人)
"cdArea": "01100,01101", # 札幌市, 札幌市中央区
"limit": 10
}
try:
# Make API request
response = requests.get(url, params=params)
response.raise_for_status()
data = response.json()
# Parse the response into Arrow table
table = parse_response(data)
# Print data
print(table.to_pandas())
except requests.RequestException as e:
print(f"Error fetching data from API: {e}")
except Exception as e:
print(f"Error processing data: {e}")
estat_table / estat_sourceの使い方
dltの @dlt.resource / @dlt.source デコレータを活用した宣言的APIです。
統計表IDを指定するだけでDLTのリソース/ソースを生成できます。
see: examples
単一の統計表を取得する場合:
NOTE:
estat_tableを単体で使う場合、app_idは dlt の secrets 解決によりSOURCES__ESTAT_TABLE__APP_ID環境変数(またはsecrets.tomlの[sources.estat_table]セクション)から探索されます。estat_source経由で使う場合はSOURCES__ESTAT__APP_ID([sources.estat])から解決されます。 混乱を避けるには、app_idを明示的に渡すか、estat_source経由で使うことを推奨します。
import dlt
from estat_api_dlt_helper import estat_table
resource = estat_table(
stats_data_id="0000020201",
write_disposition="replace",
limit=100,
maximum_offset=200,
)
pipeline = dlt.pipeline(
pipeline_name="estat",
destination="duckdb",
dataset_name="estat_data",
)
pipeline.run(resource)
複数の統計表をまとめて取得する場合:
from estat_api_dlt_helper import estat_source
# IDリストを指定(リソース名は自動的に estat_{id} になります)
source = estat_source(
stats_data_ids=["0000020201", "0004028584"],
limit=100,
maximum_offset=200,
)
pipeline.run(source)
# 辞書でカスタムリソース名を指定することもできます
source = estat_source(
stats_data_ids={"population": "0000020201", "gdp": "0004028584"},
write_disposition="merge",
primary_key=["time_code", "area_code"],
)
pipeline.run(source)
リソースごとに個別の設定が必要な場合は、tables パラメータを使います:
from estat_api_dlt_helper import estat_source, estat_table
source = estat_source(
tables=[
estat_table(
stats_data_id="0000020201",
table_name="pop",
write_disposition="merge",
primary_key=["time_code"],
limit=100,
maximum_offset=200,
),
estat_table(
stats_data_id="0004028584",
table_name="gdp",
write_disposition="replace",
limit=100,
maximum_offset=200,
),
],
)
pipeline.run(source)
インクリメンタルロード(増分ロード)
estat_table / estat_source は dlt.sources.incremental を使ったインクリメンタルロードに対応しています。
前回ロード以降の新しい時点のデータだけを取得できるため、定期パイプラインでの全件取得を避けられます。
see: examples
import dlt
from estat_api_dlt_helper import estat_table
pipeline = dlt.pipeline(
pipeline_name="estat_incremental",
destination="duckdb",
dataset_name="estat_data",
)
resource = estat_table(
stats_data_id="0000020201",
write_disposition="merge",
primary_key=["time", "area"],
incremental=dlt.sources.incremental("time", initial_value="2020000000"),
)
# 初回: initial_value 以降のデータを取得
# 2回目以降: 前回の最大 time 値以降のデータのみ取得
pipeline.run(resource)
estat_source でも同様に指定でき、全リソースに共通の incremental 設定が適用されます:
from estat_api_dlt_helper import estat_source
source = estat_source(
stats_data_ids=["0000020201", "0004028584"],
write_disposition="merge",
primary_key=["time", "area"],
incremental=dlt.sources.incremental("time", initial_value="2020000000"),
)
pipeline.run(source)
注意点:
write_dispositionは"merge"または"append"と組み合わせて使用してください- 新しい時点の追加のみ検出されます。既存データの改訂(遡及改定)は検出できません
- time カラムの値(例:
"2020000000")は辞書順で時系列順になるため、文字列比較で正しく動作します
load_estat_dataの使い方
dlt(data load tool)のwrapperとして簡便なconfigで取得データを DWH等にロードできます。
ロード可能なDWHについてはdltのドキュメントを参考にしてください。
see: examples
# duckdbの場合
import os
import dlt
import duckdb
from estat_api_dlt_helper import EstatDltConfig, load_estat_data
db = duckdb.connect("estat_demo.duckdb")
# Simple configuration
config = {
"source": {
"app_id": os.getenv("ESTAT_API_KEY"), #(必須項目)
"statsDataId": "0000020201", # (必須項目) 人口推計
"limit": 100, # (Optional) 1 requestで取得する行数 | デフォルト:10万
"maximum_offset": 200, # (Optional) 最大取得行数
},
"destination": {
"pipeline_name": "estat_demo",
"destination": dlt.destinations.duckdb(db),
"dataset_name": "estat_api_data",
"table_name": "population_estimates",
"write_disposition": "replace", # Replace existing data
},
}
estat_config = EstatDltConfig(**config)
# Load data with one line
info = load_estat_data(estat_config)
print(info)
load_estat_dataの使い方 (Advanced)
load_estat_data()は簡単な設定でロードを可能にしますが、dltの細かい設定や機能を使いこなしたい場合(dlt.transformやbigquery_adapterなど)は、
dltのresourceとpipelineをそれぞれ単体で生成し、既存のdltのコードと同じように扱うこともできます。
see: examples (resource)
see: examples (pipeline)
create_estat_sourceの使い方
複数の統計表を一括でロードしたい場合は、create_estat_source()を使用できます。
EstatDltConfigのリストを渡すことで、複数のリソースをまとめたdlt sourceを生成します。
see: examples
import os
import dlt
from estat_api_dlt_helper import EstatDltConfig, create_estat_source
app_id = os.getenv("ESTAT_API_KEY")
configs = [
EstatDltConfig(
source={"app_id": app_id, "statsDataId": "0000020201"},
destination={"destination": "duckdb", "dataset_name": "estat", "table_name": "population"},
),
EstatDltConfig(
source={"app_id": app_id, "statsDataId": "0004028584"},
destination={"destination": "duckdb", "dataset_name": "estat", "table_name": "household",
"write_disposition": "replace", "primary_key": None},
),
]
source = create_estat_source(configs)
pipeline = dlt.pipeline(
pipeline_name="estat_multi",
destination="duckdb",
dataset_name="estat",
)
info = pipeline.run(source)
print(info)
Development
# Install development dependencies
uv sync
# Run tests
uv run pytest
# Format code
uv run ruff format src/
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file estat_api_dlt_helper-0.3.0.tar.gz.
File metadata
- Download URL: estat_api_dlt_helper-0.3.0.tar.gz
- Upload date:
- Size: 514.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
408b21e1216126ab6ba0d496dbf0e2c88276bd0781b35bbe04ae9009f39ca808
|
|
| MD5 |
7129ac139ca3a55e1421c32e10e27710
|
|
| BLAKE2b-256 |
c94c1bf8e7d197b5cb9d4b3c6d53c3170c97fbf570e286e7f28b1b96ff8150f0
|
Provenance
The following attestation bundles were made for estat_api_dlt_helper-0.3.0.tar.gz:
Publisher:
release.yml on K-Oxon/estat_api_dlt_helper
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
estat_api_dlt_helper-0.3.0.tar.gz -
Subject digest:
408b21e1216126ab6ba0d496dbf0e2c88276bd0781b35bbe04ae9009f39ca808 - Sigstore transparency entry: 1066095516
- Sigstore integration time:
-
Permalink:
K-Oxon/estat_api_dlt_helper@8e5bccfb8876e964f392f0ab0373c8a71b877879 -
Branch / Tag:
refs/tags/v0.3.0 - Owner: https://github.com/K-Oxon
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@8e5bccfb8876e964f392f0ab0373c8a71b877879 -
Trigger Event:
push
-
Statement type:
File details
Details for the file estat_api_dlt_helper-0.3.0-py3-none-any.whl.
File metadata
- Download URL: estat_api_dlt_helper-0.3.0-py3-none-any.whl
- Upload date:
- Size: 37.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a8e3a95da190fe0f3f2e726ffe80d9d147fa698d0915ab3ad7264e128f639308
|
|
| MD5 |
5d85b27b627b195eb933774f6f48ae9f
|
|
| BLAKE2b-256 |
d5d98dd5c015e1ac62ae682d8ca8f0f8bb7d1df761dea24a5dde034110918d34
|
Provenance
The following attestation bundles were made for estat_api_dlt_helper-0.3.0-py3-none-any.whl:
Publisher:
release.yml on K-Oxon/estat_api_dlt_helper
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
estat_api_dlt_helper-0.3.0-py3-none-any.whl -
Subject digest:
a8e3a95da190fe0f3f2e726ffe80d9d147fa698d0915ab3ad7264e128f639308 - Sigstore transparency entry: 1066095667
- Sigstore integration time:
-
Permalink:
K-Oxon/estat_api_dlt_helper@8e5bccfb8876e964f392f0ab0373c8a71b877879 -
Branch / Tag:
refs/tags/v0.3.0 - Owner: https://github.com/K-Oxon
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@8e5bccfb8876e964f392f0ab0373c8a71b877879 -
Trigger Event:
push
-
Statement type: