amasto

Fully async, type-safe Python client for the Mastodon API.

Warning: This project contains code generated by LLMs (Large Language Models). A significant portion of the codebase has not been fully reviewed or tested. Use at your own risk.

Features

  • Async-first — All I/O uses async/await via httpx
  • Type-safe — Typed endpoint descriptors and Pydantic response models; ships with py.typed
  • Version-aware — Automatic server version detection via NodeInfo; models mark field availability with since() / Unsupported
  • Pagination — PaginatedHttpMethod.paginate() async iterator transparently follows Link: rel="next" headers across pages
  • Streaming — Real-time WebSocket streaming with typed events, automatic reconnection, and exponential back-off
  • Minimal surface area — Small, deliberate public API

Requirements

  • Python ≥ 3.14

Installation

pip install amasto

Or with uv:

uv add amasto

Quick start

import asyncio
from amasto import Amasto

async def main() -> None:
    client = Amasto("https://mastodon.social", "YOUR_ACCESS_TOKEN")

    # Post a status
    status = await client.api.v1.statuses.post(body={"status": "Hello from amasto!"})

    # Read a single status by ID
    status = await client.api.v1.statuses["123456"].get()

    # List accounts the user is following
    accounts = await client.api.v1.accounts["123456"].following.get()

asyncio.run(main())

Client

Amasto is the main entry point. It wraps an httpx.AsyncClient and automatically discovers the server's Mastodon version via the NodeInfo protocol on first use.

from semver import Version

client = Amasto(
    "https://mastodon.social",   # base URL
    "YOUR_ACCESS_TOKEN",         # API key (Bearer token)
    mastodon_version=Version(4, 3, 0),  # optional: skip auto-detection
)

Resources

API access uses a resource-based pattern where every endpoint is reachable as a chain of attribute accesses on the client:

client.api.v1.<resource>.<method>(params=..., body=...)
client.api.v1.<resource>["id"].<sub_resource>.<method>(...)
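To illustrate how a chain of attribute accesses and ["id"] lookups can map onto a URL path, here is a minimal sketch. PathNode is a hypothetical class written for this example only; amasto's real resources are typed descriptors and are likely structured differently.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PathNode:
    """Hypothetical node that accumulates URL segments (not amasto's class)."""

    segments: tuple[str, ...] = ()

    def __getattr__(self, name: str) -> "PathNode":
        # Called only when normal lookup fails, so real attributes such as
        # `segments` and `path` are never shadowed.
        if name.startswith("_"):
            raise AttributeError(name)
        return PathNode(self.segments + (name,))

    def __getitem__(self, key: str) -> "PathNode":
        # Item access appends a dynamic segment such as a status ID.
        return PathNode(self.segments + (str(key),))

    @property
    def path(self) -> str:
        return "/" + "/".join(self.segments)


api = PathNode(("api",))
print(api.v1.statuses["123456"].context.path)
```

A dynamic sketch like this cannot be type-checked per endpoint, which is one reason a library might prefer explicitly declared resources instead.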

Each leaf node is an HttpMethod instance that is:

  • Async-callable — await method(params=..., body=...) executes the HTTP request and returns a validated response.
  • Introspectable — .method, .path, and .requires expose the HTTP verb, URL path, and minimum server version.
  • Test-friendly — .parse(data) validates data against the response type without making HTTP calls.

List endpoints (bookmarks, timelines, followers, …) use PaginatedHttpMethod which adds a .paginate() async iterator:

# Single page (default behaviour, backwards compatible)
statuses = await client.api.v1.timelines.home.get()

# Iterate across all pages automatically
async for status in client.api.v1.timelines.home.get.paginate(params={"limit": 40}):
    print(f"{status.account.username}: {status.content}")

# Stop after a maximum number of items
async for status in client.api.v1.timelines.home.get.paginate(max_items=200):
    print(status.content)

# Validate data without HTTP (useful in tests)
status = client.api.v1.statuses["123"].get.parse({
    "id": "123",
    "content": "<p>Hello</p>",
    "account": {"id": "1", "username": "alice", ...},
    ...
})
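Following Link: rel="next" headers comes down to extracting the target URL for each rel value. The sketch below shows one way to do that with the standard library. parse_link_header is a hypothetical helper, the sample URLs are made up, and the regular expression only handles the simple <url>; rel="..." form; it is not amasto's parser.

```python
import re

# Matches entries of the form: <https://example/...>; rel="next"
_LINK_PART = re.compile(r'<([^>]*)>\s*;\s*rel="([^"]*)"')


def parse_link_header(header: str) -> dict[str, str]:
    """Map each rel value in a Link header to its target URL."""
    return {rel: url for url, rel in _LINK_PART.findall(header)}


# Illustrative header value (hypothetical host and IDs).
sample = (
    '<https://mastodon.example/api/v1/timelines/home?max_id=109>; rel="next", '
    '<https://mastodon.example/api/v1/timelines/home?min_id=110>; rel="prev"'
)

links = parse_link_header(sample)
print(links.get("next"))
```

When the header has no "next" entry, there are no further pages to fetch.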

API v1 (client.api.v1)

Resource            Access pattern
accounts            .get, .post, .verify_credentials.get, ["id"].get, ["id"].follow.post, …
announcements       .get, ["id"].dismiss.post, ["id"].reactions["name"].put
apps                .post, .verify_credentials.get
blocks              .get
bookmarks           .get
conversations       .get, ["id"].delete, ["id"].read.post
custom_emojis       .get
directory           .get
domain_blocks       .get, .post, .delete
emails              .confirmations.post
endorsements        .get
favourites          .get
featured_tags       .get, .post, .suggestions.get, ["id"].delete
follow_requests     .get, ["id"].authorize.post, ["id"].reject.post
followed_tags       .get
instance            .get, .peers.get, .activity.get, .rules.get, .domain_blocks.get, …
lists               .get, .post, ["id"].get, ["id"].put, ["id"].delete, ["id"].accounts.get
markers             .get, .post
media               ["id"].get, ["id"].put, ["id"].delete
mutes               .get
notifications       .get, .clear.post, .unread_count.get, .requests.*, ["id"].get, ["id"].dismiss.post
polls               ["id"].get, ["id"].votes.post
preferences         .get
profile             .avatar.delete, .header.delete
push                .subscription.get, .subscription.post, .subscription.put, .subscription.delete
reports             .post
scheduled_statuses  .get, ["id"].get, ["id"].put, ["id"].delete
search              .get
statuses            .get, .post, ["id"].get, ["id"].put, ["id"].delete, ["id"].context.get, ["id"].favourite.post, …
streaming           .user(), .user_notification(), .public(), .public_local(), .public_remote(), .hashtag(tag), .list(list_id), .direct()
suggestions         .get, ["id"].delete
tags                ["key"].get, ["key"].follow.post, ["key"].unfollow.post
timelines           .public.get, .home.get, .link.get, .direct.get, .tag["hashtag"].get, .list["id"].get
trends              .tags.get, .statuses.get, .links.get

API v2 (client.api.v2)

Resource       Access pattern
filters        .get, .post, ["id"].get/put/delete, ["id"].keywords.get/post, .keywords["id"].get/put/delete
instance       .get
media          .post
notifications  .get, .unread_count.get, .policy.get/patch, ["group_key"].dismiss.post, ["group_key"].accounts.get
search         .get
suggestions    .get

OEmbed (client.api.oembed)

Access Description
.get Fetch oEmbed data for a status URL

OAuth (client.oauth)

Resource Access
authorize .get — Authorization URL
token .post — Obtain a token
revoke .post — Revoke a token
userinfo .get — Authenticated user info

Health (client.health)

Access Description
.get Server health check

Examples

Reading timelines

# Home timeline
statuses = await client.api.v1.timelines.home.get(params={"limit": 20})
for s in statuses:
    print(f"{s.account.username}: {s.content}")

# Local public timeline
statuses = await client.api.v1.timelines.public.get(params={"local": True})

# Hashtag timeline
statuses = await client.api.v1.timelines.tag["python"].get()

Posting and interacting with statuses

# Post a new status
status = await client.api.v1.statuses.post(body={
    "status": "Hello, Fediverse!",
    "visibility": "public",
    "language": "en",
})

# Post a status with a poll
status = await client.api.v1.statuses.post(body={
    "status": "What's your favourite language?",
    "poll": {
        "options": ["Python", "Rust", "TypeScript"],
        "expires_in": 86400,
        "multiple": False,
    },
})

# Favourite / boost / bookmark
await client.api.v1.statuses["123456"].favourite.post()
await client.api.v1.statuses["123456"].reblog.post()
await client.api.v1.statuses["123456"].bookmark.post()

# Get thread context
context = await client.api.v1.statuses["123456"].context.get()
print(context.ancestors, context.descendants)

Accounts

# Verify your own credentials
me = await client.api.v1.accounts.verify_credentials.get()
print(me.username, me.followers_count)

# Look up another account
account = await client.api.v1.accounts["12345"].get()

# List followers with pagination
followers = await client.api.v1.accounts["12345"].followers.get(
    params={"limit": 40},
)

Search

results = await client.api.v1.search.get(params={
    "q": "python",
    "resolve": True,
    "limit": 10,
})
for account in results.accounts:
    print(account.acct)
for status in results.statuses:
    print(status.content)

Notifications

# Fetch only mentions
mentions = await client.api.v1.notifications.get(params={
    "types": ["mention"],
    "limit": 30,
})
for n in mentions:
    print(f"{n.account.username} mentioned you: {n.status.content}")

Pagination

List endpoints accept max_id, since_id, min_id, and limit for cursor-based pagination:

# First page
statuses = await client.api.v1.timelines.home.get(params={"limit": 40})

# Next page — use the last item's ID
if statuses:
    older = await client.api.v1.timelines.home.get(params={
        "max_id": statuses[-1].id,
        "limit": 40,
    })

Streaming

The client supports real-time streaming via WebSocket. Each streaming method returns an async iterator that yields typed event models:

from amasto.models.v1 import DeleteEvent, NotificationEvent, UpdateEvent

async for event in client.api.v1.streaming.user():
    match event:
        case UpdateEvent() as e:
            print(f"New status: {e.status.content}")
        case DeleteEvent() as e:
            print(f"Deleted: {e.status_id}")
        case NotificationEvent() as e:
            print(f"Notification: {e.notification.type}")
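For background, Mastodon's WebSocket stream delivers JSON messages with stream, event, and payload keys, where the payload of most events is itself a JSON document encoded as a string. The sketch below decodes such a message into a simple record. RawEvent and decode_message are hypothetical names, and the set of event names treated as JSON payloads is an assumption of this example; amasto's typed events do this work for you and carry full Pydantic models.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class RawEvent:
    """Hypothetical decoded message (amasto yields typed event models instead)."""

    streams: tuple[str, ...]
    event: str
    payload: object


# Events assumed here to carry a JSON document encoded as a string.
_JSON_PAYLOAD_EVENTS = {
    "update",
    "notification",
    "status.update",
    "conversation",
    "announcement",
    "announcement.reaction",
}


def decode_message(text: str) -> RawEvent:
    """Decode one WebSocket text frame into a RawEvent."""
    message = json.loads(text)
    event = message["event"]
    payload = message.get("payload")
    if event in _JSON_PAYLOAD_EVENTS and isinstance(payload, str):
        payload = json.loads(payload)  # second decode for the nested document
    return RawEvent(tuple(message.get("stream", ())), event, payload)


# Illustrative frames (made-up IDs and content).
update_frame = json.dumps(
    {"stream": ["user"], "event": "update", "payload": json.dumps({"id": "1", "content": "<p>Hello</p>"})}
)
delete_frame = json.dumps({"stream": ["user"], "event": "delete", "payload": "123"})

print(decode_message(update_frame))
print(decode_message(delete_frame))
```

A delete event carries only the deleted status ID as a plain string, which is why it is left undecoded.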

Available streams

Method                         Stream             Description
streaming.user()               user               Statuses, notifications, and other events for the authenticated user
streaming.user_notification()  user:notification  Notifications only
streaming.public()             public             All public statuses (pass only_media=True for media-only)
streaming.public_local()       public:local       Local public statuses
streaming.public_remote()      public:remote      Remote public statuses
streaming.hashtag(tag)         hashtag            Statuses with the given hashtag (pass local=True for local only)
streaming.list(list_id)        list               Statuses from a specific list
streaming.direct()             direct             Direct messages

Reconnection

All streaming methods automatically reconnect with exponential back-off when the connection drops. Customize the policy with ReconnectPolicy:

from amasto import ReconnectPolicy

policy = ReconnectPolicy(max_retries=10, initial_delay=0.5, max_delay=60.0)

async for event in client.api.v1.streaming.user(reconnect=policy):
    ...
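To give a feel for what a policy like the one above implies, the sketch below computes a capped exponential schedule from the same three parameters. It assumes the delay doubles on each attempt (factor=2.0) and that no jitter is applied; both are assumptions of this example, and amasto's actual schedule may differ. backoff_delays is a hypothetical helper, not part of the library.

```python
def backoff_delays(
    max_retries: int,
    initial_delay: float,
    max_delay: float,
    factor: float = 2.0,  # assumed growth factor, not taken from amasto
) -> list[float]:
    """Return the wait before each retry, growing geometrically up to max_delay."""
    return [min(initial_delay * factor**attempt, max_delay) for attempt in range(max_retries)]


delays = backoff_delays(max_retries=10, initial_delay=0.5, max_delay=60.0)
print(delays)
# → [0.5, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0, 60.0]
```

Under these assumptions the cap is reached on the eighth attempt, and the total wait across ten retries is about four minutes.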

Event types

Event class                Trigger                      Key fields
UpdateEvent                New status                   status
DeleteEvent                Status deleted               status_id
NotificationEvent          New notification             notification
FiltersChangedEvent        Filters updated
ConversationEvent          Direct conversation updated  conversation
AnnouncementEvent          New announcement             announcement
AnnouncementReactionEvent  Reaction on announcement     name, count, announcement_id
AnnouncementDeleteEvent    Announcement deleted         announcement_id
StatusUpdateEvent          Status edited                status
EncryptedMessageEvent      Encrypted message            encrypted_message
NotificationsMergedEvent   Notification requests merged

Models

Response models live under amasto.models and are re-exported from amasto.models.v1 and amasto.models.v2. All models are frozen Pydantic BaseModel subclasses.

V1 models

Account, AccountRole, AccountSource, AccountWarning, Announcement, AnnouncementAccount, AnnouncementDeleteEvent, AnnouncementEvent, AnnouncementReactionEvent, AnnouncementStatus, Appeal, Application, AsyncRefresh, Context, Conversation, ConversationEvent, CredentialAccount, CredentialApplication, CustomEmoji, DeleteEvent, DomainBlock, EncryptedMessage, EncryptedMessageEvent, Error, ExtendedDescription, FamiliarFollowers, FeaturedTag, Field, FiltersChangedEvent, IdentityProof, InstanceStats, InstanceUrls, List, Marker, MediaAttachment, MutedAccount, Notification, NotificationEvent, NotificationRequest, NotificationsMergedEvent, Poll, PollOption, Preferences, PreviewCard, PreviewCardAuthor, PrivacyPolicy, Quote, QuoteApproval, Reaction, Relationship, RelationshipSeveranceEvent, Report, Role, Rule, ScheduledStatus, ScheduledStatusParams, ScheduledStatusParamsPoll, Search, ShallowQuote, Status, StatusEdit, StatusEditPoll, StatusEditPollOption, StatusMention, StatusSource, StatusTag, StatusUpdateEvent, StreamEvent, Suggestion, Tag, TagHistory, TermsOfService, Token, Translation, TranslationAttachment, TranslationPoll, TranslationPollOption, TrendsLink, UpdateEvent, WebPushAlerts, WebPushSubscription

V2 models

Filter, FilterKeyword, FilterResult, FilterStatus, Instance, InstanceConfiguration, InstanceConfigurationAccounts, InstanceConfigurationMediaAttachments, InstanceConfigurationPolls, InstanceConfigurationStatuses, InstanceConfigurationTimelinesAccess, InstanceConfigurationTimelinesFeedAccess, InstanceConfigurationTranslation, InstanceConfigurationUrls, InstanceConfigurationVapid, InstanceContact, InstanceIcon, InstanceRegistrations, InstanceThumbnail, InstanceThumbnailVersions, InstanceUsage, InstanceUsageUsers, NotificationPolicy, NotificationPolicySummary

Version awareness

Model fields annotated with since("x.y.z") resolve to Unsupported when the connected server is older than the specified version, so your code can safely handle missing data.

A field can be in one of three states:

State        Meaning
str, int, …  Supported and has a value
None         Supported but has no value
Unsupported  Server too old to support this field

from amasto import Unsupported

status = await client.api.v1.statuses["123456"].get()

# Pattern matching (match statements are available since Python 3.10)
match status.text:
    case str() as t:
        print(t)          # supported, has a value
    case None:
        pass              # supported, no value
    case Unsupported():
        pass              # server too old for this field

# Or use isinstance
if not isinstance(status.poll, Unsupported):
    print(status.poll)

Endpoints can also declare requires="x.y.z" to indicate the minimum server version.

Error handling

amasto raises httpx.HTTPStatusError for non-2xx responses:

import httpx

try:
    status = await client.api.v1.statuses["nonexistent"].get()
except httpx.HTTPStatusError as e:
    print(e.response.status_code)  # 404
    print(e.response.json())       # {"error": "Record not found"}

Dependencies

Package            Purpose
httpx ≥ 0.28.1     Async HTTP client
pydantic ≥ 2.12.5  Response validation and models
semver ≥ 3.0.4     Server version parsing
websockets ≥ 16.0  WebSocket streaming

License

See LICENSE for details.
