
A utility library for API interactions including Wompi payments and Airtable


SNCL - Sasha Nicolai's Library (Version 1.2.0, 2025-02-25)

SNCL is a Python library designed to host several API integrations and utility functions. Currently, it provides support for Airtable API interactions and Wompi payment platform integrations, with plans to expand to other APIs in the future.


Current APIs

Airtable

The library currently wraps the Airtable API. For detailed documentation on the Airtable API itself, see the Airtable API Documentation.

Supported Airtable operations include:

  • Fetching base schemas
  • Extracting table IDs
  • Creating fields in Airtable tables
  • Fetching and filtering records
  • Creating, updating, and deleting records
  • Uploading attachments to Airtable fields
  • Managing Airtable fields and configurations
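As a rough illustration of what record filtering looks like at the HTTP level, the sketch below builds a URL for Airtable's list-records endpoint using the `filterByFormula` and `maxRecords` query parameters of the Airtable REST API. The helper name `build_list_records_url` and the placeholder IDs are assumptions made for this example; they are not part of sncl, whose own method signatures are best checked with help().

```python
from urllib.parse import urlencode

AIRTABLE_API_ROOT = "https://api.airtable.com/v0"


def build_list_records_url(base_id: str, table_id: str, formula: str,
                           max_records: int = 100) -> str:
    """Build a list-records URL with a formula filter (hypothetical helper)."""
    # urlencode percent-encodes braces, quotes and "=" and turns spaces into "+".
    query = urlencode({"filterByFormula": formula, "maxRecords": max_records})
    return f"{AIRTABLE_API_ROOT}/{base_id}/{table_id}?{query}"


url = build_list_records_url(
    "appXXXXXXXXXXXXXX",      # placeholder base ID
    "tblXXXXXXXXXXXXXX",      # placeholder table ID
    "{Status} = 'Paid'",      # Airtable formula: field names go in braces
    max_records=10,
)
print(url)
```

The formula must always be URL-encoded, which is why the sketch goes through urlencode instead of concatenating strings by hand.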

The library provides two interfaces:

  • Airtable: A synchronous interface for Airtable API interactions.
  • AirtableAsync: An asynchronous interface for Airtable API interactions with optimized session management.

Wompi

The library provides comprehensive integration with the Wompi payment platform, a popular payment solution in Latin America. For detailed documentation on the Wompi API itself, visit: Wompi API Documentation.

Supported Wompi operations include:

  • Generating checkout URLs for payment processing with full support for all parameters
  • Retrieving transaction information by ID or reference
  • Verifying webhook events for payment notifications with secure cryptographic validation
  • Supporting both sandbox and production environments
  • Flexible session management for optimized HTTP connection handling

The library currently offers:

  • WompiAsync: An asynchronous interface for Wompi payment platform interactions with efficient session management.

Future Plans

  • Add integrations for additional APIs (Notion, WhatsApp, Gmail).
  • Expand utility functions for data processing and manipulation.
  • Provide improved error handling and logging for all operations.
  • Add synchronous interface for Wompi integration.

Installation

Pip Install

pip install sncl

This installs the library into the active Python environment.


Usage (Synchronous)

To start using the sncl library in synchronous code, import the Airtable class:

from sncl import Airtable

# Initialize
airtable = Airtable(base_id="your_base_id", api_key="your_api_key")

# Fetch Schema
schema = airtable.get_schema()
print(schema)

Usage (Asynchronous)

Airtable Async

To use the Airtable async interface, import the AirtableAsync class:

from sncl.airtable_async import AirtableAsync
import asyncio

async def main():
    # Initialize
    airtable_async = AirtableAsync(base_id="your_base_id", api_key="your_api_key")

    # Example: Fetch schema asynchronously
    schema = await airtable_async.get_schema()
    print(schema)

# Run the async entry point
asyncio.run(main())

Wompi Async

To use the Wompi async interface, import the WompiAsync class:

from sncl.wompi_async import WompiAsync
import asyncio

async def main():
    # Initialize
    wompi = WompiAsync(
        public_key="your_public_key", 
        integrity_key="your_integrity_key",
        environment="sandbox"  # or "production"
    )

    # Example: Generate a checkout URL
    checkout_data = await wompi.generate_checkout_url(
        amount_in_cents=10000,  # 100.00 COP, expressed in cents
        currency="COP",
        redirect_url="https://your-site.com/success"
    )
    
    print(f"Checkout URL: {checkout_data['checkout_url']}")
    print(f"Reference: {checkout_data['reference']}")

# Run the async entry point
asyncio.run(main())
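Because amounts are passed in cents, it can help to convert from decimal amounts without going through floats. The following is a minimal sketch using the standard-library decimal module; the helper names `to_cents` and `from_cents` are hypothetical and not part of sncl.

```python
from decimal import Decimal, ROUND_HALF_UP


def to_cents(amount: str) -> int:
    """Convert a decimal amount such as "100.00" to an integer number of cents."""
    cents = (Decimal(amount) * 100).quantize(Decimal("1"), rounding=ROUND_HALF_UP)
    return int(cents)


def from_cents(amount_in_cents: int) -> Decimal:
    """Convert an integer number of cents back to a Decimal amount."""
    return Decimal(amount_in_cents) / Decimal(100)


print(to_cents("100.00"))  # 10000
print(to_cents("19.99"))   # 1999
print(from_cents(15000))
```

Passing the amount as a string keeps values like 19.99 exact, which a float literal would not.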

Optimized Session Management

Both AirtableAsync and WompiAsync classes support efficient session management to optimize HTTP connections and improve performance. There are three ways to manage the HTTP session:

1. Automatic Session Management

The simplest approach is to let the library manage sessions automatically:

# Session is created when needed and closed when done
async def fetch_data():
    airtable = AirtableAsync(base_id="your_base_id", api_key="your_api_key")
    data = await airtable.fetch_records("your_table_id")
    # Session is automatically cleaned up when airtable is garbage collected
    return data

2. Using Async Context Manager

For more controlled session management:

async def fetch_with_context():
    async with AirtableAsync(base_id="your_base_id", api_key="your_api_key") as airtable:
        # Session is created when entering the context
        data = await airtable.fetch_records("your_table_id")
        # Session is automatically closed when exiting the context
        return data

3. Sharing an External Session

For maximum efficiency, especially in applications making multiple API calls:

import aiohttp

async def fetch_with_shared_session():
    # Create a shared session
    async with aiohttp.ClientSession() as session:
        # Pass the session to both clients
        airtable = AirtableAsync(base_id="your_base_id", api_key="your_api_key", session=session)
        wompi = WompiAsync(public_key="your_key", integrity_key="your_key", session=session)
        
        # Make API calls with both clients using the same session
        airtable_data = await airtable.fetch_records("your_table_id")
        wompi_data = await wompi.get_transaction_by_reference("your_reference")
        
        # The shared session is managed by the caller, not the API clients
        return airtable_data, wompi_data
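The difference between an owned and a borrowed session can be sketched with stand-in classes. This is not how sncl is implemented internally; `FakeSession` and `SketchClient` are hypothetical names used only to show the ownership rule: a client closes a session it created itself and leaves a session it was given alone.

```python
import asyncio
from typing import Optional


class FakeSession:
    """Stand-in for an HTTP session such as aiohttp.ClientSession (hypothetical)."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class SketchClient:
    """Hypothetical client that either borrows a session or owns one."""

    def __init__(self, session: Optional[FakeSession] = None) -> None:
        self._session = session
        # The client owns the session only if the caller did not supply one.
        self._owns_session = session is None

    async def __aenter__(self) -> "SketchClient":
        if self._session is None:
            self._session = FakeSession()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Close only what this client created; a shared session belongs to the caller.
        if self._owns_session and self._session is not None:
            await self._session.close()


async def demo() -> tuple:
    shared = FakeSession()
    async with SketchClient(session=shared):
        pass
    async with SketchClient() as owner:
        owned = owner._session
    return shared.closed, owned.closed


borrowed_closed, owned_closed = asyncio.run(demo())
print(borrowed_closed, owned_closed)  # False True
```

With a shared session, the caller is responsible for closing it, typically by creating it in an `async with` block as in option 3.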

Wompi Examples

Generating a Checkout URL

from sncl.wompi_async import WompiAsync
import asyncio

async def create_payment_link():
    wompi = WompiAsync(
        public_key="your_public_key", 
        integrity_key="your_integrity_key"
    )
    
    # Generate a checkout URL with custom parameters
    checkout_data = await wompi.generate_checkout_url(
        amount_in_cents=15000,
        currency="COP",
        reference="INV-001",  # Optional: provide your own reference
        redirect_url="https://yoursite.com/payment/success",
        expiration_time="2023-12-31T23:59:59+00:00",  # ISO 8601; replace with a future timestamp
        tax_vat_in_cents=2850,  # VAT/IVA
        customer_data={
            "email": "customer@example.com",
            "full-name": "John Doe",
            "phone-number": "3001234567",
            "phone-number-prefix": "+57"  # Colombia
        },
        collect_shipping=False
    )
    
    return checkout_data["checkout_url"]
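The integrity key is what signs the checkout request. As far as Wompi's public documentation describes it, the integrity signature is the SHA-256 hex digest of the reference, the amount in cents, the currency, the optional expiration time, and the integrity secret, concatenated in that order. The sketch below follows that description; treat it as an assumption to verify against the Wompi documentation, not as a description of what generate_checkout_url does internally. The function name `integrity_signature` is hypothetical.

```python
import hashlib
from typing import Optional


def integrity_signature(reference: str, amount_in_cents: int, currency: str,
                        integrity_secret: str,
                        expiration_time: Optional[str] = None) -> str:
    """Return a SHA-256 hex digest over the concatenated checkout fields."""
    parts = [reference, str(amount_in_cents), currency]
    if expiration_time:
        # Assumption: the expiration time sits between currency and secret.
        parts.append(expiration_time)
    parts.append(integrity_secret)
    return hashlib.sha256("".join(parts).encode("utf-8")).hexdigest()


signature = integrity_signature("INV-001", 15000, "COP", "test_integrity_secret")
signature_with_expiry = integrity_signature(
    "INV-001", 15000, "COP", "test_integrity_secret",
    expiration_time="2030-12-31T23:59:59+00:00",
)
print(signature)
```

Because the secret is part of the hashed string, the signature should be computed on the server and the integrity key should never be sent to the browser.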

Verifying a Webhook Event

from sncl.wompi_async import WompiAsync
from fastapi import FastAPI, Request, HTTPException

app = FastAPI()

@app.post("/wompi-webhook")
async def handle_wompi_webhook(request: Request):
    # Parse the JSON event payload from the request
    event_data = await request.json()
    
    # Your Events Secret from the Wompi dashboard
    events_secret = "your_events_secret"
    
    # Verify the event is authentic
    is_valid = WompiAsync.verify_webhook_event(event_data, events_secret)
    
    if not is_valid:
        raise HTTPException(status_code=400, detail="Invalid webhook signature")
    
    # Process the valid event
    event_type = event_data.get("event")
    if event_type == "transaction.updated":
        # Handle transaction status updates
        transaction = event_data.get("data", {}).get("transaction", {})
        status = transaction.get("status")
        
        if status == "APPROVED":
            # Payment was successful
            reference = transaction.get("reference")
            amount = transaction.get("amount_in_cents")
            
            # Update your database or perform business logic.
            # update_order_status is a placeholder for your own coroutine; sncl does not provide it.
            await update_order_status(reference, "paid", amount)
    
    return {"status": "success"}
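For readers curious about what the cryptographic check involves, here is a minimal sketch of the checksum scheme as Wompi's public documentation describes it: the values of the properties listed in `signature.properties` are concatenated, followed by the event `timestamp` and the events secret, and the SHA-256 digest is compared with `signature.checksum`. This is an assumption about the Wompi scheme, not a copy of verify_webhook_event, and the helper names and the sample event are made up for the example.

```python
import hashlib
import hmac


def _lookup(data: dict, dotted_path: str):
    """Resolve a path such as "transaction.id" inside a nested dict."""
    value = data
    for key in dotted_path.split("."):
        value = value[key]
    return value


def expected_checksum(event: dict, events_secret: str) -> str:
    """Compute the checksum the event should carry (assumed Wompi scheme)."""
    properties = event["signature"]["properties"]
    values = "".join(str(_lookup(event["data"], path)) for path in properties)
    payload = f"{values}{event['timestamp']}{events_secret}"
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def is_authentic(event: dict, events_secret: str) -> bool:
    """Return True only if the event's checksum matches the recomputed one."""
    try:
        expected = expected_checksum(event, events_secret)
        received = str(event["signature"]["checksum"])
    except (KeyError, TypeError):
        return False  # malformed events are treated as invalid
    # Constant-time comparison; hex digests are compared case-insensitively.
    return hmac.compare_digest(expected.lower().encode(), received.lower().encode())


secret = "test_events_secret"  # placeholder secret
sample_event = {
    "event": "transaction.updated",
    "data": {"transaction": {"id": "tx-123", "status": "APPROVED",
                             "amount_in_cents": 15000}},
    "timestamp": 1530291411,
    "signature": {
        "properties": ["transaction.id", "transaction.status",
                       "transaction.amount_in_cents"],
        "checksum": "",
    },
}
# Sign the sample event ourselves so the check below has something to verify.
sample_event["signature"]["checksum"] = expected_checksum(sample_event, secret)
print(is_authentic(sample_event, secret))  # True
```

Using hmac.compare_digest instead of `==` avoids leaking timing information about how many leading characters of the checksum matched.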

Retrieving Transaction Data

from sncl.wompi_async import WompiAsync
import asyncio

async def check_transaction_status(reference):
    async with WompiAsync(
        public_key="your_public_key", 
        integrity_key="your_integrity_key"
    ) as wompi:
        # Get transactions by reference
        transactions = await wompi.get_transaction_by_reference(reference)
        
        if transactions:
            # Get the first matching transaction
            transaction_id = transactions[0]["id"]
            
            # Get detailed transaction information
            details = await wompi.get_transaction(transaction_id)
            
            return {
                "status": details["data"]["status"],
                "payment_method": details["data"]["payment_method_type"],
                "amount": details["data"]["amount_in_cents"] / 100,  # Convert to currency units
                "created_at": details["data"]["created_at"]
            }
        
        return {"status": "not_found"}

Example with FastAPI

Below is a minimal FastAPI example demonstrating how to integrate AirtableAsync:

from fastapi import FastAPI
from sncl.airtable_async import AirtableAsync

app = FastAPI()

@app.get("/records")
async def get_records():
    airtable_async = AirtableAsync(base_id="your_base_id", api_key="your_api_key")
    records = await airtable_async.fetch_records(table_id="your_table_id")
    return {"records": records}

Using concurrency in FastAPI

Within FastAPI, calling multiple Airtable operations in parallel is as simple as using asyncio.gather. For instance:

import asyncio

@app.get("/parallel")
async def parallel_requests():
    airtable_async = AirtableAsync(base_id="your_base_id", api_key="your_api_key")

    # Suppose you want to fetch records from two different tables concurrently:
    task1 = airtable_async.fetch_records(table_id="Table1")
    task2 = airtable_async.fetch_records(table_id="Table2")

    results = await asyncio.gather(task1, task2)
    return {"table1": results[0], "table2": results[1]}
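One detail worth knowing about asyncio.gather is how it treats failures: by default the first exception propagates to the caller, while `return_exceptions=True` returns exceptions in place of results so that one failing table does not hide the others. The sketch below shows this with a stand-in coroutine; `fake_fetch` is hypothetical and only simulates a fetch with asyncio.sleep.

```python
import asyncio


async def fake_fetch(table_id: str, delay: float, fail: bool = False) -> list:
    """Stand-in for a record fetch (hypothetical); sleeps, then returns or raises."""
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"fetch failed for {table_id}")
    return [f"{table_id}-record"]


async def fetch_all() -> dict:
    table_ids = ["Table1", "Table2", "Table3"]
    # Results come back in the order the awaitables were passed in,
    # regardless of which one finishes first.
    results = await asyncio.gather(
        fake_fetch("Table1", 0.02),
        fake_fetch("Table2", 0.01, fail=True),
        fake_fetch("Table3", 0.0),
        return_exceptions=True,
    )
    return dict(zip(table_ids, results))


outcome = asyncio.run(fetch_all())
for table_id, result in outcome.items():
    status = "error" if isinstance(result, Exception) else "ok"
    print(table_id, status)
```

In an endpoint, the caller can then decide per table whether to return partial data or an error response.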

Class Manager pattern (reusing the ClientSession)

If you prefer to manage the AirtableAsync instance yourself (for example, to reuse the underlying HTTP session), you can wrap it in a class:

class AirtableManager:
    def __init__(self, base_id: str, api_key: str):
        self.airtable = AirtableAsync(base_id, api_key)

    async def fetch_two_tables(self):
        table1, table2 = await asyncio.gather(
            self.airtable.fetch_records("Table1"),
            self.airtable.fetch_records("Table2")
        )
        return table1, table2

@app.get("/manager-example")
async def manager_example():
    manager = AirtableManager(base_id="your_base_id", api_key="your_api_key")
    table1, table2 = await manager.fetch_two_tables()
    return {"table1": table1, "table2": table2}

The Goal

The purpose of the Class Manager pattern is to encapsulate the setup of your AirtableAsync client (or any other resource) into a single Python class. This lets you:

  1. Reuse the same instance of AirtableAsync across multiple methods or endpoints.
  2. Reuse the underlying HTTP session, for example by passing one shared aiohttp.ClientSession through the session parameter shown under Optimized Session Management, rather than opening a new session for each call.
  3. Keep related Airtable logic in one place, making your codebase more organized and testable.

Example Code

import asyncio
from fastapi import FastAPI
from sncl.airtable_async import AirtableAsync

# 1) Create a Manager class that holds a single AirtableAsync instance
class AirtableManager:
    def __init__(self, base_id: str, api_key: str):
        # Here we initialize exactly one AirtableAsync client
        self.airtable = AirtableAsync(base_id, api_key)

    async def fetch_two_tables(self):
        """
        Example method that fetches data from two separate tables in parallel (asynchronously).
        """
        # asyncio.gather will run these two coroutines concurrently
        table1, table2 = await asyncio.gather(
            self.airtable.fetch_records("Table1"),
            self.airtable.fetch_records("Table2")
        )
        return table1, table2

# 2) Create a FastAPI app
app = FastAPI()

@app.get("/manager-example")
async def manager_example():
    """
    Example FastAPI endpoint that uses the AirtableManager to fetch data from two tables.
    """
    # Instantiate the manager (in real code, you might do this once at startup)
    manager = AirtableManager(base_id="your_base_id", api_key="your_api_key")
    
    # Call the manager method which performs concurrent Airtable calls
    table1, table2 = await manager.fetch_two_tables()

    return {"table1": table1, "table2": table2}

Key Points

  1. Single Instantiation: By creating the AirtableAsync client in the AirtableManager.__init__, all subsequent methods in that manager can reuse the same instance.
  2. Encapsulation: Any additional logic (e.g., error handling, caching, logging) can live inside methods of AirtableManager.

To go further, hold a single aiohttp.ClientSession for the lifetime of the application: open it at startup, pass it to AirtableAsync through the session parameter, and close it on shutdown. This reuses TCP connections and reduces per-request overhead.
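One simple way to make sure the manager is built only once per process is to cache the factory function. The sketch below uses functools.lru_cache with stand-in classes so that it runs without sncl installed; `StubClient`, `Manager`, and `get_manager` are hypothetical names, and in a real application `StubClient` would be AirtableAsync.

```python
from functools import lru_cache


class StubClient:
    """Stand-in for AirtableAsync (hypothetical); counts how often it is built."""

    instances = 0

    def __init__(self, base_id: str, api_key: str) -> None:
        StubClient.instances += 1
        self.base_id = base_id
        self.api_key = api_key


class Manager:
    """Holds exactly one client, mirroring the AirtableManager pattern."""

    def __init__(self, base_id: str, api_key: str) -> None:
        self.client = StubClient(base_id, api_key)


@lru_cache(maxsize=1)
def get_manager() -> Manager:
    # The body runs on the first call only; later calls return the cached object.
    return Manager(base_id="your_base_id", api_key="your_api_key")


first = get_manager()
second = get_manager()
print(first is second, StubClient.instances)  # True 1
```

In FastAPI, a cached factory like this could be supplied to an endpoint through Depends, so every request receives the same manager instance.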


Rate-Limiting Examples

Airtable imposes rate limits, and you may want to throttle or delay your requests to avoid hitting them. Below are three illustrative methods:

  1. Explicit Delay in Your Code
    Simply call await asyncio.sleep(...) after your Airtable call:

    from sncl.airtable_async import AirtableAsync
    import asyncio
    
    async def create_records_with_sleep():
        airtable = AirtableAsync(base_id="your_base_id", api_key="your_api_key")
        await airtable.create_records("your_table_id", [{"fields": {"Name": "Test"}}])
        await asyncio.sleep(1.0)  # Sleep for 1 second before next request
    
  2. Decorator-Based Approach
    Define a decorator that adds a delay after the function call:

    import asyncio
    import functools
    from sncl.airtable_async import AirtableAsync
    
    def rate_limit(delay: float = 1.0):
        def decorator(func):
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                result = await func(*args, **kwargs)
                await asyncio.sleep(delay)
                return result
            return wrapper
        return decorator
    
    @rate_limit(delay=2.0)
    async def create_record_decorated():
        airtable = AirtableAsync("base_id", "api_key")
        return await airtable.create_records("table_id", [{"fields": {"Name": "Decorated"}}])
    
    # Usage
    # records = await create_record_decorated()  # This will always wait 2 seconds after finishing
    
  3. Token Bucket or Semaphore
    A more advanced pattern uses a semaphore to cap the number of requests in flight at once. Note that this limits concurrency, not requests per second. For instance:

    import asyncio
    from sncl.airtable_async import AirtableAsync
    
    # Global semaphore (e.g., allow 5 concurrent Airtable calls)
    airtable_semaphore = asyncio.Semaphore(5)
    
    async def fetch_with_semaphore():
        async with airtable_semaphore:
            # Your code here
            airtable = AirtableAsync("base_id", "api_key")
            return await airtable.fetch_records("table_id")
    
    async def main():
        # Launch many tasks, each must acquire the semaphore first
        tasks = [fetch_with_semaphore() for _ in range(20)]
        return await asyncio.gather(*tasks)
    
    # results = asyncio.run(main())
    

Each approach can be tuned to your project's needs. A semaphore caps how many requests run at once, while a token bucket caps how many start per unit of time; either is usually more flexible than fixed delays in a production environment.
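Since a semaphore alone does not bound requests per second, a small interval-based limiter can complement it. The sketch below spaces calls evenly so that no more than `rate` of them start per second. `IntervalLimiter` and `limited_call` are hypothetical names, not part of sncl, and the demo uses a rate of 50 per second only to finish quickly; Airtable documents a limit of 5 requests per second per base.

```python
import asyncio
import time


class IntervalLimiter:
    """Space acquisitions evenly so at most `rate` of them start per second."""

    def __init__(self, rate: float) -> None:
        self._interval = 1.0 / rate
        self._lock = asyncio.Lock()
        self._next_allowed = 0.0  # monotonic time of the next free slot

    async def acquire(self) -> None:
        # The lock serializes waiters, so slots are handed out in arrival order.
        async with self._lock:
            now = time.monotonic()
            wait = self._next_allowed - now
            if wait > 0:
                await asyncio.sleep(wait)
                now = time.monotonic()
            self._next_allowed = max(now, self._next_allowed) + self._interval


async def limited_call(limiter: IntervalLimiter, index: int) -> tuple:
    await limiter.acquire()
    # A real request would go here, for example a fetch_records call.
    return index, time.monotonic()


async def main() -> list:
    limiter = IntervalLimiter(rate=50)  # created inside the running event loop
    start = time.monotonic()
    stamps = await asyncio.gather(*(limited_call(limiter, i) for i in range(4)))
    return [round(t - start, 3) for _, t in stamps]


offsets = asyncio.run(main())
print(offsets)
```

The limiter and a semaphore can be combined: acquire the limiter first to respect the per-second budget, then the semaphore to bound how many responses are pending at once.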


Getting Help

To get help on any function in the Airtable, AirtableAsync, or WompiAsync classes, you can use Python's built-in help() function. For example:

from sncl.airtable_async import AirtableAsync
from sncl.wompi_async import WompiAsync
help(AirtableAsync.fetch_records)
help(WompiAsync.generate_checkout_url)

This will display the function's docstring, including its purpose, arguments, and return values.


Dependencies

The following libraries are required to use sncl:

  • requests: For making HTTP requests in the synchronous API.
  • aiohttp: For making HTTP requests in the asynchronous API.
  • pandas: For processing and managing Airtable records as DataFrames.

You can install these dependencies using:

pip install requests aiohttp pandas

License

This library is licensed under the MIT License.


For questions, feedback, or contributions, contact Sasha Nicolai.
