Skip to main content

Composio meta-connector for aisquare.pipe — execute 500+ SaaS toolkits through one connector

Project description

aisquare-pipe-composio

One API key, 500+ SaaS toolkits: Gmail, Slack, GitHub, Notion, Google Drive, HubSpot, ... — this meta-connector bridges aisquare.pipe to the entire Composio catalog, so any pipe source can feed a Composio app and any Composio app can feed a pipe sink. Ships in aisquare-pipe[full].

Install

pip install aisquare-pipe-composio

For development:

pip install -e "connectors/composio[dev]"

How it works

Composio concept Meaning here
toolkit An app (gmail, slack, github, ...)
tool One action in a toolkit, addressed by slug (GMAIL_FETCH_EMAILS, SLACK_SEND_MESSAGE)
user_id The end user (Composio "entity") tools execute as — "default" for single-user setups
connected account A user's authenticated link to an app. Composio runs the OAuth and stores/refreshes the tokens

The connector executes tools: composio-source runs a tool and yields its results as envelopes; composio-sink runs a tool with envelope data as the arguments; composio-triggers-source polls trigger events (new email, new row, ...). The only credential pipe ever sees is your Composio API key.

Configuration

All three connectors share the same config shape:

config = {
    "api_key": "ck_...",              # required — Composio API key
    "user_id": "default",             # entity tools run as (set per end-user in multi-tenant setups)
    "connected_account_id": None,     # optional — pin execution to one connected account
    "toolkit_filter": ["gmail"],      # optional — allow-list of toolkit slugs (governance)
    "base_url": None,                 # optional — Composio backend override
    "timeout_seconds": 60,            # optional — request timeout
    "file_workdir": None,             # optional — default: ~/.cache/aisquare-pipe/composio-files
}

The COMPOSIO_API_KEY environment variable is intentionally not read — config is the single source of truth, matching every other pipe connector.

Connect an account

Tool execution requires the user to have an ACTIVE connected account for the tool's toolkit (except no-auth toolkits like Hacker News). Either connect in the Composio dashboard, or programmatically:

from aisquare_pipe_composio import initiate_connection, wait_for_active, connection_status

request = initiate_connection(config, "gmail")
print("Authorize at:", request.redirect_url)   # send the user here
account = wait_for_active(config, request.id)  # blocks until OAuth completes
print(connection_status(config, "gmail"))      # "ACTIVE"

ComposioSource().list_resources(config) browses all toolkits with your connection status per toolkit.

Pull (composio-source)

from aisquare.pipe import PullParams
from aisquare_pipe_composio import ComposioSource

source = ComposioSource()
params = PullParams(params={
    "tool": "GMAIL_FETCH_EMAILS",
    "arguments": {"max_results": 10},
    "unwrap": True,                  # one envelope per message instead of one blob
})
for envelope in source.pull(config, params):
    print(envelope.metadata["composio_tool"], envelope.data)

PullParams keys:

  • tool (required) — tool slug, case-insensitive
  • arguments — tool input arguments (see the tool's schema in Composio docs)
  • unwrapFalse (default): one envelope with the whole result; True: auto-fan-out when the result is a list or a single-list-key dict; "dot.path": explicit path to the list (raises if not a list). Unwrapped envelopes carry item_index/item_count metadata.
  • download_files — also yield one bytes envelope per file output (real MIME type, filename + file_field metadata). Files are materialised under file_workdir.
  • user_id, connected_account_id, tool_version — per-call overrides

Push (composio-sink)

from aisquare.pipe import DataEnvelope, PushParams
from aisquare_pipe_composio import ComposioSink

sink = ComposioSink()
envelope = DataEnvelope(
    content_type="application/json",
    data={"channel": "#alerts", "text": "deploy finished"},
    source_id="my-app",
)
result = sink.push(envelope, config, PushParams(params={"tool": "SLACK_SEND_MESSAGE"}))
assert result.success

Argument layering (later wins, shallow per-key merge):

  1. Envelope payload — a JSON-object envelope is the base arguments; with data_key="text" the payload is nested under that argument; with file_arg="file" a binary envelope is uploaded and the file reference passed in that argument
  2. envelope.metadata["composio_arguments"] — per-envelope steering from upstream
  3. params["arguments"] — operator overrides, always win
# Text envelope into a named argument:
PushParams(params={"tool": "SLACK_SEND_MESSAGE", "data_key": "text",
                   "arguments": {"channel": "#general"}})

# Binary envelope as a file upload:
PushParams(params={"tool": "GOOGLEDRIVE_UPLOAD_FILE", "file_arg": "file_to_upload"})

The tool's response data is returned in PushResult.metadata["data"].

Toolkit-pinned connectors (factory)

from aisquare.pipe import Pipeline, PullParams, PushParams
from aisquare_pipe_composio import composio_source, composio_sink

GmailSource = composio_source("gmail")     # name: composio-gmail-source
SlackSink = composio_sink("slack")         # name: composio-slack-sink

result = Pipeline(source=GmailSource(), sink=SlackSink()).run(
    {"composio-gmail-source": config, "composio-slack-sink": config},
    pull_params=PullParams(params={"tool": "GMAIL_FETCH_EMAILS", "unwrap": True}),
    push_params=PushParams(params={"tool": "SLACK_SEND_MESSAGE", "data_key": "text",
                                   "arguments": {"channel": "#inbox"}}),
)

Pinned classes reject tools from other toolkits and scope list_resources() to their toolkit. They are deliberately not entry-point registered (pipe list shows the three generic connectors only): entry points are static, Composio has ~500 toolkits — build what you need on demand.

Trigger events (composio-triggers-source)

Prerequisite: enable trigger instances in Composio (dashboard → toolkit → triggers), e.g. GMAIL_NEW_GMAIL_MESSAGE. Then:

from aisquare.pipe import PullParams
from aisquare_pipe_composio import ComposioTriggersSource

source = ComposioTriggersSource()
config = {
    "api_key": "ck_...",
    "user_id": "default",
    "trigger_slugs": ["GMAIL_NEW_GMAIL_MESSAGE"],   # optional filter
    "poll_interval_seconds": 10,
    "cursor_path": None,   # optional — default: ~/.cache/aisquare-pipe/composio-cursor.json
}
for envelope in source.pull(config):               # polls forever
    print(envelope.data["payload"])
  • One application/json envelope per event; envelope.data["payload"] is the app payload.
  • idempotency_key metadata (composio:event:<id>) is stable across re-polls, pairing with sinks that dedupe (e.g. aisquare-gateway).
  • Position is a timestamp watermark + bounded seen-id ring persisted atomically to cursor_path; PullParams since sets the initial watermark (default: now). max_polls/sleep params support tests and one-shot drains.
  • cursor_path defaults to ~/.cache/aisquare-pipe/composio-cursor.json (honouring $XDG_CACHE_HOME) — per-user, not shared /tmp. A pre-0.1.1 cursor at /tmp/composio-pipe-cursor.json is migrated automatically on first run.
  • list_resources(config) browses available trigger types and your active trigger instances.

Example pipelines

These examples compose with other pipe connectors — install them first: pip install aisquare-pipe-local aisquare-pipe-salesforce (both included in aisquare-pipe[full]).

from aisquare.pipe import Pipeline, PullParams, PushParams
from aisquare_pipe_composio import ComposioSource, ComposioSink

# Save Gmail attachments to disk (composio → local)
from aisquare_pipe_local import LocalSink
Pipeline(source=ComposioSource(), sink=LocalSink()).run(
    {"composio-source": config, "local-sink": {"base_path": "/tmp/attachments"}},
    pull_params=PullParams(params={
        "tool": "GMAIL_GET_ATTACHMENT",
        "arguments": {"message_id": "...", "attachment_id": "...", "file_name": "x.pdf"},
        "download_files": True,
    }),
)

# Salesforce records into Notion (salesforce → composio)
from aisquare_pipe_salesforce import SalesforceSource
Pipeline(source=SalesforceSource(), sink=ComposioSink()).run(
    {"salesforce-source": sf_config, "composio-sink": config},
    pull_params=PullParams(params={"object_type": "Account", "limit": 10}),
    push_params=PushParams(params={
        "tool": "NOTION_ADD_PAGE_CONTENT",
        "data_key": "content",
        "arguments": {"parent_block_id": "..."},
    }),
)

Features

  • Whole Composio catalog through three connectors + an on-demand factory
  • Retry with exponential backoff on rate limits (HTTP 429)
  • SDK exceptions mapped to framework errors; failed pushes return PushResult(success=False), never raise
  • toolkit_filter allow-listing for governance
  • File outputs → bytes envelopes with real MIME types; binary envelopes → file-upload arguments (uploads restricted to the connector's own workdir)
  • Connection status surfaced in list_resources(); programmatic OAuth helpers

Notes & limitations

  • Toolkit pinning is a slug-prefix check (GMAIL_* belongs to gmail). It avoids a per-call API lookup; use list_resources()/Composio docs for ground truth on slugs.
  • Multi-tenant deployments must set user_id per end-user — otherwise everything executes as "default" against whatever account that entity has connected.
  • File downloads are buffered in memory when yielded as envelopes (the SDK enforces a 100 MB cap per file); streamed envelopes are a future upgrade.
  • Trigger polling reads Composio's event log (the SDK's first-class trigger interface is a realtime websocket). The log endpoint is versioned under /api/v3.1/internal/ — pinned SDK versions keep this stable, but it is the most drift-prone surface; all access is isolated in client.py.
  • Tool arguments and results are never logged — only tool slugs and counts.
  • The composio SDK is pinned >=0.13.1,<2.0 (verified against 0.13.1 / SDK v1.0.0-rc2). All SDK touchpoints live in client.py; version bumps should only ever touch that file.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aisquare_pipe_composio-0.1.1.tar.gz (36.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

aisquare_pipe_composio-0.1.1-py3-none-any.whl (26.4 kB view details)

Uploaded Python 3

File details

Details for the file aisquare_pipe_composio-0.1.1.tar.gz.

File metadata

  • Download URL: aisquare_pipe_composio-0.1.1.tar.gz
  • Upload date:
  • Size: 36.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for aisquare_pipe_composio-0.1.1.tar.gz
Algorithm Hash digest
SHA256 445735ac000a9c197490c52f456ef17d847d8adceb84d4dcfea285a0f861959b
MD5 8b59053a263f2ec5e4da012f67e907c0
BLAKE2b-256 3cc62b57279974b970dd5d10c374f78cb628a63eb2f08696952d91c3d5d0cee2

See more details on using hashes here.

Provenance

The following attestation bundles were made for aisquare_pipe_composio-0.1.1.tar.gz:

Publisher: publish.yml on AISquare-Studio/pipe

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file aisquare_pipe_composio-0.1.1-py3-none-any.whl.

File metadata

File hashes

Hashes for aisquare_pipe_composio-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 b921d835e519ad9d807b0e4da0af8e09d641a8bd8d06a5c118711b84afec1b82
MD5 7558dc05ea03677a9da76be1cea093fd
BLAKE2b-256 a1916e39f0553af48b92314d92e5079e926f8ff3ccf2cd155522c71e3fe23e31

See more details on using hashes here.

Provenance

The following attestation bundles were made for aisquare_pipe_composio-0.1.1-py3-none-any.whl:

Publisher: publish.yml on AISquare-Studio/pipe

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page