High-performance Rust extension for BCS data decoding
bcs-rs: Rust transport and decoding for BCSz
bcs-rs is a Rust extension module that accelerates and hardens the client side of the BCSz beamline control system:
- High-performance Z85 decoding for large detector blobs.
- A Rust-backed ZeroMQ + CURVE client for the BCS server.
- A thin PyO3 layer that exposes this functionality as the bcs_rs._core Python module.
The design goal is that Python code uses a small, stable API in bcs_rs._core while Rust handles all low-level transport, timing, and blob decoding.
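For readers unfamiliar with Z85, the following is a minimal pure-Python reference sketch of the decoding step that the extension accelerates. It is not the crate's implementation (which is not shown here); the function name z85_decode_reference is hypothetical, and the alphabet and the "HelloWorld" test vector come from the ZeroMQ Z85 specification.

```python
# Reference sketch only; the Rust decoder in bcs-rs is not reproduced here.
Z85_ALPHABET = (
    "0123456789"
    "abcdefghijklmnopqrstuvwxyz"
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    ".-:+=^!/*?&<>()[]{}@%$#"
)
Z85_INDEX = {ch: i for i, ch in enumerate(Z85_ALPHABET)}


def z85_decode_reference(text: str) -> bytes:
    """Decode a Z85 string: every 5 characters become 4 big-endian bytes."""
    if len(text) % 5 != 0:
        raise ValueError("Z85 input length must be a multiple of 5")
    out = bytearray()
    for start in range(0, len(text), 5):
        value = 0
        for ch in text[start:start + 5]:
            # A character outside the alphabet raises KeyError.
            value = value * 85 + Z85_INDEX[ch]
        # A group that encodes more than 32 bits raises OverflowError.
        out += value.to_bytes(4, "big")
    return bytes(out)


decoded = z85_decode_reference("HelloWorld")
```

A per-character loop like this is the kind of work that becomes slow in Python for large detector blobs, which is the motivation for moving it into Rust.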
Current Python BCSz behavior (for context)
The existing BCSz client in Python:
- Creates a global ZeroMQ context.
- Uses a REQ socket with CURVE security to talk to the LabVIEW BCS server.
- Fetches the server public key from an unauthenticated endpoint (public on port).
- Sends JSON requests (command + parameters) and receives JSON responses.
- Encodes motor status as a Python Flag bitfield.
Relevant modules and status flags
import sys
if sys.platform[:3] == "win":
    # zmq.asyncio does not support the default (proactor) event loop on Windows,
    # so set the event loop to one zmq supports.
    pass  # no asyncio. Do nothing
import zmq
import zmq.utils.z85
import json
import time
from enum import Flag  # for MotorStatus
class MotorStatus(Flag):
    HOME = 1
    FORWARD_LIMIT = 2
    REVERSE_LIMIT = 4
    MOTOR_DIRECTION = 8
    MOTOR_OFF = 16
    MOVE_COMPLETE = 32
    FOLLOWING_ERROR = 64
    NOT_IN_DEAD_BAND = 128
    FORWARD_SW_LIMIT = 256
    REVERSE_SW_LIMIT = 512
    MOTOR_DISABLED = 1024
    RAW_MOTOR_DIRECTION = 2048
    RAW_FORWARD_LIMIT = 4096
    RAW_REVERSE_LIMIT = 8192
    RAW_FORWARD_SW_LIMIT = 16384
    RAW_REVERSE_SW_LIMIT = 32768
    RAW_MOVE_COMPLETE = 65536
    MOVE_LT_THRESHOLD = 131072
    def is_set(self, flag):
        return bool(self._value_ & flag._value_)
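As a small illustration of how such a bitfield is read, the sketch below decodes a raw integer into the names of the flags that are set. The enum values are copied from the listing above; the helper active_flags is hypothetical and not part of BCSz.

```python
from enum import Flag


class MotorStatus(Flag):
    HOME = 1
    FORWARD_LIMIT = 2
    REVERSE_LIMIT = 4
    MOTOR_DIRECTION = 8
    MOTOR_OFF = 16
    MOVE_COMPLETE = 32
    FOLLOWING_ERROR = 64
    NOT_IN_DEAD_BAND = 128
    FORWARD_SW_LIMIT = 256
    REVERSE_SW_LIMIT = 512
    MOTOR_DISABLED = 1024
    RAW_MOTOR_DIRECTION = 2048
    RAW_FORWARD_LIMIT = 4096
    RAW_REVERSE_LIMIT = 8192
    RAW_FORWARD_SW_LIMIT = 16384
    RAW_REVERSE_SW_LIMIT = 32768
    RAW_MOVE_COMPLETE = 65536
    MOVE_LT_THRESHOLD = 131072

    def is_set(self, flag):
        return bool(self._value_ & flag._value_)


def active_flags(raw: int) -> list[str]:
    """Hypothetical helper: names of all flags set in a raw status integer."""
    status = MotorStatus(raw)
    return [member.name for member in MotorStatus if status.is_set(member)]


names = active_flags(33)  # 33 = HOME (1) + MOVE_COMPLETE (32)
```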
BCSz server key fetch
class BCSServer:
    _zmq_socket: zmq.Socket
    @staticmethod
    async def _get_server_public_key(addr, port):
        clear_socket = _zmq_context.socket(zmq.REQ)
        clear_socket.connect(f"tcp://{addr}:{port}")
        await clear_socket.send("public".encode())
        server_public = await clear_socket.recv()
        clear_socket.close()
        return server_public
BCSz connect (ZeroMQ + CURVE)
async def connect(self, addr="127.0.0.1", port=5577):
    global _zmq_context
    if not _zmq_context:
        if "zmq.asyncio" in sys.modules:
            _zmq_context = zmq.asyncio.Context()
        else:
            _zmq_context = zmq.Context()
    self._zmq_socket = _zmq_context.socket(zmq.REQ)
    (client_public_key, client_secret_key) = zmq.curve_keypair()
    server_public_key = await self._get_server_public_key(addr, port)
    print(f"Server Public Key {server_public_key}")
    self._zmq_socket.setsockopt(zmq.CURVE_SERVERKEY, server_public_key)
    self._zmq_socket.setsockopt(zmq.CURVE_PUBLICKEY, client_public_key)
    self._zmq_socket.setsockopt(zmq.CURVE_SECRETKEY, client_secret_key)
    self._zmq_socket.connect(f"tcp://{addr}:{port + 1}")
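The two-port convention in connect is easy to miss: the public key is fetched in the clear on port, and the CURVE-secured socket connects to port + 1. A minimal sketch of that convention follows; the helper names are hypothetical, and the key-length check assumes only that CURVE keys are 32 raw bytes or 40 Z85 characters.

```python
def bcs_endpoints(addr: str, port: int) -> tuple[str, str]:
    """Return (key_endpoint, secure_endpoint) following the port / port + 1 convention."""
    if not 0 < port < 65535:
        raise ValueError("port and port + 1 must both be valid TCP ports")
    return f"tcp://{addr}:{port}", f"tcp://{addr}:{port + 1}"


def plausible_curve_key(key: bytes) -> bool:
    """A CURVE key is 32 raw bytes, or 40 characters when Z85-encoded."""
    return len(key) in (32, 40)


key_endpoint, secure_endpoint = bcs_endpoints("127.0.0.1", 5577)
```

Because the key endpoint is unauthenticated, a length check like this only catches malformed replies; it does not establish that the key belongs to the intended server.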
BCSz request path
async def bcs_request(self, command_name, param_dict, debugging=False):
    if debugging:
        print(f"API command {command_name} BEGIN.")
    api_call_start = time.time()
    param_dict["command"] = command_name
    param_dict["_unused"] = "_unused"
    if "self" in param_dict:
        del param_dict["self"]
    await self._zmq_socket.send(json.dumps(param_dict).encode())
    response_dict = json.loads(await self._zmq_socket.recv())
    response_dict["API_delta_t"] = time.time() - api_call_start
    if debugging:
        print(f"API command {command_name} END {response_dict['API_delta_t']} s.")
    return response_dict
All higher-level scan and control methods in BCSz ultimately delegate to bcs_request.
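To make the wire format concrete, here is a minimal sketch of the payload that bcs_request puts on the socket. The function build_payload and the command name ExampleCommand are hypothetical. Unlike the method above, this version copies the parameters instead of mutating the caller's dictionary, which is a deliberate difference rather than a description of the existing code.

```python
import json


def build_payload(command_name: str, param_dict: dict) -> bytes:
    """Build the JSON request body: parameters plus "command" and "_unused"."""
    # Copy the parameters and drop "self" instead of mutating the caller's dict.
    payload = {k: v for k, v in param_dict.items() if k != "self"}
    payload["command"] = command_name
    payload["_unused"] = "_unused"
    return json.dumps(payload).encode()


params = {"self": None, "name": "m1"}
wire = build_payload("ExampleCommand", params)  # hypothetical command name
```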
Rust implementation: future setup
The Rust implementation replaces the Python transport with a Rust BcsConnection while keeping the high-level Python API:
- Rust owns all ZeroMQ + CURVE details and JSON I/O.
- Rust measures and injects API_delta_t timing into the response.
- Rust models MotorStatus as a typed bitflag set.
- Python keeps the BCSServer API and all scan wrappers, but no longer does JSON or timing.
High-level design:
- bcs-rs exposes a PyO3 module bcs_rs._core.
- bcs_rs._core.BcsConnection is the Rust-backed client.
- BCSServer in Python is composed of a BcsConnection and delegates to it.
Only two Python methods need to change:
- BCSServer.connect → construct a Rust BcsConnection.
- BCSServer.bcs_request → delegate to BcsConnection.bcs_request and simply log if debugging is enabled.
Everything else in the Python client can remain unchanged.
Crate layout
The bcs-rs crate is organized into:
- z85.rs: high-performance Z85 decoder (exported as decode_z85 / decode_z85_parallel).
- transport.rs: BCS control client implemented in Rust using ZeroMQ + CURVE and JSON.
- lib.rs: PyO3 glue exposing both Z85 helpers and the BCS transport API as bcs_rs._core.
Only lib.rs defines the Python module; z85 and transport are internal Rust modules.
Core Rust transport API
The central Rust type is a connection object that owns the ZeroMQ context and REQ socket:
pub struct BcsConnection {
    ctx: rzmq::Context,
    req: rzmq::Socket,
}
It provides a minimal interface:
// Signatures only; method bodies are omitted.
impl BcsConnection {
    pub fn connect(
        addr: &str,
        port: u16,
        recv_timeout: Duration,
        send_timeout: Duration,
    ) -> Result<Self, BcsError>;
    pub fn bcs_request(
        &self,
        command_name: &str,
        params: BTreeMap<String, serde_json::Value>,
    ) -> Result<serde_json::Value, BcsError>;
}
- connect:
  - Creates a ZeroMQ context.
  - Creates a REQ socket.
  - Generates a client CURVE key pair.
  - Contacts addr:port with a plain REQ socket to retrieve the server public key by sending the "public" command.
  - Configures CURVE_SERVERKEY, CURVE_PUBLICKEY, and CURVE_SECRETKEY.
  - Sets RCVTIMEO and SNDTIMEO to the requested timeouts.
  - Connects the secure REQ socket to tcp://{addr}:{port + 1}.
- bcs_request:
  - Adds "command" and "_unused" into the parameter map.
  - Serializes the payload to JSON and sends it on the REQ socket.
  - Measures elapsed time with Instant::now() / elapsed().
  - Receives the JSON response and deserializes it into serde_json::Value.
  - Injects API_delta_t (seconds as f64) into the response object before returning it.
The error type BcsError wraps underlying ZeroMQ and JSON errors and is converted to PyErr in lib.rs.
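The timing step of bcs_request can be mirrored in Python with a stand-in transport. This is only a sketch of the behavior described above, not the Rust code: timed_exchange and fake_server are hypothetical names, and time.perf_counter() plays the role that Instant plays in Rust (a monotonic clock, unlike the time.time() used by the current Python client). It starts the clock before sending, as the existing Python client does.

```python
import json
import time
from typing import Callable


def timed_exchange(send_recv: Callable[[bytes], bytes], request: bytes) -> dict:
    """Send a request, parse the JSON reply, and inject API_delta_t in seconds."""
    start = time.perf_counter()  # monotonic clock; started before the send
    raw = send_recv(request)
    response = json.loads(raw)
    response["API_delta_t"] = time.perf_counter() - start
    return response


def fake_server(request: bytes) -> bytes:
    """Stand-in for the REQ socket round trip; echoes the command name."""
    command = json.loads(request)["command"]
    return json.dumps({"success": True, "command": command}).encode()


reply = timed_exchange(fake_server, b'{"command": "ExampleCommand", "_unused": "_unused"}')
```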
Motor status bitflags in Rust
The Python MotorStatus enum encodes motor controller state as a bitfield. In Rust this will be modeled as a typed flag set so that callers can inspect motor status without re-implementing bit arithmetic in Python.
Planned Rust representation:
bitflags::bitflags! {
    pub struct MotorStatus: u32 {
        const HOME = 1;
        const FORWARD_LIMIT = 2;
        const REVERSE_LIMIT = 4;
        const MOTOR_DIRECTION = 8;
        const MOTOR_OFF = 16;
        const MOVE_COMPLETE = 32;
        const FOLLOWING_ERROR = 64;
        const NOT_IN_DEAD_BAND = 128;
        const FORWARD_SW_LIMIT = 256;
        const REVERSE_SW_LIMIT = 512;
        const MOTOR_DISABLED = 1024;
        const RAW_MOTOR_DIRECTION = 2048;
        const RAW_FORWARD_LIMIT = 4096;
        const RAW_REVERSE_LIMIT = 8192;
        const RAW_FORWARD_SW_LIMIT = 16384;
        const RAW_REVERSE_SW_LIMIT = 32768;
        const RAW_MOVE_COMPLETE = 65536;
        const MOVE_LT_THRESHOLD = 131072;
    }
}
This type will be exposed to Python via PyO3 in one of two forms:
- As a #[pyclass] with helper methods such as is_set(name: str) -> bool, or
- As integer bitfields on response objects with small helper functions that map raw integers to MotorStatus instances.
The long-term intent is that any motor-related responses coming back from the BCS server carry a typed MotorStatus alongside raw numeric values so that client code can write clear, intention-revealing checks instead of manual bit masking.
Python-facing PyO3 layer
lib.rs exposes the transport API and Z85 helpers to Python through a PyO3 module:
#[pymodule]
mod _core {
    use super::*;
    #[pyclass]
    pub struct BcsConnection {
        inner: transport::BcsConnection,
    }
    #[pymethods]
    impl BcsConnection {
        #[new]
        fn new(
            addr: String,
            port: u16,
            recv_timeout_ms: Option<u64>,
            send_timeout_ms: Option<u64>,
        ) -> PyResult<Self> {
            let recv = Duration::from_millis(recv_timeout_ms.unwrap_or(5000));
            let send = Duration::from_millis(send_timeout_ms.unwrap_or(5000));
            let inner = transport::BcsConnection::connect(&addr, port, recv, send)?;
            Ok(Self { inner })
        }
        fn bcs_request<'py>(
            &self,
            py: Python<'py>,
            command_name: &str,
            params: &PyAny,
        ) -> PyResult<&'py PyAny> {
            let dict: &PyDict = params.downcast()?;
            let map = python_to_serde_map(dict)?; // PyDict -> BTreeMap<String, Value>
            let resp_val = self.inner.bcs_request(command_name, map)?;
            let resp_py = serde_to_python(py, &resp_val)?; // Value -> PyObject
            Ok(resp_py.into_ref(py))
        }
    }
    #[pyfunction]
    fn decode_z85(data: &str) -> PyResult<Vec<u8>> { /* existing */ }
    #[pyfunction]
    fn decode_z85_parallel(py: Python<'_>, data: &str) -> PyResult<Vec<u8>> { /* existing */ }
}
From Python this appears as:
from bcs_rs._core import BcsConnection, decode_z85, decode_z85_parallel
The existing Z85 functions stay unchanged; the new BcsConnection class is the transport entry point.
The helper functions python_to_serde_map and serde_to_python are responsible for converting between Python dict / list / scalars and serde_json::Value. They can be implemented manually or via pyo3-serde.
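Whichever approach is chosen, the conversion can only succeed for values that JSON can represent. As a rough illustration, the sketch below checks a parameter dictionary on the Python side before it would be handed to Rust. The function is_json_compatible is hypothetical, and the exact set of types the real helpers accept is an assumption.

```python
import math


def is_json_compatible(value) -> bool:
    """Return True if value maps onto JSON null, bool, number, string, array, or object."""
    if value is None or isinstance(value, (bool, int, str)):
        return True
    if isinstance(value, float):
        return math.isfinite(value)  # JSON has no NaN or Infinity
    if isinstance(value, (list, tuple)):
        return all(is_json_compatible(item) for item in value)
    if isinstance(value, dict):
        # JSON object keys must be strings.
        return all(
            isinstance(key, str) and is_json_compatible(item)
            for key, item in value.items()
        )
    return False


ok = is_json_compatible({"name": "m1", "positions": [1, 2.5, None]})
```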
Future Python BCSz integration
The Python BCSServer class in BCSz will be composed of a BcsConnection and will delegate all transport to it.
import asyncio
from bcs_rs._core import BcsConnection
class BCSServer:
    def __init__(self, addr: str = "127.0.0.1", port: int = 5577) -> None:
        self._addr = addr
        self._port = port
        self._conn: BcsConnection | None = None
    async def connect(self, addr: str | None = None, port: int | None = None) -> None:
        if addr is not None:
            self._addr = addr
        if port is not None:
            self._port = port
        # Blocking Rust constructor → run in a thread
        self._conn = await asyncio.to_thread(
            BcsConnection,
            self._addr,
            self._port,
            5000,  # recv_timeout_ms
            5000,  # send_timeout_ms
        )
    async def bcs_request(self, command_name: str, param_dict: dict, debugging: bool = False) -> dict:
        if self._conn is None:
            raise RuntimeError("BCSServer not connected")
        response = await asyncio.to_thread(self._conn.bcs_request, command_name, param_dict)
        if debugging:
            api_dt = response.get("API_delta_t", None)
            print(f"API command {command_name} END {api_dt} s.")
        return response
All of the high-level BCSz methods (acquire_data, list_motors, sc_* scans, etc.) can continue to call await self.bcs_request(...) unchanged. Only the underlying transport and timing move into Rust.
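One point worth considering with asyncio.to_thread is that a ZeroMQ REQ socket requires strict send/receive alternation, so two coroutines must not issue requests on the same connection at once. Whether the real BcsConnection guards against this is not stated here. The sketch below shows one possible way to serialize calls with an asyncio.Lock; FakeConnection, SerializedClient, and the command names are all hypothetical stand-ins.

```python
import asyncio
import threading
import time


class FakeConnection:
    """Stand-in for bcs_rs._core.BcsConnection that counts overlapping calls."""

    def __init__(self):
        self._busy = threading.Lock()
        self.overlaps = 0

    def bcs_request(self, command_name, params):
        if not self._busy.acquire(blocking=False):
            self.overlaps += 1  # another thread was mid-request
            self._busy.acquire()
        try:
            time.sleep(0.01)  # simulate a blocking round trip
            return {"success": True, "command": command_name}
        finally:
            self._busy.release()


class SerializedClient:
    """Allow only one in-flight request per connection."""

    def __init__(self, conn):
        self._conn = conn
        self._lock = asyncio.Lock()

    async def bcs_request(self, command_name, params):
        async with self._lock:
            return await asyncio.to_thread(self._conn.bcs_request, command_name, params)


async def demo():
    conn = FakeConnection()
    client = SerializedClient(conn)
    replies = await asyncio.gather(
        *(client.bcs_request(f"Example{i}", {}) for i in range(3))
    )
    return [reply["command"] for reply in replies], conn.overlaps
```

Running asyncio.run(demo()) issues three requests concurrently from the caller's point of view while the lock ensures the blocking calls never overlap.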
Extensibility
With this scaffolding in place, future enhancements can be added without breaking the Python API:
- Connection pooling in Rust for higher throughput when multiple concurrent requests are issued.
- Typed Rust helpers for heavy endpoints (for example, turning GetInstrumentAcquired2DBase85 directly into a decoded 2D array) while still exposing a simple function to Python.
- Additional metrics and tracing on the Rust side to profile and debug BCS communication without instrumenting Python.
BCSz Server rewrite in Rust
The main Python functionality of BCSz is to control the beamline over a ZeroMQ socket connection to the BCS server. Here is the relevant Python code that will be rewritten in Rust:
Python Code
BCSServer (Python, key public method)
class BCSServer:
    """..."""
    _zmq_socket: zmq.Socket
    @staticmethod
    async def _get_server_public_key(addr, port):
        clear_socket = _zmq_context.socket(zmq.REQ)
        clear_socket.connect(f'tcp://{addr}:{port}')
        await clear_socket.send('public'.encode())
        server_public = await clear_socket.recv()
        clear_socket.close()
        return server_public
connect method (Python, establishing CURVE ZMQ connection)
async def connect(self, addr='127.0.0.1', port=5577):
    """
    (formerly the Constructor) Supply the zmq address string, addr, to reach this endstation.
    """
    global _zmq_context
    # the first server object will create the global zmq context
    if not _zmq_context:
        if 'zmq.asyncio' in sys.modules:  # Using asyncio? (was the module imported?)
            _zmq_context = zmq.asyncio.Context()
        else:
            _zmq_context = zmq.Context()
    self._zmq_socket = _zmq_context.socket(zmq.REQ)
    (client_public_key, client_secret_key) = zmq.curve_keypair()
    # server_public_key = asyncio.get_running_loop().run_until_complete(self._get_server_public_key(addr, port))
    server_public_key = await self._get_server_public_key(addr, port)
    print(f'Server Public Key {server_public_key}')
    self._zmq_socket.setsockopt(zmq.CURVE_SERVERKEY, server_public_key)
    self._zmq_socket.setsockopt(zmq.CURVE_PUBLICKEY, client_public_key)
    self._zmq_socket.setsockopt(zmq.CURVE_SECRETKEY, client_secret_key)
    self._zmq_socket.connect(f'tcp://{addr}:{port + 1}')
bcs_request method (Python, sends a request to the BCS server)
async def bcs_request(self, command_name, param_dict, debugging=False):
    """
    The method responsible for direct communication to the BCS server
    :param command_name: Name of the API endpoint
    :type command_name: str
    :param param_dict: Parameter dictionary
    :type param_dict: dict
    """
    if debugging:
        print(f"API command {command_name} BEGIN.")
    api_call_start = time.time()
    param_dict['command'] = command_name
    param_dict['_unused'] = '_unused'
    if 'self' in param_dict:
        del param_dict['self']
    await self._zmq_socket.send(json.dumps(param_dict).encode())
    response_dict = json.loads(await self._zmq_socket.recv())
    response_dict['API_delta_t'] = time.time() - api_call_start
    if debugging:
        print(f"API command {command_name} END {response_dict['API_delta_t']} s.")
    return response_dict
Rust Implementation
The goal of the Rust implementation is to build a Rust extension module that acts as a drop-in replacement for the transport layer, handling all metadata and the bcs_request() method.
The rest of the BCSz client code stays in Python; only communication with the BCS server is handled by the Rust extension module.
Crate layout
The bcs-rs crate is organized into:
- z85.rs: high-performance Z85 decoder (already exported as decode_z85 / decode_z85_parallel).
- transport.rs: BCS control client implemented in Rust using ZeroMQ + CURVE.
- lib.rs: PyO3 glue exposing both Z85 helpers and the BCS transport API as bcs_rs._core.
Only lib.rs is visible to Python; z85 and transport remain internal Rust modules.
Core Rust transport API
The central Rust type is a connection object that owns the ZeroMQ context and REQ socket:
pub struct BcsConnection {
    ctx: rzmq::Context,
    req: rzmq::Socket,
}
It provides a small interface:
// Signatures only; method bodies are omitted.
impl BcsConnection {
    pub fn connect(
        addr: &str,
        port: u16,
        recv_timeout: Duration,
        send_timeout: Duration,
    ) -> Result<Self, BcsError>;
    pub fn raw_request(&self, json: &str) -> Result<String, BcsError>;
}
- connect:
  - Creates a ZeroMQ context.
  - Creates a REQ socket.
  - Generates a client CURVE key pair.
  - Contacts addr:port with a plain REQ socket to retrieve the server public key (sending the "public" command).
  - Configures CURVE_SERVERKEY, CURVE_PUBLICKEY, and CURVE_SECRETKEY.
  - Sets RCVTIMEO and SNDTIMEO to the requested timeouts.
  - Connects the secure REQ socket to tcp://{addr}:{port + 1}.
- raw_request:
  - Sends the provided JSON string on the REQ socket.
  - Blocks until the full response is received.
  - Returns the response body as a JSON string.
The error type BcsError wraps underlying ZeroMQ and JSON errors and is converted to PyErr in lib.rs.
Python-facing PyO3 layer
lib.rs exposes the transport API to Python through a PyO3 module:
#[pymodule]
mod _core {
    use super::*;
    #[pyclass]
    pub struct BcsConnection {
        inner: transport::BcsConnection,
    }
    #[pymethods]
    impl BcsConnection {
        #[new]
        fn new(
            addr: String,
            port: u16,
            recv_timeout_ms: Option<u64>,
            send_timeout_ms: Option<u64>,
        ) -> PyResult<Self> {
            let recv = Duration::from_millis(recv_timeout_ms.unwrap_or(5000));
            let send = Duration::from_millis(send_timeout_ms.unwrap_or(5000));
            let inner = transport::BcsConnection::connect(&addr, port, recv, send)?;
            Ok(Self { inner })
        }
        fn raw_request(&self, json: &str) -> PyResult<String> {
            Ok(self.inner.raw_request(json)?)
        }
    }
    #[pyfunction]
    fn decode_z85(data: &str) -> PyResult<Vec<u8>> { /* existing */ }
    #[pyfunction]
    fn decode_z85_parallel(py: Python<'_>, data: &str) -> PyResult<Vec<u8>> { /* existing */ }
}
From Python, this appears as:
from bcs_rs._core import BcsConnection, decode_z85, decode_z85_parallel
The existing Z85 functions stay unchanged; only the new BcsConnection class is added.
Future Python BCSz integration
The Python BCSServer class in BCSz will use BcsConnection as its transport layer, while preserving the existing high-level API.
import asyncio
import json
import time
from bcs_rs._core import BcsConnection
class BCSServer:
    def __init__(self, addr: str = "127.0.0.1", port: int = 5577) -> None:
        self._addr = addr
        self._port = port
        self._conn: BcsConnection | None = None
    async def connect(self, addr: str | None = None, port: int | None = None) -> None:
        if addr is not None:
            self._addr = addr
        if port is not None:
            self._port = port
        # Blocking Rust constructor, so run it in a worker thread
        self._conn = await asyncio.to_thread(
            BcsConnection,
            self._addr,
            self._port,
            5000,  # recv_timeout_ms
            5000,  # send_timeout_ms
        )
    async def bcs_request(self, command_name: str, param_dict: dict, debugging: bool = False) -> dict:
        if self._conn is None:
            raise RuntimeError("BCSServer not connected")
        api_call_start = time.perf_counter()
        # Copy the parameters so the caller's dict is not mutated
        payload = dict(param_dict)
        payload["command"] = command_name
        payload["_unused"] = "_unused"
        payload.pop("self", None)
        json_in = json.dumps(payload)
        json_out = await asyncio.to_thread(self._conn.raw_request, json_in)
        response_dict = json.loads(json_out)
        response_dict["API_delta_t"] = time.perf_counter() - api_call_start
        return response_dict
All of the high-level BCSz methods (acquire_data, list_motors, sc_* scans, etc.) can continue to call await self.bcs_request(...) unchanged. Only the underlying transport has moved into Rust.
Extensibility
With this scaffolding in place, future enhancements can be added without breaking the Python API:
- Connection pooling in Rust for higher throughput when multiple concurrent requests are issued.
- Typed Rust helpers for heavy endpoints (for example, turning GetInstrumentAcquired2DBase85 directly into a decoded 2D array) while still exposing a simple function to Python.
- Additional metrics and tracing on the Rust side to profile and debug BCS communication.