
pickle function call uses kwargs added in python 3.8 #3851


Closed
mnarodovitch opened this issue Jun 3, 2020 · 20 comments · Fixed by #3639

@mnarodovitch

What happened:
With python 3.6.9, my job got stuck at 998/1000 and left the following log message on the worker. At this point the job didn't continue for >10 minutes. I had to stop it manually.

distributed.worker - ERROR - 'buffers' is an invalid keyword argument for this function
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/distributed/worker.py", line 2511, in execute
    data[k] = self.data[k]
  File "/usr/local/lib/python3.6/site-packages/zict/buffer.py", line 78, in __getitem__
    return self.slow_to_fast(key)
  File "/usr/local/lib/python3.6/site-packages/zict/buffer.py", line 65, in slow_to_fast
    value = self.slow[key]
  File "/usr/local/lib/python3.6/site-packages/zict/func.py", line 38, in __getitem__
    return self.load(self.d[key])
  File "/usr/local/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 502, in deserialize_bytes
    return deserialize(header, frames)
  File "/usr/local/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 302, in deserialize
    return loads(header, frames)
  File "/usr/local/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 64, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/usr/local/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 64, in loads
    return pickle.loads(x, buffers=buffers)
TypeError: 'buffers' is an invalid keyword argument for this function

What you expected to happen:
My job finishes

Minimal Complete Verifiable Example:
On python 3.6, run

>>> from distributed.protocol import pickle
>>> pickle.loads(b"", buffers=(1))
...
TypeError: 'buffers' is an invalid keyword argument for this function

Anything else we need to know?:
There is a backport of pickle protocol 5 to Python 3.6 at https://pypi.org/project/pickle5/
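For context, pickle protocol 5 (PEP 574) is the feature in question: `dumps` gains a `buffer_callback` keyword and `loads` gains a `buffers` keyword for out-of-band data. A minimal round trip using the stdlib `pickle` on Python 3.8+ (the pickle5 package backports the same API, so on 3.6 you would use `import pickle5 as pickle` instead):

```python
import pickle

payload_src = bytearray(b"x" * 1024)
# Wrapping in PickleBuffer lets protocol 5 serialize the data out-of-band.
buf = pickle.PickleBuffer(payload_src)

buffers = []
# buffer_callback collects each out-of-band buffer instead of embedding it
# in the pickle stream.
data = pickle.dumps(buf, protocol=5, buffer_callback=buffers.append)

# The collected buffers must be handed back via the `buffers` keyword,
# which is exactly the argument that stock Python 3.6 does not accept.
restored = pickle.loads(data, buffers=buffers)
```

A writable out-of-band buffer deserializes as a `bytearray` holding the original data.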

I am trying to work around that problem as follows. Thx @samaust for the remark in #3843 .

import pickle5
from distributed.protocol import pickle
pickle.pickle = pickle5  # point distributed's pickle wrapper at the pickle5 backport

def pickle_dumps(x):
    header = {"serializer": "pickle"}
    frames = [None]
    buffer_callback = lambda f: frames.append(memoryview(f))
    frames[0] = pickle.dumps(x, buffer_callback=buffer_callback)
    return header, frames

def pickle_loads(header, frames):
    x, buffers = frames[0], frames[1:]
    return pickle.loads(x, buffers=buffers)

from distributed.protocol.serialize import register_serialization_family
register_serialization_family('pickle', pickle_dumps, pickle_loads)

Environment:

  • Python version: 3.6.9
  • Operating System: ubuntu
  • Install method (conda, pip, source): pip
@mrocklin
Member

mrocklin commented Jun 3, 2020

With python 3.6.9, my job got stuck at 998/1000 and left the following log message on the worker. At this point the job didn't continue for >10 minutes. I had to stop it manually.

Oof, that sounds frustrating. My apologies.

Minimal Complete Verifiable Example:

Ideally this would be something that you were trying to do with Dask. Is it easy for you to provide us an example that someone like you would try doing normally that would fail? I ask because our normal test suite passes fine on Python 3.6 and somehow failed to trigger this failure. It would be useful to know what was going on and have a test for it.

cc @jakirkham because of the pickle connection.

@mnarodovitch
Author

mnarodovitch commented Jun 3, 2020

Unfortunately it is not very easy to show an example. It seems that my job usually doesn't fill the buffer and executes normally. In my use case it is rather an edge case in which dask uses the buffers=buffers keyword.

Oof, that sounds frustrating. My apologies.

No worries anymore :) Thx for your empathy.

@mrocklin
Member

mrocklin commented Jun 3, 2020 via email

@mnarodovitch
Author

mnarodovitch commented Jun 3, 2020

Yes, all workers have the same version, as they run from the same Docker image: python 3.6.9.final.0. I am quite sure that the problem is related to commit ddc6377.

I ended up with the following workaround. It seems to help:

    from distributed.diagnostics.plugin import WorkerPlugin
    from dask.distributed import Client
    import dask

    class Pickle5Hack(WorkerPlugin):
        def setup(self, worker: dask.distributed.Worker):
            import sys
            import pickle5
            # Replace the stdlib pickle module with the pickle5 backport
            sys.modules['pickle'] = pickle5

    client = Client('localhost:8786')
    client.register_worker_plugin(Pickle5Hack())

Thx for the great project and great documentation.

@mrocklin
Member

mrocklin commented Jun 3, 2020

I'm glad to see that you were able to find a solution to help work around things.

Let's wait a bit to see if @jakirkham has some thoughts. If memory serves he's responsible for the pickle5 changes and may know more.

@jakirkham
Member

Yeah, while I believe there could be an issue, Jim and I haven't been able to find anything and we still haven't identified a reproducer, which makes it hard to do anything helpful here. Would you be able to come up with a reproducer for the behavior @michaelnarodovitch?

@jakirkham
Member

Independently, we would like to get pickle5 working with this, but it doesn't currently ( #2495 ), as this would depend on cloudpickle having this functionality ( cloudpipe/cloudpickle#370 ), which in turn depends on some structural changes to cloudpickle ( cloudpipe/cloudpickle#368 ) that still need wrapping up.

@mrocklin
Member

mrocklin commented Jun 3, 2020

+1 on getting a reproducer if possible

If we're unable to find out what's going on then maybe we revert the change, or apply it only for appropriate versions of Python?
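For the version-gated option, a hypothetical sketch of such a guard (the names `HAS_BUFFERS_KWARG` and `loads_compat` are illustrative, not distributed's actual code) would forward the kwarg only where it exists:

```python
import pickle
import sys

# The stdlib pickle only gained the `buffers` keyword in Python 3.8 (PEP 574).
HAS_BUFFERS_KWARG = sys.version_info >= (3, 8)

def loads_compat(payload, buffers=None):
    # Forward `buffers` only on interpreters whose pickle accepts it.
    if buffers and HAS_BUFFERS_KWARG:
        return pickle.loads(payload, buffers=buffers)
    return pickle.loads(payload)

roundtripped = loads_compat(pickle.dumps([1, 2, 3]))
```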

@jakirkham
Member

FWIW someone reported an issue with this the other day and it turned out to be some other usage error ( #3843 ). So it may not be this change per se, but instead some other upstream error that is not being handled gracefully.

@mnarodovitch
Author

mnarodovitch commented Jun 3, 2020

Mh, I see that this is a tricky one. I was able to reproduce on my cluster. The job forced disk spills with subsequent shuffles, which collect ~4 GB pandas dataframes on 7 GB memory nodes (to make it reproduce pretty fast, I reduced the memory of the nodes). The following stacktrace might provide more insight.

distributed.core - INFO - Event loop was unresponsive in Worker for 4.36s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.protocol.pickle - INFO - Failed to deserialize b'<MY_BIG_BUFFER - pandas_dataframe_holds_4GB_of_data>'
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 64, in loads
    return pickle.loads(x, buffers=buffers)
TypeError: 'buffers' is an invalid keyword argument for this function
distributed.worker - ERROR - 'buffers' is an invalid keyword argument for this function
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/distributed/worker.py", line 2511, in execute
    data[k] = self.data[k]
  File "/usr/local/lib/python3.6/site-packages/zict/buffer.py", line 78, in __getitem__
    return self.slow_to_fast(key)
  File "/usr/local/lib/python3.6/site-packages/zict/buffer.py", line 65, in slow_to_fast
    value = self.slow[key]
  File "/usr/local/lib/python3.6/site-packages/zict/func.py", line 38, in __getitem__
    return self.load(self.d[key])
  File "/usr/local/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 502, in deserialize_bytes
    return deserialize(header, frames)
  File "/usr/local/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 302, in deserialize
    return loads(header, frames)
  File "/usr/local/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 64, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/usr/local/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 64, in loads
    return pickle.loads(x, buffers=buffers)
TypeError: 'buffers' is an invalid keyword argument for this function

Dask: 2.17.2
Dask Distributed: 2.17.0
Python: 3.6.9

Thank you so much for following up on this.

@jakirkham
Member

Can you please share the code? Otherwise I'm afraid we won't be able to reproduce it.

@jakirkham
Member

So I've dug into this a bit and this is what I have come up with. Sharing the reproducer for now, though I haven't debugged it at all yet. This passes on 2.16.0 and fails on 2.17.0 using Python 3.7.

from distributed.protocol import deserialize_bytes, serialize_bytes

b = 2**27 * b"a"
deserialize_bytes(serialize_bytes(b, serializers=["pickle"]))
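One reading of why payload size matters here: `serialize_bytes` splits large payloads into multiple frames, and the trailing frames then reach the deserializer as out-of-band `buffers`. A hedged sketch of that shape, where the cutoff and helper names are illustrative (not distributed's actual code) and rejoining frames before `loads` is one plausible form of the fix in #3639:

```python
import pickle

FRAME_CUTOFF = 2**20  # illustrative cutoff, smaller than distributed's real one

def split_frames(payload, cutoff=FRAME_CUTOFF):
    # Split one serialized payload into several frames for transport.
    return [payload[i:i + cutoff] for i in range(0, len(payload), cutoff)]

def join_and_loads(frames):
    # Rejoin the frames before calling pickle.loads, so nothing has to be
    # passed through the Python-3.8-only `buffers` keyword.
    return pickle.loads(b"".join(frames))

payload = pickle.dumps(2**22 * b"a")  # a 4 MiB bytes object
frames = split_frames(payload)
```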

@jakirkham
Member

Heh on the bright side we already solved this before ( #3639 ). 😄 We were just missing the test, which we now have. Will clean that up.

@jakirkham
Member

Heh on the bright side we already solved this before ( #3639 ). 😄 We were just missing the test, which we now have. Will clean that up.

This is ready for testing/review 🙂

@mnarodovitch
Author

mnarodovitch commented Jun 4, 2020

Oh, great that you found something :) I didn't really understand the failure mode in my use case, and the code of my use case was not something I could share directly.
Just by looking at the serializer code, the problem is anything but obvious.
The reproducer looks promising to me.
PS: My 'fixes' didn't help with 2.17.0. On 2.16.0, the job executes just fine.

@KrishanBhasin
Contributor

KrishanBhasin commented Jun 4, 2020

Upgrading to distributed==2.17.0 has been giving me headaches - I've since found
distributed.worker - ERROR - 'buffers' is an invalid keyword argument for this function
in my logs while my workers' memory use rises slowly to 80% where they pause indefinitely.

I can confirm that when running the branch in #3639, I no longer have this problem!
(Python 3.7, dask==2.17.2, distributed==2.17.0)

@mnarodovitch
Author

mnarodovitch commented Jun 4, 2020

Pulled via pip install git+https://github.com/jakirkham/distributed.git@33594d3c22dfa3fac4fdb82f7860da3492dafd49 and reran my (previously failing) use case with that version. Works flawlessly now.

@jakirkham
Member

Great, thanks for the feedback! 😄

@jakirkham
Member

The fix is in distributed version 2.18.0. So please upgrade to get the fix 🙂

@gshimansky

Thank you! It works as expected now.
