-
Notifications
You must be signed in to change notification settings - Fork 45
/
Copy pathworker.ts
124 lines (101 loc) · 3.47 KB
/
worker.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import { Debugger } from 'debug';
import { Worker } from 'worker_threads';
import { resolve } from 'path';
import { SubprocessMessage, Thread, ThreadOptions } from '../../thread';
import { SubprocessResult, SubprocessSyncedData, SubprocessRunnerMessage, SubprocessOutputMessage, InterProcessMessage, isSyncSnapshot, isOverwrittenStandardStreamMessage } from '../../message-channel';
import { removeDebugArgs } from '../util';
/**
 * Payload handed to the worker thread through the `workerData` option
 * when the worker is created.
 */
export interface WorkerData {
  /** Path of the file the worker is asked to process. */
  file: string;
  /** Thread options forwarded unchanged from the parent side. */
  options: ThreadOptions;
}
/**
 * Runs a single file inside a Node.js worker thread and gathers everything the
 * worker reports (runner events, captured stdout/stderr chunks and the synced
 * data snapshot) until the worker exits.
 */
export class WorkerThread implements Thread {
  private readonly file: string;
  private readonly log: Debugger;
  private readonly options: ThreadOptions;
  /** Messages received from the worker, in arrival order. */
  private events: SubprocessMessage[] = [];
  /** Timestamp (ms) taken by `run()` right before the worker is created. */
  private startedAt: number | undefined;
  /** Most recent sync snapshot sent by the worker, if any. */
  private syncedSubprocessData: SubprocessSyncedData | undefined;

  /**
   * @param file Path of the file to execute in the worker.
   * @param log Debug logger used for lifecycle messages.
   * @param options Thread options; also forwarded to the worker as part of its `workerData`.
   */
  constructor(file: string, log: Debugger, options: ThreadOptions) {
    this.file = file;
    this.log = log;
    this.options = options;
  }

  /**
   * Starts the worker and waits for it to finish.
   *
   * @returns A promise that resolves with the collected result once the worker
   *   emits `exit`, and rejects if the worker emits `error` first (a promise
   *   settles only once, so a later `exit` cannot override the rejection).
   */
  run(): Promise<SubprocessResult> {
    // The TypeScript run mode uses a dedicated development entry point.
    const workerFilename = this.options.isTypescriptRunMode ? 'worker.development.js' : 'worker.js';
    const workerPath = resolve(__dirname, `../../subprocess/thread/${workerFilename}`);

    this.startedAt = Date.now();

    // The executor parameters are deliberately not called `resolve`/`reject`
    // so they do not shadow `resolve` imported from `path`.
    return new Promise<SubprocessResult>((resolveResult, rejectResult) => {
      const worker = new Worker(workerPath, {
        execArgv: process.execArgv.filter(removeDebugArgs),
        stderr: true,
        stdout: true,
        workerData: this.buildWorkerData(),
      });

      // worker.stdout / worker.stderr are intentionally NOT listened to:
      // those streams are asynchronous (process.stdout.isTTY is false inside
      // the worker), so reading them is unsafe. Output is instead delivered
      // through the message channel and handled in `onMessage`.
      worker.on('message', this.onMessage);
      worker.on('error', this.onError(rejectResult));
      worker.on('exit', this.onExit(resolveResult, rejectResult));
    });
  }

  /** Builds the `workerData` payload: the resolved file path plus the thread options. */
  private buildWorkerData(): WorkerData {
    return {
      file: resolve(this.file),
      options: this.options,
    };
  }

  /**
   * Dispatches a message coming from the worker:
   * - overwritten standard stream output is recorded as a stdout/stderr event;
   * - a sync snapshot replaces the stored synced data;
   * - anything else is recorded as a runner event.
   */
  private onMessage = (message: InterProcessMessage) => {
    if (isOverwrittenStandardStreamMessage(message)) {
      const { data, stream } = message;

      if (stream === 'stdout') {
        this.onStdout(Buffer.from(data));
      } else if (stream === 'stderr') {
        this.onStderr(Buffer.from(data));
      }

      return;
    }

    if (isSyncSnapshot(message)) {
      this.syncedSubprocessData = message.data;
      return;
    }

    const runnerEvent: SubprocessRunnerMessage = {
      data: message.data,
      event: message.event,
      type: 'runner',
    };

    this.events.push(runnerEvent);
  };

  /** Records a chunk of the worker's stdout output. */
  private onStdout = (data: Buffer) => {
    const outputEvent: SubprocessOutputMessage = {
      data,
      type: 'stdout',
    };

    this.events.push(outputEvent);
  };

  /** Records a chunk of the worker's stderr output. */
  private onStderr = (data: Buffer) => {
    const outputEvent: SubprocessOutputMessage = {
      data,
      type: 'stderr',
    };

    this.events.push(outputEvent);
  };

  /** Creates the `error` listener: logs the error and rejects the run promise with it. */
  private onError = (reject: (err: Error) => void) => (err: Error) => {
    this.log(`Error occured in subprocess: ${err.stack}`);
    reject(err);
  };

  /**
   * Creates the `exit` listener: logs the exit code and resolves the run
   * promise with everything collected so far.
   *
   * If the start timestamp is missing the promise is rejected. Throwing here
   * would happen inside an event listener, where the exception could not reach
   * the caller of `run()` and would leave its promise pending forever.
   */
  private onExit = (
    resolve: (data: SubprocessResult) => void,
    reject: (err: Error) => void,
  ) => (code: number) => {
    this.log(`Process for ${this.file} exited with code ${code}`);

    const startedAt = this.startedAt;

    if (startedAt === undefined) {
      reject(new Error('Attempt to close a thread which hasn\'t been started yet'));
      return;
    }

    resolve({
      code,
      events: this.events,
      execTime: Date.now() - startedAt,
      file: this.file,
      syncedSubprocessData: this.syncedSubprocessData,
    });
  };
}