-
Notifications
You must be signed in to change notification settings - Fork 39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Deal with deprecated TIMESTAMP sqlite3 converters in Python 3.12 #321
Comments
I looked a bit into using cattrs for this. Using only cattrs is somewhat involved, because:
Arguably, it may be better to use cattrs just for (un)structuring and basic conversions (datetime, tuples, ...), and handle (un)flattening and JSON string conversions separately. For reference, here's code for various approaches of (un)structuring things:

from __future__ import annotations
from dataclasses import dataclass, fields
from datetime import datetime
from collections.abc import Sequence
from functools import partial
from collections import ChainMap
import typing
import json
import cattrs
from cattrs import override
from cattrs.gen import make_dict_structure_fn, make_dict_unstructure_fn
@dataclass(frozen=True)
class ExceptionInfo:
    """Immutable record of a caught exception: its type name and str(value)."""

    type_name: str
    value_str: str
@dataclass(frozen=True)
class Feed:
    """A feed row: URL plus optional last-updated time and last error info."""

    url: str
    updated: datetime | None = None
    last_exception: ExceptionInfo | None = None
@dataclass(frozen=True)
class Content:
    """A piece of entry content: the text value and optional MIME type."""

    value: str
    type: str | None = None
@dataclass(frozen=True)
class Entry:
    """An entry row; ``feed`` is the parent feed (None until attached)."""

    id: str
    updated: datetime | None = None
    content: Sequence[Content] = ()
    # Fixed: the original annotated this as bare ``Feed`` with a ``None``
    # default, which type checkers reject; ``Feed | None`` matches usage.
    feed: Feed | None = None
# Sample rows as sqlite3 would return them: datetimes as ISO-ish strings,
# nested objects JSON-encoded into a single text column.
feed_input = {
'url': 'http://example.com/index.xml',
'updated': '2022-01-01 00:00:00',
'last_exception': json.dumps(ExceptionInfo('Error', 'message').__dict__),
}
# Entry row joined with its feed; column names carry the table prefix.
entry_input = {
'feeds.url': 'http://example.com/index.xml',
'feeds.updated': '2022-01-01 00:00:00',
'feeds.last_exception': json.dumps(ExceptionInfo('Error', 'message').__dict__),
'entries.id': '123',
'entries.updated': '2022-01-01 12:34:56.789000',
'entries.content': json.dumps([Content('value').__dict__]),
}
# Shared cattrs converter: datetimes round-trip via fromisoformat/isoformat,
# using a space separator to match the stored TIMESTAMP format.
converter = cattrs.Converter()
converter.register_structure_hook(datetime, lambda v, _: datetime.fromisoformat(v))
converter.register_unstructure_hook(datetime, lambda v: v.isoformat(sep=' '))
"""
feed = converter.structure(feed_input, Feed)
print(feed)
print(converter.unstructure(feed))
print()
def unflatten_inplace(d, prefix, json_keys=frozenset(), nested_keys={}):
keys = [k for k in d if k.startswith(prefix)]
for k in keys:
d[k.removeprefix(prefix)] = d.pop(k)
for k in json_keys:
d[k] = json.loads(d[k])
for k, p in nested_keys.items():
keys = [k for k in d if k.startswith(p)]
d[k] = {k.removeprefix(p): d.pop(k) for k in keys}
unflatten_inplace(entry_input, 'entries.', {'content'}, {'feed': 'feeds.'})
entry = converter.structure(entry_input, Entry)
print(entry)
print(converter.unstructure(entry))
print()
"""
def structure_json(v, cl, wrap=None):
    """Decode JSON text *v* and structure it as *cl*; optionally apply *wrap*
    (e.g. ``tuple``) to the structured result."""
    result = cattrs.structure(json.loads(v), cl)
    return wrap(result) if wrap else result
def unstructure_json(v):
    """Unstructure *v* with cattrs and encode the result as a JSON string."""
    payload = cattrs.unstructure(v)
    return json.dumps(payload)
