Development
This page summarizes the local development workflow for MicroDCS, including environment setup, test execution, documentation work, and container packaging.
Environment Setup
MicroDCS requires Python ≥ 3.14 and uses uv for dependency management, virtual environment creation, and packaging.
Why Python 3.14?
Python 3.14 is the minimum version where the annotation semantics MicroDCS needs work correctly end-to-end without workarounds. The requirement is driven by PEP 649 (Deferred Evaluation of Annotations Using Descriptors), which Python 3.14 implements through PEP 749, and by the new annotationlib module.
MicroDCS relies heavily on runtime type inspection:
- mashumaro resolves type annotations at runtime to build its JSON (orjson) and MessagePack serialization codecs. This means annotations must be real types at runtime, not opaque strings.
- DataClassResponseMixin[R] uses Generic[R] with runtime introspection of __orig_bases__ to determine the response type. Generated model code like DataClassResponseMixin["Hello"] passes a forward reference that must resolve correctly.
- ProtocolBinding[PH] resolves its generic type parameter at runtime via get_args() to find the associated ProtocolHandler class, including lazy ForwardRef resolution through the 3.14 annotationlib.ForwardRef.
- Generated dataclasses use InitVar[Hello | None], union type aliases (type Greetings = Hello | Bye), and forward references freely. These all need to round-trip through typing.get_type_hints() correctly.
- register_callback() unwraps TypeAliasType (the runtime object behind the type X = Y statement) and UnionType (X | Y) to register individual dataclass handlers.
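The generic-parameter lookup described in the list above can be sketched with the standard library alone. ResponseMixin, Hello, HelloRequest, and LazyHelloRequest are hypothetical stand-ins rather than MicroDCS classes, and the sketch assumes the response type is the single argument of the parameterized base. The namespace argument is likewise an assumption, used only to resolve a string parameter by name.

```python
from typing import ForwardRef, Generic, TypeVar, get_args, get_origin

R = TypeVar("R")


class ResponseMixin(Generic[R]):
    """Hypothetical stand-in for a mixin that needs its response type at runtime."""

    @classmethod
    def response_type(cls, namespace=None):
        # __orig_bases__ keeps the parameterized bases, e.g. ResponseMixin[Hello].
        for base in getattr(cls, "__orig_bases__", ()):
            if get_origin(base) is ResponseMixin:
                (arg,) = get_args(base)
                if isinstance(arg, ForwardRef):
                    # A string parameter arrives as a ForwardRef; look it up by name.
                    return (namespace or {})[arg.__forward_arg__]
                return arg
        return None


class Hello:
    pass


class HelloRequest(ResponseMixin[Hello]):
    pass


class LazyHelloRequest(ResponseMixin["Hello"]):
    pass


print(HelloRequest.response_type() is Hello)
print(LazyHelloRequest.response_type({"Hello": Hello}) is Hello)
```

The second class shows why forward references matter: the parameter is only a name until something resolves it, so the resolution step has to work reliably at runtime.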
Before Python 3.14, there were two problematic alternatives:
- PEP 563 (from __future__ import annotations): stringifies all annotations at definition time. This breaks mashumaro's runtime codegen, get_type_hints() resolution of generic parameters, and any code that inspects annotations as live type objects.
- No PEP 563 on older Python: requires careful class ordering to avoid NameError on forward references, manual update_forward_refs() calls, and if TYPE_CHECKING guards with duplicated imports. This is fragile and error-prone with generated code.
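The gap between a stringified annotation and a live type object is easy to observe. In this sketch the quoted annotation stands in for what from __future__ import annotations would store for every field. Hello and Bye are illustrative classes, and the explicit localns argument is there only to keep the snippet self-contained.

```python
from typing import get_type_hints


class Bye:
    # Quoted on purpose: this mimics how PEP 563 stores every annotation.
    reply_to: "Hello"


class Hello:
    name: str


# What a naive inspector sees: a plain string, not a class.
raw = Bye.__annotations__["reply_to"]

# Getting the real type back is an explicit step that needs the right namespace.
resolved = get_type_hints(Bye, localns={"Hello": Hello})["reply_to"]

print(type(raw).__name__)
print(resolved is Hello)
```

Any library that reads annotations without performing the second step, or without access to the namespace where the name is defined, ends up with strings instead of types.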
PEP 649, as implemented by PEP 749 in Python 3.14, solves this by making annotation evaluation lazy by default: annotations are stored in a form that is evaluated on access rather than at class creation time. Forward references, union syntax, and type aliases therefore work both as static type information and when inspected at runtime, which is exactly what mashumaro, get_type_hints(), and the generic resolution patterns in MicroDCS require.
Additional language features used throughout the codebase, with the Python version that introduced each:
- annotationlib.ForwardRef (Python 3.14): the new lazy forward reference type, used in ProtocolBinding.get_protocol_handler() for generic parameter resolution
- type statement (PEP 695, Python 3.12+): used for type aliases like type Greetings = Hello | Bye in generated models and type JobOrderIdCall = AbortCall | CancelCall | ... in processors
- StrEnum (Python 3.11+): used for Direction, MessageIntent, ProcessorBinding, and ErrorKind enums
- kw_only dataclasses (Python 3.10+): used universally for all model dataclasses
- match/case (Python 3.10+): used in validation and content-type routing
Common Commands
Run the main unit test suite:
uv run pytest tests/ --ignore=tests/test_mqtt_integration.py --ignore=tests/test_msgpack_integration.py
Run test coverage:
uv run pytest --cov=microdcs --cov-report=term-missing tests/ --ignore=tests/test_mqtt_integration.py --ignore=tests/test_msgpack_integration.py
Run the example application:
uv run python -m app
MessagePack Buffer Limit
The MessagePack-RPC transport sets msgpack.Unpacker(max_buffer_size=...) to bound
the size of buffered, not-yet-decoded payload data.
- Default limit: 8388608 bytes (8 MiB)
- Runtime override: set APP_MSGPACK_MAX_BUFFER_SIZE
This helps protect the process from excessive memory usage caused by oversized or malformed input streams.
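How the override might be resolved can be sketched with the standard library. The variable name and default come from this page; resolve_max_buffer_size is a hypothetical helper, and the way MicroDCS actually parses APP_* settings may differ.

```python
import os

DEFAULT_MAX_BUFFER_SIZE = 8 * 1024 * 1024  # 8388608 bytes (8 MiB)


def resolve_max_buffer_size(environ=os.environ):
    """Read APP_MSGPACK_MAX_BUFFER_SIZE, falling back to the documented default."""
    raw = environ.get("APP_MSGPACK_MAX_BUFFER_SIZE")
    if raw is None or raw.strip() == "":
        return DEFAULT_MAX_BUFFER_SIZE
    limit = int(raw)
    if limit <= 0:
        raise ValueError("APP_MSGPACK_MAX_BUFFER_SIZE must be a positive integer")
    return limit


limit = resolve_max_buffer_size({"APP_MSGPACK_MAX_BUFFER_SIZE": "1048576"})
# With msgpack-python installed, the value would then be passed on as:
#   unpacker = msgpack.Unpacker(max_buffer_size=limit)
```

Rejecting zero and negative values here mirrors the "positive values" rule that the startup validation applies to msgpack.max_buffer_size.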
Configuration Validation
At startup, the runtime configuration is validated before protocol handlers enter their main loops. Validation fails fast with a clear error message if any check does not pass.
Checked constraints include:
- Required non-empty values:
  - instance_id
  - redis.hostname
  - redis.key_prefix
  - mqtt.hostname
  - mqtt.identifier
  - msgpack.hostname
- Port ranges:
  - redis.port, mqtt.port, msgpack.port must be in 1..65535
- Positive values:
  - mqtt.connect_timeout, mqtt.publish_timeout, mqtt.message_workers
  - msgpack.max_queued_connections, msgpack.max_concurrent_requests
  - msgpack.max_buffer_size
- Non-negative values:
  - mqtt.incoming_queue_size, mqtt.outgoing_queue_size
  - mqtt.dedupe_ttl_seconds, mqtt.binding_outgoing_queue_size
  - msgpack.binding_outgoing_queue_size
  - processing.shutdown_grace_period
  - processing.message_expiry_interval (when configured)
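The four constraint families above can be illustrated with a small fail-fast validator. This is a sketch only: it assumes a flat mapping keyed by dotted names, covers a subset of the listed fields, and validate_config and ensure_valid are hypothetical names rather than MicroDCS functions.

```python
def validate_config(cfg):
    """Collect violations for a flat {dotted_name: value} mapping (assumed shape)."""
    errors = []

    # Required non-empty values.
    for key in ("instance_id", "redis.hostname", "mqtt.hostname"):
        if not str(cfg.get(key) or "").strip():
            errors.append(f"{key} must be a non-empty value")

    # Port ranges.
    for key in ("redis.port", "mqtt.port", "msgpack.port"):
        port = cfg.get(key)
        if not isinstance(port, int) or not 1 <= port <= 65535:
            errors.append(f"{key} must be in 1..65535")

    # Strictly positive values.
    for key in ("mqtt.connect_timeout", "msgpack.max_buffer_size"):
        value = cfg.get(key)
        if value is None or value <= 0:
            errors.append(f"{key} must be positive")

    # Values where zero is allowed.
    for key in ("mqtt.incoming_queue_size", "processing.shutdown_grace_period"):
        value = cfg.get(key)
        if value is None or value < 0:
            errors.append(f"{key} must be non-negative")

    return errors


def ensure_valid(cfg):
    """Fail fast with one message that names every violated constraint."""
    errors = validate_config(cfg)
    if errors:
        raise ValueError("invalid configuration: " + "; ".join(errors))


good = {
    "instance_id": "dev-1", "redis.hostname": "localhost", "mqtt.hostname": "localhost",
    "redis.port": 6379, "mqtt.port": 1883, "msgpack.port": 8888,
    "mqtt.connect_timeout": 5.0, "msgpack.max_buffer_size": 8388608,
    "mqtt.incoming_queue_size": 0, "processing.shutdown_grace_period": 10,
}
bad = dict(good, **{"instance_id": "", "redis.port": 70000})

ensure_valid(good)
print(validate_config(bad))
```

Collecting all violations before raising means a misconfigured deployment reports every problem in a single startup attempt. The port and timeout values in the sample mapping are illustrative, not documented defaults.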
Startup pre-check sequence:
- Validate static field constraints (required fields, ranges, and bounds).
- Check TCP connectivity to Redis using redis.hostname:redis.port.
- Check TCP connectivity to MQTT broker using mqtt.hostname:mqtt.port.
- Check MessagePack server bindability on msgpack.hostname:msgpack.port.
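The connectivity and bindability checks in this sequence can be approximated with plain sockets. The helper names, the two-second timeout, and the IPv4-only bind probe are assumptions for the sake of the sketch. The demonstration runs against a throwaway local listener rather than a real Redis or MQTT broker.

```python
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def can_bind(host, port):
    """Return True if a server socket could be bound to host:port (IPv4 only)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        try:
            probe.bind((host, port))
            return True
        except OSError:
            return False


# Throwaway listener standing in for an external dependency.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
    listener.bind(("127.0.0.1", 0))  # port 0 lets the OS choose a free port
    listener.listen(1)
    port = listener.getsockname()[1]
    reachable = can_connect("127.0.0.1", port)
    bindable_while_in_use = can_bind("127.0.0.1", port)

print(reachable, bindable_while_in_use)
```

The two probes answer different questions: the first asks whether something is already listening at an address, the second whether this process could start listening there itself.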
Environment variables are still parsed from APP_* settings, then validated by these runtime checks.
Deployment Modes
A MicroDCS application consists of two roles: processor (handles incoming events, runs state machines) and publisher (maintains retained MQTT topics for northbound MES integration). Both can run in a single process (default) or be split into separate Kubernetes deployments.
Instance Role Flags
| Variable | Default | Effect |
|---|---|---|
| APP_IS_PROCESSOR_INSTANCE | true | Enables protocol handlers, bindings, and processor lifecycle |
| APP_IS_PUBLISHER_INSTANCE | true | Enables MQTTPublisher additional tasks (e.g. JobOrderPublisher) |
For production, set one flag to false per deployment: multiple processor replicas with shared MQTT subscriptions and a single publisher replica that owns retained topics.
Publisher Configuration (APP_PUBLISHER_*)
| Variable | Default | Description |
|---|---|---|
| APP_PUBLISHER_RETAINED_TTL_SECONDS | 172800 | MQTT v5 Message Expiry Interval for retained topics (48 hours) |
| APP_PUBLISHER_STREAM_READ_COUNT | 50 | Max entries per Redis XREAD batch |
| APP_PUBLISHER_STREAM_BLOCK_MS | 500 | XREAD BLOCK timeout (milliseconds) |
The topic prefix identifier (e.g. "machinery-jobs") is a constructor parameter of JobOrderPublisher, not an environment variable. It is wired in app/__main__.py.
Kubernetes Manifest
The deploy/k8s.yaml manifest defines two Deployments:
- microdcs-processor (replicas: 2): processors plus SFC engine (APP_IS_PUBLISHER_INSTANCE=false)
- microdcs-publisher (replicas: 1): publisher only (APP_IS_PROCESSOR_INSTANCE=false)
The publisher must be a single replica to avoid conflicting retained writes. The SFC engine is different: it runs on every processor replica and relies on Redis consumer groups plus CAS Lua scripts for work distribution and recovery. Equipment integrations must treat push_command deliveries as idempotent by deduplicating on the correlation_id attached by the engine. The app handles MQTT and Redis reconnection internally with backoff, so health probes on external dependencies are not included — Kubernetes restart-on-crash is sufficient.
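On the equipment side, idempotent handling of push_command deliveries might look like the sketch below. Only the correlation_id field comes from this page. CommandDeduplicator, handle_push_command, the five-minute retention window, and the in-memory table are all assumptions, and an integration that must survive restarts would need durable storage instead.

```python
import time


class CommandDeduplicator:
    """Remember recently seen correlation_ids so redelivered commands run once."""

    def __init__(self, ttl_seconds=300.0, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._seen = {}  # correlation_id -> expiry timestamp

    def first_delivery(self, correlation_id):
        now = self._clock()
        # Drop expired entries so the table does not grow without bound.
        self._seen = {cid: exp for cid, exp in self._seen.items() if exp > now}
        if correlation_id in self._seen:
            return False
        self._seen[correlation_id] = now + self._ttl
        return True


executed = []
dedupe = CommandDeduplicator()


def handle_push_command(correlation_id, payload):
    # Hypothetical handler: acts only on the first delivery of each command.
    if dedupe.first_delivery(correlation_id):
        executed.append(payload)


handle_push_command("abc-1", "start pump")
handle_push_command("abc-1", "start pump")  # redelivery is ignored
handle_push_command("abc-2", "stop pump")
print(executed)
```

The retention window should be longer than the longest interval over which the engine might redeliver a command; the value used here is illustrative.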
SFC Engine Wiring
The relevant SFC wiring in app/__main__.py is:
from microdcs.sfc_engine import SfcEngine

machinery_jobs_processor = MachineryJobsCloudEventProcessor(
    microdcs.runtime_config.instance_id,
    microdcs.runtime_config.processing,
    "machinery-jobs",
    microdcs.redis_connection_pool,
    microdcs.redis_key_schema,
    job_acceptance_config,
)

sfc_engine = SfcEngine(
    microdcs.redis_connection_pool,
    microdcs.redis_key_schema,
    nb_processor=machinery_jobs_processor,
    sb_processors={"greetings": greetings_processor},
    consumer_name=microdcs.runtime_config.instance_id,
)

greetings_processor.register_action_completion_handler(sfc_engine.complete_action)
greetings_processor.register_action_failure_handler(sfc_engine.fail_action)
# For pull_event actions also add:
# greetings_processor.register_pull_completion_handler(sfc_engine.pull_event_handler)
machinery_jobs_processor.register_scope_handler(sfc_engine.register_scope)
microdcs.add_additional_task(sfc_engine)
The register_pull_completion_handler line is required when the recipe uses pull_event actions — it registers sfc_engine.pull_event_handler, which writes the incoming CloudEvent as a pull_event: work item into sfc:work:{scope} so any live engine replica can complete the waiting action. The example recipe uses only push_command, so this wiring is omitted in app/__main__.py.
tests/example_sfc.py provides a reusable build_example_work_master() helper containing an SFC recipe payload so you can seed Redis or publish a ConfigWorkMaster event during manual testing.
Local Development
For local development, both roles run in a single process (default). Start MQTT and Redis via Docker and run the app:
docker start mosquitto 2>/dev/null || docker run --rm -it --name mosquitto -p 1883:1883 eclipse-mosquitto mosquitto -c /mosquitto-no-auth.conf
docker start redis 2>/dev/null || docker run --rm -it --name redis -p 6379:6379 redis:latest
uv run python -m app
Or use the VS Code task Run App (plain), which starts both services automatically.
For manual SFC testing, use the example Work Master from tests/example_sfc.py and publish a StoreAndStartCall that references it.
Generate typed models from a JSON Schema file:
Generate models with advanced options (custom metadata, init fields, validation, and root-union workaround):
uv run microdcs dataclassgen dataclasses \
--custom-metadata \
--init-fields 'mystatus->MyStatus' \
--validation \
--collapse-root-workaround \
my-schema.schema.json
Generate the SFC recipe dataclasses:
Documentation
The documentation site is built with Zensical.
Container Build
The project ships with a distroless multi-stage Docker build.
Main Libraries
The framework is built around a small set of core dependencies:
- aiomqtt and Paho MQTT Python for MQTT communication
- mashumaro for dataclass serialization
- msgpack-python for MessagePack encoding
- redis-py for persistence
- datamodel-code-generator for model generation
- Typer for CLI tooling