Node Processor Refactoring Summary
Complete refactoring of node processor architecture extracting common control flow logic into a centralized NodeProcessExecutor class, creating a clean state machine pattern for processing nodes
Node Processor Refactoring Summary
Overview
This refactoring extracts common control flow logic from all server processors into a centralized NodeProcessExecutor class, creating a clean state machine pattern for processing nodes.
Key Changes
1. New Core Component: NodeProcessExecutor
Location: /krill-sdk/src/commonMain/kotlin/krill/zone/node/NodeProcessExecutor.kt
Purpose: Centralized executor that handles:
- Job lifecycle management (creation, tracking, completion)
- Mutex-based deduplication to prevent race conditions
- Timestamp-based deduplication (100ms window)
- Automatic error handling and node state transitions
- Child node execution on success
- Automatic error state setting on failure
Key Features:
submit()method accepts task logic as a lambda returning Boolean (success/failure)- Handles all coroutine job management, invokeOnCompletion callbacks
- Protects against ERROR state processing
- Provides
keepJobRunningoption for long-running tasks (e.g., cron jobs) - Thread-safe with mutex protection
2. Refactored BaseNodeProcessor
Changes:
- Now uses
NodeProcessExecutorfor all operations - Simplified base
post()method handles common state transitions - Constructor reordered:
fileOperationsfirst, then overrides - Removed duplicate mutex/jobs map - now handled by executor
3. All Server Processors Refactored
Before Pattern (Example):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private val jobs = mutableMapOf<String, Job>()
private val mutex = Mutex()
override fun post(node: Node) {
super.post(node)
scope.launch {
mutex.withLock {
if (!jobs.contains(node.id) && node.isMine()) {
val job = scope.launch {
when (node.state) {
NodeState.EXECUTED -> {
try {
// processing logic mixed with control flow
process(node)
} catch (e: Exception) {
nodeManager.error(node)
}
}
}
}
jobs[node.id] = job
job.invokeOnCompletion {
jobs.remove(node.id)
}
}
}
}
}
After Pattern:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
override fun post(node: Node) {
super.post(node) // Calls BaseNodeProcessor.post() first
if (!node.isMine()) return // Early exit for non-owned nodes
scope.launch {
when (node.state) {
NodeState.EXECUTED -> {
executor.submit(
node = node,
shouldProcess = { it == NodeState.EXECUTED },
executeChildren = true,
keepJobRunning = false
) { n ->
// Pure business logic - returns Boolean
processMyFeature(n)
}
}
NodeState.USER_EDIT -> {
nodeManager.execute(node)
}
NodeState.DELETING -> {
executor.cancel(node.id)
}
else -> {}
}
}
}
private suspend fun processMyFeature(node: Node): Boolean {
return try {
// Business logic here
nodeManager.complete(node)
true // Success
} catch (e: Exception) {
logger.e(e) { "Error processing" }
false // Executor sets ERROR state
}
}
4. Processors Refactored
All server processors now follow the new pattern:
ServerCronProcessorServerCalculationProcessorServerComputeProcessorServerLambdaProcessorServerTriggerProcessorServerRuleProcessorServerBeaconProcessorServerServerProcessorServerClientProcessorServerDataSourceProcessorServerDataPointProcessor
Benefits
1. Consistency
- All processors now follow the exact same pattern
- Easier to understand and maintain
- New developers can learn one pattern
2. Reduced Boilerplate
- ~30-50 lines of control flow code removed from each processor
- No more manual job management
- No more manual mutex handling
3. Better Error Handling
- Automatic error state transitions
- Consistent error logging
- No forgotten
nodeManager.error()calls
4. Thread Safety
- Centralized mutex protection
- Prevents duplicate processing
- Timestamp-based deduplication prevents rapid-fire updates
5. Testability
- Pure business logic methods are easy to test
- No need to mock jobs or mutex
- Test success/failure paths separately
6. Performance
- Single job map per executor (not per processor)
- Efficient timestamp-based deduplication
- Proper job cleanup on completion
State Machine Flow
stateDiagram-v2
[*] --> post: Node state change
post --> BaseNodeProcessor.post(): super.post(node)
BaseNodeProcessor.post() --> handleBaseOperations: Common states
handleBaseOperations --> ProcessorSpecific: Not handled
ProcessorSpecific --> executor.submit(): EXECUTED state
executor.submit() --> CheckDedup: Check conditions
CheckDedup --> LaunchJob: Not duplicate
CheckDedup --> Skip: Duplicate/ERROR state
LaunchJob --> ExecuteTask: Run lambda
ExecuteTask --> Success: Returns true
ExecuteTask --> Failure: Returns false
Success --> ExecuteChildren: If configured
Success --> Complete: Job done
Failure --> SetError: Auto error state
Complete --> [*]
SetError --> [*]
Skip --> [*]
Migration Guide
For New Processors
Follow the Processor Pattern Guide for creating new processors.
For Existing Code
- Remove
jobsmap andmutexfrom processor - Change
post()to useexecutor.submit() - Extract business logic into separate method returning Boolean
- Remove manual
nodeManager.error()calls - Return true on success, false on failure
Related Documents
Testing
All refactored processors maintain 100% existing behavior:
- All existing tests pass
- No breaking changes to API
- Backward compatible with existing nodes
Conclusion
This refactoring significantly improves code quality, maintainability, and consistency across the entire processor architecture. The centralized executor pattern provides a solid foundation for future processor development and makes the codebase easier to understand and modify.