-
Notifications
You must be signed in to change notification settings - Fork 47
/
Copy pathClientProcessUtil.kt
192 lines (167 loc) · 6.17 KB
/
ClientProcessUtil.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
package org.utbot.rd
import com.jetbrains.rd.framework.*
import com.jetbrains.rd.framework.impl.RdCall
import com.jetbrains.rd.framework.util.launch
import com.jetbrains.rd.util.getLogger
import com.jetbrains.rd.util.info
import com.jetbrains.rd.util.lifetime.Lifetime
import com.jetbrains.rd.util.lifetime.LifetimeDefinition
import com.jetbrains.rd.util.lifetime.isAlive
import com.jetbrains.rd.util.lifetime.plusAssign
import com.jetbrains.rd.util.threading.SingleThreadScheduler
import com.jetbrains.rd.util.trace
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.withTimeoutOrNull
import org.utbot.common.*
import org.utbot.rd.generated.synchronizationModel
import java.io.File
import kotlin.time.Duration
const val rdProcessDirName = "rdProcessSync"
val processSyncDirectory = File(utBotTempDirectory.toFile(), rdProcessDirName)
const val rdPortProcessArgumentTag = "rdPort"
internal const val fileWaitTimeoutMillis = 10L
private val logger = getLogger<ClientProtocolBuilder>()
internal fun childCreatedFileName(port: Int): String {
return "$port.created"
}
internal fun signalProcessReady(port: Int) {
processSyncDirectory.mkdirs()
val signalFile = File(processSyncDirectory, childCreatedFileName(port))
if (signalFile.exists()) {
signalFile.delete()
}
val created = signalFile.createNewFile()
if (!created) {
throw IllegalStateException("cannot create signal file")
}
}
fun rdPortArgument(port: Int): String {
return "$rdPortProcessArgumentTag=$port"
}
fun findRdPort(args: Array<String>): Int {
return args.find { it.startsWith(rdPortProcessArgumentTag) }
?.run { split("=").last().toInt().coerceIn(1..65535) }
?: throw IllegalArgumentException("No port provided")
}
/**
* Traces when process is idle for too much time, then terminates it.
*/
class IdleWatchdog(private val ldef: LifetimeDefinition, val timeout: Duration) {
private enum class State {
STARTED,
ENDED
}
private val synchronizer: Channel<State> = Channel(1)
init {
ldef.onTermination { synchronizer.close(CancellationException("Client terminated")) }
}
var suspendTimeout = false
/**
* Execute block indicating that during this activity process should not die.
* After block ended - idle timer restarts
*/
fun <T> wrapActive(block: () -> T): T {
try {
synchronizer.trySendBlocking(State.STARTED)
return block()
} finally {
synchronizer.trySendBlocking(State.ENDED)
}
}
/**
* Adds callback to RdCall with indicating that during this activity process should not die.
* After block ended - idle timer restarts
*/
fun <T, R> wrapActiveCall(call: RdCall<T, R>, block: (T) -> R) {
call.set { it ->
wrapActive {
block(it)
}
}
}
suspend fun setupTimeout() {
ldef.launch {
var lastState = State.ENDED
while (ldef.isAlive) {
val current: State? =
withTimeoutOrNull(timeout) {
synchronizer.receive()
}
if (current == null) {
if (lastState == State.ENDED && !suspendTimeout) {
// process is waiting for command more than expected, better die
logger.info { "terminating lifetime by timeout" }
stopProtocol()
break
}
} else {
lastState = current
}
}
}
}
fun stopProtocol() {
ldef.terminate()
}
}
class ClientProtocolBuilder {
private var timeout = Duration.INFINITE
suspend fun start(port: Int, parent: Lifetime? = null, block: Protocol.(IdleWatchdog) -> Unit) {
UtRdCoroutineScope.initialize()
val pid = currentProcessPid.toInt()
val ldef = parent?.createNested() ?: LifetimeDefinition()
ldef.terminateOnException { _ ->
ldef += { logger.info { "lifetime terminated" } }
ldef += {
val syncFile = File(processSyncDirectory, childCreatedFileName(port))
if (syncFile.exists()) {
logger.info { "sync file existed" }
syncFile.delete()
}
}
logger.info { "pid - $pid, port - $port" }
logger.info { "isJvm8 - $isJvm8, isJvm9Plus - $isJvm9Plus, isWindows - $isWindows" }
val name = "Client$port"
val rdClientProtocolScheduler = SingleThreadScheduler(ldef, "Scheduler for $name")
val clientProtocol = Protocol(
name,
Serializers(),
Identities(IdKind.Client),
rdClientProtocolScheduler,
SocketWire.Client(ldef, rdClientProtocolScheduler, port),
ldef
)
val watchdog = IdleWatchdog(ldef, timeout)
watchdog.setupTimeout()
rdClientProtocolScheduler.pump(ldef) {
clientProtocol.synchronizationModel.suspendTimeoutTimer.set { param ->
watchdog.suspendTimeout = param
}
clientProtocol.block(watchdog)
}
signalProcessReady(port)
logger.info { "signalled" }
clientProtocol.synchronizationModel.synchronizationSignal.let { sync ->
val answerFromMainProcess = sync.adviseForConditionAsync(ldef) {
if (it == "main") {
logger.trace { "received from main" }
watchdog.wrapActive {
sync.fire("child")
}
true
} else {
false
}
}
answerFromMainProcess.await()
}
ldef.awaitTermination()
}
}
fun withProtocolTimeout(duration: Duration): ClientProtocolBuilder {
timeout = duration
return this
}
}