Python SDK

SDK Reference

Complete reference for robotops — the Python client library for connecting robots to the Element Robotics fleet platform.

Overview

The robotops SDK exposes a single public class — RobotClient — that manages the full lifecycle of a robot's connection to the platform. Under the hood it composes three internal pieces:

Installation

Requires Python 3.9+ and a single runtime dependency (paho-mqtt).

terminal
# Install from PyPI
pip install robotops

# Or install in development mode from source
git clone https://github.com/element-robotics/robotops-sdk
pip install -e ".[dev]"

Quick start

Five lines to connect a robot and start streaming data to your fleet dashboard.

robot_controller.py
from robotops import RobotClient

robot = RobotClient(
    robot_id="arm-042",
    org_id="acme-corp",
    api_key="sk-••••••••••••••••",
    backend_url="https://api.elementrobotics.xyz",
)

robot.start()
robot.report_status("online")

# Your robot's work loop
while running:
    robot.send_telemetry("temp_c", value=read_temp(), unit="C")
    time.sleep(5)

robot.stop()   # publishes "offline", disconnects cleanly

RobotClient

class RobotClient(
  robot_id: str,
  org_id: str,
  api_key: str,
  backend_url: str,
  heartbeat_interval_s: int = 30,
  reconnect_min_delay_s: float = 1.0,
  reconnect_max_delay_s: float = 60.0,
)
ParameterTypeDescription
robot_id str required Unique identifier for this robot within your org. Use a stable, human-readable slug like "arm-042" or "delivery-bot-3". Appears in dashboard URLs and MQTT topics.
org_id str required Your organisation's slug as set up in the platform (e.g. "acme-corp"). Embedded in every MQTT topic for tenant isolation.
api_key str required SDK secret key starting with sk-. Issued by the admin via manage.py issue-api-key. Never stored in plain text on the server — only its bcrypt hash is persisted.
backend_url str required Base URL of the Element Robotics backend, e.g. "https://api.elementrobotics.xyz". The SDK calls POST /auth/broker-config here on start() to resolve MQTT credentials. No trailing slash.
heartbeat_interval_s int optional How often (in seconds) to publish a heartbeat. Default 30. The platform's watchdog marks a robot offline if no heartbeat arrives within 75 seconds, so keep this well below that threshold.
reconnect_min_delay_s float optional Starting delay for exponential back-off reconnect. Default 1.0 s.
reconnect_max_delay_s float optional Maximum reconnect back-off delay. Default 60.0 s. Actual delay is jittered ±10% to prevent thundering herd.

start()

start(wait_for_connection: bool = True, connection_timeout_s: float = 10.0) → None
raises TimeoutError raises ConnectionError

Resolves MQTT broker credentials from the backend, connects to the broker, and starts the heartbeat background thread. Must be called before any publish method.

Calling start() a second time is a no-op (logged as a warning).

ParameterTypeDescription
wait_for_connection bool optional Block until the MQTT connection is established. Default True. Set to False for non-blocking startup (messages queued until connected).
connection_timeout_s float optional Seconds to wait before raising TimeoutError. Only used when wait_for_connection=True. Default 10.0.
example
# Block until connected (default)
robot.start()

# Non-blocking — connect in background, messages will queue
robot.start(wait_for_connection=False)

# Extend timeout for slow networks
robot.start(connection_timeout_s=30.0)

stop()

stop() → None

Publishes an "offline" status message, stops the heartbeat thread, and disconnects cleanly from the broker. Because the status is published before disconnecting, the dashboard reflects the robot's offline state immediately rather than waiting for the LWT timeout.

Clean vs. unclean disconnect: Call stop() to signal a deliberate shutdown. If the process crashes or the network drops without stop() being called, the broker fires the Last Will & Testament (LWT) message, which also transitions the robot to "offline" on the dashboard.
example
import signal

def shutdown(sig, frame):
    robot.stop()

signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)

report_status()

report_status(status: Literal["online", "idle", "fault", "offline"]) → None
raises ValueError — invalid status

Publishes the robot's current operational status to the platform. The message is sent with QoS 1 and retain=True so any subscriber (including a freshly loaded dashboard) always receives the latest known state immediately.

See Status values for the full list and their meanings.

example
robot.report_status("online")   # robot is active and healthy
robot.report_status("idle")     # connected but not doing work
robot.report_status("fault")    # use report_fault() instead — it sets this automatically
robot.report_status("offline")  # use stop() instead — it sets this and disconnects cleanly

report_fault()

report_fault(
  code: str,
  severity: Literal["info","warning","error","critical"] = "error",
  data: dict | None = None,
) → str
→ fault_id (UUID string) raises ValueError — empty code or invalid severity

Reports a discrete fault event to the platform. Each call generates a UUID fault_id that the backend uses to deduplicate QoS 1 redeliveries — the same fault is never recorded twice even if the network retries the publish.

Calling this method automatically transitions the robot's status to "fault", so operators see the alert on the dashboard immediately without a separate report_status("fault") call.

ParameterTypeDescription
code str required Machine-readable fault code. Use UPPER_SNAKE_CASE so the backend can group and count by fault type — e.g. "MOTOR_OVERHEAT", "LOW_BATTERY", "SENSOR_FAILURE". Whitespace is stripped and the code is uppercased automatically.
severity str optional One of "info", "warning", "error", "critical". Defaults to "error". See Severity levels.
data dict optional Arbitrary JSON-serialisable dict of contextual telemetry captured at fault time. Useful for post-incident analysis — e.g. {"motor": 3, "temp_c": 87, "load_pct": 94}.
example
# Basic fault
fault_id = robot.report_fault("LOW_BATTERY", severity="warning")