# ExceptionInfo lives in a JSON text column, so (un)structure via JSON.
converter.register_structure_hook(ExceptionInfo, structure_json)
converter.register_unstructure_hook(ExceptionInfo, unstructure_json)
# Demo: round-trip a feed row through structure/unstructure.
feed = converter.structure(feed_input, Feed)
print(feed)
print(converter.unstructure(feed))
print()
def make_structure_fn(cl):
    """Build a dict-structure hook for dataclass *cl*.

    Sequence-typed fields arrive as JSON strings (a DB text column), so
    their per-field hook JSON-decodes and wraps the result in a tuple.
    """
    overrides = {}
    hints = typing.get_type_hints(cl)
    for f in fields(cl):
        # Fixed: dropped the original's unused ``kwargs = {}`` per-field local.
        if typing.get_origin(hints[f.name]) is Sequence:
            overrides[f.name] = override(struct_hook=partial(structure_json, wrap=tuple))
    return make_dict_structure_fn(cl, converter, **overrides)
def make_unstructure_fn(cl):
    """Build a dict-unstructure hook for dataclass *cl*.

    Mirror of make_structure_fn: Sequence-typed fields are unstructured
    back into JSON strings for storage.
    """
    overrides = {}
    hints = typing.get_type_hints(cl)
    for f in fields(cl):
        # Fixed: dropped the original's unused ``kwargs = {}`` per-field local.
        if typing.get_origin(hints[f.name]) is Sequence:
            overrides[f.name] = override(unstruct_hook=unstructure_json)
    return make_dict_unstructure_fn(cl, converter, **overrides)
# Register the generated per-field hooks for Entry rows.
converter.register_structure_hook(Entry, make_structure_fn(Entry))
converter.register_unstructure_hook(Entry, make_unstructure_fn(Entry))
class Stripper:
    """Read-only mapping view over *data* that strips *prefix* from keys.

    ``view['x']`` reads ``data[prefix + 'x']``; iteration yields only the
    keys that carry the prefix, with the prefix removed.
    """

    def __init__(self, data, prefix):
        self.data = data
        self.prefix = prefix

    def __getitem__(self, name):
        return self.data[self.prefix + name]

    def __iter__(self):
        prefix = self.prefix
        for key in self.data:
            if key.startswith(prefix):
                yield key.removeprefix(prefix)
# Layer the row: a dict holding the nested 'feed' view in front, then a
# prefix-stripping view of the entry columns as the fallback map.
entry_input = ChainMap(
{'feed': Stripper(entry_input, 'feeds.')},
Stripper(entry_input, 'entries.'),
)
# Demo: round-trip an entry row through structure/unstructure.
entry = converter.structure(entry_input, Entry)
print(entry)
print(converter.unstructure(entry))
print()
The code below contains a few different ways of manipulating the output of a query to a form that's accepted by cattrs structure(); all of them precompute the list of changes ahead of time to some extent, based on the expected type. Conclusions (preliminary):
The code (the good stuff starts around line 70).

