Skip to content

Commit 053c806

Browse files
committed
Extract Unix socket creation from connect
1 parent c87bbac commit 053c806

File tree

1 file changed

+50
-25
lines changed

1 file changed

+50
-25
lines changed

chronos/transports/stream.nim

+50-25
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,30 @@ elif defined(windows):
713713
sizeof(saddr).SockLen) != 0'i32:
714714
result = false
715715

716+
proc createPipe*(address: TransportAddress): Future[Handle] =
717+
var retFuture = newFuture[Handle]("stream.handle")
718+
var pipeHandle = INVALID_HANDLE_VALUE
719+
var pipeContinuation: proc (udata: pointer) {.gcsafe, raises: [Defect].}
720+
pipeContinuation = proc (udata: pointer) {.gcsafe, raises: [Defect].} =
721+
# Continue only if `retFuture` is not cancelled.
722+
if not(retFuture.finished()):
723+
var pipeSuffix = $cast[cstring](unsafeAddr address.address_un[0])
724+
var pipeName = newWideCString(r"\\.\pipe\" & pipeSuffix[1 .. ^1])
725+
pipeHandle = createFileW(pipeName, GENERIC_READ or GENERIC_WRITE,
726+
FILE_SHARE_READ or FILE_SHARE_WRITE,
727+
nil, OPEN_EXISTING,
728+
FILE_FLAG_OVERLAPPED, Handle(0))
729+
if pipeHandle == INVALID_HANDLE_VALUE:
730+
let err = osLastError()
731+
if int32(err) == ERROR_PIPE_BUSY:
732+
discard setTimer(Moment.fromNow(50.milliseconds), pipeContinuation, nil)
733+
else:
734+
retFuture.fail(getTransportOsError(err))
735+
else:
736+
retFuture.complete(pipeHandle)
737+
pipeContinuation(nil)
738+
return retFuture
739+
716740
proc isDomainSet(sock: AsyncFD): bool =
717741
try:
718742
discard getSockDomain(SocketHandle(sock))
@@ -739,7 +763,6 @@ elif defined(windows):
739763
povl: RefCustomOverlapped
740764

741765
var raddress = windowsAnyAddressFix(address)
742-
743766
toSAddr(raddress, saddr, slen)
744767

745768
if sock == asyncInvalidSocket:
@@ -792,8 +815,17 @@ elif defined(windows):
792815

793816
retFuture.cancelCallback = cancel
794817

795-
else: #address.family == AddressFamily.Unix:
796-
retFuture.fail(newException(TransportAddressError, "Unsupported address family"))
818+
elif address.family == AddressFamily.Unix:
819+
let transp =
820+
try:
821+
register(sock)
822+
newStreamPipeTransport(sock, bufferSize, child)
823+
except CatchableError as exc:
824+
retFuture.fail(exc)
825+
return
826+
# Start tracking transport
827+
trackStream(transp)
828+
retFuture.complete(transp)
797829

798830
return retFuture
799831

@@ -805,11 +837,11 @@ elif defined(windows):
805837
## new transport object ``StreamTransport`` for established connection.
806838
## ``bufferSize`` is size of internal buffer for transport.
807839
var retFuture = newFuture[StreamTransport]("stream.transport.connect")
840+
var raddress = windowsAnyAddressFix(address)
808841
if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
809-
var raddress = windowsAnyAddressFix(address)
810842
try:
811843
let sock = createAsyncSocket(raddress.getDomain(), SockType.SOCK_STREAM, Protocol.IPPROTO_TCP)
812-
let r = connect(sock, address, bufferSize, child, flags)
844+
let r = connect(sock, raddress, bufferSize, child, flags)
813845
proc cb(arg: pointer) =
814846
try:
815847
retFuture.complete(r.read)
@@ -820,34 +852,27 @@ elif defined(windows):
820852
retFuture.fail(exc)
821853
elif address.family == AddressFamily.Unix:
822854
## Unix domain socket emulation with Windows Named Pipes.
823-
var pipeHandle = INVALID_HANDLE_VALUE
855+
var pipeHandleFut = createPipe(address)
824856
var pipeContinuation: proc (udata: pointer) {.gcsafe, raises: [Defect].}
825857
pipeContinuation = proc (udata: pointer) {.gcsafe, raises: [Defect].} =
826858
# Continue only if `retFuture` is not cancelled.
827859
if not(retFuture.finished()):
828-
var pipeSuffix = $cast[cstring](unsafeAddr address.address_un[0])
829-
var pipeName = newWideCString(r"\\.\pipe\" & pipeSuffix[1 .. ^1])
830-
pipeHandle = createFileW(pipeName, GENERIC_READ or GENERIC_WRITE,
831-
FILE_SHARE_READ or FILE_SHARE_WRITE,
832-
nil, OPEN_EXISTING,
833-
FILE_FLAG_OVERLAPPED, Handle(0))
834-
if pipeHandle == INVALID_HANDLE_VALUE:
835-
let err = osLastError()
836-
if int32(err) == ERROR_PIPE_BUSY:
837-
discard setTimer(Moment.fromNow(50.milliseconds), pipeContinuation, nil)
838-
else:
839-
retFuture.fail(getTransportOsError(err))
840-
else:
841-
let transp =
860+
if pipeHandleFut.finished():
861+
let sock =
842862
try:
843-
register(AsyncFD(pipeHandle))
844-
newStreamPipeTransport(AsyncFD(pipeHandle), bufferSize, child)
863+
AsyncFD(pipeHandleFut.read)
845864
except CatchableError as exc:
846865
retFuture.fail(exc)
847866
return
848-
# Start tracking transport
849-
trackStream(transp)
850-
retFuture.complete(transp)
867+
let transpFut = connect(sock, raddress, bufferSize, child, flags)
868+
proc cb(arg: pointer) =
869+
try:
870+
retFuture.complete(transpFut.read)
871+
except CatchableError as exc:
872+
retFuture.fail(exc)
873+
transpFut.addCallback(cb)
874+
else:
875+
discard setTimer(Moment.fromNow(50.milliseconds), pipeContinuation, nil)
851876
pipeContinuation(nil)
852877
else:
853878
retFuture.fail(newException(TransportAddressError, "Unsupported address family"))

0 commit comments

Comments
 (0)