Post

Icon NodeManager StateFlow Architecture Review

Comprehensive analysis of NodeManager's StateFlow-based architecture, identifying inefficiencies and proposing improvements for better reactive UI integration

NodeManager StateFlow Architecture Review

NodeManager StateFlow Architecture Review

Current Architecture Analysis

Core Data Structure

The NodeManager stores nodes in a map with the following structure:

1
2
3
4
5
6
7
8
9
protected val nodes: MutableMap<String, NodeFlow> = mutableMapOf()

sealed class NodeFlow {
    data class Success(
        val node: MutableStateFlow<Node>,  // ⚠️ Mutable state
        val instance: String = Uuid.random().toString()
    ) : NodeFlow()
    data class Error(val msg: String, val exception: Exception?) : NodeFlow()
}

Current Workflow Pattern

Throughout the codebase, the typical pattern is:

  1. Read node from manager → Get NodeFlow.Success containing MutableStateFlow<Node>
  2. Extract snapshot value → Access node.value to read current state
  3. Modify node → Create copy with changes
  4. Call update() → Post back to NodeManager via nodeManager.update(modifiedNode)
  5. NodeManager updates source → Updates the MutableStateFlow via flow.node.update { node }
  6. Observers react → NodeObserver collects emissions and triggers node.type.emit(node)

Key Components

1. NodeManager (Client vs Server)

Server (ServerNodeManager):

  • Actor-based serialization for thread-safe updates
  • File persistence for durability
  • Observes only nodes owned by this server (node.isMine())
  • Full cluster state management

Client (ClientNodeManager):

  • No actor pattern (single-threaded UI context)
  • No file operations
  • Observes ALL nodes for UI reactivity
  • Updates posted to server via HTTP

2. NodeObserver

1
2
3
4
5
6
7
8
9
10
class DefaultNodeObserver {
    suspend fun observe(nodeFlow: NodeFlow.Success) {
        scope.launch {
            nodeFlow.node.collect { node ->
                // Emit to processor for state-specific handling
                node.type.emit(node)
            }
        }
    }
}

Server-Side Processors

Processors follow similar pattern:

  1. Read node via nodeManager.readNode()
  2. Extract value, compute result
  3. Update via nodeManager.update(node.copy(...))

Critical Analysis

✅ What Works Well

  1. Single Source of Truth: The map in NodeManager is the canonical state
  2. Clear Separation: Server vs Client implementations handle different concerns
  3. Thread Safety: Server uses actor pattern effectively
  4. Reactive Updates: StateFlow emissions propagate changes throughout system
  5. Error Handling: NodeFlow sealed class provides type-safe error states

⚠️ Concerns & Inefficiencies

1. Double Wrapping Anti-Pattern

You’re storing MutableStateFlow<Node> in a map, then wrapping it in NodeFlow.Success, then returning it. This creates unnecessary indirection:

1
Map<String, NodeFlow.Success(MutableStateFlow<Node>)>

The NodeFlow wrapper adds little value since you’re storing in a map anyway (nullability is handled by map lookup).

2. Read-Modify-Update Race Conditions (Client)

In the client, this pattern is dangerous:

1
2
3
val node = (nodeManager.readNode(id) as NodeFlow.Success).node.value
// ... time passes, user clicks button ...
nodeManager.update(node.copy(state = NodeState.EXECUTED))

Problem: The node is a snapshot. If the StateFlow updated between read and update, those changes are lost. You’re overwriting with stale data.

Current Mitigation: The client implementation checks for duplicates:

1
2
3
if (existing is NodeFlow.Success && node == existing.node.value) {
    return  // Ignore duplicate
}

But this doesn’t prevent lost updates if fields differ.

3. Composables Don’t Subscribe to Source StateFlow

Most composables do this:

1
2
val nodeFlow = nodeManager.readNode(nodeId)
val node = (nodeFlow as? NodeFlow.Success)?.node?.value

They take a snapshot but don’t subscribe. To get updates, they must:

  • Re-read on recomposition (inefficient)
  • Or manually manage StateFlow subscription (boilerplate)

The MutableStateFlow exists but isn’t being leveraged in UI!

4. NodeObserver Redundancy

The NodeObserver collects from each MutableStateFlow and calls node.type.emit(). This is an extra layer of indirection. Why not have processors/UI subscribe directly to the StateFlow?

5. Update Function Semantic Overload

nodeManager.update() does multiple things:

  • Create new node (if doesn’t exist)
  • Update existing node
  • Trigger HTTP post to server (client-side user edits)
  • Trigger file persistence (server-side)
  • Call observe() to start collection

This violates Single Responsibility Principle.

6. Subscription Count Warning Ignored

1
2
3
if (flow.node.subscriptionCount.value > 1) {
    logger.e("node has multiple observers - probably a bug")
}

