Quick Reference: Node Processor Pattern
Creating a New Processor
1. Define the Interface
1
2
3
4
5
| package krill.zone.krillapp.myfeature
import krill.zone.node.*
interface MyFeatureProcessor : NodeProcessor
|
2. Create the Server Implementation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
| package krill.zone.krillapp.myfeature
import kotlinx.coroutines.*
import krill.zone.io.*
import krill.zone.node.*
class ServerMyFeatureProcessor(
fileOperations: FileOperations, // Always first
private val myDependency: MyDependency, // Private dependencies
override val nodeManager: NodeManager, // Overrides from base
override val eventBus: NodeEventBus,
override val scope: CoroutineScope // Scope always last
) : BaseNodeProcessor(fileOperations, nodeManager, eventBus, scope), MyFeatureProcessor {
override fun post(node: Node) {
super.post(node)
// Early exit if not my node
if (!node.isMine()) return
scope.launch {
when (node.state) {
NodeState.EXECUTED -> {
executor.submit(
node = node,
shouldProcess = { it == NodeState.EXECUTED },
executeChildren = true, // false if no children
keepJobRunning = false // true for long-running tasks
) { n ->
processMyFeature(n)
}
}
NodeState.USER_EDIT -> {
// Handle edits if needed
nodeManager.execute(node)
}
NodeState.DELETING -> {
executor.cancel(node.id)
}
else -> {}
}
}
}
/**
* Pure business logic - returns true on success, false on failure
*/
private suspend fun processMyFeature(node: Node): Boolean {
return try {
// Your business logic here
nodeManager.complete(node)
true
} catch (e: Exception) {
logger.e(e) { "Error processing ${node.name()}" }
false // Executor will set ERROR state
}
}
override suspend fun process(node: Node) {
// Leave empty when using executor pattern
}
}
|
3. Add to ProcessModule.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
| single<MyFeatureProcessor> {
if (SystemInfo.isServer()) {
ServerMyFeatureProcessor(
get(), // fileOperations
get(), // myDependency
get(), // nodeManager
get(), // eventBus
get() // scope
)
} else {
ClientNodeProcessor(get(), get(), get())
}
}
|
4. Write Tests
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| class ServerMyFeatureProcessorTest {
@Test
fun `processMyFeature returns true on success`() = runTest {
// Setup
val processor = ServerMyFeatureProcessor(...)
val node = Node(...)
// Execute
val result = processor.processMyFeature(node)
// Verify
assertTrue(result)
verify { nodeManager.complete(node) }
}
@Test
fun `processMyFeature returns false on failure`() = runTest {
// Setup - mock to throw exception
every { myDependency.doSomething() } throws RuntimeException("Test error")
// Execute
val result = processor.processMyFeature(node)
// Verify
assertFalse(result)
}
}
|
Key Rules
✅ DO
- Return
Boolean from task lambda (true = success, false = failure) - Use
node.isMine() to check ownership - Put pure business logic in separate methods
- Use meaningful log messages with the logger
- Call
super.post(node) first in post() method - Order constructor parameters: fileOperations, private deps, overrides, scope
- Let executor handle job management and error states
- Use
executor.submit() for node processing
❌ DON’T
- Call
SystemInfo.isServer() in post() - Koin already filters this - Manage jobs or mutex manually - executor handles this
- Call
nodeManager.error() manually - executor does this on failure - Mix control flow with business logic
- Forget to call
super.post(node) first
Common Patterns
Long-Running Tasks (Cron, Compute)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| NodeState.EXECUTED -> {
executor.submit(
node = node,
shouldProcess = { it == NodeState.EXECUTED },
executeChildren = true,
keepJobRunning = true // Keep job alive
) { n ->
while (true) {
// Do work
delay(interval)
}
true
}
}
|
One-Time Processing (Lambda, Calculation)
1
2
3
4
5
6
7
8
9
10
| NodeState.EXECUTED -> {
executor.submit(
node = node,
shouldProcess = { it == NodeState.EXECUTED },
executeChildren = true,
keepJobRunning = false // Job completes after execution
) { n ->
processOnce(n)
}
}
|
Handling User Edits
1
2
3
4
5
| NodeState.USER_EDIT -> {
// Cancel existing job and restart
executor.cancel(node.id)
nodeManager.execute(node) // Will trigger EXECUTED state
}
|
Cleanup on Deletion
1
2
3
4
| NodeState.DELETING -> {
executor.cancel(node.id)
// Additional cleanup if needed
}
|
Constructor Parameter Order
Always follow this order for consistency:
fileOperations: FileOperations - First parameter- Private dependencies (e.g.,
private val myService: MyService) override val nodeManager: NodeManageroverride val eventBus: NodeEventBusoverride val scope: CoroutineScope - Last parameter
Logging Best Practices
1
2
3
4
5
6
7
8
9
10
11
| // Info logging
logger.i { "Processing ${node.name()} with value ${node.value()}" }
// Error logging with exception
logger.e(e) { "Error processing ${node.name()}: ${e.message}" }
// Debug logging
logger.d { "State transition: ${node.state} -> EXECUTED" }
// Warning logging
logger.w { "Unexpected condition for ${node.name()}" }
|
State Transitions
Common state flow for processors:
1
2
3
| NONE → execute() → EXECUTED → submit() → processing → complete() → NONE
↓
ERROR (on failure)
|
Example Processors
Reference these for complete examples:
ServerCronProcessor - Long-running loop with keepJobRunningServerCalculationProcessor - Simple one-time processingServerLambdaProcessor - Complex processing with external executionServerTriggerProcessor - Event-driven processing with state evaluation