Snapshot Data Lifecycle Deep Dive
Comprehensive analysis of the Snapshot data lifecycle, including client-server communication, processing flows, deduplication mechanisms, and identified bugs with fixes
Overview
This document provides a comprehensive analysis of how Snapshot data flows through the Krill system, from user input on the client to storage on the server and back to all connected clients via WebSocket. It identifies critical bugs and redundancies, and provides actionable TODO items with agent prompts for fixing them.
Core Components
Snapshot Data Class
data class Snapshot(
val timestamp: Long = 0L,
val value: String = ""
)
The Snapshot class represents a point-in-time value for a DataPoint. It contains:
- timestamp: Epoch milliseconds when the value was captured
- value: String representation of the data (can be parsed as Double for numeric types)
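Because `value` is a `String` that defaults to `""`, calling `toDouble()` on an unset snapshot throws `NumberFormatException`. A minimal sketch of defensive parsing, assuming hypothetical extension functions `numericOrNull()` and `isOn()` that are not part of the Krill SDK:

```kotlin
data class Snapshot(
    val timestamp: Long = 0L,
    val value: String = ""
)

// Hypothetical helper: parse the value as a Double, or return null when the
// value is empty or non-numeric (e.g. the default "" of an unset snapshot).
fun Snapshot.numericOrNull(): Double? = value.toDoubleOrNull()

// Hypothetical helper: a BOOL snapshot is ON only when it parses to a value > 0,
// matching the "value <= 0 means OFF" rule used by executeDataPoint.
fun Snapshot.isOn(): Boolean = (numericOrNull() ?: 0.0) > 0.0
```

Treating an unparseable value as OFF is a design choice for the sketch; a stricter variant could surface the parse failure instead.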
NodeState Enum (Key States)
enum class NodeState {
// ... other states ...
EXECUTED, // Node action requested (e.g., toggle BOOL, run trigger)
SNAPSHOT_UPDATE, // Data value changed (e.g., sensor reading, user input)
// ... other states ...
}
Key Classes
| Class | Purpose |
|---|---|
| Snapshot | Point-in-time value with timestamp |
| SnapshotProcessor | Queue for pending snapshots before storage |
| SnapshotTracker | Tracks processed timestamps to detect stale data |
| SnapshotQueueService | Drains queue and persists to DataStore |
| DataPointMetaData | Contains the current snapshot value |
| ServerDataPointProcessor | Processes DataPoint state changes |
| NodeTrafficControl | Transaction ID deduplication |
Client-Server Snapshot Flow
User Input to Server Storage
sequenceDiagram
participant User
participant UI as Compose UI
participant CNM as ClientNodeManager
participant TC as TrafficControl
participant HTTP as NodeHttp
participant Server as Ktor Server
participant SNM as ServerNodeManager
participant NE as NodeEventBus
participant SDP as ServerDataPointProcessor
participant SP as SnapshotProcessor
participant SQS as SnapshotQueueService
participant DS as DataStore
User->>UI: Enter new value
UI->>CNM: nodeManager.updateSnapshot(node)
CNM->>CNM: Generate tid, set SNAPSHOT_UPDATE
CNM->>TC: trafficControl.record(tid)
CNM->>HTTP: postNode(host, node)
HTTP->>Server: POST /node/{id}
Server->>TC: Check tid exists
alt Duplicate TID
Server-->>HTTP: 409 Conflict
else New TID
Server->>TC: record(tid)
Server->>SNM: updateSnapshot(node)
SNM->>SNM: updateInternal() - verify()
SNM->>SDP: emit via StateFlow
SDP->>SDP: processMyDataPoint()
SDP->>SP: enqueue(node, snapshot)
SP->>SQS: size.collect triggers drain
SQS->>DS: post(node) - persist to file
SNM->>NE: broadcast(node)
NE->>WebSocket: sendSerialized(node)
Server-->>HTTP: 200 OK
end
Server to Client WebSocket Broadcast
sequenceDiagram
participant SNM as ServerNodeManager
participant NE as NodeEventBus
participant SM as ServerSocketManager
participant WS as WebSocket
participant CSM as ClientSocketManager
participant TC as TrafficControl
participant CNM as ClientNodeManager
participant UI as Compose UI
SNM->>NE: broadcast(node)
NE->>SM: broadcast(node)
SM->>WS: sendSerialized(node)
WS->>CSM: receiveDeserialized()
CSM->>TC: contains(tid)?
alt TID is mine
CSM->>CSM: Skip (ack of my POST)
else TID is new
CSM->>CNM: update(node)
CNM->>CNM: Check timestamp > existing
CNM->>UI: StateFlow.update()
UI->>UI: Recompose with new value
end
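The client-side decision in the broadcast diagram can be sketched as a single function. This is a minimal sketch with simplified, hypothetical types (`Node`, `ClientState`) rather than the real `ClientSocketManager` and `ClientNodeManager` APIs: a broadcast carrying one of this client's own tids is dropped, and any other update is applied only if its timestamp is newer than the local copy.

```kotlin
// Simplified stand-in for a Krill node: only the fields this check needs.
data class Node(val id: String, val tid: String, val timestamp: Long, val value: String)

// Hypothetical client-side state: tids this client has POSTed, plus its local node cache.
class ClientState {
    private val myTids = mutableSetOf<String>()
    private val nodes = mutableMapOf<String, Node>()

    fun record(tid: String) { myTids.add(tid) }

    fun current(id: String): Node? = nodes[id]

    /** Returns true if the broadcast was applied to the local cache. */
    fun onBroadcast(node: Node): Boolean {
        if (node.tid in myTids) return false // echo of my own POST: skip
        val existing = nodes[node.id]
        if (existing != null && node.timestamp <= existing.timestamp) return false // not newer: ignore
        nodes[node.id] = node // newer: update (the UI would recompose here)
        return true
    }
}
```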
Server-Side Processing Flow
SNAPSHOT_UPDATE Processing
flowchart TD
A[Node StateFlow emits] --> B{Check node.state}
B -->|SNAPSHOT_UPDATE| C{node.isMine()?}
C -->|Yes| D[processMyDataPoint]
C -->|No| E[process - send to external server]
D --> F{Duplicate snapshot?}
F -->|Yes| G[Skip - return true]
F -->|No| H{Snapshot stale?}
H -->|Yes| I[Skip - return true]
H -->|No| J[snapshotTracker.post]
J --> K[nodeManager.running]
K --> L[snapshotProcessor.enqueue]
L --> M[handleSpecialEvents]
M --> N{Parent is Zigbee?}
N -->|Yes| O[zigbee.sendCommand]
N -->|No| P[Return success]
O --> P
E --> Q[Find host server]
Q --> R[nodeHttp.postNode]
R --> S[nodeManager.complete]
EXECUTED Processing (DataPoint with BOOL Type)
flowchart TD
A[Node StateFlow emits] --> B{Check node.state}
B -->|EXECUTED| C[executeDataPoint]
C --> D{meta.dataType?}
D -->|BOOL| E{snapshot.value <= 0?}
E -->|Yes - is OFF| F["updateSnapshot(value=1)"]
E -->|No - is ON| G["updateSnapshot(value=0)"]
F --> H[SNAPSHOT_UPDATE triggers]
G --> H
D -->|Other| I[Return true - no action]
style F fill:#f96,stroke:#333
style G fill:#f96,stroke:#333
⚠️ BUG IDENTIFIED: This toggle logic breaks Connection executors. When a Connection sets a target BOOL DataPoint to the EXECUTED state along with a specific value, the value is toggled instead of preserved.
Connection Executor Flow
Current (Buggy) Behavior
flowchart TD
A[Connection EXECUTED] --> B[Get source snapshot]
B --> C[Get target node]
C --> D["Copy snapshot to target"]
D --> E["target.state = EXECUTED ⚠️"]
E --> F[ServerDataPointProcessor.post]
F --> G{DataType.BOOL?}
G -->|Yes| H["executeDataPoint TOGGLES value! ⚠️"]
G -->|No| I[No toggle - OK]
H --> J[Value is now OPPOSITE]
style E fill:#f96,stroke:#333
style H fill:#f00,stroke:#333
Problem Scenario: Zigbee Plug Control
sequenceDiagram
participant Plug as Zigbee Plug
participant ZB as ServerZigbeeBoss
participant DP1 as DataPoint (Zigbee)
participant CONN as Connection
participant DP2 as Target DataPoint
participant SDP as ServerDataPointProcessor
Note over Plug: User turns ON plug manually
Plug->>ZB: ReportAttributesCommand (value=1)
ZB->>DP1: updateSnapshot(value="1")
Note over DP1: State: SNAPSHOT_UPDATE
DP1->>CONN: Execute child (Connection)
CONN->>DP2: Copy value="1", state=EXECUTED
Note over DP2: State: EXECUTED, value="1"
DP2->>SDP: post(node)
SDP->>SDP: executeDataPoint() - BOOL toggle!
SDP->>DP2: updateSnapshot(value="0") ⚠️
Note over DP2: Now value="0" (toggled!)
DP2->>ZB: handleSpecialEvents
ZB->>Plug: turnOff() ⚠️
Note over Plug: Plug turns OFF immediately!
The plug turns off immediately because the Connection executor puts the target DataPoint into the EXECUTED state, which triggers the BOOL toggle logic.
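The scenario can be reproduced with a small model. This is a hypothetical simplification, not the real `ServerDataPointProcessor`: `process()` stores the value for `SNAPSHOT_UPDATE` and toggles it for `EXECUTED`, mirroring the two flowcharts above. Copying `"1"` with `EXECUTED` ends with `"0"`, while `SNAPSHOT_UPDATE` preserves it.

```kotlin
enum class NodeState { EXECUTED, SNAPSHOT_UPDATE }

// Simplified BOOL DataPoint: the state that was set and the current snapshot value.
data class BoolPoint(val state: NodeState, val value: String)

// Models the server-side handling of a BOOL DataPoint (illustrative only).
fun process(point: BoolPoint): String = when (point.state) {
    // SNAPSHOT_UPDATE: store the incoming value as-is.
    NodeState.SNAPSHOT_UPDATE -> point.value
    // EXECUTED: toggle whatever value is present, so a copied value gets inverted.
    NodeState.EXECUTED -> if ((point.value.toDoubleOrNull() ?: 0.0) <= 0.0) "1" else "0"
}

// A Connection copying the source value onto a BOOL target with the given state.
fun connectionCopy(sourceValue: String, state: NodeState): String =
    process(BoolPoint(state, sourceValue))
```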
Transaction ID (tid) Deduplication
Purpose
The tid (Transaction ID) prevents an echo loop between client and server:
- The client POSTs a node, and the server broadcasts the update to all clients
- The originating client receives its own update back via WebSocket
- Without the tid check, the client would treat the echo as new data and re-POST it, creating a loop
Flow
flowchart TD
A[Client: Generate tid] --> B[trafficControl.record tid]
B --> C[POST to server]
C --> D[Server: Check tid exists]
D -->|Exists| E[409 Conflict - duplicate]
D -->|New| F[Server: record tid]
F --> G[Process node]
G --> H[Broadcast via WebSocket]
H --> I[Client receives broadcast]
I --> J[trafficControl.contains tid?]
J -->|Yes - my tid| K[Skip - this is ack of my POST]
J -->|No| L[Update local node]
style K fill:#9f9,stroke:#333
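The server side of this flow can be sketched with a thread-safe set. This is a minimal sketch, assuming a hypothetical `ServerTidRegistry` (the real class is `NodeTrafficControl`, whose API may differ). Folding the existence check and the record into one `add()` call closes the window in which two identical concurrent POSTs could both pass a separate "check, then record" sequence.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical server-side tid registry (stand-in for NodeTrafficControl).
class ServerTidRegistry {
    // Thread-safe set: add() returns false if the tid is already present.
    private val seen: MutableSet<String> = ConcurrentHashMap.newKeySet<String>()

    /** Checks and records in one atomic step; returns the HTTP status to send. */
    fun handlePost(tid: String): Int =
        if (seen.add(tid)) 200 // new tid: recorded; process the node and broadcast
        else 409               // duplicate tid: 409 Conflict
}
```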
Cleanup
// NodeTrafficControl - auto-cleanup after 20 seconds
scope.launch {
delay(20000)
mutex.withLock {
map.remove(tid)
}
}
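The cleanup above appears to launch one delayed coroutine per recorded tid. A minimal alternative sketch, assuming expiry can be checked lazily: store each tid with its record time and purge entries older than the TTL whenever the store is accessed. `ExpiringTids` and its injectable clock are hypothetical, not part of `NodeTrafficControl`.

```kotlin
// Hypothetical tid store with lazy, time-based expiry (default TTL 20 s, as above).
class ExpiringTids(
    private val ttlMillis: Long = 20_000,
    private val now: () -> Long = System::currentTimeMillis, // injectable for tests
) {
    private val recordedAt = LinkedHashMap<String, Long>() // insertion order = record order

    @Synchronized
    fun record(tid: String) {
        purge()
        recordedAt.remove(tid) // re-recording moves the tid to the end
        recordedAt[tid] = now()
    }

    @Synchronized
    fun contains(tid: String): Boolean {
        purge()
        return tid in recordedAt
    }

    // Entries are in record order, so stop at the first one that is still fresh.
    private fun purge() {
        val cutoff = now() - ttlMillis
        val it = recordedAt.entries.iterator()
        while (it.hasNext()) {
            if (it.next().value > cutoff) break
            it.remove()
        }
    }
}
```

The early `break` relies on the clock never going backwards; with a non-monotonic clock, a full scan would be needed.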
Snapshot Queue Processing
Queue Architecture
flowchart LR
subgraph Input
A[processMyDataPoint] --> B[snapshotProcessor.enqueue]
end
subgraph Queue
B --> C[SnapshotQueue]
C --> D[Deduplicate by timestamp]
D --> E[Check if stale]
end
subgraph Service
E --> F[SnapshotQueueService]
F --> G[size StateFlow triggers]
G --> H[drainProcessableSnapshots]
H --> I[Sort by timestamp]
I --> J[dataStore.post each]
end
subgraph Storage
J --> K[ServerDataStore]
K --> L[File: data/YYYY/MM/DD/nodeId]
end
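The queue stages above can be sketched in one class. This is a minimal sketch under simplifying assumptions: a single queue for all nodes, deduplication keyed by `(nodeId, timestamp)`, and a per-node high-water mark of persisted timestamps for the stale check. `SnapshotQueueSketch` is hypothetical and does not reproduce the real `SnapshotQueue` or `SnapshotQueueService` API; the `store` callback stands in for `dataStore.post`.

```kotlin
data class Snapshot(val timestamp: Long, val value: String)
data class Pending(val nodeId: String, val snapshot: Snapshot)

class SnapshotQueueSketch {
    private val pending = mutableListOf<Pending>()
    private val queuedKeys = mutableSetOf<Pair<String, Long>>() // (nodeId, timestamp)
    private val lastStored = mutableMapOf<String, Long>()       // newest persisted per node

    /** Enqueues unless the snapshot is a duplicate or not newer than what was stored. */
    @Synchronized
    fun enqueue(nodeId: String, snapshot: Snapshot): Boolean {
        val stale = (lastStored[nodeId] ?: Long.MIN_VALUE) >= snapshot.timestamp
        if (stale || !queuedKeys.add(nodeId to snapshot.timestamp)) return false
        pending.add(Pending(nodeId, snapshot))
        return true
    }

    /** Drains everything in timestamp order and hands each item to [store]. */
    @Synchronized
    fun drain(store: (Pending) -> Unit) {
        val batch = pending.sortedBy { it.snapshot.timestamp }
        pending.clear()
        queuedKeys.clear()
        for (p in batch) {
            store(p)
            lastStored[p.nodeId] = maxOf(lastStored[p.nodeId] ?: Long.MIN_VALUE, p.snapshot.timestamp)
        }
    }
}
```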
SnapshotTracker Staleness Check
The SnapshotTracker maintains a map of node IDs to their last processed timestamps:
// map: MutableMap<String, Long> - tracks last processed timestamp per node ID
suspend fun stale(node: Node): Boolean {
    val meta = node.meta as DataPointMetaData
    if (meta.snapshot.timestamp <= 0L) return true // Invalid timestamp
    val last = map[node.id] ?: return false        // Never seen - not stale
    return last >= meta.snapshot.timestamp         // Already processed newer/equal
}
Identified Bugs and Issues
Bug 1: BOOL Toggle on Connection Copy
Location: ServerConnectionProcessor.process() method, in the main snapshot copy block
Root Cause: Setting state = NodeState.EXECUTED on the target triggers the BOOL toggle in executeDataPoint()
Impact: Zigbee plugs turn off immediately when turned on manually
flowchart TD
A["Connection.process()"] --> B["target.state = EXECUTED"]
B --> C["ServerDataPointProcessor sees EXECUTED"]
C --> D["executeDataPoint() toggles BOOL"]
D --> E["Value becomes opposite ⚠️"]
style B fill:#f96,stroke:#333
style D fill:#f00,stroke:#333
Fix: Copy the value with nodeManager.updateSnapshot(), which sets NodeState.SNAPSHOT_UPDATE, instead of setting NodeState.EXECUTED (see TODO 1)
Bug 2: executeDataPoint Return Value Inconsistency
Location: ServerDataPointProcessor.executeDataPoint() method, BOOL toggle logic
Issue: Returns true when toggling to 1, false when toggling to 0
Impact: Turning a BOOL DataPoint off is reported as a failure, which may put the node into an ERROR state
return if (meta.snapshot.value.toDouble() <= 0) {
nodeManager.updateSnapshot(/*...*/ value = "1")
true // ✅ Success
} else {
nodeManager.updateSnapshot(/*...*/ value = "0")
false // ❌ False means ERROR state!
}
Fix: Return true in both cases since both are successful operations
Bug 3: ConnectionResolver.execute vs ServerConnectionProcessor.process Inconsistency
Location: Two different implementations for Connection execution
Issue: ConnectionResolver.execute() uses SNAPSHOT_UPDATE (correct), but ServerConnectionProcessor.process() uses EXECUTED (incorrect)
// ConnectionResolver.execute() - CORRECT
nodeManager.updateSnapshot(targetNode.copy(...))
// ServerConnectionProcessor.process() - INCORRECT
nodeManager.update(target.value.copy(state = NodeState.EXECUTED, ...))
Bug 4: Potential Race in SnapshotTracker
Location: SnapshotTracker class, stale() and post() methods
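Issue: The stale() check and the post() update are not performed atomically, so two concurrent callers can both see the same snapshot as fresh and both process it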
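One way to close the gap, also suggested in TODO 6, is a single method that performs the check and the update under one lock. A minimal sketch, assuming a JVM lock (`@Synchronized`) in place of the `mutex.withLock` calls used by the real `SnapshotTracker`; `AtomicSnapshotTracker` and `checkAndPost` are hypothetical names:

```kotlin
// Hypothetical tracker that merges stale() and post() into one atomic step.
class AtomicSnapshotTracker {
    private val lastProcessed = HashMap<String, Long>() // node ID -> newest processed timestamp

    /**
     * Returns true (and records [timestamp]) only if the timestamp is valid and newer
     * than anything processed for [nodeId]; the check and the update share one lock.
     */
    @Synchronized
    fun checkAndPost(nodeId: String, timestamp: Long): Boolean {
        if (timestamp <= 0L) return false                   // invalid timestamp
        val last = lastProcessed[nodeId]
        if (last != null && last >= timestamp) return false // already processed newer/equal
        lastProcessed[nodeId] = timestamp
        return true
    }
}
```

With the merged method, exactly one of several concurrent callers presenting the same snapshot is allowed to process it.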
Bug 5: Memory Leak in processedSnapshots Cleanup
Location: ServerDataPointProcessor.processMyDataPoint() method, processedSnapshots cleanup block
Issue: Cleanup removes the oldest entries, but the set operations involved may be slow
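If processedSnapshots only needs to remember recent entries, a size-bounded structure with insertion-order eviction keeps cleanup amortized O(1) per insert. A minimal sketch using the JDK's `LinkedHashMap.removeEldestEntry`; the `BoundedSeenSet` class, its capacity, and the `nodeId:timestamp` key format are illustrative assumptions, not taken from `ServerDataPointProcessor`:

```kotlin
// Hypothetical bounded "seen" set: evicts the oldest key once maxSize is exceeded.
class BoundedSeenSet(private val maxSize: Int) {
    private val map = object : LinkedHashMap<String, Unit>() {
        override fun removeEldestEntry(eldest: MutableMap.MutableEntry<String, Unit>?): Boolean =
            size > maxSize
    }

    /** Returns true if the key was new (and records it), false if already seen. */
    @Synchronized
    fun markSeen(nodeId: String, timestamp: Long): Boolean =
        map.put("$nodeId:$timestamp", Unit) == null

    val size: Int @Synchronized get() = map.size
}
```

Once a key is evicted it would be accepted again, so this approach relies on a timestamp check such as SnapshotTracker.stale() as a backstop.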
Redundancy Analysis
1. Duplicate Snapshot Checking
Snapshots are checked for duplicates in multiple places:
| Location | Method | Purpose |
|---|---|---|
| ServerDataPointProcessor | processedSnapshots set | Dedup by timestamp |
| SnapshotTracker | stale() check | Detect out-of-order |
| SnapshotQueue | add() duplicate check | Queue-level dedup |
| ServerNodeManager | verify() timestamp compare | Reject older data |
Recommendation: Consolidate these checks into a single deduplication point
2. State Transition Duplication
Several places set the node state and call an update method independently:
// BaseNodeProcessor.post()
nodeManager.complete(node)
// NodeProcessExecutor.handleBaseOperations()
fileOperations.update(node.copy(state = NodeState.NONE))
// ServerDataPointProcessor.processMyDataPoint()
nodeManager.running(node)
3. Traffic Control vs Timestamp Deduplication
Both NodeTrafficControl (tid-based) and timestamp comparisons are used for deduplication. Consider whether both are necessary.
TODO Items with Agent Prompts
TODO 1: Fix Connection Executor State Bug
Priority: HIGH
Files: krill-sdk/src/commonMain/kotlin/krill/zone/krillapp/executor/connection/ServerConnectionProcessor.kt
Agent Prompt:
In ServerConnectionProcessor.kt, the process() method incorrectly sets the target
DataPoint to NodeState.EXECUTED when copying a snapshot value. This causes BOOL
DataPoints to toggle their value instead of receiving the copied value.
Change the main snapshot copy block from:
nodeManager.update(target.value.copy(
timestamp = Clock.System.now().toEpochMilliseconds(),
meta = targetMeta.copy(snapshot = newSnapshot),
state = NodeState.EXECUTED
))
To:
nodeManager.updateSnapshot(target.value.copy(
meta = targetMeta.copy(snapshot = newSnapshot)
))
Note: updateSnapshot() automatically sets the state to SNAPSHOT_UPDATE and generates
a new node timestamp and tid, so they don't need to be specified explicitly. This
avoids the toggle logic that runs for the EXECUTED state.
TODO 2: Fix executeDataPoint Return Value
Priority: MEDIUM
Files: krill-sdk/src/commonMain/kotlin/krill/zone/krillapp/datapoint/ServerDataPointProcessor.kt
Agent Prompt:
In ServerDataPointProcessor.kt, the executeDataPoint() method returns false when
toggling a BOOL value from 1 to 0. This is incorrect because both toggle directions
are successful operations.
Change the BOOL toggle logic in executeDataPoint() from:
DataType.BOOL -> {
return if (meta.snapshot.value.toDouble() <= 0) {
nodeManager.updateSnapshot(/*...*/ value = "1")
true
} else {
nodeManager.updateSnapshot(/*...*/ value = "0")
false // <-- BUG: should be true
}
}
To:
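DataType.BOOL -> {
val current = meta.snapshot.value.toDoubleOrNull() ?: 0.0 // "" or non-numeric counts as OFF
val newValue = if (current <= 0) "1" else "0"
// updateSnapshot() stamps the node timestamp and tid and sets SNAPSHOT_UPDATE,
// but the snapshot's own timestamp must be set here (stale() rejects <= 0)
nodeManager.updateSnapshot(node.copy(
meta = meta.copy(snapshot = Snapshot(
timestamp = Clock.System.now().toEpochMilliseconds(),
value = newValue
))
))
return true // Both directions are success
}
Note: updateSnapshot() assigns a new node timestamp and tid and sets the state to
SNAPSHOT_UPDATE, as shown in this excerpt from BaseNodeManager.updateSnapshot():
update(node.copy(state = NodeState.SNAPSHOT_UPDATE,
timestamp = Clock.System.now().toEpochMilliseconds(),
tid = Uuid.random().toString()))
This excerpt sets node.timestamp, not meta.snapshot.timestamp. A Snapshot built with
only a value keeps timestamp = 0L, which SnapshotTracker.stale() treats as invalid,
so the snapshot timestamp is set explicitly above.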
TODO 3: Add Unit Test for Connection to BOOL DataPoint
Priority: HIGH
Files: New test file in krill-sdk/src/test/kotlin/
Agent Prompt:
Create a unit test verifying that when a Connection executor copies a value "1"
from a source DataPoint to a target DataPoint with DataType.BOOL, the target
receives value "1" (not toggled to "0").
Test should:
1. Create source DataPoint with snapshot value "1"
2. Create target DataPoint with DataType.BOOL and snapshot value "0"
3. Create Connection with source and target
4. Execute the Connection
5. Assert target snapshot value is "1" (copied) not "0" (toggled)
TODO 4: Consolidate Snapshot Deduplication
Priority: LOW
Files: Multiple - analysis required
Agent Prompt:
Analyze the snapshot deduplication logic across these files:
- ServerDataPointProcessor.kt (processedSnapshots set)
- SnapshotTracker.kt (stale check)
- SnapshotQueue.kt (add duplicate check)
- ServerNodeManager.kt (verify timestamp compare)
Create a proposal to consolidate these into a single, clear deduplication
strategy. Consider:
1. What is the source of truth for "already processed"?
2. Where should deduplication happen (earliest possible point)?
3. How to handle out-of-order arrivals?
4. Memory management for tracking processed timestamps
TODO 5: Document EXECUTED vs SNAPSHOT_UPDATE Contract
Priority: MEDIUM
Files: Documentation
Agent Prompt:
Create documentation clarifying when to use EXECUTED vs SNAPSHOT_UPDATE states:
EXECUTED:
- Trigger an action (run cron, execute lambda, toggle BOOL)
- Implies "do something" based on current state
- For BOOL DataPoints: toggles the value
SNAPSHOT_UPDATE:
- Update data value without triggering action logic
- Implies "store this value"
- For BOOL DataPoints: stores value as-is without toggle
Add examples showing correct usage in Connection, Calculation, Lambda executors.
TODO 6: Fix Race Condition in SnapshotTracker
Priority: LOW
Files: krill-sdk/src/commonMain/kotlin/krill/zone/krillapp/datapoint/SnapshotTracker.kt
Agent Prompt:
The SnapshotTracker has a potential race condition between stale() check and
post() update. Both use mutex.withLock individually, but a sequence of
stale() -> post() is not atomic.
Consider:
1. Adding a single atomic checkAndPost() method
2. Using a concurrent map with atomic operations
3. Documenting that callers must ensure exclusive access
Best Practices for Snapshot Handling
✅ DO
- Use updateSnapshot() for data updates - Ensures SNAPSHOT_UPDATE state
- Use execute() for action triggers - Correctly triggers executeDataPoint
- Always check isMine() before processing - Respects node ownership
- Use tid for cross-process deduplication - Prevents client-server echo loops
- Compare timestamps before updates - Rejects stale data
❌ DON’T
- Don’t set EXECUTED state when copying data - Causes BOOL toggle
- Don’t use update() with manual state changes - Use the helper methods
- Don’t skip tid recording on HTTP calls - Breaks deduplication
- Don’t process DELETING state nodes - Already cleaned up