Huntflow API Client for Python
huntflow-api-client
Async Python client for the Huntflow API. It wraps httpx, adds Bearer authentication, optional automatic token refresh, and typed helpers for major resources.
Installation
pip install huntflow-api-client
Requires Python 3.8.1+. Main dependencies: httpx, pydantic v2, email-validator.
Integration overview
- Obtain an access token (and optionally a refresh token) using the flows described in the Huntflow API documentation (OAuth, service account, or your Huntflow product settings — whichever applies to your integration).
- Create a HuntflowAPI instance with your API base URL and either a static ApiToken (token=) or a token_proxy=. If both are supplied, token_proxy wins and token is ignored.
- Call await api.request(...) for any endpoint, or use entity classes (e.g. Applicant, Vacancy) for typed request/response models.
The client appends /v2 to base_url for all requests. Paths you pass to request() are relative to that versioned root (for example GET "/accounts", not "/v2/accounts").
Base URL
The constructor default is https://api.huntflow.dev (development). For production, pass your real API host, for example:
HuntflowAPI("https://api.huntflow.ai", token=token)
Use the base URL Huntflow provides for your environment (no trailing /v2).
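To make the path rule concrete, here is a minimal sketch of how the base URL, the /v2 prefix, and a relative path fit together, going only by the description above. versioned_url is a hypothetical helper written for illustration; it is not part of the client and does not claim to mirror its internals.

```python
API_VERSION = "/v2"  # prefix the client appends, per the text above


def versioned_url(base_url: str, path: str) -> str:
    """Hypothetical helper: join base URL, version prefix, and relative path."""
    base = base_url.rstrip("/")
    if base.endswith(API_VERSION):
        # The base URL should be the bare host, without a trailing /v2.
        raise ValueError("base_url must not include the /v2 suffix")
    if path.startswith(API_VERSION + "/"):
        # Paths are relative to the versioned root, e.g. "/accounts".
        raise ValueError("path must be relative to the versioned root")
    return f"{base}{API_VERSION}/{path.lstrip('/')}"


print(versioned_url("https://api.huntflow.ai", "/accounts"))
# prints https://api.huntflow.ai/v2/accounts
```

A check like this can catch a doubled /v2/v2 segment in configuration before any request is sent.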
Quick start (access token only)
Minimal setup: pass ApiToken with an access_token. Fine for short scripts. For persisted refresh across restarts or processes, use HuntflowTokenProxy and storage (see Token proxy, storage, and locks). You can still set auto_refresh_tokens=True with token= — refresh then updates the in-memory token only (see that section for details).
import asyncio

from huntflow_api_client import HuntflowAPI
from huntflow_api_client.tokens.token import ApiToken


async def main() -> None:
    token = ApiToken(access_token="YOUR_ACCESS_TOKEN")
    api = HuntflowAPI("https://api.huntflow.ai", token=token)
    response = await api.request("GET", "/accounts")
    accounts = response.json()
    print(accounts)


asyncio.run(main())
Using resource entities
Entity classes take a HuntflowAPI instance and return Pydantic models parsed from JSON.
import asyncio

from huntflow_api_client import HuntflowAPI
from huntflow_api_client.entities import Applicant
from huntflow_api_client.tokens.token import ApiToken


async def main() -> None:
    api = HuntflowAPI(
        "https://api.huntflow.ai",
        token=ApiToken(access_token="YOUR_ACCESS_TOKEN"),
    )
    applicants = Applicant(api)
    page = await applicants.list(account_id=1, count=10, page=1)
    for item in page.items:
        print(item.id, item.first_name, item.last_name)


asyncio.run(main())
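The example above reads a single page. To walk every page, one possible approach is a small generic helper like the sketch below. iter_items, FakePage, and fake_fetch are hypothetical names used only for illustration. The stop rule (a page shorter than count means the end) is an assumption; the real response model may expose totals that give a cleaner stop condition.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable


async def iter_items(
    fetch_page: Callable[[int], Awaitable[Any]],
    count: int,
) -> AsyncIterator[Any]:
    """Yield items page by page until a short (or empty) page is returned."""
    page_number = 1
    while True:
        page = await fetch_page(page_number)
        for item in page.items:
            yield item
        if len(page.items) < count:
            break  # assumed end-of-data signal
        page_number += 1


class FakePage:
    """Stand-in for a list response; only the .items attribute is used."""

    def __init__(self, items: list) -> None:
        self.items = items


async def fake_fetch(page_number: int) -> FakePage:
    # Five fake records served two per page, in place of a real API call.
    data = list(range(5))
    start = (page_number - 1) * 2
    return FakePage(data[start:start + 2])


async def collect() -> list:
    return [item async for item in iter_items(fake_fetch, count=2)]


print(asyncio.run(collect()))  # prints [0, 1, 2, 3, 4]
```

With the client, fetch_page could be something like lambda p: applicants.list(account_id=1, count=10, page=p). When the total is an exact multiple of count, this sketch makes one extra request that returns an empty page.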
Other entities live under huntflow_api_client.entities (vacancies, webhooks, dictionaries, etc.). Each method docstring links to the matching OpenAPI operation where applicable.
Token proxy, storage, and locks
HuntflowAPI authenticates every request using an AbstractTokenProxy. Most apps pass token= or token_proxy=HuntflowTokenProxy(...). Subclass AbstractTokenProxy only for uncommon setups (custom token sources, extra logging, and so on).
- If you pass token= (ApiToken), the client wraps it in DummyHuntflowTokenProxy. With auto_refresh_tokens=True, refreshed tokens stay in memory on that proxy only (nothing is persisted). You still need refresh_token set on ApiToken, otherwise refresh cannot run.
- For persisted refresh, pass token_proxy=, typically HuntflowTokenProxy, which loads and saves tokens through AbstractHuntflowTokenStorage.
Storage (AbstractHuntflowTokenStorage)
Implementations must:
- get(): return an ApiToken (at least access_token; include refresh_token if you use refresh).
- update(token): persist the token after a successful /token/refresh (and any fields you care about, e.g. expiration_timestamp).
The built-in HuntflowTokenFileStorage reads/writes a JSON file with the same keys as ApiToken (access_token, refresh_token, optional timestamps). The file is overwritten on refresh.
Locker (AbstractLocker)
When auto_refresh_tokens=True, several coroutines can hit token expiry at once. HuntflowTokenProxy can use a locker so only one refresh runs; others wait or retry.
- If locker=None (default), no synchronization is applied: concurrent refreshes are possible under load. Prefer a locker whenever one storage is shared by many concurrent requests.
- AsyncioLockLocker is sufficient for one process / one event loop (see examples/api_client_with_simple_locks.py).
- For multiple workers or hosts, use a distributed lock (Redis, etc.) implementing AbstractLocker, together with storage that all instances share.
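The following sketch illustrates why a locker matters within a single event loop. SketchLocker is a hypothetical stand-in that exposes the same three methods the Redis locker in this README uses (acquire, wait_for_lock, release); it does not subclass AbstractLocker so that it runs without the package installed. The worker flow (try to acquire, otherwise wait) is an assumption about how a proxy might use a locker, not a copy of HuntflowTokenProxy.

```python
import asyncio


class SketchLocker:
    """Stand-in locker built on asyncio.Lock (single process, single loop)."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()

    async def acquire(self) -> bool:
        # Report failure instead of waiting if someone already holds the lock.
        if self._lock.locked():
            return False
        await self._lock.acquire()
        return True

    async def wait_for_lock(self) -> None:
        # Block until the current holder releases, then let go immediately.
        async with self._lock:
            pass

    async def release(self) -> None:
        if self._lock.locked():
            self._lock.release()


async def worker(locker: SketchLocker, state: dict) -> None:
    if await locker.acquire():
        try:
            await asyncio.sleep(0.01)  # pretend to call the refresh endpoint
            state["refreshes"] += 1
        finally:
            await locker.release()
    else:
        await locker.wait_for_lock()  # another coroutine is refreshing


async def main() -> int:
    locker = SketchLocker()  # created inside the running event loop
    state = {"refreshes": 0}
    await asyncio.gather(*(worker(locker, state) for _ in range(5)))
    return state["refreshes"]


refresh_count = asyncio.run(main())
print(refresh_count)  # prints 1
```

Five coroutines hit "expiry" together, but only the first one performs the simulated refresh; the rest wait and then continue.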
Wiring HuntflowTokenProxy
from huntflow_api_client import HuntflowAPI
from huntflow_api_client.tokens.locker import AsyncioLockLocker
from huntflow_api_client.tokens.proxy import HuntflowTokenProxy
from huntflow_api_client.tokens.storage import HuntflowTokenFileStorage

storage = HuntflowTokenFileStorage("/secure/huntflow_token.json")
locker = AsyncioLockLocker()
token_proxy = HuntflowTokenProxy(storage, locker=locker)

api = HuntflowAPI(
    "https://api.huntflow.ai",
    token_proxy=token_proxy,
    auto_refresh_tokens=True,
)
Seed the JSON file once with access_token and refresh_token from Huntflow before starting.
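One possible way to seed that file is sketched below using only the standard library. seed_token_file is a hypothetical helper, not part of the package; the key names come from the storage description above, and the atomic write with owner-only permissions is an added precaution rather than a requirement of the client.

```python
import json
import os
import tempfile
from pathlib import Path


def seed_token_file(path: Path, access_token: str, refresh_token: str) -> None:
    """Write the initial token JSON atomically (hypothetical helper)."""
    payload = {"access_token": access_token, "refresh_token": refresh_token}
    path.parent.mkdir(parents=True, exist_ok=True)
    # mkstemp creates the temp file readable and writable by the owner only.
    fd, tmp_name = tempfile.mkstemp(dir=str(path.parent), suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(payload, handle)
        # Rename over the target so readers never see a half-written file.
        os.replace(tmp_name, str(path))
    except BaseException:
        os.unlink(tmp_name)
        raise


# Demo in a throwaway directory; use your real path in practice.
demo_path = Path(tempfile.mkdtemp()) / "huntflow_token.json"
seed_token_file(demo_path, "YOUR_ACCESS_TOKEN", "YOUR_REFRESH_TOKEN")
print(json.loads(demo_path.read_text(encoding="utf-8")))
```

Run it once with real values before the first start; after that the storage overwrites the file on each refresh.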
Example: Redis-backed storage and lock
The package does not depend on Redis; install it separately (pip install "redis>=4.2" so redis.asyncio and async locks behave consistently). Use one async Redis client for both storage and the lock. Populate the token key before the first API call (same JSON shape as the file storage).
This example uses an accessor-style flow:
RedisTokenProxy -> RedisTokenAccessor -> storage.
RedisTokenProxy implements the SDK token contract, RedisTokenAccessor is responsible
for token retrieval and update operations, and RedisLockLocker synchronizes refresh
between concurrent requests.
import asyncio
import json
import time
from typing import Any, Dict, Optional

from redis.asyncio import Redis
from redis.asyncio.lock import Lock
from redis.exceptions import LockError

from huntflow_api_client import HuntflowAPI
from huntflow_api_client.tokens.locker import AbstractLocker
from huntflow_api_client.tokens.proxy import (
    AbstractTokenProxy,
    convert_refresh_result_to_hf_token,
    get_auth_headers,
    get_refresh_token_data,
)
from huntflow_api_client.tokens.token import ApiToken

POLL_INTERVAL = 0.2
class RedisLockLocker(AbstractLocker):
    """Coordinates token refresh across concurrent workers.

    One caller acquires the lock and performs refresh; others wait until
    the lock is released and then continue with updated token data.
    """

    def __init__(self, redis: Redis, name: str = "huntflow:token_refresh") -> None:
        self._lock = Lock(redis, name=name, timeout=30.0, blocking=False)

    async def acquire(self) -> bool:
        try:
            return bool(await self._lock.acquire())
        except LockError:
            return False

    async def wait_for_lock(self) -> None:
        while await self._lock.locked():
            await asyncio.sleep(POLL_INTERVAL)

    async def release(self) -> None:
        try:
            await self._lock.release()
        except LockError:
            return
class RedisTokenAccessor:
    """Layer for token read/update operations.

    Keeps Redis calls in one place and exposes lock-related operations
    used by the proxy.
    """

    def __init__(
        self,
        redis: Redis,
        locker: AbstractLocker,
        token_key: str = "huntflow:token",
    ) -> None:
        self._redis = redis
        self._locker = locker
        self._token_key = token_key

    async def get(self, bypass_lock: bool = False) -> Optional[Dict[str, Any]]:
        if not bypass_lock:
            await self._locker.wait_for_lock()
        raw = await self._redis.get(self._token_key)
        if not raw:
            return None
        return json.loads(raw)

    async def update(self, token: ApiToken) -> None:
        await self._redis.set(self._token_key, json.dumps(token.dict()))

    async def lock_for_update(self) -> bool:
        return await self._locker.acquire()

    async def release_lock(self) -> None:
        await self._locker.release()
class RedisTokenProxy(AbstractTokenProxy):
    """`AbstractTokenProxy` implementation over accessor + locker.

    Returns auth headers, provides refresh payload, saves refreshed token,
    and checks whether another worker has already updated the token.
    """

    def __init__(self, accessor: RedisTokenAccessor) -> None:
        self._accessor = accessor
        self._token: Optional[ApiToken] = None
        self._last_read_timestamp: Optional[float] = None

    async def get_auth_header(self) -> Dict[str, str]:
        data = await self._accessor.get()
        if data is None:
            raise KeyError("Token not found in Redis. Seed access_token and refresh_token first.")
        self._token = ApiToken.from_dict(data)
        self._last_read_timestamp = time.time()
        return get_auth_headers(self._token)

    async def get_refresh_data(self) -> Dict[str, str]:
        if self._token is None:
            data = await self._accessor.get()
            if data is None:
                raise KeyError("Token not found in Redis. Seed access_token and refresh_token first.")
            self._token = ApiToken.from_dict(data)
        return get_refresh_token_data(self._token)

    async def update(self, refresh_result: dict) -> None:
        assert self._token is not None
        self._token = convert_refresh_result_to_hf_token(refresh_result, self._token)
        await self._accessor.update(self._token)

    async def lock_for_update(self) -> bool:
        return await self._accessor.lock_for_update()

    async def release_lock(self) -> None:
        await self._accessor.release_lock()

    async def is_updated(self) -> bool:
        if self._last_read_timestamp is None:
            return False
        current_data = await self._accessor.get(bypass_lock=True)
        if current_data is None:
            return False
        current = ApiToken.from_dict(current_data)
        last_refresh_timestamp = current.last_refresh_timestamp or 0.0
        return last_refresh_timestamp > self._last_read_timestamp
def build_api(redis: Redis) -> HuntflowAPI:
    locker = RedisLockLocker(redis, name="huntflow:token_refresh")
    accessor = RedisTokenAccessor(redis, locker=locker, token_key="huntflow:token")
    token_proxy = RedisTokenProxy(accessor)
    return HuntflowAPI(
        "https://api.huntflow.ai",
        token_proxy=token_proxy,
        auto_refresh_tokens=True,
    )
Raw HTTP access
Every method on entities ultimately uses HuntflowAPI.request, which mirrors httpx.AsyncClient.request (json, params, files, timeout, etc.). Entity methods usually serialize typed request models (for example ApplicantCreateRequest.jsonable_dict(...)); with request() you build the JSON yourself.
account_id = 1
payload = {"first_name": "John", "last_name": "Doe"}  # match API schema
response = await api.request(
    "POST",
    f"/accounts/{account_id}/applicants",
    json=payload,
)
Errors from non-success status codes are turned into typed exceptions in huntflow_api_client.errors (for example NotFoundError, BadRequestError, TokenExpiredError, AuthorizationError).