from __future__ import annotations
from dataclasses import dataclass, fields
from datetime import datetime
from collections.abc import Sequence
from functools import partial, lru_cache
from collections import ChainMap
import typing, types
import json
import cattrs
from cattrs import override
from cattrs.gen import make_dict_structure_fn, make_dict_unstructure_fn
from pprint import pprint
# Preserve dict insertion order in all pretty-printed output below.
pprint = partial(pprint, sort_dicts=False)
# NOTE(review): these dataclasses duplicate the first snippet's definitions;
# Entry gains dummy string fields a-f to benchmark wider rows. Indentation
# of the class bodies was lost in extraction.
@dataclass(frozen=True)
class ExceptionInfo:
type_name: str
value_str: str
@dataclass(frozen=True)
class Feed:
url: str
updated: typing.Union[datetime, None] = None
last_exception: ExceptionInfo | None = None
@dataclass(frozen=True)
class Content:
value: str
type: str | None = None
@dataclass(frozen=True)
class Entry:
id: str
updated: datetime | None = None
content: Sequence[Content] = ()
feed: Feed = None
a: str = ''
b: str = ''
c: str = ''
d: str = ''
e: str = ''
f: str = ''
# Sample feed row; the nested ExceptionInfo is stored as a JSON string.
feed_input = {
    'url': 'http://example.com/index.xml',
    'updated': '2022-01-01 00:00:00',
    # Fixed: the original literal listed 'last_exception' twice; the first
    # (None) entry was dead, silently overwritten by this JSON value.
    'last_exception': json.dumps(ExceptionInfo('Error', 'message').__dict__),
}
# Entry row joined with its feed; column names carry the table prefix,
# and the dummy a-f columns exercise the rename path in the benchmarks.
entry_input = {
'feeds.url': 'http://example.com/index.xml',
'feeds.updated': '2022-01-01 00:00:00',
'feeds.last_exception': json.dumps(ExceptionInfo('Error', 'message').__dict__),
'entries.id': '123',
'entries.updated': '2022-01-01 12:34:56.789000',
'entries.content': json.dumps([Content('value').__dict__]),
'entries.a': 'a',
'entries.b': 'a',
'entries.c': 'a',
'entries.d': 'a',
'entries.e': 'a',
'entries.f': 'a',
}
# Shared cattrs converter with space-separated ISO datetime round-trip.
converter = cattrs.Converter()
converter.register_structure_hook(datetime, lambda v, _: datetime.fromisoformat(v))
converter.register_unstructure_hook(datetime, lambda v: v.isoformat(sep=' '))
# stuff that's precomputed can be derived once per type with introspection
#### approach one: precompute as much as possible
def _structure_static(value, cls, *, _composite_keys=(), _rename_keys={}, _nested_keys={}):
    """Prepare a flat DB row per precomputed key specs, then structure it.

    Overlays *value* with a ChainMap so the input mapping is never mutated.
    The default mappings are read-only, so the shared defaults are safe.
    """
    overlay = ChainMap({}, value)
    # JSON-decode composite (nested/list) columns stored as text.
    for key in _composite_keys:
        raw = overlay[key]
        if raw is not None:
            overlay[key] = json.loads(raw)
    # Strip table prefixes: expose each prefixed column under its bare name.
    for dst, src in _rename_keys.items():
        overlay[dst] = overlay[src]
    # Gather nested-object columns into one sub-dict per target field.
    for field_name, renames in _nested_keys.items():
        sub = overlay[field_name] = {}
        for dst, src in renames.items():
            sub[dst] = overlay[src]
    return converter.structure(overlay, cls)
# Precomputed per-type key specs for _structure_static; in real code these
# would be derived once per type by introspection (see the code further down).
STATIC_KWARGS = {
Feed: dict(
_composite_keys=['last_exception'],
),
Entry: dict(
_composite_keys=[
'entries.content',
'feeds.last_exception',
],
_rename_keys={
'id': 'entries.id',
'updated': 'entries.updated',
'content': 'entries.content',
} | {k: f'entries.{k}' for k in 'abcdef'},
_nested_keys={'feed': {
'url': 'feeds.url',
'updated': 'feeds.updated',
'last_exception': 'feeds.last_exception',
}},
),
}
def structure_static(value, cls):
    """Structure *value* as *cls* using the precomputed STATIC_KWARGS spec."""
    return _structure_static(value, cls, **STATIC_KWARGS[cls])
# In [4]: %timeit structure(feed_input, Feed)
# 11.1 µs ± 50 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
# In [3]: %timeit structure(entry_input, Entry)
# 35.5 µs ± 653 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
#### approach two: precompute less, rely on a wrapper to remove prefixes
class Stripper:
    """Lazy mapping view over *value*: strips *prefix* from keys and
    JSON-decodes the values of keys listed in *composite*.

    The shared default ``composite=set()`` is read-only, so it is safe.
    """

    def __init__(self, value, prefix='', composite=set()):
        self.value = value
        self.prefix = prefix
        self.composite = composite

    def __getitem__(self, name):
        item = self.value[self.prefix + name]
        if name in self.composite:
            item = json.loads(item)
        return item

    def __iter__(self):
        prefix = self.prefix
        for key in self.value:
            if key.startswith(prefix):
                yield key.removeprefix(prefix)
