Persistence

MicroDCS uses Redis to persist operational state needed for delivery coordination, job tracking, and multi-instance processing patterns. This page summarizes the persistence requirements behind that choice and shows how the Redis API is used.

Persistence Requirements

The dataclasses used for OPC UA Job Management need to be persisted in a form that supports schema evolution and can store nested structures without excessive mapping overhead. A document-oriented persistence approach fits those requirements well.

The framework also needs persistence support for CloudEvent deduplication and for coordinating state publication so that changed variable values are emitted by a single publisher in multi-instance scenarios.

Redis

MicroDCS stores dataclasses in Redis JSON. The associated dataschema value is written to an additional _dataschema field and is derived from the model Config class during serialization.

For the OPC UA Job Management implementation, Redis eventing coordinates a single publisher instance even when receivers and event publishers run as multiple instances. Events are published directly to MQTT, while state variables are published through the OPC UA pub-sub mechanism, so unchanged values do not need to be sent repeatedly.

Key Schema

All Redis keys are generated by RedisKeySchema with a configurable prefix (default: microdcs-test). Key patterns:

| Pattern | Redis Type | Purpose |
| --- | --- | --- |
| {prefix}:cededupe:{hash} | string | CloudEvent deduplication (source + id hash) |
| {prefix}:transdedupe:{hash} | string | Transaction deduplication (scope + id hash) |
| {prefix}:counter:{name} | string (integer) | Named counters |
| {prefix}:joborder:{id} | JSON | Job order and state document |
| {prefix}:joborder:list:{scope} | sorted set | Job order IDs ordered by priority |
| {prefix}:jobresponse:{id} | JSON | Job response document |
| {prefix}:jobresponse:list:{scope} | sorted set | Job response IDs ordered by start time |
| {prefix}:workmaster:{id} | JSON | Work master document |
| {prefix}:workmaster:list:{scope} | set | Work master IDs |
| {prefix}:equipment:list:{scope} | set | Equipment IDs |
| {prefix}:materialclass:list:{scope} | set | Material class IDs |
| {prefix}:materialdefinition:list:{scope} | set | Material definition IDs |
| {prefix}:personnel:list:{scope} | set | Personnel IDs |
| {prefix}:physicalasset:list:{scope} | set | Physical asset IDs |
| {prefix}:jobacceptance:{scope} | string (integer) | Per-scope max downloadable job orders override |
| {prefix}:event:receiver:list | stream | Receiver event stream |
| {prefix}:event:responder:list | stream | Responder event stream |
| {prefix}:joborder:changes:{scope} | stream | Job change log consumed by the Job Order Publisher |
| {prefix}:joborder:changes:_global | stream | Global sentinel stream for new-scope discovery |
| {prefix}:pubseq:{scope} | string (integer) | Monotonic sequence counter per scope for state-index publishes |
| {prefix}:publisher:stream-cursors | hash | Last-processed stream ID per scope, for publisher restart recovery |
| {prefix}:active-scopes | set | All scopes that have had at least one job stored |
| {prefix}:sfc:work:{scope} | stream | SFC work items distributed via consumer group sfc-engine |
| {prefix}:sfc:execution:{job_id} | JSON | SFC execution state for a running job (current step, action states) |
| {prefix}:sfc:active-jobs | set | Job IDs with active (non-completed, non-failed) SFC execution state |
| {prefix}:poststartlock:{config_identifier} | string | Distributed lock ensuring only one replica executes post_start() |
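
To make the naming scheme concrete, here is a minimal sketch of how such keys could be assembled. KeySchemaSketch and its method names are hypothetical stand-ins, not the actual RedisKeySchema API, and the SHA-256 digest is an assumption because the hash function behind {hash} is not specified on this page.

```python
import hashlib
from dataclasses import dataclass


@dataclass(frozen=True)
class KeySchemaSketch:
    """Hypothetical stand-in for RedisKeySchema; names are illustrative only."""

    prefix: str = "microdcs-test"

    def _key(self, *parts: str) -> str:
        # Join the prefix and all parts with ":" as in the patterns above.
        return ":".join((self.prefix, *parts))

    def ce_dedupe(self, source: str, event_id: str) -> str:
        # Assumption: SHA-256 over "source|id"; the real hash is not documented here.
        digest = hashlib.sha256(f"{source}|{event_id}".encode()).hexdigest()
        return self._key("cededupe", digest)

    def job_order(self, job_order_id: str) -> str:
        return self._key("joborder", job_order_id)

    def job_order_list(self, scope: str) -> str:
        return self._key("joborder", "list", scope)

    def job_change_stream(self, scope: str = "_global") -> str:
        # Without a scope this yields the global sentinel stream key.
        return self._key("joborder", "changes", scope)


keys = KeySchemaSketch(prefix="microdcs-app")
print(keys.job_order("job-001"))          # microdcs-app:joborder:job-001
print(keys.job_change_stream("plant-1"))  # microdcs-app:joborder:changes:plant-1
```

Keeping all key construction in one object means a prefix change isolates environments (for example test and production) that share one Redis instance.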

Job Change Stream

JobOrderAndStateDAO.save() and JobResponseDAO.save() each append a change record to two Redis streams as part of the same pipeline(transaction=True) block that writes the document and updates the secondary index. This makes the stream write atomic with the state change — if the pipeline fails, neither the state nor the stream entry is written.

Per-scope stream (joborder:changes:{scope})

Written by JobOrderAndStateDAO.save() with the transition name as change_type:

pipe.xadd(
    key_schema.job_change_stream(scope),
    {
        "change_type": change_type,  # transition name, e.g. "Store", "Start", "Clear"
        "job_order_id": job_order_id,
        "scope": scope,
        "ts": datetime.now(UTC).isoformat(),
    },
    maxlen=5000,
    approximate=True,
)

Written by JobResponseDAO.save() with change_type always "ResultUpdate":

pipe.xadd(
    key_schema.job_change_stream(scope),
    {
        "change_type": "ResultUpdate",
        "job_order_id": job_response.job_order_id,
        "scope": scope,
        "ts": datetime.now(UTC).isoformat(),
    },
    maxlen=5000,
    approximate=True,
)

The stream is bounded with maxlen=5000 and approximate=True. At a job cadence of one job every few seconds, this covers well over 30 minutes of publisher downtime.
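
The retention window can be estimated with simple arithmetic. The sketch below assumes a workload of one job every 3 seconds and 4 change records per job; both numbers are illustrative assumptions, not measured values.

```python
def retention_seconds(maxlen: int, entries_per_job: float, seconds_per_job: float) -> float:
    """Approximate time until the oldest stream entry is trimmed."""
    entries_per_second = entries_per_job / seconds_per_job
    return maxlen / entries_per_second


# Assumed workload: one job every 3 s, 4 change records per job.
window = retention_seconds(maxlen=5000, entries_per_job=4, seconds_per_job=3)
print(f"{window / 60:.1f} minutes")  # 62.5 minutes
```

Because trimming is approximate, the stream may briefly hold somewhat more than 5000 entries, so this estimate is a lower bound.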

Global sentinel stream (joborder:changes:_global)

Every DAO save also appends an identical entry to joborder:changes:_global. The Job Order Publisher always includes this stream in its XREAD loop. When a message arrives for a scope not yet tracked, the publisher adds the scope's per-scope stream to its XREAD set, eliminating the need for periodic polling of active-scopes.
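
The discovery step can be sketched in plain Python. The function below is a hypothetical illustration, not the JobOrderPublisher code: it omits the key prefix for brevity and assumes a newly discovered stream is read from the beginning (ID "0").

```python
GLOBAL_STREAM = "joborder:changes:_global"


def discover_scopes(cursors: dict[str, str], global_entries: list[dict[str, str]]) -> list[str]:
    """Add per-scope streams for scopes seen on the global stream; return new keys."""
    added = []
    for entry in global_entries:
        stream = f"joborder:changes:{entry['scope']}"
        if stream not in cursors:
            # Assumption: start a new scope at "0" so no earlier entry is skipped.
            cursors[stream] = "0"
            added.append(stream)
    return added


# `cursors` maps stream key -> last-read ID, the shape XREAD expects for its streams.
cursors = {GLOBAL_STREAM: "0"}
new = discover_scopes(cursors, [{"scope": "plant-1"}, {"scope": "plant-1"}, {"scope": "plant-2"}])
print(new)  # ['joborder:changes:plant-1', 'joborder:changes:plant-2']
```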

Active scopes set (active-scopes)

JobOrderAndStateDAO.save() also issues SADD active-scopes {scope} inside the pipeline on every call. Since SADD is idempotent, this registers the scope on first Store and is a no-op on subsequent saves.

Publisher cursor persistence

The JobOrderPublisher tracks its read position in each stream using Redis hash publisher:stream-cursors. After each successful XREAD batch, all cursors are saved with HSET. On startup (or after an MQTT reconnect), cursors are reloaded with HGETALL so the publisher resumes from where it left off without reprocessing entries.

The publisher also maintains a monotonic sequence counter per scope (pubseq:{scope}) incremented with INCR on every state-index publish. The MES uses sequence gaps to detect missed updates after a connectivity outage.
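
On the consuming side, gap detection reduces to comparing consecutive sequence numbers. The helper below is a hypothetical sketch of what an MES client might do; it is not part of MicroDCS.

```python
from typing import Optional


def missed_updates(last_seen: Optional[int], received: int) -> int:
    """Return how many sequence numbers were skipped between two publishes."""
    if last_seen is None:
        return 0  # first message after start: nothing to compare against
    # A duplicate or out-of-order value yields 0 rather than a negative count.
    return max(received - last_seen - 1, 0)


print(missed_updates(5, 6))  # 0
print(missed_updates(5, 9))  # 3
```

A non-zero result tells the consumer that it should resynchronize state rather than apply the update incrementally.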

Redis API Usage Examples

All DAO classes are async and use redis.asyncio. They are initialized with a shared redis.ConnectionPool and a RedisKeySchema instance.

Setup

import redis.asyncio as redis
from microdcs.redis import (
    RedisKeySchema,
    CloudEventDedupeDAO,
    JobOrderAndStateDAO,
    JobResponseDAO,
)

# Create connection pool and key schema (usually done by MicroDCS core)
pool = redis.ConnectionPool(host="localhost", port=6379, protocol=3)
client = redis.Redis(connection_pool=pool)
key_schema = RedisKeySchema(prefix="microdcs-app")

CloudEvent Deduplication

CloudEventDedupeDAO uses an atomic SET NX with a TTL to track already-seen event IDs. is_duplicate() returns True if the event has been seen before within the TTL.

dedupe_dao = CloudEventDedupeDAO(client, key_schema, ttl=600)

is_dup = await dedupe_dao.is_duplicate("urn:source:1", "event-id-123")
# First call: False (new event, key created with 600s TTL)

is_dup = await dedupe_dao.is_duplicate("urn:source:1", "event-id-123")
# Second call: True (duplicate, key already exists)
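
The SET NX plus TTL behavior can be modeled without a Redis server. The class below is an in-memory sketch for illustration only, not the CloudEventDedupeDAO implementation; with redis-py the equivalent server call would be set(key, value, nx=True, ex=ttl), which returns None when the key already exists.

```python
import time


class InMemoryDedupe:
    """In-memory model of SET key value NX EX ttl; not the MicroDCS implementation."""

    def __init__(self, ttl: float, clock=time.monotonic):
        self._ttl = ttl
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._expires_at: dict[str, float] = {}

    def is_duplicate(self, source: str, event_id: str) -> bool:
        key = f"{source}|{event_id}"
        now = self._clock()
        expiry = self._expires_at.get(key)
        if expiry is not None and expiry > now:
            return True  # key still exists: SET NX would not write
        self._expires_at[key] = now + self._ttl  # SET NX succeeds, TTL starts
        return False


dedupe = InMemoryDedupe(ttl=600)
print(dedupe.is_duplicate("urn:source:1", "event-id-123"))  # False
print(dedupe.is_duplicate("urn:source:1", "event-id-123"))  # True
```

Once the TTL has elapsed the same event ID is treated as new again, so the TTL should exceed the longest expected redelivery delay.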

Transaction Deduplication

TransactionDedupeDAO works the same way, scoped by a scope string and transaction ID:

from microdcs.redis import TransactionDedupeDAO

tx_dao = TransactionDedupeDAO(client, key_schema, ttl=600)

is_dup = await tx_dao.is_duplicate("plant-1", "tx-abc")
# False on first call, True on subsequent calls within the TTL

Counters

CounterDAO provides atomic INCR for named counters:

from microdcs.redis import CounterDAO

counter_dao = CounterDAO(client, key_schema)

value = await counter_dao.increment("jobs_processed")
# Returns 1 on first call, 2 on second, etc.

current = await counter_dao.get("jobs_processed")
# Returns the current value, or 0 if the counter does not exist

Job Orders

JobOrderAndStateDAO stores job order documents as Redis JSON with a sorted set index by priority:

from microdcs.models.machinery_jobs import (
    ISA95JobOrderAndStateDataType,
    ISA95JobOrderDataType,
)

joborder_dao = JobOrderAndStateDAO(client, key_schema)

# Save a job order (serialized as JSON, indexed by priority in sorted set)
job = ISA95JobOrderAndStateDataType(
    job_order=ISA95JobOrderDataType(job_order_id="job-001", priority=10)
)
await joborder_dao.save(job, scope="plant-1")

# Retrieve by ID
job = await joborder_dao.retrieve("job-001")

# List all job order IDs in a scope (ordered by priority)
ids = await joborder_dao.list("plant-1")
# ["job-001", ...]

# Delete a job order and remove from sorted set
await joborder_dao.delete("job-001", scope="plant-1")

Job Responses

JobResponseDAO stores job response documents as Redis JSON and uses a RediSearch index for queries by job_order_id and normalized state:

from microdcs.models.machinery_jobs import (
    ISA95JobResponseDataType,
    ISA95StateDataType,
    LocalizedText,
)

jobresponse_dao = JobResponseDAO(client, key_schema)

# Must be called once before any save/query operations (idempotent)
await jobresponse_dao.initialize()

# Save a job response
response = ISA95JobResponseDataType(
    job_response_id="resp-001",
    job_order_id="job-001",
    job_state=[
        ISA95StateDataType(state_text=LocalizedText(text="AllowedToStart")),
        ISA95StateDataType(state_text=LocalizedText(text="Ready")),
    ],
)
await jobresponse_dao.save(response, scope="plant-1")

# Retrieve by response ID
resp = await jobresponse_dao.retrieve("resp-001")

# Query by job order ID (uses RediSearch index)
resp = await jobresponse_dao.retrieve_by_job_order_id("job-001")

# Query all responses in a given state within a scope (uses RediSearch index)
state = [
    ISA95StateDataType(state_text=LocalizedText(text="AllowedToStart")),
    ISA95StateDataType(state_text=LocalizedText(text="Ready")),
]
responses = await jobresponse_dao.retrieve_by_state("plant-1", state)

# Delete a job response
await jobresponse_dao.delete("resp-001", scope="plant-1")

Work Masters

WorkMasterDAO stores work master definitions as Redis JSON with a set index:

from microdcs.redis import WorkMasterDAO
from microdcs.models.machinery_jobs import ISA95WorkMasterDataType

workmaster_dao = WorkMasterDAO(client, key_schema)

# Save
wm = ISA95WorkMasterDataType(id="wm-001")
await workmaster_dao.save(wm, scope="plant-1")

# Retrieve
wm = await workmaster_dao.retrieve("wm-001")

# Check membership
exists = await workmaster_dao.is_member("wm-001", "plant-1")

# List all IDs in scope
ids = await workmaster_dao.list("plant-1")

# Delete
await workmaster_dao.delete("wm-001", scope="plant-1")

Set-Based List DAOs

Several DAOs manage simple sets of IDs for resource tracking. They all follow the same pattern — EquipmentListDAO, MaterialClassListDAO, MaterialDefinitionListDAO, PersonnelListDAO, and PhysicalAssetListDAO:

from microdcs.redis import EquipmentListDAO

equipment_dao = EquipmentListDAO(client, key_schema)

# Add an ID to the set
await equipment_dao.add_to_list("eq-001", scope="plant-1")

# Check membership
exists = await equipment_dao.is_member("eq-001", "plant-1")

# Remove from set
await equipment_dao.remove_from_list("eq-001", scope="plant-1")

SFC Execution State

SfcExecutionDAO stores per-job SFC execution state as Redis JSON documents and provides atomic compare-and-swap (CAS) operations via Lua scripts for multi-instance coordination.

Document Structure

Each sfc:execution:{job_id} document maps to an SfcExecutionState dataclass:

| Field | Type | Purpose |
| --- | --- | --- |
| job_id | string | Job Order ID |
| scope | string | Station/equipment scope |
| work_master_id | string | Work Master that provided the recipe |
| current_step | string | Name of the currently active SFC step (branch name during branching) |
| correlation_id | string | Job-level CloudEvent correlationid (surrogate UUID); constant for the job lifetime, carried on every CE emitted for this job |
| active_steps | list[string] | All currently active step names; populated during branch execution |
| actions | dict[string, SfcActionExecution] | Per-action execution state |
| completed | bool | Whether the recipe has completed successfully |
| failed | bool | Whether the recipe has failed |
| error | string or null | Error message if failed |

Each SfcActionExecution entry tracks:

| Field | Type | Purpose |
| --- | --- | --- |
| name | string | Action name (matches SfcActionAssociation.name in recipe) |
| state | SfcActionState | pending, dispatched, waiting, completed, or failed |
| command_ce_id | string or null | CloudEvent id of the dispatched command CE; routing key for matching incoming responses back to this action |
| attempt | int | Dispatch attempt counter |
| result | any or null | Action result data |
| error | string or null | Error message if action failed |

CAS Lua Scripts

Every SFC state mutation uses one of four Lua scripts. Each script runs atomically on the Redis server:

cas_action_state — Compare-and-swap on a single action's state:

  1. Reads $.actions.{name}.state from the JSON document
  2. If current state ≠ expected state → returns ALREADY_HANDLED
  3. Sets new state, optionally updates command_ce_id and attempt
  4. Optionally XADDs a follow-up work item to the SFC work stream
  5. Returns OK

Used for: pending → dispatched (push_command dispatch), pending → waiting (pull_event setup), dispatched → completed (action completion), waiting → completed (pull_event received), state → failed (action failure).
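
The compare-and-swap steps of cas_action_state can be modeled in Python to show why a second instance attempting the same transition is rejected. This is a sketch of the documented steps only: the real logic runs as a Lua script inside Redis, and the function signature here is hypothetical.

```python
from typing import Optional


def cas_action_state(doc: dict, work_stream: list, name: str, expected: str, new: str,
                     command_ce_id: Optional[str] = None,
                     attempt: Optional[int] = None,
                     follow_up: Optional[dict] = None) -> str:
    """Python model of the compare-and-swap steps on one action's state."""
    action = doc["actions"][name]
    if action["state"] != expected:
        return "ALREADY_HANDLED"  # another instance already made the transition
    action["state"] = new
    if command_ce_id is not None:
        action["command_ce_id"] = command_ce_id
    if attempt is not None:
        action["attempt"] = attempt
    if follow_up is not None:
        work_stream.append(follow_up)  # stands in for XADD to the SFC work stream
    return "OK"


doc = {"actions": {"fill": {"state": "pending", "command_ce_id": None, "attempt": 0}}}
stream: list = []
print(cas_action_state(doc, stream, "fill", "pending", "dispatched", command_ce_id="ce-1", attempt=1))  # OK
print(cas_action_state(doc, stream, "fill", "pending", "dispatched", command_ce_id="ce-2", attempt=1))  # ALREADY_HANDLED
```

The second call leaves the document untouched, so command_ce_id stays "ce-1" and only one command is associated with the action.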

cas_advance_step — Compare-and-swap on the current step:

  1. Reads $.current_step from the JSON document
  2. If current step ≠ expected step → returns ALREADY_HANDLED
  3. Sets new current_step and active_steps
  4. Optionally XADDs a follow-up work item (e.g., dispatch_action:{next_action})
  5. Returns OK

Used for: step transitions after all actions in a step are completed.

cas_finish — Mark execution as completed or failed:

  1. Sets $.completed = true or $.failed = true (with optional error message)
  2. Removes the job_id from the sfc:active-jobs set
  3. Returns OK

cas_branch_advance — Atomic replace/remove of a completed step within active_steps:

  1. Reads $.active_steps from the JSON document
  2. If completed_step is not in active_steps → returns ALREADY_HANDLED (idempotent for races)
  3. Replaces completed_step with next_step (or removes it when next_step is empty, i.e. the path end)
  4. Optionally XADDs a follow-up work item (e.g., dispatch_action:{name} for the next branch step)
  5. Returns JSON-encoded new active_steps list on success

Used for: advancing individual branch paths during simultaneous (AND) execution; removing the final step of a selection (OR) path on completion. The caller checks whether the returned list is empty to detect branch convergence.
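
The branch-advance steps can be modeled the same way. The function below is a Python sketch of the documented behavior, not the Lua script itself; it omits the optional follow-up XADD, and replacing the step in place (preserving list order) is an assumption.

```python
import json


def cas_branch_advance(doc: dict, completed_step: str, next_step: str = "") -> str:
    """Python model of the branch-advance steps; returns a JSON list or ALREADY_HANDLED."""
    steps = doc["active_steps"]
    if completed_step not in steps:
        return "ALREADY_HANDLED"  # a racing instance already advanced this path
    idx = steps.index(completed_step)
    if next_step:
        steps[idx] = next_step  # path continues with its next step
    else:
        del steps[idx]          # path end: drop the step
    return json.dumps(steps)


doc = {"active_steps": ["heat", "mix"]}
print(cas_branch_advance(doc, "heat", "hold"))  # ["hold", "mix"]
print(cas_branch_advance(doc, "heat", "hold"))  # ALREADY_HANDLED
print(cas_branch_advance(doc, "hold"))          # ["mix"]
print(cas_branch_advance(doc, "mix"))           # []
```

The final call returns an empty list, which is the signal for branch convergence described above.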

Active Jobs Set

The sfc:active-jobs set tracks all job IDs with non-terminal execution state. It is:

  • Added to by SfcExecutionDAO.save() when the execution state is not completed/failed
  • Removed from by the cas_finish Lua script atomically when marking a job completed/failed
  • Read by _recovery_scan() on startup to find jobs that need resumption

SFC Work Stream

The SFC work stream (sfc:work:{scope}) distributes work items across SFC engine instances using a Redis Stream consumer group.

Stream Schema

Each stream entry contains:

| Field | Value | Purpose |
| --- | --- | --- |
| job_id | Job Order ID | Which job needs attention |
| action | start_recipe / dispatch_action:{name} / resume | What to do |
| scope | Station scope | For routing and key construction |

The stream is bounded with maxlen=~5000 (approximate trimming).

Consumer Group

Group name: sfc-engine (constant SFC_CONSUMER_GROUP).

  • Created by SfcExecutionDAO.ensure_consumer_group() with XGROUP CREATE ... MKSTREAM (idempotent — ignores BUSYGROUP error)
  • Read by each engine instance via XREADGROUP with BLOCK timeout (default 2 seconds) and COUNT limit (default 10)
  • Acknowledged by XACK after successful processing of each work item

Writers

| Writer | Action | Trigger |
| --- | --- | --- |
| NB processor | start_recipe | Job reaches AllowedToStart via StoreAndStart |
| SFC engine CAS | dispatch_action:{name} | Written atomically inside cas_action_state or cas_advance_step when the next step has a push_command action |
| SFC engine recovery | resume | On startup, for each active incomplete job found in sfc:active-jobs |

XAUTOCLAIM Recovery

Each engine instance periodically calls XAUTOCLAIM with a configurable minimum idle time (default 30 seconds). This transfers pending entries from dead consumers to the calling consumer. Because all push_command deliveries carry a correlation_id, re-dispatch after XAUTOCLAIM is safe — equipment handles duplicates idempotently.
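
The claiming rule can be illustrated with a small in-memory model of the pending entries list. PendingEntry and autoclaim are hypothetical names used only for this sketch; the real operation is performed by the Redis XAUTOCLAIM command.

```python
from dataclasses import dataclass


@dataclass
class PendingEntry:
    entry_id: str
    consumer: str
    last_delivered: float  # seconds, on the same clock as `now`
    deliveries: int = 1


def autoclaim(pending: list[PendingEntry], new_consumer: str, min_idle: float, now: float) -> list[str]:
    """Model of XAUTOCLAIM: reassign entries idle for at least min_idle seconds."""
    claimed = []
    for entry in pending:
        if now - entry.last_delivered >= min_idle:
            entry.consumer = new_consumer
            entry.last_delivered = now  # claiming resets the idle time
            entry.deliveries += 1       # each claim counts as another delivery
            claimed.append(entry.entry_id)
    return claimed


pending = [PendingEntry("1-0", "engine-a", 0.0), PendingEntry("2-0", "engine-b", 25.0)]
print(autoclaim(pending, "engine-b", min_idle=30, now=40.0))  # ['1-0']
```

Only the entry that has been idle for 40 seconds is transferred; the entry delivered 15 seconds ago stays with its current consumer, which avoids stealing work that is still in progress.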

Distributed post_start Lock

When multiple replicas are running, each replica calls post_start() on startup. For processors that must run post_start() only once across the replica set (for example, to avoid sending duplicate initialisation messages), set post_start_singleton = True on the processor class.

class MyProcessor(CloudEventProcessor):
    post_start_singleton = True
    ...

When post_start_singleton = True, MicroDCS.main() calls PostStartLockDAO.acquire() before calling post_start(). The DAO issues a Redis SET NX EX command:

  • If the key did not exist, the lock is acquired and post_start() is executed on this instance.
  • If the key already exists (another replica already holds the lock), post_start() is skipped silently.

The lock key is poststartlock:{config_identifier} and expires automatically after ProcessingConfig.post_start_lock_ttl seconds (default: 30 s). This TTL ensures that a crashed instance does not permanently prevent other replicas from running post_start() on restart.

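from microdcs.redis import PostStartLockDAO, RedisKeySchema

# redis_client: an existing redis.asyncio.Redis instance (see Setup)
# processor: a CloudEventProcessor instance with post_start_singleton = True
key_schema = RedisKeySchema(prefix="myapp")
dao = PostStartLockDAO(redis_client, key_schema, ttl=30)

# acquire() returns True only for the replica that created the lock key
acquired = await dao.acquire("my-processor")
if acquired:
    await processor.post_start()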

References