
Fail to serialize dict_keys argument to read_csv #3893


Closed
brl0 opened this issue Jun 13, 2020 · 6 comments


brl0 commented Jun 13, 2020

What happened:
Attempting to read a CSV with usecols=columns.keys() while using the distributed scheduler throws a serialization error and hangs.

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/opt/conda/envs/default/lib/python3.7/site-packages/distributed/protocol/core.py", line 40, in dumps
    for key, value in data.items()
  File "/opt/conda/envs/default/lib/python3.7/site-packages/distributed/protocol/core.py", line 41, in <dictcomp>
    if type(value) is Serialize
  File "/opt/conda/envs/default/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 245, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function check_meta at 0x7f4dbc0f1710>, (<function apply at 0x7f4de0e80cb0>, <function pandas_read_text at 0x7f4db9fd70e0>, [<function _make_parser_function.<locals>.parser_f at 0x7f4dbc7c0050>, (<function read_block_from_file at 0x7f4db9fdd7a0>, <OpenFile '/tmp/tmpgou80l9m/31fae6b6-ad97-11ea-b877-fab4a0258d35.csv'>, 0, 64000000, b'\\n'), b'a,b,c\\n', (<class 'dict'>, [['usecols', dict_keys(['a', 'c'])]]), (<class 'dict'>, [['a', dtype('int64')], ['c', dtype('int64')]]), ['a', 'c']], (<class 'dict'>, [['write_header', False], ['enforce', False], ['path', None]])), Empty DataFrame\nColumns: [a, c]\nIndex: [], 'from_delayed')")

What you expected to happen:
Expected read_csv to function as it does without distributed.

Minimal Complete Verifiable Example:

from pathlib import Path
from tempfile import TemporaryDirectory
from uuid import uuid1
import dask.dataframe as dd
from dask.distributed import Client
client = Client()

tmp = TemporaryDirectory()
tmp_path = Path(tmp.name)
tmp_file = tmp_path / f"{uuid1()}.csv"
content = "a,b,c\n1,2,3"
tmp_file.write_text(content)

columns = {"a": "object", "c": "object"}
dd.read_csv(tmp_file, usecols=columns.keys()).compute()

Anything else we need to know?:
The issue seems to be related to using a dict_keys object with usecols. Wrapping in a list works.
This has previously been noted in this comment: #2597 (comment)
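
For reference, the workaround applied to the example above (a minimal sketch using the same variables; wrapping the dict_keys view in a list is the only change):

dd.read_csv(tmp_file, usecols=list(columns.keys())).compute()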

I am looking for opportunities to contribute, so I am happy to assist with some guidance.

Environment:

  • Dask version: 2.16, 2.18.1
  • Python version: 3.7.6
  • Operating System: Debian GNU/Linux 10 (buster)
  • Install method (conda, pip, source): conda
@mrocklin
Member

Thank you for the excellent bug report and minimal example, @brl0.

I am looking for opportunities to contribute, so I am happy to assist with some guidance.

I'm glad to hear it. I didn't see this at first, so let me first share the steps that I ran.

  1. I ran your reproducer (great job by the way) and saw that it failed
  2. I went back a few versions in history to see if this is new. It wasn't.
  3. I then made an async version of your failure here, which makes it easier to test
# in distributed/tests/test_collections.py
import dask.dataframe as dd

from distributed.utils import tmpfile
from distributed.utils_test import gen_cluster

@gen_cluster(client=True)
async def test_read_csv(c, s, a, b):
    with tmpfile(extension="csv") as fn:
        with open(fn, "w") as f:
            f.write("a,b,c\n1,2,3")

        columns = {"a": "object", "c": "object"}
        # pass the dict_keys view directly to reproduce the serialization failure
        df = dd.read_csv(fn, usecols=columns.keys())
        await df.persist()

Now my next step is probably to put a breakpoint wherever this is happening, likely somewhere in distributed/protocol/serialize.py, and see if I can isolate exactly what the culprit is. As you mention, it probably has to do with serializing the dict_keys object. We might try calling something like the following on that object:

pickle.dumps(columns.keys())

and if that doesn't work

cloudpickle.dumps(columns.keys())

If those fail then great: let's raise an issue upstream with cloudpickle and then do a short-term fix in read_csv to listify inputs to usecols. I'd be surprised if cloudpickle doesn't work, though.
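
As a standalone sketch of that check (with the pickle and cloudpickle versions current at the time of this thread, both calls raise TypeError; newer cloudpickle releases may succeed):

import pickle
import cloudpickle

columns = {"a": "object", "c": "object"}

for name, dumps in [("pickle", pickle.dumps), ("cloudpickle", cloudpickle.dumps)]:
    try:
        dumps(columns.keys())
        print(name, "serialized dict_keys fine")
    except TypeError as exc:
        print(name, "failed:", exc)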

So the next thing to do would be to figure out exactly what combination of things isn't working with serialization.

Ideally you would be able to isolate this further to a call like the following:

from distributed.protocol import serialize

>>> serialize(some_object)
Exception(...)

Then we can strip away all of the dask dataframe and distributed computing setup and focus just on what's causing issues with serialization.
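
For example, a hypothetical isolated call, assuming the dict_keys object is indeed the culprit suggested by the traceback above:

from distributed.protocol import serialize

columns = {"a": "object", "c": "object"}
serialize(columns.keys())  # expected to raise a serialization TypeError on the affected versions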

I hope that plan makes sense. It's also probably wrong. You seem to know what you're doing, so please deviate as soon as it seems sensible.


brl0 commented Jun 13, 2020

Thanks @mrocklin for the fantastic guidance, you rock!

I followed your advice and tested the pickling of dict_keys objects with both pickle and cloudpickle, both of which fail with TypeError: can't pickle dict_keys objects.

After a bit of digging, I believe the issue is that dict_keys is a dynamic view object, which doesn't make sense to pickle.

I think that the best solution here is probably to listify inputs to usecols as you suggested.

Let me know if this doesn't make sense. I will keep digging into this.

@mrocklin
Member

OK, that's an interesting result.

After a bit of digging, I believe the issue is that dict_keys is a dynamic view object, which doesn't make sense to pickle.

It might be worth raising an issue on the cloudpickle tracker anyway. The maintainers there have thought more deeply about these kinds of problems I think.

Listifying makes sense short term. What is challenging, I think, is how to apply a fix like this broadly across kwargs on many different functions. You're running into this with usecols, so presumably one fix would be to pull usecols out of kwargs and listify it:

if "usecols" in kwargs:
    kwargs["usecols"] = list(kwargs["usecols"]

But this has a few problems:

  1. There are other arguments to consider
  2. There are other functions to consider (read_parquet for example)
  3. There might be some arguments that we only want to listify for some types, like dict_keys, but not for others, like pd.Index

We wouldn't want to do this for every possible case, because that would be messy and hard to maintain. In my ideal world this is handled upstream in cloudpickle (where a change would affect everything globally). If that's not possible, then maybe we write some general-purpose sanitize_for_serialization function or something, roughly like the sketch below.
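
For illustration only, such a helper might look something like this (a hypothetical sketch, not existing dask code; the name and the set of handled types are placeholders):

def sanitize_for_serialization(kwargs):
    # Materialize dict view objects, which plain pickle cannot handle, into lists
    view_types = (type({}.keys()), type({}.values()), type({}.items()))
    return {k: list(v) if isinstance(v, view_types) else v for k, v in kwargs.items()}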

The first step though, I think, is to check in with cloudpickle.


brl0 commented Jun 30, 2020

@mrocklin, following your suggestion, I submitted a PR to cloudpickle to handle these types of items.
They are looking for feedback from others in deciding whether to accept.
If you are so inclined, your feedback may be helpful in getting the PR accepted.


brl0 commented Sep 19, 2020

The PR to cloudpickle was accepted, and the example code above now works, so I am closing this issue.
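
For completeness, with a cloudpickle release that includes that change, a round trip of a dict_keys object succeeds (a quick sketch; the exact minimum cloudpickle version is not pinned here):

import cloudpickle

columns = {"a": "object", "c": "object"}
restored = cloudpickle.loads(cloudpickle.dumps(columns.keys()))
print(list(restored))  # ['a', 'c']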

@jrbourbeau
Member

Thanks @brl0 for the upstream changes in cloudpickle and for following up here!
