API Pipe
API Pipe is a simple Python library for fetching data from a URL endpoint (GET) and applying transformations from a set of predefined steps (filter, map, select, etc.) to the fetched data. In other words, it helps you build a data pipeline out of data fetched from an API endpoint.
It logs the full data at every step for inspection. Though simple, it can be handy, as it:
- Simplifies applying transformations on data.
- Simplifies development/debugging, as lack of visibility into API data and transformations often makes these tasks cumbersome.
- Simplifies working with multiple API endpoints: API Pipe keeps the logs tidy and organized under a logs directory.
- Has built-in exponential retries.
- Takes in httpx clients (both sync and async) to make GET API requests.
- Uses rich to pretty print logs.
Installation
pip3 install api-pipe
Steps
Available data transformation steps:
- fetch (GET)
- fetch_async (GET Async)
- filter
- map
- select
- key
- to_python
- to_json
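fetch_async is the async counterpart of fetch, for use with an httpx.AsyncClient. A minimal sketch of what an async pipeline could look like, assuming fetch_async is awaited and the remaining steps are chained synchronously (the exact calling convention may differ):

import asyncio

import httpx

async def main():
    # url and params defined as in the examples below
    async with httpx.AsyncClient() as client:
        api = ApiPipe(url, client, params)
        await api.fetch_async()                    # async GET
        api.to_python().select(["key", "value"])   # remaining steps chain as usual
        print(api.data)

asyncio.run(main())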
Examples
First define a URL and common parameters for all the API calls that will be made:
import logging
import os
from pathlib import Path

# illustrative imports; adjust to match your api_pipe version
from api_pipe import ApiParams, ApiPipe, Url

gitlab_url = Url("https://gitlab.com/api/v4")

common_params = ApiParams(
    headers={
        "PRIVATE-TOKEN": os.environ["TOKEN"]
    },
    timeout=(5.0, 5.0),
    retries={
        "initial_delay": 0.5,
        "backoff_factor": 3,
        "max_retries": 7,
    },
    logs={
        "unique_name": "__TO_BE_REPLACED__",
        "log_dir": Path("../logs"),
        "level": logging.DEBUG,
        # config: the caller's own settings module, not part of api_pipe
        "words_to_highlight": [config.logger_words_to_highlight]
    }
)
- headers: passed directly to the httpx client
- timeout: passed directly to the httpx client
- retries: parameters for the retry mechanism with exponential backoff
- logs: log configuration
  - log_dir: parent log dir; can be shared by multiple Api objects to keep logs tidy
  - unique_name:
    - Each Api object has its own logger. The logger uses unique_name to make itself unique, so the same logger isn't created multiple times and the same log message doesn't end up printed 20 times (once per created logger object).
    - Also used to organize logged output by directory: a directory named exactly as passed in unique_name will be created, and any logged files for this object will be placed in there.
  - level: log level
  - words_to_highlight: configures rich to highlight any word in this list
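For instance, with log_dir=Path("../logs") and unique_name="test_gitlab_read_var" (as in Example 1 below), the logged output for that object would land under a layout like this (illustrative):

../logs/
└── test_gitlab_read_var/
    └── ... logged files for this object, with every transformation step recorded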
Example 1
This example is a single API call to read all variables from a Gitlab API endpoint, select only certain fields of interest, and filter by variable key. Notice how unique_name is set to something meaningful, and how params are copied from common_params so they can be shared across multiple API objects.
from copy import deepcopy

import httpx

params = deepcopy(common_params)
params.logs["unique_name"] = "test_gitlab_read_var"

with httpx.Client() as client:
    api = ApiPipe(
        gitlab_url / "groups" / GROUP_ID / "variables",
        client,
        params
    )
    api \
        .fetch() \
        .to_python() \
        .select([
            "key",
            "value",
            "masked",
        ]) \
        .filter(
            lambda item: item["key"] == "Var2"
        ) \
        .to_json(indent=2)

print(api.data)
When run, all data transformation steps are logged under Path("../logs") for inspection.
Log level can be changed to reduce logging, say for prod:
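For example, reusing the logs config from common_params:

params = deepcopy(common_params)
params.logs["level"] = logging.INFO   # less verbose than DEBUG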
Sometimes calls fail with transient 500 errors or get rate limited, and depending on a number of things (say you're using threads to call the API) you may need to wait a while between attempts. This is where the built-in exponential retries come in handy.
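The retries parameters in common_params imply a geometrically growing wait between attempts. A minimal sketch of the schedule those numbers produce (the library's internal delay formula may differ slightly):

initial_delay = 0.5
backoff_factor = 3
max_retries = 7

# attempt 1 waits 0.5 s, attempt 2 waits 1.5 s, attempt 3 waits 4.5 s, ...
for attempt in range(max_retries):
    delay = initial_delay * backoff_factor ** attempt
    print(f"retry {attempt + 1}: wait {delay} s")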
Example 2
This example demonstrates paginated calls to the Gitlab API to extract all the CI/CD variables from a Gitlab group. To force multiple API calls, the page size is set to 1 (per_page=1). Because there are 2 CI/CD variables in DUMMY_TEST_GROUP_ID, this results in 3 API calls (2 that return data, 1 empty).
import json

params = deepcopy(common_params)

page_number = 1
results = []

# client: an httpx.Client, as in Example 1
while True:
    url = gitlab_url \
        / "groups" \
        / DUMMY_TEST_GROUP_ID \
        / "variables" \
        / f"?page={page_number}&per_page=1"

    params.logs["unique_name"] = f"test_run_fetch_all_{page_number}"

    api = ApiPipe(
        url,
        client,
        params
    )
    api \
        .fetch() \
        .to_python() \
        .select([
            "key",
            "value",
            "masked",
        ])

    if api.data:
        results += api.data
    else:
        break

    page_number += 1

print(
    "Number of calls made: ", page_number, " (last one was empty)"
)
print(
    json.dumps(results, indent=2)
)
Each call is logged for inspection; the logs stay tidy and organized even when mixing paginated calls with calls to multiple API endpoints.
Use case (sample pipeline)
- Collect all Gitlab users from 150 pages (150 API calls). This is similar to what is shown in Example 2.
- Keep only the fields id, username and last_activity_on:

.select([
    "id",
    "username",
    "last_activity_on"
])

- Based on last_activity_on, calculate the number of days inactive (calculate_days_inactive is user code; a sketch follows at the end of this section):
.map(
    lambda user: {
        **user,
        "days_inactive": calculate_days_inactive(user['last_activity_on'])
    }
)
- Keep only users that have been inactive longer than a threshold:
.filter(
    lambda user:
        user['days_inactive'] > threshold
)
- For each user, get all their group and project memberships, but only those where they have owner access; and don't keep all the fields, only source_id and access_level (read_memberships is user code wrapping further API calls):
.map(
    lambda user: {
        **user,
        "memberships": read_memberships(
            user['id'], ...
        ).select([
            "source_id",
            "access_level",
        ]).filter(
            lambda member: member['access_level'] >= OWNER
        )
    }
)
- Clean the data; remove users with no memberships:

.filter(
    lambda user: len(user['memberships']) > 0
)
- Remove duplicate memberships (a Gitlab bug... really):

.map(
    lambda user: {
        **user,
        "memberships": remove_duplicated(user['memberships'])
    }
)
Finally, verify the final user list is correct and downgrade those users from owner to developer.
This is, in fact, close to the scenario that led to the creation of this library: you want to "see" ALL the data, exactly as it was fetched and at every transformation step, for debugging and verification. That, and automatic exponential retries on random 500 errors and not-so-random rate limit errors.
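The helpers referenced above are user code, not part of api-pipe. A minimal sketch of two of them, assuming Gitlab returns last_activity_on as a "YYYY-MM-DD" string and memberships as dicts:

from datetime import date, datetime

def calculate_days_inactive(last_activity_on: str) -> int:
    """Days elapsed since the user's last activity (hypothetical helper)."""
    last = datetime.strptime(last_activity_on, "%Y-%m-%d").date()
    return (date.today() - last).days

def remove_duplicated(memberships: list[dict]) -> list[dict]:
    """Drop duplicate membership entries (hypothetical helper)."""
    seen = set()
    unique = []
    for membership in memberships:
        key = (membership["source_id"], membership["access_level"])
        if key not in seen:
            seen.add(key)
            unique.append(membership)
    return unique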