Machinery Jobs – MES Northbound Publishing
This page describes the design of the northbound MQTT publishing interface that makes job order state and job response data reliably available to a Manufacturing Execution System (MES) across connectivity outages. It extends the existing MachineryJobsCloudEventProcessor without modifying its command handling logic and adds a dedicated publisher component that maintains retained MQTT topics per job.
Background
The OPC 40001-3: Machinery Job Management specification defines two integration approaches for exposing job state to clients:
- Event-based – Clients subscribe to `ISA95JobOrderStatusEventType` events fired on each state transition. The existing `MachineryJobsCloudEventProcessor` fully implements this mode. Events are low-latency and efficient but provide no recovery path for clients that miss messages during a connectivity outage.
- List/variable-based – The `ISA95JobOrderReceiverObjectType` exposes a `JobOrderList` variable and the `ISA95JobResponseProviderObjectType` exposes a `JobOrderResponseList` variable. These always reflect current state and can be published as retained MQTT messages so that reconnecting clients receive the full picture immediately.
The existing implementation covers the event-based mode. The list/variable-based mode is absent.
Why the event-based mode alone is insufficient
Job lifecycle in MicroDCS involves two distinct actors:
- The MES creates and controls job order definitions (Store, StoreAndStart, Start, Abort, etc.)
- The DCS drives physical state transitions and produces all job response data (process actuals, serial numbers, torque values, temperatures, machine IDs)
The MES is therefore the originator of job definitions but a consumer of state changes and job response data that it cannot reconstruct from its own records. If the MES misses a DCS-originated status event or a job response update during a connectivity outage, it has no way to detect the gap from the event stream alone.
The system must tolerate MES connectivity outages of up to 30 minutes. In high-cadence scenarios (in-house parts production, job iterations in the order of seconds), dozens of jobs can complete and accumulate response data during such an outage. The existing Clear method — already implemented — acts as the MES acknowledgment: the MES calls Clear after successfully processing a job response, which transitions the job to EndState. Jobs in clearable states (Ended, Aborted) therefore remain available in Redis until the MES explicitly clears them, but nothing currently exposes this durably over MQTT.
Why a single full-list retained publish does not fit
Publishing the complete JobOrderList or JobOrderResponseList on every state transition is traffic-inefficient for this application:
- Job order objects carry all input parameters: equipment, material, personnel, physical asset, and work master references. These are large and do not change after a job is stored.
- Job response objects accumulate all process data over the job's lifetime. In a traceability-oriented production context (serial numbers, set-point vs. actual torques, temperatures) a single response entry can reach several kilobytes.
Publishing complete lists on every transition would retransmit unchanged large payloads repeatedly. The split design below avoids this.
Design
Component overview
The extension adds one new component — the Job Order Publisher — as a separate Kubernetes Deployment. The stream write that feeds it is encapsulated in the existing DAOs, so no processor — northbound or southbound — needs to be aware of the publishing infrastructure.
flowchart TB
MES["MES\n(MQTT client)"]
subgraph Broker["MQTT broker"]
cmd["commands topics\n(existing)"]
retained["retained topics\n(new)"]
events["events topics\n(existing)"]
end
subgraph K8s["Kubernetes"]
subgraph NB["Command Processor Deployment (×N pods)"]
proc["MachineryJobsCloudEventProcessor"]
end
subgraph SB["Southbound Processor Deployments"]
south["DCS-facing processors"]
end
subgraph Publisher["Job Order Publisher Deployment (×1 pod)"]
pub["JobOrderPublisher"]
end
end
Redis[("Redis\nJobOrderAndStateDAO\nJobResponseDAO\njoborder:changes:{scope} Stream\npubseq:{scope}")]
MES -- "method call CloudEvents" --> cmd
cmd --> proc
proc -- "status events" --> events
events --> MES
proc -- "via DAO" --> Redis
south -- "via DAO" --> Redis
pub -- "XREAD BLOCK" --> Redis
pub -- "retained publishes" --> retained
retained --> MES
Why a separate container
The command processor scales horizontally and is latency-sensitive — it handles real-time commands from the MES and the DCS. The publisher is a singleton, throughput-oriented, and not latency-sensitive (MQTT retained delivery is inherently best-effort). Keeping them separate gives independent restart policies, resource limits, logging, and alerting. A bug or resource leak in the publisher does not affect command processing.
High availability
Leader election for the publisher is not needed. Pod restart time is 10–30 seconds. The system is already designed to tolerate 30-minute MES outages through the retained topic and sequence number scheme described below. A pod restart is two orders of magnitude smaller than the gap the recovery mechanism already handles. The retained topics are the availability story for the publisher. replicas: 1 with no locking is the correct configuration.
Topic design
All topics are scoped per machine using the same scope identifier used as the CloudEvent subject in the existing processor. All retained topics carry an MQTT v5 Message Expiry Interval configured via retained_ttl_seconds (default: 48 hours). This removes stale topics automatically if a scope goes inactive — for example after a machine is decommissioned or a scope is renamed — without requiring explicit cleanup logic in the publisher.
| Topic | Retained | Payload | Published when |
|---|---|---|---|
| `{prefix}/{scope}/state-index` | Yes | State index (see below) | Every state transition |
| `{prefix}/{scope}/order/{job_order_id}` | Yes | Full `ISA95JobOrderDataType` | On `Store` / `StoreAndStart` / `Update`; deleted on `Clear` |
| `{prefix}/{scope}/result/{job_order_id}` | Yes | Full `ISA95JobResponseDataType` | On job reaching a clearable state (`Ended` / `Aborted`); deleted on `Clear` |
| `{prefix}/{scope}/events` | No | `ISA95JobOrderStatusEventType` CloudEvent | Every state transition (existing, unchanged) |
| `{prefix}/{scope}/commands` | No | Method call CloudEvent | From MES (existing, unchanged) |
| `{prefix}/{scope}/metadata` | Yes | Metadata/capabilities CloudEvent | On processor startup (existing, unchanged) |
The {prefix} is the existing configurable MQTT topic prefix for the machinery-jobs processor (e.g. app/jobs, configured via APP_PROCESSING_TOPIC_PREFIXES). The {scope} is the machine scope identifier used as the CloudEvent subject. The retained topics for the publisher (state-index, order/{id}, result/{id}) are new siblings alongside the existing events, commands, and metadata intent topics.
Deletion of retained topics on Clear is performed by publishing a zero-byte retained message to the same topic, which is the standard MQTT mechanism for clearing retained state.
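For illustration, a minimal sketch of the retained publish and delete operations using `aiomqtt` (the client library named in Phase 2) together with the paho-mqtt v5 property classes. The wrapper name and method names are placeholders, not the final API:

```python
from aiomqtt import Client
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties


class RetainedTopicWriter:
    """Hypothetical helper for retained publish/delete with expiry."""

    def __init__(self, client: Client, ttl_seconds: int = 172800):
        self._client = client
        self._ttl_seconds = ttl_seconds  # retained_ttl_seconds, default 48 h

    async def publish_retained(self, topic: str, payload: bytes) -> None:
        # MQTT v5 Message Expiry Interval: the broker drops the retained
        # message automatically once the TTL elapses.
        props = Properties(PacketTypes.PUBLISH)
        props.MessageExpiryInterval = self._ttl_seconds
        await self._client.publish(
            topic, payload, qos=1, retain=True, properties=props
        )

    async def delete_retained(self, topic: str) -> None:
        # Zero-byte retained publish — the standard MQTT mechanism for
        # clearing retained state, as described above.
        await self._client.publish(topic, b"", qos=1, retain=True)
```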
The order/{job_order_id} and result/{job_order_id} topics carry large payloads but are written rarely — once on creation, once on update, once on reaching a clearable state, and deleted on Clear. The state-index topic changes on every transition but carries a small payload (state identifiers only, not full objects).
Job response data is published only when a job transitions to a clearable state (Ended or Aborted), not incrementally during execution. The publisher checks the current job state before publishing a result/{job_order_id} topic — ResultUpdate stream entries for jobs still in progress are ignored to prevent premature publication of partial results. The result/{job_order_id} topic therefore reflects the completed response and is not updated further before Clear.
Note: `Interrupted` (substates `Held`, `Suspended`) is not a clearable state — it is a resumable pause state. Jobs in `Interrupted` can resume execution via `Resume`. Only `Ended` and `Aborted` allow the `Clear` transition to `EndState`.
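To make the clearable-state gating described above concrete, a sketch of the `_publish_job_result` handler planned in Phase 3; `current_state_name()`, the DAO accessors, and the boolean return are illustrative assumptions rather than the final API:

```python
CLEARABLE_STATES = {"Ended", "Aborted"}


# Method on JobOrderPublisher. Returns False when the ResultUpdate is
# ignored so the caller can also skip the state-index update.
async def _publish_job_result(self, scope: str, job_order_id: str) -> bool:
    job = await self._job_order_dao.get(scope, job_order_id)
    if job is None or job.current_state_name() not in CLEARABLE_STATES:
        return False  # job still in progress — premature ResultUpdate
    response = await self._job_response_dao.get(scope, job_order_id)
    if response is None:
        return False
    await self._mqtt.publish_retained(
        f"{self._topic_prefix}/{scope}/result/{job_order_id}",
        response.to_json().encode(),  # serialization helper assumed
    )
    return True
```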
State index payload
The state index is the high-frequency retained topic. It contains only the fields that change on transitions and the minimum needed for the MES to understand which per-job topics to fetch.
{
"seq": 103,
"scope": "machine-42",
"published_at": "2026-04-11T14:23:01Z",
"jobs": [
{
"job_order_id": "JO-2026-0441",
"state": [
{"state_text": {"text": "Running", "locale": "en"}, "state_number": 3}
],
"has_result": false
},
{
"job_order_id": "JO-2026-0442",
"state": [
{"state_text": {"text": "Ended", "locale": "en"}, "state_number": 5}
],
"has_result": true
},
{
"job_order_id": "JO-2026-0443",
"state": [
{"state_text": {"text": "NotAllowedToStart", "locale": "en"}, "state_number": 1},
{"state_text": {"text": "Ready", "locale": "en"}, "state_number": 11}
],
"has_result": false
}
]
}
The seq field is a monotonically increasing integer per scope, stored in Redis and incremented atomically on each state-index publish. It survives publisher restarts. The has_result flag tells the MES whether a result/{job_order_id} topic exists before subscribing to it.
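For illustration, a sketch of this payload as plain dataclasses mirroring the JSON above. Phase 2 plans `StateIndexEntry` and `StateIndex` as `@dataclass(kw_only=True)` classes extending `DataClassMixin`; the `JobState` helper here is a hypothetical stand-in for the localized state entries:

```python
from dataclasses import dataclass, field


@dataclass(kw_only=True)
class JobState:
    # One entry of a job's "state" array: localized text + state number
    state_text: dict[str, str]  # {"text": "...", "locale": "..."}
    state_number: int


@dataclass(kw_only=True)
class StateIndexEntry:
    job_order_id: str
    state: list[JobState]
    has_result: bool = False


@dataclass(kw_only=True)
class StateIndex:
    seq: int
    scope: str
    published_at: str  # ISO 8601 timestamp
    jobs: list[StateIndexEntry] = field(default_factory=list)
```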
Reconnect and resync
On MES reconnect the sequence number provides gap detection without requiring stream replay or delta reconstruction:
sequenceDiagram
participant MES
participant Broker as MQTT broker
participant Redis
MES->>Broker: CONNECT + SUBSCRIBE {prefix}/{scope}/state-index
Broker-->>MES: RETAINED state-index (seq=103, jobs=[...])
note over MES: last seen seq was 87 — gap detected
loop for each clearable job with has_result=true not yet cleared
MES->>Broker: SUBSCRIBE {prefix}/{scope}/result/{id}
Broker-->>MES: RETAINED result/{id} (full response data)
note over MES: process response internally
MES->>Broker: PUBLISH Clear command CloudEvent
Broker->>Redis: routed to command processor
note over Redis: Clear executed, job transitions to EndState
Broker-->>MES: UPDATED state-index (seq=104)
end
note over MES: reconcile remaining queued jobs against internal state
If the sequence number matches the MES's last-seen value, no transitions were missed and no further action is needed. If it is ahead, the MES reads the retained order/{id} and result/{id} topics for jobs listed in the state index and reconciles against its internal records.
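A sketch of the MES-side resync under this scheme — hypothetical client code, assuming `aiomqtt` on the MES side; `process_response()` and `send_clear()` stand in for the MES's internal bookkeeping and the existing Clear command publication:

```python
import json

import aiomqtt


async def resync(prefix: str, scope: str, last_seen_seq: int) -> None:
    async with aiomqtt.Client("broker.example.com") as client:  # placeholder host
        await client.subscribe(f"{prefix}/{scope}/state-index", qos=1)
        pending: set[str] = set()
        async for message in client.messages:
            topic = str(message.topic)
            if topic.endswith("/state-index"):
                index = json.loads(message.payload)
                if index["seq"] == last_seen_seq:
                    return  # no gap — nothing was missed
                # Subscribe to the retained full responses of uncleared jobs
                for job in index["jobs"]:
                    if job["has_result"] and job["job_order_id"] not in pending:
                        pending.add(job["job_order_id"])
                        await client.subscribe(
                            f"{prefix}/{scope}/result/{job['job_order_id']}", qos=1
                        )
                if not pending:
                    return
            else:
                if not message.payload:
                    continue  # zero-byte delete after Clear — ignore
                job_order_id = topic.rsplit("/", 1)[-1]
                await process_response(json.loads(message.payload))  # MES-internal
                await send_clear(client, scope, job_order_id)  # acknowledge via Clear
                pending.discard(job_order_id)
                if not pending:
                    return
```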
Clear semantics
The Clear method (already implemented in MachineryJobsCloudEventProcessor) is the MES acknowledgment that a job response has been received and processed. The MES workflow:
1. Job reaches a clearable state (`Ended` or `Aborted`)
2. MES reads the retained `result/{job_order_id}` topic
3. MES records the response data internally
4. MES calls `Clear` → command processor transitions the job to `EndState` in Redis → publisher deletes the retained `order/{id}` and `result/{id}` topics → job is removed from the state-index
Note: Clear transitions the job state machine to EndState but does not delete the job from Redis. Jobs in EndState are excluded from the state-index (they have reached their final state). The publisher treats Clear stream entries as a signal to remove retained topics. Redis cleanup of EndState jobs is a separate TTL-based or scheduled concern — the publisher and the state-index do not list them.
During an outage, steps 1–4 are deferred. Jobs accumulate in clearable states with their response data preserved in retained topics until the MES reconnects and works through the backlog. The 48-hour TTL (default for retained_ttl_seconds) on retained topics acts as a safety net for the case where Clear is never called — for example if a job is abandoned after a machine fault.
Changes to existing DAOs
The stream write is encapsulated in JobOrderAndStateDAO and JobResponseDAO so that no processor needs to be aware of the publishing infrastructure. Any call to save() on either DAO automatically appends a change record to the scope stream. This makes it structurally impossible for a new processor — northbound or southbound — to write job state without the publisher being notified.
JobOrderAndStateDAO.save() appends a record with the transition name as change_type:
The xadd is added to the same pipeline(transaction=True) block that already performs the JSON write and sorted-set update, so the stream entry is written atomically with the state change. If the pipeline fails, neither the state nor the stream entry is written.
# Inside the existing pipeline(transaction=True) block:
pipe.xadd(
self._key_schema.job_change_stream(scope),
{
"change_type": change_type, # transition name, e.g. "Store", "Start", "Clear"
"job_order_id": job_order_and_state.job_order.job_order_id,
"scope": scope,
"ts": datetime.now(UTC).isoformat(),
},
maxlen=5000,
approximate=True,
)
JobResponseDAO.save() appends a record with change_type of ResultUpdate, also inside its existing pipeline:
# Inside the existing pipeline(transaction=True) block:
pipe.xadd(
self._key_schema.job_change_stream(scope),
{
"change_type": "ResultUpdate",
"job_order_id": job_response.job_order_id,
"scope": scope,
"ts": datetime.now(UTC).isoformat(),
},
maxlen=5000,
approximate=True,
)
The stream is bounded with maxlen=5000 and approximate=True. At a job cadence of one job every few seconds, a 30-minute outage produces on the order of 600 jobs; even at several stream entries per job (Store, Start, intermediate transitions, ResultUpdate), that stays well under the 5,000-entry cap, so the bound covers well over 30 minutes of publisher downtime with negligible Redis memory overhead.
The following Redis key schema additions are required:
| Key | Type | Purpose |
|---|---|---|
| `joborder:changes:{scope}` | Stream | Change log consumed by the publisher |
| `pubseq:{scope}` | String (integer) | Monotonic sequence counter per scope |
| `publisher:stream-cursors` | Hash | Last-processed stream ID per scope, for publisher restart recovery |
| `active-scopes` | Set | All scopes that have had at least one job stored |
| `joborder:changes:_global` | Stream | Sentinel stream — every DAO save appends here for new-scope discovery |
The active-scopes set is updated with SADD active-scopes {scope} inside JobOrderAndStateDAO.save() on the first Store for a new scope.
In addition to active-scopes, a sentinel stream joborder:changes:_global receives an entry on every DAO save alongside the per-scope stream. The publisher's main XREAD loop always includes this global stream. When a message arrives on the global stream for a scope not yet tracked, the publisher adds the scope's per-scope stream to its XREAD set. This eliminates the need for periodic polling of active-scopes and ensures new scopes are discovered immediately.
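Putting both writes together, a sketch of the full set of side-channel writes inside `JobOrderAndStateDAO.save()`; the key-schema method signatures (`job_change_stream_global()`, `active_scopes()`) are assumptions based on the Phase 1 method names:

```python
# Inside the existing pipeline(transaction=True) block, after the
# JSON write and sorted-set update:
entry = {
    "change_type": change_type,
    "job_order_id": job_order_and_state.job_order.job_order_id,
    "scope": scope,
    "ts": datetime.now(UTC).isoformat(),
}
# Per-scope change log consumed by the publisher's XREAD loop
pipe.xadd(self._key_schema.job_change_stream(scope), entry, maxlen=5000, approximate=True)
# Global sentinel stream for immediate new-scope discovery
pipe.xadd(self._key_schema.job_change_stream_global(), entry, maxlen=5000, approximate=True)
# Scope registry read by the publisher at startup; SADD is idempotent,
# so only the first Store for a new scope actually changes the set
pipe.sadd(self._key_schema.active_scopes(), scope)
```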
Job Order Publisher
The publisher is a standalone asyncio service in a new src/microdcs/publishers/ module. It has no command routing logic — it only reads from Redis and writes to MQTT.
Startup
- Connect to Redis and the MQTT broker
- Load last-processed stream IDs from `publisher:stream-cursors` to resume after restart without reprocessing
- For each scope in `active-scopes`, publish the current state-index retained message to cover the gap during the pod restart
- Enter the main XREAD loop
Main loop
while True:
    # Build the XREAD stream set: per-scope streams + the global sentinel stream
    streams = {f"joborder:changes:{scope}": last_id.get(scope, "0") for scope in active_scopes}
    streams["joborder:changes:_global"] = last_id.get("_global", "0")
    results = await redis.xread(streams=streams, block=500, count=50)
    for stream, messages in results:
        for message_id, fields in messages:
            scope = fields["scope"]
            if stream == "joborder:changes:_global":
                # Discover new scope — add its per-scope stream to the XREAD set
                if scope not in active_scopes:
                    active_scopes.add(scope)
                    last_id[scope] = "0"  # read the new scope's stream from the beginning
                last_id["_global"] = message_id
            else:
                await dispatch(fields["change_type"], scope, fields["job_order_id"])
                last_id[scope] = message_id
    await save_cursors()
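The `save_cursors()` call above could persist the `last_id` mapping in the `publisher:stream-cursors` hash — a minimal sketch, assuming the redis-py asyncio client with `decode_responses=True`; the real implementation is the `_load_cursors`/`_save_cursors` pair from Phase 3:

```python
CURSOR_KEY = "publisher:stream-cursors"


async def save_cursors() -> None:
    # One hash field per stream key (including "_global"); the value is the
    # last-processed stream entry ID
    if last_id:
        await redis.hset(CURSOR_KEY, mapping=last_id)


async def load_cursors() -> dict[str, str]:
    # Empty on first start — each stream is then read from ID "0"
    return await redis.hgetall(CURSOR_KEY)
```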
Dispatch handlers
| `change_type` | Action |
|---|---|
| `Store` / `StoreAndStart` | Publish retained `order/{id}` with configured TTL, update state-index |
| `Update` | Re-publish retained `order/{id}` with configured TTL, update state-index |
| `ResultUpdate` | Check job state — if clearable (`Ended` / `Aborted`), publish retained `result/{id}` with configured TTL and update state-index; otherwise ignore |
| `Clear` | Zero-byte retained publish to `order/{id}` and `result/{id}`, update state-index |
| Any other transition | Update state-index only |
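To make the table concrete, a sketch of the dispatch called from the main loop, written here as a method on `JobOrderPublisher`; the boolean return of `_publish_job_result` (sketched earlier) is an illustrative assumption that lets the ignore path skip the state-index update:

```python
async def dispatch(self, change_type: str, scope: str, job_order_id: str) -> None:
    if change_type in ("Store", "StoreAndStart", "Update"):
        await self._publish_job_order(scope, job_order_id)
    elif change_type == "ResultUpdate":
        if not await self._publish_job_result(scope, job_order_id):
            return  # premature ResultUpdate — no state-index update either
    elif change_type == "Clear":
        await self._delete_job_topics(scope, job_order_id)
    # Store/StoreAndStart/Update, clearable ResultUpdate, Clear, and every
    # other transition (Start, Abort, ...) end with a state-index publish
    await self._publish_state_index(scope)
```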
The state-index update always reads the full job list for the scope from JobOrderAndStateDAO.list(scope), checks JobResponseDAO for each job to populate has_result, atomically increments pubseq:{scope} with INCR, and publishes the assembled payload as a retained message with a 48-hour TTL. Jobs in EndState are excluded from the state-index.
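A sketch of `_publish_state_index` following these rules — the DAO accessor names come from Phase 3, while the job-object helpers (`is_end_state()`, per-state `to_dict()`) are illustrative assumptions:

```python
import json
from datetime import UTC, datetime


# Method on JobOrderPublisher
async def _publish_state_index(self, scope: str) -> None:
    jobs = await self._job_order_dao.list(scope)
    entries = []
    for job in jobs:
        if job.is_end_state():
            continue  # EndState jobs are excluded from the index
        response = await self._job_response_dao.get(scope, job.job_order.job_order_id)
        entries.append({
            "job_order_id": job.job_order.job_order_id,
            "state": [s.to_dict() for s in job.state],
            "has_result": response is not None,
        })
    # Atomic per-scope counter — survives publisher restarts
    seq = await self._redis.incr(self._key_schema.pub_seq(scope))
    payload = json.dumps({
        "seq": seq,
        "scope": scope,
        "published_at": datetime.now(UTC).isoformat(),
        "jobs": entries,
    })
    await self._mqtt.publish_retained(
        f"{self._topic_prefix}/{scope}/state-index", payload.encode()
    )
```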
Known limitation — state-index rebuild cost: Each state-index update performs N+1 Redis reads (list + one has_result check per job). In high-cadence scenarios with many concurrent jobs this may become a bottleneck. A future optimization could cache the state index in-process and apply stream deltas incrementally instead of rebuilding from Redis on every transition.
Configuration
@dataclass
class JobOrderPublisherConfig:
retained_ttl_seconds: int = 172800 # 48 hours
stream_read_count: int = 50
stream_block_ms: int = 500
The publisher reuses the existing RedisConfig and MQTTConfig from RuntimeConfig for Redis and MQTT connections — these are not duplicated. Topic prefixes are resolved from ProcessingConfig.topic_prefixes using the same get_topic_prefix_for_identifier() mechanism as the MQTT binding. QoS 1 is hardcoded (matching the MQTT processor) and not exposed as configuration.
Implementation plan
Phase 1 – Redis key schema and DAO stream writes
Scope: changes to the existing codebase only. No new containers.
- Add `job_change_stream`, `job_change_stream_global`, `pub_seq`, `publisher_cursors`, `active_scopes` methods to `RedisKeySchema`
- Add `xadd` (per-scope stream + global sentinel stream) and `SADD active-scopes` to `JobOrderAndStateDAO.save()` inside the existing `pipeline(transaction=True)` block
- Add `xadd` (per-scope stream + global sentinel stream) to `JobResponseDAO.save()` inside the existing `pipeline(transaction=True)` block
- Unit tests: verify stream entries are written atomically with the DAO save; verify no entry is written when the pipeline fails
- Update `docs/persistence.md`: expand the key schema table with the `joborder:changes:{scope}` stream, `pubseq:{scope}`, `publisher:stream-cursors`, and `active-scopes` keys; add a "Job Change Stream" section documenting the stream write pattern in `JobOrderAndStateDAO.save()` and `JobResponseDAO.save()`
- Update `docs/concepts.md`: add glossary entries for Job Change Stream, State Index, Retained Topic
Acceptance: joborder:changes:{scope} contains one entry per successful DAO save. Stream is bounded by maxlen=5000.
Phase 2 – Publisher models and MQTT client
Scope: new module scaffolding, no business logic.
- Create the `src/microdcs/publishers/` package
- Implement `StateIndexEntry` and `StateIndex` as `@dataclass(kw_only=True)` classes extending `DataClassMixin` (consistent with all other MicroDCS models — the project does not use Pydantic)
- Implement an async MQTT wrapper using `aiomqtt` supporting retained publish with TTL and zero-byte delete
- Integration test: publish retained with TTL to a test broker, verify the message is retained on reconnect, verify deletion
- Update this document's "Job Order Publisher" section with the final module structure and MQTT wrapper API
Acceptance: retained publish and delete work correctly against a real broker in CI.
Phase 3 – Publisher core logic
Scope: JobOrderPublisher class and all dispatch handlers.
- Implement `_load_cursors` / `_save_cursors` using `publisher:stream-cursors` (includes the global sentinel cursor)
- Implement the XREAD BLOCK loop over per-scope streams + the global sentinel stream; discover new scopes from sentinel entries
- Implement `_publish_state_index`: read `JobOrderAndStateDAO.list(scope)`, exclude `EndState` jobs, check `JobResponseDAO` for `has_result`, `INCR pubseq:{scope}`, publish retained with the configured TTL
- Implement `_publish_job_order`: read the DAO, serialize, publish retained `order/{id}` with the configured TTL
- Implement `_publish_job_result`: read the DAO, check the job state is clearable (`Ended`/`Aborted`) before publishing retained `result/{id}` with the configured TTL; ignore `ResultUpdate` for in-progress jobs
- Implement `_delete_job_topics`: zero-byte retained publish for `order/{id}` and `result/{id}`
- Implement the startup sequence: load cursors, publish an initial state-index for all active scopes, enter the loop
- Integration tests: inject stream entries into Redis, verify the correct retained topics appear on the broker
- Update this document's "Dispatch handlers" table and "Main loop" section with final implementation details
- Update `docs/concepts.md`: add a new "Job Order Publisher" section under Framework Concepts explaining the publisher as a standalone component consuming Redis streams and writing retained MQTT topics; add glossary entries for Job Order Publisher, Sequence Number (pubseq), Stream Cursor
- Update `docs/overall-design.md`: add the publisher as a component in the system architecture description
- Update `docs/persistence.md`: add a "Job Order Publisher State" section documenting `SfcExecutionDAO`-equivalent cursor persistence and sequence counter usage
Acceptance: correct retained topics on startup; correct publishes and deletions per stream entry; sequence number increments monotonically; cursor survives restart without reprocessing.
Phase 4 – Kubernetes deployment
- Add `JobOrderPublisherConfig` to the configuration schema
- Write the Kubernetes `Deployment` manifest: `replicas: 1`, resource limits, liveness probe (Redis ping), readiness probe (MQTT connected)
- Add MQTT credentials to secret management alongside the existing broker credentials
- Deploy to staging and run a 30-minute connectivity interruption test: disconnect the MES MQTT client, run jobs through several complete cycles including clearable states, reconnect, verify the state-index seq gap, all retained topics present, no response data loss
- Update `docs/development.md`: add a publisher deployment and configuration section covering `JobOrderPublisherConfig`, the Kubernetes manifest, and local development setup
- Update this document's "Configuration" section with the final environment variable names and defaults
Acceptance: MES reconnect after 30-minute outage shows correct seq gap, all uncleared terminal jobs have retained result topics, Clear drains the backlog cleanly.
Phase 5 – MES integration
- Document the topic layout and payload schemas for the MES integration team
- Implement MES reconnect resync: read the state-index, compare seq, fetch `order/{id}` and `result/{id}` for each listed job, reconcile internal state, call `Clear` for terminal jobs whose response has been processed
- End-to-end test covering the high-cadence scenario (a job every few seconds), a 30-minute outage, reconnect resync, and confirmed zero response data loss
- Update `docs/technical-standards.md`: add an "MQTT Retained Messages" subsection under MQTT v5 documenting the retained topic pattern, the 48-hour TTL, and the zero-byte deletion convention
- Update `docs/index.md`: add MES publishing to the "Start Here" reading path and mention retained-topic-based MES reconnect in the overview
- Update `docs/information-model-standards.md`: expand the Machinery Job Management section with the list/variable-based integration mode and how retained topics map to `JobOrderList` and `JobOrderResponseList`