Thread Safety Patterns Applied to HostProcessor
Pattern 1: Thread-Safe Dependency Injection
Before:

    private val nodeManager: NodeManager by inject(mode = LazyThreadSafetyMode.NONE)
After:

    private val nodeManager: NodeManager by inject(mode = LazyThreadSafetyMode.SYNCHRONIZED)
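Why: With LazyThreadSafetyMode.NONE the lazy delegate uses no locking, and its behavior is undefined when several threads access it at once. If multiple threads call emit() concurrently, the dependency can be resolved more than once and different threads may end up with different instances. SYNCHRONIZED guards initialization with a lock, so only one thread runs the initializer and every thread sees the same instance.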
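The effect of SYNCHRONIZED can be checked with the standard library's lazy delegate alone. The following is a minimal sketch, not the project's code: FakeNodeManager, initCount and touchFromManyThreads are hypothetical names, and plain `lazy` stands in for `inject`, on the assumption that `inject` forwards the mode to a lazy delegate.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical stand-in for an injected dependency.
class FakeNodeManager

// Counts how often the lazy initializer actually runs.
val initCount = AtomicInteger(0)

val nodeManager: FakeNodeManager by lazy(LazyThreadSafetyMode.SYNCHRONIZED) {
    initCount.incrementAndGet()
    Thread.sleep(10) // widen the window in which other threads arrive
    FakeNodeManager()
}

// Touches the lazy property from several threads at once and
// returns how many times the initializer ran.
fun touchFromManyThreads(threadCount: Int = 8): Int {
    val startGate = CountDownLatch(1)
    val workers = List(threadCount) {
        thread {
            startGate.await()      // release all threads together
            nodeManager.hashCode() // first access triggers initialization
        }
    }
    startGate.countDown()
    workers.forEach { it.join() }
    return initCount.get()
}

fun main() {
    println("initializer ran ${touchFromManyThreads()} time(s)")
}
```

With SYNCHRONIZED the count is 1. Switching the mode to NONE removes that guarantee, although a given run may still happen to report 1.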
Pattern 2: Mutex-Protected Shared State
PeerSessionManager
Before:

    class PeerSessionManager {
        // Maps peer id -> session id
        private val knownSessions = mutableMapOf<String, String>()

        fun add(id: String, session: String) {
            knownSessions[id] = session // RACE CONDITION!
        }

        // Looks up a session id among the map's values
        fun isKnownSession(sessionId: String) = knownSessions.values.contains(sessionId) // RACE CONDITION!
    }
After:

    import kotlinx.coroutines.sync.Mutex
    import kotlinx.coroutines.sync.withLock

    class PeerSessionManager {
        // Maps peer id -> session id; only accessed while holding the mutex
        private val knownSessions = mutableMapOf<String, String>()
        private val mutex = Mutex()

        suspend fun add(id: String, session: String) {
            mutex.withLock {
                knownSessions[id] = session // THREAD SAFE
            }
        }

        // Looks up a session id among the map's values
        suspend fun isKnownSession(sessionId: String): Boolean {
            return mutex.withLock {
                knownSessions.values.contains(sessionId) // THREAD SAFE
            }
        }
    }
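Why: The map returned by mutableMapOf() is not thread-safe (on the JVM it is a LinkedHashMap). Coroutines running on different threads could read and write it at the same time, which can corrupt the map or return stale results. Holding the Mutex for every access makes each operation atomic. The methods become suspend functions because withLock suspends, rather than blocking a thread, while it waits for the lock.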
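A small stress run shows what the mutex buys. This sketch repeats the class so that it is self-contained; size() and addConcurrently() are hypothetical helpers added only for the demonstration, and Dispatchers.Default is assumed so that the coroutines really run on several threads.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

class PeerSessionManager {
    private val knownSessions = mutableMapOf<String, String>()
    private val mutex = Mutex()

    suspend fun add(id: String, session: String) {
        mutex.withLock { knownSessions[id] = session }
    }

    suspend fun isKnownSession(sessionId: String): Boolean =
        mutex.withLock { sessionId in knownSessions.values }

    // Hypothetical helper, used only to inspect the result of the stress run.
    suspend fun size(): Int = mutex.withLock { knownSessions.size }
}

// Adds `count` distinct peers from coroutines running on a multi-threaded dispatcher.
suspend fun addConcurrently(manager: PeerSessionManager, count: Int) = coroutineScope {
    repeat(count) { i ->
        launch(Dispatchers.Default) { manager.add("peer-$i", "session-$i") }
    }
}

fun main(): Unit = runBlocking {
    val manager = PeerSessionManager()
    addConcurrently(manager, 1_000)
    println("stored ${manager.size()} sessions")
    println("session-42 known: ${manager.isKnownSession("session-42")}")
}
```

Because every access goes through the same mutex, no insert is lost and the lookup never observes a map that is in the middle of a resize.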
Pattern 3: Wire Processing Serialization
Before:

    beaconService.start(node) { wire ->
        // Process immediately - multiple wires could process concurrently
        when (wire.type) {
            KrillApp.Server -> {
                peerSessionManager.add(wire.id, wire.sessionId) // RACE!
                nodeManager.update(wire.toNode()) // RACE!
            }
        }
    }
After:

    private val wireProcessingMutex = Mutex()

    beaconService.start(node) { wire ->
        scope.launch {
            try {
                processWire(wire, node) // Serialized by wireProcessingMutex
            } catch (e: CancellationException) {
                throw e // Never swallow coroutine cancellation
            } catch (e: Exception) {
                logger.e("Error processing wire: ${e.message}", e)
            }
        }
    }

    private suspend fun processWire(wire: NodeWire, node: Node) {
        wireProcessingMutex.withLock {
            // Only one wire is processed at a time
            processHostWire(wire, node)
        }
    }
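Why: Several beacon messages can arrive at nearly the same time, and each one is handled in its own coroutine. Without serialization, the check-then-update steps for two wires could interleave and corrupt session state. The mutex guarantees mutual exclusion; it does not by itself guarantee that wires are processed in arrival order.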
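The mutual-exclusion claim can be observed directly by counting how many wires are inside the critical section at once. This is a sketch under stated assumptions: FakeWire, SerializedWireProcessor and processAll are hypothetical names, and the delay stands in for the real processHostWire work.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical, simplified stand-in for NodeWire.
data class FakeWire(val id: String, val sessionId: String)

class SerializedWireProcessor {
    private val wireProcessingMutex = Mutex()
    private val inFlight = AtomicInteger(0)

    // Only modified while the mutex is held, so a plain list is enough.
    private val seenIds = mutableListOf<String>()

    /** Highest number of wires ever inside the critical section at once. */
    val maxInFlight = AtomicInteger(0)

    /** Number of wires processed so far; read it after processing has finished. */
    val processed: Int get() = seenIds.size

    suspend fun processWire(wire: FakeWire) {
        wireProcessingMutex.withLock {
            val now = inFlight.incrementAndGet()
            maxInFlight.updateAndGet { previous -> maxOf(previous, now) }
            delay(1) // simulate work that suspends while holding the lock
            seenIds.add(wire.id)
            inFlight.decrementAndGet()
        }
    }
}

// Launches one coroutine per wire on a multi-threaded dispatcher and waits for all of them.
suspend fun processAll(processor: SerializedWireProcessor, wireCount: Int) = coroutineScope {
    repeat(wireCount) { i ->
        launch(Dispatchers.Default) {
            processor.processWire(FakeWire("peer-$i", "session-$i"))
        }
    }
}

fun main(): Unit = runBlocking {
    val processor = SerializedWireProcessor()
    processAll(processor, 100)
    println("processed=${processor.processed}, maxInFlight=${processor.maxInFlight.get()}")
}
```

Every wire is processed and maxInFlight stays at 1, even though a hundred coroutines compete for the lock on several threads.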
Pattern 4: Separation of Concerns
Before: Monolithic Logic

    when (check) {
        is NodeFlow.Error -> {
            // 20 lines of logic
            peerSessionManager.add(...)
            nodeManager.update(...)
            if (wire.type == KrillApp.Server) {
                serverHandshakeProcess.trustServer(wire)
            } else if (wire.type == KrillApp.Client) {
                beaconManager.sendSignal(node)
            }
        }
        is NodeFlow.Success -> {
            // Another 20 lines
            if (!peerSessionManager.isKnownSession(wire.sessionId)) {
                // More nested logic
            }
        }
    }
After: Clear Functions

    when (check) {
        is NodeFlow.Error -> {
            handleNewPeer(wire, node, isKnownSession)
        }
        is NodeFlow.Success -> {
            handleKnownPeer(wire, node, check.node.value, isKnownSession)
        }
    }

    private suspend fun handleNewPeer(wire: NodeWire, node: Node, isKnownSession: Boolean) {
        // Clear, focused logic for new peer discovery
    }

    private suspend fun handleKnownPeer(wire: NodeWire, node: Node, existingNode: Node, isKnownSession: Boolean) {
        if (!isKnownSession) {
            // Handle reconnection
        } else {
            // Handle duplicate beacon
        }
    }
Why: Easier to understand, test, and maintain. Each function has a single responsibility.
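One way to make the testability benefit concrete is to pull the branching decision into a pure function that needs no coroutines or managers. This is an illustrative sketch only: PeerAction and decidePeerAction are hypothetical names that mirror the three cases in the handlers above and are not part of HostProcessor.

```kotlin
// Hypothetical outcome type mirroring the three cases handled above.
enum class PeerAction { DISCOVER_NEW_PEER, HANDLE_RECONNECTION, IGNORE_DUPLICATE_BEACON }

// Pure decision function: no I/O, no shared state, trivially unit-testable.
fun decidePeerAction(nodeExists: Boolean, isKnownSession: Boolean): PeerAction = when {
    !nodeExists -> PeerAction.DISCOVER_NEW_PEER       // NodeFlow.Error branch
    !isKnownSession -> PeerAction.HANDLE_RECONNECTION // known node, new session
    else -> PeerAction.IGNORE_DUPLICATE_BEACON        // known node, known session
}

fun main() {
    println(decidePeerAction(nodeExists = false, isKnownSession = false))
    println(decidePeerAction(nodeExists = true, isKnownSession = false))
    println(decidePeerAction(nodeExists = true, isKnownSession = true))
}
```

The suspend handlers then only have to execute the chosen action, and the decision table can be covered by plain unit tests.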
Pattern 5: Deduplication at Service Level
Architecture:

    HostProcessor (created per emit() call)
        ↓
    BeaconService (singleton)
        ├── jobs: MutableMap<String, Job>
        ├── beaconJobMutex: Mutex
        └── start(node) checks jobs.containsKey(node.id)
BeaconService Implementation:

    private val jobs: MutableMap<String, Job> = mutableMapOf()
    private val beaconJobMutex = Mutex()

    suspend fun start(node: Node, discovered: suspend (NodeWire) -> Unit) {
        beaconJobMutex.withLock {
            if (!jobs.containsKey(node.id)) {
                // Only create a job if none is already running for this node
                jobs[node.id] = scope.launch {
                    multicast.receiveBeacons { wire ->
                        // Process beacons by handing each wire to the caller's callback
                        discovered(wire)
                    }
                }
            }
        }
    }
Why: Since HostProcessor instances are created fresh on each emit(), instance-level flags wouldn’t persist. Deduplication must be at singleton level.
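The check-and-register step can be exercised in isolation. The sketch below is a simplified model rather than the real BeaconService: BeaconJobRegistry and countStarts are hypothetical names, the node is reduced to its id, and awaitCancellation() is a placeholder for the beacon receive loop.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical, reduced model of a singleton service that owns one job per node id.
class BeaconJobRegistry(private val scope: CoroutineScope) {
    private val jobs = mutableMapOf<String, Job>()
    private val beaconJobMutex = Mutex()

    /** Returns true if this call started the listener, false if one already existed. */
    suspend fun start(nodeId: String): Boolean = beaconJobMutex.withLock {
        if (jobs.containsKey(nodeId)) {
            false
        } else {
            // Placeholder for the receive loop: stays active until the scope is cancelled.
            jobs[nodeId] = scope.launch { awaitCancellation() }
            true
        }
    }
}

/** Calls start() for one node from many coroutines and counts how many calls started a job. */
suspend fun countStarts(registry: BeaconJobRegistry, nodeId: String, callers: Int): Int =
    coroutineScope {
        List(callers) { async(Dispatchers.Default) { registry.start(nodeId) } }
            .awaitAll()
            .count { started -> started }
    }

fun main(): Unit = runBlocking {
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    val registry = BeaconJobRegistry(scope)
    println("jobs started: ${countStarts(registry, "node-1", callers = 50)}")
    scope.cancel() // stop the placeholder listener
}
```

Because the containsKey check and the insert happen under the same lock, exactly one of the fifty callers starts a job, no matter how many short-lived callers exist.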
Pattern 6: Error Isolation
Before:

    beaconService.start(node) { wire ->
        // If this throws, entire beacon service could crash
        peerSessionManager.add(wire.id, wire.sessionId)
    }
After:

    beaconService.start(node) { wire ->
        scope.launch {
            try {
                processWire(wire, node)
            } catch (e: CancellationException) {
                throw e // Let cancellation propagate so the coroutine can shut down
            } catch (e: Exception) {
                // Log and continue - one bad wire doesn't crash service
                logger.e("Error processing wire from ${wire.id}: ${e.message}", e)
            }
        }
    }
Why: Network issues or malformed messages shouldn’t crash the entire discovery service.
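The isolation can be demonstrated with a batch in which one wire is malformed. This is a sketch with hypothetical names (processIsolated, processBatch) and wires reduced to plain id strings; it also rethrows CancellationException, since catching Exception alone would swallow coroutine cancellation.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

/** Runs [process] for one wire and reports failures through [onError] instead of propagating them. */
suspend fun processIsolated(
    wireId: String,
    process: suspend (String) -> Unit,
    onError: (String, Exception) -> Unit,
) {
    try {
        process(wireId)
    } catch (e: CancellationException) {
        throw e // cancellation must keep propagating
    } catch (e: Exception) {
        onError(wireId, e)
    }
}

/** Processes all wires in sibling coroutines and returns the (succeeded, failed) ids. */
suspend fun processBatch(wireIds: List<String>): Pair<List<String>, List<String>> {
    val succeeded = CopyOnWriteArrayList<String>()
    val failed = CopyOnWriteArrayList<String>()
    coroutineScope {
        for (id in wireIds) {
            launch {
                processIsolated(
                    wireId = id,
                    process = { wire ->
                        // Hypothetical validation: ids starting with "bad" count as malformed.
                        require(!wire.startsWith("bad")) { "malformed wire $wire" }
                        succeeded.add(wire)
                    },
                    onError = { wire, _ -> failed.add(wire) },
                )
            }
        }
    }
    return succeeded.toList() to failed.toList()
}

fun main(): Unit = runBlocking {
    val (ok, bad) = processBatch(listOf("ok-1", "bad-2", "ok-3"))
    println("succeeded=$ok failed=$bad")
}
```

If the exception escaped the launch block, coroutineScope would cancel the sibling coroutines; with the try/catch in place the remaining wires still complete.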
Pattern 7: Idempotent State Updates
NodeManager.update() Implementation:

    override suspend fun update(node: Node) {
        val existing = nodes[node.id]

        // Idempotent: return early when the stored value already equals the new one
        if (existing is NodeFlow.Success && node == existing.node.value) {
            return // No-op if nothing changed
        }

        when (existing) {
            is NodeFlow.Error, null -> {
                // No usable entry yet: create a new flow for this node
                nodes[node.id] = NodeFlow.Success(MutableStateFlow(node))
            }
            is NodeFlow.Success -> {
                // Update the existing flow so current collectors keep their subscription
                existing.node.update { node }
            }
        }
    }
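Why: Repeated calls with the same data return early, so they don’t trigger unnecessary flow updates or wake observers. The check-then-update sequence is only atomic when update() runs under the nodesMutex listed in the summary table below.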
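The idempotence property is easy to test when update() reports whether it changed anything. The following is a reduced sketch, not the NodeManager implementation: SimpleNode and SimpleNodeStore are hypothetical, the NodeFlow wrapper is omitted, and the Boolean return value and current() helper exist only to make the behavior observable.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical, simplified node type; a data class gives structural equality.
data class SimpleNode(val id: String, val name: String)

class SimpleNodeStore {
    private val nodes = mutableMapOf<String, MutableStateFlow<SimpleNode>>()
    private val nodesMutex = Mutex()

    /** Stores [node]; returns true only if the stored state actually changed. */
    suspend fun update(node: SimpleNode): Boolean = nodesMutex.withLock {
        val existing = nodes[node.id]
        when {
            existing == null -> {
                nodes[node.id] = MutableStateFlow(node) // first sighting: create the flow
                true
            }
            existing.value == node -> false // same data: no-op
            else -> {
                existing.value = node // changed data: push into the existing flow
                true
            }
        }
    }

    /** Hypothetical helper that returns the currently stored value, if any. */
    suspend fun current(id: String): SimpleNode? = nodesMutex.withLock { nodes[id]?.value }
}

fun main(): Unit = runBlocking {
    val store = SimpleNodeStore()
    println(store.update(SimpleNode("n1", "alpha")))
    println(store.update(SimpleNode("n1", "alpha")))
    println(store.update(SimpleNode("n1", "beta")))
}
```

The second call is a no-op because the data is unchanged. Holding the mutex across the comparison and the write keeps two concurrent updates from both taking the "create" branch.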
Pattern 8: Defensive Logging
Structured by Severity:

    logger.d("...") // Debug: Normal operation details
    logger.i("...") // Info: Significant events (connection established)
    logger.w("...") // Warning: Unexpected but handled (unknown wire type)
    logger.e("...", exception) // Error: Failures that need attention
Context-Rich Messages:

    // Bad
    logger.e("Error")

    // Good
    logger.e("Error processing wire from ${wire.id}: ${e.message}", e)
Why: When things go wrong in production, detailed logs are essential for debugging connection issues.
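To see what a context-rich entry looks like once it is written, the sketch below uses a small in-memory logger. MemoryLogger, Severity and wireErrorMessage are hypothetical and exist only for illustration; the logger actually used by HostProcessor is not shown in this document and may format entries differently.

```kotlin
// Hypothetical severity levels matching the d/i/w/e calls above.
enum class Severity { DEBUG, INFO, WARN, ERROR }

// Hypothetical in-memory logger; it records lines instead of printing them.
class MemoryLogger {
    val lines = mutableListOf<String>()

    private fun log(severity: Severity, message: String, cause: Throwable? = null) {
        // Append the exception type so the entry is useful even without a stack trace.
        val suffix = cause?.let { " (${it.javaClass.simpleName})" } ?: ""
        lines.add("[$severity] $message$suffix")
    }

    fun d(message: String) = log(Severity.DEBUG, message)
    fun i(message: String) = log(Severity.INFO, message)
    fun w(message: String) = log(Severity.WARN, message)
    fun e(message: String, cause: Throwable? = null) = log(Severity.ERROR, message, cause)
}

// Builds the context-rich message: which wire failed and why.
fun wireErrorMessage(wireId: String, e: Throwable): String =
    "Error processing wire from $wireId: ${e.message}"

fun main() {
    val logger = MemoryLogger()
    val failure = IllegalStateException("bad checksum")
    logger.e(wireErrorMessage("peer-7", failure), failure)
    logger.lines.forEach(::println)
}
```

The resulting entry names the peer, the reason, and the exception type, which is usually enough to start narrowing down a connection problem.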
Summary of Thread Safety Guarantees
| Component | Thread Safety Mechanism | Protected Operations |
|---|---|---|
| PeerSessionManager | Mutex | add(), remove(), isKnownSession() |
| BeaconService | beaconJobMutex | jobs map access |
| HostProcessor | wireProcessingMutex | Wire processing logic |
| NodeManager | nodesMutex | nodes map, swarm updates |
| Dependency Injection | SYNCHRONIZED | All by inject() calls |
Result: Safe concurrent access from multiple threads/coroutines without race conditions.