API Pipe

API Pipe is a simple Python library for fetching data from a URL endpoint (GET) and applying transformations from a set of predefined steps (filter, map, select, etc.) to the fetched data. In other words, it helps you build a data pipeline from data fetched from an API endpoint.

It logs the full data at every step for inspection. Though simple, it can be handy, as it:

  1. Simplifies applying transformations to data.
  2. Simplifies development and debugging, since lack of visibility into API data and transformations often makes the task cumbersome.
  3. Simplifies working with multiple API endpoints, keeping logs tidy and organized under a logs directory.
  4. Provides built-in exponential retries.
  5. Takes in httpx clients (both sync and async) to make GET API requests (an async sketch is shown after Example 1).
  6. Uses rich to pretty-print logs.

Installation

pip3 install api-pipe

Steps

Available data transformation steps:

  • fetch (GET)
  • fetch_async (GET Async)
  • filter
  • map
  • select
  • key
  • to_python
  • to_json

Examples

First define a URL and common parameters for all the API calls that will be made:

gitlab_url = Url("https://gitlab.com/api/v4")

common_params = ApiParams(
    headers={
        "PRIVATE-TOKEN": os.environ["TOKEN"]
    },
    timeout=(5.0, 5.0),
    retries={
        "initial_delay": 0.5,
        "backoff_factor": 3,
        "max_retries": 7,
    },
    logs={
        "unique_name": "__TO_BE_REPLACED__",
        "log_dir": Path("../logs"),
        "level": logging.DEBUG,
        "words_to_highlight": [config.logger_words_to_highlight]
    }
)
  • headers: passed directly to the httpx client
  • timeout: passed directly to the httpx client
  • retries: parameters for the retry mechanism with exponential backoff (see the sketch after this list)
  • logs: log configuration
    • log_dir: parent log directory; it can be shared by multiple Api objects to keep logs tidy
    • unique_name:
      • Each Api object has its own logger, and the logger uses unique_name to make itself unique, so the same logger isn't created multiple times and the same log message doesn't end up printed 20 times (once per created logger object).
      • Also used to organize logged output by directory: a directory named exactly as passed in unique_name is created, and any files logged for this object are placed there.
    • level: log level
    • words_to_highlight: configures rich to highlight any word in this list
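
As a rough sketch of what the retry parameters mean (this assumes the typical exponential-backoff formula; it is not necessarily the library's exact implementation), the delay before each retry grows by backoff_factor:

# Assumption: delay before retry n is roughly initial_delay * backoff_factor ** n
initial_delay, backoff_factor, max_retries = 0.5, 3, 7

delays = [
    initial_delay * backoff_factor ** attempt
    for attempt in range(max_retries)
]
print(delays)   # [0.5, 1.5, 4.5, 13.5, 40.5, 121.5, 364.5] seconds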

Example 1

This example is a single API call that reads all variables from a Gitlab API endpoint, selects only certain fields of interest, and filters by variable key. Notice how unique_name is set to something meaningful and how params are copied from common_params so they can be shared across multiple API objects.

params = deepcopy(common_params)
params.logs["unique_name"] = "test_gitlab_read_var"

with httpx.Client() as client:
    api = ApiPipe(
        gitlab_url / "groups" / GROUP_ID / "variables",
        client,
        params
    )

    api                         \
        .fetch()                \
        .to_python()            \
        .select([
            "key",
            "value",
            "masked",
        ])                      \
        .filter(
            lambda item: item["key"] == "Var2"
        )                       \
        .to_json(indent=2)

    print(api.data)

When run, all data transformation steps are logged to Path("../logs"):

Demo

Log level can be changed to reduce logging, say for prod:

Demo

Sometimes calls fail with transient 500 errors or get rate-limited, and depending on a number of things (say you're using threads to call the API) there may be a need to wait for a while. This is where the built-in exponential retries come in handy:

Demo
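
The same pipeline can also be driven asynchronously. The step list includes fetch_async and the library takes httpx async clients; the sketch below is hypothetical and assumes fetch_async mirrors fetch() and is awaitable (the exact signature is not documented here):

import asyncio
import httpx

async def main():
    params = deepcopy(common_params)
    params.logs["unique_name"] = "test_gitlab_read_var_async"

    async with httpx.AsyncClient() as client:
        api = ApiPipe(
            gitlab_url / "groups" / GROUP_ID / "variables",
            client,
            params
        )

        # Assumption: fetch_async is a coroutine; the remaining steps are sync
        await api.fetch_async()

        api.to_python()             \
            .select([
                "key",
                "value",
            ])                      \
            .to_json(indent=2)

        print(api.data)

asyncio.run(main())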

Example 2

This example demonstrates paginated calls to the Gitlab API to extract all the CI/CD variables from a Gitlab group. To force multiple API calls, the page size is set to 1 (per_page=1). Because there are 2 CI/CD variables in DUMMY_TEST_GROUP_ID, this results in 3 API calls (2 that return data, 1 empty).

params = deepcopy(common_params)
page_number = 1
results = []

# client is an httpx.Client(), created as in Example 1
while True:

    url = gitlab_url                    \
        / "groups"                      \
        / DUMMY_TEST_GROUP_ID           \
        / "variables"                   \
        / f"?page={page_number}&per_page=1"

    params.logs["unique_name"] = f"test_run_fetch_all_{page_number}"

    api = ApiPipe(
        url,
        client,
        params
    )
    api                         \
        .fetch()                \
        .to_python()            \
        .select([
            "key",
            "value",
            "masked",
        ])

    if api.data:
        results += api.data
    else:
        break

    page_number += 1

print(
    "Number of calls made: ", page_number, " (last one was empty)"
)

print(
    json.dumps(results, indent=2)
)

Each call is logged for inspection:

Demo

Mixing paginated calls with calls to multiple API endpoints works the same way (a sketch follows the demo below):

Demo
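
As a rough sketch of how that might look (PROJECT_ID and the project-variables endpoint are illustrative; the pattern is simply another ApiPipe object reusing common_params with its own unique_name, next to the paginated loop from Example 2):

with httpx.Client() as client:

    # Pipeline 1: paginated group variables, exactly as in Example 2
    # (loop over page_number, one ApiPipe per page) ...

    # Pipeline 2: a one-off call to a different endpoint,
    # sharing the same log_dir but with its own unique_name
    params = deepcopy(common_params)
    params.logs["unique_name"] = "test_project_vars"

    api = ApiPipe(
        gitlab_url / "projects" / PROJECT_ID / "variables",
        client,
        params
    )

    api                         \
        .fetch()                \
        .to_python()            \
        .select([
            "key",
            "value",
        ])

    print(api.data)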

Use case (sample pipeline)

  1. Collect all Gitlab users from 150 pages (150 API calls).
    • This is similar to what is shown in Example 2
  2. Keep only the fields id, username, last_activity_on:
.select([
    "id",
    "username",
    "last_activity_on"
])
  3. Based on last_activity_on, calculate the number of days inactive (a sketch of the hypothetical calculate_days_inactive helper appears after this list):
.map(
    lambda user: {
        **user,
        "days_inactive" : calculate_days_inactive(user['last_activity_on'])
    }
)
  4. Keep only users that have been inactive longer than a threshold:
.filter(
    lambda user:
        user['days_inactive'] > threshold
)
  5. For each user, get all their group and project memberships, but only those where they have owner access, and keep only the fields source_id and access_level (a sketch of the hypothetical read_memberships helper also appears after this list):
.map(
    lambda user: {
        **user,
        "memberships" : read_memberships(
            user['id'], ...
        ).select([
            "source_id",
            "access_level",
        ]).filter(
            lambda member: member['access_level'] >= OWNER
        )
    }
)
  6. Clean the data: remove users with no memberships:
.filter(
    lambda user: len(user['memberships']) > 0
)
  7. Remove duplicate memberships (a Gitlab bug... really):
.map(
    lambda user: {
        **user,
        "memberships" : remove_duplicated(user['memberships'])
    }
)
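
For completeness, here is a hypothetical sketch of the two helpers assumed above; neither calculate_days_inactive nor read_memberships is part of api-pipe, and the memberships endpoint shown is just one way to fetch this data from Gitlab:

from datetime import date

def calculate_days_inactive(last_activity_on: str) -> int:
    # Hypothetical helper: days since the user's last activity ("YYYY-MM-DD")
    return (date.today() - date.fromisoformat(last_activity_on)).days

def read_memberships(user_id, client, params):
    # Hypothetical helper: one ApiPipe per user, so each user's membership
    # data is logged under its own unique_name
    p = deepcopy(params)
    p.logs["unique_name"] = f"memberships_{user_id}"

    api = ApiPipe(
        gitlab_url / "users" / str(user_id) / "memberships",
        client,
        p
    )
    return api.fetch().to_python()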

Finally, verify the final user list is correct and downgrade those users from owners to developers.

In fact, this is similar to the scenario that led to the creation of this library: you want to "see" ALL the data exactly as it was fetched, and at every transformation step, for debugging and verification. That, and automatic exponential retries on random 500 errors and not-so-random rate-limit errors.
