# Your First Processor
This tutorial walks through building a CloudEvent processor from scratch, covering the processor lifecycle, incoming/outgoing event handlers, the response chain, and wiring into the MicroDCS application.
## Prerequisites

- A MicroDCS project initialized with `microdcs init` (see the README)
- A JSON Schema describing your message types
- Generated dataclasses from that schema via `uv run microdcs dataclassgen dataclasses <schema>.schema.json`
## Overview
A processor is a class that handles CloudEvents for a specific domain. It receives incoming events, deserializes them into typed dataclasses, runs your business logic, and returns response dataclasses that the framework wraps back into outgoing CloudEvents. Processors are protocol-agnostic — the same processor works over MQTT and MessagePack-RPC.
## Step 1: Define a JSON Schema

Create a schema file in `schemas/`. Here is a minimal example with a `Ping` request and a `Pong` response:
```json
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "$id": "https://example.com/schemas/ping-pong/",
  "title": "PingPong",
  "description": "Ping/Pong message types.",
  "oneOf": [
    { "$ref": "#/$defs/Ping" },
    { "$ref": "#/$defs/Pong" }
  ],
  "$defs": {
    "Ping": {
      "type": "object",
      "properties": {
        "message": { "type": "string" }
      },
      "required": ["message"],
      "x-request-type": { "$ref": "#/$defs/Ping" },
      "x-response-type": { "$ref": "#/$defs/Pong" },
      "x-cloudevent-type": "com.example.ping.v1",
      "x-cloudevent-dataschema": "https://example.com/schemas/ping-pong/v1.0.0/ping"
    },
    "Pong": {
      "type": "object",
      "properties": {
        "reply": { "type": "string" }
      },
      "required": ["reply"],
      "x-cloudevent-type": "com.example.pong.v1",
      "x-cloudevent-dataschema": "https://example.com/schemas/ping-pong/v1.0.0/pong"
    }
  }
}
```
Key schema extensions:
| Extension | Purpose |
|---|---|
| `x-cloudevent-type` | Sets the `type` field on the CloudEvent envelope |
| `x-cloudevent-dataschema` | Sets the `dataschema` field on the CloudEvent envelope |
| `x-request-type` | Declares the request dataclass (self-referencing for echo patterns) |
| `x-response-type` | Declares the response dataclass — this drives the response chain |
Generate the dataclasses with `uv run microdcs dataclassgen dataclasses <schema>.schema.json`. This produces a file like `models/ping_pong.py` with `Ping` and `Pong` dataclasses. If `x-response-type` is set, `Ping` will inherit from `DataClassResponseMixin[Pong]`.
## Step 2: Understand the Generated Dataclasses
The generator produces dataclasses that look like this (simplified):
```python
from __future__ import annotations

from dataclasses import InitVar, dataclass

from microdcs.dataclass import (
    DataClassConfig,
    DataClassResponseMixin,
    DataClassValidationMixin,
)
from microdcs.models.ping_pong_mixin import PingPongDataClassMixin


@dataclass(kw_only=True)
class Ping(
    DataClassValidationMixin, DataClassResponseMixin["Pong"], PingPongDataClassMixin
):
    __request_object__: InitVar[Ping | None] = None

    message: str

    class Config(DataClassConfig):
        response_type: str = "Pong"
        cloudevent_type: str = "com.example.ping.v1"
        cloudevent_dataschema: str = "https://example.com/schemas/ping-pong/v1.0.0/ping"


@dataclass(kw_only=True)
class Pong(DataClassValidationMixin, PingPongDataClassMixin):
    reply: str

    class Config(DataClassConfig):
        cloudevent_type: str = "com.example.pong.v1"
        cloudevent_dataschema: str = "https://example.com/schemas/ping-pong/v1.0.0/pong"
```
## Step 3: Write the Processor

Create your processor in `app/processors/ping_pong.py`:
```python
import logging

from microdcs import ProcessingConfig
from microdcs.common import (
    CloudEvent,
    CloudEventProcessor,
    MessageIntent,
    ProcessorBinding,
    incoming,
    outgoing,
    processor_config,
)

from app.models.ping_pong import Ping, Pong

logger = logging.getLogger("processor.ping_pong")


@processor_config(binding=ProcessorBinding.NORTHBOUND)
class PingPongProcessor(CloudEventProcessor):
    def __init__(
        self,
        instance_id: str,
        runtime_config: ProcessingConfig,
        config_identifier: str,
    ):
        super().__init__(instance_id, runtime_config, config_identifier)

    @incoming(Ping)
    async def handle_ping(self, ping: Ping) -> Pong | None:
        logger.info("Received ping: %s", ping.message)
        # Use the response chain to create a Pong from the Ping
        return ping.response(reply=f"pong: {ping.message}")

    @outgoing(Pong)
    async def produce_pong(self, **kwargs) -> Pong | None:
        return Pong(reply=kwargs.get("reply", "unsolicited pong"))

    async def process_response_cloudevent(
        self, cloudevent: CloudEvent
    ) -> list[CloudEvent] | CloudEvent | None:
        return None

    async def handle_cloudevent_expiration(
        self, cloudevent: CloudEvent, timeout: int
    ) -> list[CloudEvent] | CloudEvent | None:
        logger.warning("Event %s expired after %ds", cloudevent.id, timeout)
        return None

    async def trigger_outgoing_event(self, **kwargs) -> None:
        return None
```
### Anatomy of this processor

`@processor_config(binding=...)` — Required class decorator. The binding direction controls which MQTT topic intents the processor subscribes and publishes to:
| Binding | Subscribes to | Publishes to |
|---|---|---|
| `NORTHBOUND` | commands | data, events, metadata |
| `SOUTHBOUND` | data, events, metadata | commands |
The names come from the OT/ISA-95 automation pyramid. Northbound means the processor faces up toward higher-level systems (MES, ERP, cloud) — it receives commands from above and publishes data/events/metadata back up. Southbound means the processor faces down toward field-level systems (PLCs, sensors, equipment) — it receives data/events/metadata from below and sends commands down. In practice: if your processor executes work when told to, it is northbound; if it orchestrates other systems by issuing commands, it is southbound.
You can override the defaults with explicit `subscribe_intents` and `publish_intents` sets.
`@incoming(Ping)` — Registers `handle_ping` as the callback for incoming CloudEvents whose type matches `Ping.Config.cloudevent_type`. The framework deserializes the CloudEvent payload into a `Ping` instance and passes it as the first argument.

`@outgoing(Pong)` — Registers `produce_pong` as a callback for programmatically generating outgoing events. Invoke it via `self.callback_outgoing(Pong, intent=MessageIntent.EVENT)`.
Three abstract methods must be implemented:

- `process_response_cloudevent` — handles transport-level response messages (e.g. MQTT response topic replies)
- `handle_cloudevent_expiration` — called when a published event's expiry interval elapses
- `trigger_outgoing_event` — entry point for application-driven outbound events (timers, API calls, etc.)
## Step 4: Wire It Up

In `app/__main__.py`, register the processor with protocol handlers:
```python
import asyncio

from microdcs.core import MicroDCS
from microdcs.mqtt import MQTTHandler, MQTTProtocolBinding, OTELInstrumentedMQTTHandler

from app.processors.ping_pong import PingPongProcessor

microdcs = MicroDCS()

microdcs.register_protocol_handler(
    MQTTHandler(
        microdcs.runtime_config.mqtt,
        microdcs.redis_connection_pool,
        microdcs.redis_key_schema,
    ),
    OTELInstrumentedMQTTHandler(
        microdcs.runtime_config.mqtt,
        microdcs.redis_connection_pool,
        microdcs.redis_key_schema,
    ),
)

ping_pong_processor = PingPongProcessor(
    microdcs.runtime_config.instance_id,
    microdcs.runtime_config.processing,
    "ping-pong",
)

microdcs.register_protocol_binding(
    MQTTProtocolBinding(
        ping_pong_processor,
        microdcs.runtime_config.processing,
        microdcs.runtime_config.mqtt,
    )
)

asyncio.run(microdcs.main())
```
The `config_identifier` string (`"ping-pong"`) determines the MQTT topic namespace for this processor's bindings.
### Adding a MessagePack-RPC Binding

To also expose the processor over MessagePack-RPC (e.g. for a sidecar container), register a `MessagePackHandler` and a `MessagePackProtocolBinding` for the same processor:
```python
from microdcs.msgpack import (
    MessagePackHandler,
    MessagePackProtocolBinding,
    OTELInstrumentedMessagePackHandler,
)

microdcs.register_protocol_handler(
    MessagePackHandler(
        microdcs.runtime_config.msgpack,
        microdcs.redis_connection_pool,
        microdcs.redis_key_schema,
    ),
    OTELInstrumentedMessagePackHandler(
        microdcs.runtime_config.msgpack,
        microdcs.redis_connection_pool,
        microdcs.redis_key_schema,
    ),
)

microdcs.register_protocol_binding(
    MessagePackProtocolBinding(
        ping_pong_processor,
        microdcs.runtime_config.processing,
        microdcs.runtime_config.msgpack,
    )
)
```
The processor instance is the same — one processor, multiple transports. A sidecar container can now call the `publish` RPC method to submit CloudEvents for processing, and receives outgoing events as notification frames. See MessagePack-RPC Transport for the sidecar architecture and client usage.
## The Response Chain
The response chain is the mechanism by which a request dataclass creates properly typed response objects. It is built from three components:
### `DataClassResponseMixin[R]`

When a JSON Schema type has `x-response-type`, the code generator makes the request class inherit from `DataClassResponseMixin[R]` where `R` is the response type. This adds a `.response(**kwargs)` method to every instance of the request class.

```python
# Ping inherits DataClassResponseMixin["Pong"]
pong = ping.response(reply="hello back")
# pong is a Pong instance
```
### The `takeover` parameter

`.response()` accepts an optional `takeover` parameter — a list of field names to copy from the request to the response. This is useful when request and response share fields (common in OPC UA patterns):
```python
@dataclass(kw_only=True)
class StoreCall(DataClassResponseMixin["StoreResponse"], ...):
    job_order_id: str
    job_order: ISA95JobOrderDataType


@dataclass(kw_only=True)
class StoreResponse(...):
    job_order_id: str
    return_status: MethodReturnStatus


# In the processor:
response = store_call.response(
    takeover=["job_order_id"],
    return_status=MethodReturnStatus(status_code=0),
)
# response.job_order_id is copied from store_call.job_order_id
```
### `__request_object__` injection

When the response dataclass declares an `__request_object__` `InitVar` field, `.response()` automatically injects the calling request instance into it. The mixin `__post_init__` method can then copy hidden fields or perform computed initialization from the request object.

This is the pattern used when request and response are the same type (echo/mirror patterns). The `Hello` dataclass in the greetings example demonstrates this:
```python
@dataclass(kw_only=True)
class Hello(DataClassResponseMixin["Hello"], GreetingsDataClassMixin):
    __request_object__: InitVar[Hello | None] = None

    _hidden_str: str | None = None
    name: str
```
When you call `hello.response(name="world")`:

- `DataClassResponseMixin.response()` sees `__request_object__` in the response class constructor
- It injects `self` (the request `Hello`) as `__request_object__`
- The mixin `__post_init__` receives the original `Hello` and copies `_hidden_str` and other hidden fields from it
This means hidden fields (prefixed with `_`) survive the request-to-response round-trip without appearing in the serialized payload.
### Full response chain flow

```
Incoming CloudEvent
→ framework deserializes payload into request dataclass
→ __post_init__ runs, populating hidden fields from custom metadata
→ your @incoming handler receives the typed request
→ you call request.response(**kwargs)
→ DataClassResponseMixin resolves the response type from Generic[R]
→ if response class has __request_object__, injects the request
→ if takeover is set, copies listed fields
→ response __post_init__ runs, copying hidden fields from __request_object__
→ you return the response
→ framework serializes it into an outgoing CloudEvent
→ hidden fields are extracted via __get_custom_metadata__()
  and placed into CloudEvent custommetadata
```
### How `callback_incoming` orchestrates the response

When you return a dataclass (or list of dataclasses) from an `@incoming` handler, the framework's `callback_incoming` method in `CloudEventProcessor` handles the rest:
- Wraps each response in a new `CloudEvent` via `create_event()` (sets `source`, `datacontenttype`, `expiryinterval` from `ProcessingConfig`)
- Propagates envelope attributes from the request CloudEvent to the response:
  - `correlationid` — copied so the response stays correlated to the request
  - `causationid` — set to the request CloudEvent's `id` (causal chain)
  - `subject` — copied if present on the request
- Serializes the payload via `serialize_payload()`, which:
  - Calls `.to_jsonb()` or `.to_msgpack()` on the dataclass depending on `datacontenttype`
  - Sets `type` and `dataschema` on the CloudEvent from the dataclass `Config`
  - Calls `__get_custom_metadata__()` to extract hidden fields into `custommetadata`
You do not need to create `CloudEvent` objects yourself in `@incoming` handlers — just return your typed dataclass instances.
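The attribute-propagation rules above can be illustrated with a plain dict standing in for the CloudEvent envelope. This is a toy model written for this tutorial, not the framework's actual code:

```python
def propagate_attributes(request: dict, response: dict) -> dict:
    """Toy model of the envelope-attribute propagation rules described above."""
    # correlationid: copied so the response stays correlated to the request
    if "correlationid" in request:
        response["correlationid"] = request["correlationid"]
    # causationid: set to the request event's id (causal chain)
    response["causationid"] = request["id"]
    # subject: copied if present on the request
    if "subject" in request:
        response["subject"] = request["subject"]
    return response


req = {"id": "evt-1", "correlationid": "corr-9", "subject": "line-1/cell-3"}
resp = propagate_attributes(req, {"id": "evt-2"})
assert resp["causationid"] == "evt-1"
assert resp["correlationid"] == "corr-9"
assert resp["subject"] == "line-1/cell-3"
```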
### Writing a mixin for hidden fields

Hidden fields (prefixed with `_`) are excluded from the serialized payload by `DataClassMixin.__post_serialize__`. To make them survive the request → response round-trip, you need a mixin that:

- On deserialization: reads hidden field values from CloudEvent `custommetadata` (passed via `__custom_metadata__` or `_consume_custom_metadata()`)
- On response creation: copies hidden fields from the request object (passed via `__request_object__`)
- On serialization: exports hidden fields back into `custommetadata` (via `__get_custom_metadata__()`)
Here is the pattern, using the generated greetings mixin as a reference:
```python
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from microdcs.dataclass import DataClassMixin

if TYPE_CHECKING:
    from app.models.ping_pong import PingPong


class PingPongDataClassMixin(DataClassMixin):
    def __post_init__(
        self,
        __request_object__: PingPong | None = None,
        __custom_metadata__: dict[str, Any] | None = None,
    ) -> None:
        # Call parent __post_init__ if it exists (important for MRO chain)
        super_post_init = getattr(super(), "__post_init__", None)
        if super_post_init is not None:
            super_post_init()

        # --- Deserialization path ---
        # When the framework deserializes a CloudEvent payload, custom metadata
        # from the CloudEvent envelope is injected via __custom_metadata__ InitVar
        # or stored in a context var consumed by _consume_custom_metadata().
        if __custom_metadata__ is None:
            __custom_metadata__ = self._consume_custom_metadata()
        if __custom_metadata__ is not None:
            if hasattr(self, "_my_hidden_field"):
                self._my_hidden_field = __custom_metadata__.get("x-my-hidden-field")

        # --- Response creation path ---
        # When .response() is called and the response class has __request_object__,
        # the request instance is passed here so hidden fields can be copied over.
        if __request_object__ is not None:
            if hasattr(self, "_my_hidden_field"):
                self._my_hidden_field = getattr(
                    __request_object__, "_my_hidden_field", None
                )

    def __get_custom_metadata__(self) -> dict[str, str]:
        # --- Serialization path ---
        # Called by CloudEvent.serialize_payload() to extract hidden fields
        # back into the CloudEvent custommetadata dict.
        custom_metadata = {}
        value = getattr(self, "_my_hidden_field", None)
        if value is not None:
            custom_metadata["x-my-hidden-field"] = value
        return custom_metadata
```
The three paths correspond to the three lifecycle stages:
| Stage | Mechanism | What happens |
|---|---|---|
| Incoming deserialization | `__custom_metadata__` / `_consume_custom_metadata()` | Hidden fields populated from CloudEvent `custommetadata` |
| Response creation | `__request_object__` | Hidden fields copied from request to response instance |
| Outgoing serialization | `__get_custom_metadata__()` | Hidden fields exported back into CloudEvent `custommetadata` |
If your dataclass has no hidden fields, you still need to create the mixin file (the code generator expects it), but it can simply inherit from `DataClassMixin` with no overrides.
## CloudEvent Attributes in Handlers

Processors can declare which CloudEvent envelope attributes should be extracted and passed as keyword arguments to `@incoming` handlers:
```python
class MyProcessor(CloudEventProcessor):
    def __init__(self, ...):
        super().__init__(...)
        self._event_attributes.extend([
            CloudeventAttributeTuple("subject", "subject"),
            CloudeventAttributeTuple("correlationid", "correlationid"),
        ])

    @incoming(Ping)
    async def handle_ping(self, ping: Ping, *, subject: str, correlationid: str) -> ...:
        ...
```
Helper decorators `@scope_from_subject` and `@asset_id_from_subject` extract structured information from the `subject` attribute automatically:
```python
@scope_from_subject
@incoming(Ping)
async def handle_ping(self, ping: Ping, *, scope: str) -> ...:
    # scope is the part of subject before the first "/"
    ...
```
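A decorator like this can be sketched in a few lines. This is a hypothetical reimplementation written for this tutorial; the real `@scope_from_subject` in microdcs may differ in signature and behavior:

```python
import asyncio
import functools


def scope_from_subject(handler):
    """Hypothetical sketch of a @scope_from_subject-style decorator."""

    @functools.wraps(handler)
    async def wrapper(self, event, *, subject: str, **kwargs):
        # Derive scope: the part of subject before the first "/".
        return await handler(self, event, scope=subject.split("/", 1)[0], **kwargs)

    return wrapper


class Demo:
    @scope_from_subject
    async def handle(self, event, *, scope: str):
        return scope


result = asyncio.run(Demo().handle("evt", subject="line-1/cell-3"))
assert result == "line-1"
```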
## Pre/Post Callback Hooks

Two optional hooks let you intercept the incoming callback flow:

`__pre_outgoing_callback__` — Called before the `@incoming` handler. Can inspect the raw CloudEvent and decide whether to proceed:
```python
async def __pre_outgoing_callback__(self, cloudevent, **kwargs):
    if some_condition:
        return False, None  # skip the main callback
    return True, None  # proceed normally
```
`__post_outgoing_callback__` — Called after the `@incoming` handler. Can inspect or modify the response list:
```python
async def __post_outgoing_callback__(self, responses, cloudevent, **kwargs):
    # responses is the list of dataclass objects returned by @incoming handler
    # modify or filter them before the framework wraps them into CloudEvents
    return responses
```
## Testing Your Processor

Use `pytest` and `unittest.mock` to test processors without external dependencies:
```python
from unittest.mock import MagicMock

import pytest

from microdcs import ProcessingConfig

from app.models.ping_pong import Ping
from app.processors.ping_pong import PingPongProcessor


@pytest.fixture
def processor():
    config = MagicMock(spec=ProcessingConfig)
    config.cloudevent_source = "test-source"
    config.message_expiry_interval = 60
    return PingPongProcessor("test-instance", config, "ping-pong")


class TestPingPongProcessor:
    @pytest.mark.asyncio
    async def test_handle_ping(self, processor):
        ping = Ping(message="hello")
        result = await processor.handle_ping(ping)
        assert result.reply == "pong: hello"
```
## Next Steps

- Study the built-in `GreetingsCloudEventProcessor` in `src/microdcs/processors/greetings.py` for a complete working example
- Study `MachineryJobsCloudEventProcessor` in `src/microdcs/processors/machinery_jobs.py` for a production-grade processor with Redis persistence and state machines
- See Persistence for Redis-backed state management patterns