
Magical tools to interact with web APIs from a data scientist's perspective.


Apicadabri

Apicadabri is a magical set of tools to interact with web APIs from a data scientist's perspective to "just get the damn data"™.

Whether you're using raw HTTP calls through requests or aiohttp or you already have a Python wrapper for your chosen API, apicadabri can probably make your life easier and get you the data faster. If you know how to send a single call to the API you're interested in, you should be able to scale up to 100k calls with just a few lines of apicadabri code.

Current status

This is still in beta phase. Most of the API is stable, but breaking changes might still happen in minor version updates. Most (but not all) of the main features are implemented (see below).

Features

  • 🚀 Get the maximum amount of speed while still playing nice with the API provider.
    • ⚙️ Configurable number of calls active at the same time (using a Semaphore).
    • 🔀 Async execution, so everything stays within one Python process.
  • 🐤 You don't have to write async or care about task scheduling anywhere.
  • 🪜 Process results right as they come in.
  • 🐛 Comprehensive error handling and retry mechanisms.
  • 📊 Directly get a dataframe from just a single chain of method calls.*
  • 🔧 More than just HTTP: Use the above-mentioned features for arbitrary (async) tasks.

*: Not yet fully implemented.
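The concurrency cap from the feature list can be pictured with a plain asyncio sketch. This is illustrative, not apicadabri's internals; the `limited_gather` helper and `fake_call` are made up for the example:

```python
import asyncio

async def limited_gather(factories, max_active: int):
    """Run coroutine factories with at most max_active in flight at once."""
    semaphore = asyncio.Semaphore(max_active)

    async def run_one(factory):
        async with semaphore:  # blocks while max_active calls are in flight
            return await factory()

    return await asyncio.gather(*(run_one(f) for f in factories))

async def fake_call(i: int) -> int:
    await asyncio.sleep(0)  # stand-in for network latency
    return i * 2

results = asyncio.run(
    limited_gather([lambda i=i: fake_call(i) for i in range(5)], max_active=2)
)
# results == [0, 2, 4, 6, 8]
```

Because everything runs on one event loop, no extra processes or threads are needed to keep many calls in flight.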

Assumptions

Apicadabri makes the following assumptions about your task:

  • 💾 All inputs fit into memory.
  • 💾 Outputs may be larger than available runtime memory.
    • ➡️ It must be possible to pipe outputs directly to a file.
  • ♾️ Wrapping all inputs into asyncio tasks at the same time will not overwhelm the asyncio event loop.
    • Overwhelming asyncio is apparently hard to achieve anyway unless you have tens of millions of calls.
  • 👀 Live access to results is important. Fire-and-forget is not good enough.
  • 🔢 The order of results must be preserved.
  • 🎱 The total number of results must be the same as the number of inputs.
    • If filtering is to happen, it happens after the apicadabri call has finished.
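The order-preservation and one-result-per-input guarantees can be implemented by tagging each result with its input index and sorting at the end, even when results are consumed in completion order. A minimal sketch of that idea (not apicadabri's actual code):

```python
import asyncio
import random

async def call(index: int, arg: str) -> tuple[int, str]:
    # simulate calls finishing in arbitrary order
    await asyncio.sleep(random.random() / 100)
    return (index, arg.upper())

async def bulk(args: list[str]) -> list[str]:
    tasks = [asyncio.create_task(call(i, a)) for i, a in enumerate(args)]
    # consume results in completion order, as a live pipeline would
    tagged = [await finished for finished in asyncio.as_completed(tasks)]
    # sorting by input index restores the original order
    return [result for _, result in sorted(tagged)]

results = asyncio.run(bulk(["a", "b", "c"]))
# results == ["A", "B", "C"] regardless of completion order
```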

Future relaxing of constraints

The following changes to the above constraints could happen in the future if it turns out that there are enough use cases:

  • For extreme numbers of calls (>> 1M), one could add another layer of batching to avoid creating all asyncio tasks at once, while also preventing one slow call in a batch from slowing down the whole task.
  • Through the same mechanism, it would be possible to allow loading inputs one batch at a time.

Examples

Multiple URLs

import apicadabri
pokemon = ["bulbasaur", "squirtle", "charmander"]
data = apicadabri.bulk_get(
    urls=(f"https://pokeapi.co/api/v2/pokemon/{p}" for p in pokemon),
).json().to_list()

Multiple parameters

import apicadabri
test_inputs = [{"foo": "1" , "bar": "2"}] * 3
data = apicadabri.bulk_get(
  url = "https://httpbin.org/get",
  param_sets = test_inputs
).json().map(lambda r: r["args"]).to_list()

Multiple payloads

import apicadabri
test_inputs = [{"foo": 1 , "bar": 2}] * 3
data = apicadabri.bulk_post(
  url = "https://httpbin.org/post",
  json_sets = test_inputs
).json().map(lambda r: r["data"]).to_list()

Functional API using lazy evaluation

Apicadabri's API (say that fast 10 times) is built with a functional architecture using map and reduce methods. From the first call to one of the top-level functions to the last step that just gives you the list or DataFrame or whatever you want as output, everything is just a stream of function applications. Consider this example:

import apicadabri
pokemon = ["bulbasaur", "squirtle", "charmander"]
data = (
  apicadabri.bulk_get(
    urls=[f"https://pokeapi.co/api/v2/pokemon/{p}" for p in pokemon],
  )
  .json()
  .map(lambda p: [t["type"]["name"] for t in p["types"]])
  .tqdm(desc="Downloading")
  .tee(lambda res, i, n: print(f"Type of pokemon {i}/{n} is {', '.join(res)}."))
  .to_list()
)

All the function and method calls up to to_list() actually just build a pipeline. bulk_get() creates a response object that will call the specified API endpoint, but doesn't execute this call yet. Instead, you first have to define what to do with the data in subsequent method calls. Each map-like method (json, map, tqdm, and tee) just wraps the response in another object that contains the code needed to apply that method's operation to the individual results.

Finally, when to_list() is called, which is a special case of a reduce method, the pipeline is actually activated: All API calls in bulk_get() are executed asynchronously, and as each one returns, its result is passed through the entire pipeline, allowing you to inspect it via tee() and see the progress with tqdm(). Once the last result has passed through the whole pipeline, to_list() terminates and returns a list of all results.

ℹ️ It is important to note that while bulk_get() is asynchronous, the pipeline of map and reduce calls that follows afterward is executed synchronously again. This is done for convenience, since apicadabri is built for tasks which have network latency as the main bottleneck. If computationally expensive postprocessing is required, it is better to just store intermediate results in a DataFrame or similar structure and then process them from there.
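The laziness described above can be mimicked with plain Python generators. This sketch is not apicadabri's implementation, but it shows the same principle: each step only wraps the previous one, and nothing runs until a terminal "reduce" step pulls results through:

```python
def source():
    """Stand-in for bulk_get(): yields raw results lazily."""
    yield from [1, 2, 3]

def mapped(items, fn):
    """Stand-in for map(): applies fn only when a consumer asks for the next item."""
    for item in items:
        yield fn(item)

def tee(items, side_effect):
    """Stand-in for tee(): injects a side effect, passes items through unchanged."""
    for item in items:
        side_effect(item)
        yield item

seen = []
pipeline = tee(mapped(source(), lambda x: x * 10), seen.append)
# nothing has run yet -- the side effect hasn't fired once
assert seen == []
results = list(pipeline)  # the "reduce" step drives the whole chain
# results == [10, 20, 30] and seen == [10, 20, 30]
```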

Multivariate calls

If you need to supply multiple values for more than one parameter of the bulk HTTP call (e.g. supplying both urls and param_sets), apicadabri supports two separate behaviors chosen via the mode parameter.

  • zip combines the first value of the first parameter with the first value of the second parameter for the first call, and so on (using Python's zip function).

    import apicadabri
    data = apicadabri.bulk_post(
      url = "https://httpbin.org/post",
      param_sets = [{"foo": "1" , "bar": "2"}] * 2,
      json_sets = [ {"foobar": "bar"} ] * 2,
    ).json().map(lambda r: r["args"]).to_list()
    
  • product builds the Cartesian product of all iterable inputs, effectively using all possible combinations of them for the individual calls.

    import apicadabri
    data = apicadabri.bulk_post(
      url = "https://httpbin.org/post",
      param_sets = [{"foo": "1" , "bar": "2"}] * 2,
      json_sets = [ {"foobar": "bar"} ] * 2,
      mode = "product",
    ).json().map(lambda r: r["args"]).to_list()
    
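The two modes correspond directly to Python's built-in zip and itertools.product. With two inputs of length 2, zip pairs them up for 2 calls, while product makes 4:

```python
from itertools import product

param_sets = [{"foo": "1"}, {"foo": "2"}]
json_sets = [{"a": 1}, {"b": 2}]

# zip mode: first with first, second with second -> 2 calls
zip_combos = list(zip(param_sets, json_sets))

# product mode: every combination -> 4 calls
product_combos = list(product(param_sets, json_sets))

assert len(zip_combos) == 2
assert len(product_combos) == 4
```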

Error Handling

API calls can always fail and you don't want your script with 100k API calls to crash on call number 10k because you forgot to handle a None somewhere. At the same time, though, you might not even care about errors and just want to set up a test scenario quick and dirty. Apicadabri adapts to both scenarios by providing three options for error handling, controlled by the on_error parameter:

  • raise: The exception is not caught at all; it is raised as normal and the bulk call will fail.

  • return: The exception is caught and encapsulated in an ApicadabriErrorResponse object that also contains the input that triggered the exception.

  • A lambda function: The exception is caught and the provided error handling function is called with the triggering input and the error message and type. The error handling function must return a result of the same type as would be expected by a successful call. This can, for example, be used to return an "empty" result that does not lead to exceptions in further processing.

    ℹ️ If you need to return a different type of object in case of an error, you can instead use map with on_error="return" and then do another map that transforms the error response into the type you want.

The on_error parameter is available for multiple central methods of return objects, most notably map and reduce.
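The three behaviors can be sketched as a plain wrapper function. This is a conceptual sketch, not apicadabri's code: `ErrorResponse` stands in for ApicadabriErrorResponse, and its field names as well as the handler signature are assumptions for illustration:

```python
from dataclasses import dataclass

@dataclass
class ErrorResponse:
    """Stand-in for ApicadabriErrorResponse; field names are assumptions."""
    input_value: object
    error_type: str
    message: str

def guarded(fn, value, on_error):
    """Apply fn to value, handling exceptions according to on_error."""
    try:
        return fn(value)
    except Exception as e:
        if on_error == "raise":
            raise  # let the bulk call fail
        if on_error == "return":
            return ErrorResponse(value, type(e).__name__, str(e))
        # otherwise on_error is a handler that must produce a normal-typed result
        return on_error(value, type(e).__name__, str(e))

parse = lambda s: int(s)
assert guarded(parse, "42", "return") == 42
# a lambda handler can return an "empty" result instead of failing
assert guarded(parse, "oops", lambda v, t, m: -1) == -1
# "return" wraps the failure together with the offending input
err = guarded(parse, "oops", "return")
assert err.error_type == "ValueError"
```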

Controlling retry behavior

By default, all bulk_* calls retry an API call up to ten times on any exception that is thrown. If you want to modify this behavior, you can supply an AsyncRetrier object in the retrier parameter like this:

import apicadabri
pokemon = ["bulbasaur", "squirtle", "charmander"]
apicadabri.bulk_get(
  urls=(f"https://pokeapi.co/api/v2/pokemon/{p}" for p in pokemon),
  retrier=apicadabri.AsyncRetrier(
    max_retries = 3,
    initial_sleep_s = 0.01,
    sleep_multiplier = 2,
    max_sleep_s = 60 * 15,
  ),
).json().to_list()

Apart from the maximum number of retries, you can also configure the exponential backoff parameters. The first retry will wait initial_sleep_s seconds before attempting the call again, and after that the sleep time is multiplied by sleep_multiplier for every subsequent retry until either the call succeeds or the maximum number of retries is reached. If during this time the maximum sleep time is reached, the multiplier will no longer be applied.
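Under that policy, the sleep schedule is easy to compute by hand. The helper below is a direct sketch of the described behavior, not apicadabri's source:

```python
def backoff_sleeps(max_retries, initial_sleep_s, sleep_multiplier, max_sleep_s):
    """Sleep time before each retry: exponential growth capped at max_sleep_s."""
    sleeps, current = [], initial_sleep_s
    for _ in range(max_retries):
        sleeps.append(min(current, max_sleep_s))
        current *= sleep_multiplier
    return sleeps

# with the settings from the example above, the cap is never hit
assert backoff_sleeps(3, 0.01, 2, 60 * 15) == [0.01, 0.02, 0.04]
# once doubling would exceed max_sleep_s, the sleep time plateaus
assert backoff_sleeps(5, 300, 2, 900) == [300, 600, 900, 900, 900]
```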

Tracking task progress

There are two ways of tracking the progress of an apicadabri bulk call:

  1. Using the tqdm() method of a response object, which will just print a progress bar:

    import apicadabri
    data = apicadabri.bulk_get(
      urls=[f"https://pokeapi.co/api/v2/pokemon/{id}" for id in range(1, 5)],
    ).json().tqdm().to_list()
    
  2. Using the tee() method to inject a lambda function that introduces a side effect and receives the result, the number of items processed so far, and the total number of items as arguments.

    import apicadabri
    inspect_func = lambda res, i, n: print("Halfway done!") if i == int(n / 2) else None
    data = apicadabri.bulk_get(
      urls=[f"https://pokeapi.co/api/v2/pokemon/{id}" for id in range(1, 5)],
    ).json().tee(inspect_func).to_list()
    

ℹ️ Note that the total number of calls is only known if all user-supplied iterables implement __len__ or if a size hint was explicitly given with the size argument.
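This is just Python's len protocol at work: lists report their size up front, while generators do not, which is why a generator input needs an explicit size hint for the total to be known:

```python
urls = [f"https://pokeapi.co/api/v2/pokemon/{i}" for i in range(1, 5)]
assert len(urls) == 4  # a list implements __len__, so the total is known

gen = (f"https://pokeapi.co/api/v2/pokemon/{i}" for i in range(1, 5))
total_known = True
try:
    len(gen)
except TypeError:
    total_known = False  # generators have no __len__; pass size= explicitly
assert total_known is False
```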

Using apicadabri for arbitrary async tasks

The examples presented so far all use HTTP calls through aiohttp. However, it is entirely possible to use the functionality provided by apicadabri for any arbitrary task involving async.

For that, you have to create a subclass of ApicadabriBulkResponse[A, R] where A is the type of the individual arguments sent to the task and R is the type of the results.

The methods you have to override are call_api, which implements an individual instance of the async call you want to make, and instances, which returns an iterable that supplies the arguments to the individual calls.

The following code shows a trivial example of a task that just returns the length of a string:

from collections.abc import Iterable

from aiohttp import ClientSession

from apicadabri import ApicadabriBulkResponse


class ExampleTask(ApicadabriBulkResponse[str, int]):
    """Test task as example for using Apicadabri without aiohttp."""

    def __init__(self, data: list[str], max_active_calls: int = 10) -> None:
        """Initialize test task."""
        super().__init__(
          max_active_calls=max_active_calls,
          size=len(data)
        )
        self.data = data

    async def call_api(
        self,
        client: ClientSession,
        index: int,
        instance_args: str,
    ) -> tuple[int, int]:
        """Non-aiohttp API call."""
        return (index, len(instance_args))

    def instances(self) -> Iterable[str]:
        """Get instances."""
        return self.data
