Skip to content

Commit 471c4cd

Browse files
authored
Merge pull request #21543 from JuliaLang/jb/messageser
simpler and faster message (de)serialize
2 parents f680888 + e62a7db commit 471c4cd

8 files changed

+44
-39
lines changed

base/associative.jl

+6-1
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,12 @@ function delete!(t::ObjectIdDict, key::ANY)
441441
t
442442
end
443443

444-
empty!(t::ObjectIdDict) = (t.ht = Vector{Any}(length(t.ht)); t.ndel = 0; t)
444+
function empty!(t::ObjectIdDict)
445+
resize!(t.ht, 32)
446+
ccall(:memset, Ptr{Void}, (Ptr{Void}, Cint, Csize_t), t.ht, 0, sizeof(t.ht))
447+
t.ndel = 0
448+
return t
449+
end
445450

446451
_oidd_nextind(a, i) = reinterpret(Int,ccall(:jl_eqtable_nextind, Csize_t, (Any, Csize_t), a, i))
447452

base/distributed/Distributed.jl

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length,
1010
using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, wait_connected,
1111
VERSION_STRING, sync_begin, sync_add, sync_end, async_run_thunk,
1212
binding_module, notify_error, atexit, julia_exename, julia_cmd,
13-
AsyncGenerator, display_error, acquire, release
13+
AsyncGenerator, display_error, acquire, release, invokelatest
1414

1515
# NOTE: clusterserialize.jl imports additional symbols from Base.Serializer for use
1616

base/distributed/messages.jl

+25-30
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,8 @@
22

33
abstract type AbstractMsg end
44

5-
let REF_ID::Int = 1
6-
global next_ref_id
7-
next_ref_id() = (id = REF_ID; REF_ID += 1; id)
8-
end
5+
const REF_ID = Ref(1)
6+
next_ref_id() = (id = REF_ID[]; REF_ID[] = id+1; id)
97

108
struct RRID
119
whence::Int
@@ -80,34 +78,32 @@ end
8078
# of approximately 10%. Can be removed once module Serializer
8179
# has been suitably improved.
8280

83-
# replace CallMsg{Mode} with specific invocations
84-
const msgtypes = filter!(x->x!=CallMsg, subtypes(AbstractMsg))
85-
push!(msgtypes, CallMsg{:call}, CallMsg{:call_fetch})
81+
const msgtypes = Any[CallWaitMsg, IdentifySocketAckMsg, IdentifySocketMsg,
82+
JoinCompleteMsg, JoinPGRPMsg, RemoteDoMsg, ResultMsg,
83+
CallMsg{:call}, CallMsg{:call_fetch}]
8684

8785
for (idx, tname) in enumerate(msgtypes)
88-
nflds = length(fieldnames(tname))
89-
@eval begin
90-
function serialize(s::AbstractSerializer, o::$tname)
91-
write(s.io, UInt8($idx))
92-
for fld in fieldnames($tname)
93-
serialize(s, getfield(o, fld))
94-
end
95-
end
96-
97-
function deserialize_msg(s::AbstractSerializer, ::Type{$tname})
98-
data=Array{Any,1}($nflds)
99-
for i in 1:$nflds
100-
data[i] = deserialize(s)
101-
end
102-
return $tname(data...)
103-
end
86+
exprs = Any[ :(serialize(s, o.$fld)) for fld in fieldnames(tname) ]
87+
@eval function serialize_msg(s::AbstractSerializer, o::$tname)
88+
write(s.io, UInt8($idx))
89+
$(exprs...)
90+
return nothing
10491
end
10592
end
10693

107-
function deserialize_msg(s::AbstractSerializer)
108-
idx = read(s.io, UInt8)
109-
t = msgtypes[idx]
110-
return eval(current_module(), Expr(:body, Expr(:return, Expr(:call, deserialize_msg, QuoteNode(s), QuoteNode(t)))))
94+
let msg_cases = :(assert(false))
95+
for i = length(msgtypes):-1:1
96+
mti = msgtypes[i]
97+
msg_cases = :(if idx == $i
98+
return $(Expr(:call, QuoteNode(mti), fill(:(deserialize(s)), nfields(mti))...))
99+
else
100+
$msg_cases
101+
end)
102+
end
103+
@eval function deserialize_msg(s::AbstractSerializer)
104+
idx = read(s.io, UInt8)
105+
$msg_cases
106+
end
111107
end
112108

113109
function send_msg_unknown(s::IO, header, msg)
@@ -171,8 +167,7 @@ function serialize_hdr_raw(io, hdr)
171167
end
172168

173169
function deserialize_hdr_raw(io)
174-
data = Array{Int,1}(4)
175-
read!(io, data)
170+
data = read(io, Ref{NTuple{4,Int}}())[]
176171
return MsgHeader(RRID(data[1], data[2]), RRID(data[3], data[4]))
177172
end
178173

@@ -183,7 +178,7 @@ function send_msg_(w::Worker, header, msg, now::Bool)
183178
try
184179
reset_state(w.w_serializer)
185180
serialize_hdr_raw(io, header)
186-
eval(current_module(), Expr(:body, Expr(:return, Expr(:call, serialize, QuoteNode(w.w_serializer), QuoteNode(msg))))) # io is wrapped in w_serializer
181+
invokelatest(serialize_msg, w.w_serializer, msg) # io is wrapped in w_serializer
187182
write(io, MSG_BOUNDARY)
188183

