
High-performance Rust extension for BCS data decoding

Project description

bcs-rs: Rust transport and decoding for BCSz

bcs-rs is a Rust extension module that accelerates and hardens the client side of the BCSz beamline control system:

  • High-performance Z85 decoding for large detector blobs.
  • A Rust-backed ZeroMQ + CURVE client for the BCS server.
  • A thin PyO3 layer that exposes this functionality as the bcs_rs._core Python module.

The design goal is that Python code uses a small, stable API in bcs_rs._core while Rust handles all low-level transport, timing, and blob decoding.


Current Python BCSz behavior (for context)

The existing BCSz client in Python:

  • Creates a global ZeroMQ context.
  • Uses a REQ socket with CURVE security to talk to the LabVIEW BCS server.
  • Fetches the server public key from an unauthenticated plain-text endpoint (the "public" command on port) before opening the CURVE-secured socket on port + 1.
  • Sends JSON requests (command + parameters) and receives JSON responses.
  • Encodes motor status as a Python Flag bitfield.

Relevant modules and status flags

import sys
if sys.platform[:3] == "win":
    # zmq.asyncio does not support the default (proactor) event loop on Windows.
    # so set the event loop to one zmq supports
    pass  # no asyncio. Do nothing
import zmq
import zmq.utils.z85
import json
import time
from enum import Flag  # for MotorStatus


class MotorStatus(Flag):
    HOME = 1
    FORWARD_LIMIT = 2
    REVERSE_LIMIT = 4
    MOTOR_DIRECTION = 8
    MOTOR_OFF = 16
    MOVE_COMPLETE = 32
    FOLLOWING_ERROR = 64
    NOT_IN_DEAD_BAND = 128
    FORWARD_SW_LIMIT = 256
    REVERSE_SW_LIMIT = 512
    MOTOR_DISABLED = 1024
    RAW_MOTOR_DIRECTION = 2048
    RAW_FORWARD_LIMIT = 4096
    RAW_REVERSE_LIMIT = 8192
    RAW_FORWARD_SW_LIMIT = 16384
    RAW_REVERSE_SW_LIMIT = 32768
    RAW_MOVE_COMPLETE = 65536
    MOVE_LT_THRESHOLD = 131072

    def is_set(self, flag):
        return bool(self._value_ & flag._value_)
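
As an illustration of how this bitfield is read, the following minimal sketch decodes a raw status integer into flag names. It repeats only a few members so that it runs on its own; the helper name describe_status and the sample value 33 are assumptions made for illustration and are not part of BCSz.

```python
from enum import Flag


class MotorStatus(Flag):
    # Subset of the full enum above, repeated so the sketch is self-contained.
    HOME = 1
    FORWARD_LIMIT = 2
    REVERSE_LIMIT = 4
    MOVE_COMPLETE = 32

    def is_set(self, flag):
        return bool(self._value_ & flag._value_)


def describe_status(raw: int) -> list:
    """Return the names of all flags set in a raw status integer (hypothetical helper)."""
    status = MotorStatus(raw)
    return [member.name for member in MotorStatus if status.is_set(member)]


# 33 = HOME (1) + MOVE_COMPLETE (32); the value is made up for this example.
names = describe_status(33)
```

With the trimmed enum, describe_status only accepts integers whose bits all correspond to defined members; the full enum above covers every bit from 1 to 131072.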

BCSz server key fetch

class BCSServer:
    _zmq_socket: zmq.Socket

    @staticmethod
    async def _get_server_public_key(addr, port):
        clear_socket = _zmq_context.socket(zmq.REQ)
        clear_socket.connect(f"tcp://{addr}:{port}")
        await clear_socket.send("public".encode())
        server_public = await clear_socket.recv()
        clear_socket.close()
        return server_public

BCSz connect (ZeroMQ + CURVE)

async def connect(self, addr="127.0.0.1", port=5577):
    global _zmq_context

    if not _zmq_context:
        if "zmq.asyncio" in sys.modules:
            _zmq_context = zmq.asyncio.Context()
        else:
            _zmq_context = zmq.Context()

    self._zmq_socket = _zmq_context.socket(zmq.REQ)

    (client_public_key, client_secret_key) = zmq.curve_keypair()

    server_public_key = await self._get_server_public_key(addr, port)

    print(f"Server Public Key {server_public_key}")

    self._zmq_socket.setsockopt(zmq.CURVE_SERVERKEY, server_public_key)
    self._zmq_socket.setsockopt(zmq.CURVE_PUBLICKEY, client_public_key)
    self._zmq_socket.setsockopt(zmq.CURVE_SECRETKEY, client_secret_key)

    self._zmq_socket.connect(f"tcp://{addr}:{port + 1}")
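
The port convention used by connect (key fetch on port, secured traffic on port + 1) can be captured in a small helper. This is only a sketch; the function name bcs_endpoints is hypothetical and does not exist in BCSz.

```python
def bcs_endpoints(addr: str = "127.0.0.1", port: int = 5577) -> tuple:
    """Return (key_endpoint, secure_endpoint) for a BCS server (hypothetical helper)."""
    key_endpoint = f"tcp://{addr}:{port}"         # plain REQ socket, answers "public"
    secure_endpoint = f"tcp://{addr}:{port + 1}"  # CURVE-secured REQ socket
    return key_endpoint, secure_endpoint


key_endpoint, secure_endpoint = bcs_endpoints()
```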

BCSz request path

async def bcs_request(self, command_name, param_dict, debugging=False):
    if debugging:
        print(f"API command {command_name} BEGIN.")

    api_call_start = time.time()
    param_dict["command"] = command_name
    param_dict["_unused"] = "_unused"
    if "self" in param_dict:
        del param_dict["self"]
    await self._zmq_socket.send(json.dumps(param_dict).encode())
    response_dict = json.loads(await self._zmq_socket.recv())
    response_dict["API_delta_t"] = time.time() - api_call_start

    if debugging:
        print(f"API command {command_name} END {response_dict['API_delta_t']} s.")

    return response_dict

All higher-level scan and control methods in BCSz ultimately delegate to bcs_request.
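
Note that bcs_request as written mutates the caller's param_dict (it adds "command" and "_unused" and deletes "self"). A minimal sketch of the same payload construction on a copy is shown below; the helper name build_request_payload, the command name "GetMotor", and the "motors" parameter are assumptions made for illustration.

```python
import json


def build_request_payload(command_name: str, param_dict: dict) -> bytes:
    """Build the JSON bytes for one request without mutating the caller's dict."""
    payload = dict(param_dict)        # shallow copy, caller's dict stays untouched
    payload.pop("self", None)         # drop a stray "self" entry if present
    payload["command"] = command_name
    payload["_unused"] = "_unused"
    return json.dumps(payload).encode()


# Hypothetical command and parameters, used only to exercise the helper.
caller_params = {"self": None, "motors": ["x"]}
wire_bytes = build_request_payload("GetMotor", caller_params)
```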


Rust implementation: future setup

The Rust implementation replaces the Python transport with a Rust BcsConnection while keeping the high-level Python API:

  • Rust owns all ZeroMQ + CURVE details and JSON I/O.
  • Rust measures and injects API_delta_t timing into the response.
  • Rust models MotorStatus as a typed bitflag set.
  • Python keeps the BCSServer API and all scan wrappers, but no longer does JSON or timing.

High-level design:

  • bcs-rs exposes a PyO3 module bcs_rs._core.
  • bcs_rs._core.BcsConnection is the Rust-backed client.
  • BCSServer in Python is composed of a BcsConnection and delegates to it.

Only two Python methods need to change:

  • BCSServer.connect → construct a Rust BcsConnection.
  • BCSServer.bcs_request → delegate to BcsConnection.bcs_request and simply log if debugging is enabled.

Everything else in the Python client can remain unchanged.


Crate layout

The bcs-rs crate is organized into:

  • z85.rs: high-performance Z85 decoder (exported as decode_z85 / decode_z85_parallel).
  • transport.rs: BCS control client implemented in Rust using ZeroMQ + CURVE and JSON.
  • lib.rs: PyO3 glue exposing both Z85 helpers and the BCS transport API as bcs_rs._core.

Only lib.rs defines the Python module; z85 and transport are internal Rust modules.
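
For reference, the decoding that z85.rs accelerates can be written in a few lines of pure Python. The sketch below follows the Z85 rules (5 characters map to 4 big-endian bytes) and could serve as a slow reference when testing decode_z85; the name decode_z85_reference is hypothetical, and this is not the crate's implementation.

```python
Z85_ALPHABET = (
    "0123456789abcdefghijklmnopqrstuvwxyz"
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    ".-:+=^!/*?&<>()[]{}@%$#"
)
Z85_INDEX = {ch: i for i, ch in enumerate(Z85_ALPHABET)}


def decode_z85_reference(data: str) -> bytes:
    """Decode a Z85 string into bytes (slow reference sketch)."""
    if len(data) % 5 != 0:
        raise ValueError("Z85 input length must be a multiple of 5")
    out = bytearray()
    for start in range(0, len(data), 5):
        value = 0
        for ch in data[start:start + 5]:
            if ch not in Z85_INDEX:
                raise ValueError(f"invalid Z85 character: {ch!r}")
            value = value * 85 + Z85_INDEX[ch]
        if value > 0xFFFFFFFF:
            raise ValueError("Z85 group does not fit in 32 bits")
        out += value.to_bytes(4, "big")
    return bytes(out)


# Test vector from the Z85 specification: "HelloWorld" decodes to 8 bytes.
decoded = decode_z85_reference("HelloWorld")
```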


Core Rust transport API

The central Rust type is a connection object that owns the ZeroMQ context and REQ socket:

pub struct BcsConnection {
    ctx: rzmq::Context,
    req: rzmq::Socket,
}

It provides a minimal interface:

impl BcsConnection {
    pub fn connect(
        addr: &str,
        port: u16,
        recv_timeout: Duration,
        send_timeout: Duration,
    ) -> Result<Self, BcsError>;

    pub fn bcs_request(
        &self,
        command_name: &str,
        params: BTreeMap<String, serde_json::Value>,
    ) -> Result<serde_json::Value, BcsError>;
}

  • connect:
    • Creates a ZeroMQ context.
    • Creates a REQ socket.
    • Generates a client CURVE key pair.
    • Contacts addr:port with a plain REQ socket to retrieve the server public key by sending the "public" command.
    • Configures CURVE_SERVERKEY, CURVE_PUBLICKEY, and CURVE_SECRETKEY.
    • Sets RCVTIMEO and SNDTIMEO to the requested timeouts.
    • Connects the secure REQ socket to tcp://{addr}:{port + 1}.
  • bcs_request:
    • Adds "command" and "_unused" into the parameter map.
    • Serializes the payload to JSON and sends it on the REQ socket.
    • Measures elapsed time with Instant::now() / elapsed().
    • Receives the JSON response and deserializes it into serde_json::Value.
    • Injects API_delta_t (seconds as f64) into the response object before returning it.
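
The timing step in bcs_request can be sketched in Python with a pluggable transport, which makes the behavior easy to test without a server. Everything here is an assumption for illustration: timed_round_trip and fake_server are hypothetical names, and the "success" and "echo" reply fields are invented.

```python
import json
import time
from typing import Callable


def timed_round_trip(send_recv: Callable[[bytes], bytes], request: bytes) -> dict:
    """Send request bytes, parse the JSON reply, and inject API_delta_t in seconds."""
    start = time.perf_counter()          # monotonic clock, like Instant::now()
    raw_reply = send_recv(request)       # blocking send + receive
    response = json.loads(raw_reply)
    response["API_delta_t"] = time.perf_counter() - start
    return response


def fake_server(request: bytes) -> bytes:
    """Stand-in for the REQ socket: echoes the command name back."""
    sent = json.loads(request)
    return json.dumps({"success": True, "echo": sent["command"]}).encode()


reply = timed_round_trip(fake_server, b'{"command": "Ping", "_unused": "_unused"}')
```

Using a monotonic clock avoids negative or jumpy deltas when the wall clock is adjusted, which time.time() in the original client does not guarantee.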

The error type BcsError wraps underlying ZeroMQ and JSON errors and is converted to PyErr in lib.rs.


Motor status bitflags in Rust

The Python MotorStatus enum encodes motor controller state as a bitfield. In Rust this will be modeled as a typed flag set so that callers can inspect motor status without re-implementing bit arithmetic in Python.

Planned Rust representation:

bitflags::bitflags! {
    pub struct MotorStatus: u32 {
        const HOME                 = 1;
        const FORWARD_LIMIT        = 2;
        const REVERSE_LIMIT        = 4;
        const MOTOR_DIRECTION      = 8;
        const MOTOR_OFF            = 16;
        const MOVE_COMPLETE        = 32;
        const FOLLOWING_ERROR      = 64;
        const NOT_IN_DEAD_BAND     = 128;
        const FORWARD_SW_LIMIT     = 256;
        const REVERSE_SW_LIMIT     = 512;
        const MOTOR_DISABLED       = 1024;
        const RAW_MOTOR_DIRECTION  = 2048;
        const RAW_FORWARD_LIMIT    = 4096;
        const RAW_REVERSE_LIMIT    = 8192;
        const RAW_FORWARD_SW_LIMIT = 16384;
        const RAW_REVERSE_SW_LIMIT = 32768;
        const RAW_MOVE_COMPLETE    = 65536;
        const MOVE_LT_THRESHOLD    = 131072;
    }
}

This type will be exposed to Python via PyO3 in one of two forms:

  • As a #[pyclass] with helper methods such as is_set(name: str) -> bool, or
  • As integer bitfields on response objects with small helper functions that map raw integers to MotorStatus instances.

The long-term intent is that any motor-related responses coming back from the BCS server carry a typed MotorStatus alongside raw numeric values so that client code can write clear, intention-revealing checks instead of manual bit masking.
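
A name-based check such as is_set(name: str) -> bool might behave like the Python sketch below, which tests a raw integer against a named flag without constructing a flag object first. The function status_is_set is hypothetical, and only a few members are repeated so the block runs on its own.

```python
from enum import Flag


class MotorStatus(Flag):
    # Subset of the full flag set, repeated so the sketch is self-contained.
    HOME = 1
    FORWARD_LIMIT = 2
    MOVE_COMPLETE = 32
    MOTOR_DISABLED = 1024


def status_is_set(raw: int, name: str) -> bool:
    """Return True if the flag called `name` is set in `raw` (hypothetical helper)."""
    try:
        flag = MotorStatus[name]
    except KeyError:
        raise ValueError(f"unknown motor status flag: {name!r}") from None
    return bool(raw & flag.value)


# 1056 = MOVE_COMPLETE (32) + MOTOR_DISABLED (1024); the value is made up.
disabled = status_is_set(1056, "MOTOR_DISABLED")
```

Because the check masks the raw integer directly, bits that are not covered by any named flag are ignored rather than rejected.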


Python-facing PyO3 layer

lib.rs exposes the transport API and Z85 helpers to Python through a PyO3 module:

#[pymodule]
mod _core {
    use super::*;

    #[pyclass]
    pub struct BcsConnection {
        inner: transport::BcsConnection,
    }

    #[pymethods]
    impl BcsConnection {
        #[new]
        fn new(
            addr: String,
            port: u16,
            recv_timeout_ms: Option<u64>,
            send_timeout_ms: Option<u64>,
        ) -> PyResult<Self> {
            let recv = Duration::from_millis(recv_timeout_ms.unwrap_or(5000));
            let send = Duration::from_millis(send_timeout_ms.unwrap_or(5000));
            let inner = transport::BcsConnection::connect(&addr, port, recv, send)?;
            Ok(Self { inner })
        }

        fn bcs_request<'py>(
            &self,
            py: Python<'py>,
            command_name: &str,
            params: &PyAny,
        ) -> PyResult<&'py PyAny> {
            let dict: &PyDict = params.downcast()?;
            let map = python_to_serde_map(dict)?;      // PyDict -> BTreeMap<String, Value>
            let resp_val = self.inner.bcs_request(command_name, map)?;
            let resp_py = serde_to_python(py, &resp_val)?; // Value -> PyObject
            Ok(resp_py.into_ref(py))
        }
    }

    #[pyfunction]
    fn decode_z85(data: &str) -> PyResult<Vec<u8>> { /* existing */ }

    #[pyfunction]
    fn decode_z85_parallel(py: Python<'_>, data: &str) -> PyResult<Vec<u8>> { /* existing */ }
}

From Python this appears as:

from bcs_rs._core import BcsConnection, decode_z85, decode_z85_parallel

The existing Z85 functions stay unchanged; the new BcsConnection class is the transport entry point.

The helper functions python_to_serde_map and serde_to_python are responsible for converting between Python dict / list / scalars and serde_json::Value. They can be implemented manually or via pyo3-serde.
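
The conversion rules those helpers need to enforce can be prototyped in Python before writing the Rust side. The sketch below is an assumption about reasonable rules (string keys only, tuples become lists, unsupported objects are rejected); to_json_value is a hypothetical name and not part of bcs-rs.

```python
def to_json_value(obj):
    """Recursively normalize a Python object into JSON-compatible values."""
    if obj is None or isinstance(obj, (bool, int, float, str)):
        return obj
    if isinstance(obj, (list, tuple)):
        return [to_json_value(item) for item in obj]
    if isinstance(obj, dict):
        converted = {}
        for key, value in obj.items():
            if not isinstance(key, str):
                raise TypeError(f"JSON object keys must be str, got {type(key).__name__}")
            converted[key] = to_json_value(value)
        return converted
    raise TypeError(f"cannot convert {type(obj).__name__} to a JSON value")


normalized = to_json_value({"a": (1, 2.5, None), "b": {"c": True}})
```

Rejecting unsupported types early gives callers a TypeError at the call site instead of a serialization failure deeper in the transport.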


Future Python BCSz integration

The Python BCSServer class in BCSz will be composed of a BcsConnection and will delegate all transport to it.

from __future__ import annotations  # allows "str | None" annotations on Python 3.9

import asyncio

from bcs_rs._core import BcsConnection

class BCSServer:
    def __init__(self, addr: str = "127.0.0.1", port: int = 5577) -> None:
        self._addr = addr
        self._port = port
        self._conn: BcsConnection | None = None

    async def connect(self, addr: str | None = None, port: int | None = None) -> None:
        if addr is not None:
            self._addr = addr
        if port is not None:
            self._port = port
        # Blocking Rust constructor → run in a thread
        self._conn = await asyncio.to_thread(
            BcsConnection,
            self._addr,
            self._port,
            5000,  # recv_timeout_ms
            5000,  # send_timeout_ms
        )

    async def bcs_request(self, command_name: str, param_dict: dict, debugging: bool = False) -> dict:
        if self._conn is None:
            raise RuntimeError("BCSServer not connected")

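        # Drop a stray "self" entry, as the original client does, without mutating the caller's dict.
        params = {k: v for k, v in param_dict.items() if k != "self"}
        response = await asyncio.to_thread(self._conn.bcs_request, command_name, params)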

        if debugging:
            api_dt = response.get("API_delta_t", None)
            print(f"API command {command_name} END {api_dt} s.")

        return response

All of the high-level BCSz methods (acquire_data, list_motors, sc_* scans, etc.) can continue to call await self.bcs_request(...) unchanged. Only the underlying transport and timing move into Rust.
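
The effect of wrapping the blocking call in asyncio.to_thread can be checked without a server. In this sketch, FakeConnection is a hypothetical stand-in for BcsConnection that merely sleeps, and the command name "ListMotors" is invented; the point is that other coroutines keep running while the request blocks in a worker thread.

```python
import asyncio
import time


class FakeConnection:
    """Stand-in for bcs_rs._core.BcsConnection; only mimics a blocking call."""

    def bcs_request(self, command_name: str, params: dict) -> dict:
        time.sleep(0.05)  # simulate a blocking network round trip
        return {"command": command_name, "API_delta_t": 0.05}


async def main() -> tuple:
    conn = FakeConnection()
    ticks = []

    async def heartbeat() -> None:
        # Runs on the event loop while the request blocks in a worker thread.
        for _ in range(3):
            ticks.append(time.perf_counter())
            await asyncio.sleep(0.01)

    response, _ = await asyncio.gather(
        asyncio.to_thread(conn.bcs_request, "ListMotors", {}),
        heartbeat(),
    )
    return response, len(ticks)


response, tick_count = asyncio.run(main())
```

Because a REQ socket alternates strictly between send and receive, requests on a single connection should still be issued one at a time; to_thread keeps the event loop responsive but does not make the connection safe for overlapping calls.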


Extensibility

With this scaffolding in place, future enhancements can be added without breaking the Python API:

  • Connection pooling in Rust for higher throughput when multiple concurrent requests are issued.
  • Typed Rust helpers for heavy endpoints (for example, turning GetInstrumentAcquired2DBase85 directly into a decoded 2D array) while still exposing a simple function to Python.
  • Additional metrics and tracing on the Rust side to profile and debug BCS communication without instrumenting Python.

BCSz Server rewrite in Rust

The main Python functionality of BCSz is to control the beamline over a ZeroMQ socket connection to the BCS server. Here is the relevant Python code that will be rewritten in Rust:

Python Code

Relevant modules and status flags

import sys
if sys.platform[:3] == "win":
    # zmq.asyncio does not support the default (proactor) event loop on windows.
    # so set the event loop to one zmq supports
    pass  # no asyncio. Do nothing
import zmq
import zmq.utils.z85
import json
import time
from enum import Flag  # for MotorStatus


class MotorStatus(Flag):
    HOME = 1
    FORWARD_LIMIT = 2
    REVERSE_LIMIT = 4
    MOTOR_DIRECTION = 8
    MOTOR_OFF = 16
    MOVE_COMPLETE = 32
    FOLLOWING_ERROR = 64
    NOT_IN_DEAD_BAND = 128
    FORWARD_SW_LIMIT = 256
    REVERSE_SW_LIMIT = 512
    MOTOR_DISABLED = 1024
    RAW_MOTOR_DIRECTION = 2048
    RAW_FORWARD_LIMIT = 4096
    RAW_REVERSE_LIMIT = 8192
    RAW_FORWARD_SW_LIMIT = 16384
    RAW_REVERSE_SW_LIMIT = 32768
    RAW_MOVE_COMPLETE = 65536
    MOVE_LT_THRESHOLD = 131072

    def is_set(self, flag):
        return bool(self._value_ & flag._value_)

BCSServer (Python, key public method)

class BCSServer:
    """..."""
    _zmq_socket: zmq.Socket

    @staticmethod
    async def _get_server_public_key(addr, port):
        clear_socket = _zmq_context.socket(zmq.REQ)
        clear_socket.connect(f'tcp://{addr}:{port}')
        await clear_socket.send('public'.encode())
        server_public = await clear_socket.recv()
        clear_socket.close()
        return server_public

connect method (Python, establishing CURVE ZMQ connection)

async def connect(self, addr='127.0.0.1', port=5577):
    """
    (formerly the Constructor) Supply the zmq address string, addr, to reach this endstation.
    """
    global _zmq_context

    # the first server object will create the global zmq context
    if not _zmq_context:
        if 'zmq.asyncio' in sys.modules:    # Using asyncio? (was the module imported?)
            _zmq_context = zmq.asyncio.Context()
        else:
            _zmq_context = zmq.Context()

    self._zmq_socket = _zmq_context.socket(zmq.REQ)

    (client_public_key, client_secret_key) = zmq.curve_keypair()

    # server_public_key = asyncio.get_running_loop().run_until_complete(self._get_server_public_key(addr, port))
    server_public_key = await self._get_server_public_key(addr, port)

    print(f'Server Public Key {server_public_key}')

    self._zmq_socket.setsockopt(zmq.CURVE_SERVERKEY, server_public_key)
    self._zmq_socket.setsockopt(zmq.CURVE_PUBLICKEY, client_public_key)
    self._zmq_socket.setsockopt(zmq.CURVE_SECRETKEY, client_secret_key)

    self._zmq_socket.connect(f'tcp://{addr}:{port + 1}')

bcs_request method (Python, sends a request to the BCS server)

async def bcs_request(self, command_name, param_dict, debugging=False):
    """
    The method responsible for direct communication to the BCS server

    :param command_name: Name of the API endpoint
    :type command_name: str
    :param param_dict: Parameter dictionary
    :type param_dict: dict
    """
    if debugging:
        print(f"API command {command_name} BEGIN.")

    api_call_start = time.time()
    param_dict['command'] = command_name
    param_dict['_unused'] = '_unused'
    if 'self' in param_dict:
        del param_dict['self']
    await self._zmq_socket.send(json.dumps(param_dict).encode())
    response_dict = json.loads(await self._zmq_socket.recv())
    response_dict['API_delta_t'] = time.time() - api_call_start

    if debugging:
        print(f"API command {command_name} END {response_dict['API_delta_t']} s.")

    return response_dict

Rust Implementation

The goal of the Rust implementation is to build a Rust extension module that acts as a drop-in replacement for the transport layer: it handles all metadata and the bcs_request() method.

The rest of the BCSz client code stays in Python; only communication with the BCS server is handled by the Rust extension module.

Crate layout

The bcs-rs crate is organized into:

  • z85.rs: high-performance Z85 decoder (already exported as decode_z85 / decode_z85_parallel).
  • transport.rs: BCS control client implemented in Rust using ZeroMQ + CURVE.
  • lib.rs: PyO3 glue exposing both Z85 helpers and the BCS transport API as bcs_rs._core.

Only lib.rs is visible to Python; z85 and transport remain internal Rust modules.

Core Rust transport API

The central Rust type is a connection object that owns the ZeroMQ context and REQ socket:

pub struct BcsConnection {
    ctx: rzmq::Context,
    req: rzmq::Socket,
}

It provides a small interface:

impl BcsConnection {
    pub fn connect(
        addr: &str,
        port: u16,
        recv_timeout: Duration,
        send_timeout: Duration,
    ) -> Result<Self, BcsError>;

    pub fn raw_request(&self, json: &str) -> Result<String, BcsError>;
}

  • connect:
    • Creates a ZeroMQ context.
    • Creates a REQ socket.
    • Generates a client CURVE key pair.
    • Contacts addr:port with a plain REQ socket to retrieve the server public key (sending the "public" command).
    • Configures CURVE_SERVERKEY, CURVE_PUBLICKEY, and CURVE_SECRETKEY.
    • Sets RCVTIMEO and SNDTIMEO to the requested timeouts.
    • Connects the secure REQ socket to tcp://{addr}:{port + 1}.
  • raw_request:
    • Sends the provided JSON string on the REQ socket.
    • Blocks until the full response is received.
    • Returns the response body as a JSON string.

The error type BcsError wraps underlying ZeroMQ and JSON errors and is converted to PyErr in lib.rs.

Python-facing PyO3 layer

lib.rs exposes the transport API to Python through a PyO3 module:

#[pymodule]
mod _core {
    use super::*;

    #[pyclass]
    pub struct BcsConnection {
        inner: transport::BcsConnection,
    }

    #[pymethods]
    impl BcsConnection {
        #[new]
        fn new(
            addr: String,
            port: u16,
            recv_timeout_ms: Option<u64>,
            send_timeout_ms: Option<u64>,
        ) -> PyResult<Self> {
            let recv = Duration::from_millis(recv_timeout_ms.unwrap_or(5000));
            let send = Duration::from_millis(send_timeout_ms.unwrap_or(5000));
            let inner = transport::BcsConnection::connect(&addr, port, recv, send)?;
            Ok(Self { inner })
        }

        fn raw_request(&self, json: &str) -> PyResult<String> {
            Ok(self.inner.raw_request(json)?)
        }
    }

    #[pyfunction]
    fn decode_z85(data: &str) -> PyResult<Vec<u8>> { /* existing */ }

    #[pyfunction]
    fn decode_z85_parallel(py: Python<'_>, data: &str) -> PyResult<Vec<u8>> { /* existing */ }
}

From Python, this appears as:

from bcs_rs._core import BcsConnection, decode_z85, decode_z85_parallel

The existing Z85 functions stay unchanged; only the new BcsConnection class is added.

Future Python BCSz integration

The Python BCSServer class in BCSz will use BcsConnection as its transport layer, while preserving the existing high-level API.

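from __future__ import annotations  # allows "str | None" annotations on Python 3.9

import asyncio
import json
import time

from bcs_rs._core import BcsConnection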

class BCSServer:
    def __init__(self, addr: str = "127.0.0.1", port: int = 5577) -> None:
        self._addr = addr
        self._port = port
        self._conn: BcsConnection | None = None

    async def connect(self, addr: str | None = None, port: int | None = None) -> None:
        if addr is not None:
            self._addr = addr
        if port is not None:
            self._port = port
        self._conn = await asyncio.to_thread(
            BcsConnection,
            self._addr,
            self._port,
            5000,  # recv_timeout_ms
            5000,  # send_timeout_ms
        )

    async def bcs_request(self, command_name: str, param_dict: dict, debugging: bool = False) -> dict:
        if self._conn is None:
            raise RuntimeError("BCSServer not connected")

        api_call_start = time.perf_counter()

        payload = dict(param_dict)
        payload["command"] = command_name
        payload["_unused"] = "_unused"
        payload.pop("self", None)

        json_in = json.dumps(payload)
        json_out = await asyncio.to_thread(self._conn.raw_request, json_in)
        response_dict = json.loads(json_out)
        response_dict["API_delta_t"] = time.perf_counter() - api_call_start
        return response_dict

All of the high-level BCSz methods (acquire_data, list_motors, sc_* scans, etc.) can continue to call await self.bcs_request(...) unchanged. Only the underlying transport has moved into Rust.

Extensibility

With this scaffolding in place, future enhancements can be added without breaking the Python API:

  • Connection pooling in Rust for higher throughput when multiple concurrent requests are issued.
  • Typed Rust helpers for heavy endpoints (for example, turning GetInstrumentAcquired2DBase85 directly into a decoded 2D array) while still exposing a simple function to Python.
  • Additional metrics and tracing on the Rust side to profile and debug BCS communication.

Project details


Download files


Source Distribution

bcs_rs-0.1.7.tar.gz (22.6 kB view details)

Uploaded Source

Built Distribution


bcs_rs-0.1.7-cp39-abi3-macosx_11_0_arm64.whl (480.2 kB view details)

Uploaded CPython 3.9+macOS 11.0+ ARM64

File details

Details for the file bcs_rs-0.1.7.tar.gz.

File metadata

  • Download URL: bcs_rs-0.1.7.tar.gz
  • Upload date:
  • Size: 22.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: maturin/1.11.5

File hashes

Hashes for bcs_rs-0.1.7.tar.gz
Algorithm Hash digest
SHA256 56535ea6ca38c4f77b644e9c2251e3bc10ef34eec2a0a0d311d9b04b449d8eee
MD5 4b1776e3677d511fa2ea059ad84dbd70
BLAKE2b-256 3f90795a4abd5a649b833aed1030799c7258e3cdbf642a1e4e0f43c41b2f4e48


File details

Details for the file bcs_rs-0.1.7-cp39-abi3-macosx_11_0_arm64.whl.

File hashes

Hashes for bcs_rs-0.1.7-cp39-abi3-macosx_11_0_arm64.whl
Algorithm Hash digest
SHA256 731e046e2321aa11441b7fc2c70702e37963c9c4cf7deb9a8ee5858ef7fede20
MD5 239ec1d50474eda3be3ef54a98d6fdb6
BLAKE2b-256 739b5cadcd36eae0f19ac6faf36f554175b5784929e520a6a34234cc0ba0c24a

