Skip to main content

RCH: Partial-Expansion Anytime Focal Search solver for heterogeneous multi-agent TSP

Project description

RCH Solver

RCH — a solver for the Heterogeneous Multi-Agent Travelling Salesman Problem (MTSP).

This package wraps the high-performance C++ solver core via pybind11 and makes it available as a regular Python library.


Installation

Prerequisites

  • Python >= 3.8
  • A C++17 compiler (g++ >= 7, clang++ >= 5)
  • CMake >= 3.15
  • pip
  • matplotlib (only needed for visualization helpers)

Install from PyPI

pip install PyRCH

If you also want plotting helpers:

pip install "PyRCH[viz]"

Install from source

cd MTSP-Solver

# Install (builds the C++ extension automatically)
pip install .

# Or, for development (editable install):
pip install -e .

Note: On Ubuntu 24+ managed Python installs, you may need to add --break-system-packages or use a virtual environment.

Release To PyPI

If you are publishing manually from a Linux development machine, upload the sdist only:

python -m build --sdist
twine check dist/*
twine upload dist/pyrch-*.tar.gz

Why not upload the local wheel? A wheel built locally on Linux is typically tagged like linux_x86_64 or linux_aarch64. PyPI rejects those platform tags. Linux wheels uploaded to PyPI should be built as compliant manylinux wheels, which this repository produces through GitHub Actions.

To publish wheels:

  1. Configure PyPI Trusted Publishing for this repository.
  2. Push a version tag such as v0.0.1.
  3. Let .github/workflows/publish-pypi.yml build and upload the wheels plus sdist.

Quick Start

1. Solve a JSON problem file

import rch

result = rch.solve("path/to/problem.json", time_limit=10)

print(result["status"])        # "success" or "failed"
print(result["timeout"])       # True if time-limit was reached
print(result["statistics"])    # {"max_cost": ..., "sum_cost": ..., "solve_time": ..., ...}

for route in result["routes"]:
    print(f"Agent {route['agent_id']}: path={route['path']}, cost={route['cost']}")

# Anytime improvement history
for snapshot in result["anytime"]:
    print(f"  t={snapshot['time']:.3f}s  max_cost={snapshot['max_cost']:.2f}")

2. Solve from a Python dict

import rch

problem_data = {
    "nodes": [
        {"id": 0, "x": 0.0, "y": 0.0, "type": "depot"},
        {"id": 1, "x": 1.0, "y": 2.0, "type": "target"},
        {"id": 2, "x": 3.0, "y": 1.0, "type": "target"},
    ],
    "agents": [
        {"id": 0, "type": "UAV", "start_node": 0, "end_node": 0},
        {"id": 1, "type": "UAV", "start_node": 0, "end_node": 0},
    ],
    "costs": {
        "UAV": [
            [0.0, 2.24, 3.16],
            [2.24, 0.0, 2.24],
            [3.16, 2.24, 0.0],
        ]
    },
    "options": {
        "return_to_end": True,
        "objective": "min_max"
    }
}

result = rch.solve(problem_data, time_limit=5)
print(result)

3. Planner API (recommended)

import rch

planner = rch.Planner()

# Add nodes
planner.add_depot(0, x=0.0, y=0.0)
planner.add_target(1, x=1.0, y=2.0)
planner.add_target(2, x=3.0, y=1.0)

# Add agents
planner.add_agent(agent_id=0, agent_type="UAV", start_node=0, end_node=0, time_limit=6.0)
planner.add_agent(agent_id=1, agent_type="UGV", start_node=0, end_node=0, time_limit=9.0)

# Set cost matrices (one per agent type)
planner.set_cost_matrix("UAV", [
    [0.0, 2.24, 3.16],
    [2.24, 0.0, 2.24],
    [3.16, 2.24, 0.0],
])
planner.set_cost_matrix("UGV", [
    [0.0, 1.80, 4.20],
    [1.80, 0.0, 2.90],
    [4.20, 2.90, 0.0],
])

# Add constraints (optional)
planner.add_assignment(1, ["UAV"])
planner.add_assignment(2, ["UGV"])
planner.add_time_window(1, start=0.0, end=3.0)
planner.add_time_window(2, start=0.0, end=6.0)

# Set solver options
planner.set_options(return_to_end=True, objective="min_max", time_limit=5)

# Visualize the problem map
planner.show_map()

# Solve
result = planner.solve()
print(result["status"])
for route in result["routes"]:
    print(f"  Agent {route['agent_id']}: path={route['path']}, cost={route['cost']:.3f}")

# Visualize the result
planner.show_result(result)

4. Visualization

show_map — view the problem before solving

# Via Planner method:
planner.show_map()

# Or via module-level function:
rch.show_map(planner)

# Draw on an existing axes (e.g. for subplots):
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
rch.show_map(planner, ax=ax, show=False)
plt.savefig("map.png")

Depots are drawn as black stars (★), targets as grey dots, and each agent's start position as a coloured triangle.

show_result — view routes after solving

result = planner.solve()

# Via Planner method:
planner.show_result(result)

# Or via module-level function:
rch.show_result(planner, result)

Each agent's route is drawn with a distinct colour and directional arrows.

return_to_end=False handling: when the solver option return_to_end is False, the solver internally appends the depot as the last node in each route. show_result automatically detects this and removes the trailing depot from the visualization — the route will end at the last target visited, without drawing an edge back to the depot.

5. Low-level programmatic API

from rch import Problem, Solver, Options, ObjectiveType, NodeType
from rch import Node, Agent, AssignmentConstraint, TimeWindowConstraint

# Build a problem instance in code
problem = Problem()

# Add nodes
for nid, (x, y), ntype in [(0, (0, 0), NodeType.DEPOT),
                             (1, (1, 2), NodeType.TARGET),
                             (2, (3, 1), NodeType.TARGET)]:
    n = Node()
    n.id = nid
    n.position.x = x
    n.position.y = y
    n.type = ntype
    problem.add_node(n)

# Add agents
a = Agent()
a.id = 0
a.type = "UAV"
a.start_node = 0
a.end_node = 0
problem.add_agent(a, 0)

a2 = Agent()
a2.id = 1
a2.type = "UAV"
a2.start_node = 0
a2.end_node = 0
problem.add_agent(a2, 1)

# Set cost matrix (one per agent type)
import math
nodes_xy = [(0,0), (1,2), (3,1)]
n = len(nodes_xy)
cost = [[0.0]*n for _ in range(n)]
for i in range(n):
    for j in range(n):
        dx = nodes_xy[i][0] - nodes_xy[j][0]
        dy = nodes_xy[i][1] - nodes_xy[j][1]
        cost[i][j] = math.sqrt(dx*dx + dy*dy)
problem.set_cost_matrix("UAV", cost)

# Configure options
problem.options().objective = ObjectiveType.MinMax
problem.options().return_to_end = True
problem.options().time_limit = 5.0

# Solve
opts = problem.options()
solver = Solver(problem, opts)
ret = solver.solve()
result = solver.get_result()

# Note: this low-level API follows the original C++ convention:
# ret == 1 means success, ret == 0 means failure.
print(f"Return code: {ret}")
print(f"Paths: {result.paths}")
print(f"Costs: {result.times}")

JSON Input Format

{
  "nodes": [
    {"id": 0, "x": 0.0, "y": 0.0, "z": 0.0, "type": "depot"},
    {"id": 1, "x": 1.5, "y": 2.3, "z": 0.0, "type": "target"}
  ],
  "agents": [
    {
      "id": 0,
      "type": "TypeA",
      "start_node": 0,
      "end_node": 0,
      "max_length": 100.0,
      "capacity": 10.0
    }
  ],
  "costs": {
    "TypeA": [[0.0, 1.5], [1.5, 0.0]]
  },
  "constraints": [
    {
      "kind": "assignment",
      "items": [
        {"node": 1, "types": ["TypeA"]}
      ]
    },
    {
      "kind": "timewindow",
      "items": [
        {"node": 1, "start": 0.0, "end": 50.0}
      ]
    }
  ],
  "options": {
    "return_to_end": true,
    "objective": "min_max",
    "time_limit": 60
  }
}

Fields

Field Description
nodes List of nodes. Each has id, x, y, optional z, type ("depot" or "target").
agents List of agents. Each has id, type, start_node, end_node, optional max_length, capacity.
costs Dict mapping agent type name → cost matrix (2D array, row = from node id, col = to node id).
constraints Optional. List of constraint blocks. kind = "assignment" or "timewindow".
options Optional. return_to_end (bool), objective ("min_max" or "min_sum"), time_limit (seconds).

Result Format

{
    "status": "success",       # "success" or "failed"
    "timeout": False,          # True if solver hit time limit
    "routes": [
        {
            "agent_id": 0,
            "path": [0, 2, 0],    # ordered node IDs
            "cost": 6.32          # route cost
        },
        ...
    ],
    "statistics": {
        "solve_time": 0.123,      # wall-clock time (s)
        "max_cost": 6.32,         # maximum route cost
        "sum_cost": 10.56,        # total cost of all routes
        "n_generated": 1500,      # labels generated
        "n_expanded": 800,        # labels expanded
        "last_update_time": 0.08  # time of last solution improvement
    },
    "anytime": [
        {"time": 0.01, "max_cost": 9.5, "sum_cost": 15.2},
        {"time": 0.05, "max_cost": 7.1, "sum_cost": 12.0},
        ...
    ]
}

API Reference

rch.solve(source, *, time_limit=-1) → dict

Solve an MTSP instance.

  • source — file path (str / Path), raw JSON string, or Python dict.
  • time_limit — override the time limit in seconds (default: use the value in JSON).

rch.Planner (recommended)

High-level builder API. All mutating methods return self for chaining.

  • add_depot(node_id, *, x, y, z=0) — add a depot node.
  • add_target(node_id, *, x, y, z=0, demand=0) — add a target node.
  • add_agent(*, agent_id, agent_type, start_node, end_node, order=None, capacity_limit=-1, time_limit=-1) — add an agent. time_limit is the max travel distance/time (≤0 means no limit).
  • set_cost_matrix(agent_type, matrix) — set cost matrix for an agent type.
  • add_assignment(node_id, types) — assign a node to a list of allowed agent types.
  • add_time_window(node_id, start, end) — add a time window constraint for a node.
  • set_options(*, return_to_end=None, objective=None, time_limit=None) — set solver options. objective accepts "min_max" or "min_sum".
  • solve() → dict — run the solver and return a result dict.
  • show_map(**kwargs) → Axes — visualize the problem map (depots, targets, agent starts).
  • show_result(result, **kwargs) → Axes — visualize solved routes on the map.

rch.show_map(planner, *, ax=None, figsize=(8,6), show=True) → Axes

Plot the problem map: depots (★), targets (●), and agent start positions (▲).

rch.show_result(planner, result, *, ax=None, figsize=(8,6), show=True) → Axes

Plot solved routes on the map. Each agent's path is drawn with a distinct colour and directional arrows. When return_to_end=False, the trailing depot is automatically stripped from the visualization.

rch.Problem

Low-level programmatic problem builder. Methods:

  • add_node(node: Node) — add a node.
  • add_agent(agent: Agent, id: int) — add an agent.
  • set_cost_matrix(agent_type: str, matrix: List[List[float]]) — set cost matrix.
  • set_assignment_constraint(c: AssignmentConstraint) — set assignment constraint.
  • set_timewindow_constraint(c: TimeWindowConstraint) — set time window constraint.
  • options() → Options — access/modify solver options.

rch.Solver

Low-level solver. Construct with Solver(problem, options).

  • solve() → int — run the solver (1 = success, 0 = failure).
  • get_result() → Result — get the final result.
  • get_result_process() → List[Tuple[float, Result]] — get full anytime history.

rch.ObjectiveType

Enum: ObjectiveType.MinMax, ObjectiveType.MinSum.

rch.NodeType

Enum: NodeType.DEPOT, NodeType.TARGET.


License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyrch-0.0.1.tar.gz (507.5 kB view details)

Uploaded Source

File details

Details for the file pyrch-0.0.1.tar.gz.

File metadata

  • Download URL: pyrch-0.0.1.tar.gz
  • Upload date:
  • Size: 507.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for pyrch-0.0.1.tar.gz
Algorithm Hash digest
SHA256 beda4c18c316f81223b5efa9f8c3763053a084ef148dda1bf8e5c90060753ad2
MD5 db3d58ce4cb5ea4be00249e12a058bd0
BLAKE2b-256 80ca3c2e7110f7c7e97dbfaae85af3db6080f141ba25f4721cb2a04586469ffb

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page