Async PX4 drone controller โ sequential autonomous flight via MAVLink
Project description
mavpilot
๐ท๐บ ะ ัััะบะฐั ะฒะตััะธั
Async PX4 drone controller for Python โ sequential autonomous flight via MAVLink, with built-in live 3D visualization and a hardware-free mock mode.
Contents
- Features
- Installation
- Quick start โ mock mode
- Library usage
- CLI reference
- API reference
- Coordinate system
- Visualization
- Architecture
- Connecting to real hardware
- Development
- License
Features
| Pure asyncio API | Write sequential mission logic with await โ no callbacks, no state machines |
| PX4 OFFBOARD mode | Streams SET_POSITION_TARGET_LOCAL_NED at 50 Hz |
| Precision landing | Vision-guided descent via a simple callback API |
| Body-relative movement | goto_body_relative() without manual NED/heading math |
| Yaw slew-rate limiting | Smooth heading transitions (15 ยฐ/s default, configurable) |
| Browser visualization | Live 3D trajectory + telemetry over HTTP+SSE โ no npm, no CDN required |
| Mock mode | Built-in physics simulator โ test your full mission without SITL or hardware |
| Thread-safe | Heartbeat, receiver, and setpoint-streamer threads run in the background |
Installation
pip install mavpilot
Or from source:
git clone https://github.com/Onikore/mavpilot
cd mavpilot
pip install -e ".[dev]"
Runtime dependency: pymavlink (installed automatically).
Quick start โ mock mode
No drone or SITL needed:
# Square flight pattern
python -m mavpilot --mock
# Star/pentagram flight pattern
python -m mavpilot --mock --pattern star
# Precision landing demo
python -m mavpilot --mock --precision-land
Open http://localhost:8765 in a browser to watch the live 3D visualization.
Library usage
import asyncio
from mavpilot import DroneController
async def mission():
# `async with` connects on entry and tears down on exit (aclose());
# if the block exits via an exception while still armed, it triggers
# emergency_land() first.
async with DroneController(
connection_string="udp:127.0.0.1:14540", # SITL default
enable_viz=True, # browser viz on :8765
) as drone:
await drone.apply_safe_params() # recommended PX4 safety params
await drone.wait_until_ready() # wait for EKF / LOCAL_POSITION_NED
await drone.takeoff(altitude_m=5.0)
# NED coordinates (x=North, y=East, z=Down)
await drone.goto(x=10, y=0, z=-5)
await drone.goto(x=10, y=10, z=-5, yaw_deg=90)
await drone.goto_body_relative(forward_m=5, right_m=0, down_m=0)
await drone.hover(duration_s=3.0)
await drone.land()
asyncio.run(mission())
Precision landing
Supply a callback that returns the landing marker offset in body FRD frame:
from mavpilot import DroneController, MarkerObservation
def get_marker() -> MarkerObservation | None:
# plug in your vision pipeline here
# dx = forward offset (m), dy = right offset (m)
return MarkerObservation(dx=0.3, dy=-0.1)
async def mission():
async with DroneController(mock=True, enable_viz=False) as drone:
await drone.takeoff(altitude_m=10.0)
result = await drone.precision_land(
get_marker_offset=get_marker,
descent_rate_mps=0.3,
final_altitude_m=0.5,
horizontal_tolerance_m=0.15,
min_altitude_floor_m=0.3, # new in v0.2.0
)
if not result:
# status โ {ABORTED_AT_FLOOR, MARKER_LOST, TIMEOUT}
print(f"precision_land did not land cleanly: {result.status.value}")
print(f"final position: {result.final_position}")
Converting camera pixels to body offset
from mavpilot.utils import pixel_to_body_offset
dx, dy = pixel_to_body_offset(
px_norm_x=0.1, # normalized [-1, 1]
px_norm_y=-0.05,
camera_hfov_deg=90.0,
camera_vfov_deg=60.0,
altitude_above_ground_m=drone.get_local_position().altitude,
camera_mount_yaw_deg=0.0,
)
Live telemetry
Subscribe to ~10 Hz telemetry snapshots (works even with enable_viz=False):
async with DroneController(mock=True, enable_viz=False) as drone:
# Callback style โ returns an unsubscribe function:
unsubscribe = drone.subscribe(lambda t: print(t["x"], t["y"], t["z"]))
...
unsubscribe()
# Or async-iterator style:
async for t in drone.telemetry_stream():
if t["landed"] == 1:
break
CLI reference
python -m mavpilot [OPTIONS]
Options:
--connection STR MAVLink endpoint [default: udp:127.0.0.1:14540]
--mock Hardware-free simulator mode
--viz-port INT Browser visualization port [default: 8765]
--viz-host STR Interface the visualization server binds to [default: 127.0.0.1]
Use 0.0.0.0 to expose on LAN (telemetry visible to everyone on the network)
--no-viz Disable browser visualization
--precision-land Use precision landing with a simulated marker
--pattern {square,star} Demo flight pattern [default: square]
--log-level {DEBUG,INFO,WARNING,ERROR} Logging verbosity [default: INFO]
--loop-hz FLOAT OFFBOARD setpoint publish rate in Hz [default: 50.0]
--watchdog-s FLOAT Telemetry watchdog timeout in seconds [default: 2.0]
--version Print the mavpilot version and exit
Error handling and Ctrl-C
- Ctrl-C at any point during a mission calls
emergency_land(). This chains:AUTO_LANDmode switch, wait up to 10 s for touchdown, sendMAV_CMD_NAV_LANDif mode switch is stuck, and as a last resortDO_FLIGHTTERMINATION(immediate motor cut โ drone falls). - RTL is not part of
emergency_land(). Return-to-launch is a separate nominal operation (drone.return_to_launch()), not an emergency procedure. - Any unhandled exception in the mission body (including
KeyboardInterrupt) also triggersemergency_land().
Telemetry watchdog & protocol safety (v0.2.0)
- Telemetry watchdog โ
telemetry_watchdog_s(default 2 s). If no freshLOCAL_POSITION_NEDarrives within this window, the streamer latches a watchdog flag and the next mission call (takeoff/goto/set_yaw/land/return_to_launch/precision_land) raisesDroneError.emergency_land()deliberately ignores the flag โ it is the recovery path the watchdog is meant to trigger. - EKF health gate โ
wait_until_ready()now also validates EKF AHRS health (SYS_STATUSbit 5), not just position freshness. send_command_long()โ exposes the COMMAND_ACK Future API: it awaits the terminal ACK keyed by(cmd_id, target_sys, target_comp).IN_PROGRESSextends the deadline; a duplicate in-flight command, a timeout, or a non-ACCEPTEDresult each raiseDroneError.get_yaw_deg()is normalized to[-180, 180].
API reference
The full, always-current reference is generated from docstrings and published at the GitHub Pages docs site โ treat that as the source of truth. The tables below are a quick map.
DroneController(โฆ)
DroneController(
connection_string = "udp:127.0.0.1:14540",
source_system = 255,
source_component = MAV_COMP_ID_MISSIONPLANNER,
loop_hz = 50.0, # setpoint publish rate
enable_viz = True, # start browser viz server
viz_port = 8765,
viz_host = "127.0.0.1", # 0.0.0.0 to expose on LAN
mock = False, # no-hardware simulator
yaw_slew_rate_deg = 15.0, # max yaw rate (deg/s)
telemetry_watchdog_s = 2.0, # LOCAL_POSITION_NED silence โ watchdog
)
Flight methods
| Method | Description |
|---|---|
await connect(timeout_s) |
Open MAVLink and start background threads |
await reconnect(timeout_s) |
Re-open the link after loss; clears watchdog/send-fault latches (re-arm + re-enter OFFBOARD after) |
await apply_safe_params() |
Write recommended PX4 safety params, verified via PARAM_VALUE read-back |
await wait_until_ready(timeout_s) |
Block until EKF reports LOCAL_POSITION_NED |
await takeoff(altitude_m, timeout_s) |
Arm, enter OFFBOARD, climb |
await goto(x, y, z, yaw_deg, โฆ) |
Fly to NED position |
await goto_relative(dx, dy, dz, โฆ) |
NED offset from current position |
await goto_body_relative(fwd, right, down, โฆ) |
Body FRD offset |
await set_yaw(yaw_deg, timeout_s) |
Rotate in-place |
await hover(duration_s) |
Hold position |
await land(timeout_s) |
Switch to AUTO_LAND, wait until on ground |
await precision_land(callback, โฆ) |
Vision-guided descent; returns PrecisionLandResult |
await return_to_launch(timeout_s) |
Switch to AUTO_RTL, wait until landed |
await emergency_land() |
Chain: AUTO_LAND โ NAV_LAND โ DO_FLIGHTTERMINATION |
await aclose() / async with |
Stop all threads and close connection (preferred) |
close() |
Synchronous teardown (deprecated; prefer aclose()) |
Telemetry
| Method | Returns |
|---|---|
get_local_position() |
Position(x, y, z) in NED meters |
get_yaw_rad() / get_yaw_deg() |
Current heading |
is_armed() |
bool |
ever_armed() |
bool โ sticky; armed at any point this session |
is_offboard() |
bool |
landed_state() |
int (1 = on ground, 2 = in air) |
link_alive() |
bool โ neither watchdog nor send-fault latched |
subscribe(cb) |
Register a ~10 Hz telemetry callback; returns an unsubscribe fn |
telemetry_stream() |
async for telemetry snapshots (works with viz disabled) |
Data classes
from mavpilot import Position, MarkerObservation
# NED position (x=North, y=East, z=Down)
pos: Position # pos.altitude == -pos.z
# Marker offset in body FRD frame
obs: MarkerObservation # dx=forward, dy=right, dz=down (optional)
Coordinate system
mavpilot uses the PX4 NED convention from LOCAL_POSITION_NED:
| Axis | Direction | Note |
|---|---|---|
| x | North (+) | |
| y | East (+) | |
| z | Down (+) | altitude = -z |
Coordinate transform utilities:
from mavpilot.utils import body_to_ned, ned_to_body, pixel_to_body_offset
Visualization
A lightweight self-contained HTTP+SSE server serves a Three.js 3D view with no build step and no external package manager. Open http://localhost:8765 while the drone is running.
The right-hand panel displays:
- Armed status and flight mode
- Live position, velocity, heading, battery
- Active setpoint
- Command log (takeoff, goto, land, โฆ)
- PX4 STATUSTEXT messages
The UI is composed of native ES modules served from mavpilot/viz/static/ (index.html + styles.css + main.js/scene.js/sse.js/telemetry.js/log.js) โ no bundler, but a modern browser with ES-module support is required. The max_clients parameter (default 32) caps concurrent SSE connections; excess clients receive HTTP 503.
Architecture
asyncio event loop <-- your mission code
|
v
DroneController
|
+-- heartbeat_thread (1 Hz MAVLink heartbeat)
+-- receiver_thread (parses incoming MAVLink โ self._tel)
+-- streamer_thread (publishes SET_POSITION_TARGET_LOCAL_NED @ 50 Hz)
+-- viz_server (optional HTTP+SSE โ browser)
All shared state is protected by _tel_lock and _setpoint_lock. No asyncio primitives are needed in user mission code โ the asyncio loop and background threads only touch shared dicts through these locks.
Module layout (v0.2.0)
mavpilot/
โโโ controller.py # DroneController facade (composition root)
โโโ errors.py # DroneError
โโโ types.py # Position, MarkerObservation, PrecisionLand{Status,Result}
โโโ utils.py # coordinate transforms, pinhole, yaw normalization
โโโ constants.py # PX4 mode bits, MAV_CMD ids, type_masks
โโโ cli.py # argparse entrypoint
โโโ core/ # internal flight-stack collaborators
โ โโโ connection.py # MAVLinkConnection โ pymavlink + I/O lock + heartbeat/receiver
โ โโโ telemetry.py # Telemetry โ incoming-message parsing + state cache
โ โโโ commands.py # CommandSender โ COMMAND_LONG with asyncio.Future ACK routing
โ โโโ streamer.py # OffboardStreamer โ setpoint thread + telemetry watchdog
โ โโโ mission.py # MissionOps โ takeoff/goto/hover/land/rtl/emergency_land
โ โโโ precision_land.py # PrecisionLand โ vision descent with altitude floor
โ โโโ safety.py # SafetyOps โ wait_until_ready
โ โโโ mock.py # MockMavConnection + in-process simulator
โโโ viz/ # browser UI server (HTTP + SSE) + static ES modules
Every MAVLink send and recv goes through MAVLinkConnection, which holds the single threading lock. Each subsystem receives its dependencies via constructor injection โ easy to mock in tests.
Connecting to real hardware
# Serial (Raspberry Pi <-> Pixhawk via UART)
drone = DroneController(connection_string="/dev/ttyAMA0")
# UDP (SITL or companion computer bridge)
drone = DroneController(connection_string="udp:192.168.1.10:14540")
# TCP
drone = DroneController(connection_string="tcp:127.0.0.1:5760")
Recommended safety parameters (set by apply_safe_params() defaults):
| Parameter | Value | Purpose |
|---|---|---|
COM_RCL_EXCEPT |
7 | No failsafe in offboard / mission / hold |
COM_OBL_RC_ACT |
4 | RC loss โ hold, not RTL |
COM_OF_LOSS_T |
2.0 s | Offboard loss timeout |
COM_RC_IN_MODE |
1 | RC stick input not required |
Development
# Install in editable mode with dev extras
pip install -e ".[dev]"
# Run tests
pytest -q
# Lint
ruff check mavpilot/
# Type check
mypy mavpilot/
License
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file mavpilot-0.3.0.tar.gz.
File metadata
- Download URL: mavpilot-0.3.0.tar.gz
- Upload date:
- Size: 59.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a81ff8bda1cb7a1166414703a98d4b9152d9ebe3a7c0dca0e20fae40b3d08efd
|
|
| MD5 |
97ffce419f3c7ee7846b55bb3d2aec10
|
|
| BLAKE2b-256 |
9df132ad342aa9e8896b26b40e2bca4d6e066bbf8fe96b5cc792e7acab168410
|
Provenance
The following attestation bundles were made for mavpilot-0.3.0.tar.gz:
Publisher:
ci.yml on Onikore/mavpilot
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
mavpilot-0.3.0.tar.gz -
Subject digest:
a81ff8bda1cb7a1166414703a98d4b9152d9ebe3a7c0dca0e20fae40b3d08efd - Sigstore transparency entry: 1689805395
- Sigstore integration time:
-
Permalink:
Onikore/mavpilot@c7f04f2d5b1173c703dc76cc72a71a78caac3a3b -
Branch / Tag:
refs/tags/v0.3.0 - Owner: https://github.com/Onikore
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
ci.yml@c7f04f2d5b1173c703dc76cc72a71a78caac3a3b -
Trigger Event:
push
-
Statement type:
File details
Details for the file mavpilot-0.3.0-py3-none-any.whl.
File metadata
- Download URL: mavpilot-0.3.0-py3-none-any.whl
- Upload date:
- Size: 63.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
abc32b6502a948351c248b14ed4fb140f7ca3005bd2bd5311560611f272bd421
|
|
| MD5 |
0bb64e9e883ad049112a6e70bcf92129
|
|
| BLAKE2b-256 |
8f42c914c061be75ce3264a66953ce7a9160d33f2b2b79aec22f2da6b27f1e68
|
Provenance
The following attestation bundles were made for mavpilot-0.3.0-py3-none-any.whl:
Publisher:
ci.yml on Onikore/mavpilot
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
mavpilot-0.3.0-py3-none-any.whl -
Subject digest:
abc32b6502a948351c248b14ed4fb140f7ca3005bd2bd5311560611f272bd421 - Sigstore transparency entry: 1689805565
- Sigstore integration time:
-
Permalink:
Onikore/mavpilot@c7f04f2d5b1173c703dc76cc72a71a78caac3a3b -
Branch / Tag:
refs/tags/v0.3.0 - Owner: https://github.com/Onikore
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
ci.yml@c7f04f2d5b1173c703dc76cc72a71a78caac3a3b -
Trigger Event:
push
-
Statement type: