Skip to content

Commit 3642e97

Browse files
committed
fix(trace): misc bugs with objs_query when deleting objects
1 parent fea4105 commit 3642e97

File tree

3 files changed

+167
-4
lines changed

3 files changed

+167
-4
lines changed

tests/trace/test_objects_query_builder.py

+15-2
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,11 @@ def test_object_query_builder_sort():
177177
) AS row_num,
178178
if (row_num = 1, 1, 0) AS is_latest
179179
FROM (
180+
--
181+
-- Object versions are uniquely identified by (kind, project_id, object_id, digest).
182+
-- This subquery selects a row to represent each object version. There are multiple rows
183+
-- for each object version if it has been deleted or recreated prior to a table merge.
184+
--
180185
SELECT
181186
project_id,
182187
object_id,
@@ -193,7 +198,15 @@ def test_object_query_builder_sort():
193198
kind,
194199
object_id,
195200
digest
196-
ORDER BY created_at ASC
201+
--
202+
-- Prefer the most recent row. If there is a tie, prefer the row
203+
-- with non-null deleted_at, which represents the deletion event.
204+
--
205+
-- Rows for the same object version may have the same created_at
206+
-- because deletion events inherit the created_at of the last
207+
-- non-deleted row for the object version.
208+
--
209+
ORDER BY created_at DESC, (deleted_at IS NULL) ASC
197210
) AS rn
198211
FROM object_versions"""
199212

@@ -289,7 +302,7 @@ def test_make_objects_val_query_and_parameters():
289302
)
290303

291304
expected_query = """
292-
SELECT object_id, digest, any(val_dump)
305+
SELECT object_id, digest, argMax(val_dump, created_at)
293306
FROM object_versions
294307
WHERE project_id = {project_id: String} AND
295308
object_id IN {object_ids: Array(String)} AND

tests/trace/test_objs_query.py

+137
Original file line numberDiff line numberDiff line change
@@ -185,3 +185,140 @@ def test_objs_query_wb_user_id(client: WeaveClient):
185185
res = client._objects()
186186
assert len(res) == 3
187187
assert all(obj.wb_user_id == correct_id for obj in res)
188+
189+
190+
def test_objs_query_deleted_interaction(client: WeaveClient):
    """Deleted object versions must no longer be returned by ``objs_query``.

    Publishes three versions of ``obj_1``, deletes one version by digest,
    then deletes the remaining two, checking the query result after each
    step.
    """
    weave.publish({"i": 1}, name="obj_1")
    weave.publish({"i": 2}, name="obj_1")
    weave.publish({"i": 3}, name="obj_1")

    def query_all_versions():
        # latest_only=False so that every live version is returned, not
        # just the most recent one.
        return client.server.objs_query(
            tsi.ObjQueryReq(
                project_id=client._project_id(),
                filter=tsi.ObjectVersionFilter(latest_only=False),
            )
        ).objs

    objs = query_all_versions()
    # Compare the exact multiset of values; a bare membership check would
    # also pass if a version were returned more than once.
    assert sorted(obj.val["i"] for obj in objs) == [1, 2, 3]

    # Select the version to delete by its value rather than by position so
    # the test does not depend on the ordering of the query result.
    first_digest = next(obj.digest for obj in objs if obj.val["i"] == 1)
    delete_res = client.server.obj_delete(
        tsi.ObjDeleteReq(
            project_id=client._project_id(),
            object_id="obj_1",
            digests=[first_digest],
        )
    )
    assert delete_res.num_deleted == 1

    objs = query_all_versions()
    assert sorted(obj.val["i"] for obj in objs) == [2, 3]

    # Delete the remaining objects
    delete_res = client.server.obj_delete(
        tsi.ObjDeleteReq(
            project_id=client._project_id(),
            object_id="obj_1",
            digests=[obj.digest for obj in objs],
        )
    )
    assert delete_res.num_deleted == 2

    assert len(query_all_versions()) == 0
240+
241+
242+
def test_objs_query_delete_and_recreate(client: WeaveClient):
    """Re-publishing deleted content must make it visible again.

    Publishes three versions of ``obj_1``, deletes the whole object, then
    publishes the same payloads again and checks that the query returns the
    recreated versions with a newer ``created_at``.
    """
    weave.publish({"i": 1}, name="obj_1")
    weave.publish({"i": 2}, name="obj_1")
    weave.publish({"i": 3}, name="obj_1")

    def query_all_versions():
        return client.server.objs_query(
            tsi.ObjQueryReq(
                project_id=client._project_id(),
                filter=tsi.ObjectVersionFilter(latest_only=False),
            )
        ).objs

    objs = query_all_versions()
    assert len(objs) == 3

    # Remember the creation time of the version that is recreated below,
    # looked up by value so the check does not rely on result ordering.
    original_created_at = next(
        obj.created_at for obj in objs if obj.val["i"] == 1
    )

    # No digests given: the sibling assertion shows this removes all three
    # versions of the object.
    delete_res = client.server.obj_delete(
        tsi.ObjDeleteReq(
            project_id=client._project_id(),
            object_id="obj_1",
        )
    )
    assert delete_res.num_deleted == 3

    # NOTE(review): presumably this yields the same digest as the deleted
    # {"i": 1} version, i.e. a "recreate" - confirm against the digest logic.
    weave.publish({"i": 1}, name="obj_1")

    objs = query_all_versions()
    assert len(objs) == 1
    assert objs[0].val["i"] == 1
    assert objs[0].created_at > original_created_at

    weave.publish({"i": 2}, name="obj_1")
    weave.publish({"i": 3}, name="obj_1")

    objs = query_all_versions()
    # Checks both the count and the order of the returned versions.
    assert [obj.val["i"] for obj in objs] == [1, 2, 3]
291+
292+
293+
def test_objs_query_delete_and_add_new_versions(client: WeaveClient):
    """New versions published after a full delete are the only ones returned.

    Publishes three versions of ``obj_1``, deletes the whole object, then
    publishes three versions with different payloads and checks that the
    query returns exactly those.
    """
    weave.publish({"i": 1}, name="obj_1")
    weave.publish({"i": 2}, name="obj_1")
    weave.publish({"i": 3}, name="obj_1")

    def query_all_versions():
        return client.server.objs_query(
            tsi.ObjQueryReq(
                project_id=client._project_id(),
                filter=tsi.ObjectVersionFilter(latest_only=False),
            )
        ).objs

    assert len(query_all_versions()) == 3

    delete_res = client.server.obj_delete(
        tsi.ObjDeleteReq(
            project_id=client._project_id(),
            object_id="obj_1",
        )
    )
    # Verify the delete actually happened before relying on it, matching
    # the check made in test_objs_query_delete_and_recreate.
    assert delete_res.num_deleted == 3

    weave.publish({"i": 4}, name="obj_1")
    weave.publish({"i": 5}, name="obj_1")
    weave.publish({"i": 6}, name="obj_1")

    objs = query_all_versions()
    # Exact multiset comparison: none of the deleted versions may leak
    # back in and no new version may be duplicated.
    assert sorted(obj.val["i"] for obj in objs) == [4, 5, 6]

weave/trace_server/objects_query_builder.py

+15-2
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,11 @@ def make_metadata_query(self) -> str:
273273
) AS row_num,
274274
if (row_num = 1, 1, 0) AS is_latest
275275
FROM (
276+
--
277+
-- Object versions are uniquely identified by (kind, project_id, object_id, digest).
278+
-- This subquery selects a row to represent each object version. There are multiple rows
279+
-- for each object version if it has been deleted or recreated prior to a table merge.
280+
--
276281
SELECT
277282
project_id,
278283
object_id,
@@ -289,7 +294,15 @@ def make_metadata_query(self) -> str:
289294
kind,
290295
object_id,
291296
digest
292-
ORDER BY created_at ASC
297+
--
298+
-- Prefer the most recent row. If there is a tie, prefer the row
299+
-- with non-null deleted_at, which represents the deletion event.
300+
--
301+
-- Rows for the same object version may have the same created_at
302+
-- because deletion events inherit the created_at of the last
303+
-- non-deleted row for the object version.
304+
--
305+
ORDER BY created_at DESC, (deleted_at IS NULL) ASC
293306
) AS rn
294307
FROM object_versions
295308
WHERE project_id = {{project_id: String}}{self.object_id_conditions_part}
@@ -311,7 +324,7 @@ def make_objects_val_query_and_parameters(
311324
project_id: str, object_ids: list[str], digests: list[str]
312325
) -> tuple[str, dict[str, Any]]:
313326
query = """
314-
SELECT object_id, digest, any(val_dump)
327+
SELECT object_id, digest, argMax(val_dump, created_at)
315328
FROM object_versions
316329
WHERE project_id = {project_id: String} AND
317330
object_id IN {object_ids: Array(String)} AND

0 commit comments

Comments
 (0)