# With contextual data
fault_id = robot.report_fault(
    "MOTOR_OVERHEAT",
    severity="critical",
    data={"motor": 3, "temp_c": 94.2, "load_pct": 87},
)
print(f"Fault recorded: {fault_id}")

send_telemetry()

send_telemetry(
  metric: str, *,
  value: float | None = None,
  unit: str | None = None,
  lat: float | None = None,
  lng: float | None = None,
  heading: float | None = None,
  data: dict | None = None,
) → None
raises ValueError — empty metric or no payload fields provided

Streams a sensor reading to the platform. Published with QoS 0 (fire-and-forget) — suitable for high-frequency sensors where occasional loss is acceptable.

Three payload shapes are supported and can be combined. At least one of value, lat/lng, or data must be provided.

Telemetry shapes

Numeric metric — temperature, battery %, motor current, RPM, etc.

numeric
robot.send_telemetry("temp_c",       value=87.3,   unit="C")
robot.send_telemetry("battery_pct",  value=42.0,   unit="%")
robot.send_telemetry("motor_rpm",    value=1450.0, unit="rpm")
robot.send_telemetry("current_a",    value=4.2,    unit="A")

GPS / location — latitude, longitude, and optional heading.

gps
robot.send_telemetry(
    "location",
    lat=37.7749, lng=-122.4194, heading=45.0,
)

Structured JSON — free-form dict for complex or multi-field readings.

structured
robot.send_telemetry(
    "motor_state",
    data={"motor": 3, "rpm": 1200, "current_a": 4.2, "temp_c": 71.5},
)

Combined — shapes can be mixed in a single call.

combined
robot.send_telemetry(
    "motor_temp",
    value=87.3, unit="C",
    data={"motor_id": 3, "warning_threshold": 85.0},
)
ParameterTypeDescription
metricstr requiredMetric name. Use lower_snake_case for consistent querying — e.g. "temp_c", "battery_pct", "location".
valuefloat optionalNumeric reading (int is coerced to float). Required for numeric payloads.
unitstr optionalSI or domain unit — e.g. "C", "%", "m/s", "rpm". Ignored when value is not set.
latfloat optionalLatitude in decimal degrees. Pair with lng.
lngfloat optionalLongitude in decimal degrees.
headingfloat optionalCourse over ground in degrees (0–360). Only meaningful alongside lat/lng.
datadict optionalArbitrary JSON-serialisable dict for structured payloads.

Properties

robot.is_connectedbool

True if the MQTT connection is currently established. Safe to poll from any thread.

robot.statusstr | None

The last status string passed to report_status(), or None if report_status() has never been called. This reflects the value most recently sent by the client, not necessarily what the server has stored.

example
if robot.is_connected:
    robot.send_telemetry("rpm", value=1200)

print(f"Last reported status: {robot.status}")

Status values

ValueMeaning
"online"Robot is active and healthy. Shown as a green indicator on the dashboard.
"idle"Connected and reachable but not actively doing work. Shown as yellow.
"fault"An error condition is present. Set automatically by report_fault(). Shown as red with a pulsing border.
"offline"Robot has disconnected. Set by stop() (clean) or by the LWT / watchdog (unclean). Shown as grey.

Severity levels

ValueWhen to use
"info"Noteworthy events that don't require immediate action — e.g. a mode transition, a scheduled maintenance ping.
"warning"Degraded conditions approaching a threshold — e.g. battery at 20%, temperature rising.
"error"A component failure that affects robot operation but doesn't require emergency stop. Default severity.
"critical"Immediate human intervention required — e.g. hardware failure, collision, fire sensor triggered.

MQTT topic schema

All topics are prefixed with robotops/<org_id>/<robot_id>/. The org slug is embedded in every topic to enable broker-level ACL tenant isolation.

Topic suffixQoSRetainedPublished by
…/status1Yesreport_status()
…/heartbeat0NoHeartbeatLoop (every 30 s)
…/faults1Noreport_fault()
…/telemetry0Nosend_telemetry()
Why QoS 1 for status and faults? These are high-value events where loss is unacceptable. retain=True on status means a dashboard reload always shows the current state without waiting for the next publish. Faults are QoS 1 but not retained — each is a discrete event, not current state.

Error handling

ExceptionWhen raised
ConnectionError start() — backend unreachable, or API key rejected (HTTP 401).
TimeoutError start() with wait_for_connection=True — broker did not respond within connection_timeout_s.
ValueError report_status() — invalid status string.
report_fault() — empty code or invalid severity.
send_telemetry() — empty metric or no payload fields provided.
error handling
try:
    robot.start()
except ConnectionError as e:
    print(f"Backend unreachable or bad API key: {e}")
    sys.exit(1)
except TimeoutError:
    print("Broker did not respond in time — check network")
    sys.exit(1)

Logging

The SDK uses Python's standard logging module under the logger name robotops. By default no handler is attached, so output is suppressed. To enable SDK logs in your application:

logging setup
import logging

# Enable all robotops logs at DEBUG level
logging.basicConfig(level=logging.DEBUG)

# Or scope it to the SDK only
logging.getLogger("robotops").setLevel(logging.INFO)

Key log events and their levels:

LevelEvent
INFOBroker config resolved, connection established, status published, started/stopped.
WARNINGConnection lost (reconnect starting), fault reported, duplicate start() call.
DEBUGEach telemetry and heartbeat publish.
ERRORBroker config HTTP errors, unrecoverable connection failures.
SDK version 0.1.0 · Element Robotics
← Back to home