
Add mutable tensor support for local cluster #464

Merged · 13 commits · Jul 5, 2019

Conversation

@sighingnow (Contributor) commented Jun 17, 2019

What do these changes do?

Support mutable tensors.

  • A MutableTensorActor on the scheduler to manage mutable tensors and perform the underlying operations.

  • A MutableTensor and MutableTensorData to manage the chunks of mutable tensors.

  • allocate/read/write interfaces in ResultSenderActor (on workers).

  • Chunk allocation:

    • The chunks are stored in the plasma store (currently pinned in memory for convenience) before seal.
    • No compression is performed on mutable tensor chunks, and read/write operations work directly on an np.ndarray constructed over the memory view (zero copy and no extra read/write overhead); see the sketch below.
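As a rough illustration of this zero-copy access pattern (a sketch only; the bytearray and names below are stand-ins, not the actual Mars plasma API):

import numpy as np

# A bytearray stands in for a pinned, uncompressed chunk buffer in the shared store.
buf = bytearray(4 * 4)

# Reads and writes go through an ndarray view constructed over that buffer,
# so no serialization or copying is involved.
view = np.frombuffer(buf, dtype=np.int8).reshape(4, 4)
view[1:3] = np.arange(8).reshape(2, 4)   # the write lands directly in the buffer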

TODO

  • Finish write_mutable_tensor.
  • Construct Tensor (and TensorData) from those chunks directly on seal.
  • Move the read/write dispatch logic to the client (it currently runs on the scheduler).

Related issue number

#415
Resolves #439
Resolves #450.

@sighingnow (Contributor, author) commented:

I would like to share my current progress on seal with you.

My current implementation allocates chunks directly on workers for the mutable tensor. On seal, I create a Tensor from those chunks and return it to the client, using Fetch as the op of the generated tensor's chunks.

The trouble is that I cannot find a clean way to let SessionActor know that the tensor generated from the mutable tensor has already been evaluated (already has chunks).

  • If a tensor has been executed, there would be a reference in SessionActor._tileable_to_graph, thus I need to build a GraphActor ref for the created tensor.
  • In GraphActor, for the Fetch op, tile_fetch_tileable is called (in prepare_graph), which means a GraphActor reference must already exist in SessionActor._tileable_to_graph.
  • If I mark the GraphActor's state as SUCCEEDED and don't run prepare_graph, _get_tileable_by_key in GraphActor fails when evaluating other tensors that depend on the generated tensor.
  • To make _get_tileable_by_key succeed, I tried setting the tiled tensor (with chunks) into _tileable_key_to_opid and _tileable_key_opid_to_tiled, but it is not easy to serialize/deserialize chunks between workers.

Another approach is to create the GraphActor when creating the mutable tensor and run prepare_graph, so that we have those chunks and the GraphActor also holds those tensors. But that means we would need to perform the mutable tensor's operations (read/write/seal) in GraphActor.

@sighingnow (Contributor, author) commented:

Or, should I make MutableTensorActor inherit from GraphActor to share those data structures?

@sighingnow (Contributor, author) commented:

Now MutableTensorActor is a subclass of GraphActor, and seal works!

We can now use a MutableTensor as follows:

>>> import numpy as np
>>> from mars.deploy.local import new_cluster
>>> from mars.session import new_session
>>> cluster = new_cluster()
>>> session = new_session(cluster.endpoint)
>>> session.as_default()

>>> mtensor = session.create_mutable_tensor("test", (4, 4), chunk_size=3)
>>> mtensor[:]
array([[0, 0, 0, 0],
       [0, 0, 0, 0],
       [0, 0, 0, 0],
       [0, 0, 0, 0]], dtype=int8)
>>> mtensor[1:3] = np.arange(8).reshape(2, 4)
>>> mtensor[3:4]
array([[0, 0, 0, 0]], dtype=int8)
>>> mtensor[2:4]
array([[4, 5, 6, 7],
       [0, 0, 0, 0]], dtype=int8)
>>> x = mtensor.seal()
>>>
>>> session.run(x + 1)
array([[1, 1, 1, 1],
       [1, 2, 3, 4],
       [5, 6, 7, 8],
       [1, 1, 1, 1]], dtype=int8)
>>>

@sighingnow changed the title from "[WIP] Mutable tensor." to "Mutable tensor." on Jun 19, 2019
@qinxuye (Collaborator) commented Jun 19, 2019

Excellent work! I will take some time to review the implementation.

@wjsi (Member) commented Jun 19, 2019

@sighingnow Please rebase your fork onto mars/master and update your code, as we renamed default_name to default_uid in #466.

@sighingnow force-pushed the mt-wip branch 2 times, most recently from 4309a8f to ea63136 on June 20, 2019
@qinxuye (Collaborator) left a comment:

Currently, every single write retrieves the data first, modifies it, and then writes it back to plasma. This is certainly an option, but I think this method has some flaws, listed below:

  1. It's not safe: if many clients try to modify the same chunk, there is no transaction guarantee.
  2. The data has to stay pinned in the plasma store; if other computations are running, there may be no room left for them.
  3. I suspect write performance would not be great, especially when many small modifications happen.

I would prefer that each write just stores the affected index, timestamp, and data into the chunk store; when a read or seal comes, fetch all the historical writes and assemble them into one chunk, writing it back if it is a seal.

I think this approach would give better write performance.
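A minimal sketch of this write-log idea, assuming a simple in-memory record list (the class and method names are illustrative, not Mars code):

import time
import numpy as np

class ChunkWriteLog:
    # Sketch only: each write appends (timestamp, index, data); seal replays the
    # log in timestamp order onto a freshly allocated chunk and returns it.
    def __init__(self, shape, dtype=np.int8):
        self.shape, self.dtype = shape, dtype
        self._records = []

    def write(self, index, data):
        self._records.append((time.time(), index, np.asarray(data, dtype=self.dtype)))

    def seal(self):
        chunk = np.zeros(self.shape, dtype=self.dtype)
        for _, index, data in sorted(self._records, key=lambda r: r[0]):
            chunk[index] = data
        return chunk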

@@ -68,6 +70,30 @@ def _update_tileable_shape(self, tileable):
tileable._update_shape(tuple(sum(nsplit) for nsplit in new_nsplits))
tileable.nsplits = new_nsplits

def create_mutable_tensor(self, name, shape, dtype, *args, **kwargs):
Inline review comment (Collaborator):

I think we can still let users create a mutable tensor via mt.mutable_tensor, because a session is not always required when processing data.

@sighingnow (Contributor, author) commented:

Thanks for the comprehensive review. I would like to explain some implementation decisions based on the comments. The proposed design stores the affected index, timestamp, and data separately and merges them into a whole tensor on seal. My concerns are:

  • First, how do we record the affected index and value? numpy's __setitem__ can be really "fancy". (See the illustrative snippet after this list.)
    • If we record writes in a sparse manner (something like (idx_x, idx_y, idx_z): value), the performance cannot be good when writing in a chunked (or contiguous) fashion, and a write with broadcast will generate a large volume of such index-value records.
    • If we just record the original index and value (something like ([slice(1, 2, None)], value)), then the merge operation has to replay all user operations on the same worker to combine them. The performance won't be good either.
  • Second, how do we do the merge? One of our assumptions is that the data is too large to be held on a single machine, so if the writes are not sparse, it is not trivial to merge them on seal.
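To make the two record formats concrete, here is a hypothetical illustration for the single broadcast write mtensor[1:4, 2] = 8 on a (4, 4) tensor (not actual Mars code):

import numpy as np

# (a) sparse per-element records: one (index, value) pair per affected cell;
# a large or broadcast write produces many such pairs.
sparse_records = [((i, 2), 8) for i in range(1, 4)]
# -> [((1, 2), 8), ((2, 2), 8), ((3, 2), 8)]

# (b) record the original index expression and operand; merging then means
# replaying every recorded __setitem__ on the same worker.
slice_records = [((slice(1, 4), 2), np.int8(8))]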

Now back to the current design. The basic insight is that the mutable tensor won't be kept in memory for long, since (IMO) it is only used to create a tensor through a series of write operations. Then,

> It's not safe: if many clients try to modify the same chunk, there is no transaction guarantee.

Based on the assumption that the mutable tensor is only used to create a tensor, the write operations issued by clients won't overlap, or the overlap doesn't affect the result. With the design that just records the affected index and value, I don't think we could give such a transaction guarantee either.

> The data has to stay pinned in the plasma store; if other computations are running, there may be no room left for them.

Since the mutable tensor won't stay in memory for long, keeping it pinned provides the best write performance. Once it is sealed, the chunks are unpinned.

> I suspect write performance would not be great, especially when many small modifications happen.

I think the write performance should be even better than in the design that records the write operations and merges them later, because:

  • The memory is pre-allocated.
  • When there are many small writes, we still need to decide which worker should store each record, just like the routing of write operations in this PR. (The routing logic can be moved to the client side, though.)
  • In this PR, a write to the plasma store simply writes to a view of raw memory without any extra copy. The chunk is pinned in memory and is not serialized in the plasma store, which (IMHO) gives the best write performance that can be achieved.

@sighingnow (Contributor, author) commented:

A previous bug has been fixed, and local tests show that writes now work for cases like these:

mtensor = session.create_mutable_tensor("test", (4, 4), chunk_size=3)

# write slice cross chunks
mtensor[2:4] = np.arange(8).reshape(2, 4) 

# write with broadcast
mtensor[1] = np.arange(4).reshape(4)

# write scalar value (cross chunks)
mtensor[1:4, 2] = 8

x = mtensor.seal()
print(session.fetch(x))

@qinxuye (Collaborator) left a comment:

I have a few suggestions.

  1. Some functions could become util functions, for instance the one that, given a slice or integer, calculates the index array and value array (a sketch of the idea follows this list). Some tests could be added to ensure correctness.
  2. GraphActor looks like a sort of final class, and so does MutableTensor, so when seal is triggered, is it possible to create a GraphActor with a tensor that owns a FetchTensor op?
  3. A few of the functions added to SenderActor don't seem well placed; maybe ReceiveActor can be leveraged to receive data. When seal is triggered, some actor can merge the historical arrays into one and put it back into plasma, or onto disk if no room can be cleared out.
  4. Tests should be added.
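For suggestion 1, one possible shape of such a util function (hypothetical name and signature, with an inline usage example):

import numpy as np

def indices_and_values(shape, index, value):
    # Hypothetical helper: expand a basic index (int or slice tuple) into the
    # flat positions it touches and the value broadcast over that region.
    flat = np.arange(int(np.prod(shape))).reshape(shape)
    region = flat[index]
    return region.ravel(), np.broadcast_to(value, region.shape).ravel()

# e.g. writing a scalar to rows 1..3 of column 2 of a (4, 4) tensor:
idx, vals = indices_and_values((4, 4), (slice(1, 4), 2), 8)
# idx -> array([ 6, 10, 14]), vals -> array([8, 8, 8])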

@sighingnow force-pushed the mt-wip branch 3 times, most recently from f6b567e to 968a40c on July 3, 2019
@sighingnow (Contributor, author) commented:

CI is green.

@qinxuye (Collaborator) commented Jul 3, 2019

> CI is green.

Great, I will review it soon.

@qinxuye (Collaborator) left a comment:

Looks pretty good to me, almost mergeable, but there are still two issues to solve.

  1. Remove the inheritance from GraphActor for MutableTensorActor.
  2. As ResultSenderActor may be removed soon, the logic based on it should be refined. We can see whether to wait for the PR that removes ResultSenderActor.

Besides, there are a small number of lines that are not covered, as Coveralls shows. Please add some unit tests to cover them as much as you can.

Overall, a really exciting PR for me.

@qinxuye (Collaborator) left a comment:

Well done, LGTM. We can make the CI pass by constraining the pyarrow version as a workaround; I think we can then merge this PR.

@qinxuye changed the title from "Mutable tensor." to "Add mutable tensor support for local cluster" on Jul 4, 2019
@qinxuye added this to the v0.2.0b2 milestone on Jul 4, 2019
@qinxuye (Collaborator) commented Jul 5, 2019

Please rebase onto the master branch to pass Travis CI.

@sighingnow (Contributor, author) commented:

Could someone help me relaunch the failed py35 job on AppVeyor? The same job under my personal account passes for the latest commit.

@qinxuye (Collaborator) left a comment:

LGTM

@qinxuye merged commit ba7ba05 into mars-project:master on Jul 5, 2019
@sighingnow deleted the mt-wip branch on July 5, 2019
wdkwyf pushed a commit to wdkwyf/mars that referenced this pull request Jul 5, 2019