No project description provided
Project description
amasto
Fully async, type-safe Python client for the Mastodon API.
[!WARNING] This project contains code generated by LLMs (Large Language Models). A significant portion of the codebase has not been fully reviewed or tested. Use at your own risk.
Features
- Async-first — All I/O uses
async/awaitvia httpx - Type-safe — Typed endpoint descriptors and Pydantic response models; ships with
py.typed - Version-aware — Automatic server version detection via NodeInfo; models mark field availability with
since()/Unsupported - Pagination —
PaginatedHttpMethod.paginate()async iterator transparently followsLink: rel="next"headers across pages - Streaming — Real-time WebSocket streaming with typed events, automatic reconnection, and exponential back-off
- Minimal surface area — Small, deliberate public API
Requirements
- Python ≥ 3.14
Installation
pip install amasto
Or with uv:
uv add amasto
Quick start
import asyncio
from amasto import Amasto
async def main() -> None:
client = Amasto("https://mastodon.social", "YOUR_ACCESS_TOKEN")
# Post a status
status = await client.api.v1.statuses.post(body={"status": "Hello from amasto!"})
# Read a single status by ID
status = await client.api.v1.statuses["123456"].get()
# List accounts the user is following
accounts = await client.api.v1.accounts["123456"].following.get()
asyncio.run(main())
Client
Amasto is the main entry point. It wraps an httpx.AsyncClient and automatically discovers the server's Mastodon version via the NodeInfo protocol on first use.
from semver import Version
client = Amasto(
"https://mastodon.social", # base URL
"YOUR_ACCESS_TOKEN", # API key (Bearer token)
mastodon_version=Version(4, 3, 0), # optional: skip auto-detection
)
Resources
API access uses a resource-based pattern where every endpoint is reachable as a chain of attribute accesses on the client:
client.api.v1.<resource>.<method>(params=..., body=...)
client.api.v1.<resource>["id"].<sub_resource>.<method>(...)
Each leaf node is an HttpMethod instance that is:
- Async-callable —
await method(params=..., body=...)executes the HTTP request and returns a validated response. - Introspectable —
.method,.path, and.requiresexpose the HTTP verb, URL path, and minimum server version. - Test-friendly —
.parse(data)validates data against the response type without making HTTP calls.
List endpoints (bookmarks, timelines, followers, …) use PaginatedHttpMethod which adds a .paginate() async iterator:
# Single page (default behaviour, backwards compatible)
statuses = await client.api.v1.timelines.home.get()
# Iterate across all pages automatically
async for status in client.api.v1.timelines.home.get.paginate(params={"limit": 40}):
print(f"{status.account.username}: {status.content}")
# Stop after a maximum number of items
async for status in client.api.v1.timelines.home.get.paginate(max_items=200):
print(status.content)
# Validate data without HTTP (useful in tests)
status = client.api.v1.statuses["123"].get.parse({
"id": "123",
"content": "<p>Hello</p>",
"account": {"id": "1", "username": "alice", ...},
...
})
API v1 (client.api.v1)
| Resource | Access pattern |
|---|---|
accounts |
.get, .post, .verify_credentials.get, ["id"].get, ["id"].follow.post, … |
announcements |
.get, ["id"].dismiss.post, ["id"].reactions["name"].put |
apps |
.post, .verify_credentials.get |
blocks |
.get |
bookmarks |
.get |
conversations |
.get, ["id"].delete, ["id"].read.post |
custom_emojis |
.get |
directory |
.get |
domain_blocks |
.get, .post, .delete |
emails |
.confirmations.post |
endorsements |
.get |
favourites |
.get |
featured_tags |
.get, .post, .suggestions.get, ["id"].delete |
follow_requests |
.get, ["id"].authorize.post, ["id"].reject.post |
followed_tags |
.get |
instance |
.get, .peers.get, .activity.get, .rules.get, .domain_blocks.get, … |
lists |
.get, .post, ["id"].get, ["id"].put, ["id"].delete, ["id"].accounts.get |
markers |
.get, .post |
media |
["id"].get, ["id"].put, ["id"].delete |
mutes |
.get |
notifications |
.get, .clear.post, .unread_count.get, .requests.*, ["id"].get, ["id"].dismiss.post |
polls |
["id"].get, ["id"].votes.post |
preferences |
.get |
profile |
.avatar.delete, .header.delete |
push |
.subscription.get, .subscription.post, .subscription.put, .subscription.delete |
reports |
.post |
scheduled_statuses |
.get, ["id"].get, ["id"].put, ["id"].delete |
search |
.get |
statuses |
.get, .post, ["id"].get, ["id"].put, ["id"].delete, ["id"].context.get, ["id"].favourite.post, … |
streaming |
.user(), .user_notification(), .public(), .public_local(), .public_remote(), .hashtag(tag), .list(list_id), .direct() |
suggestions |
.get, ["id"].delete |
tags |
["key"].get, ["key"].follow.post, ["key"].unfollow.post |
timelines |
.public.get, .home.get, .link.get, .direct.get, .tag["hashtag"].get, .list["id"].get |
trends |
.tags.get, .statuses.get, .links.get |
API v2 (client.api.v2)
| Resource | Access pattern |
|---|---|
filters |
.get, .post, ["id"].get/put/delete, ["id"].keywords.get/post, .keywords["id"].get/put/delete |
instance |
.get |
media |
.post |
notifications |
.get, .unread_count.get, .policy.get/patch, ["group_key"].dismiss.post, ["group_key"].accounts.get |
search |
.get |
suggestions |
.get |
OEmbed (client.api.oembed)
| Access | Description |
|---|---|
.get |
Fetch oEmbed data for a status URL |
OAuth (client.oauth)
| Resource | Access |
|---|---|
authorize |
.get — Authorization URL |
token |
.post — Obtain a token |
revoke |
.post — Revoke a token |
userinfo |
.get — Authenticated user info |
Health (client.health)
| Access | Description |
|---|---|
.get |
Server health check |
Examples
Reading timelines
# Home timeline
statuses = await client.api.v1.timelines.home.get(params={"limit": 20})
for s in statuses:
print(f"{s.account.username}: {s.content}")
# Local public timeline
statuses = await client.api.v1.timelines.public.get(params={"local": True})
# Hashtag timeline
statuses = await client.api.v1.timelines.tag["python"].get()
Posting and interacting with statuses
# Post a new status
status = await client.api.v1.statuses.post(body={
"status": "Hello, Fediverse!",
"visibility": "public",
"language": "en",
})
# Post a status with a poll
status = await client.api.v1.statuses.post(body={
"status": "What's your favourite language?",
"poll": {
"options": ["Python", "Rust", "TypeScript"],
"expires_in": 86400,
"multiple": False,
},
})
# Favourite / boost / bookmark
await client.api.v1.statuses["123456"].favourite.post()
await client.api.v1.statuses["123456"].reblog.post()
await client.api.v1.statuses["123456"].bookmark.post()
# Get thread context
context = await client.api.v1.statuses["123456"].context.get()
print(context.ancestors, context.descendants)
Accounts
# Verify your own credentials
me = await client.api.v1.accounts.verify_credentials.get()
print(me.username, me.followers_count)
# Look up another account
account = await client.api.v1.accounts["12345"].get()
# List followers with pagination
followers = await client.api.v1.accounts["12345"].followers.get(
params={"limit": 40},
)
Search
results = await client.api.v1.search.get(params={
"q": "python",
"resolve": True,
"limit": 10,
})
for account in results.accounts:
print(account.acct)
for status in results.statuses:
print(status.content)
Notifications
# Fetch only mentions
mentions = await client.api.v1.notifications.get(params={
"types": ["mention"],
"limit": 30,
})
for n in mentions:
print(f"{n.account.username} mentioned you: {n.status.content}")
Pagination
List endpoints accept max_id, since_id, min_id, and limit for cursor-based pagination:
# First page
statuses = await client.api.v1.timelines.home.get(params={"limit": 40})
# Next page — use the last item's ID
if statuses:
older = await client.api.v1.timelines.home.get(params={
"max_id": statuses[-1].id,
"limit": 40,
})
Streaming
The client supports real-time streaming via WebSocket. Each streaming method returns an async iterator that yields typed event models:
async for event in client.api.v1.streaming.user():
match event:
case UpdateEvent() as e:
print(f"New status: {e.status.content}")
case DeleteEvent() as e:
print(f"Deleted: {e.status_id}")
case NotificationEvent() as e:
print(f"Notification: {e.notification.type}")
Available streams
| Method | Stream | Description |
|---|---|---|
streaming.user() |
user |
Statuses, notifications, and other events for the authenticated user |
streaming.user_notification() |
user:notification |
Notifications only |
streaming.public() |
public |
All public statuses (pass only_media=True for media-only) |
streaming.public_local() |
public:local |
Local public statuses |
streaming.public_remote() |
public:remote |
Remote public statuses |
streaming.hashtag(tag) |
hashtag |
Statuses with the given hashtag (pass local=True for local only) |
streaming.list(list_id) |
list |
Statuses from a specific list |
streaming.direct() |
direct |
Direct messages |
Reconnection
All streaming methods automatically reconnect with exponential back-off when the connection drops. Customize the policy with ReconnectPolicy:
from amasto import ReconnectPolicy
policy = ReconnectPolicy(max_retries=10, initial_delay=0.5, max_delay=60.0)
async for event in client.api.v1.streaming.user(reconnect=policy):
...
Event types
| Event class | Trigger | Key fields |
|---|---|---|
UpdateEvent |
New status | status |
DeleteEvent |
Status deleted | status_id |
NotificationEvent |
New notification | notification |
FiltersChangedEvent |
Filters updated | — |
ConversationEvent |
Direct conversation updated | conversation |
AnnouncementEvent |
New announcement | announcement |
AnnouncementReactionEvent |
Reaction on announcement | name, count, announcement_id |
AnnouncementDeleteEvent |
Announcement deleted | announcement_id |
StatusUpdateEvent |
Status edited | status |
EncryptedMessageEvent |
Encrypted message | encrypted_message |
NotificationsMergedEvent |
Notification requests merged | — |
Models
Response models live under amasto.models and are re-exported from amasto.models.v1 and amasto.models.v2. All models are frozen Pydantic BaseModel subclasses.
V1 models
Account, AccountRole, AccountSource, AccountWarning, Announcement, AnnouncementAccount, AnnouncementDeleteEvent, AnnouncementEvent, AnnouncementReactionEvent, AnnouncementStatus, Appeal, Application, AsyncRefresh, Context, Conversation, ConversationEvent, CredentialAccount, CredentialApplication, CustomEmoji, DeleteEvent, DomainBlock, EncryptedMessage, EncryptedMessageEvent, Error, ExtendedDescription, FamiliarFollowers, FeaturedTag, Field, FiltersChangedEvent, IdentityProof, InstanceStats, InstanceUrls, List, Marker, MediaAttachment, MutedAccount, Notification, NotificationEvent, NotificationRequest, NotificationsMergedEvent, Poll, PollOption, Preferences, PreviewCard, PreviewCardAuthor, PrivacyPolicy, Quote, QuoteApproval, Reaction, Relationship, RelationshipSeveranceEvent, Report, Role, Rule, ScheduledStatus, ScheduledStatusParams, ScheduledStatusParamsPoll, Search, ShallowQuote, Status, StatusEdit, StatusEditPoll, StatusEditPollOption, StatusMention, StatusSource, StatusTag, StatusUpdateEvent, StreamEvent, Suggestion, Tag, TagHistory, TermsOfService, Token, Translation, TranslationAttachment, TranslationPoll, TranslationPollOption, TrendsLink, UpdateEvent, WebPushAlerts, WebPushSubscription
V2 models
Filter, FilterKeyword, FilterResult, FilterStatus, Instance, InstanceConfiguration, InstanceConfigurationAccounts, InstanceConfigurationMediaAttachments, InstanceConfigurationPolls, InstanceConfigurationStatuses, InstanceConfigurationTimelinesAccess, InstanceConfigurationTimelinesFeedAccess, InstanceConfigurationTranslation, InstanceConfigurationUrls, InstanceConfigurationVapid, InstanceContact, InstanceIcon, InstanceRegistrations, InstanceThumbnail, InstanceThumbnailVersions, InstanceUsage, InstanceUsageUsers, NotificationPolicy, NotificationPolicySummary
Version awareness
Model fields annotated with since("x.y.z") resolve to Unsupported when the connected server is older than the specified version, so your code can safely handle missing data.
A field can be in one of three states:
| State | Meaning |
|---|---|
str, int, … |
Supported and has a value |
None |
Supported but has no value |
Unsupported |
Server too old to support this field |
from amasto import Unsupported
status = await client.api.v1.statuses["123456"].get()
# Pattern matching (Python 3.14+)
match status.text:
case str() as t:
print(t) # supported, has a value
case None:
pass # supported, no value
case Unsupported():
pass # server too old for this field
# Or use isinstance
if not isinstance(status.poll, Unsupported):
print(status.poll)
Endpoints can also declare requires="x.y.z" to indicate the minimum server version.
Error handling
amasto raises httpx.HTTPStatusError for non-2xx responses:
import httpx
try:
status = await client.api.v1.statuses["nonexistent"].get()
except httpx.HTTPStatusError as e:
print(e.response.status_code) # 404
print(e.response.json()) # {"error": "Record not found"}
Dependencies
| Package | Purpose |
|---|---|
| httpx ≥ 0.28.1 | Async HTTP client |
| pydantic ≥ 2.12.5 | Response validation and models |
| semver ≥ 3.0.4 | Server version parsing |
| websockets ≥ 16.0 | WebSocket streaming |
License
See LICENSE for details.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file amasto-0.2.2.tar.gz.
File metadata
- Download URL: amasto-0.2.2.tar.gz
- Upload date:
- Size: 35.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.1 {"installer":{"name":"uv","version":"0.11.1","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ea2d7794faa2b2a779b93a5f43d0ed174e7f843525073fc40e3f23cd7962cee5
|
|
| MD5 |
9a0019909c291040097047246980c7bb
|
|
| BLAKE2b-256 |
dadd148f3c97674335f51e4656d356ca75e3be6dbce531d28f8faaf0d5831837
|
File details
Details for the file amasto-0.2.2-py3-none-any.whl.
File metadata
- Download URL: amasto-0.2.2-py3-none-any.whl
- Upload date:
- Size: 79.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.1 {"installer":{"name":"uv","version":"0.11.1","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b7d2dd99cdf56b1639d390cbc788ea70b20088efb262e95f5600c7688edfd70e
|
|
| MD5 |
4cda3660fc8bf084d96a83fdb667baea
|
|
| BLAKE2b-256 |
069bd7369a9aa7588e5720948da710c4930d7d31b05d829e0e5e548732668a66
|