DefaultNodeObserver could dispatch node state changes after close() had been called. The most visible symptom was duplicate processor wakeups across SSE reconnections: a caller that invoked observe() immediately after close() (e.g. on reconnect) could see a new collection job start and dispatch stale state.
close() was implemented as scope.launch { mutex.withLock { jobs.values.forEach { it.cancel() }; jobs.clear() } }. It therefore only enqueued the teardown as a new coroutine in the caller's scope, and it returned before anything had been cancelled. Two races followed:
- A StateFlow emission enqueued on the dispatcher before close() was called (FIFO position 1) would run before the close coroutine (position 2), dispatching a value that should have been suppressed.
- The close coroutine cleared the jobs map. If a concurrent observe() call ran after the map was cleared, it would find !jobs.containsKey(id) true and register a brand-new collection job, leaking observation past the close boundary.

All coroutines (including the setup wrappers in observe() and remove()) were also launched directly in the caller-provided scope. They were therefore siblings of all other caller work rather than children of a scope owned by NodeObserver.
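The following minimal Kotlin sketch makes the two races concrete by reproducing the old shape. AsyncCloseObserver, settle() and the Int-valued MutableStateFlow are hypothetical stand-ins, not the real DefaultNodeObserver API. runBlocking's single-threaded event loop stands in for a FIFO dispatcher. The sketch assumes kotlinx-coroutines-core 1.6 or later.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical, simplified stand-in for the OLD observer: everything is
// launched straight into the caller's scope, and close() is asynchronous.
class AsyncCloseObserver(private val scope: CoroutineScope) {
    private val mutex = Mutex()
    private val jobs = mutableMapOf<String, Job>()
    val dispatched = mutableListOf<Int>()

    fun observe(id: String, state: StateFlow<Int>) {
        scope.launch {
            mutex.withLock {
                if (!jobs.containsKey(id)) {
                    jobs[id] = scope.launch { state.collect { dispatched.add(it) } }
                }
            }
        }
    }

    // Only enqueues the teardown; nothing is cancelled when close() returns.
    fun close() {
        scope.launch { mutex.withLock { jobs.values.forEach { it.cancel() }; jobs.clear() } }
    }
}

// Lets runBlocking's single-threaded FIFO event loop drain its queue.
suspend fun settle() = repeat(10) { yield() }

// Race 1: an emission queued before close() is still dispatched after it.
fun staleDispatchAfterClose(): List<Int> = runBlocking {
    val state = MutableStateFlow(0)
    val observer = AsyncCloseObserver(this)
    observer.observe("node", state)
    settle()              // collector starts and dispatches the initial 0
    state.value = 1       // collector resumption queued (FIFO position 1)
    observer.close()      // close coroutine queued (FIFO position 2)
    settle()
    observer.dispatched.toList()
}

// Race 2: observe() right after close() registers a brand-new collector.
fun observeAfterClose(): List<Int> = runBlocking {
    val state = MutableStateFlow(0)
    val observer = AsyncCloseObserver(this)
    observer.observe("node", state)
    settle()
    observer.close()                  // queued
    observer.observe("node", state)   // queued behind close, so it sees an empty map
    settle()
    val result = observer.dispatched.toList()
    coroutineContext.cancelChildren() // stop the leaked collector so runBlocking can return
    result
}
```

With this setup, staleDispatchAfterClose() returns [0, 1], where the 1 is dispatched after close() has already returned. observeAfterClose() returns [0, 0], because a second collector starts after close() and re-dispatches the current state.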
Introduced a private observerScope as a structured child of the caller’s scope:
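```kotlin
// SupervisorJob parented to the caller's Job: parent cancellation propagates down, sibling failures stay isolated.
private val observerScope = CoroutineScope(
    scope.coroutineContext + SupervisorJob(scope.coroutineContext[Job])
)
```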
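The scope construction can be exercised on its own. In this sketch, childSupervisorScope is a hypothetical helper that builds a scope the same way observerScope is built. The caller value is a stand-in for the caller's scope, running on runBlocking's event loop with its own Job. The sketch records whether a failing child leaves its sibling running, whether cancel() reaches children without touching the caller, whether a launch into the cancelled scope runs its body, and whether cancelling the caller propagates down.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: builds a scope the same way observerScope is built.
fun childSupervisorScope(parent: CoroutineScope): CoroutineScope =
    CoroutineScope(parent.coroutineContext + SupervisorJob(parent.coroutineContext[Job]))

fun scopeProperties(): Map<String, Boolean> = runBlocking {
    val results = linkedMapOf<String, Boolean>()
    // Stand-in for the caller's scope: runBlocking's event loop plus its own Job.
    val caller = CoroutineScope(coroutineContext.minusKey(Job) + Job())
    val observerScope = childSupervisorScope(caller)

    // 1. A failing node collector does not cancel its sibling.
    val sibling = observerScope.launch { awaitCancellation() }
    val failing = observerScope.launch(CoroutineExceptionHandler { _, _ -> }) {
        error("node collection failed")  // swallowed by the handler above
    }
    failing.join()
    results["siblingSurvivesFailure"] = sibling.isActive

    // 2. cancel() marks every child cancelled before it returns; the caller is untouched.
    observerScope.cancel()
    results["cancelReachesChildren"] = sibling.isCancelled
    results["callerUnaffected"] = caller.isActive

    // 3. Launching into the cancelled scope never runs the body.
    var ran = false
    observerScope.launch { ran = true }.join()
    results["launchAfterCancelRuns"] = ran

    // 4. Cancelling the caller propagates down to a fresh observer scope.
    val other = childSupervisorScope(caller)
    caller.cancel()
    results["parentCancelPropagates"] = !other.isActive
    caller.coroutineContext[Job]!!.join()  // let cancelled children finish
    results
}
```

Every entry comes back true except launchAfterCancelRuns, which is false. That property is what turns a racing observe() or remove() into a no-op once close() has run.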
- The scope.launch calls in observe() and remove() were changed to observerScope.launch.
- close() was replaced with observerScope.cancel(), which is synchronous: every child Job is marked cancelled before close() returns. A collector resumption that is already queued fails with CancellationException when it runs instead of dispatching a value.
- SupervisorJob keeps individual node collection failures isolated, while the parent Job link ensures that cancelling the parent scope still propagates to observerScope.
- After close(), any observerScope.launch call (from a racing observe() or remove()) is a no-op: a coroutine launched into a cancelled scope is cancelled before its body runs.
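The changes listed above can be sketched as follows. The names are again hypothetical (ScopedObserver, settle(), an Int-valued MutableStateFlow) rather than the real DefaultNodeObserver API, and runBlocking serves as the FIFO dispatcher. The sketch runs both scenarios that misbehaved under the asynchronous close().

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical, simplified stand-in for the fixed observer.
class ScopedObserver(scope: CoroutineScope) {
    // Owned child scope: SupervisorJob parented to the caller's Job.
    private val observerScope = CoroutineScope(
        scope.coroutineContext + SupervisorJob(scope.coroutineContext[Job])
    )
    private val mutex = Mutex()
    private val jobs = mutableMapOf<String, Job>()
    val dispatched = mutableListOf<Int>()

    fun observe(id: String, state: StateFlow<Int>) {
        observerScope.launch {
            mutex.withLock {
                if (!jobs.containsKey(id)) {
                    jobs[id] = observerScope.launch { state.collect { dispatched.add(it) } }
                }
            }
        }
    }

    fun remove(id: String) {
        observerScope.launch { mutex.withLock { jobs.remove(id)?.cancel() } }
    }

    // Synchronous teardown: every child is cancelled before close() returns.
    fun close() = observerScope.cancel()
}

// Lets runBlocking's single-threaded FIFO event loop drain its queue.
suspend fun settle() = repeat(10) { yield() }

fun fixedCloseScenarios(): Pair<List<Int>, List<Int>> = runBlocking {
    // Scenario 1: an emission queued before close() is never dispatched.
    val s1 = MutableStateFlow(0)
    val o1 = ScopedObserver(this)
    o1.observe("node", s1)
    settle()
    s1.value = 1      // collector resumption queued
    o1.close()        // cancels the collector before that resumption runs
    settle()

    // Scenario 2: observe() right after close() launches nothing.
    val s2 = MutableStateFlow(0)
    val o2 = ScopedObserver(this)
    o2.observe("node", s2)
    settle()
    o2.close()
    o2.observe("node", s2)
    settle()

    o1.dispatched.toList() to o2.dispatched.toList()
}
```

Both scenarios now leave only the initial 0 dispatched, so fixedCloseScenarios() returns ([0], [0]).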
Lessons:

- Give every component that launches coroutines its own CoroutineScope, created as a child of whatever external scope it was given; never launch directly into the caller's scope.
- close() / cancel() / stop() style teardown must be synchronous. An asynchronous scope.launch { ... } inside a cancel method creates a teardown race window by definition.
- Test teardown with StandardTestDispatcher (non-eager, non-unconfined), which exposes FIFO scheduling races that are invisible under UnconfinedTestDispatcher. Two behaviors to check: close stops already-queued emission from being dispatched, and observe after close is a no-op.
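The StandardTestDispatcher point can be sketched as two checks. This sketch assumes kotlinx-coroutines-test 1.7 or later, which provides backgroundScope. runTest runs its body on a StandardTestDispatcher by default, so a resumption queued by state.value = 1 stays queued until runCurrent(). ScopedObserver is a hypothetical stand-in for the fixed observer, and the function names are illustrative. Each function throws IllegalStateException if its check fails.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest

// Hypothetical stand-in for the fixed observer (owned child scope, synchronous close()).
class ScopedObserver(scope: CoroutineScope) {
    private val observerScope = CoroutineScope(
        scope.coroutineContext + SupervisorJob(scope.coroutineContext[Job])
    )
    private val mutex = Mutex()
    private val jobs = mutableMapOf<String, Job>()
    val dispatched = mutableListOf<Int>()

    fun observe(id: String, state: StateFlow<Int>) {
        observerScope.launch {
            mutex.withLock {
                if (!jobs.containsKey(id)) {
                    jobs[id] = observerScope.launch { state.collect { dispatched.add(it) } }
                }
            }
        }
    }

    fun close() = observerScope.cancel()
}

// runTest uses a StandardTestDispatcher by default: launched and resumed
// coroutines wait in a FIFO queue until runCurrent() (or a suspension) runs them.
fun closeStopsQueuedEmission() = runTest {
    val state = MutableStateFlow(0)
    val observer = ScopedObserver(backgroundScope)
    observer.observe("node", state)
    runCurrent()                        // setup coroutine, then the collector's first emission
    check(observer.dispatched == listOf(0))
    state.value = 1                     // collector resumption is queued, not yet run
    observer.close()
    runCurrent()
    check(observer.dispatched == listOf(0)) { "emission dispatched after close()" }
}

fun observeAfterCloseIsNoOp() = runTest {
    val state = MutableStateFlow(0)
    val observer = ScopedObserver(backgroundScope)
    observer.observe("node", state)
    runCurrent()
    observer.close()
    observer.observe("node", state)     // e.g. a reconnect racing with teardown
    runCurrent()
    check(observer.dispatched == listOf(0)) { "observe() after close() started a collector" }
}
```

Using backgroundScope as the observer's parent matters here. The never-completing StateFlow collectors are cancelled when the test ends, so runTest does not wait on them.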