Not a field bug — the last structural anti-pattern from the Phase 1–4 dispatch
rework. After Phase 4 a node that finished process() still drove its own
fan-out by calling ServerNodeManager.executeSources(self), which did an
O(N) nodePersistence.loadAll() scan on every value/state change to find the
nodes listing it as a source and invoked each. The source was still
responsible for “telling” its observers — a push wearing an observer costume —
and the same coupling was duplicated in ~6 per-processor onSourceTrigger
overrides and a global EventMonitor arm.
The reverse edge (“who observes me?”) was recomputed by a full-table scan per
change instead of being held as a live subscription. The genuine observer
pattern (DefaultNodeObserver collecting a per-node Flow<Node>) existed but
only client-side; the server abandoned it in Phase 4 because it self-waked (a
write re-entered the writer’s own processor).
Bring the observer pattern to the server in the correct shape. ServerNodeManager
now owns one hot per-node MutableSharedFlow<NodePublication> (flowFor(id)),
published from the single CRUD write point (update) and from publish() for
digital sources whose value is written outside update (DataPoint snapshot,
pin change via EventMonitor). A new NodeObservationRegistry holds one
collector job per observer that merges its sources’ flows and, on emission,
wakes it through the unchanged seam invoke(observer, by = source.id(),
verb = actionOf(source)). Lifecycle is driven off CRUD in CrudLifecycleTask
(startup walk + wire on create/edit, unwire on delete) — no scan. executeSources,
the per-processor onSourceTrigger overrides, and KrillApp.wakeFromSource
are deleted; every fan-out point became a publish()/firing-update(). RESET
is terminal by construction: invoke pins a thread-bound PropagationContext
(via a ThreadContextElement) whose suppressPublish flag is set for a RESET
verb, so the reset write’s publication is suppressed at the publish point rather
than relying on each processor to remember. The same context carries a
monotonic propagation epoch; each collector drops an emission whose
(observerId, epoch) it already handled (bounded LRU), so a cyclic wiring fires
each observer at most once per propagation and terminates.
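As a rough illustration of the two termination rules above, the following synchronous model (plain Kotlin, no coroutines or flows) shows RESET suppression at the publish point and the per-(observer, epoch) dedupe. Every name in it (Verb, EpochDedupe, WiringModel, externalWrite, the LRU capacity of 256) is a hypothetical stand-in, and direct function calls replace the real collector jobs and the ThreadContextElement. It is a sketch of the idea under those assumptions, not the server code.

```kotlin
// Hypothetical, simplified model: direct calls stand in for flows and collector jobs.
enum class Verb { FIRE, RESET }

// Stand-in for the thread-bound context that invoke pins.
data class PropagationContext(val epoch: Long, val suppressPublish: Boolean)

// Bounded LRU of (observerId, epoch) pairs that were already handled.
class EpochDedupe(private val capacity: Int = 256) {
    private val seen = object : LinkedHashMap<Pair<String, Long>, Unit>(16, 0.75f, true) {
        override fun removeEldestEntry(
            eldest: MutableMap.MutableEntry<Pair<String, Long>, Unit>?
        ): Boolean = size > capacity
    }

    // True the first time a pair is offered, false on any repeat still in the LRU.
    fun firstTime(observerId: String, epoch: Long): Boolean =
        seen.put(observerId to epoch, Unit) == null
}

// observersOf maps a source id to the observers subscribed to it (assumed input).
class WiringModel(private val observersOf: Map<String, List<String>>) {
    private val dedupe = EpochDedupe()
    private var lastEpoch = 0L
    val wakes = mutableListOf<String>() // observer ids in the order they were woken

    // A write arriving from outside any propagation starts a new epoch.
    fun externalWrite(sourceId: String, verb: Verb) =
        publish(sourceId, verb, PropagationContext(++lastEpoch, suppressPublish = false))

    private fun publish(sourceId: String, verb: Verb, ctx: PropagationContext) {
        if (ctx.suppressPublish) return // a reset write announces nothing
        for (observer in observersOf[sourceId].orEmpty()) {
            if (dedupe.firstTime(observer, ctx.epoch)) invoke(observer, verb, ctx.epoch)
        }
    }

    private fun invoke(observer: String, verb: Verb, epoch: Long) {
        wakes += observer
        // The observer's own write runs under a context derived from the verb.
        publish(observer, verb, PropagationContext(epoch, suppressPublish = verb == Verb.RESET))
    }
}

fun main() {
    // A three-node cycle: a wakes b, b wakes c, c wakes a.
    val cycle = mapOf("a" to listOf("b"), "b" to listOf("c"), "c" to listOf("a"))

    val fire = WiringModel(cycle)
    fire.externalWrite("a", Verb.FIRE)
    println(fire.wakes)

    val reset = WiringModel(cycle)
    reset.externalWrite("a", Verb.RESET)
    println(reset.wakes)
}
```

In this model a FIRE into the cycle wakes b, c and a once each and then stops, because the second attempt to wake b in the same epoch is dropped. A RESET wakes only b, since b's own write runs with suppressPublish set.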
Two design reconciliations worth recording:
update() publishes on firing writes only. Creation
(isNewNode), transient/lifecycle states (PROCESSING, DELETING,
USER_EDIT, RESET, ERROR, …) and propagate = false settles
(setStateToNone, setErrorState, updatePinState) do not wake
observers — preserving the pre-change behavior that only genuine fires
fanned out. Pin observer wakes flow through the single PIN_CHANGED →
EventMonitor.publish path.
SharedFlow is retained on delete (bounded by total node ids seen).
server/jvmTest regression tests assert the contract without a scan and
without wall-clock timing: a source change wakes each observer exactly once
(NodeObservationRegistryTest), the per-(observer, epoch) dedupe caps a
cyclic wiring, editing sources rewires, a deleted source leaves observers
intact and self-heals, and the publish gate fires only on firing writes
(ServerNodeManagerFlowTest). The re-plumbed ServerTaskListProcessorResetTest
keeps RESET-terminal green.
grep -rn "executeSources\|onSourceTrigger\|wakeFromSource"
server/src/jvmMain shared/src/commonMain must return only comments; a code
hit means the push path crept back.
Cross-host source→observer never worked server-side before (the old
executeSources scan only saw local nodes). Two halves were added:
ServerNodeManager.publish broadcasts a SOURCE_TRIGGERED
event onto the general /events stream while the hop TTL is > 0 — the source
addresses no one; a peer reacts only because it subscribed.
PeerObservationClient (the server analog of the app’s
EventClient) opens an authenticated SSE connection to a peer’s /events,
keeps a bounded in-memory mirror of referenced remote nodes from the
value-bearing events, and on a SOURCE_TRIGGERED publishes the mirrored node
into the local flowFor(identity) so the local collector reacts exactly
as to a local source. Subscriptions are ref-counted by NodeObservationRegistry
(open on the first remote-source observer of a host, close on the last). The
hop TTL is decremented at ingress and threaded through the flow boundary in
NodePublication (alongside the epoch) so a remote-ingress wake doesn’t reset
the cross-host cycle budget; at 0 the wake updates local state but announces
nothing further (D11).
Gotcha that bit: SOURCE_TRIGGERED was never emitted before, so
SourceTriggerPayload was missing from the EventPayload polymorphic
registration in Serializer.kt. Emitting it on /events would have crashed
serialization at runtime (the swarm-wide “missing subclass = runtime crash”
trap). Registered it + added SourceTriggeredEventSerializationTest as a guard.
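A minimal sketch of that trap, assuming kotlinx.serialization with its compiler plugin enabled. The Event wrapper, the payload fields, and the "SourceTrigger" serial name are hypothetical; only the EventPayload and SourceTriggerPayload names come from the text above, and the project's real registration in Serializer.kt may look different.

```kotlin
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.SerializationException
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import kotlinx.serialization.modules.SerializersModule
import kotlinx.serialization.modules.polymorphic
import kotlinx.serialization.modules.subclass

// Hypothetical stand-ins for the project's payload hierarchy.
interface EventPayload

@Serializable
@SerialName("SourceTrigger")
data class SourceTriggerPayload(val sourceId: String, val ttl: Int) : EventPayload

// Hypothetical event wrapper; an interface-typed property is serialized polymorphically.
@Serializable
data class Event(val name: String, val payload: EventPayload)

// No subclasses registered: encoding a SourceTriggerPayload fails at runtime.
val unregistered: Json = Json

// The subclass is registered under its polymorphic base.
val registered: Json = Json {
    serializersModule = SerializersModule {
        polymorphic(EventPayload::class) {
            subclass(SourceTriggerPayload::class)
        }
    }
}

// Returns the encoded JSON, or null if the payload subclass is not registered.
fun encodeOrNull(json: Json, event: Event): String? = try {
    json.encodeToString(event)
} catch (e: SerializationException) {
    null
}

fun main() {
    val event = Event("SOURCE_TRIGGERED", SourceTriggerPayload(sourceId = "node-1", ttl = 2))
    println(encodeOrNull(unregistered, event)) // null: the missing subclass surfaces only at runtime
    println(encodeOrNull(registered, event))
}
```

The compiler accepts both configurations, which is why a serialization round-trip test is a useful guard: the missing registration only shows up when the payload is first encoded.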
replay = 1 shared
flows, prompt cancel on delete/edit, and a cheap unchanged-wiring no-op so a
hot value write doesn’t churn the collector. A Pi-class node-count benchmark
(task 9.7) is still outstanding before broad rollout.
SOURCE_TRIGGERED is broadcast on every
firing publish even with no peers (the observer model means the source can’t
know its remote observers). It’s loss-tolerant (tryPostEvent) and the app’s
EventClient no-ops it, but a high-frequency local source adds /events
chatter; a “broadcast only if a peer is connected” optimization is possible.
SOURCE_TRIGGERED arrived → the wake was dropped (no mirror for …).
Fixed by (1) seeding the mirror from the peer’s /nodes on every (re)connect
and (2) an on-demand /node/<id> fetch when a wake references an un-mirrored
source, caching the result. A wake is now dropped only if the peer is
unreachable or the node is gone.
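The two-part fix can be sketched as a small mirror with a seed step and a fetch fallback. This is a model under stated assumptions: PeerMirror, RemoteNode, and the fetchAll/fetchOne lambdas are hypothetical stand-ins for the real client and its /nodes and /node/<id> calls, the LRU eviction is only one possible way to keep the mirror bounded, and the sketch seeds every node the peer returns rather than filtering to referenced ones.

```kotlin
// Hypothetical stand-ins; the real client talks to the peer over HTTP and SSE.
data class RemoteNode(val id: String, val value: String)

class PeerMirror(
    private val capacity: Int,
    private val fetchAll: () -> List<RemoteNode>,   // stands in for GET /nodes
    private val fetchOne: (String) -> RemoteNode?,  // stands in for GET /node/<id>; null = unreachable or gone
) {
    // Access-ordered map that evicts the least recently used entry past capacity (assumed policy).
    private val nodes = object : LinkedHashMap<String, RemoteNode>(16, 0.75f, true) {
        override fun removeEldestEntry(
            eldest: MutableMap.MutableEntry<String, RemoteNode>?
        ): Boolean = size > capacity
    }

    // (1) Seed the mirror on every (re)connect.
    fun onConnect() = fetchAll().forEach { nodes[it.id] = it }

    // Value-bearing events keep the mirror current.
    fun onValueEvent(node: RemoteNode) { nodes[node.id] = node }

    // (2) On a wake, fall back to an on-demand fetch and cache the result.
    // Returns the node to publish locally, or null when the wake must be dropped.
    fun onSourceTriggered(id: String): RemoteNode? =
        nodes[id] ?: fetchOne(id)?.also { nodes[it.id] = it }
}

fun main() {
    var fetches = 0
    val peer = mapOf("n1" to RemoteNode("n1", "on"), "n2" to RemoteNode("n2", "off"))
    val mirror = PeerMirror(
        capacity = 8,
        fetchAll = { listOf(peer.getValue("n1")) }, // the seed happens to miss n2
        fetchOne = { id -> fetches++; peer[id] },
    )
    mirror.onConnect()
    println(mirror.onSourceTriggered("n1"))   // served from the seeded mirror
    println(mirror.onSourceTriggered("n2"))   // fetched on demand, then cached
    println(mirror.onSourceTriggered("gone")) // null: the wake is dropped
    println(fetches)
}
```

Returning null rather than throwing keeps the drop case explicit at the call site, which matches the remaining failure mode described above.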