Skip to content

Commit 463883b

Browse files
authored
Support tags for freshness checks sensor factory (#28131)
## Summary & Motivation Interface for [build_sensor_for_freshness_checks](https://github.com/dagster-io/dagster/blob/64ea57c11c086e91523525011eb52df006bb126d/python_modules/dagster/dagster/_core/definitions/asset_check_factories/freshness_checks/sensor.py#L38) is really limited; this helps add some flexibility. ## How I Tested These Changes I don't know if it's necessary to add a whole test for this. I could modify one of the existing tests and check that the `run_request` in the test has the right tags? ## Changelog Added support for passing `tags` to the created `RunRequest` when using `build_sensor_for_freshness_checks()`.
1 parent 1c09aaa commit 463883b

File tree

2 files changed

+10
-4
lines changed
  • python_modules/dagster
    • dagster/_core/definitions/asset_check_factories/freshness_checks
    • dagster_tests/definitions_tests/freshness_checks_tests

2 files changed

+10
-4
lines changed

python_modules/dagster/dagster/_core/definitions/asset_check_factories/freshness_checks/sensor.py

+6-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import datetime
2-
from collections.abc import Iterator, Sequence
3-
from typing import Optional, Union, cast
2+
from collections.abc import Iterator, Mapping, Sequence
3+
from typing import Any, Optional, Union, cast
44

55
from dagster import _check as check
66
from dagster._annotations import beta
@@ -41,6 +41,7 @@ def build_sensor_for_freshness_checks(
4141
minimum_interval_seconds: Optional[int] = None,
4242
name: str = DEFAULT_FRESHNESS_SENSOR_NAME,
4343
default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
44+
tags: Optional[Mapping[str, Any]] = None,
4445
) -> SensorDefinition:
4546
"""Builds a sensor which kicks off evaluation of freshness checks.
4647
@@ -67,6 +68,8 @@ def build_sensor_for_freshness_checks(
6768
name may need to be provided in case of multiple calls of this function.
6869
default_status (Optional[DefaultSensorStatus]): The default status of the sensor. Defaults
6970
to stopped.
71+
tags (Optional[Dict[str, Any]]): A dictionary of tags (string key-value pairs) to attach
72+
to the launched run.
7073
7174
Returns:
7275
SensorDefinition: The sensor that kicks off freshness evaluations.
@@ -112,7 +115,7 @@ def the_sensor(context: SensorEvaluationContext) -> Optional[Union[RunRequest, S
112115
new_cursor = check_key.to_user_string() if check_key else None
113116
context.update_cursor(new_cursor)
114117
if checks_to_evaluate:
115-
return RunRequest(asset_check_keys=checks_to_evaluate)
118+
return RunRequest(asset_check_keys=checks_to_evaluate, tags=tags)
116119
else:
117120
return SkipReason(
118121
"No freshness checks need to be evaluated at this time, since all checks are either currently evaluating, have failed, or are not yet overdue."

python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_sensor.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,9 @@ def my_asset():
117117
)
118118
)
119119

120-
sensor = build_sensor_for_freshness_checks(freshness_checks=freshness_checks)
120+
sensor = build_sensor_for_freshness_checks(
121+
freshness_checks=freshness_checks, tags={"foo": "FOO"}
122+
)
121123
defs = Definitions(asset_checks=freshness_checks, assets=[my_asset], sensors=[sensor])
122124

123125
context = build_sensor_context(instance=instance, definitions=defs)
@@ -129,6 +131,7 @@ def my_asset():
129131
AssetCheckKey(AssetKey("never_eval"), "freshness_check"),
130132
AssetCheckKey(AssetKey("success_eval_expired"), "freshness_check"),
131133
]
134+
assert run_request.tags == {"foo": "FOO"}
132135
# Cursor should be None, since we made it through all assets.
133136
assert context.cursor is None
134137

0 commit comments

Comments
 (0)