@@ -52,6 +52,9 @@ namespace NKikimr {
52
52
53
53
friend class TActorBootstrapped <THullSyncFullActor>;
54
54
55
+ constexpr static TDuration MaxProcessingTime = TDuration::MilliSeconds(5 ); // half of a quota for mailbox
56
+ constexpr static ui32 TimerIterations = 1024 ;
57
+
55
58
void Serialize (const TActorContext &ctx,
56
59
TString *buf,
57
60
const TKeyLogoBlob &key,
@@ -89,6 +92,7 @@ namespace NKikimr {
89
92
90
93
static const ui32 EmptyFlag = 0x1 ;
91
94
static const ui32 MsgFullFlag = 0x2 ;
95
+ static const ui32 LongProcessing = 0x4 ;
92
96
93
97
void Bootstrap (const TActorContext &ctx) {
94
98
LogoBlobFilter.BuildBarriersEssence (FullSnap.BarriersSnap );
@@ -98,14 +102,14 @@ namespace NKikimr {
98
102
case NKikimrBlobStorage::LogoBlobs:
99
103
Stage = NKikimrBlobStorage::LogoBlobs;
100
104
pres = Process (ctx, FullSnap.LogoBlobsSnap , KeyLogoBlob, LogoBlobFilter);
101
- if (pres & MsgFullFlag)
105
+ if (pres & ( MsgFullFlag | LongProcessing) )
102
106
break ;
103
107
Y_ABORT_UNLESS (pres & EmptyFlag);
104
108
[[fallthrough]];
105
109
case NKikimrBlobStorage::Blocks:
106
110
Stage = NKikimrBlobStorage::Blocks;
107
111
pres = Process (ctx, FullSnap.BlocksSnap , KeyBlock, FakeFilter);
108
- if (pres & MsgFullFlag)
112
+ if (pres & ( MsgFullFlag | LongProcessing) )
109
113
break ;
110
114
Y_ABORT_UNLESS (pres & EmptyFlag);
111
115
[[fallthrough]];
@@ -137,6 +141,7 @@ namespace NKikimr {
137
141
::NKikimr::TLevelIndexSnapshot<TKey, TMemRec> &snapshot,
138
142
TKey &key,
139
143
const TFilter &filter) {
144
+ THPTimer timer;
140
145
// reserve some space for data
141
146
TString *data = Result->Record .MutableData ();
142
147
if (data->capacity () < Config->MaxResponseSize ) {
@@ -147,20 +152,34 @@ namespace NKikimr {
147
152
using TIndexForwardIterator = typename TLevelIndexSnapshot::TIndexForwardIterator;
148
153
TIndexForwardIterator it (HullCtx, &snapshot);
149
154
it.Seek (key);
155
+
150
156
// copy data until we have some space
151
- while (it.Valid () && (data->size () + NSyncLog::MaxRecFullSize <= data->capacity ())) {
157
+ ui32 result = 0 ;
158
+ ui32 timerIterations = TimerIterations;
159
+ while (it.Valid ()) {
160
+ if (data->size () + NSyncLog::MaxRecFullSize > data->capacity ()) {
161
+ result |= MsgFullFlag;
162
+ break ;
163
+ }
164
+
165
+ if (--timerIterations == 0 ) {
166
+ if (TDuration::Seconds (timer.Passed ()) > MaxProcessingTime) {
167
+ result |= LongProcessing;
168
+ break ;
169
+ }
170
+ timerIterations = TimerIterations;
171
+ }
172
+
152
173
key = it.GetCurKey ();
153
174
if (filter.Check (key, it.GetMemRec (), HullCtx->AllowKeepFlags , true /* allowGarbageCollection*/ ))
154
175
Serialize (ctx, data, key, it.GetMemRec ());
155
176
it.Next ();
156
177
}
157
178
// key points to the last seen key
158
179
159
- ui32 result = 0 ;
160
180
if (!it.Valid ())
161
181
result |= EmptyFlag;
162
- if (data->size () + NSyncLog::MaxRecFullSize > data->capacity ())
163
- result |= MsgFullFlag;
182
+
164
183
return result;
165
184
}
166
185
0 commit comments