def _structure_dynamic(value, cls, *, _prefix='', _composite_keys=set(), _nested_keys=()):
    """Structure *value* as *cls*, unflattening lazily through Stripper views
    layered into a ChainMap (no precomputed rename tables needed)."""
    overlay = ChainMap(value)
    if _prefix or _composite_keys:
        # Front layer resolves bare names to prefixed/JSON columns on demand.
        overlay.maps.insert(0, Stripper(value, _prefix, composite=_composite_keys))
    if _nested_keys:
        nested = {}
        overlay.maps.insert(0, nested)
        for field_name, prefix, composite in _nested_keys:
            nested[field_name] = Stripper(overlay, prefix, composite)
    return converter.structure(overlay, cls)
# Per-type specs for _structure_dynamic: prefix to strip, composite (JSON)
# columns, and [field, prefix, composite] triples for nested objects.
DYNAMIC_KWARGS = {
Feed: dict(
_composite_keys={'last_exception'},
),
Entry: dict(
_prefix='entries.',
_composite_keys={'content'},
_nested_keys=[['feed', 'feeds.', ['last_exception']]],
),
}
def structure_dynamic(value, cls):
    """Structure *value* as *cls* using the DYNAMIC_KWARGS spec for *cls*."""
    return _structure_dynamic(value, cls, **DYNAMIC_KWARGS[cls])
# In [6]: %timeit structure(feed_input, Feed)
# 13 µs ± 123 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
# In [4]: %timeit structure(entry_input, Entry)
# 78 µs ± 260 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
#### approach three: unroll all the loops in generated code
def _codegen(cls, _composite_keys=(), _rename_keys={}, _nested_keys={}):
lines = [f"def fn(m, loads):"]
for k in _composite_keys:
lines.extend([
f" v = m[{k!r}]",
f" if v is not None:",
f" m[{k!r}] = loads(v)",
])
for dst, src in _rename_keys.items():
lines.append(f" m[{dst!r}] = m[{src!r}]")
for key, renames in _nested_keys.items():
lines.append(f" m[{key!r}] = {{")
for dst, src in renames.items():
lines.append(f" {dst!r}: m[{src!r}],")
lines.append(" }")
code = '\n'.join(lines)
ns = {}
exec(code, ns)
fn = ns['fn']
fn.__name__ = f"prepare__{cls.__qualname__.replace('.', '__')}"
return fn
# Pre-generate one prepare function per type from the static key specs.
CODEGEN_FUNCS = {cls: _codegen(cls, **kwargs) for cls, kwargs in STATIC_KWARGS.items()}

def structure_codegen(value, cls):
    """Structure a row using the pre-generated prepare function for *cls*."""
    overlay = ChainMap({}, value)
    CODEGEN_FUNCS[cls](overlay, json.loads)
    return converter.structure(overlay, cls)
# In [1]: %timeit structure(feed_input, Feed)
# 10.3 µs ± 18.1 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
# In [3]: %timeit structure(entry_input, Entry)
# 34 µs ± 620 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
# structure = structure_static
#
# feed = structure(feed_input, Feed)
# pprint(feed)
# entry = structure(entry_input, Entry)
# pprint(entry)
#### baseline: cattrs only
# Pre-apply the generated prepare functions once, so the timed call below
# measures converter.structure() alone on already-unflattened dicts.
feed_input = dict(feed_input)
CODEGEN_FUNCS[Feed](feed_input, json.loads)
entry_input = dict(entry_input)
CODEGEN_FUNCS[Entry](entry_input, json.loads)
# structure = converter.structure
#
# feed = structure(feed_input, Feed)
# pprint(feed)
# entry = structure(entry_input, Entry)
# pprint(entry)
# In [2]: %timeit structure(feed_input, Feed)
# 3.65 µs ± 42 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
# In [3]: %timeit structure(entry_input, Entry)
# 10.4 µs ± 35.7 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
#### unused/unfinished introspection code
BASIC_TYPES = (str, datetime)
def resolve_optional(hint):
origin = typing.get_origin(hint)
if origin not in (types.UnionType, typing.Union):
return hint
args = [a for a in typing.get_args(hint) if a is not type(None)]
if len(args) != 1:
raise ValueError(f"expected T|None union, got: {hint}")
return args[0]
# NOTE(review): extraction fused this function's final ``return frozenset(rv)``
# into the garbled line that follows; the body below is otherwise intact.
# Collects names of fields whose optional-stripped type is not a BASIC_TYPES
# scalar — i.e. the JSON composite columns.
def get_composite_keys(cls):
rv = []
hints = typing.get_type_hints(cls)
for f in fields(cls):
hint = resolve_optional(hints[f.name])
if issubclass(hint, BASIC_TYPES):
continue
rv.append(f.name)
return frozenset(rv)