This warning suggests the architecture doesn’t expect multiple collectors. But with Compose, you WANT multiple subscribers!


Proposed Improvements

Embrace StateFlow fully and eliminate redundant layers.

Architecture Changes

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
 * Simplified - no NodeFlow wrapper needed
 */
class ClientNodeManager(...) {
    // Exposed as StateFlow for UI consumption
    private val nodes: MutableMap<String, MutableStateFlow<Node>> = mutableMapOf()
    
    // For UI to discover available nodes
    private val _nodeIds = MutableStateFlow<Set<String>>(emptySet())
    val nodeIds: StateFlow<Set<String>> = _nodeIds
    
    /**
     * Get or create a StateFlow for a node.
     * UI can subscribe directly and get automatic updates.
     */
    fun getNodeFlow(id: String): StateFlow<Node>? {
        return nodes[id]
    }
    
    /**
     * Update via transformation function - safer than copy/replace
     */
    suspend fun updateNode(id: String, transform: (Node) -> Node) {
        nodes[id]?.update(transform)
        
        // Post to server if needed
        val updated = nodes[id]?.value
        if (updated?.state == NodeState.USER_EDIT) {
            scope.launch {
                findServer(updated)?.let { server ->
                    nodeHttp.postNode(host = server, node = updated)
                }
            }
        }
    }
    
    /**
     * Overload for full replacement when you have the new node
     */
    suspend fun updateNode(node: Node) {
        updateNode(node.id) { node }
    }
}

Composable Usage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Composable
fun NodeCard(nodeId: String) {
    val nodeManager: NodeManager = koinInject()
    
    // Direct subscription - recomposes automatically on updates!
    val node by nodeManager.getNodeFlow(nodeId)
        ?.collectAsState() 
        ?: return Text("Node not found")
    
    // Render using current node state
    Card {
        Text(node.name())
        Text("State: ${node.state}")
        
        Button(onClick = {
            scope.launch {
                // Transform function ensures we're updating latest state
                nodeManager.updateNode(nodeId) { current ->
                    current.copy(state = NodeState.EXECUTED)
                }
            }
        }) {
            Text("Execute")
        }
    }
}

Processor Usage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class ServerCalculationProcessor {
    suspend fun process(node: Node): Boolean {
        val meta = node.meta as CalculationMetaData
        
        // Get target StateFlow
        val targetFlow = nodeManager.getNodeFlow(meta.target) ?: return false
        
        // Transform with latest state
        nodeManager.updateNode(meta.target) { current ->
            val tm = current.meta as DataPointMetaData
            val newSnapshot = Snapshot(now(), computedValue)
            current.copy(
                meta = tm.copy(snapshot = newSnapshot),
                state = NodeState.EXECUTED
            )
        }
        
        return true
    }
}

Benefits

No more read-update races: Transform function always works with latest state
Direct StateFlow subscription: Composables recompose automatically
Simpler code: Eliminate NodeFlow wrapper, NodeObserver indirection
Better Compose integration: Natural use of collectAsState()
Less boilerplate: No manual LaunchedEffect for loading
Multiple subscribers welcome: StateFlow is designed for this!


Option B: Immutable Repository with Event Channel

If you want to avoid mutable state in the map:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class ClientNodeManager(...) {
    // Immutable snapshots
    private val nodes: MutableMap<String, Node> = mutableMapOf()
    
    // Single event channel for all updates
    private val _updates = MutableSharedFlow<NodeUpdate>()
    val updates: SharedFlow<NodeUpdate> = _updates
    
    data class NodeUpdate(val id: String, val node: Node, val updateType: UpdateType)
    
    suspend fun updateNode(node: Node) {
        nodes[node.id] = node
        _updates.emit(NodeUpdate(node.id, node, UpdateType.MODIFIED))
    }
    
    fun getNode(id: String): Node? = nodes[id]
}

Composable Usage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Composable
fun NodeCard(nodeId: String) {
    val nodeManager: NodeManager = koinInject()
    var node by remember { mutableStateOf(nodeManager.getNode(nodeId)) }
    
    // Subscribe to updates
    LaunchedEffect(nodeId) {
        nodeManager.updates
            .filter { it.id == nodeId }
            .collect { update ->
                node = update.node
            }
    }
    
    // Render...
}

Trade-offs

✅ True immutability - easier to reason about
✅ Clear event stream for debugging
❌ More boilerplate in composables
❌ Filtering updates per-node is less efficient than per-StateFlow subscription


Option C: Hybrid - Keep Current, Fix Usage

Keep the existing architecture but fix how it’s used:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Add convenience extension to NodeManager
fun NodeManager.observeNodeAsState(id: String): StateFlow<Node>? {
    return when (val flow = readNode(id)) {
        is NodeFlow.Success -> flow.node
        is NodeFlow.Error -> null
    }
}

