@@ -161,9 +161,42 @@ function exec(thunk::Function)
161
161
exit (0 )
162
162
end
163
163
164
+ type FileSink
165
+ s:: IOStream
166
+ own:: Bool
167
+ function FileSink (s:: IOStream , own:: Bool )
168
+ if fd (s) == - 1
169
+ error (" Cannot use the given IOStream as FileSink" )
170
+ end
171
+ this = new (s, own)
172
+ if own
173
+ finalizer (this, close_sink)
174
+ end
175
+ return this
176
+ end
177
+ end
178
+
179
+ FileSink (s:: IOStream ) = FileSink (s, false )
180
+
181
+ function FileSink (filename:: String , args... )
182
+ s = open (filename, args... )
183
+ return FileSink (s, true )
184
+ end
185
+
186
+ function close_sink (sink:: FileSink )
187
+ if sink. own
188
+ close (sink. s)
189
+ end
190
+ end
191
+
192
+ fd (sink:: FileSink ) = fd (sink. s)
193
+
164
194
type Cmd
165
195
exec:: Executable
196
+ name:: String
166
197
pipes:: Dict{FileDes,PipeEnd}
198
+ sinks:: Dict{FileDes,FileSink}
199
+ closed_fds:: Vector{FileDes}
167
200
pipeline:: Set{Cmd}
168
201
pid:: Int32
169
202
status:: ProcessStatus
@@ -174,7 +207,10 @@ type Cmd
174
207
error (" Cmd: too few words to exec" )
175
208
end
176
209
this = new (exec,
210
+ " " ,
177
211
Dict {FileDes,PipeEnd} (),
212
+ Dict {FileDes,FileSink} (),
213
+ FileDes[],
178
214
Set {Cmd} (),
179
215
0 ,
180
216
ProcessNotRun (),
@@ -194,7 +230,9 @@ setsuccess(cmd::Cmd, f::Function) = (cmd.successful=f; cmd)
194
230
ignorestatus (cmd:: Cmd ) = setsuccess (cmd, ignore_success)
195
231
196
232
function show (io, cmd:: Cmd )
197
- if isa (cmd. exec,Vector{ByteString})
233
+ if cmd. name != " "
234
+ show (io, cmd. name)
235
+ elseif isa (cmd. exec,Vector{ByteString})
198
236
esc = shell_escape (cmd. exec... )
199
237
print (io, ' `' )
200
238
for c in esc
@@ -205,30 +243,41 @@ function show(io, cmd::Cmd)
205
243
end
206
244
print (io, ' `' )
207
245
else
208
- invoke (show, (Any,) , cmd. exec)
246
+ invoke (show, (Any, Any,), io , cmd. exec)
209
247
end
210
248
end
211
249
212
250
exec (cmd:: Cmd ) = exec (cmd. exec)
213
251
252
+ function close_sinks (cmd:: Cmd )
253
+ for (f,s) in cmd. sinks
254
+ close_sink (s)
255
+ end
256
+ end
257
+
214
258
# # Port: a file descriptor on a particular command ##
215
259
216
260
type Port
217
261
cmd:: Cmd
218
262
fd:: FileDes
219
263
end
220
264
221
- fd (cmd:: Cmd , f:: FileDes ) = Port (cmd,f)
265
+ function fd (cmd:: Cmd , f:: FileDes )
266
+ if ! has (cmd. pipes, f) && ! has (cmd. sinks, f) && ! contains (cmd. closed_fds, f)
267
+ return Port (cmd,f)
268
+ end
269
+ error (" no " , f, " available in " , cmd)
270
+ end
222
271
223
272
function fd (cmds:: Set{Cmd} , f:: FileDes )
224
273
set = Set {Port} ()
225
274
for cmd in cmds
226
- if ! has (cmd. pipes, f)
275
+ if ! has (cmd. pipes, f) && ! has (cmd . sinks, f) && ! contains (cmd . closed_fds, f)
227
276
add (set, fd (cmd,f))
228
277
end
229
278
end
230
279
if isempty (set)
231
- error (" no " , f, " available: " , cmds)
280
+ error (" no " , f, " available in " , cmds)
232
281
end
233
282
set
234
283
end
@@ -277,11 +326,17 @@ end
277
326
output (cmds:: Cmds ) = stdout (cmds) & stderr (cmds)
278
327
279
328
function connect (port:: Port , pend:: PipeEnd )
280
- if ! has (port. cmd. pipes, port. fd)
329
+ if contains (port. cmd. closed_fds, port. fd)
330
+ error (port. cmd, " port " , port. fd, " is closed" )
331
+ end
332
+ if ! has (port. cmd. pipes, port. fd) && ! has (port. cmd. sinks, port. fd)
281
333
port. cmd. pipes[port. fd] = pend
282
- elseif port. cmd. pipes[port. fd] != pend
334
+ elseif has (port . cmd . pipes, port . fd) && port. cmd. pipes[port. fd] != pend
283
335
error (port. cmd, " is already connected to " ,
284
336
fd (port. cmd. pipes[port. fd]))
337
+ elseif has (port. cmd. sinks, port. fd)
338
+ error (port. cmd, " is already connected to " ,
339
+ fd (port. cmd. sinks[port. fd]))
285
340
end
286
341
return pend
287
342
end
@@ -339,6 +394,102 @@ end
339
394
(| )(src:: Ports , dst:: Cmds ) = (src | stdin (dst); dst)
340
395
(| )(src:: Cmds , dst:: Cmds ) = (stdout (src) | stdin (dst); src & dst)
341
396
397
+ redir (port:: Port , sink:: FileSink ) = port. cmd. sinks[port. fd] = sink
398
+ function redir (ports:: Ports , sink:: FileSink )
399
+ for port in ports
400
+ redir (port, sink)
401
+ end
402
+ end
403
+
404
+ # redirect stdout
405
+ function (> )(src:: String , dst:: Cmds )
406
+ redir (stdin (dst), FileSink (src, " r" ))
407
+ return dst
408
+ end
409
+
410
+ (< )(dst:: Cmds , src:: String ) = (> )(src, dst)
411
+
412
+ function (> )(src:: IOStream , dst:: Cmds )
413
+ redir (stdin (dst), FileSink (src))
414
+ return dst
415
+ end
416
+
417
+ (< )(dst:: Cmds , src:: IOStream ) = (> )(src, dst)
418
+
419
+ function (> )(src:: Cmds , dst:: String )
420
+ redir (stdout (src), FileSink (dst, " w" ))
421
+ return src
422
+ end
423
+
424
+ function (>> )(src:: Cmds , dst:: String )
425
+ redir (stdout (src), FileSink (dst, " a" ))
426
+ return src
427
+ end
428
+
429
+ (< )(dst:: String , src:: Cmds ) = (> )(src, dst)
430
+ (<< )(dst:: String , src:: Cmds ) = (>> )(src, dst)
431
+
432
+ function (> )(src:: Cmds , dst:: IOStream )
433
+ redir (stdout (src), FileSink (dst))
434
+ return src
435
+ end
436
+
437
+ (< )(dst:: IOStream , src:: Cmds ) = (> )(src, dst)
438
+
439
+ # redirect stderr
440
+ function (.> )(src:: Cmds , dst:: String )
441
+ redir (stderr (src), FileSink (dst, " w" ))
442
+ return src
443
+ end
444
+
445
+ function (.>> )(src:: Cmds , dst:: String )
446
+ redir (stderr (src), FileSink (dst, " a" ))
447
+ return src
448
+ end
449
+
450
+ (.< )(dst:: String , src:: Cmds ) = (.> )(src, dst)
451
+ (.<< )(dst:: String , src:: Cmds ) = (.>> )(src, dst)
452
+
453
+ function (.> )(src:: Cmds , dst:: IOStream )
454
+ redir (stderr (src), FileSink (dst))
455
+ return src
456
+ end
457
+
458
+ (.< )(dst:: IOStream , src:: Cmds ) = (.> )(src, dst)
459
+
460
+ # redirect both stdout and stderr
461
+ function (& > )(src:: Cmds , dst:: String )
462
+ redir (output (src), FileSink (dst, " w" ))
463
+ return src
464
+ end
465
+
466
+ function (& >> )(src:: Cmds , dst:: String )
467
+ redir (output (src), FileSink (dst, " a" ))
468
+ return src
469
+ end
470
+
471
+ (& < )(dst:: String , src:: Cmds ) = (& > )(src, dst)
472
+ (& << )(dst:: String , src:: Cmds ) = (& >> )(src, dst)
473
+
474
+ function (& > )(src:: Cmds , dst:: IOStream )
475
+ redir (output (src), FileSink (dst))
476
+ return src
477
+ end
478
+
479
+ (& < )(dst:: IOStream , src:: Cmds ) = (& > )(src, dst)
480
+
481
+ # here-strings:
482
+ function (>>> )(src:: String , dst:: Cmds )
483
+ hscmd = Cmd (()-> print (src))
484
+ push (hscmd. closed_fds, STDIN)
485
+ push (hscmd. closed_fds, STDERR)
486
+ hscmd. name = " here-string<" * src * " >"
487
+ return hscmd | dst
488
+ end
489
+
490
+ (<< < )(dst:: Cmds , src:: String ) = (>>> )(src, dst)
491
+
492
+
342
493
# spawn(cmd) starts all processes connected to cmd
343
494
344
495
function spawn (cmd:: Cmd )
@@ -364,18 +515,34 @@ function spawn(cmd::Cmd)
364
515
c. status = ProcessRunning ()
365
516
ptrs = isa (c. exec,Vector{ByteString}) ? _jl_pre_exec (c. exec) : nothing
366
517
dup2_fds = Array (Int32, 2 * numel (c. pipes))
518
+ dup2_sinks = Array (Int32, 2 * numel (c. sinks))
367
519
close_fds_ = copy (fds)
368
520
i = 0
369
521
for (f,p) in c. pipes
370
522
dup2_fds[i+= 1 ] = fd (p). fd
371
523
dup2_fds[i+= 1 ] = f. fd
372
524
del (close_fds_, fd (p))
373
525
end
526
+ i = 0
527
+ for (f,s) in c. sinks
528
+ dup2_sinks[i+= 1 ] = fd (s)
529
+ dup2_sinks[i+= 1 ] = f. fd
530
+ end
374
531
close_fds = Array (Int32, numel (close_fds_))
375
532
i = 0
376
533
for f in close_fds_
377
534
close_fds[i+= 1 ] = f. fd
378
535
end
536
+
537
+ # save the stderr descriptor because it may be redirected, but we may need to
538
+ # print errors from Julia
539
+ bk_stderr_fd = ccall (:dup , Int32, (Int32,), STDERR. fd)
540
+ if bk_stderr_fd == - 1
541
+ println (stderr_stream, " dup: " , strerror ())
542
+ exit (0x7f )
543
+ end
544
+ bk_stderr_stream = fdio (bk_stderr_fd)
545
+
379
546
# now actually do the fork and exec without writes
380
547
pid = fork ()
381
548
if pid == 0
@@ -385,7 +552,18 @@ function spawn(cmd::Cmd)
385
552
# dup2 manually inlined to avoid potential heap stomping
386
553
r = ccall (:dup2 , Int32, (Int32, Int32), dup2_fds[i], dup2_fds[i+ 1 ])
387
554
if r == - 1
388
- println (stderr_stream, " dup2: " , strerror ())
555
+ println (bk_stderr_stream, " dup2: " , strerror ())
556
+ exit (0x7f )
557
+ end
558
+ i += 2
559
+ end
560
+ i = 1
561
+ n = length (dup2_sinks)
562
+ while i <= n
563
+ # dup2 manually inlined to avoid potential heap stomping
564
+ r = ccall (:dup2 , Int32, (Int32, Int32), dup2_sinks[i], dup2_sinks[i+ 1 ])
565
+ if r == - 1
566
+ println (bk_stderr_stream, " dup2: " , strerror ())
389
567
exit (0x7f )
390
568
end
391
569
i += 2
@@ -396,14 +574,14 @@ function spawn(cmd::Cmd)
396
574
# close manually inlined to avoid potential heap stomping
397
575
r = ccall (:close , Int32, (Int32,), close_fds[i])
398
576
if r != 0
399
- println (stderr_stream , " close: " , strerror ())
577
+ println (bk_stderr_stream , " close: " , strerror ())
400
578
exit (0x7f )
401
579
end
402
580
i += 1
403
581
end
404
582
if ! isequal (ptrs, nothing )
405
583
ccall (:execvp , Int32, (Ptr{Uint8}, Ptr{Ptr{Uint8}}), ptrs[1 ], ptrs)
406
- println (stderr_stream , " exec: " , strerror ())
584
+ println (bk_stderr_stream , " exec: " , strerror ())
407
585
exit (0x7f )
408
586
end
409
587
# other ways of execing (e.g. a julia function)
@@ -412,12 +590,13 @@ function spawn(cmd::Cmd)
412
590
try
413
591
exec (c)
414
592
catch err
415
- show (stderr , err)
593
+ show (bk_stderr_stream , err)
416
594
exit (0x7f )
417
595
end
418
596
error (" exec should not return but has" )
419
597
end
420
598
c. pid = pid
599
+ close (bk_stderr_stream) # do it manually since gc is disabled
421
600
end
422
601
for f in fds_
423
602
close (f)
@@ -442,7 +621,7 @@ successful(cmd::Cmd) =
442
621
isa (cmd. status,ProcessRunning) || cmd. successful (cmd. status)
443
622
444
623
wait (cmd:: Cmd , nohang:: Bool ) =
445
- (cmd. status = process_status (wait (cmd. pid,nohang)); successful (cmd))
624
+ (cmd. status = process_status (wait (cmd. pid,nohang)); close_sinks (cmd); successful (cmd))
446
625
447
626
# wait for a set of command processes to finish
448
627
0 commit comments