1
+ from ldclient .versioned_data_kind import FEATURES , SEGMENTS
2
+ from ldclient .impl .dependency_tracker import DependencyTracker
1
3
from ldclient .impl .listeners import Listeners
2
- from ldclient .interfaces import DataSourceStatusProvider , DataSourceUpdateSink , DataSourceStatus , FeatureStore , DataSourceState , DataSourceErrorInfo , DataSourceErrorKind
4
+ from ldclient .interfaces import DataSourceStatusProvider , DataSourceUpdateSink , DataSourceStatus , FeatureStore , DataSourceState , DataSourceErrorInfo , DataSourceErrorKind , FlagChange
3
5
from ldclient .impl .rwlock import ReadWriteLock
4
6
from ldclient .versioned_data_kind import VersionedDataKind
7
+ from ldclient .impl .dependency_tracker import KindAndKey
5
8
6
9
import time
7
- from typing import Callable , Mapping , Optional
10
+ from typing import Callable , Mapping , Optional , Set
8
11
9
12
10
13
class DataSourceUpdateSinkImpl (DataSourceUpdateSink ):
11
- def __init__ (self , store : FeatureStore , listeners : Listeners ):
14
+ def __init__ (self , store : FeatureStore , status_listeners : Listeners , flag_change_listeners : Listeners ):
12
15
self .__store = store
13
- self .__listeners = listeners
16
+ self .__status_listeners = status_listeners
17
+ self .__flag_change_listeners = flag_change_listeners
18
+ self .__tracker = DependencyTracker ()
14
19
15
20
self .__lock = ReadWriteLock ()
16
21
self .__status = DataSourceStatus (
@@ -28,13 +33,38 @@ def status(self) -> DataSourceStatus:
28
33
self .__lock .runlock ()
29
34
30
35
def init (self , all_data : Mapping [VersionedDataKind , Mapping [str , dict ]]):
31
- self .__monitor_store_update (lambda : self .__store .init (all_data ))
36
+ old_data = None
37
+
38
+ def init_store ():
39
+ nonlocal old_data
40
+ if self .__flag_change_listeners .has_listeners ():
41
+ old_data = {}
42
+ for kind in [FEATURES , SEGMENTS ]:
43
+ old_data [kind ] = self .__store .all (kind , lambda x : x )
44
+
45
+ self .__store .init (all_data )
46
+
47
+ self .__monitor_store_update (init_store )
48
+ self .__reset_tracker_with_new_data (all_data )
49
+
50
+ if old_data is None :
51
+ return
52
+
53
+ self .__send_change_events (
54
+ self .__compute_changed_items_for_full_data_set (old_data , all_data )
55
+ )
32
56
33
57
def upsert (self , kind : VersionedDataKind , item : dict ):
34
58
self .__monitor_store_update (lambda : self .__store .upsert (kind , item ))
35
59
60
+ # TODO(sc-212471): We only want to do this if the store successfully
61
+ # updates the record.
62
+ key = item .get ('key' , '' )
63
+ self .__update_dependency_for_single_item (kind , key , item )
64
+
36
65
def delete (self , kind : VersionedDataKind , key : str , version : int ):
37
66
self .__monitor_store_update (lambda : self .__store .delete (kind , key , version ))
67
+ self .__update_dependency_for_single_item (kind , key , None )
38
68
39
69
def update_status (self , new_state : DataSourceState , new_error : Optional [DataSourceErrorInfo ]):
40
70
status_to_broadcast = None
@@ -60,7 +90,7 @@ def update_status(self, new_state: DataSourceState, new_error: Optional[DataSour
60
90
self .__lock .unlock ()
61
91
62
92
if status_to_broadcast is not None :
63
- self .__listeners .notify (status_to_broadcast )
93
+ self .__status_listeners .notify (status_to_broadcast )
64
94
65
95
def __monitor_store_update (self , fn : Callable [[], None ]):
66
96
try :
@@ -75,6 +105,46 @@ def __monitor_store_update(self, fn: Callable[[], None]):
75
105
self .update_status (DataSourceState .INTERRUPTED , error_info )
76
106
raise
77
107
108
+ def __update_dependency_for_single_item (self , kind : VersionedDataKind , key : str , item : Optional [dict ]):
109
+ self .__tracker .update_dependencies_from (kind , key , item )
110
+ if self .__flag_change_listeners .has_listeners ():
111
+ affected_items : Set [KindAndKey ] = set ()
112
+ self .__tracker .add_affected_items (affected_items , KindAndKey (kind = kind , key = key ))
113
+ self .__send_change_events (affected_items )
114
+
115
+ def __reset_tracker_with_new_data (self , all_data : Mapping [VersionedDataKind , Mapping [str , dict ]]):
116
+ self .__tracker .reset ()
117
+
118
+ for kind , items in all_data .items ():
119
+ for key , item in items .items ():
120
+ self .__tracker .update_dependencies_from (kind , key , item )
121
+
122
+ def __send_change_events (self , affected_items : Set [KindAndKey ]):
123
+ for item in affected_items :
124
+ if item .kind == FEATURES :
125
+ self .__flag_change_listeners .notify (FlagChange (item .key ))
126
+
127
+ def __compute_changed_items_for_full_data_set (self , old_data : Mapping [VersionedDataKind , Mapping [str , dict ]], new_data : Mapping [VersionedDataKind , Mapping [str , dict ]]):
128
+ affected_items : Set [KindAndKey ] = set ()
129
+
130
+ for kind in [FEATURES , SEGMENTS ]:
131
+ old_items = old_data .get (kind , {})
132
+ new_items = new_data .get (kind , {})
133
+
134
+ keys : Set [str ] = set ()
135
+
136
+ for key in keys .union (old_items .keys (), new_items .keys ()):
137
+ old_item = old_items .get (key )
138
+ new_item = new_items .get (key )
139
+
140
+ if old_item is None and new_item is None :
141
+ continue
142
+
143
+ if old_item is None or new_item is None or old_item ['version' ] < new_item ['version' ]:
144
+ self .__tracker .add_affected_items (affected_items , KindAndKey (kind = kind , key = key ))
145
+
146
+ return affected_items
147
+
78
148
79
149
class DataSourceStatusProviderImpl (DataSourceStatusProvider ):
80
150
def __init__ (self , listeners : Listeners , updates_sink : DataSourceUpdateSinkImpl ):
0 commit comments