A utility library for API interactions including Wompi payments and Airtable
Project description
Mimeiapify (Version 1.3.3, 2025-06-04)
Mimeiapify is a Python library designed to host several API integrations and utility functions. It currently bundles an Airtable client, a full Wompi payment wrapper and a new symphony_ai module which provides concurrency utilities and Redis-backed state management for Agency-Swarm agents. More integrations are planned for future releases.
Current APIs
Airtable
The library currently supports operations for Airtable API. For detailed documentation on the Airtable API itself, visit: Airtable API Documentation.
Supported Airtable operations include:
- Fetching base schemas
- Extracting table IDs
- Creating fields in Airtable tables
- Fetching and filtering records
- Creating, updating, and deleting records
- Uploading attachments to Airtable fields
- Managing Airtable fields and configurations
The library provides two interfaces:
- Airtable: A synchronous interface for Airtable API interactions.
- AirtableAsync: An asynchronous interface for Airtable API interactions with optimized session management.
Wompi
The library provides comprehensive integration with the Wompi payment platform, a popular payment solution in Latin America. For detailed documentation on the Wompi API itself, visit: Wompi API Documentation.
Supported Wompi operations include:
- Generating checkout URLs for payment processing with full support for all parameters
- Retrieving transaction information by ID or reference
- Verifying webhook events for payment notifications with secure cryptographic validation
- Supporting both sandbox and production environments
- Flexible session management for optimized HTTP connection handling
The library currently offers:
- WompiAsync: An asynchronous interface for Wompi payment platform interactions with efficient session management.
Future Plans
- Add integrations for additional APIs (Notion, WhatsApp, Gmail).
- Expand utility functions for data processing and manipulation.
- Provide improved error handling and logging for all operations.
- Add synchronous interface for Wompi integration.
Installation
UV Install
uv add mimeiapify
This will make the library available for use across your projects.
Usage (Synchronous)
To start using the mimeiapify library in synchronous code, import the Airtable class:
from mimeiapify import Airtable
# Initialize
airtable = Airtable(base_id="your_base_id", api_key="your_api_key")
# Fetch Schema
schema = airtable.get_schema()
print(schema)
Usage (Asynchronous)
Airtable Async
To use the Airtable async interface, import the AirtableAsync class:
from mimeiapify.airtable_async import AirtableAsync
import asyncio
async def main():
# Initialize
airtable_async = AirtableAsync(base_id="your_base_id", api_key="your_api_key")
# Example: Fetch schema asynchronously
schema = await airtable_async.get_schema()
print(schema)
# Run the async entry point
asyncio.run(main())
Wompi Async
To use the Wompi async interface, import the WompiAsync class:
from mimeiapify.wompi_async import WompiAsync
import asyncio
async def main():
# Initialize
wompi = WompiAsync(
public_key="your_public_key",
integrity_key="your_integrity_key",
environment="sandbox" # or "production"
)
# Example: Generate a checkout URL
checkout_data = await wompi.generate_checkout_url(
amount_in_cents=10000, # 100.00 in currency
currency="COP",
redirect_url="https://your-site.com/success"
)
print(f"Checkout URL: {checkout_data['checkout_url']}")
print(f"Reference: {checkout_data['reference']}")
# Run the async entry point
asyncio.run(main())
Optimized Session Management
Both AirtableAsync and WompiAsync classes support efficient session management to optimize HTTP connections and improve performance. There are three ways to manage the HTTP session:
1. Automatic Session Management
The simplest approach is to let the library manage sessions automatically:
# Session is created when needed and closed when done
async def fetch_data():
airtable = AirtableAsync(base_id="your_base_id", api_key="your_api_key")
data = await airtable.fetch_records("your_table_id")
# Session is automatically cleaned up when airtable is garbage collected
return data
2. Using Async Context Manager
For more controlled session management:
async def fetch_with_context():
async with AirtableAsync(base_id="your_base_id", api_key="your_api_key") as airtable:
# Session is created when entering the context
data = await airtable.fetch_records("your_table_id")
# Session is automatically closed when exiting the context
return data
3. Sharing an External Session
For maximum efficiency, especially in applications making multiple API calls:
import aiohttp
async def fetch_with_shared_session():
# Create a shared session
async with aiohttp.ClientSession() as session:
# Pass the session to both clients
airtable = AirtableAsync(base_id="your_base_id", api_key="your_api_key", session=session)
wompi = WompiAsync(public_key="your_key", integrity_key="your_key", session=session)
# Make API calls with both clients using the same session
airtable_data = await airtable.fetch_records("your_table_id")
wompi_data = await wompi.get_transaction_by_reference("your_reference")
# The shared session is managed by the caller, not the API clients
return airtable_data, wompi_data
Wompi Examples
Generating a Checkout URL
from mimeiapify.wompi_async import WompiAsync
import asyncio
async def create_payment_link():
wompi = WompiAsync(
public_key="your_public_key",
integrity_key="your_integrity_key"
)
# Generate a checkout URL with custom parameters
checkout_data = await wompi.generate_checkout_url(
amount_in_cents=15000,
currency="COP",
reference="INV-001", # Optional: provide your own reference
redirect_url="https://yoursite.com/payment/success",
expiration_time="2023-12-31T23:59:59+00:00", # ISO8601 format
tax_vat_in_cents=2850, # VAT/IVA
customer_data={
"email": "customer@example.com",
"full-name": "John Doe",
"phone-number": "3001234567",
"phone-number-prefix": "+57" # Colombia
},
collect_shipping=False
)
return checkout_data["checkout_url"]
Verifying a Webhook Event
from mimeiapify.wompi_async import WompiAsync
from fastapi import FastAPI, Request, HTTPException
app = FastAPI()
@app.post("/wompi-webhook")
async def handle_wompi_webhook(request: Request):
# Get the raw event data from the request
event_data = await request.json()
# Your Events Secret from the Wompi dashboard
events_secret = "your_events_secret"
# Verify the event is authentic
is_valid = WompiAsync.verify_webhook_event(event_data, events_secret)
if not is_valid:
raise HTTPException(status_code=400, detail="Invalid webhook signature")
# Process the valid event
event_type = event_data.get("event")
if event_type == "transaction.updated":
# Handle successful payment
transaction = event_data.get("data", {}).get("transaction", {})
status = transaction.get("status")
if status == "APPROVED":
# Payment was successful
reference = transaction.get("reference")
amount = transaction.get("amount_in_cents")
# Update your database or perform business logic
await update_order_status(reference, "paid", amount)
return {"status": "success"}
Retrieving Transaction Data
from mimeiapify.wompi_async import WompiAsync
import asyncio
async def check_transaction_status(reference):
async with WompiAsync(
public_key="your_public_key",
integrity_key="your_integrity_key"
) as wompi:
# Get transactions by reference
transactions = await wompi.get_transaction_by_reference(reference)
if transactions:
# Get the first matching transaction
transaction_id = transactions[0]["id"]
# Get detailed transaction information
details = await wompi.get_transaction(transaction_id)
return {
"status": details["data"]["status"],
"payment_method": details["data"]["payment_method_type"],
"amount": details["data"]["amount_in_cents"] / 100, # Convert to currency units
"created_at": details["data"]["created_at"]
}
return {"status": "not_found"}
Example with FastAPI
Below is a minimal FastAPI example demonstrating how to integrate AirtableAsync:
from fastapi import FastAPI
from mimeiapify.airtable_async import AirtableAsync
app = FastAPI()
@app.get("/records")
async def get_records():
airtable_async = AirtableAsync(base_id="your_base_id", api_key="your_api_key")
records = await airtable_async.fetch_records(table_id="your_table_id")
return {"records": records}
Using concurrency in FastAPI
Within FastAPI, calling multiple Airtable operations in parallel is as simple as using asyncio.gather. For instance:
@app.get("/parallel")
async def parallel_requests():
airtable_async = AirtableAsync(base_id="your_base_id", api_key="your_api_key")
# Suppose you want to fetch records from two different tables concurrently:
task1 = airtable_async.fetch_records(table_id="Table1")
task2 = airtable_async.fetch_records(table_id="Table2")
results = await asyncio.gather(task1, task2)
return {"table1": results[0], "table2": results[1]}
Class Manager pattern (reusing the ClientSession)
If you prefer to manage the AirtableAsync instance yourself (for example, to reuse the underlying HTTP session), you might do:
class AirtableManager:
def __init__(self, base_id: str, api_key: str):
self.airtable = AirtableAsync(base_id, api_key)
async def fetch_two_tables(self):
table1, table2 = await asyncio.gather(
self.airtable.fetch_records("Table1"),
self.airtable.fetch_records("Table2")
)
return table1, table2
@app.get("/manager-example")
async def manager_example():
manager = AirtableManager(base_id="your_base_id", api_key="your_api_key")
table1, table2 = await manager.fetch_two_tables()
return {"table1": table1, "table2": table2}
The Goal
The purpose of the Class Manager pattern is to encapsulate the setup of your AirtableAsync client (or any other resource) into a single Python class. This lets you:
- Reuse the same instance of AirtableAsync across multiple methods or endpoints.
- Potentially reuse the underlying HTTP session (if you modify AirtableAsync to store a single session, rather than creating it anew in each call).
- Keep related Airtable logic in one place, making your codebase more organized and testable.
Example Code
import asyncio
from fastapi import FastAPI
from mimeiapify.airtable_async import AirtableAsync
# 1) Create a Manager class that holds a single AirtableAsync instance
class AirtableManager:
def __init__(self, base_id: str, api_key: str):
# Here we initialize exactly one AirtableAsync client
self.airtable = AirtableAsync(base_id, api_key)
async def fetch_two_tables(self):
"""
Example method that fetches data from two separate tables in parallel (asynchronously).
"""
# asyncio.gather will run these two coroutines concurrently
table1, table2 = await asyncio.gather(
self.airtable.fetch_records("Table1"),
self.airtable.fetch_records("Table2")
)
return table1, table2
# 2) Create a FastAPI app
app = FastAPI()
@app.get("/manager-example")
async def manager_example():
"""
Example FastAPI endpoint that uses the AirtableManager to fetch data from two tables.
"""
# Instantiate the manager (in real code, you might do this once at startup)
manager = AirtableManager(base_id="your_base_id", api_key="your_api_key")
# Call the manager method which performs concurrent Airtable calls
table1, table2 = await manager.fetch_two_tables()
return {"table1": table1, "table2": table2}
Key Points
- Single Instantiation: By creating the AirtableAsync client in the AirtableManager.
__init__, all subsequent methods in that manager can reuse the same instance. - Encapsulation: Any additional logic (e.g., error handling, caching, logging) can live inside methods of AirtableManager.
If you want to go even further, you could hold a single aiohttp.ClientSession inside AirtableAsync, manually open it at manager initialization, and close it gracefully on shutdown. This helps reuse TCP connections and reduce overhead.
Rate-Limiting Examples
Airtable imposes rate limits, and you may want to throttle or delay your requests to avoid hitting them. Below are three illustrative methods:
-
Explicit Delay in Your Code
Simply callawait asyncio.sleep(...)after your Airtable call:from mimeiapify.airtable_async import AirtableAsync import asyncio async def create_records_with_sleep(): airtable = AirtableAsync(base_id="your_base_id", api_key="your_api_key") await airtable.create_records("your_table_id", [{"fields": {"Name": "Test"}}]) await asyncio.sleep(1.0) # Sleep for 1 second before next request
-
Decorator-Based Approach
Define a decorator that injects a delay before or after the function call:import asyncio import functools from mimeiapify.airtable_async import AirtableAsync def rate_limit(delay: float = 1.0): def decorator(func): @functools.wraps(func) async def wrapper(*args, **kwargs): result = await func(*args, **kwargs) await asyncio.sleep(delay) return result return wrapper return decorator @rate_limit(delay=2.0) async def create_record_decorated(): airtable = AirtableAsync("base_id", "api_key") return await airtable.create_records("table_id", [{"fields": {"Name": "Decorated"}}]) # Usage # records = await create_record_decorated() # This will always wait 2 seconds after finishing
-
Token Bucket or Semaphore
A more advanced pattern involves a semaphore to limit concurrent requests. For instance:import asyncio from mimeiapify.airtable_async import AirtableAsync # Global semaphore (e.g., allow 5 concurrent Airtable calls) airtable_semaphore = asyncio.Semaphore(5) async def fetch_with_semaphore(): async with airtable_semaphore: # Your code here airtable = AirtableAsync("base_id", "api_key") return await airtable.fetch_records("table_id") async def main(): # Launch many tasks, each must acquire the semaphore first tasks = [fetch_with_semaphore() for _ in range(20)] return await asyncio.gather(*tasks) # results = asyncio.run(main())
Each approach can be fine-tuned to your project's needs. The token bucket or semaphore pattern is often the most flexible and powerful for controlling concurrency in a production environment.
Getting Help
To get help on any function in the Airtable, AirtableAsync, or WompiAsync classes, you can use Python's built-in help() function. For example:
help(AirtableAsync.fetch_records)
help(WompiAsync.generate_checkout_url)
This will display the function's docstring, including its purpose, arguments, and return values.
Dependencies
The following libraries are required to use mimeiapify:
- requests: For making HTTP requests in the synchronous API.
- aiohttp: For making HTTP requests in the asynchronous API.
- pandas: For processing and managing Airtable records as DataFrames.
You can install these dependencies using:
pip install requests aiohttp pandas
Symphony-AI × Agency-Swarm — Concurrency + Redis Integration Guide
| Layer | What we added | Why it matters |
|---|---|---|
| Infrastructure | GlobalSymphony singleton → event-loop + 3 named ThreadPoolExecutors + Redis client + CapacityLimiter |
Keeps blocking Agency-Swarm work off the FastAPI loop while sharing one Redis pool per worker |
| Redis plumbing | • redis_client.py (fork-safe pool)• redis/ops.py (atomic helpers)• serde.py (swap JSON↔MsgPack later)• key_factory.py (names all keys)• tenant_cache.py (TTL + helper mix-in) |
Zero duplicated socket code; one place for key rules and (de)serialisation |
| Domain repos | SharedStateRepo, RedisUser, HandlerRepo, … (all subclass TenantCache) |
Each repo ≤ 150 LOC, single responsibility, testable with fakeredis |
| Tool shim | AsyncBaseTool overrides self._shared_state to read a ContextVar (_current_ss) |
Removes the unsafe global BaseTool._shared_state = … assignment race |
| FastAPI glue | In the request/web-socket handler:python<br>ss = SharedStateRepo(tenant=t_id, user_id=u_id)<br>tok = _current_ss.set(ss)<br>try: result = await call_agent()<br>finally: _current_ss.reset(tok) |
Every in-flight coroutine—and its thread-pool jobs—sees the correct per-user SharedState |
Quick-start checklist
uv add mimeiapify # your private wheel
1. FastAPI lifespan
from contextlib import asynccontextmanager
from mimieiapify.symphony_ai.globals import GlobalSymphony, GlobalSymphonyConfig
from mimieiapify.symphony_ai.utils.logger import setup_logging
from mimieiapify.symphony_ai.redis import _current_ss, SharedStateRepo
@asynccontextmanager
async def lifespan(app):
setup_logging(level="INFO", mode="DEV", log_dir="./logs")
await GlobalSymphony.create(GlobalSymphonyConfig(redis_url="redis://cache/0"))
yield # shutdown handled automatically
2. Request / WebSocket handler
async def handle_message(tenant: str, user_id: str, text: str):
ss = SharedStateRepo(tenant=tenant, user_id=user_id) # ①
token = _current_ss.set(ss) # ② bind ContextVar
try:
pool = GlobalSymphony.get().pool_user
fut = pool.submit(lambda: agency.get_completion(text)) # ③ sync API call
return await asyncio.wrap_future(fut)
finally:
_current_ss.reset(token) # ④ avoid leaks
3. Inside any Tool
class RememberStep(AsyncBaseTool):
def run(self, state_name: str, step: int):
coro = self._shared_state.set_field(state_name, "step", step)
loop = GlobalSymphony.get().loop
return asyncio.run_coroutine_threadsafe(coro, loop).result()
Current implementation status ✅
- Global orchestration (
globals.py) finished; pools tuned for network I/O. - Redis connectivity fork-safe; pool caps @ 64 sockets per worker.
- Core utils: Rich logger, BoundedExecutor, ContextVar wiring.
- Redis repos:
SharedStateRepo(complete),RedisUser(sketched), other repos stubs ready. - Agency-Swarm bridge:
AsyncBaseToolproperty override done; Send-message streaming via Redis pub/sub queued for next sprint.
Roadmap
- Unit-tests with
fakeredis.asyncio(P0). - Full
RedisUser,HandlerRepo,TableRepoimplementation (P1). - Prometheus metrics for pool queue length + loop latency (P1).
- Optional Temporal workflow driver for CPU-heavy Tool chains (P2).
How Shared-State flows per request
sequenceDiagram
participant Client
participant FastAPI-Coro
participant ThreadPool
participant Tool
Client->>FastAPI-Coro: Websocket msg
FastAPI-Coro->>FastAPI-Coro: _current_ss.set(SharedStateRepo(...))
FastAPI-Coro->>ThreadPool: agency.get_completion()
ThreadPool->>Tool: Tool.run()
Tool->>Tool: self._shared_state → SS of this request
Tool->>Redis: async hset / hget via run_coroutine_threadsafe
Redis-->>Tool: result
Tool-->>ThreadPool: return
ThreadPool-->>FastAPI-Coro: result
With ContextVar, each coroutine & its downstream worker threads
automatically carry the right SharedStateRepo—no global mutation, no races.
License
This library is licensed under the MIT License.
For questions, feedback, or contributions, contact Sasha Nicolai.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file mimeiapify-1.3.3.tar.gz.
File metadata
- Download URL: mimeiapify-1.3.3.tar.gz
- Upload date:
- Size: 117.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.6.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
061f461d281cb84a42cf55d2e4476e0a955dcce6a7798e18ec60affb35d56f4a
|
|
| MD5 |
f360524fe37e55ee2e288010e841e3f9
|
|
| BLAKE2b-256 |
d35668b0422e0dd6fcee5d8cebbda28c696dde390be7e6bcda35567940a168c4
|
File details
Details for the file mimeiapify-1.3.3-py3-none-any.whl.
File metadata
- Download URL: mimeiapify-1.3.3-py3-none-any.whl
- Upload date:
- Size: 67.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.6.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
cd85302399c4031301b157d0e5cc5d86da054e5f26213cae842640f214588f2d
|
|
| MD5 |
97abf9a596b007db8f3182cd78bcebd3
|
|
| BLAKE2b-256 |
02f4a5c75c80a265e4d69a317e2549e6961f4663b54d7bb7475608f4a448ed8d
|