Simple, lightweight HTTP client that performs GET or POST requests in parallel with multiprocessing and, within each process, uses aiohttp to run them concurrently and maximize throughput.
Patata
Easy parallel and concurrent GET and POST requests
The idea of this package is to wrap multiprocessing and async concurrency so that the user can perform thousands of requests in parallel and concurrently without having to worry about process pools and async event loops.
It only supports GET and POST requests. More methods will be implemented in later versions.
The input is an iterable and the output is a generator. As soon as requests are answered, their responses are yielded, until all the requests in the input are done.
Each element of the iterable is a tuple with id, url and data.
Each element yielded is a tuple with the same id and the JSON response. With the id you can later map each response back to its request.
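The id-based mapping described above can be sketched as follows. This is only an illustration: `fake_responses` is a hypothetical stand-in for the generator returned by the client (it is not part of patata), and it shuffles the input to imitate responses arriving out of order.

```python
import random


def fake_responses(requests):
    """Hypothetical stand-in for the client: yields (id, json) in random order."""
    shuffled = list(requests)
    random.shuffle(shuffled)
    for request_id, url, _data in shuffled:
        # Echo the URL so each response can be checked against its request.
        yield request_id, {"echo": url}


requests = [
    (1, "http://localhost:12345/a", {}),
    (2, "http://localhost:12345/b", {}),
]

# Keep the original requests addressable by id.
urls_by_id = {request_id: url for request_id, url, _data in requests}

# The yielded (id, json) tuples can be collected into a dict directly.
responses_by_id = dict(fake_responses(requests))

for request_id, url in urls_by_id.items():
    print(url, responses_by_id[request_id])
```

Because the order of the yielded tuples is not guaranteed to match the input order, looking responses up by id is safer than relying on position.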
This is useful when you have a huge number of requests to send to an API and you need to do them as fast as possible.
By default, the client detects the number of CPUs available and starts one process per CPU. It then chunks the input iterator to feed requests to all the processes. For each task, a process opens an event loop and performs all the requests of that chunk concurrently. Once all requests have been awaited, the chunk with all the responses is returned to the main process. This is why the generator receives the responses in bulk.
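The flow described above (chunk the input, hand each chunk to a worker process, run the chunk in an event loop, return the responses in bulk) can be sketched roughly like this. It is a minimal illustration and not patata's actual implementation: `ProcessPoolExecutor` is an assumed choice of pool, `fake_request` is a hypothetical stand-in for an aiohttp call, and every function name here is invented for the example.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor, as_completed
from itertools import islice


def chunked(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    iterator = iter(iterable)
    while chunk := list(islice(iterator, size)):
        yield chunk


async def fake_request(request_id, url):
    """Hypothetical stand-in for an aiohttp call: waits briefly, echoes the URL."""
    await asyncio.sleep(0.01)
    return request_id, {"url": url}


async def gather_chunk(chunk):
    """Await every request of a chunk concurrently in one event loop."""
    return await asyncio.gather(*(fake_request(i, url) for i, url in chunk))


def run_chunk(chunk):
    """Worker entry point: open an event loop for this chunk only."""
    return asyncio.run(gather_chunk(chunk))


def process_all(requests, num_workers=2, chunk_size=3):
    """Yield responses chunk by chunk, as each worker finishes."""
    with ProcessPoolExecutor(max_workers=num_workers) as pool:
        futures = [pool.submit(run_chunk, c) for c in chunked(requests, chunk_size)]
        for future in as_completed(futures):
            # A whole chunk of responses arrives at once, hence the bulks.
            yield from future.result()


if __name__ == "__main__":
    reqs = [(i, f"http://localhost:12345/{i}") for i in range(10)]
    for response in process_all(reqs):
        print(response)
```

Within a chunk the responses keep the order of the requests, but chunks complete in whatever order the workers finish, so the overall output order is not guaranteed.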
Install:
From PyPI:
pip install patata
Usage:
Always use the context manager. For example, for GET:
>>> from patata import Patata
>>> with Patata() as client:
...     responses = client.http("get", [(1, "http://localhost:12345", {}), (2, "http://localhost:12345", {})])
...     for response in responses:
...         print(response)
...
patata INFO Start processing requests with Patata parameters:
patata INFO method: GET
patata INFO num_workers: 8
patata INFO queue_max_size: 100000
patata INFO input_chunk_size: 10000
patata INFO pool_submit_size: 1000
(2, {'message': 'Hello world!'})
(1, {'message': 'Hello world!'})
patata INFO All requests processed:
patata INFO Total requests: 2
patata INFO Total time (s): 0.10
patata INFO Requests/s: 20.35
You can provide a generator as input to avoid exhausting memory:
>>> from patata import Patata
>>> from collections import deque
>>>
>>> def mygen():
...     for i in range(100_000):
...         yield (i, "http://localhost:12345", {"key": "value"})
...
>>> with Patata() as client:
...     responses = client.http("post", mygen())
...     _ = deque(responses)
...
patata INFO Start processing requests with Patata parameters:
patata INFO method: POST
patata INFO num_workers: 8
patata INFO queue_max_size: 100000
patata INFO input_chunk_size: 10000
patata INFO pool_submit_size: 1000
patata INFO All requests processed:
patata INFO Total requests: 100000
patata INFO Total time (s): 43.78
patata INFO Requests/s: 2283.90
Parameters
You can configure some parameters, such as the number of workers or how the client chunks the input:
patata.Patata parameters:
- num_workers:
  - type: int
  - required: False
  - default: os.cpu_count()
  - description: Number of processes to open with multiprocessing.
- queue_max_size:
  - type: int
  - required: False
  - default: 100000
  - description: Maximum number of items that can be enqueued. This default proved not to exhaust memory while keeping enough items in the queue to always have work for 8 processes. Feel free to adjust it, but watch the memory usage.
- input_chunk_size:
  - type: int
  - required: False
  - default: 10000
  - description: Size of the chunks read from the input. The input iterator is read in chunks of this size, up to queue_max_size.
- pool_submit_size:
  - type: int
  - required: False
  - default: 1000
  - description: Each chunk of input_chunk_size is further split into smaller chunks of this size before being submitted to the pool. The workers consume chunks of this size, and each of these chunks is requested in an event loop.
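To make the relationship between input_chunk_size and pool_submit_size concrete, the two-level chunking described above can be sketched as below. This is an assumption-based illustration of the arithmetic only: `chunked` and `count_pool_tasks` are hypothetical helpers, not part of patata, and the real client may split the input differently in its details.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    iterator = iter(iterable)
    while chunk := list(islice(iterator, size)):
        yield chunk


def count_pool_tasks(total_requests, input_chunk_size=10_000, pool_submit_size=1_000):
    """Count how many chunks would be submitted to the pool for a given input."""
    tasks = 0
    # First level: read the input in chunks of input_chunk_size.
    for input_chunk in chunked(range(total_requests), input_chunk_size):
        # Second level: split each input chunk into pool_submit_size pieces.
        tasks += sum(1 for _ in chunked(input_chunk, pool_submit_size))
    return tasks


print(count_pool_tasks(100_000))  # 100
print(count_pool_tasks(10_500))   # 11
print(count_pool_tasks(2))        # 1
```

With the defaults, 100,000 requests become 10 input chunks of 10 pool submissions each, so a smaller pool_submit_size gives more, smaller tasks to spread across the workers.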
patata.Patata.http parameters:
- method:
  - type: str
  - required: True
  - description: Method of the requests. Valid values: GET, POST.
- requests:
  - type: Iterable[Tuple[int, str]]
  - required: True
  - description: Tuples containing the ID of the request and the URL to be requested. The usage examples above also pass a third element with the request data.
Response: Generator[Tuple[int, str], None, None]. For each input tuple, an output tuple is returned containing the same ID plus the JSON of the response.
TODO:
- use pydantic models
- allow specifying custom callbacks after resolving the request, to post-process each response taking benefit of multiprocessing
- flag to disable multiprocessing
- add a flag to specify how many requests can fail; this will also require specifying which status codes are "ok" and which are "not ok", to decide when to increment this count and when to stop
- include the missing methods like PUT, DELETE, etc