Krill Connectivity & Synchronization Report
Generated: 2025-12-03
Version: 1.0
Focus Areas: Node synchronization, MQTT, WebSockets, Beacons, StateFlows, Race Conditions
Table of Contents
- Executive Summary
- Architecture Overview
- Connectivity Flows
- Critical Issues Identified
- Race Conditions
- Exponential Execution Risks
- StateFlow Usage Issues
- MQTT Synchronization Problems
- Server-to-Server Synchronization
- WASM Client Special Cases
- Action Items
- Additional Findings
- Recommendations
- Prompt for Future Updates
Executive Summary
The Krill application is a distributed KMP (Kotlin Multiplatform) system with servers, clients (Android, iOS, Desktop), and WASM apps that synchronize node state across a network. The system uses:
- Multicast UDP beacons for node discovery
- MQTT for real-time updates (except WASM)
- WebSockets for WASM client updates
- REST APIs for node CRUD operations
- StateFlows for reactive state management
Key Findings:
- ✅ Good: Multicast beacon discovery enables automatic peer detection
- ✅ Good: Multiple transport mechanisms for different platforms
- ⚠️ Issues: Multiple race conditions in node updates
- ⚠️ Issues: Potential for infinite loops in bidirectional synchronization
- ⚠️ Issues: StateFlow update conflicts
- ⚠️ Issues: Missing synchronization in critical sections
- ⚠️ Issues: Broadcast storms possible with server-to-server beacons
Architecture Overview
Node Types
KrillApp (sealed class hierarchy)
├── Server - Hosts other nodes, runs processors
│ ├── Pin - GPIO control (Raspberry Pi)
│ ├── DataPoint - Data collection/processing
│ └── RuleEngine - Automation rules
├── Client - User interface applications
├── SerialDevice - Hardware integrations
└── Project - Organizational nodes
Platform Matrix
| Platform | Beacon Discovery | MQTT | WebSocket | REST API | Node Observer |
|---|---|---|---|---|---|
| Server (JVM) | ✅ Send/Receive | ✅ Broker | ✅ Server | ✅ Host | ServerNodeObserver |
| Android | ✅ Receive | ✅ Client | ❌ | ✅ Client | DefaultNodeObserver |
| iOS | ✅ Receive | ✅ Client | ❌ | ✅ Client | DefaultNodeObserver |
| Desktop (JVM) | ✅ Receive | ✅ Client | ❌ | ✅ Client | DefaultNodeObserver |
| WASM | ❌ | ❌ | ✅ Client | ⚠️ Via WebSocket | No-op observer |
Key Components
- NodeManager - Central node registry with `MutableMap<String, NodeFlow>`
- BeaconService - Multicast UDP discovery (239.255.0.69:45317)
- MQTTBroker - Server-side message broker (port 8883)
- SharedMqttClient - Platform-specific MQTT clients
- ServerSocketManager - WebSocket connections for WASM
- NodeEventBus - Event distribution within process
- NodeObserver - StateFlow collectors for node updates
Connectivity Flows
1. Initial Discovery & Connection Flow
sequenceDiagram
participant App as Client App
participant Beacon as BeaconService
participant NCM as NodeConnectionManager
participant Server as Krill Server
participant NM as NodeManager
App->>NM: init()
App->>Beacon: start() - receive beacons
Server->>Beacon: sendBeacons(NodeWire)
Note over Server: Every 2-5 seconds
Beacon->>App: onPeerSeen(NodeWire)
App->>NCM: connect(peer)
NCM->>Server: GET /trust (cert)
NCM->>Server: GET /node/{id}
Server-->>NCM: Node (full object)
NCM->>NM: update(node, observe=true)
NM->>NM: Store in nodes map
NM->>NM: Start StateFlow observer
Note over App,Server: Connection established
2. Node Update Propagation (Normal Flow)
sequenceDiagram
participant User as User Action
participant App as Client App
participant Server as Krill Server
participant MQTT as MQTT Broker
participant Apps as Other Apps
participant Beacon as BeaconService
User->>App: Edit/Update Node
App->>App: nm.update(node, post=true)
App->>Server: POST /node/{id}
Server->>Server: nm.update(node)
Server->>Server: fileOperations.update(node)
Server->>Server: NodeEventBus.post(node)
Server->>MQTT: publish(node)
Server->>Server: ServerSocketManager.broadcast(node)
Server->>Beacon: sendBeacons(node.toWire())
MQTT-->>Apps: MQTT message
Apps->>Apps: nm.update(node)
Apps->>Apps: StateFlow.update()
Note over Apps: UI auto-updates via StateFlow
3. Server-to-Server Synchronization Flow
sequenceDiagram
participant S1 as Server 1
participant B1 as Beacon Service
participant S2 as Server 2
participant B2 as Beacon Service
participant NC as NodeConnectionManager
S1->>B1: sendBeacons(self)
B1-->>B2: Multicast packet
B2->>S2: onPeerSeen(Server1Wire)
S2->>S2: readNode(Server1.id)
alt Node not found
S2->>NC: connect(Server1Wire)
NC->>S1: GET /node/{id}
S1-->>NC: Node object
NC->>S2: nm.update(node, observe=true)
else Node found but in ERROR
S2->>S2: update state to PAIRING
S2->>NC: connect(Server1Wire)
else Node found and OK
S2->>S2: Skip connection
end
Note over S1,S2: ⚠️ ISSUE: No child node sync!
4. WASM Client Update Flow
sequenceDiagram
participant WASM as WASM App
participant WS as WebSocket
participant Server as Server
participant MQTT as MQTT Broker
WASM->>Server: GET /health
Server-->>WASM: Host Node
WASM->>Server: GET /nodes
Server-->>WASM: List<Node>
WASM->>WASM: nm.update() for each
WASM->>Server: WebSocket connect /ws
Server->>WASM: WebSocket established
Note over WASM,Server: WASM cannot make REST calls to other servers (CORS)
loop Node updates
Server->>MQTT: publish(node)
Server->>WS: sendSerialized(SocketData(node))
WS-->>WASM: SocketData
WASM->>WASM: nm.update(node)
end
User->>WASM: Edit node
WASM->>WS: sendSerialized(SocketData(node))
WS->>Server: Receive update
Server->>Server: nm.update(node)
Note over Server: Server propagates to swarm
5. Background Processor Execution Flow
sequenceDiagram
participant Proc as Processor (exec)
participant Node as Node StateFlow
participant Obs as ServerNodeObserver
participant Chain as buildChain()
participant EB as NodeEventBus
Proc->>Node: update snapshot/value
Node->>Node: StateFlow.update()
Node->>Obs: Flow collector triggered
Obs->>Obs: Check state == EXECUTED
Obs->>Chain: buildChain().invoke(scope)
Chain->>Chain: exec(node) - current processor
Chain->>Chain: delay(100ms)
loop For each child
Chain->>Chain: childChain()
Note over Chain: Recursive execution
end
Obs->>Obs: nm.update(state=NONE)
Obs->>EB: NodeEventBus.post(node)
EB->>EB: MQTT publish
EB->>EB: WebSocket broadcast
EB->>EB: Beacon send
Note over Proc,EB: ⚠️ ISSUE: Recursive chain can amplify!
6. Node Deletion Cascade Flow
sequenceDiagram
participant User
participant App
participant NM as NodeManager
participant Server
participant MQTT
User->>App: Delete node
App->>Server: DELETE /node/{id}
Server->>NM: delete(node)
NM->>NM: observer.remove(id)
NM->>NM: fileOperations.delete(id)
NM->>NM: nodes.remove(id)
NM->>Server: nodeHttp.deleteNode(node)
NM->>NM: NodeEventBus.post(state=DELETING)
NM->>NM: Find children (parent==id)
loop For each child
NM->>NM: scope.launch { delete(child) }
Note over NM: ⚠️ Recursive deletion
end
Server->>MQTT: Publish delete event
Note over NM: ⚠️ ISSUE: Race with concurrent creates!
Critical Issues Identified
1. StateFlow Update Race Conditions
Location: NodeManager.update()
// Lines 138-148 in NodeManager.kt
if (nodes[node.id] == null) {
nodes[node.id] = NodeFlow.Success(MutableStateFlow(copy))
copy.type.exec(scope, copy)
} else {
if (nodes[node.id] is NodeFlow.Success) {
(nodes[node.id] as NodeFlow.Success).node.update { copy }
}
}
Issues:
- Check-then-act race: Between checking `nodes[node.id] == null` and setting it, another thread could insert
- No mutex protection on the nodes map
- StateFlow.update doesn’t guarantee atomic read-modify-write across the map lookup
Impact: Duplicate node entries, lost updates, inconsistent state
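One way to close the check-then-act window is to let the map perform the lookup and the write as a single atomic step. The sketch below is JVM-only and uses hypothetical stand-ins (`SketchNode`, `SketchRegistry`) rather than the real `Node` and `NodeManager`; shared KMP code would need a `Mutex` or a platform-specific equivalent instead of `ConcurrentHashMap`.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for the real Node type.
data class SketchNode(val id: String, val value: Int)

class SketchRegistry {
    private val nodes = ConcurrentHashMap<String, AtomicReference<SketchNode>>()

    /** Returns true when this call created the entry, false when it updated one. */
    fun upsert(node: SketchNode): Boolean {
        var created = false
        // compute() runs the lambda atomically for this key, so the
        // "is it absent?" check and the write cannot interleave.
        nodes.compute(node.id) { _, existing ->
            if (existing == null) {
                created = true
                AtomicReference(node)
            } else {
                existing.set(node)
                existing
            }
        }
        return created
    }

    fun read(id: String): SketchNode? = nodes[id]?.get()

    fun size(): Int = nodes.size
}
```

The boolean result tells the caller whether this was the first insert, which is the point where the real code would run `exec` exactly once.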
2. NodeManager.update() Concurrent Modification
Location: NodeManager.update() lines 118-167
Issues:
- The `nodes` map is accessed without synchronization
- Multiple concurrent calls to `update()` for the same node can race
- `readNode()` can return stale data during an update
- `post` flag triggers async POST but doesn’t wait for completion
Scenario:
Thread 1: update(node1, post=true) -> starts POST
Thread 2: update(node1, post=false) -> reads node before POST completes
Thread 3: MQTT receives node1 -> another update
Result: Last write wins, middle update lost
3. Infinite Beacon Loop Between Servers
Location: Lifecycle.kt lines 84-107 + Beacon.kt
Current Flow:
- Server A updates a node
- NodeEventBus broadcasts the event
- Event handler calls `Multicast(scope).sendBeacons(node.toWire())`
- Server B receives beacon
- Server B updates the node via `nm.refresh()`
- Server B’s NodeEventBus broadcasts
- Server B sends beacon back to Server A
- Infinite loop!
Code Evidence:
// Lifecycle.kt:92
scope.launch { Multicast(scope).sendBeacons(node.toWire()) }
Missing: Beacon deduplication, timestamp checking, or originator tracking
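Originator tracking can be as small as tagging each update with where it entered the process and refusing to re-announce anything that came from the network. This is a minimal sketch; `UpdateSource` and `shouldSendBeacon` are hypothetical names, and the real event handler would need the source threaded through `NodeEventBus`.

```kotlin
// Hypothetical labels for where an update entered this process.
enum class UpdateSource { LOCAL, BEACON, MQTT }

/**
 * Decide whether an update should be re-announced by beacon.
 * Updates that arrived from the network are never echoed,
 * which breaks the A -> B -> A cycle.
 */
fun shouldSendBeacon(source: UpdateSource, nodeHost: String, localInstallId: String): Boolean =
    source == UpdateSource.LOCAL && nodeHost == localInstallId
```

With this rule, Server B applies the update it learned from Server A's beacon but stays silent, so the loop ends after one hop.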
4. MQTT Publish Without Connection Check
Location: MQTTBroker.publish() lines 61-83
suspend fun publish(node: Node) {
publishMutex.withLock {
val clients = broker.getConnectedClientIds()
if (clients.isEmpty()) {
Logger.d("MQTT Broker: no clients connected, skipping publish")
return
}
broker.publish(...)
}
}
Issues:
- Check `clients.isEmpty()` then publish is not atomic
- Client can disconnect between check and publish
- No retry mechanism for transient failures
- Exception handling logs but doesn’t propagate
5. Recursive buildChain() Execution
Location: NodeFunctions.kt lines 10-33
fun NodeFlow.Success.buildChain(): Chain {
val children = nm.nodes()
.filter { it.parent == this.node.value.id }
.map { nm.readNode(it.id) as NodeFlow.Success }
val childChains = children.map { child -> child.buildChain() }
return {
this@buildChain.exec(this)
delay(100)
for (childChain in childChains) {
childChain()
}
}
}
Issues:
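- Execution grows with tree size: every descendant is executed once, and the number of descendants grows exponentially with depth for a fixed branching factor. Each exec() is followed by delay(100), so the whole chain runs strictly sequentially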
- No cycle detection: Circular parent references cause infinite recursion
- No timeout: Long chains can block indefinitely
- Memory: All chains built upfront, not lazily
- Concurrent modifications: `nm.nodes()` can change during iteration
Example:
Server (3 children)
├── DataPoint A (2 children)
│ ├── Trigger 1
│ └── Trigger 2
├── DataPoint B (2 children)
│ ├── Trigger 3
│ └── Trigger 4
└── DataPoint C
Execution count: 1 + 3 + 4 = 8 exec() calls
With 10 full levels and two children per node: 2^10 - 1 = 1023 exec() calls, plus about 102 seconds of accumulated delay(100)
6. Server-to-Server Sync Missing Children
Location: Lifecycle.kt lines 155-166 and NodeConnectionManager.kt
Current Behavior: When Server B discovers Server A via beacon:
- It downloads Server A’s host node only
- It does NOT download Server A’s child nodes
- Child nodes only sync if they individually send beacons or MQTT
Code:
// NodeConnectionManager.kt:24
val host = response.body<Node>()
nm.update(node = host.copy(state = NodeState.EXECUTED), observe = true)
// ⚠️ Only updates the server node, not children!
Expected: Should call GET /nodes to fetch all children owned by that server
7. WASM Update Reflection
Location: ServerSocketManager.jvm.kt lines 38-42
val socketData = receiveDeserialized<SocketData>(...)
val host = nm.readNode(socketData.node.host) as NodeFlow.Success
if (host.node.value.id == installId())
nm.update(socketData.node)
Issues:
- Cast without check: `as NodeFlow.Success` can throw
- Only updates if hosted locally: Remote nodes from WASM are ignored
- No broadcasting: WASM update isn’t propagated to MQTT/other WebSockets
- CORS workaround incomplete: WASM sends to host, but host doesn’t relay to other servers
8. NodeObserver StateFlow Collection Overlap
Location: DefaultNodeObserver.kt lines 19-46
override fun observe(flow: NodeFlow.Success) {
if (!SystemInfo.isServer() && !jobs.containsKey(flow.node.value.id)) {
val n = flow.node
jobs[flow.node.value.id] = scope.launch {
// ...
if (jobs.containsKey(n.value.id)) {
jobs[n.value.id]?.cancel() // ⚠️ Cancels itself!
}
jobs[n.value.id] = scope.launch {
try {
flow.node.collect(collector)
// ...
}
}
}
}
}
Issues:
- Self-canceling: New job cancels itself immediately (lines 37-39)
- Double launch: Launches a job that launches another job
- Key collision: Both use same key `n.value.id`
- Race: `jobs.containsKey()` check not synchronized with map mutation
9. Beacon Timestamp Not Used
Location: NodeWire.kt and BeaconService.kt
data class NodeWire(
val timestamp: Long, // ⚠️ Generated but never checked
val id: String,
// ...
)
Issues:
- Timestamp is sent but receivers don’t use it for deduplication
- Old beacons can overwrite newer state
- No expiration of stale beacons
- Network delays can cause out-of-order updates
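A receiver could use the timestamp that is already on the wire to drop stale and out-of-order beacons. The sketch below assumes a hypothetical `BeaconDeduplicator` helper and takes the current time as a parameter so it can be tested; comparing timestamps across machines also assumes their clocks are reasonably synchronized.

```kotlin
// Tracks the newest beacon timestamp seen per node id (names are illustrative).
class BeaconDeduplicator(private val maxAgeMillis: Long) {
    private val lastSeen = HashMap<String, Long>()

    /** Returns true if the beacon is fresh and newer than anything seen for this id. */
    @Synchronized
    fun accept(nodeId: String, timestamp: Long, nowMillis: Long): Boolean {
        if (nowMillis - timestamp > maxAgeMillis) return false       // expired
        val previous = lastSeen[nodeId]
        if (previous != null && timestamp <= previous) return false  // duplicate or out of order
        lastSeen[nodeId] = timestamp
        return true
    }
}
```

Only beacons for which `accept` returns true would go on to trigger a refresh.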
10. File Operations Not Atomic
Location: NodeManager.kt lines 229-234
private suspend fun post(node: Node) {
if (node.host == installId) {
fileOperations.update(node)
} else {
nodeHttp.postNode(node)
}
}
Issues:
- `fileOperations.update()` implementations are platform-specific
- No guarantee of atomic write
- Concurrent writes can corrupt files
- No write-ahead log or journaling
- Crashes during write leave partial state
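On the JVM, the usual remedy for partial writes is to write a temporary file next to the target and rename it into place. The following is a sketch of that pattern only; `writeAtomically` is a hypothetical helper, not part of the existing `FileOperations` API, and other platforms would need their own equivalent.

```kotlin
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption

fun writeAtomically(target: Path, content: String) {
    // Create the temp file in the same directory so the rename stays on one filesystem.
    val dir = target.toAbsolutePath().parent
    val temp = Files.createTempFile(dir, target.fileName.toString(), ".tmp")
    try {
        Files.write(temp, content.toByteArray(Charsets.UTF_8))
        // Readers see either the old file or the new one, never a half-written file.
        Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING)
    } finally {
        Files.deleteIfExists(temp) // no-op after a successful move
    }
}
```

Whether an atomic move replaces an existing target is implementation-specific according to the Java documentation, so this should be verified on each deployment filesystem. Concurrent writers to the same node still need to be serialized separately.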
Race Conditions
Summary Table
| # | Race Condition | Location | Severity | Impact |
|---|---|---|---|---|
| RC1 | Map check-then-set | NodeManager.update() L138 | HIGH | Duplicate nodes, lost updates |
| RC2 | MQTT client disconnect | MQTTBroker.publish() L64 | MEDIUM | Failed publishes |
| RC3 | NodeObserver job map | DefaultNodeObserver.observe() L36 | HIGH | Observers don’t start |
| RC4 | File write concurrent | FileOperations.update() | MEDIUM | Corrupted persistence |
| RC5 | Swarm set update | NodeManager.updateSwarm() L40 | LOW | Inconsistent swarm view |
| RC6 | Delete vs create | NodeManager.delete() L170 | MEDIUM | Zombie nodes |
| RC7 | StateFlow vs map | NodeManager.update() L145 | HIGH | Flow emits wrong value |
| RC8 | Beacon dedup | BeaconService receive | MEDIUM | Duplicate processing |
| RC9 | Connection set | NodeConnectionManager L15 | LOW | Duplicate connections |
| RC10 | MQTT topic set | SharedMqttClient.jvm.kt L119 | MEDIUM | Duplicate subscriptions |
Exponential Execution Risks
1. buildChain() Recursion
Complexity: O(n) exec() calls for n nodes, which is O(2^depth) for a full binary tree
Mitigation Needed:
- Add max depth limit (e.g., 10 levels)
- Implement cycle detection
- Use iterative traversal instead of recursion
- Add timeout per chain execution
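The first three mitigations can be combined in one iterative traversal. This sketch works on a hypothetical `TreeNode` with only `id` and `parent`, and returns the order in which nodes would be executed; it does not run any processor itself.

```kotlin
// Minimal stand-in for a node: only id and parent matter here.
data class TreeNode(val id: String, val parent: String?)

/**
 * Returns node ids in parent-before-child order starting at rootId.
 * A node is visited at most once, so a parent cycle cannot loop,
 * and anything deeper than maxDepth is skipped.
 */
fun executionOrder(nodes: List<TreeNode>, rootId: String, maxDepth: Int = 10): List<String> {
    val childrenOf = nodes.groupBy { it.parent }   // snapshot taken once
    val visited = LinkedHashSet<String>()
    val stack = ArrayDeque<Pair<String, Int>>()
    stack.addLast(rootId to 0)
    while (stack.isNotEmpty()) {
        val (id, depth) = stack.removeLast()
        if (depth > maxDepth || !visited.add(id)) continue
        // Push children in reverse so they pop in their original order.
        childrenOf[id].orEmpty().asReversed().forEach { stack.addLast(it.id to depth + 1) }
    }
    return visited.toList()
}
```

Because the child index is built once from a snapshot, concurrent changes to the live node map cannot disturb the traversal. For the eight-node example tree earlier in this report, the result has exactly eight entries.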
2. Delete Cascade
Location: NodeManager.delete() L183-188
nodes().filter { n -> n.parent == node.id }.forEach { n ->
scope.launch {
delete(n) // ⚠️ Recursive
}
}
Risk: Deleting a parent with 100 children starts 100 concurrent coroutines, and each of those launches one more per child, so a whole subtree is deleted with unbounded concurrency and no ordering guarantee
Mitigation:
- Add deletion depth limit
- Use iterative BFS/DFS
- Batch deletions
3. Beacon Storm
Scenario:
- 5 servers on network
- Node update triggers beacon from each
- Each server receives 4 beacons
- Each beacon triggers update + beacon send
- Roughly 5 × 4^n beacon sends in round n if nothing breaks the cycle
Mitigation:
- Rate limit beacon sends
- Deduplicate by timestamp
- Only beacon on user-initiated changes, not MQTT updates
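Rate limiting beacon sends per node needs little more than a map of last-send times. A minimal sketch, assuming a hypothetical `BeaconRateLimiter` that the send path consults before multicasting; the clock is passed in so the behavior is deterministic.

```kotlin
class BeaconRateLimiter(private val minIntervalMillis: Long = 1_000) {
    private val lastSent = HashMap<String, Long>()

    /** Returns true if a beacon for nodeId may be sent now, and records the send. */
    @Synchronized
    fun tryAcquire(nodeId: String, nowMillis: Long): Boolean {
        val previous = lastSent[nodeId]
        if (previous != null && nowMillis - previous < minIntervalMillis) return false
        lastSent[nodeId] = nowMillis
        return true
    }
}
```

A suppressed beacon is simply dropped here; a production version would probably remember that a send is pending and emit the latest state once the interval has passed.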
4. MQTT Subscription Explosion
Location: SharedMqttClient.jvm.kt L118
actual override suspend fun subscribe(node: List<Node>) {
val newTopics = topicMutex.withLock {
val topics = node.map { "/${it.id}" }
val toSubscribe = topics.filter { it !in subscribedTopics }
subscribedTopics.addAll(toSubscribe)
toSubscribe
}
// ...
}
Risk: With 1000 nodes and 10 clients, the broker has to track 10,000 individual subscriptions
Mitigation:
- Use wildcard topics (e.g., `/server-id/#`)
- Subscribe to parent topics only
- Implement topic hierarchy
StateFlow Usage Issues
1. MutableStateFlow in NodeFlow.Success
Location: NodeFlow.kt L12
data class Success(
val node: MutableStateFlow<Node>,
val instance: String = Uuid.random().toString()
) : NodeFlow(...)
Issues:
- Exposed mutability: Callers can directly update the flow
- No encapsulation: No control over who updates
- Equality: Two `Success` with same node but different instance aren’t equal
Better Design:
class Success(
private val _node: MutableStateFlow<Node>
) : NodeFlow(...) {
val node: StateFlow<Node> = _node.asStateFlow()
fun update(transform: (Node) -> Node) {
_node.update(transform)
}
}
2. StateFlow.update Without Transaction
Location: NodeManager.update() L145
(nodes[node.id] as NodeFlow.Success).node.update { copy }
Issues:
- `update` lambda receives current value, returns new value
- But map lookup `nodes[node.id]` isn’t atomic with update
- Node could be removed from map between lookup and update
Fix:
val nodeFlow = nodes[node.id] as? NodeFlow.Success ?: return
nodeFlow.node.update { copy }
3. Hot StateFlow Collectors
Location: ServerNodeObserver.kt L27-29
n.node.collect { collectedNode ->
Logger.i("emit ${n.node.subscriptionCount.value} ...")
// ...
}
Issues:
- Nested collection: Collecting inside a collector (L28 collects n.node again)
- Infinite recursion risk: If collector updates the same flow
- Subscription leak: `subscriptionCount.value` suggests multiple subscribers but they’re not cleaned up
4. Concurrent StateFlow Updates
Location: ServerNodeObserver.kt L40-44
if (meta.snapshot.timestamp > cnm.snapshot.timestamp) {
nm.update(
collectedNode.copy(state = NodeState.NONE, meta = meta),
post = true
)
}
Issues:
- Check timestamp, then update - not atomic
- Another update could occur between check and update
- `post = true` triggers async HTTP call, but StateFlow updated synchronously
MQTT Synchronization Problems
1. Topic Naming Convention
Current: /${node.id} (e.g., /abc-123-def-456)
Issues:
- No hierarchy: Can’t subscribe to all nodes from a server
- No wildcards: Must subscribe to each node individually
- Scale: 1000 nodes = 1000 subscriptions per client
Recommendation:
/server/{server-id}/node/{node-id}
/server/{server-id}/node/+ # All nodes from server
/server/+/node/{specific-node} # One node across servers
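To check that a proposed topic layout works with the intended wildcard filters, a small matcher is handy in unit tests. The sketch below implements the basic MQTT filter rules (`+` for one level, `#` for the remainder); `nodeTopic` and `topicMatches` are hypothetical helpers, the special handling of `$`-prefixed topics is omitted, and at runtime the broker does this matching itself.

```kotlin
fun nodeTopic(serverId: String, nodeId: String) = "/server/$serverId/node/$nodeId"

/** MQTT-style filter match: '+' matches one level, '#' matches all remaining levels. */
fun topicMatches(filter: String, topic: String): Boolean {
    val f = filter.split('/')
    val t = topic.split('/')
    for (i in f.indices) {
        when {
            f[i] == "#" -> return i == f.lastIndex   // '#' is only valid as the last level
            i >= t.size -> return false
            f[i] != "+" && f[i] != t[i] -> return false
        }
    }
    return f.size == t.size
}
```

With this layout a client interested in one server needs a single subscription such as `/server/s1/node/+` instead of one per node.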
2. QoS Level
Current: Qos.AT_MOST_ONCE
Issues:
- Message loss: Network issues drop updates
- No acknowledgment: Publisher doesn’t know if received
- Order not guaranteed: Updates can arrive out of sequence
Recommendation:
- Use `Qos.AT_LEAST_ONCE` for critical updates
- Add sequence numbers to detect gaps
- Implement periodic full-state sync
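Sequence numbers let a subscriber tell apart the three cases that matter: the next message, a redelivery (expected with at-least-once delivery), and a gap that calls for a full-state sync. A minimal sketch, assuming each publisher attaches a per-node counter; `SequenceTracker` is a hypothetical name.

```kotlin
class SequenceTracker {
    private val lastSeq = HashMap<String, Long>()

    sealed class Result {
        object InOrder : Result()
        object Duplicate : Result()
        data class Gap(val missing: LongRange) : Result()
    }

    /** Classifies seq for nodeId relative to the highest sequence number seen so far. */
    fun observe(nodeId: String, seq: Long): Result {
        val previous = lastSeq[nodeId]
        return when {
            previous != null && seq <= previous -> Result.Duplicate
            previous != null && seq > previous + 1 -> {
                lastSeq[nodeId] = seq
                Result.Gap((previous + 1) until seq)
            }
            else -> {
                lastSeq[nodeId] = seq
                Result.InOrder
            }
        }
    }
}
```

A `Gap` result is the natural trigger for the periodic or on-demand full-state sync mentioned in the recommendation.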
3. Retained Messages
Current: retain = false
Issues:
- Late joiners miss state: Client connecting after update doesn’t get current value
- Requires full sync: Must fetch via HTTP on connect
Recommendation:
- Use `retain = true` for state updates
- Clear retained messages on node delete
- Implement retention policy (TTL)
4. No Authentication
Current: Broker accepts any client
Issues:
- Security: No authentication or authorization
- Spoofing: Malicious client can publish fake updates
- DoS: Flood broker with messages
Recommendation:
- Enable username/password authentication
- Use client certificates
- Implement ACLs per topic
5. Publish Mutex Doesn’t Prevent Reordering
Location: MQTTBroker.kt L23, L62
private val publishMutex = Mutex()
suspend fun publish(node: Node) {
publishMutex.withLock {
// ...
broker.publish(...)
}
}
Issues:
- Only prevents concurrent publishes to broker
- Doesn’t prevent reordering at receiver
- Network can deliver out of order
- Multiple servers publishing same node ID
Example:
Server A: publish(node v1) at T1
Server B: publish(node v2) at T2
Client receives: v2, then v1 (due to network delay)
Result: Client has old v1 state
6. MQTT Client Reconnection
Location: SharedMqttClient.jvm.kt L22-95
Issues:
- No automatic reconnection: If disconnected, stays disconnected
- Subscriptions lost: Must resubscribe after reconnect
- onDisconnected callback: Called but doesn’t retry
- Topic tracking: `subscribedTopics.clear()` on disconnect (L85) loses subscription intent
Fix Needed:
private var shouldReconnect = true
private var reconnectJob: Job? = null
override fun onDisconnected(node: Node) {
if (shouldReconnect) {
reconnectJob = scope.launch {
delay(5000) // Fixed 5 s delay for now; exponential backoff would grow this per attempt
connect(onConnected, onDisconnected)
}
}
}
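The snippet above waits a fixed five seconds between attempts. A capped exponential backoff can be computed with a small pure function; `backoffMillis` and its default values are illustrative assumptions, not existing configuration.

```kotlin
import kotlin.math.min

/** Delay before reconnect attempt number `attempt` (0-based): base * 2^attempt, capped. */
fun backoffMillis(attempt: Int, baseMillis: Long = 1_000, maxMillis: Long = 60_000): Long {
    require(attempt >= 0) { "attempt must be non-negative" }
    // Clamp the shift so large attempt counts cannot overflow a Long.
    val factor = 1L shl min(attempt, 20)
    return min(baseMillis * factor, maxMillis)
}
```

The reconnect loop would pass an attempt counter that resets to zero after a successful connection. Adding random jitter is a common refinement when many clients lose the same broker at once.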
Server-to-Server Synchronization
Current State
What Works:
- Server A broadcasts beacon every 2-5 seconds
- Server B receives beacon and connects
- Server B downloads Server A’s host node only
- Server B observes Server A’s host node
What’s Missing:
- Child Node Sync

    // NodeConnectionManager.kt only gets host
    val host = response.body<Node>()
    nm.update(node = host.copy(state = NodeState.EXECUTED), observe = true)

  Should be:

    val host = response.body<Node>()
    nm.update(node = host, observe = true)
    // Fetch all children
    val children = httpClient.get("${peer.url}/nodes")
        .body<List<Node>>()
        .filter { it.host == host.id }
    children.forEach { child ->
        nm.update(child, observe = true)
    }

- MQTT Cross-Server Subscribe
  - Servers don’t subscribe to other servers’ MQTT brokers
  - Updates on Server A don’t reach Server B via MQTT
  - Only beacons propagate (every 2-5 seconds, with dedup issues)
- Bidirectional Updates
graph LR
S1[Server 1] -->|Beacon| S2[Server 2]
S2 -->|Beacon| S1
S1 -->|Update node| S1
S1 -.->|No MQTT| S2
S2 -.->|No MQTT| S1
- Conflict Resolution
  - No vector clocks or version numbers
  - No last-write-wins timestamp enforcement
  - Updates can be lost if both servers modify same node
Synchronization Scenarios
Scenario 1: New Node Created on Server A
1. User on Server A creates DataPoint X
2. Server A: nm.update(X, post=true)
3. Server A: fileOperations.create(X)
4. Server A: NodeEventBus.post(X)
5. Server A: MQTT.publish(X) → Local clients get update
6. Server A: Beacon.send(X.toWire()) → Sent every 2-5 sec
7. Server B: Beacon.receive(X)
8. Server B: nm.refresh(X)
9. Server B: HTTP GET /node/X → ✅ Server B gets node
10. Server B: nm.update(X)
Time to propagate: 2-5 seconds (beacon interval)
Issue: Beacon storms if many nodes created
Scenario 2: Node Updated on Server A
1. Background processor on Server A updates DataPoint X
2. Server A: StateFlow.update(X)
3. Server A: NodeEventBus.post(X)
4. Server A: MQTT.publish(X) → Local clients get update
5. Server A: Beacon.send(X.toWire())
6. Server B: Beacon.receive(X)
7. Server B: nm.refresh(X)
8. Server B: HTTP GET /node/X → ✅ Gets latest
Issue: Server B doesn’t subscribe to Server A’s MQTT, so only gets updates via beacons (2-5 sec delay)
Scenario 3: Concurrent Updates on Both Servers
1. Server A: DataPoint X = {value: 100, timestamp: T1}
2. Server B: DataPoint X = {value: 200, timestamp: T2}
3. Server A: Beacon.send(X)
4. Server B: Beacon.send(X)
5. Server A receives B's beacon: nm.refresh(X) → HTTP GET → {value: 200, T2}
6. Server B receives A's beacon: nm.refresh(X) → HTTP GET → {value: 100, T1}
Result: Both servers overwrite each other indefinitely (infinite loop)
Fix Needed: Compare timestamps, keep newer value
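For the two servers to stop overwriting each other, both must pick the same winner regardless of which side runs the comparison. A last-write-wins sketch with a deterministic tie-break follows; `Versioned` and `resolve` are hypothetical, and the approach assumes the servers' clocks are close enough for timestamps to be comparable.

```kotlin
data class Versioned(val value: Int, val timestamp: Long, val writer: String)

/** Picks the same winner on every server: newest timestamp, ties broken by writer id. */
fun resolve(local: Versioned, remote: Versioned): Versioned =
    maxOf(local, remote, compareBy<Versioned>({ it.timestamp }, { it.writer }))
```

Applied to Scenario 3 with T2 later than T1, both servers converge on value 200, and neither needs to send another beacon once the values agree.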
Recommended Architecture Changes
graph TB
subgraph "Server A"
A1[NodeManager A]
A2[MQTT Broker A]
A3[Beacon Service A]
A4[StateFlows A]
end
subgraph "Server B"
B1[NodeManager B]
B2[MQTT Broker B]
B3[Beacon Service B]
B4[StateFlows B]
end
A3 -.->|Discover| B3
B3 -.->|Discover| A3
A2 <-->|MQTT Bridge| B2
A1 -->|REST API| B1
B1 -->|REST API| A1
A4 -.->|Subscribe| B2
B4 -.->|Subscribe| A2
Components:
- MQTT Bridge: Servers subscribe to each other’s MQTT brokers
- Timestamp-based Conflict Resolution: Use NodeWire.timestamp
- Periodic Full Sync: Every N minutes, exchange all node IDs and checksums
- Beacon Deduplication: Track last beacon timestamp per node
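The periodic full sync reduces to comparing two maps of node id to checksum. The sketch below is one possible shape, with hypothetical names (`SyncPlan`, `planSync`); how checksums are computed and exchanged is left open, and the maps are assumed to cover only nodes owned by the remote server.

```kotlin
data class SyncPlan(val fetch: Set<String>, val delete: Set<String>)

/** Compares local and remote id -> checksum maps for nodes the remote server owns. */
fun planSync(local: Map<String, String>, remote: Map<String, String>): SyncPlan {
    // Fetch anything the remote has that is missing or different locally.
    val fetch = remote.filter { (id, sum) -> local[id] != sum }.keys
    // Drop anything held locally that the owner no longer has.
    val delete = local.keys - remote.keys
    return SyncPlan(fetch.toSet(), delete)
}
```

Only the ids in `fetch` need a follow-up `GET /node/{id}`, which keeps the sync cheap when the servers have not drifted.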
WASM Client Special Cases
Architecture Constraints
Why Different:
- CORS: Browser security prevents cross-origin REST API calls
- No UDP: Browser can’t send/receive multicast beacons
- No MQTT: No native MQTT client support in WASM
Current Solution:
- WebSocket to hosting server
- All updates routed through host
- Host acts as proxy
Issues with Current Implementation
1. Single Point of Failure
graph LR
W[WASM Client] <-->|WebSocket| S1[Host Server]
S1 -.->|MQTT| S2[Other Servers]
S2 -.->|No direct path| W
Issue: WASM only gets updates from its host server, not from other servers directly
2. Asymmetric Update Flow
Server → WASM (the WASM client’s receive loop):
// ClientSocketManager.kt
httpClient.webSocket(urlString = ws.toString()) {
    while (isActive) {
        val socketData = receiveDeserialized<SocketData>(...)
        nm.update(socketData.node)
    }
}
WASM → Server (the server’s receive handler):
// ServerSocketManager.jvm.kt
val socketData = receiveDeserialized<SocketData>(...)
if (host.node.value.id == installId())
    nm.update(socketData.node)
Issue: Server only processes WASM updates if it’s the host, doesn’t relay to other servers!
3. WASM Can’t Edit Remote Nodes
Scenario:
- WASM client connected to Server A
- Node X is hosted on Server B
- User edits Node X in WASM
- WASM sends update to Server A via WebSocket
- Server A checks: `if (host.node.value.id == installId())` → FALSE
- Update is silently dropped!
Fix Needed:
// ServerSocketManager.jvm.kt
val socketData = receiveDeserialized<SocketData>(...)
// Safe cast: readNode() may return a non-Success flow for an unknown host
val targetHost = nm.readNode(socketData.node.host) as? NodeFlow.Success
if (targetHost?.node?.value?.id == installId()) {
    nm.update(socketData.node)
} else {
    // Relay to target server
    nodeHttp.postNode(socketData.node)
}
4. No Offline Capability
Issue: WASM has no persistence (FileOperations.wasmJs.kt is no-op)
Impact:
- Refresh page → lose all state
- Must re-download all nodes on reconnect
- No caching
Recommendation:
- Use IndexedDB for persistence
- Cache nodes in localStorage
- Implement service worker for offline mode
5. WebSocket Reconnection
Location: ClientSocketManager.kt
Current: Single webSocket() call, no error recovery
Issues:
- Connection drops → WASM stops receiving updates
- No reconnection logic
- No exponential backoff
- User must refresh page
Fix:
suspend fun start() {
    while (currentCoroutineContext().isActive) {
        try {
            connectWebSocket()
        } catch (e: CancellationException) {
            throw e // never swallow cancellation
        } catch (e: Exception) {
            Logger.e("WebSocket failed", e)
        }
        delay(5000) // Retry after a failure or a clean close
    }
}
private suspend fun connectWebSocket() {
    httpClient.webSocket(...) {
        // ...
    }
}
Action Items
Critical (Fix Immediately)
- [CR-1] Fix NodeManager Map Synchronization
  - Add `Mutex` around `nodes` map operations
  - Ensure atomic check-then-set operations
  - File: `NodeManager.kt`

```kotlin
private val nodesMutex = Mutex()

suspend fun update(node: Node, …) {
    nodesMutex.withLock {
        // existing logic
    }
}
```
- [CR-2] Fix Infinite Beacon Loop
  - Add originator tracking to NodeWire
  - Don’t send beacon for updates received via beacon/MQTT
  - File: `Lifecycle.kt`, `BeaconService.kt`

```kotlin
data class NodeWire(
    val timestamp: Long,
    val id: String,
    val type: KrillApp,
    val url: String,
    val host: String,
    val originServer: String // NEW
)

// Only beacon if originated locally
if (node.host == installId()) {
    Multicast(scope).sendBeacons(node.toWire())
}
```
- [CR-3] Fix buildChain() Infinite Recursion
  - Add cycle detection
  - Add depth limit (max 20 levels)
  - Add timeout
  - File: `NodeFunctions.kt`

    fun NodeFlow.Success.buildChain(
        visited: Set<String> = emptySet(),
        depth: Int = 0
    ): Chain {
        if (depth > 20) throw IllegalStateException("Max depth")
        if (node.value.id in visited) {
            return { /* no-op */ }
        }
        val newVisited = visited + node.value.id
        val children = nm.nodes()
            .filter { it.parent == node.value.id }
            .map { nm.readNode(it.id) as NodeFlow.Success }
        val childChains = children.map {
            it.buildChain(newVisited, depth + 1)
        }
        return {
            withTimeout(30_000) {
                this@buildChain.exec(this)
                delay(100)
                for (childChain in childChains) {
                    childChain()
                }
            }
        }
    }
- [CR-4] Fix NodeObserver Double Launch
  - Remove self-canceling logic
  - Simplify job management
  - File: `DefaultNodeObserver.kt`

    override fun observe(flow: NodeFlow.Success) {
        val id = flow.node.value.id
        if (!SystemInfo.isServer() && !jobs.containsKey(id)) {
            jobs[id] = scope.launch {
                try {
                    flow.node.collect { collectedNode ->
                        when (collectedNode.state) {
                            NodeState.EXECUTED -> {
                                flow.buildChain().invoke(scope)
                            }
                            else -> {
                                Logger.w("Node not processed: ${collectedNode.state}")
                            }
                        }
                    }
                } finally {
                    Logger.w("Exited node observing job $id")
                }
            }
        }
    }
- [CR-5] Fix WASM Relay to Remote Servers
  - Make server relay WASM updates to target servers
  - File: `ServerSocketManager.jvm.kt`

    try {
        while (isActive) {
            val socketData = receiveDeserialized<SocketData>(...)
            val targetHost = nm.readNode(socketData.node.host)
            when (targetHost) {
                is NodeFlow.Success -> {
                    if (targetHost.node.value.id == installId()) {
                        nm.update(socketData.node)
                    } else {
                        // Relay to actual host server
                        scope.launch {
                            nodeHttp.postNode(socketData.node)
                        }
                    }
                }
                is NodeFlow.Error -> {
                    Logger.e("Unknown host for node ${socketData.node.id}")
                }
            }
        }
    } catch (t: Throwable) {
        Logger.e("WebSocket error", t)
    }
High Priority
- [HP-1] Implement Server-to-Server Child Sync
  - Fetch all children when connecting to peer server
  - File: `NodeConnectionManager.kt`

    if (success) {
        // Get host node
        val host = httpClient.get(peer.url).body<Node>()
        nm.update(node = host.copy(state = NodeState.EXECUTED), observe = true)
        // Get all children
        val allNodes = httpClient.get("${peer.url}/nodes").body<List<Node>>()
        val children = allNodes.filter { it.host == peer.id }
        children.forEach { child ->
            nm.update(child, observe = true)
        }
    }
- [HP-2] Add Timestamp-Based Conflict Resolution
  - Use NodeWire.timestamp for deduplication
  - File: `NodeManager.kt`

    fun update(node: Node, post: Boolean = false, observe: Boolean = false) {
        val existing = readNode(node.id)
        if (existing is NodeFlow.Success) {
            val existingTimestamp = (existing.node.value.meta as? TimestampedMetadata)?.timestamp
            val newTimestamp = (node.meta as? TimestampedMetadata)?.timestamp
            // Compare only when both sides carry a timestamp. Defaulting a missing
            // timestamp to 0 would reject every update to an untimestamped node.
            if (existingTimestamp != null && newTimestamp != null && newTimestamp <= existingTimestamp) {
                Logger.d("Ignoring older update for ${node.id}")
                return
            }
        }
        // ... rest of update logic
    }
- [HP-3] Add MQTT Reconnection Logic
  - Automatically reconnect on disconnect
  - Resubscribe to topics
  - File: `SharedMqttClient.jvm.kt`

    actual override suspend fun connect(onConnected: (Node) -> Unit, onDisconnected: (Node) -> Unit) {
        val reconnectingOnDisconnected: (Node) -> Unit = { node ->
            onDisconnected(node)
            if (shouldReconnect) {
                scope.launch {
                    delay(5000)
                    // Pass the original callback: connect() wraps it again, so passing
                    // the wrapper would stack wrappers and double the retries each time
                    connect(onConnected, onDisconnected)
                }
            }
        }
        // ... existing connection logic with reconnectingOnDisconnected
    }
- [HP-4] Add WebSocket Reconnection for WASM
  - Retry on connection failure
  - File: `ClientSocketManager.kt`

```kotlin
fun start() {
    scope.launch {
        while (isActive) {
            try {
                connectWebSocket()
            } catch (e: Exception) {
                Logger.e("WebSocket failed, retrying in 5s", e)
                delay(5000)
            }
        }
    }
}

private suspend fun connectWebSocket() {
    httpClient.webSocket(urlString = ws.toString()) {
        while (isActive) {
            val socketData = receiveDeserialized<SocketData>(...)
            nm.update(socketData.node)
        }
    }
}
```

- [HP-5] Encapsulate MutableStateFlow
  - Hide mutation, expose StateFlow
  - File: `NodeFlow.kt`

```kotlin
data class Success(
    private val _node: MutableStateFlow<Node>,
    val instance: String = Uuid.random().toString()
) : NodeFlow(exec = { scope -> _node.value.type.exec(scope, _node.value) }) {
    val node: StateFlow<Node> = _node.asStateFlow()

    fun updateNode(transform: (Node) -> Node) {
        _node.update(transform)
    }
}

// Update all callers to use .updateNode() instead of .node.update()
```
Medium Priority
- [MP-1] Implement Beacon Deduplication
- Track last beacon timestamp per node
- File:
BeaconService.kt
- [MP-2] Add MQTT Topic Hierarchy
- Use
/server/{id}/node/{nodeId}format - Enable wildcard subscriptions
- Use
- [MP-3] Implement MQTT QoS 1
- Change from AT_MOST_ONCE to AT_LEAST_ONCE
- Add sequence numbers
- [MP-4] Add Deletion Depth Limit
- Prevent unbounded recursive deletion
- File:
NodeManager.kt
- [MP-5] Add Authentication to MQTT
- Require username/password
- Implement ACLs
- [MP-6] Implement File Write Atomicity
- Use temp file + rename pattern
- Add write-ahead logging
- [MP-7] Add WASM Persistence
- Use IndexedDB for node storage
- File:
FileOperations.wasmJs.kt
- [MP-8] Rate Limit Beacon Sends
- Debounce beacon sends (max 1 per second per node)
- File: Lifecycle.kt
- [MP-9] Add Metrics/Monitoring
- Track update latency
- Monitor beacon traffic
- Log MQTT message rates
- [MP-10] Implement Periodic Full Sync
- Every 5 minutes, compare node checksums between servers
- Detect and fix drift
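Items MP-1 and MP-8 both reduce to remembering when a beacon for a given node was last handled. The following is a minimal JVM-only sketch of that idea, not Krill's actual code: `BeaconThrottle`, `tryAccept`, and `forget` are hypothetical names, and the 1-second default comes from the MP-8 target above. Common (multiplatform) code would need a coroutine `Mutex` instead of `@Synchronized`.

```kotlin
// Hypothetical helper: not part of the Krill codebase.
class BeaconThrottle(
    private val minIntervalMillis: Long = 1_000, // "max 1 per second per node" (MP-8)
    private val clock: () -> Long = System::currentTimeMillis
) {
    // Time at which a beacon was last accepted, keyed by node ID.
    private val lastAccepted = HashMap<String, Long>()

    /** Returns true if a beacon for [nodeId] should be processed or sent now. */
    @Synchronized
    fun tryAccept(nodeId: String): Boolean {
        val now = clock()
        val last = lastAccepted[nodeId]
        if (last != null && now - last < minIntervalMillis) return false
        lastAccepted[nodeId] = now
        return true
    }

    /** Drops the entry for a removed node so the map does not grow without bound. */
    @Synchronized
    fun forget(nodeId: String) {
        lastAccepted.remove(nodeId)
    }
}
```

The same instance could guard both directions: call `tryAccept(nodeId)` before sending a beacon and before acting on a received one, and skip the work when it returns false.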
Additional Findings
1. No Error State Recovery
Observation: Nodes can enter NodeState.ERROR, but there is no general automatic recovery path
Location: PeerConnector.kt L48 reconnects on ERROR, but only for server nodes
Recommendation:
- Implement periodic health checks
- Auto-retry failed operations
- Expose retry action in UI
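The auto-retry recommendation could take the form of a small helper with exponential backoff. This is a sketch under assumptions: `retryWithBackoff` and its parameters are hypothetical, and it blocks the calling thread, so coroutine code would pass a suspending `delay` equivalent rather than `Thread.sleep`.

```kotlin
/**
 * Hypothetical helper: runs [block] up to [maxAttempts] times, doubling the
 * wait between attempts, and rethrows the last failure if every attempt fails.
 */
fun <T> retryWithBackoff(
    maxAttempts: Int = 3,
    initialDelayMillis: Long = 500,
    sleep: (Long) -> Unit = { Thread.sleep(it) },
    block: () -> T
): T {
    require(maxAttempts >= 1) { "maxAttempts must be at least 1" }
    var delayMillis = initialDelayMillis
    var lastError: Exception? = null
    for (attempt in 1..maxAttempts) {
        try {
            return block()
        } catch (e: Exception) {
            lastError = e
            // Only wait if another attempt will follow.
            if (attempt < maxAttempts) {
                sleep(delayMillis)
                delayMillis *= 2
            }
        }
    }
    throw checkNotNull(lastError)
}
```

A bounded attempt count matters here: an unbounded retry loop would recreate the runaway-execution risk this report flags elsewhere.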
2. Memory Leaks in NodeManager
Issue: nodes map grows unbounded, never cleaned up
Impact: Long-running servers accumulate deleted nodes in memory
Fix:
```kotlin
fun cleanup() {
    val staleNodes = nodes.filter { (_, flow) ->
        flow is NodeFlow.Error ||
            (flow is NodeFlow.Success && flow.node.value.state == NodeState.DELETING)
    }
    staleNodes.keys.forEach { nodes.remove(it) }
}

// Call periodically
scope.launch {
    while (isActive) {
        delay(60_000)
        cleanup()
    }
}
```
3. Missing Logging Context
Issue: Logs don’t include correlation IDs or request traces
Example:
```kotlin
Logger.i("updating node: read node file op")
// Which node? From where? Why?
```
Recommendation:
- Add structured logging with context
- Include node ID, operation, source (MQTT/HTTP/Beacon)
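One way to carry that context is a small value object that every log call formats through. This is only a sketch: `LogContext` and its field names are hypothetical, and the source values mirror the MQTT/HTTP/Beacon list above.

```kotlin
// Hypothetical type: names are illustrative, not taken from the Krill codebase.
data class LogContext(
    val correlationId: String, // same ID for every log line of one logical update
    val nodeId: String,
    val operation: String,
    val source: String         // e.g. "MQTT", "HTTP", "BEACON"
) {
    /** Prefixes [message] with key=value pairs so logs can be grepped or parsed. */
    fun format(message: String): String =
        "[cid=$correlationId node=$nodeId op=$operation src=$source] $message"
}
```

With this in place the example above might become `Logger.i(ctx.format("updating node: read node file op"))`, which answers "which node, from where, and why" in the log line itself.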
4. No Circuit Breaker for External Calls
Issue: HTTP calls to other servers can hang indefinitely
Location: NodeHttp.kt, NodeConnectionManager.kt
Recommendation:
```kotlin
// Per-request timeouts; requires the HttpTimeout plugin to be installed on httpClient.
httpClient.get(url) {
    timeout {
        requestTimeoutMillis = 5000
        connectTimeoutMillis = 3000
        socketTimeoutMillis = 5000
    }
}
```
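Timeouts bound a single call, but they do not stop a server from repeatedly calling a peer that is down. A circuit breaker adds that. The class below is a minimal JVM-only sketch, assuming a consecutive-failure threshold and a fixed cool-down; `CircuitBreaker` and its members are hypothetical and not part of Krill or Ktor.

```kotlin
// Hypothetical helper, not part of the Krill codebase or of Ktor.
class CircuitBreaker(
    private val failureThreshold: Int = 3,
    private val openMillis: Long = 30_000,
    private val clock: () -> Long = System::currentTimeMillis
) {
    private var consecutiveFailures = 0
    private var openedAt: Long? = null

    /** True while calls should be rejected without touching the network. */
    @Synchronized
    fun isOpen(): Boolean {
        val opened = openedAt ?: return false
        if (clock() - opened >= openMillis) {
            // Cool-down elapsed: let trial calls through; one more failure re-opens.
            openedAt = null
            consecutiveFailures = failureThreshold - 1
            return false
        }
        return true
    }

    @Synchronized
    fun recordSuccess() {
        consecutiveFailures = 0
        openedAt = null
    }

    @Synchronized
    fun recordFailure() {
        consecutiveFailures++
        if (consecutiveFailures >= failureThreshold) openedAt = clock()
    }
}
```

A caller would check `isOpen()` before each request to a peer, fail fast when it returns true, and report the outcome with `recordSuccess()` or `recordFailure()`. Keeping one breaker per peer server avoids one bad peer blocking calls to healthy ones.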
5. No Backpressure Handling
Issue: Fast producers (MQTT messages) can overwhelm slow consumers (StateFlow collectors)
Recommendation:
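- Apply buffer() or conflate() to the flow of incoming MQTT messages before it updates node state (StateFlow is already conflated, so conflate() has no effect on the StateFlow itself)
- Implement rate limiting on MQTT processing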
6. Hardcoded Configuration
Locations:
- DiscoveryConfig.PORT = 45317
- MQTTBroker port = 8883
- ServerMetaData.port
Issue: Can’t run multiple servers on same host for testing
Recommendation:
- Use environment variables
- Support config files
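A sketch of the environment-variable option, assuming hypothetical variable names (`KRILL_DISCOVERY_PORT`, `KRILL_MQTT_PORT`) and a hypothetical `KrillPorts` holder. The defaults are the values currently hardcoded, so behaviour is unchanged when nothing is set.

```kotlin
// Hypothetical config holder; the environment variable names are assumptions.
data class KrillPorts(val discoveryPort: Int, val mqttPort: Int)

/** Reads ports from [env], falling back to the values that are hardcoded today. */
fun loadPorts(env: Map<String, String> = System.getenv()): KrillPorts {
    fun port(name: String, fallback: Int): Int {
        val raw = env[name] ?: return fallback
        val value = requireNotNull(raw.toIntOrNull()) { "$name must be an integer, got '$raw'" }
        require(value in 1..65535) { "$name is out of range: $value" }
        return value
    }
    return KrillPorts(
        discoveryPort = port("KRILL_DISCOVERY_PORT", 45317),
        mqttPort = port("KRILL_MQTT_PORT", 8883)
    )
}
```

Failing fast on a malformed value is deliberate: silently falling back to the default would make two test servers on one host collide on the same port without any hint why.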
7. No Schema Versioning
Issue: Node serialization format has no version field
Impact: Breaking changes to Node/NodeMetaData break compatibility
Recommendation:
```kotlin
@Serializable
data class Node(
    val schemaVersion: Int = 1, // NEW
    val id: String,
    // ...
)
```
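A version field only helps if readers act on it. The sketch below shows one possible policy, assuming integer versions that only increase: reject payloads newer than the reader understands, and flag older ones for migration. `VersionCheck` and `checkSchemaVersion` are hypothetical names.

```kotlin
// Hypothetical helper; the result shape and policy are assumptions.
data class VersionCheck(val accepted: Boolean, val needsMigration: Boolean)

/** Decides how a reader that supports [supportedVersion] treats [payloadVersion]. */
fun checkSchemaVersion(payloadVersion: Int, supportedVersion: Int = 1): VersionCheck = when {
    // Written by a newer peer: fields may be missing or reinterpreted, so refuse.
    payloadVersion > supportedVersion -> VersionCheck(accepted = false, needsMigration = false)
    // Written by an older peer: readable, but should be upgraded before use.
    payloadVersion < supportedVersion -> VersionCheck(accepted = true, needsMigration = true)
    else -> VersionCheck(accepted = true, needsMigration = false)
}
```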
8. Certificate Management
Issue: TLS certificates hardcoded at /etc/krill/certs/krill.crt
Security: Private key password is "changeit" (L28 in MQTTBroker.kt)
Recommendation:
- Use environment variable for cert path
- Support cert rotation
- Use secure secret management (not hardcoded password)
9. No Transaction Support
Issue: Multi-step operations aren’t atomic
Example: Creating node + children can partially fail
Recommendation:
- Implement transaction boundaries
- Add rollback capability
- Use database for persistence instead of files
10. Platform-Specific Code Duplication
Observation: MQTT clients have duplicate logic across platforms
Files:
- SharedMqttClient.jvm.kt
- SharedMqttClient.android.kt
- SharedMqttClient.ios.kt
Recommendation:
- Extract common logic to BaseMqttClient
- Reduce duplication
Recommendations
Short Term (1-2 Weeks)
- Fix Critical Race Conditions
- Add mutexes to NodeManager
- Fix observer double launch
- Stop infinite beacon loop
- Add Defensive Programming
- Null checks before casts
- Timeout all network calls
- Add max depth limits
- Improve Observability
- Add structured logging
- Log correlation IDs
- Add performance metrics
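The "max depth limits" item can be illustrated with a depth-bounded, cycle-safe subtree deletion. This is a sketch over a plain in-memory map, since NodeManager's real API is not shown in this report; `deleteSubtree`, `DepthLimitExceeded`, and the default limit of 32 are all hypothetical.

```kotlin
// Hypothetical exception and function; Krill's real NodeManager API may differ.
class DepthLimitExceeded(nodeId: String, maxDepth: Int) :
    IllegalStateException("Deletion of '$nodeId' exceeded max depth $maxDepth")

/**
 * Deletes [rootId] and its descendants from [children] (parent ID -> child IDs).
 * Refuses to descend more than [maxDepth] levels and skips nodes it has already
 * visited, so a cycle in the graph cannot cause unbounded recursion.
 */
fun deleteSubtree(
    rootId: String,
    children: MutableMap<String, List<String>>,
    maxDepth: Int = 32
): Set<String> {
    val deleted = LinkedHashSet<String>()
    fun visit(id: String, depth: Int) {
        if (depth > maxDepth) throw DepthLimitExceeded(id, maxDepth)
        if (!deleted.add(id)) return // already handled: cycle or shared child
        children[id].orEmpty().forEach { visit(it, depth + 1) }
    }
    visit(rootId, 0)
    deleted.forEach { children.remove(it) } // mutate only after the walk succeeds
    return deleted
}
```

Collecting the IDs first and mutating afterwards means a depth violation leaves the map untouched, which is a small step toward the atomicity concerns raised under "No Transaction Support".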
Medium Term (1-2 Months)
- Redesign Server-to-Server Sync
- Implement MQTT bridge between servers
- Add timestamp-based conflict resolution
- Fetch all children on peer discovery
- Harden MQTT
- Upgrade to QoS 1
- Add authentication
- Implement topic hierarchy
- Improve WASM Support
- Add IndexedDB persistence
- Implement proper relaying
- Add WebSocket reconnection
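Timestamp-based conflict resolution is commonly implemented as last-writer-wins with a deterministic tie-breaker. The following sketch assumes each node version carries a write timestamp and the ID of the server that accepted it; `VersionedNode`, its fields, and `resolve` are hypothetical and not taken from Krill's Node type.

```kotlin
// Hypothetical versioned wrapper; the field names are assumptions.
data class VersionedNode(
    val id: String,
    val payload: String,
    val updatedAtMillis: Long,  // set by the server that accepted the write
    val originServerId: String  // tie-breaker when timestamps are equal
)

/** Last-writer-wins: returns whichever of the two versions should be kept. */
fun resolve(local: VersionedNode, remote: VersionedNode): VersionedNode {
    require(local.id == remote.id) { "Cannot merge versions of different nodes" }
    return when {
        remote.updatedAtMillis > local.updatedAtMillis -> remote
        remote.updatedAtMillis < local.updatedAtMillis -> local
        // Equal timestamps: choose deterministically so every server converges.
        remote.originServerId > local.originServerId -> remote
        else -> local
    }
}
```

Because the result does not depend on argument order, two servers that exchange the same pair of versions end up with the same winner. The approach does rely on reasonably synchronized clocks; with significant skew, a logical counter per node would be a safer basis than wall-clock time.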
Long Term (3-6 Months)
- Replace File Persistence with Database
- Use SQLite for local storage
- Add transaction support
- Enable atomic updates
- Implement Event Sourcing
- Log all node changes as events
- Replay for consistency
- Enable audit trail
- Add Consensus Protocol
- Use Raft or similar for multi-server consistency
- Elect leader for coordination
- Handle network partitions
- Performance Optimization
- Batch MQTT publishes
- Implement caching layer
- Optimize StateFlow collectors
Prompt for Future Updates
Use this prompt to update this report in the future:
This KMP codebase has apps for different platforms and a Ktor server. Apps launch, initialize, and wait for peers to advertise themselves as servers on the same network using UDP multicast beacons. When an app sees a beacon from a server (see KrillApp.Server), it downloads and displays all child nodes that server owns in a graph of connected nodes.
Apps connect over MQTT for real-time updates. If a user edits or deletes a node, we POST to the server host node, which processes the change from the REST POST/DELETE, broadcasts the change to other connected apps over MQTT, and advertises it via BeaconService. The server also runs background processes that update and execute chains of nodes and cleanly informs clients of new info over MQTT.
The WASM module is an exception: it doesn't use MQTT and can't POST to other servers due to CORS. There's a dedicated WebSocket connection for the WASM client to get updates from the hosting server. Instead of making REST calls to servers, WASM sends changes to its host, which informs the rest of the swarm.
**Server nodes also act as apps** and should get beacons from other servers so everyone has a synchronized list of nodes in NodeManager.
**Your Task:**
1. Do a deep dive into the connectivity and synchronization process
2. Update ConnectivityReport.md with your findings
3. Use Mermaid syntax for diagrams
4. Analyze flows and identify issues that would cause:
- Updates to go wrong
- Race conditions
- Exponential executions
- Infinite loops
5. Create Mermaid diagrams showing logic errors
6. Provide a list of action items to harden the flow
7. Include findings, recommendations, and notes
8. Update this prompt based on what you learn
**Key Files to Review:**
- `krill-sdk/src/commonMain/kotlin/krill/zone/KrillApp.kt` - Application architecture
- `krill-sdk/src/commonMain/kotlin/krill/zone/node/NodeManager.kt` - Node management and StateFlows
- `krill-sdk/src/commonMain/kotlin/krill/zone/beacon/BeaconService.kt` - Multicast discovery
- `krill-sdk/src/jvmMain/kotlin/krill/zone/io/SharedMqttClient.jvm.kt` - MQTT client
- `server/src/main/kotlin/krill/zone/server/mqtt/MQTTBroker.kt` - MQTT broker
- `krill-sdk/src/commonMain/kotlin/krill/zone/io/ClientSocketManager.kt` - WASM WebSocket
- `shared/src/jvmMain/kotlin/krill/zone/server/ServerSocketManager.jvm.kt` - Server WebSocket
- `server/src/main/kotlin/krill/zone/server/Routes.kt` - REST API
- `server/src/main/kotlin/krill/zone/server/Lifecycle.kt` - Server lifecycle and events
**Focus Areas:**
- StateFlow usage and concurrent updates
- MQTT message handling and synchronization
- WebSocket communication for WASM
- Server-to-server synchronization via beacons
- Race conditions in NodeManager
- Infinite loops in beacon/MQTT propagation
- buildChain() recursive execution
- Node deletion cascades
- Error handling and recovery
This code is in early development, so look for logic flaws and bugs.
Conclusion
The Krill connectivity system has a solid foundation with multi-transport support (beacons, MQTT, WebSockets, REST) and cross-platform capabilities. However, it suffers from several critical issues related to concurrency, synchronization, and distributed system consistency.
Immediate Action Required:
- Fix race conditions in NodeManager
- Stop infinite beacon loops
- Add recursion limits to buildChain()
- Fix WASM relay logic
Architecture Improvements Needed:
- Proper server-to-server synchronization
- Timestamp-based conflict resolution
- MQTT reconnection and QoS upgrades
- Transaction boundaries for multi-step operations
Long-Term Vision: Consider evolving toward a more robust distributed architecture with:
- Consensus protocol for multi-server deployments
- Event sourcing for auditability
- Database-backed persistence
- Proper circuit breakers and backpressure
The current implementation works for single-server or trusted network scenarios but needs hardening for production use with multiple servers and untrusted networks.
Report Version: 1.0
Last Updated: 2025-12-03
Next Review: After implementing critical action items