SDK Reference
Complete reference for robotops — the Python client library for connecting robots to the Element Robotics fleet platform.
Overview
The robotops SDK exposes a single public class — RobotClient — that manages
the full lifecycle of a robot's connection to the platform. Under the hood it composes three
internal pieces:
- ConnectionManager: paho-MQTT lifecycle, LWT, exponential back-off reconnect, thread-safe publish queue.
- HeartbeatLoop: background thread that pings the platform on a configurable interval.
- Broker config resolution: on start() the SDK calls POST /auth/broker-config on the backend to retrieve real MQTT credentials. No broker host or password ever appears in robot code.
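As a rough illustration of the HeartbeatLoop idea, the sketch below runs a callback on a fixed interval in a daemon thread and stops promptly when asked. It is not the SDK's internal code: the class name `IntervalLoop` and its `tick` callback are hypothetical and exist only for this example.

```python
import threading
from typing import Callable


class IntervalLoop:
    """Hypothetical stand-in for a heartbeat loop; not the SDK's own class."""

    def __init__(self, tick: Callable[[], None], interval_s: float) -> None:
        self._tick = tick
        self._interval_s = interval_s
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        # Setting the event wakes the waiting thread immediately,
        # so shutdown does not have to wait out a full interval.
        self._stop_event.set()
        self._thread.join()

    def _run(self) -> None:
        # Event.wait() returns False on timeout and True once stop() is called.
        while not self._stop_event.wait(self._interval_s):
            self._tick()
```

Waiting on an event rather than calling `time.sleep()` is what lets `stop()` return quickly even with a long interval such as 30 seconds.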
Installation
Requires Python 3.9+ and a single runtime dependency (paho-mqtt).

    # Install from PyPI
    pip install robotops

    # Or install in development mode from source
    git clone https://github.com/element-robotics/robotops-sdk
    cd robotops-sdk
    pip install -e ".[dev]"

Quick start
A few lines are enough to connect a robot and start streaming data to your fleet dashboard.

    import time

    from robotops import RobotClient

    robot = RobotClient(
        robot_id="arm-042",
        org_id="acme-corp",
        api_key="sk-••••••••••••••••",
        backend_url="https://api.elementrobotics.xyz",
    )

    robot.start()
    robot.report_status("online")

    # Your robot's work loop. read_temp() stands in for your own sensor read;
    # set running to False elsewhere to leave the loop.
    running = True
    while running:
        robot.send_telemetry("temp_c", value=read_temp(), unit="C")
        time.sleep(5)

    robot.stop()  # publishes "offline", disconnects cleanly

RobotClient

    RobotClient(
        robot_id: str,
        org_id: str,
        api_key: str,
        backend_url: str,
        heartbeat_interval_s: int = 30,
        reconnect_min_delay_s: float = 1.0,
        reconnect_max_delay_s: float = 60.0,
    )

| Parameter | Type | Description |
|---|---|---|
| robot_id | str required | Unique identifier for this robot within your org. Use a stable, human-readable slug like "arm-042" or "delivery-bot-3". Appears in dashboard URLs and MQTT topics. |
| org_id | str required | Your organisation's slug as set up in the platform (e.g. "acme-corp"). Embedded in every MQTT topic for tenant isolation. |
| api_key | str required | SDK secret key starting with sk-. Issued by the admin via manage.py issue-api-key. Never stored in plain text on the server — only its bcrypt hash is persisted. |
| backend_url | str required | Base URL of the Element Robotics backend, e.g. "https://api.elementrobotics.xyz". The SDK calls POST /auth/broker-config here on start() to resolve MQTT credentials. No trailing slash. |
| heartbeat_interval_s | int optional | How often (in seconds) to publish a heartbeat. Default 30. The platform's watchdog marks a robot offline if no heartbeat arrives within 75 seconds, so keep this well below that threshold. |
| reconnect_min_delay_s | float optional | Starting delay for exponential back-off reconnect. Default 1.0 s. |
| reconnect_max_delay_s | float optional | Maximum reconnect back-off delay. Default 60.0 s. Actual delay is jittered ±10% to prevent thundering herd. |
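To make the reconnect parameters concrete, here is a minimal sketch of exponential back-off with ±10% jitter. It assumes the delay doubles on each attempt, which this reference does not state; `backoff_delay` is a hypothetical helper, not part of the SDK.

```python
import random


def backoff_delay(
    attempt: int,
    min_delay_s: float = 1.0,
    max_delay_s: float = 60.0,
    jitter: float = 0.10,
) -> float:
    """Return the wait before reconnect attempt `attempt` (0-based).

    Assumption: the base delay doubles per attempt until it reaches
    max_delay_s. The SDK's real growth factor may differ.
    """
    # Cap the exponent so very large attempt counts cannot overflow a float.
    exponent = min(max(attempt, 0), 32)
    base = min(max_delay_s, min_delay_s * (2 ** exponent))
    # Spread clients by up to +/-10% so a fleet does not reconnect in lockstep.
    return base * random.uniform(1.0 - jitter, 1.0 + jitter)
```

With the defaults, the base delay in this sketch grows 1, 2, 4, 8, 16, 32 seconds and then stays at 60 seconds, each value shifted by the random jitter.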
start()
Resolves MQTT broker credentials from the backend, connects to the broker, and starts the heartbeat background thread. Must be called before any publish method.
Calling start() a second time is a no-op (logged as a warning).
| Parameter | Type | Description |
|---|---|---|
| wait_for_connection | bool optional | Block until the MQTT connection is established. Default True. Set to False for non-blocking startup (messages queued until connected). |
| connection_timeout_s | float optional | Seconds to wait before raising TimeoutError. Only used when wait_for_connection=True. Default 10.0. |

    # Block until connected (default)
    robot.start()

    # Non-blocking: connect in the background, messages will queue
    robot.start(wait_for_connection=False)

    # Extend timeout for slow networks
    robot.start(connection_timeout_s=30.0)

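Non-blocking startup depends on messages being held until the connection is up. The sketch below shows one way such buffering could work. `BufferedPublisher` and its `send` callback are hypothetical names used for illustration; the SDK's ConnectionManager may be implemented quite differently.

```python
import threading
from collections import deque
from typing import Callable, Deque, Tuple


class BufferedPublisher:
    """Hypothetical buffer that holds messages until a connection exists."""

    def __init__(self, send: Callable[[str, bytes], None]) -> None:
        self._send = send
        self._pending: Deque[Tuple[str, bytes]] = deque()
        self._connected = False
        self._lock = threading.Lock()

    def publish(self, topic: str, payload: bytes) -> None:
        # The lock keeps publish() and the flush from interleaving,
        # so messages go out in the order they were published.
        with self._lock:
            if self._connected:
                self._send(topic, payload)
            else:
                self._pending.append((topic, payload))

    def on_connected(self) -> None:
        """Mark the link as up and flush everything queued while offline."""
        with self._lock:
            self._connected = True
            while self._pending:
                topic, payload = self._pending.popleft()
                self._send(topic, payload)

    def on_disconnected(self) -> None:
        with self._lock:
            self._connected = False
```

A real implementation would usually also bound the queue, so that a long outage cannot exhaust memory on the robot.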
stop()
Publishes an "offline" status message, stops the heartbeat thread, and disconnects cleanly from the broker. Because the status is published before disconnecting, the dashboard reflects the robot's offline state immediately rather than waiting for the LWT timeout.
Call stop() to signal a deliberate shutdown. If the process crashes or the network drops without stop() being called, the broker fires the Last Will & Testament (LWT) message, which also transitions the robot to "offline" on the dashboard.

    import signal
    import sys

    def shutdown(sig, frame):
        robot.stop()  # publish "offline" and disconnect cleanly
        sys.exit(0)   # then end the process

    signal.signal(signal.SIGINT, shutdown)
    signal.signal(signal.SIGTERM, shutdown)

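For scripts that should always shut down cleanly, a context manager is a common complement to signal handlers. The sketch below only assumes an object with the start() and stop() methods documented here; `running` and `Startable` are hypothetical names, not part of the SDK.

```python
from contextlib import contextmanager
from typing import Iterator, Protocol


class Startable(Protocol):
    """Anything with start() and stop(), such as a configured client."""

    def start(self) -> None: ...
    def stop(self) -> None: ...


@contextmanager
def running(robot: Startable) -> Iterator[Startable]:
    """Start the client on entry and always call stop() on exit."""
    robot.start()
    try:
        yield robot
    finally:
        # Runs on normal exit and on exceptions (including Ctrl+C), so
        # "offline" is published deliberately instead of relying on the LWT.
        robot.stop()
```

Used as `with running(robot): ...`, this covers exceptions and KeyboardInterrupt. SIGTERM does not raise an exception by default, so it still needs a signal handler.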
report_status()
Publishes the robot's current operational status to the platform. The message is sent with QoS 1 and retain=True so any subscriber (including a freshly loaded dashboard) always receives the latest known state immediately.
See Status values for the full list and their meanings.

    robot.report_status("online")   # robot is active and healthy
    robot.report_status("idle")     # connected but not doing work
    robot.report_status("fault")    # prefer report_fault(), which sets this automatically
    robot.report_status("offline")  # prefer stop(), which sets this and disconnects cleanly

report_fault()

    report_fault(
        code: str,
        severity: Literal["info", "warning", "error", "critical"] = "error",
        data: dict | None = None,
    ) -> str

Reports a discrete fault event to the platform. Each call generates a UUID fault_id that the backend uses to deduplicate QoS 1 redeliveries — the same fault is never recorded twice even if the network retries the publish.
Calling this method automatically transitions the robot's status to "fault", so operators see the alert on the dashboard immediately without a separate report_status("fault") call.
| Parameter | Type | Description |
|---|---|---|
| code | str required | Machine-readable fault code. Use UPPER_SNAKE_CASE so the backend can group and count by fault type — e.g. "MOTOR_OVERHEAT", "LOW_BATTERY", "SENSOR_FAILURE". Whitespace is stripped and the code is uppercased automatically. |
| severity | str optional | One of "info", "warning", "error", "critical". Defaults to "error". See Severity levels. |
| data | dict optional | Arbitrary JSON-serialisable dict of contextual telemetry captured at fault time. Useful for post-incident analysis — e.g. {"motor": 3, "temp_c": 87, "load_pct": 94}. |

    # Basic fault
    fault_id = robot.report_fault("LOW_BATTERY", severity="warning")

    # With contextual data
    fault_id = robot.report_fault(
        "MOTOR_OVERHEAT",
        severity="critical",
        data={"motor": 3, "temp_c": 94.2, "load_pct": 87},
    )
    print(f"Fault recorded: {fault_id}")

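The fault_id deduplication described for report_fault() is an instance of an idempotent receiver. As an illustration only, the sketch below normalises a fault code, attaches a UUID, and shows a receiver that ignores a redelivered copy. The payload field names and the `FaultStore` class are assumptions, not the platform's actual schema or backend.

```python
import uuid
from typing import Dict, Optional


def normalise_code(code: str) -> str:
    """Strip whitespace and uppercase, as described for the code parameter."""
    cleaned = code.strip().upper()
    if not cleaned:
        raise ValueError("fault code must not be empty")
    return cleaned


def new_fault(code: str, severity: str = "error", data: Optional[dict] = None) -> dict:
    # Field names here are assumptions made for this example.
    return {
        "fault_id": str(uuid.uuid4()),
        "code": normalise_code(code),
        "severity": severity,
        "data": data or {},
    }


class FaultStore:
    """Hypothetical receiver that records each fault_id at most once."""

    def __init__(self) -> None:
        self._faults: Dict[str, dict] = {}

    def record(self, fault: dict) -> bool:
        """Return True if stored, False if it was a redelivered duplicate."""
        if fault["fault_id"] in self._faults:
            return False
        self._faults[fault["fault_id"]] = fault
        return True

    def __len__(self) -> int:
        return len(self._faults)
```

Because the ID is generated once per call and travels with the message, a QoS 1 redelivery carries the same ID and is dropped, while a second genuine fault gets a fresh one.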
send_telemetry()

    send_telemetry(
        metric: str, *,
        value: float | None = None,
        unit: str | None = None,
        lat: float | None = None,
        lng: float | None = None,
        heading: float | None = None,
        data: dict | None = None,
    ) -> None

Streams a sensor reading to the platform. Published with QoS 0 (fire-and-forget) — suitable for high-frequency sensors where occasional loss is acceptable.
Three payload shapes are supported and can be combined. At least one of value, lat/lng, or data must be provided.
Telemetry shapes
Numeric metric — temperature, battery %, motor current, RPM, etc.

    robot.send_telemetry("temp_c", value=87.3, unit="C")
    robot.send_telemetry("battery_pct", value=42.0, unit="%")
    robot.send_telemetry("motor_rpm", value=1450.0, unit="rpm")
    robot.send_telemetry("current_a", value=4.2, unit="A")

GPS / location — latitude, longitude, and optional heading.

    robot.send_telemetry(
        "location",
        lat=37.7749,
        lng=-122.4194,
        heading=45.0,
    )

Structured JSON — free-form dict for complex or multi-field readings.

    robot.send_telemetry(
        "motor_state",
        data={"motor": 3, "rpm": 1200, "current_a": 4.2, "temp_c": 71.5},
    )

Combined — shapes can be mixed in a single call.

    robot.send_telemetry(
        "motor_temp",
        value=87.3,
        unit="C",
        data={"motor_id": 3, "warning_threshold": 85.0},
    )

| Parameter | Type | Description |
|---|---|---|
| metric | str required | Metric name. Use lower_snake_case for consistent querying — e.g. "temp_c", "battery_pct", "location". |
| value | float optional | Numeric reading (int is coerced to float). Required for numeric payloads. |
| unit | str optional | SI or domain unit — e.g. "C", "%", "m/s", "rpm". Ignored when value is not set. |
| lat | float optional | Latitude in decimal degrees. Pair with lng. |
| lng | float optional | Longitude in decimal degrees. |
| heading | float optional | Course over ground in degrees (0–360). Only meaningful alongside lat/lng. |
| data | dict optional | Arbitrary JSON-serialisable dict for structured payloads. |
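The argument rules above can be summarised in a small validation function. This is a sketch under stated assumptions, not the SDK's code: `build_telemetry_payload` is a hypothetical helper, the output field names are invented for illustration, and treating a lone lat without lng as "no location" is a guess at behaviour the reference does not specify.

```python
from typing import Optional


def build_telemetry_payload(
    metric: str,
    *,
    value: Optional[float] = None,
    unit: Optional[str] = None,
    lat: Optional[float] = None,
    lng: Optional[float] = None,
    heading: Optional[float] = None,
    data: Optional[dict] = None,
) -> dict:
    """Validate the arguments and return a JSON-ready dict."""
    if not metric or not metric.strip():
        raise ValueError("metric must not be empty")

    has_location = lat is not None and lng is not None
    if value is None and not has_location and data is None:
        raise ValueError("provide at least one of value, lat/lng, or data")

    payload: dict = {"metric": metric}
    if value is not None:
        payload["value"] = float(value)  # an int is coerced to float
        if unit is not None:
            payload["unit"] = unit  # unit is dropped when value is not set
    if has_location:
        payload["lat"] = float(lat)
        payload["lng"] = float(lng)
        if heading is not None:
            payload["heading"] = float(heading)
    if data is not None:
        payload["data"] = data
    return payload
```

Checking arguments before publishing matters more for QoS 0 traffic than for other messages, since a malformed fire-and-forget message would otherwise be lost without any feedback.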
Properties
robot.is_connected → bool
True if the MQTT connection is currently established. Safe to poll from any thread.
robot.status → str | None
The last status string passed to report_status(), or None if report_status() has never been called. This reflects the value most recently sent by the client, not necessarily what the server has stored.

    if robot.is_connected:
        robot.send_telemetry("rpm", value=1200)

    print(f"Last reported status: {robot.status}")

Status values
| Value | Meaning |
|---|---|
| "online" | Robot is active and healthy. Shown as a green indicator on the dashboard. |
| "idle" | Connected and reachable but not actively doing work. Shown as yellow. |
| "fault" | An error condition is present. Set automatically by report_fault(). Shown as red with a pulsing border. |
| "offline" | Robot has disconnected. Set by stop() (clean) or by the LWT / watchdog (unclean). Shown as grey. |
Severity levels
| Value | When to use |
|---|---|
| "info" | Noteworthy events that don't require immediate action — e.g. a mode transition, a scheduled maintenance ping. |
| "warning" | Degraded conditions approaching a threshold — e.g. battery at 20%, temperature rising. |
| "error" | A component failure that affects robot operation but doesn't require emergency stop. Default severity. |
| "critical" | Immediate human intervention required — e.g. hardware failure, collision, fire sensor triggered. |
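Application code sometimes needs to check or compare these values before handing them to the SDK. The sketch below mirrors the two tables above with an ordered enum and a status check. `Severity`, `VALID_STATUSES`, and `check_status` are hypothetical helpers, and the ordering from "info" to "critical" is inferred from the descriptions rather than defined by the SDK.

```python
from enum import IntEnum

VALID_STATUSES = frozenset({"online", "idle", "fault", "offline"})


class Severity(IntEnum):
    """Severity levels ordered from least to most urgent."""

    INFO = 0
    WARNING = 1
    ERROR = 2
    CRITICAL = 3

    @classmethod
    def parse(cls, name: str) -> "Severity":
        """Convert a lowercase severity string into a Severity member."""
        if name not in ("info", "warning", "error", "critical"):
            raise ValueError(f"invalid severity: {name!r}")
        return cls[name.upper()]


def check_status(status: str) -> str:
    """Return the status unchanged, or raise ValueError if it is unknown."""
    if status not in VALID_STATUSES:
        raise ValueError(f"invalid status: {status!r}")
    return status
```

An ordered enum makes rules such as "page someone for anything at or above error" a single comparison: `Severity.parse(s) >= Severity.ERROR`.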
MQTT topic schema
All topics are prefixed with robotops/<org_id>/<robot_id>/. The org slug is embedded in every topic to enable broker-level ACL tenant isolation.
| Topic suffix | QoS | Retained | Published by |
|---|---|---|---|
| …/status | 1 | Yes | report_status() |
| …/heartbeat | 0 | No | HeartbeatLoop (every heartbeat_interval_s, 30 s by default) |
| …/faults | 1 | No | report_fault() |
| …/telemetry | 0 | No | send_telemetry() |
retain=True on status means a dashboard reload always shows the current state without waiting for the next publish. Faults are QoS 1 but not retained — each is a discrete event, not current state.
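The schema above translates directly into a small lookup, which can be handy when writing a subscriber or a test harness. The helper names `topic_for` and `PUBLISH_OPTIONS` are hypothetical; the prefix, suffixes, QoS levels, and retain flags are taken from the table.

```python
from typing import Dict, NamedTuple

TOPIC_ROOT = "robotops"


class PublishOptions(NamedTuple):
    qos: int
    retain: bool


# Mirrors the topic table: suffix -> (QoS, retained).
PUBLISH_OPTIONS: Dict[str, PublishOptions] = {
    "status": PublishOptions(qos=1, retain=True),
    "heartbeat": PublishOptions(qos=0, retain=False),
    "faults": PublishOptions(qos=1, retain=False),
    "telemetry": PublishOptions(qos=0, retain=False),
}


def topic_for(org_id: str, robot_id: str, suffix: str) -> str:
    """Build robotops/<org_id>/<robot_id>/<suffix> for a known suffix."""
    if suffix not in PUBLISH_OPTIONS:
        raise ValueError(f"unknown topic suffix: {suffix!r}")
    for part in (org_id, robot_id):
        # '/', '+' and '#' are structural characters in MQTT topics and filters.
        if not part or any(ch in part for ch in "/+#"):
            raise ValueError(f"invalid topic segment: {part!r}")
    return f"{TOPIC_ROOT}/{org_id}/{robot_id}/{suffix}"
```

For example, `topic_for("acme-corp", "arm-042", "status")` yields `robotops/acme-corp/arm-042/status`. Rejecting the MQTT wildcard characters in IDs is a precaution added in this sketch, not a rule stated by the reference.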
Error handling
| Exception | When raised |
|---|---|
| ConnectionError | start(): backend unreachable, or API key rejected (HTTP 401). |
| TimeoutError | start() with wait_for_connection=True: broker did not respond within connection_timeout_s. |
| ValueError | report_status(): invalid status string. report_fault(): empty code or invalid severity. send_telemetry(): empty metric or no payload fields provided. |

    import sys

    try:
        robot.start()
    except ConnectionError as e:
        print(f"Backend unreachable or bad API key: {e}")
        sys.exit(1)
    except TimeoutError:
        print("Broker did not respond in time; check network")
        sys.exit(1)

Logging
The SDK uses Python's standard logging module under the logger name robotops. By default no handler is attached, so output is suppressed. To enable SDK logs in your application:

    import logging

    # Attach a root handler and show all logs (robotops included) at DEBUG level
    logging.basicConfig(level=logging.DEBUG)

    # Or keep that handler and limit the SDK's own logger to INFO and above
    logging.getLogger("robotops").setLevel(logging.INFO)

Key log events and their levels:
| Level | Event |
|---|---|
| INFO | Broker config resolved, connection established, status published, started/stopped. |
| WARNING | Connection lost (reconnect starting), fault reported, duplicate start() call. |
| DEBUG | Each telemetry and heartbeat publish. |
| ERROR | Broker config HTTP errors, unrecoverable connection failures. |
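If you would rather not touch the root logger, a handler can be attached to the robotops logger alone. The sketch below uses only the standard logging module and the documented logger name; the format string is an arbitrary choice for illustration.

```python
import logging
import sys

sdk_logger = logging.getLogger("robotops")
sdk_logger.setLevel(logging.INFO)

handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(
    logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
)
sdk_logger.addHandler(handler)

# Keep records from also reaching root handlers, which would print them
# twice if the application calls logging.basicConfig() elsewhere.
sdk_logger.propagate = False
```

At INFO level this shows connection and status events but hides the per-publish DEBUG lines, which can be noisy for high-frequency telemetry.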