189184
if !now && w.gcflag

base/distributed/process_messages.jl

+1-1
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
158158
# println("header: ", header)
159159

160160
try
161-
msg = deserialize_msg(serializer)
161+
msg = invokelatest(deserialize_msg, serializer)
162162
catch e
163163
# Deserialization error; discard bytes in stream until boundary found
164164
boundary_idx = 1

base/distributed/remotecall.jl

+1-1
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ function lookup_ref(pg, rrid, f)
152152
rv = get(pg.refs, rrid, false)
153153
if rv === false
154154
# first we've heard of this ref
155-
rv = RemoteValue(eval(Main, Expr(:body, Expr(:return, Expr(:call, f)))))
155+
rv = RemoteValue(invokelatest(f))
156156
pg.refs[rrid] = rv
157157
push!(rv.clientset, rrid.whence)
158158
end

base/precompile.jl

-3
Original file line numberDiff line numberDiff line change
@@ -1405,7 +1405,6 @@ precompile(Tuple{typeof(Base.close), Base.TCPSocket})
14051405
precompile(Tuple{typeof(Base.write), Base.TCPSocket, Array{UInt8, 1}})
14061406
precompile(Tuple{typeof(Base.wait_readnb), Base.PipeEndpoint, Int64})
14071407
precompile(Tuple{typeof(Base.eof), Base.PipeEndpoint})
1408-
precompile(Tuple{typeof(Base.Distributed.deserialize_msg), Base.Distributed.ClusterSerializer{Base.TCPSocket}, Type{Base.Distributed.JoinPGRPMsg}})
14091408
precompile(Tuple{typeof(Base.unsafe_read), Base.TCPSocket, Base.RefValue{Int32}, Int64})
14101409
precompile(Tuple{typeof(Base.unsafe_read), Base.TCPSocket, Base.RefValue{Int64}, Int64})
14111410
precompile(Tuple{typeof(Base.read!), Base.TCPSocket, Array{UInt8, 1}})
@@ -1479,7 +1478,6 @@ precompile(Tuple{typeof(Base.notify), Base.Condition, Base.Distributed.ProcessEx
14791478
precompile(Tuple{typeof(Base.pop!), Base.Dict{Int64, Union{Base.Distributed.Worker, Base.Distributed.LocalProcess}}, Int64, Void})
14801479
precompile(Tuple{typeof(Base.Distributed.deregister_worker), Base.Distributed.ProcessGroup, Int64})
14811480
precompile(Tuple{typeof(Base.Distributed.process_hdr), Base.TCPSocket, Bool})
1482-
precompile(Tuple{typeof(Base.Distributed.deserialize_msg), Base.Distributed.ClusterSerializer{Base.TCPSocket}})
14831481
precompile(Tuple{typeof(Base.Distributed.null_id), Base.Distributed.RRID})
14841482
precompile(Tuple{typeof(Base.Distributed.deliver_result), Base.TCPSocket, Symbol, Base.Distributed.RRID, Base.Distributed.RemoteException})
14851483
precompile(Tuple{typeof(Base.Distributed.disable_nagle), Base.TCPSocket})
@@ -1535,7 +1533,6 @@ precompile(Tuple{typeof(Base.Serializer.serialize), Base.Distributed.ClusterSeri
15351533
precompile(Tuple{typeof(Base.unsafe_write), Base.TCPSocket, Base.RefValue{UInt8}, Int64})
15361534
precompile(Tuple{typeof(Base.Serializer.serialize), Base.Distributed.ClusterSerializer{Base.TCPSocket}, Int64})
15371535
precompile(Tuple{typeof(Base.write), Base.TCPSocket, Array{UInt8, 1}})
1538-
precompile(Tuple{typeof(Base.Distributed.deserialize_msg), Base.Distributed.ClusterSerializer{Base.TCPSocket}, Type{Base.Distributed.JoinCompleteMsg}})
15391536
precompile(Tuple{typeof(Base.unsafe_read), Base.TCPSocket, Base.RefValue{Int32}, Int64})
15401537
precompile(Tuple{typeof(Base.unsafe_read), Base.TCPSocket, Base.RefValue{Int64}, Int64})
15411538
precompile(Tuple{typeof(Base.read!), Base.TCPSocket, Array{UInt8, 1}})

base/serialize.jl

+1-1
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ end
157157

158158
function reset_state(s::AbstractSerializer)
159159
s.counter = 0
160-
s.table = ObjectIdDict()
160+
empty!(s.table)
161161
empty!(s.pending_refs)
162162
s
163163
end

base/stream.jl

+9-1
Original file line numberDiff line numberDiff line change
@@ -863,7 +863,15 @@ buffer_writes(s::LibuvStream, bufsize) = (s.sendbuf=PipeBuffer(bufsize); s)
863863

864864
## low-level calls to libuv ##
865865

866-
write(s::LibuvStream, b::UInt8) = write(s, Ref{UInt8}(b))
866+
function write(s::LibuvStream, b::UInt8)
867+
if !isnull(s.sendbuf)
868+
buf = get(s.sendbuf)
869+
if nb_available(buf) + 1 < buf.maxsize
870+
return write(buf, b)
871+
end
872+
end
873+
return write(s, Ref{UInt8}(b))
874+
end
867875

868876
function uv_writecb_task(req::Ptr{Void}, status::Cint)
869877
d = uv_req_data(req)

0 commit comments

Comments
 (0)