Update: Complete db -> objects conversion code.

from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from collections.abc import Sequence
from collections import ChainMap
import typing, types
import json
import cattrs
from pprint import pprint
import typing
import types
import dataclasses
import functools
# NOTE(review): third copy of the model dataclasses for the final,
# self-contained script. Class-body indentation was lost in extraction.
@dataclass(frozen=True)
class ExceptionInfo:
type_name: str
value_str: str
@dataclass(frozen=True)
class Feed:
url: str
updated: datetime | None = None
last_exception: ExceptionInfo | None = None
@dataclass(frozen=True)
class Content:
value: str
type: str | None = None
@dataclass(frozen=True)
class Entry:
id: str
updated: datetime | None = None
content: Sequence[Content] = ()
feed: Feed = None
# Sample rows as sqlite3 returns them (ISO datetime strings, JSON columns).
feed_input = {
'url': 'http://example.com/index.xml',
'updated': '2022-01-01 00:00:00',
# NOTE(review): 'last_exception' appears twice in this literal; this None
# entry is dead — silently overwritten by the JSON value on the next line.
'last_exception': None,
'last_exception': json.dumps(ExceptionInfo('Error', 'message').__dict__),
}
entry_input = {
'feeds.url': 'http://example.com/index.xml',
'feeds.updated': '2022-01-01 00:00:00',
'feeds.last_exception': json.dumps(ExceptionInfo('Error', 'message').__dict__),
'entries.id': '123',
'entries.updated': '2022-01-01 12:34:56.789000',
'entries.content': json.dumps([Content('value').__dict__]),
}
# Converter with space-separated ISO datetime round-trip.
converter = cattrs.Converter()
converter.register_structure_hook(datetime, lambda v, _: datetime.fromisoformat(v))
converter.register_unstructure_hook(datetime, lambda v: v.isoformat(sep=' '))
def resolve_optional(hint):
origin = typing.get_origin(hint)
if origin not in (types.UnionType, typing.Union):
return hint
args = [a for a in typing.get_args(hint) if a is not type(None)]
if len(args) != 1:
raise ValueError(f"expected T|None union, got: {hint}")
return args[0]
# Scalar types stored directly in DB columns (everything else is JSON).
BASIC_TYPES = (str, datetime)

def as_(factory):
    """Decorator factory: pass the decorated function's return value
    (typically a generator) through *factory*, e.g. ``@as_(list)``."""
    def decorator(func):
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            return factory(func(*args, **kwargs))
        return wrapped
    return decorator
@as_(list)
def composite_keys(cls, prefix='', nested={}):
    """List the (prefixed) names of *cls* fields stored as JSON composites.

    Fields listed in *nested* are recursed into with their own column
    prefix; fields resolving to a BASIC_TYPES scalar are skipped.
    """
    hints = typing.get_type_hints(cls)
    for field in dataclasses.fields(cls):
        name = field.name
        resolved = resolve_optional(hints[name])
        if name in nested:
            # Nested dataclass field: recurse using its own column prefix.
            yield from composite_keys(resolved, nested[name])
        elif not issubclass(resolved, BASIC_TYPES):
            yield prefix + name
