Symptom

DefaultNodeObserver could dispatch node state changes after close() had been called, most visibly as duplicate processor wakeups across SSE reconnections. A caller that invoked observe() immediately after close() (e.g. on reconnect) could see a new collection job start and dispatch stale state.

Root cause

close() used scope.launch { mutex.withLock { jobs.values.forEach { it.cancel() }; jobs.clear() } }, which only queued the cleanup as a new coroutine in the caller's scope instead of performing it before close() returned. Two races followed:

  1. Queued-emission race: a StateFlow emission enqueued on the dispatcher before close() was called (FIFO position 1) executed before the close coroutine (position 2), so the collector dispatched a value that close() should have suppressed.
  2. Post-close observe() leak: the async close coroutine cleared the jobs map; a concurrent observe() whose setup coroutine ran after the clear found !jobs.containsKey(id) true and registered a brand-new collection job, leaking observation past the close boundary.
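Race 2 can be reproduced deterministically on runBlocking's single-threaded, FIFO event loop. The sketch below is a hypothetical reduction, not the real DefaultNodeObserver: AsyncCloseObserver, registeredJobs, reproducePostCloseLeak and the "node-1" id are illustrative names, and it assumes kotlinx.coroutines 1.6 or later. Because close() only queues its cleanup, the setup coroutine from the follow-up observe() runs after the map has been cleared and registers a fresh collector.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical reduction of the old pattern: every operation is an
// asynchronous launch directly into the caller's scope.
class AsyncCloseObserver(private val scope: CoroutineScope) {
    private val mutex = Mutex()
    private val jobs = mutableMapOf<String, Job>()

    fun observe(id: String, state: MutableStateFlow<Int>) {
        scope.launch {
            mutex.withLock {
                if (!jobs.containsKey(id)) {
                    // Collector body omitted; it would dispatch to a processor.
                    jobs[id] = scope.launch { state.collect { } }
                }
            }
        }
    }

    fun close() {
        // The cleanup is only enqueued here; it runs some time after close() returns.
        scope.launch {
            mutex.withLock {
                jobs.values.forEach { it.cancel() }
                jobs.clear()
            }
        }
    }

    // Read without the mutex; safe only because the demo is single-threaded.
    fun registeredJobs(): Int = jobs.size
}

fun reproducePostCloseLeak(): Int = runBlocking {
    // Child scope so the leaked collector can be cancelled when the demo ends.
    val callerScope = CoroutineScope(coroutineContext + Job(coroutineContext[Job]))
    val observer = AsyncCloseObserver(callerScope)
    val state = MutableStateFlow(0)

    observer.observe("node-1", state)
    yield() // first setup coroutine runs and registers a collector

    // Reconnect: close, then observe again straight away.
    observer.close()
    observer.observe("node-1", state)
    yield() // FIFO: the close coroutine runs, then the new setup coroutine

    val leaked = observer.registeredJobs()
    callerScope.cancel() // stop the leaked collector so runBlocking can return
    leaked
}
```

A non-zero result means a collector was registered after close() had been called.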

All coroutines, including the setup wrappers in observe() and remove(), were also launched directly in the caller-provided scope, so they were siblings of the caller's other work rather than children of a scope owned by NodeObserver. There was therefore no single Job that close() could cancel to stop them all.

Fix

Introduced a private observerScope as a structured child of the caller’s scope:

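// SupervisorJob parented to the caller's Job: cancelling the caller cancels
// observerScope, while observerScope.cancel() leaves the caller running.
private val observerScope = CoroutineScope(
    scope.coroutineContext + SupervisorJob(scope.coroutineContext[Job])
)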
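A minimal sketch of how the pieces fit together, assuming close() simply calls observerScope.cancel() before returning (the actual close() body is not shown here). StructuredObserver, activeJobs and reconnectAfterStructuredClose are hypothetical names; the setup logic mirrors the description above, and kotlinx.coroutines 1.6 or later is assumed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical sketch; the real DefaultNodeObserver API may differ.
class StructuredObserver(scope: CoroutineScope) {
    // Structured child of the caller's scope, as in the fix above.
    private val observerScope = CoroutineScope(
        scope.coroutineContext + SupervisorJob(scope.coroutineContext[Job])
    )
    private val mutex = Mutex()
    private val jobs = mutableMapOf<String, Job>()

    fun observe(id: String, state: MutableStateFlow<Int>) {
        // Both the setup wrapper and the collector belong to observerScope.
        observerScope.launch {
            mutex.withLock {
                if (!jobs.containsKey(id)) {
                    jobs[id] = observerScope.launch { state.collect { } }
                }
            }
        }
    }

    // Synchronous: every collector and every pending setup coroutine is
    // cancelled before close() returns.
    fun close() {
        observerScope.cancel()
    }

    fun activeJobs(): Int = jobs.values.count { it.isActive }
}

fun reconnectAfterStructuredClose(): Pair<Int, Boolean> = runBlocking {
    val callerScope = CoroutineScope(coroutineContext + Job(coroutineContext[Job]))
    val observer = StructuredObserver(callerScope)
    val state = MutableStateFlow(0)

    observer.observe("node-1", state)
    yield()                           // setup runs and registers the collector
    observer.close()                  // cancels observerScope immediately
    observer.observe("node-1", state) // launched into a cancelled scope
    yield()

    val result = observer.activeJobs() to callerScope.isActive
    callerScope.cancel()
    result
}
```

Here the second observe() never registers a collector, and callerScope stays active because cancelling a child Job does not cancel its parent.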

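Once close() has cancelled observerScope, any later observerScope.launch call (from a racing observe() or remove()) is effectively a no-op: with the default start mode, the new coroutine is cancelled as soon as it attaches to the cancelled parent, and its body never runs.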
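The no-op behaviour rests on a documented kotlinx.coroutines guarantee: with CoroutineStart.DEFAULT, a coroutine whose Job is cancelled before it starts executing never runs its body. The hypothetical helper below checks the two cases that matter here, a launch already queued on the dispatcher when cancel() runs and a launch issued afterwards. It illustrates the mechanism, not DefaultNodeObserver itself.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: records which launched bodies actually execute
// once the scope they belong to has been cancelled.
fun launchesAfterCancel(): List<String> = runBlocking {
    val ran = mutableListOf<String>()
    val observerScope = CoroutineScope(coroutineContext + SupervisorJob(coroutineContext[Job]))

    // Queued on runBlocking's event loop, but not started yet.
    observerScope.launch { ran += "queued-before-cancel" }
    observerScope.cancel()
    // Attaches to an already-cancelled parent, so it is cancelled immediately.
    observerScope.launch { ran += "launched-after-cancel" }

    yield() // let the event loop process both coroutines
    ran
}
```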

Prevention