Skip to content

Commit ba7ba05

Browse files
sighingnow authored and Xuye (Chris) Qin committed
Add mutable tensor support for local cluster (#464)
1 parent f0dd061 commit ba7ba05

File tree

13 files changed

+834
-10
lines changed

13 files changed

+834
-10
lines changed

mars/api.py

+72
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,20 @@
1313
# limitations under the License.
1414

1515
import logging
16+
import uuid
17+
import zlib
18+
import pyarrow
1619

1720
from .actors import new_client
21+
from .config import options
1822
from .errors import GraphNotExists
1923
from .scheduler import SessionActor, GraphActor, GraphMetaActor, ResourceActor, \
2024
SessionManagerActor, ChunkMetaClient
2125
from .scheduler.graph import ResultReceiverActor
2226
from .scheduler.node_info import NodeInfoActor
2327
from .scheduler.utils import SchedulerClusterInfoActor
2428
from .serialize import dataserializer
29+
from .utils import tokenize
2530

2631
logger = logging.getLogger(__name__)
2732

@@ -51,6 +56,13 @@ def get_schedulers_info(self):
5156
infos[scheduler] = info_ref.get_info()
5257
return infos
5358

59+
def _get_receiver_ref(self, chunk_key):
    """
    Build an actor reference to the receiver that handles ``chunk_key``.

    The endpoint is resolved with ``self.cluster_info.get_scheduler``; the
    ``DispatchActor`` on that endpoint is asked for the ``'receiver'`` hash
    slot of the key, and a reference to the returned uid on the same endpoint
    is handed back.
    """
    from .worker.dispatcher import DispatchActor

    endpoint = self.cluster_info.get_scheduler(chunk_key)
    dispatcher = self.actor_client.actor_ref(DispatchActor.default_uid(), address=endpoint)
    receiver_uid = dispatcher.get_hash_slot('receiver', chunk_key)
    return self.actor_client.actor_ref(receiver_uid, address=endpoint)
65+
5466
def count_workers(self):
5567
try:
5668
uid = ResourceActor.default_uid()
@@ -71,6 +83,66 @@ def submit_graph(self, session_id, serialized_graph, graph_key, target,
7183
session_ref.submit_tileable_graph(
7284
serialized_graph, graph_key, target, compose=compose, _tell=not wait)
7385

86+
def create_mutable_tensor(self, session_id, name, shape, dtype, *args, **kwargs):
    """
    Ask the session actor of ``session_id`` to create a mutable tensor.

    All arguments after ``session_id`` are forwarded unchanged, and the
    session actor's return value is returned as-is.
    """
    session_ref = self.get_actor_ref(SessionActor.gen_uid(session_id))
    return session_ref.create_mutable_tensor(name, shape, dtype, *args, **kwargs)
90+
91+
def get_mutable_tensor(self, session_id, name):
    """
    Look up the mutable tensor ``name`` through the session actor of
    ``session_id`` and return that actor's answer unchanged.
    """
    session_ref = self.get_actor_ref(SessionActor.gen_uid(session_id))
    return session_ref.get_mutable_tensor(name)
95+
96+
def send_chunk_records(self, session_id, name, chunk_records_to_send, directly=True):
    """
    Ship buffered write records of a mutable tensor and register them.

    For every ``chunk_key -> records`` entry a fresh record-chunk key is
    generated, memory quota is requested for it, the records are serialized
    with pyarrow and streamed block by block to the receiver actor of the
    chunk. Once all entries are transferred, the ``(chunk_key,
    record_chunk_key)`` pairs are appended to the mutable tensor through the
    session actor.

    :param session_id: id of the session owning the mutable tensor
    :param name: name of the mutable tensor
    :param chunk_records_to_send: mapping from chunk key to a records object
        exposing ``nbytes`` and serializable by ``pyarrow.serialize``
    :param directly: currently unused; kept for interface compatibility
    :raises: any error raised while transferring a record chunk is re-raised
        after the pending receive for that record chunk has been cancelled
    """
    from .worker.dataio import ArrowBufferIO
    from .worker.quota import MemQuotaActor

    session_uid = SessionActor.gen_uid(session_id)
    session_ref = self.get_actor_ref(session_uid)
    # loop-invariant, so read it once
    block_size = options.worker.transfer_block_size

    chunk_records = []
    for chunk_key, records in chunk_records_to_send.items():
        record_chunk_key = tokenize(chunk_key, uuid.uuid4().hex)
        ep = self.cluster_info.get_scheduler(chunk_key)
        # register quota
        quota_ref = self.actor_client.actor_ref(MemQuotaActor.default_uid(), address=ep)
        quota_ref.request_batch_quota({record_chunk_key: records.nbytes})
        # send record chunk
        buf = pyarrow.serialize(records).to_buffer()
        receiver_ref = self._get_receiver_ref(chunk_key)
        receiver_ref.create_data_writer(session_id, record_chunk_key, buf.size, None,
                                        ensure_cached=False, use_promise=False)

        # Bind the name before ``try`` so the ``finally`` clause never hits an
        # unbound local when constructing the reader itself fails.
        reader = None
        try:
            reader = ArrowBufferIO(buf, 'r', block_size=block_size)
            checksum = 0
            while True:
                next_chunk = reader.read(block_size)
                if not next_chunk:
                    break
                # running CRC32 over everything sent so far
                checksum = zlib.crc32(next_chunk, checksum)
                receiver_ref.receive_data_part(session_id, record_chunk_key, next_chunk, checksum)
            receiver_ref.finish_receive(session_id, record_chunk_key, checksum)
        except BaseException:
            # The data writer was created under ``record_chunk_key``, so that
            # is the key whose receive has to be cancelled.
            receiver_ref.cancel_receive(session_id, record_chunk_key)
            raise
        finally:
            if reader is not None:
                reader.close()

        chunk_records.append((chunk_key, record_chunk_key))

    # register the record chunk to MutableTensorActor
    session_ref.append_chunk_records(name, chunk_records)
141+
def seal(self, session_id, name):
    """
    Seal the mutable tensor ``name`` via the session actor of ``session_id``
    and return that actor's result unchanged.
    """
    session_ref = self.get_actor_ref(SessionActor.gen_uid(session_id))
    return session_ref.seal(name)
145+
74146
def delete_graph(self, session_id, graph_key):
75147
graph_uid = GraphActor.gen_uid(session_id, graph_key)
76148
graph_ref = self.get_actor_ref(graph_uid)

mars/deploy/local/session.py

+15
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,21 @@ def _update_tileable_shape(self, tileable):
6868
tileable._update_shape(tuple(sum(nsplit) for nsplit in new_nsplits))
6969
tileable.nsplits = new_nsplits
7070

71+
def create_mutable_tensor(self, name, shape, dtype, *args, **kwargs):
    """
    Create a mutable tensor in this session by delegating to the API object;
    extra positional and keyword arguments are passed through untouched.
    """
    api = self._api
    return api.create_mutable_tensor(self._session_id, name, shape, dtype, *args, **kwargs)
74+
75+
def get_mutable_tensor(self, name):
    """
    Fetch the mutable tensor ``name`` of this session through the API object.
    """
    api = self._api
    return api.get_mutable_tensor(self._session_id, name)
77+
78+
def send_chunk_records(self, name, chunk_records_to_send):
    """
    Hand the pending chunk records of mutable tensor ``name`` to the API
    object, scoped to this session, and return its result.
    """
    api = self._api
    return api.send_chunk_records(self._session_id, name, chunk_records_to_send)
80+
81+
def seal(self, name):
    """
    Seal the mutable tensor ``name`` and remember it as an executed tileable.

    The API call yields ``(graph_key, tensor_key, tensor_id, tensor_meta)``;
    the graph key and tensor id are recorded in ``self._executed_tileables``
    under the tensor key, and only ``tensor_meta`` is returned.
    """
    sealed = self._api.seal(self._session_id, name)
    graph_key, tensor_key, tensor_id, tensor_meta = sealed
    self._executed_tileables[tensor_key] = (graph_key, {tensor_id})
    return tensor_meta
85+
7186
def run(self, *tileables, **kw):
7287
timeout = kw.pop('timeout', -1)
7388
fetch = kw.pop('fetch', True)

mars/scheduler/graph.py

+8
Original file line numberDiff line numberDiff line change
@@ -941,6 +941,14 @@ def fetch_tileable_result(self, tileable_key):
941941
concat_result = executor.execute_tileable(tileable, concat=True)[0]
942942
return dataserializer.dumps(concat_result)
943943

944+
@log_unhandled
def add_fetch_tileable(self, tileable_key, tileable_id, shape, dtype, chunk_size, chunk_keys):
    """
    Register an already materialized tensor with this graph.

    A fetch tensor is built from the given key, id, shape, dtype, chunk size
    and chunk keys; its operand id is stored for ``tileable_key`` and the
    tensor is appended to the tiled entries of ``(tileable_key, op id)``.
    """
    from ..tensor.expressions.utils import create_fetch_tensor

    fetch_tensor = create_fetch_tensor(chunk_size, shape, dtype,
                                       tileable_key, tileable_id, chunk_keys)
    op_id = fetch_tensor.op.id
    self._tileable_key_to_opid[tileable_key] = op_id
    self._tileable_key_opid_to_tiled[(tileable_key, op_id)].append(fetch_tensor)
951+
944952
@log_unhandled
945953
def check_operand_can_be_freed(self, succ_op_keys):
946954
"""

mars/scheduler/mutable.py

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
# Copyright 1999-2018 Alibaba Group Holding Ltd.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from collections import defaultdict
16+
17+
import numpy as np
18+
19+
from ..utils import log_unhandled
20+
from .utils import SchedulerActor
21+
22+
23+
class MutableTensorActor(SchedulerActor):
    """
    Scheduler-side actor that keeps the state of one named mutable tensor.

    It holds the tensor's shape, dtype and chunk layout, collects the keys of
    record chunks written to every tensor chunk, and on ``seal`` asks one
    ``SealActor`` per chunk to seal that chunk.
    """
    @staticmethod
    def gen_uid(session_id, name):
        """Return the actor uid used for mutable tensor ``name`` in ``session_id``."""
        return 's:0:mutable-tensor$%s$%s' % (session_id, name)

    def __init__(self, session_id, name, shape, dtype, graph_key, chunk_size=None, *args, **kwargs):
        super(MutableTensorActor, self).__init__(*args, **kwargs)
        self._session_id = session_id
        self._name = name
        self._graph_key = graph_key
        self._shape = shape
        self._chunk_size = chunk_size
        # normalize whatever dtype spec was given into a numpy dtype
        self._dtype = dtype if isinstance(dtype, np.dtype) else np.dtype(dtype)
        # the fetch tensor is only built in post_create
        self._tensor = None
        self._sealed = False
        # chunk key -> list of record chunk keys appended for that chunk
        self._chunk_map = defaultdict(list)
        # layout of one write record: index, timestamp and the written value
        self._record_type = np.dtype([("index", np.uint32), ("ts", np.dtype('datetime64[ns]')), ("value", self._dtype)])

    @log_unhandled
    def post_create(self):
        """Finish actor setup and build the fetch tensor describing the chunk layout."""
        from ..tensor.expressions.utils import create_fetch_tensor

        super(MutableTensorActor, self).post_create()
        self.set_cluster_info_ref()
        self._tensor = create_fetch_tensor(self._chunk_size, self._shape, self._dtype)

    def tensor_meta(self):
        """Return ``(shape, dtype, chunk_size, chunk_keys)`` of the tensor."""
        chunk_keys = [chunk.key for chunk in self._tensor.chunks]
        return self._shape, self._dtype, self._chunk_size, chunk_keys

    def tensor_key(self):
        """Return the key of the underlying fetch tensor."""
        return self._tensor.key

    def sealed(self):
        """Return whether ``seal`` has been called on this actor."""
        return self._sealed

    @log_unhandled
    def read(self, tensor_index):
        """Reading is not implemented; always raises ``NotImplementedError``."""
        raise NotImplementedError

    @log_unhandled
    def append_chunk_records(self, chunk_records):
        """Record each ``(chunk_key, record_chunk_key)`` pair under its chunk key."""
        for chunk_key, record_chunk_key in chunk_records:
            self._chunk_map[chunk_key].append(record_chunk_key)

    @log_unhandled
    def seal(self):
        """
        Mark the tensor as sealed and have every chunk sealed by a ``SealActor``.

        Returns ``(graph_key, tensor_key, tensor_id, tensor_meta)``.
        """
        from ..worker.seal import SealActor

        self._sealed = True
        for chunk in self._tensor.chunks:
            chunk_key = chunk.key
            ep = self.get_scheduler(chunk_key)
            sealer_uid = SealActor.gen_uid(self._session_id, chunk_key)
            sealer_ref = self.ctx.create_actor(SealActor, uid=sealer_uid, address=ep)

            # the record chunk keys collected so far are what the sealer gets
            sealer_ref.seal_chunk(self._session_id, self._graph_key,
                                  chunk_key, self._chunk_map[chunk_key],
                                  chunk.shape, self._record_type, self._dtype)
        return self._graph_key, self._tensor.key, self._tensor.id, self.tensor_meta()

mars/scheduler/session.py

+62
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import logging
1616
import os
17+
import uuid
1718

1819
from .utils import SchedulerActor
1920
from ..utils import log_unhandled
@@ -33,6 +34,7 @@ def __init__(self, session_id, **kwargs):
3334
self._graph_refs = dict()
3435
self._graph_meta_refs = dict()
3536
self._tileable_to_graph = dict()
37+
self._mut_tensor_refs = dict()
3638

3739
@staticmethod
3840
def gen_uid(session_id):
@@ -87,6 +89,66 @@ def submit_tileable_graph(self, serialized_graph, graph_key, target_tileables=No
8789
self._tileable_to_graph[tileable_key] = graph_ref
8890
return graph_ref
8991

92+
@log_unhandled
def create_mutable_tensor(self, name, shape, dtype, *args, **kwargs):
    """
    Create the ``MutableTensorActor`` for a new mutable tensor in this session.

    A fresh uuid is used as the graph key, the actor is created at the
    address returned by ``self.get_scheduler`` for its uid, its reference is
    remembered under ``name`` and its ``tensor_meta()`` is returned.

    :raises ValueError: if a mutable tensor called ``name`` is already registered
    """
    from .mutable import MutableTensorActor

    if name in self._mut_tensor_refs:
        raise ValueError("The mutable tensor named '%s' already exists." % name)

    graph_key = uuid.uuid4()
    actor_uid = MutableTensorActor.gen_uid(self._session_id, name)
    actor_addr = self.get_scheduler(actor_uid)
    actor_ref = self.ctx.create_actor(
        MutableTensorActor, self._session_id, name, shape, dtype, graph_key,
        uid=actor_uid, address=actor_addr, *args, **kwargs)
    self._mut_tensor_refs[name] = actor_ref
    return actor_ref.tensor_meta()
105+
106+
@log_unhandled
def get_mutable_tensor(self, name):
    """
    Return ``tensor_meta()`` of the registered, not yet sealed mutable tensor ``name``.

    :raises ValueError: if no such tensor is registered or it is already sealed
    """
    tensor_ref = self._mut_tensor_refs.get(name)
    if tensor_ref is not None and not tensor_ref.sealed():
        return tensor_ref.tensor_meta()
    raise ValueError("The mutable tensor named '%s' doesn't exist, or has already been sealed." % name)
112+
113+
@log_unhandled
def append_chunk_records(self, name, chunk_records):
    """
    Forward ``chunk_records`` to the actor of the not yet sealed mutable tensor ``name``.

    :raises ValueError: if no such tensor is registered or it is already sealed
    """
    tensor_ref = self._mut_tensor_refs.get(name)
    if tensor_ref is not None and not tensor_ref.sealed():
        return tensor_ref.append_chunk_records(chunk_records)
    raise ValueError("The mutable tensor named '%s' doesn't exist, or has already been sealed." % name)
119+
120+
@log_unhandled
def seal(self, name):
    """
    Seal the mutable tensor ``name`` and expose it as a finished graph.

    After the tensor actor has sealed itself, a ``GraphActor`` is created in
    the ``SUCCEEDED`` state without a serialized graph, the sealed tensor is
    added to it as a fetch tileable, the session's graph bookkeeping is
    updated and the mutable tensor reference is dropped.

    :return: ``(graph_key, tensor_key, tensor_id, tensor_meta)``
    :raises ValueError: if no such tensor is registered or it is already sealed
    """
    from .graph import GraphActor, GraphMetaActor
    from .utils import GraphState

    mut_ref = self._mut_tensor_refs.get(name)
    if mut_ref is None or mut_ref.sealed():
        raise ValueError("The mutable tensor named '%s' doesn't exist, or has already been sealed." % name)

    graph_key, tensor_key, tensor_id, tensor_meta = mut_ref.seal()
    shape, dtype, chunk_size, chunk_keys = tensor_meta

    # Create a GraphActor that is born already finished
    graph_uid = GraphActor.gen_uid(self._session_id, graph_key)
    graph_addr = self.get_scheduler(graph_uid)
    finished = GraphState.SUCCEEDED
    graph_ref = self.ctx.create_actor(
        GraphActor, self._session_id, graph_key, serialized_tileable_graph=None,
        state=finished, final_state=finished, uid=graph_uid, address=graph_addr)
    self._graph_refs[graph_key] = graph_ref

    meta_uid = GraphMetaActor.gen_uid(self._session_id, graph_key)
    # NOTE(review): the address is taken from the mutable tensor ref's pickled
    # state rather than from graph_addr -- presumably fine while everything
    # lives on one scheduler; confirm where GraphMetaActor is really created.
    meta_addr = mut_ref.__getstate__()[0]
    self._graph_meta_refs[graph_key] = self.ctx.actor_ref(meta_uid, address=meta_addr)

    # Add the tensor to the GraphActor
    graph_ref.add_fetch_tileable(tensor_key, tensor_id, shape, dtype, chunk_size, chunk_keys)
    self._tileable_to_graph[tensor_key] = graph_ref

    # Clean up mutable tensor refs.
    self._mut_tensor_refs.pop(name)
    return graph_key, tensor_key, tensor_id, tensor_meta
151+
90152
def graph_state(self, graph_key):
91153
return self._graph_refs[graph_key].get_state()
92154

mars/session.py

+39
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,45 @@ def default_or_local(cls):
190190
cls._default_session = Session()
191191
return cls._default_session
192192

193+
def create_mutable_tensor(self, name, shape, dtype, *args, **kwargs):
    """
    Create a mutable tensor on the local cluster and return a client-side handle.

    The underlying session returns ``(shape, dtype, chunk_size, chunk_keys)``;
    a fetch tensor built from that supplies the nsplits and chunks of the
    returned ``MutableTensor``.

    :raises RuntimeError: if the underlying session is not a local cluster session
    """
    from .tensor.core import MutableTensor, MutableTensorData
    from .tensor.expressions.utils import create_fetch_tensor

    self._ensure_local_cluster()
    meta = self._sess.create_mutable_tensor(name, shape, dtype, *args, **kwargs)
    shape, dtype, chunk_size, chunk_keys = meta
    # Construct MutableTensor on the fly.
    layout = create_fetch_tensor(chunk_size, shape, dtype, chunk_keys=chunk_keys)
    data = MutableTensorData(_name=name, _op=None, _shape=shape, _dtype=dtype,
                             _nsplits=layout.nsplits, _chunks=layout.chunks)
    return MutableTensor(data=data)
203+
204+
def get_mutable_tensor(self, name):
    """
    Return a client-side handle for the existing mutable tensor ``name``.

    The underlying session returns ``(shape, dtype, chunk_size, chunk_keys)``;
    a fetch tensor built from that supplies the nsplits and chunks of the
    returned ``MutableTensor``.

    :raises RuntimeError: if the underlying session is not a local cluster session
    """
    from .tensor.core import MutableTensor, MutableTensorData
    from .tensor.expressions.utils import create_fetch_tensor

    self._ensure_local_cluster()
    meta = self._sess.get_mutable_tensor(name)
    shape, dtype, chunk_size, chunk_keys = meta
    # Construct MutableTensor on the fly.
    layout = create_fetch_tensor(chunk_size, shape, dtype, chunk_keys=chunk_keys)
    data = MutableTensorData(_name=name, _op=None, _shape=shape, _dtype=dtype,
                             _nsplits=layout.nsplits, _chunks=layout.chunks)
    return MutableTensor(data=data)
213+
214+
def write_mutable_tensor(self, tensor, index, value):
    """
    Write ``value`` at ``index`` of a mutable tensor.

    The tensor turns the write into chunk records via ``_do_write``; those
    records are then sent through the underlying session under the tensor's
    name, and the session's result is returned.

    :raises RuntimeError: if the underlying session is not a local cluster session
    """
    self._ensure_local_cluster()
    pending_records = tensor._do_write(index, value)
    return self._sess.send_chunk_records(tensor.name, pending_records)
218+
219+
def seal(self, tensor):
    """
    Flush and seal a mutable tensor, returning a fetch tensor for the result.

    Records still buffered in ``tensor`` are obtained with ``_do_flush`` and
    sent first; the underlying session then seals the tensor and returns
    ``(shape, dtype, chunk_size, chunk_keys)``, from which the fetch tensor
    is built.

    :raises RuntimeError: if the underlying session is not a local cluster session
    """
    from .tensor.expressions.utils import create_fetch_tensor

    self._ensure_local_cluster()
    tensor_name = tensor.name
    remaining_records = tensor._do_flush()
    self._sess.send_chunk_records(tensor_name, remaining_records)
    meta = self._sess.seal(tensor_name)
    shape, dtype, chunk_size, chunk_keys = meta
    # Construct Tensor on the fly.
    return create_fetch_tensor(chunk_size, shape, dtype, chunk_keys=chunk_keys)
227+
228+
def _ensure_local_cluster(self):
    """
    Guard for mutable tensor operations.

    :raises RuntimeError: if ``self._sess`` is not a ``LocalClusterSession``
    """
    from .deploy.local.session import LocalClusterSession

    if isinstance(self._sess, LocalClusterSession):
        return
    raise RuntimeError("Only local cluster session can be used to manipulate mutable tensors.")
193232

194233
def new_session(scheduler=None, **kwargs):
195234
return Session(scheduler, **kwargs)

0 commit comments

Comments
 (0)