@as_(dict)
def rename_keys(cls, prefix='', nested={}):
    """Map bare field name -> prefixed column name for *cls*.

    With no prefix there is nothing to rename (the early return yields an
    empty generator, hence an empty dict). Nested fields are skipped —
    they are handled by nested_keys().
    """
    if not prefix:
        return
    for field in dataclasses.fields(cls):
        name = field.name
        if name in nested:
            continue
        yield name, prefix + name
@as_(dict)
def nested_keys(cls, nested={}):
    """Map each nested field name -> its rename_keys dict, validating that
    the *nested* spec names real dataclass-typed fields of *cls*."""
    field_names = {f.name for f in dataclasses.fields(cls)}
    hints = typing.get_type_hints(cls)
    for name, prefix in nested.items():
        if name not in field_names:
            raise ValueError(f"{name!r} is not a field of {cls}")
        resolved = resolve_optional(hints[name])
        if not (dataclasses.is_dataclass(resolved) and isinstance(resolved, type)):
            raise ValueError(f"{name!r} field must be a dataclass")
        yield name, rename_keys(resolved, prefix)
def wangjangle_static(m, cls, composite_keys=(), rename_keys={}, nested_keys={}):
    """Mutate mapping *m* in place into the shape cattrs expects for *cls*.

    JSON-decodes composite columns, copies prefixed columns to their bare
    names, and gathers nested-object columns into per-field sub-dicts.
    The key-spec defaults are read-only, so the shared defaults are safe.
    """
    for key in composite_keys:
        raw = m[key]
        if raw is not None:
            m[key] = json.loads(raw)
    for dst, src in rename_keys.items():
        m[dst] = m[src]
    for field_name, renames in nested_keys.items():
        sub = m[field_name] = {}
        for dst, src in renames.items():
            sub[dst] = m[src]
@functools.cache
def make_wangjangle(cls, prefix, **nested):
    """Build (and memoize) a row-prepare function for *cls*.

    The key specs are derived by introspection once per (cls, prefix,
    nested) combination; the returned partial only does the cheap
    in-place mutation on each call.
    """
    return functools.partial(
        wangjangle_static,
        cls=cls,
        composite_keys=composite_keys(cls, prefix, nested),
        rename_keys=rename_keys(cls, prefix, nested),
        nested_keys=nested_keys(cls, nested),
    )
def structure(value, cls, prefix='', **nested):
    """Convert a flat DB row mapping *value* into a *cls* instance.

    Works on a ChainMap overlay so *value* itself is never mutated.
    """
    overlay = ChainMap({}, value)
    make_wangjangle(cls, prefix, **nested)(overlay)
    return converter.structure(overlay, cls)
# pprint = functools.partial(pprint, sort_dicts=False)
#
# feed = structure(feed_input, Feed)
# pprint(feed)
# entry = structure(entry_input, Entry, 'entries.', feed='feeds.')
# pprint(entry)
# Benchmark driver: repeat both conversions N times, N taken from argv[1].
if __name__ == '__main__':
import sys
for _ in range(int(sys.argv[1])):
feed = structure(feed_input, Feed)
entry = structure(entry_input, Entry, 'entries.', feed='feeds.')
Following some sage advice from @andreivasiliu, I think it's probably best to keep the handmade row–object conversion functions; in his words:
(...and it's not that many conversions anyway, I got carried away with cattrs for a bit there :) On a semi-related note, this is probably a good opportunity to get rid of fix_datetime_tzinfo() and require Storage to return timezone-aware datetimes (UTC only); better now before #325. |
python/cpython#90016 https://docs.python.org/3.12/library/sqlite3.html#default-adapters-and-converters-deprecated
...but, the converter infra is global, so that can't work for reader (as a library, it should not change global stuff).
One solution that might work is to make custom connection/cursor subclasses that take care of the conversion (we'd need to turn detect_types off). For reference, reader uses TIMESTAMP for dates, which is stored as
2016-11-05 00:00:00
or 2023-08-25 03:00:42.262181
. Update: ...except doing it in pure Python is impossible, because detect_types=PARSE_DECLTYPES uses sqlite3_column_decltype, which is not exposed in the Python sqlite3 API.
The text was updated successfully, but these errors were encountered: