Composio meta-connector for aisquare.pipe — execute 500+ SaaS toolkits through one connector
aisquare-pipe-composio
One API key, 500+ SaaS toolkits: Gmail, Slack, GitHub, Notion, Google Drive, HubSpot, ... — this meta-connector bridges aisquare.pipe to the entire Composio catalog, so any pipe source can feed a Composio app and any Composio app can feed a pipe sink. Ships in aisquare-pipe[full].
Install
pip install aisquare-pipe-composio
For development:
pip install -e "connectors/composio[dev]"
How it works
| Composio concept | Meaning here |
|---|---|
| toolkit | An app (gmail, slack, github, ...) |
| tool | One action in a toolkit, addressed by slug (GMAIL_FETCH_EMAILS, SLACK_SEND_MESSAGE) |
| user_id | The end user (Composio "entity") tools execute as — "default" for single-user setups |
| connected account | A user's authenticated link to an app. Composio runs the OAuth and stores/refreshes the tokens |
The connector executes tools: composio-source runs a tool and yields its results as envelopes; composio-sink runs a tool with envelope data as the arguments; composio-triggers-source polls trigger events (new email, new row, ...). The only credential pipe ever sees is your Composio API key.
Configuration
All three connectors share the same config shape:
config = {
    "api_key": "ck_...",            # required: Composio API key
    "user_id": "default",           # entity tools run as (set per end-user in multi-tenant setups)
    "connected_account_id": None,   # optional: pin execution to one connected account
    "toolkit_filter": ["gmail"],    # optional: allow-list of toolkit slugs (governance)
    "base_url": None,               # optional: Composio backend override
    "timeout_seconds": 60,          # optional: request timeout
    "file_workdir": None,           # optional; default: ~/.cache/aisquare-pipe/composio-files
}
The COMPOSIO_API_KEY environment variable is intentionally not read — config is the single source of truth, matching every other pipe connector.
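Because the connector does not read the environment itself, application code has to place the key into the config explicitly. A minimal sketch of one way to do that, assuming a hypothetical helper `config_for_user` (not part of this package) that reads the variable in the caller and stamps a per-end-user `user_id`:

```python
import os


def config_for_user(user_id: str, env=os.environ) -> dict:
    """Build a connector config for one end user (hypothetical helper).

    The environment lookup happens here, in application code, because the
    connector treats the config dict as its only source of settings.
    """
    api_key = env.get("COMPOSIO_API_KEY")
    if not api_key:
        raise ValueError("COMPOSIO_API_KEY is not set")
    return {
        "api_key": api_key,
        "user_id": user_id,       # one entity per end user in multi-tenant setups
        "timeout_seconds": 60,
    }
```

The resulting dict can be passed wherever the examples in this README pass `config`.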
Connect an account
Tool execution requires the user to have an ACTIVE connected account for the tool's toolkit (except no-auth toolkits like Hacker News). Either connect in the Composio dashboard, or programmatically:
from aisquare_pipe_composio import initiate_connection, wait_for_active, connection_status
request = initiate_connection(config, "gmail")
print("Authorize at:", request.redirect_url) # send the user here
account = wait_for_active(config, request.id) # blocks until OAuth completes
print(connection_status(config, "gmail")) # "ACTIVE"
ComposioSource().list_resources(config) browses all toolkits with your connection status per toolkit.
Pull (composio-source)
from aisquare.pipe import PullParams
from aisquare_pipe_composio import ComposioSource
source = ComposioSource()
params = PullParams(params={
"tool": "GMAIL_FETCH_EMAILS",
"arguments": {"max_results": 10},
"unwrap": True, # one envelope per message instead of one blob
})
for envelope in source.pull(config, params):
    print(envelope.metadata["composio_tool"], envelope.data)
PullParams keys:
- tool (required): tool slug, case-insensitive
- arguments: tool input arguments (see the tool's schema in Composio docs)
- unwrap: False (default) yields one envelope with the whole result; True fans out automatically when the result is a list or a single-list-key dict; "dot.path" is an explicit path to the list (raises if not a list). Unwrapped envelopes carry item_index / item_count metadata.
- download_files: also yield one bytes envelope per file output (real MIME type, filename + file_field metadata). Files are materialised under file_workdir.
- user_id, connected_account_id, tool_version: per-call overrides
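The three `unwrap` modes can be illustrated with a small standalone function. This is a sketch of the rule as described above, not the connector's code: `unwrap_items` is a hypothetical name, "single-list-key dict" is interpreted here as a dict in which exactly one value is a list, and falling back to a single item when `True` finds nothing to fan out is an assumption.

```python
from typing import Any, Union


def unwrap_items(result: Any, unwrap: Union[bool, str] = False) -> list:
    """Return the payloads that would each become one envelope (sketch)."""
    if unwrap is False:
        return [result]                      # one envelope with the whole result

    if isinstance(unwrap, str):
        node = result
        for key in unwrap.split("."):        # walk the explicit dot path
            node = node[key]
        if not isinstance(node, list):
            raise TypeError(f"{unwrap!r} does not point at a list")
        return node

    # unwrap is True: automatic fan-out
    if isinstance(result, list):
        return result
    if isinstance(result, dict):
        list_keys = [k for k, v in result.items() if isinstance(v, list)]
        if len(list_keys) == 1:              # assumed meaning of "single-list-key"
            return result[list_keys[0]]
    return [result]                          # assumed fallback: nothing to fan out
```

For example, `unwrap_items({"messages": [m1, m2], "next_page": "..."}, True)` would produce one item per message, while the same call with the default `unwrap=False` keeps the whole dict as a single item.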
Push (composio-sink)
from aisquare.pipe import DataEnvelope, PushParams
from aisquare_pipe_composio import ComposioSink
sink = ComposioSink()
envelope = DataEnvelope(
content_type="application/json",
data={"channel": "#alerts", "text": "deploy finished"},
source_id="my-app",
)
result = sink.push(envelope, config, PushParams(params={"tool": "SLACK_SEND_MESSAGE"}))
assert result.success
Argument layering (later wins, shallow per-key merge):
- Envelope payload: a JSON-object envelope is the base arguments; with data_key="text" the payload is nested under that argument; with file_arg="file" a binary envelope is uploaded and the file reference passed in that argument
- envelope.metadata["composio_arguments"]: per-envelope steering from upstream
- params["arguments"]: operator overrides, always win
# Text envelope into a named argument:
PushParams(params={"tool": "SLACK_SEND_MESSAGE", "data_key": "text",
"arguments": {"channel": "#general"}})
# Binary envelope as a file upload:
PushParams(params={"tool": "GOOGLEDRIVE_UPLOAD_FILE", "file_arg": "file_to_upload"})
The tool's response data is returned in PushResult.metadata["data"].
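The argument layering described in this section (later layers win, shallow per-key merge) can be sketched in plain Python. `layer_arguments` is a hypothetical name used for illustration only; the file-upload path (`file_arg`) is left out, and the connector's real merge code may differ in detail.

```python
def layer_arguments(payload, metadata=None, params=None, data_key=None):
    """Merge the three argument layers; later layers override earlier ones."""
    # Layer 1: the envelope payload, optionally nested under data_key.
    merged = {data_key: payload} if data_key else dict(payload)
    # Layer 2: per-envelope steering set by an upstream stage.
    merged.update((metadata or {}).get("composio_arguments", {}))
    # Layer 3: operator overrides from PushParams, which always win.
    merged.update((params or {}).get("arguments", {}))
    return merged


args = layer_arguments(
    {"channel": "#alerts", "text": "deploy finished"},
    metadata={"composio_arguments": {"channel": "#ops"}},
    params={"arguments": {"channel": "#general"}},
)
# args == {"channel": "#general", "text": "deploy finished"}
```

Because the merge is shallow, a nested dict supplied by a later layer replaces the earlier value for that key as a whole rather than being merged into it.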
Toolkit-pinned connectors (factory)
from aisquare.pipe import Pipeline, PullParams, PushParams
from aisquare_pipe_composio import composio_source, composio_sink
GmailSource = composio_source("gmail") # name: composio-gmail-source
SlackSink = composio_sink("slack") # name: composio-slack-sink
result = Pipeline(source=GmailSource(), sink=SlackSink()).run(
{"composio-gmail-source": config, "composio-slack-sink": config},
pull_params=PullParams(params={"tool": "GMAIL_FETCH_EMAILS", "unwrap": True}),
push_params=PushParams(params={"tool": "SLACK_SEND_MESSAGE", "data_key": "text",
"arguments": {"channel": "#inbox"}}),
)
Pinned classes reject tools from other toolkits and scope list_resources() to their toolkit. They are deliberately not entry-point registered (pipe list shows the three generic connectors only): entry points are static, Composio has ~500 toolkits — build what you need on demand.
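The notes at the end of this README describe toolkit pinning as a slug-prefix check (GMAIL_* belongs to gmail). A minimal sketch of such a check follows; `tool_belongs_to` is a hypothetical name, and requiring the underscore separator after the toolkit slug is an assumption made here so that one toolkit slug cannot match a longer one that merely starts with the same letters.

```python
def tool_belongs_to(tool_slug: str, toolkit: str) -> bool:
    """Return True when the tool slug carries the toolkit's prefix (sketch)."""
    # Tool slugs are case-insensitive, so compare in upper case.
    return tool_slug.upper().startswith(toolkit.upper() + "_")


print(tool_belongs_to("GMAIL_FETCH_EMAILS", "gmail"))   # True
print(tool_belongs_to("SLACK_SEND_MESSAGE", "gmail"))   # False
```

A check like this needs no network call, which is the trade-off the notes mention: it is cheap, but it trusts the naming convention rather than the catalog.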
Trigger events (composio-triggers-source)
Prerequisite: enable trigger instances in Composio (dashboard → toolkit → triggers), e.g. GMAIL_NEW_GMAIL_MESSAGE. Then:
from aisquare.pipe import PullParams
from aisquare_pipe_composio import ComposioTriggersSource
source = ComposioTriggersSource()
config = {
"api_key": "ck_...",
"user_id": "default",
"trigger_slugs": ["GMAIL_NEW_GMAIL_MESSAGE"], # optional filter
"poll_interval_seconds": 10,
"cursor_path": None, # optional — default: ~/.cache/aisquare-pipe/composio-cursor.json
}
for envelope in source.pull(config):  # polls forever
    print(envelope.data["payload"])
- One application/json envelope per event; envelope.data["payload"] is the app payload.
- idempotency_key metadata (composio:event:<id>) is stable across re-polls, pairing with sinks that dedupe (e.g. aisquare-gateway).
- Position is a timestamp watermark + bounded seen-id ring persisted atomically to cursor_path; the PullParams key since sets the initial watermark (default: now). The max_polls / sleep params support tests and one-shot drains.
- cursor_path defaults to ~/.cache/aisquare-pipe/composio-cursor.json (honouring $XDG_CACHE_HOME), so the cursor is per-user rather than in shared /tmp. A pre-0.1.1 cursor at /tmp/composio-pipe-cursor.json is migrated automatically on first run.
- list_resources(config) browses available trigger types and your active trigger instances.
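The cursor behaviour described in the list above (timestamp watermark, bounded seen-id ring, atomic persistence, XDG-aware default path) can be sketched with the standard library alone. The class and function names, the JSON layout, and the ring size are assumptions for illustration and are not taken from the package.

```python
import json
import os
import tempfile
from collections import deque
from pathlib import Path


def default_cursor_path() -> Path:
    """Resolve the default cursor location, honouring $XDG_CACHE_HOME."""
    cache_home = os.environ.get("XDG_CACHE_HOME") or os.path.join(
        os.path.expanduser("~"), ".cache"
    )
    return Path(cache_home) / "aisquare-pipe" / "composio-cursor.json"


class Cursor:
    """Timestamp watermark plus a bounded ring of recently seen event ids."""

    def __init__(self, watermark: float, max_seen: int = 1000):
        self.watermark = watermark
        self.seen = deque(maxlen=max_seen)   # oldest ids fall off automatically

    def accept(self, event_id: str, timestamp: float) -> bool:
        """Return True if the event is new and should be emitted."""
        if timestamp < self.watermark or event_id in self.seen:
            return False
        self.seen.append(event_id)
        self.watermark = max(self.watermark, timestamp)
        return True

    def save(self, path: Path) -> None:
        """Persist atomically: write a temp file, then rename over the target."""
        path.parent.mkdir(parents=True, exist_ok=True)
        fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
        with os.fdopen(fd, "w") as fh:
            json.dump({"watermark": self.watermark, "seen": list(self.seen)}, fh)
        os.replace(tmp, path)                # atomic rename on the same filesystem
```

The seen-id ring covers events that share the watermark timestamp across re-polls, and the rename step means a crash mid-write leaves the previous cursor file intact.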
Example pipelines
These examples compose with other pipe connectors — install them first: pip install aisquare-pipe-local aisquare-pipe-salesforce (both included in aisquare-pipe[full]).
from aisquare.pipe import Pipeline, PullParams, PushParams
from aisquare_pipe_composio import ComposioSource, ComposioSink
# Save Gmail attachments to disk (composio → local)
from aisquare_pipe_local import LocalSink
Pipeline(source=ComposioSource(), sink=LocalSink()).run(
{"composio-source": config, "local-sink": {"base_path": "/tmp/attachments"}},
pull_params=PullParams(params={
"tool": "GMAIL_GET_ATTACHMENT",
"arguments": {"message_id": "...", "attachment_id": "...", "file_name": "x.pdf"},
"download_files": True,
}),
)
# Salesforce records into Notion (salesforce → composio)
from aisquare_pipe_salesforce import SalesforceSource
Pipeline(source=SalesforceSource(), sink=ComposioSink()).run(
{"salesforce-source": sf_config, "composio-sink": config},
pull_params=PullParams(params={"object_type": "Account", "limit": 10}),
push_params=PushParams(params={
"tool": "NOTION_ADD_PAGE_CONTENT",
"data_key": "content",
"arguments": {"parent_block_id": "..."},
}),
)
Features
- Whole Composio catalog through three connectors + an on-demand factory
- Retry with exponential backoff on rate limits (HTTP 429)
- SDK exceptions mapped to framework errors; failed pushes return PushResult(success=False), never raise
- toolkit_filter allow-listing for governance
- File outputs → bytes envelopes with real MIME types; binary envelopes → file-upload arguments (uploads restricted to the connector's own workdir)
- Connection status surfaced in list_resources(); programmatic OAuth helpers
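For readers unfamiliar with the retry pattern named in the feature list, exponential backoff on a rate-limit error looks roughly like the sketch below. The exception class, function name, and default delays are hypothetical stand-ins; the connector's actual retry parameters are not documented here.

```python
import time


class RateLimited(Exception):
    """Stand-in for an HTTP 429 response (hypothetical)."""


def call_with_backoff(fn, max_attempts=5, base_delay=0.5, max_delay=30.0,
                      sleep=time.sleep):
    """Call fn, retrying on RateLimited with exponentially growing delays."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except RateLimited:
            if attempt == max_attempts - 1:
                raise                         # out of attempts: surface the error
            # 0.5 s, 1 s, 2 s, ... capped at max_delay
            sleep(min(max_delay, base_delay * 2 ** attempt))
```

The `sleep` parameter is injectable so the loop can be exercised in tests without real waiting.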
Notes & limitations
- Toolkit pinning is a slug-prefix check (GMAIL_* belongs to gmail). It avoids a per-call API lookup; use list_resources() / Composio docs for ground truth on slugs.
- Multi-tenant deployments must set user_id per end-user; otherwise everything executes as "default" against whatever account that entity has connected.
- File downloads are buffered in memory when yielded as envelopes (the SDK enforces a 100 MB cap per file); streamed envelopes are a future upgrade.
- Trigger polling reads Composio's event log (the SDK's first-class trigger interface is a realtime websocket). The log endpoint is versioned under /api/v3.1/internal/. Pinned SDK versions keep this stable, but it is the most drift-prone surface; all access is isolated in client.py.
- Tool arguments and results are never logged, only tool slugs and counts.
- The composio SDK is pinned >=0.13.1,<2.0 (verified against 0.13.1 / SDK v1.0.0-rc2). All SDK touchpoints live in client.py; version bumps should only ever touch that file.