// Add atomic update operation
suspend fun NodeManager.atomicUpdate(id: String, transform: (Node) -> Node) {
    when (val flow = readNode(id)) {
        is NodeFlow.Success -> {
            flow.node.update { current ->
                val updated = transform(current)
                // Side effects (HTTP post, etc.) handled here
                update(updated)  // Triggers existing machinery
                updated
            }
        }
        is NodeFlow.Error -> throw IllegalStateException("Node not found: $id")
    }
}

Composable Usage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Composable
fun NodeCard(nodeId: String) {
    val nodeManager: NodeManager = koinInject()
    
    // Subscribe directly to the StateFlow!
    val node by nodeManager.observeNodeAsState(nodeId)
        ?.collectAsState()
        ?: return Text("Node not found")
    
    Button(onClick = {
        scope.launch {
            nodeManager.atomicUpdate(nodeId) { current ->
                current.copy(state = NodeState.EXECUTED)
            }
        }
    }) {
        Text("Execute")
    }
}

Benefits

✅ Minimal changes to existing code
✅ Fixes the race condition issue
✅ Enables direct StateFlow subscription
❌ Keeps the complex multi-layer architecture
❌ Doesn’t address NodeObserver redundancy


Recommendations

Immediate (Low-Effort Wins)

  1. Add observeNodeAsState() helper - Let composables subscribe to StateFlow directly
  2. Add atomicUpdate() helper - Prevent read-update races with transform function
  3. Document the pattern - Make it clear composables should subscribe, not snapshot

Short-Term (Moderate Refactor)

  1. Simplify client-side to Option A - Remove NodeFlow wrapper on client
  2. Make NodeObserver opt-in - Only for nodes that need type-specific processing
  3. Split update() into create/update/sync - Clear separation of concerns

Long-Term (If Needed)

  1. Consider Option B for server - If immutability is important for distributed consistency
  2. Add optimistic updates - Client updates immediately, rollback on server conflict
  3. Event sourcing - Store node changes as events for better debugging/replay

Key Insight: You Already Have the Infrastructure!

The MutableStateFlow<Node> in your map IS the reactive source of truth.

Your composables just need to subscribe to it instead of taking snapshots. Once you do that:

  • ✅ Updates flow automatically to UI
  • ✅ No need to re-read on recomposition
  • ✅ No race conditions with transform-based updates
  • ✅ Multiple composables can observe same node (what StateFlow is designed for!)

The problem isn’t the architecture - it’s that you’re not fully leveraging the StateFlow you already have.


Example Migration

Before (Current Pattern)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Composable
fun MyScreen() {
    val nodeManager: NodeManager = koinInject()
    var nodeFlow by remember { mutableStateOf<NodeFlow?>(null) }
    
    LaunchedEffect(nodeId) {
        nodeFlow = nodeManager.readNode(nodeId)
    }
    
    val node = (nodeFlow as? NodeFlow.Success)?.node?.value
    
    node?.let {
        Text(it.name())
        Button(onClick = {
            scope.launch {
                nodeManager.update(it.copy(state = NodeState.EXECUTED))
            }
        }) { Text("Execute") }
    }
}

After (Leveraging StateFlow)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Composable
fun MyScreen() {
    val nodeManager: NodeManager = koinInject()
    
    // Direct subscription - automatically recomposes
    val node by nodeManager.observeNodeAsState(nodeId)
        ?.collectAsState() 
        ?: return Text("Loading...")
    
    Text(node.name())
    Button(onClick = {
        scope.launch {
            // Atomic update with latest state
            nodeManager.atomicUpdate(nodeId) { current ->
                current.copy(state = NodeState.EXECUTED)
            }
        }
    }) { Text("Execute") }
}

Difference:

  • ✅ Less code
  • ✅ No manual state management
  • ✅ No LaunchedEffect needed
  • ✅ Automatic recomposition
  • ✅ No race conditions

Conclusion

Your architecture is fundamentally sound - you chose StateFlow, which is perfect for this. The issue is that you’re fighting against it by taking snapshots instead of subscribing.

Recommended Path Forward:

  1. Add the helper functions (observeNodeAsState, atomicUpdate) - 1 hour
  2. Migrate a few composables to the new pattern - verify it works - 2 hours
  3. Update documentation and examples - 1 hour
  4. Gradually migrate rest of codebase - ongoing

This will give you:

  • Cleaner code
  • Better Compose integration
  • No race conditions
  • Less boilerplate
  • Proper reactive architecture

The infrastructure is already there - you just need to use it properly! 🚀

This post is licensed under CC BY 4.0 by the author.