Persistence
MicroDCS uses Redis to persist operational state needed for delivery coordination, job tracking, and multi-instance processing patterns. This page summarizes the persistence requirements behind that choice and shows how the Redis API is used.
Persistence Requirements
The dataclasses used for OPC UA Job Management need to be persisted in a form that supports schema evolution and can store nested structures without excessive mapping overhead. A document-oriented persistence approach fits those requirements well.
The framework also needs persistence support for CloudEvent deduplication and for coordinating state publication so that changed variable values are emitted by a single publisher in multi-instance scenarios.
Redis
MicroDCS stores dataclasses in Redis JSON. The associated dataschema value is written to an additional _dataschema field and is derived from the model Config class during serialization.
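The _dataschema convention can be illustrated with a small, self-contained sketch. The to_redis_json helper, the Config.dataschema attribute name, the example models, and the schema URI below are all assumptions made for this illustration; they are not the MicroDCS implementation.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class Material:
    material_id: str
    quantity: float


@dataclass
class JobOrder:
    job_order_id: str
    priority: int = 0
    materials: list[Material] = field(default_factory=list)

    class Config:
        # Hypothetical attribute name; the text above only states that the
        # value is derived from the model Config class.
        dataschema = "https://example.invalid/schemas/job-order/v1"


def to_redis_json(obj) -> dict:
    """Return a nested, JSON-ready dict with an extra _dataschema field."""
    document = asdict(obj)  # recurses into nested dataclasses and lists
    document["_dataschema"] = type(obj).Config.dataschema
    return document


doc = to_redis_json(JobOrder("job-001", 10, [Material("m-1", 2.5)]))
print(json.dumps(doc, indent=2))
```

Because the nested structure is kept as-is, no flattening or field-by-field mapping is needed before the document is written as JSON.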
For the OPC UA Job Management implementation, Redis eventing is used to coordinate a single-instance publisher from a multi-instance receiver and event-publisher setup. Events are published directly to MQTT, while state variables are published through the OPC UA pub-sub mechanism so unchanged values do not need to be sent repeatedly.
Key Schema
All Redis keys are generated by RedisKeySchema with a configurable prefix (default: microdcs-test). Key patterns:
| Pattern | Redis Type | Purpose |
|---|---|---|
| {prefix}:cededupe:{hash} | string | CloudEvent deduplication (source + id hash) |
| {prefix}:transdedupe:{hash} | string | Transaction deduplication (scope + id hash) |
| {prefix}:counter:{name} | string (integer) | Named counters |
| {prefix}:joborder:{id} | JSON | Job order and state document |
| {prefix}:joborder:list:{scope} | sorted set | Job order IDs ordered by priority |
| {prefix}:jobresponse:{id} | JSON | Job response document |
| {prefix}:jobresponse:list:{scope} | sorted set | Job response IDs ordered by start time |
| {prefix}:workmaster:{id} | JSON | Work master document |
| {prefix}:workmaster:list:{scope} | set | Work master IDs |
| {prefix}:equipment:list:{scope} | set | Equipment IDs |
| {prefix}:materialclass:list:{scope} | set | Material class IDs |
| {prefix}:materialdefinition:list:{scope} | set | Material definition IDs |
| {prefix}:personnel:list:{scope} | set | Personnel IDs |
| {prefix}:physicalasset:list:{scope} | set | Physical asset IDs |
| {prefix}:jobacceptance:{scope} | string (integer) | Per-scope max downloadable job orders override |
| {prefix}:event:receiver:list | stream | Receiver event stream |
| {prefix}:event:responder:list | stream | Responder event stream |
| {prefix}:joborder:changes:{scope} | stream | Job change log consumed by the Job Order Publisher |
| {prefix}:joborder:changes:_global | stream | Global sentinel stream for new-scope discovery |
| {prefix}:pubseq:{scope} | string (integer) | Monotonic sequence counter per scope for state-index publishes |
| {prefix}:publisher:stream-cursors | hash | Last-processed stream ID per scope, for publisher restart recovery |
| {prefix}:active-scopes | set | All scopes that have had at least one job stored |
| {prefix}:sfc:work:{scope} | stream | SFC work items distributed via consumer group sfc-engine |
| {prefix}:sfc:execution:{job_id} | JSON | SFC execution state for a running job (current step, action states) |
| {prefix}:sfc:active-jobs | set | Job IDs with active (non-completed, non-failed) SFC execution state |
| {prefix}:poststartlock:{config_identifier} | string | Distributed lock ensuring only one replica executes post_start() |
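To make the key patterns concrete, the sketch below builds a few of them from a prefix. SketchKeySchema is an illustrative stand-in, not the real RedisKeySchema; only job_change_stream() appears elsewhere on this page, and the other method names are assumed.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchKeySchema:
    """Illustrative stand-in for RedisKeySchema (most method names assumed)."""

    prefix: str = "microdcs-test"

    def job_order(self, job_order_id: str) -> str:
        return f"{self.prefix}:joborder:{job_order_id}"

    def job_order_list(self, scope: str) -> str:
        return f"{self.prefix}:joborder:list:{scope}"

    def job_change_stream(self, scope: str) -> str:
        return f"{self.prefix}:joborder:changes:{scope}"

    def sfc_execution(self, job_id: str) -> str:
        return f"{self.prefix}:sfc:execution:{job_id}"


keys = SketchKeySchema(prefix="microdcs-app")
print(keys.job_order("job-001"))
print(keys.job_change_stream("plant-1"))
```

Centralizing key construction in one class keeps the prefix configurable and avoids hand-built key strings scattered across DAOs.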
Job Change Stream
JobOrderAndStateDAO.save() and JobResponseDAO.save() each append a change record to two Redis streams as part of the same pipeline(transaction=True) block that writes the document and updates the secondary index. This makes the stream write atomic with the state change — if the pipeline fails, neither the state nor the stream entry is written.
Per-scope stream (joborder:changes:{scope})
Written by JobOrderAndStateDAO.save() with the transition name as change_type:
pipe.xadd(
    key_schema.job_change_stream(scope),
    {
        "change_type": change_type,  # transition name, e.g. "Store", "Start", "Clear"
        "job_order_id": job_order_id,
        "scope": scope,
        "ts": datetime.now(UTC).isoformat(),
    },
    maxlen=5000,
    approximate=True,
)
Written by JobResponseDAO.save() with change_type always "ResultUpdate":
pipe.xadd(
    key_schema.job_change_stream(scope),
    {
        "change_type": "ResultUpdate",
        "job_order_id": job_response.job_order_id,
        "scope": scope,
        "ts": datetime.now(UTC).isoformat(),
    },
    maxlen=5000,
    approximate=True,
)
The stream is bounded with maxlen=5000 and approximate=True. At a job cadence of one job every few seconds, this covers well over 30 minutes of publisher downtime.
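The downtime estimate can be checked with simple arithmetic. The helper below is a back-of-the-envelope sketch; the figure of four change entries per job and the five-second cadence are assumptions chosen for illustration, not measured values.

```python
def stream_coverage_minutes(
    maxlen: int, entries_per_job: int, seconds_per_job: float
) -> float:
    """Minutes of history a bounded stream holds at a steady write rate."""
    entries_per_second = entries_per_job / seconds_per_job
    return maxlen / entries_per_second / 60


# Assumed workload: one job every 5 seconds, each producing 4 change entries
# (for example Store, Start, a ResultUpdate, and Clear).
print(stream_coverage_minutes(5000, entries_per_job=4, seconds_per_job=5))
# roughly 104 minutes
```

With approximate trimming Redis may keep somewhat more than maxlen entries, so the real window is at least this long for the same write rate.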
Global sentinel stream (joborder:changes:_global)
Every DAO save also appends an identical entry to joborder:changes:_global. The Job Order Publisher always includes this stream in its XREAD loop. When a message arrives for a scope not yet tracked, the publisher adds the scope's per-scope stream to its XREAD set, eliminating the need for periodic polling of active-scopes.
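The scope discovery step can be sketched without a Redis connection. In this sketch the cursors dictionary has the same shape as the streams mapping that redis-py's xread() accepts (stream key to last-seen ID). The function name, the omission of the key prefix, and the choice to start a new scope at ID "0" are assumptions for illustration.

```python
GLOBAL_STREAM = "joborder:changes:_global"  # key prefix omitted for brevity


def track_new_scopes(
    cursors: dict[str, str], global_entries: list[dict[str, str]]
) -> list[str]:
    """Register per-scope streams for unseen scopes; return the added keys."""
    added = []
    for entry in global_entries:
        stream_key = f"joborder:changes:{entry['scope']}"
        if stream_key not in cursors:
            # Assumed policy: read a newly discovered scope from the start
            # so that none of its earlier entries are skipped.
            cursors[stream_key] = "0"
            added.append(stream_key)
    return added


cursors = {GLOBAL_STREAM: "$"}
new = track_new_scopes(
    cursors,
    [
        {"change_type": "Store", "job_order_id": "job-001", "scope": "plant-1"},
        {"change_type": "Start", "job_order_id": "job-001", "scope": "plant-1"},
    ],
)
print(new)
```

On the next loop iteration the enlarged cursors mapping would be passed to XREAD, so the new scope is picked up without a separate polling cycle.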
Active scopes set (active-scopes)
JobOrderAndStateDAO.save() also issues SADD active-scopes {scope} inside the pipeline on every call. Since SADD is idempotent, this registers the scope on first Store and is a no-op on subsequent saves.
Publisher cursor persistence
The JobOrderPublisher tracks its read position in each stream using Redis hash publisher:stream-cursors. After each successful XREAD batch, all cursors are saved with HSET. On startup (or after an MQTT reconnect), cursors are reloaded with HGETALL so the publisher resumes from where it left off without reprocessing entries.
The publisher also maintains a monotonic sequence counter per scope (pubseq:{scope}) incremented with INCR on every state-index publish. The MES uses sequence gaps to detect missed updates after a connectivity outage.
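On the consuming side, gap detection reduces to comparing consecutive sequence values. The function below is a minimal sketch of that idea; how the MES actually reacts to a gap is not described here, and the function name is hypothetical.

```python
def missed_updates(last_seen: int, received: int) -> int:
    """Number of sequence values skipped between two consecutive messages."""
    if received <= last_seen:
        return 0  # duplicate or out-of-order delivery, not a gap
    return received - last_seen - 1


print(missed_updates(41, 42))  # consecutive values, nothing missed
print(missed_updates(41, 45))  # values 42, 43 and 44 were never seen
```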
Redis API Usage Examples
All DAO classes are async and use redis.asyncio. They are initialized with a shared redis.ConnectionPool and a RedisKeySchema instance.
Setup
import redis.asyncio as redis
from microdcs.redis import (
    RedisKeySchema,
    CloudEventDedupeDAO,
    JobOrderAndStateDAO,
    JobResponseDAO,
)
# Create connection pool and key schema (usually done by MicroDCS core)
pool = redis.ConnectionPool(host="localhost", port=6379, protocol=3)
client = redis.Redis(connection_pool=pool)
key_schema = RedisKeySchema(prefix="microdcs-app")
CloudEvent Deduplication
CloudEventDedupeDAO uses an atomic SET NX with a TTL to track already-seen event IDs. is_duplicate() returns True if the same source and event ID were already recorded within the TTL.
dedupe_dao = CloudEventDedupeDAO(client, key_schema, ttl=600)
is_dup = await dedupe_dao.is_duplicate("urn:source:1", "event-id-123")
# First call: False (new event, key created with 600s TTL)
is_dup = await dedupe_dao.is_duplicate("urn:source:1", "event-id-123")
# Second call: True (duplicate, key already exists)
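The SET NX pattern behind this behaviour can be sketched as follows. InMemoryClient is a tiny stand-in so the example runs without a Redis server; it mimics only the return convention of redis-py's set() with nx=True (True when the key was written, None when it already existed) and does not simulate expiry. The standalone is_duplicate function and the SHA-256 key hash are assumptions, not the actual DAO internals.

```python
import asyncio
import hashlib
from typing import Optional


class InMemoryClient:
    """Stand-in for redis.asyncio.Redis; TTL expiry is not simulated."""

    def __init__(self) -> None:
        self._data: dict[str, str] = {}

    async def set(
        self, name: str, value: str, nx: bool = False, ex: Optional[int] = None
    ) -> Optional[bool]:
        if nx and name in self._data:
            return None  # NX prevented the write: key already exists
        self._data[name] = value
        return True


async def is_duplicate(
    client, prefix: str, source: str, event_id: str, ttl: int = 600
) -> bool:
    # Hash source + id into a fixed-length key suffix (hash choice assumed).
    digest = hashlib.sha256(f"{source}|{event_id}".encode()).hexdigest()
    created = await client.set(f"{prefix}:cededupe:{digest}", "1", nx=True, ex=ttl)
    return not created  # the write failed only if the key was already there


async def demo() -> list[bool]:
    client = InMemoryClient()
    first = await is_duplicate(client, "microdcs-app", "urn:source:1", "event-id-123")
    second = await is_duplicate(client, "microdcs-app", "urn:source:1", "event-id-123")
    return [first, second]


results = asyncio.run(demo())
print(results)  # [False, True]
```

Checking and recording in a single SET NX call avoids the race that a separate EXISTS followed by SET would allow between two receiver instances.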
Transaction Deduplication
TransactionDedupeDAO works the same way, scoped by a scope string and transaction ID:
from microdcs.redis import TransactionDedupeDAO
tx_dao = TransactionDedupeDAO(client, key_schema, ttl=600)
is_dup = await tx_dao.is_duplicate("plant-1", "tx-abc")
# False on first call, True on subsequent calls within the TTL
Counters
CounterDAO provides atomic INCR for named counters:
from microdcs.redis import CounterDAO
counter_dao = CounterDAO(client, key_schema)
value = await counter_dao.increment("jobs_processed")
# Returns 1 on first call, 2 on second, etc.
current = await counter_dao.get("jobs_processed")
# Returns the current value, or 0 if the counter does not exist
Job Orders
JobOrderAndStateDAO stores job order documents as Redis JSON with a sorted set index by priority:
from microdcs.models.machinery_jobs import (
    ISA95JobOrderAndStateDataType,
    ISA95JobOrderDataType,
)
joborder_dao = JobOrderAndStateDAO(client, key_schema)
# Save a job order (serialized as JSON, indexed by priority in sorted set)
job = ISA95JobOrderAndStateDataType(
    job_order=ISA95JobOrderDataType(job_order_id="job-001", priority=10)
)
await joborder_dao.save(job, scope="plant-1")
# Retrieve by ID
job = await joborder_dao.retrieve("job-001")
# List all job order IDs in a scope (ordered by priority)
ids = await joborder_dao.list("plant-1")
# ["job-001", ...]
# Delete a job order and remove from sorted set
await joborder_dao.delete("job-001", scope="plant-1")
Job Responses
JobResponseDAO stores job response documents as Redis JSON and uses a RediSearch index for queries by job_order_id and normalized state:
from microdcs.models.machinery_jobs import (
    ISA95JobResponseDataType,
    ISA95StateDataType,
    LocalizedText,
)
jobresponse_dao = JobResponseDAO(client, key_schema)
# Must be called once before any save/query operations (idempotent)
await jobresponse_dao.initialize()
# Save a job response
response = ISA95JobResponseDataType(
    job_response_id="resp-001",
    job_order_id="job-001",
    job_state=[
        ISA95StateDataType(state_text=LocalizedText(text="AllowedToStart")),
        ISA95StateDataType(state_text=LocalizedText(text="Ready")),
    ],
)
await jobresponse_dao.save(response, scope="plant-1")
# Retrieve by response ID
resp = await jobresponse_dao.retrieve("resp-001")
# Query by job order ID (uses RediSearch index)
resp = await jobresponse_dao.retrieve_by_job_order_id("job-001")
# Query all responses in a given state within a scope (uses RediSearch index)
state = [
    ISA95StateDataType(state_text=LocalizedText(text="AllowedToStart")),
    ISA95StateDataType(state_text=LocalizedText(text="Ready")),
]
responses = await jobresponse_dao.retrieve_by_state("plant-1", state)
# Delete a job response
await jobresponse_dao.delete("resp-001", scope="plant-1")
Work Masters
WorkMasterDAO stores work master definitions as Redis JSON with a set index:
from microdcs.redis import WorkMasterDAO
from microdcs.models.machinery_jobs import ISA95WorkMasterDataType
workmaster_dao = WorkMasterDAO(client, key_schema)
# Save
wm = ISA95WorkMasterDataType(id="wm-001")
await workmaster_dao.save(wm, scope="plant-1")
# Retrieve
wm = await workmaster_dao.retrieve("wm-001")
# Check membership
exists = await workmaster_dao.is_member("wm-001", "plant-1")
# List all IDs in scope
ids = await workmaster_dao.list("plant-1")
# Delete
await workmaster_dao.delete("wm-001", scope="plant-1")
Set-Based List DAOs
Several DAOs manage simple sets of IDs for resource tracking. EquipmentListDAO, MaterialClassListDAO, MaterialDefinitionListDAO, PersonnelListDAO, and PhysicalAssetListDAO all follow the same pattern:
from microdcs.redis import EquipmentListDAO
equipment_dao = EquipmentListDAO(client, key_schema)
# Add an ID to the set
await equipment_dao.add_to_list("eq-001", scope="plant-1")
# Check membership
exists = await equipment_dao.is_member("eq-001", "plant-1")
# Remove from set
await equipment_dao.remove_from_list("eq-001", scope="plant-1")
SFC Execution State
SfcExecutionDAO stores per-job SFC execution state as Redis JSON documents and provides atomic compare-and-swap (CAS) operations via Lua scripts for multi-instance coordination.
Document Structure
Each sfc:execution:{job_id} document maps to an SfcExecutionState dataclass:
| Field | Type | Purpose |
|---|---|---|
| job_id | string | Job Order ID |
| scope | string | Station/equipment scope |
| work_master_id | string | Work Master that provided the recipe |
| current_step | string | Name of the currently active SFC step (branch name during branching) |
| correlation_id | string | Job-level CloudEvent correlationid (surrogate UUID); constant for the job lifetime, carried on every CE emitted for this job |
| active_steps | list[string] | All currently active step names; populated during branch execution |
| actions | dict[string, SfcActionExecution] | Per-action execution state |
| completed | bool | Whether the recipe has completed successfully |
| failed | bool | Whether the recipe has failed |
| error | string or null | Error message if failed |
Each SfcActionExecution entry tracks:
| Field | Type | Purpose |
|---|---|---|
| name | string | Action name (matches SfcActionAssociation.name in recipe) |
| state | SfcActionState | pending, dispatched, waiting, completed, or failed |
| command_ce_id | string or null | CloudEvent id of the dispatched command CE; routing key for matching incoming responses back to this action |
| attempt | int | Dispatch attempt counter |
| result | any or null | Action result data |
| error | string or null | Error message if action failed |
CAS Lua Scripts
Every SFC state mutation uses one of four Lua scripts. Each script runs atomically on the Redis server:
cas_action_state — Compare-and-swap on a single action's state:
- Reads $.actions.{name}.state from the JSON document
- If current state ≠ expected state → returns ALREADY_HANDLED
- Sets new state, optionally updates command_ce_id and attempt
- Optionally XADDs a follow-up work item to the SFC work stream
- Returns OK
Used for: pending → dispatched (push_command dispatch), pending → waiting (pull_event setup), dispatched → completed (action completion), waiting → completed (pull_event received), state → failed (action failure).
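The compare-and-swap steps listed above can be mirrored in plain Python to show the control flow. This is a simulation on an in-memory dict, not the Lua script itself: in MicroDCS the atomicity comes from Redis executing the script as a single unit. The function signature and the rule of incrementing attempt whenever a command_ce_id is supplied are assumptions; the optional XADD step is omitted.

```python
from typing import Optional


def cas_action_state(
    doc: dict,
    name: str,
    expected: str,
    new: str,
    command_ce_id: Optional[str] = None,
) -> str:
    """Simulate the compare-and-swap on one action's state."""
    action = doc["actions"][name]
    if action["state"] != expected:
        return "ALREADY_HANDLED"  # another instance already made the change
    action["state"] = new
    if command_ce_id is not None:
        action["command_ce_id"] = command_ce_id
        action["attempt"] += 1  # assumed: one attempt per dispatched command
    return "OK"


doc = {"actions": {"heat": {"state": "pending", "command_ce_id": None, "attempt": 0}}}
first = cas_action_state(doc, "heat", "pending", "dispatched", command_ce_id="ce-1")
second = cas_action_state(doc, "heat", "pending", "dispatched", command_ce_id="ce-2")
print(first, second, doc["actions"]["heat"])
```

The second call loses the race and leaves the document untouched, which is what lets several engine instances attempt the same transition safely.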
cas_advance_step — Compare-and-swap on the current step:
- Reads $.current_step from the JSON document
- If current step ≠ expected step → returns ALREADY_HANDLED
- Sets new current_step and active_steps
- Optionally XADDs a follow-up work item (e.g., dispatch_action:{next_action})
- Returns OK
Used for: step transitions after all actions in a step are completed.
cas_finish — Mark execution as completed or failed:
- Sets $.completed = true or $.failed = true (with optional error message)
- Removes the job_id from the sfc:active-jobs set
- Returns OK
cas_branch_advance — Atomic replace/remove of a completed step within active_steps:
- Reads $.active_steps from the JSON document
- If completed_step is not in active_steps → returns ALREADY_HANDLED (idempotent for races)
- Replaces completed_step with next_step (or removes it when next_step is empty, i.e. the path end)
- Optionally XADDs a follow-up work item (e.g., dispatch_action:{name} for the next branch step)
- Returns JSON-encoded new active_steps list on success
Used for: advancing individual branch paths during simultaneous (AND) execution; removing the final step of a selection (OR) path on completion. The caller checks whether the returned list is empty to detect branch convergence.
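The branch-advance logic and the convergence check can be simulated the same way. As before, this is a plain-Python sketch of the described behaviour on an in-memory dict, with an assumed function signature and without the optional XADD; the real operation runs as a Lua script inside Redis.

```python
import json


def cas_branch_advance(doc: dict, completed_step: str, next_step: str) -> str:
    """Simulate replacing or removing a completed step in active_steps."""
    active = doc["active_steps"]
    if completed_step not in active:
        return "ALREADY_HANDLED"  # a concurrent caller already advanced it
    index = active.index(completed_step)
    if next_step:
        active[index] = next_step  # continue along this branch path
    else:
        del active[index]  # empty next_step marks the end of the path
    return json.dumps(active)


doc = {"active_steps": ["mix", "heat"]}
r1 = cas_branch_advance(doc, "mix", "pour")  # mix is replaced by pour
r2 = cas_branch_advance(doc, "mix", "pour")  # repeated call is a no-op
r3 = cas_branch_advance(doc, "pour", "")     # first path ends
r4 = cas_branch_advance(doc, "heat", "")     # second path ends
converged = json.loads(r4) == []             # empty list: branches converged
print(r1, r2, r3, r4, converged)
```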
Active Jobs Set
The sfc:active-jobs set tracks all job IDs with non-terminal execution state. It is:
- Added to by SfcExecutionDAO.save() when the execution state is not completed/failed
- Removed from by the cas_finish Lua script atomically when marking a job completed/failed
- Read by _recovery_scan() on startup to find jobs that need resumption
SFC Work Stream
The SFC work stream (sfc:work:{scope}) distributes work items across SFC engine instances using a Redis Stream consumer group.
Stream Schema
Each stream entry contains:
| Field | Value | Purpose |
|---|---|---|
| job_id | Job Order ID | Which job needs attention |
| action | start_recipe / dispatch_action:{name} / resume | What to do |
| scope | Station scope | For routing and key construction |
The stream is bounded with maxlen=~5000 (approximate trimming).
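A consumer has to split the action field into its kind and, for dispatch_action, the action name. The parser below is a hypothetical sketch of that step; the function name and the decision to reject unknown values with ValueError are assumptions.

```python
from typing import Optional, Tuple


def parse_action(action: str) -> Tuple[str, Optional[str]]:
    """Split a work item action into (kind, action_name or None)."""
    kind, sep, name = action.partition(":")
    if kind == "dispatch_action":
        if not name:
            raise ValueError("dispatch_action requires an action name")
        return kind, name
    if kind in ("start_recipe", "resume") and not sep:
        return kind, None
    raise ValueError(f"unknown work item action: {action!r}")


print(parse_action("start_recipe"))
print(parse_action("dispatch_action:heat"))
```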
Consumer Group
Group name: sfc-engine (constant SFC_CONSUMER_GROUP).
- Created by SfcExecutionDAO.ensure_consumer_group() with XGROUP CREATE ... MKSTREAM (idempotent: ignores the BUSYGROUP error)
- Read by each engine instance via XREADGROUP with BLOCK timeout (default 2 seconds) and COUNT limit (default 10)
- Acknowledged by XACK after successful processing of each work item
Writers
| Writer | Action | Trigger |
|---|---|---|
| NB processor | start_recipe | Job reaches AllowedToStart via StoreAndStart |
| SFC engine CAS | dispatch_action:{name} | Written atomically inside cas_action_state or cas_advance_step when the next step has a push_command action |
| SFC engine recovery | resume | On startup, for each active incomplete job found in sfc:active-jobs |
XAUTOCLAIM Recovery
Each engine instance periodically calls XAUTOCLAIM with a configurable minimum idle time (default 30 seconds). This transfers pending entries from dead consumers to the calling consumer. Because all push_command deliveries carry a correlation_id, re-dispatch after XAUTOCLAIM is safe — equipment handles duplicates idempotently.
Distributed post_start Lock
When multiple replicas are running, each replica calls post_start() on startup. For processors that must only run post_start() once across the replica set (for example, to avoid sending duplicate initialisation messages), set post_start_singleton = True on the processor class.
When post_start_singleton = True, MicroDCS.main() calls PostStartLockDAO.acquire() before calling post_start(). The DAO issues a Redis SET NX EX command:
- If the key did not exist, the lock is acquired and post_start() is executed on this instance.
- If the key already exists (another replica already holds the lock), post_start() is skipped silently.
The lock key is poststartlock:{config_identifier} and expires automatically after ProcessingConfig.post_start_lock_ttl seconds (default: 30 s). This TTL ensures that a crashed instance does not permanently prevent other replicas from running post_start() on restart.
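from microdcs.redis import PostStartLockDAO, RedisKeySchema
key_schema = RedisKeySchema(prefix="myapp")
# redis_client is an existing redis.asyncio.Redis instance (see Setup above)
dao = PostStartLockDAO(redis_client, key_schema, ttl=30)
acquired = await dao.acquire("my-processor")
if acquired:
    # Only the replica that obtained the lock runs post_start()
    await processor.post_start()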
References
- Redis course notes
- Redis Labs Python developer path
- RedisVL hash vs JSON guide - JSON fits the nested model structure used by MicroDCS
- Redis OM for Python: globally unique primary keys - not required here because MicroDCS does not generate new primary keys through Redis OM