Started with two blockers for clinical: qhorus#154 (correlationId in InboundHumanMessage) and qhorus#153 (CDI event on message receipt). Simple issues. Neither was.

The inbound path was half-done

The design question for #154 was whether to add correlationId to InboundHumanMessage alone, or expand NormalisedMessage to carry the full parameter set that messageService.send() accepts. The argument for the fuller design: ChannelGateway.receiveHumanMessage() was hard-coding four parameters to null with no principled reason — the normaliser SPI is supposed to be the complete translation point.

We expanded NormalisedMessage to seven fields: type, content, senderInstanceId, correlationId, inReplyTo, artefactRefs, target. The gateway became a clean 1:1 mapping. ClinicalInboundNormaliser now passes correlationId through, so clinical’s PI backend can supply the deviation UUID and have the commitment auto-fulfill on DONE/DECLINE without a bypass workaround.
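A rough sketch of what that 1:1 mapping looks like. The seven field names come from the list above; everything else here (the field types, the MessageSender stand-in for messageService.send(), its parameter order, and the GatewaySketch class) is a guess for illustration, not the real qhorus code.

```java
import java.util.List;

// Field names are from the post; the types are assumptions.
record NormalisedMessage(
        String type,
        String content,
        String senderInstanceId,
        String correlationId,
        String inReplyTo,
        List<String> artefactRefs,
        String target) {}

// Hypothetical stand-in for the real service; the parameter order is assumed.
interface MessageSender {
    String send(String channel, String type, String content, String senderInstanceId,
                String correlationId, String inReplyTo, List<String> artefactRefs, String target);
}

// Hypothetical gateway: forwards every normalised field instead of passing null.
final class GatewaySketch {
    private final MessageSender sender;

    GatewaySketch(MessageSender sender) {
        this.sender = sender;
    }

    String receiveHumanMessage(String channel, NormalisedMessage m) {
        return sender.send(channel, m.type(), m.content(), m.senderInstanceId(),
                m.correlationId(), m.inReplyTo(), m.artefactRefs(), m.target());
    }
}
```

The point of the shape is that the normaliser decides what every parameter is, and the gateway has no opinions of its own.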

The CDI event that wasn’t

#153 asked for a CDI event on message receipt so clinical could observe PI responses without polling. The obvious implementation — fire Event<MessageReceivedEvent> from MessageService.send() — works fine for an embedded harness. It’s wrong for a platform where agents can be anywhere.

I asked where qhorus was supposed to sit in the topology: same JVM, same machine, LAN, WAN. All of them. CDI events don’t cross JVM boundaries. If MessageService fires a CDI event, distributed harnesses get nothing.

We looked at Vert.x EventBus (local vs cluster consumers), EIP Channel Adapter patterns, MicroProfile Reactive Messaging. The answer was consistent: the right abstraction is a transport-agnostic SPI. We built MessageObserver:

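// Transport-agnostic notification hook for received messages.
@FunctionalInterface
public interface MessageObserver {
    void onMessage(MessageReceivedEvent event);
    // In-process by default; transports that span JVMs return CLUSTER instead.
    default Scope scope() { return Scope.LOCAL; }
    enum Scope { LOCAL, CLUSTER }
}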

InProcessMessageBus is the @DefaultBean — fires a CDI event for embedded harnesses, zero configuration, zero overhead. Future Kafka or WebSocket implementations declare Scope.CLUSTER. Multiple implementations coexist via Instance<MessageObserver>.
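To make the coexistence concrete, here is a minimal sketch of fanning one event out to several observers. It leans on the fact that CDI's Instance<T> is an Iterable<T>, so the sketch takes a plain Iterable and stays runnable without a container. ObserverFanOut is a hypothetical name, the field types on the event are assumptions, and swallowing a failing observer's exception is my choice for the sketch rather than documented qhorus behaviour.

```java
import java.util.ArrayList;
import java.util.List;

// Field names follow the post; the types are assumptions.
record MessageReceivedEvent(String channelName, String channelId, String type,
                            String senderId, String correlationId, String content) {}

@FunctionalInterface
interface MessageObserver {
    void onMessage(MessageReceivedEvent event);
    default Scope scope() { return Scope.LOCAL; }
    enum Scope { LOCAL, CLUSTER }
}

// Hypothetical dispatcher. An injected Instance<MessageObserver> could be
// passed where this sketch accepts an Iterable.
final class ObserverFanOut {
    private final Iterable<MessageObserver> observers;

    ObserverFanOut(Iterable<MessageObserver> observers) {
        this.observers = observers;
    }

    // Notifies every observer and returns how many accepted the event.
    // A failing observer is logged and skipped so it cannot block the rest.
    int publish(MessageReceivedEvent event) {
        int delivered = 0;
        for (MessageObserver o : observers) {
            try {
                o.onMessage(event);
                delivered++;
            } catch (RuntimeException e) {
                System.err.println("observer (" + o.scope() + ") failed: " + e.getMessage());
            }
        }
        return delivered;
    }
}
```

A LOCAL observer can be a one-line lambda; a CLUSTER transport would be a full class that overrides scope().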

The accompanying architecture document (docs/messaging-architecture.md) diagrams the two notification paths, ChannelBackend fanOut (per-channel, targeted) and MessageObserver (global, pluggable transport), and explains why both exist.

Claude caught something I missed in code review: fireAsync() inside @Transactional dispatches before the transaction commits, so an observer that went back to the database could look for a message that isn’t visible yet. The MessageReceivedEvent record is intentionally self-contained (channelName, channelId, type, senderId, correlationId, content), so observers don’t need to query the database at all.
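The timing problem is easy to model without CDI or a real database. The sketch below is a toy: FakeTxStore, Notice and SendSketch are invented names, the "transaction" is just a buffer that becomes readable on commit(), and the dispatch is synchronous to keep it deterministic. It only illustrates why a payload that carries its own content is safe when notification happens before commit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

// Toy store: writes stay invisible to readers until commit() is called.
final class FakeTxStore {
    private final Map<String, String> committed = new HashMap<>();
    private final Map<String, String> pending = new HashMap<>();

    void write(String id, String content) { pending.put(id, content); }

    void commit() {
        committed.putAll(pending);
        pending.clear();
    }

    Optional<String> read(String id) { return Optional.ofNullable(committed.get(id)); }
}

// Self-contained notification: carries the content, not just an id to look up.
record Notice(String id, String content) {}

final class SendSketch {
    // Mirrors the ordering from the review: write, notify, and only then commit.
    static void send(FakeTxStore store, String id, String content, Consumer<Notice> observer) {
        store.write(id, content);
        observer.accept(new Notice(id, content)); // observer runs before commit
        store.commit();
    }
}
```

An observer that calls store.read(id) during the notification finds nothing, while one that reads notice.content() already has what it needs.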

Claudony flagged a gap the document had glossed over: in a multi-node fleet, each node has its own channel store and observer set. The backend model doesn’t fix cross-node delivery. That’s now documented explicitly as a known gap (qhorus#162).

