1
+ /* * @file ActivityMultiplexerAsync.cpp
2
+ *
3
+ * Implementation of the Activity Multiplexer Interface
4
+ *
5
+ *
6
+ *
7
+ *
8
+ * Possible Improvements
9
+ * TODO global flags to discard activities
10
+ * TODO measure response time of plugins (e.g. how fast does l->Notify() return)
11
+ * TODO a custom shared lock that suits the use case best
12
+ *
13
+ *
14
+ */
15
+
16
+ #include < assert.h>
17
+
18
+ #include < list>
19
+ #include < deque>
20
+ #include < mutex>
21
+
22
+ #include < thread>
23
+
1
24
#include < boost/thread/shared_mutex.hpp>
2
25
3
26
#include < monitoring/datatypes/Activity.hpp>
@@ -11,78 +34,299 @@ using namespace monitoring;
11
34
12
35
namespace monitoring {
13
36
37
+ // forward declarations
38
+ class ActivityMultiplexerNotifier ;
39
+ class ActivityMultiplexerAsync ;
40
+
41
+ class Dispatcher
42
+ {
43
+ public:
44
+ virtual void Dispatch (void * work) {};
45
+ };
46
+
47
+
48
+ /* *
49
+ * A threadsafe queue implementation for the multiplexer to use
50
+ * The Queue is also responsible for counting discarded activities.
51
+ */
14
52
class ActivityMultiplexerQueue
15
53
{
16
54
public:
17
55
ActivityMultiplexerQueue () {};
18
56
virtual ~ActivityMultiplexerQueue () {};
19
57
58
+ /* *
59
+ * Check whether or not the queue has still capacity
60
+ *
61
+ * @return bool
62
+ for( int i = 0; i < dispatcher; ++i ) {
63
+ t.push_back( std::thread( &MultiplexerNotifierTemplate::Run, this ) );
64
+ } true = queue is full, false = not full
65
+ */
66
+ virtual bool Full () {
67
+ bool result = ( queue.size () > capacity );
68
+ return result;
69
+ };
70
+
71
+
72
+
73
+ /* Check whether of not the queue is empty
74
+ *
75
+ * @return bool true = queue is empty, false = not empty
76
+ */
77
+ virtual bool Empty () {
78
+ bool result = ( queue.size () == 0 );
79
+ return result;
80
+ };
81
+
82
+
83
+ /* *
84
+ * Check if queue is in overload mode
85
+ *
86
+ */
87
+ virtual bool Overloaded () {
88
+ return overloaded;
89
+ };
90
+
91
+ /* *
92
+ * Add an activity to the queue if there is capacity, set overload flag
93
+ * otherwhise.
94
+ *
95
+ * @param activity an activity that need to be dispatched in the future
96
+ */
97
+ virtual void Push (Activity * activity) {
98
+ std::lock_guard<std::mutex> lock ( mut );
99
+
100
+ printf (" push %p\n " , activity);
101
+ // maybe this should happen in notifier run()
102
+ /*
103
+ if (Overloaded() && Empty()) {
104
+ // TODO notifier.Reset(lost);
105
+ lost = 0;
106
+ overloaded = false;
107
+ }
108
+ */
109
+
110
+ if ( Overloaded () ) {
111
+ lost++;
112
+ } else {
113
+
114
+ if ( Full () ) {
115
+ overloaded = true ;
116
+ lost = 1 ;
117
+ } else {
118
+ queue.push_back ( activity );
119
+ }
120
+ }
121
+ };
122
+
123
+
124
+ /* *
125
+ * Get an activity from queue, returned element is popped!
126
+ *
127
+ * @return Activity an activity that needs to be dispatched to async listeners
128
+ */
129
+ virtual Activity * Pop () {
130
+ std::lock_guard<std::mutex> lock ( mut );
131
+
132
+ Activity * activity = nullptr ;
133
+ if (queue.empty ())
134
+ return nullptr ;
135
+
136
+ auto itr = queue.begin ();
137
+
138
+ queue.erase (itr);
139
+
140
+ printf (" pop %p\n " , *itr);
141
+
142
+ return *itr;
143
+ };
144
+
20
145
private:
21
- /* data */
146
+ std::deque<Activity *> queue;
147
+ unsigned int capacity = 1000 ; // TODO specify by options
148
+ bool overloaded = false ;
149
+ int lost = 1 ;
150
+
151
+ std::mutex mut;
22
152
};
23
153
24
154
155
+
25
156
class ActivityMultiplexerNotifier
26
157
{
27
158
public:
28
- ActivityMultiplexerNotifier () {};
29
- virtual ~ActivityMultiplexerNotifier () {};
159
+ ActivityMultiplexerNotifier (Dispatcher * dispatcher, ActivityMultiplexerQueue * queue) {
160
+ this ->dispatcher = dispatcher;
161
+ this ->queue = queue;
162
+
163
+ for ( int i = 0 ; i < num_dispatcher; ++i ) {
164
+ t.push_back ( std::thread ( &ActivityMultiplexerNotifier::Run, this ) );
165
+ }
166
+ };
167
+
168
+ virtual ~ActivityMultiplexerNotifier () {
169
+ for ( auto it = t.begin (); it != t.end (); ++it ) {
170
+ ( *it ).join ();
171
+ }
172
+ };
173
+
174
+ virtual void Run () {
175
+ assert (queue);
176
+ // call dispatch of Dispatcher
177
+ while ( !terminate ) {
178
+ Activity * activity = queue->Pop ();
179
+
180
+ if ( activity )
181
+ dispatcher->Dispatch ((void *)activity);
182
+
183
+ }
184
+ }
185
+
186
+ virtual void finalize () {
187
+ printf (" Finalizing\n " );
188
+ terminate = true ;
189
+ }
30
190
31
191
private:
32
- /* data */
192
+ Dispatcher * dispatcher = nullptr ;
193
+ ActivityMultiplexerQueue * queue = nullptr ;
194
+
195
+ // notifier currently maintains own thread pool
196
+ int num_dispatcher = 1 ;
197
+ list<std::thread> t;
198
+
199
+ bool terminate = false ;
33
200
};
34
201
202
+
203
+
35
204
/* *
36
205
* ActivityMultiplexer
37
206
* Forwards logged activities to registered listeners (e.g. Plugins) either
38
207
* in an syncronised or asyncronous manner.
39
208
*/
40
- class ActivityMultiplexerAsync : public ActivityMultiplexer {
209
+ class ActivityMultiplexerAsync : public ActivityMultiplexer , public Dispatcher {
210
+ // class ActivityMultiplexerAsync : public ActivityMultiplexer {
41
211
private:
42
212
list<ActivityMultiplexerListener *> listeners;
43
213
list<ActivityMultiplexerListenerAsync *> listeners_async;
44
214
215
+ ActivityMultiplexerQueue * queue = nullptr ;
216
+ ActivityMultiplexerNotifier * notifier = nullptr ;
217
+
45
218
boost::shared_mutex listener_change_mutex;
219
+ boost::shared_mutex listener_change_mutex_async;
46
220
public:
221
+
222
+ ~ActivityMultiplexerAsync () {
223
+ if ( notifier ) {
224
+ notifier->finalize ();
225
+ delete notifier;
226
+ }
227
+
228
+ if ( queue )
229
+ delete queue;
230
+
231
+ }
232
+
233
+
234
+ /* *
235
+ * hand over activity to registered listeners
236
+ *
237
+ * @param activity logged activity
238
+ */
47
239
virtual void Log ( Activity * activity ){
48
- boost::shared_lock<boost::shared_mutex> lock ( listener_change_mutex );
49
- for (auto l = listeners.begin (); l != listeners.end () ; l++){
50
- (*l)->Notify (activity);
240
+ assert ( activity != nullptr );
241
+ // quick sync dispatch
242
+ {
243
+ boost::shared_lock<boost::shared_mutex> lock ( listener_change_mutex );
244
+ for (auto l = listeners.begin (); l != listeners.end () ; l++){
245
+ (*l)->Notify (activity);
246
+ }
51
247
}
248
+ // add to async patch
249
+ if ( queue )
250
+ queue->Push (activity);
52
251
}
53
252
253
+ // this functions is registered to a notifier as callback
254
+
255
+ /* *
256
+ * Notify async listeners of activity
257
+ *
258
+ * @param lost lost activtiy count
259
+ * @param work activtiy as void pointer to support abstract notifier
260
+ */
261
+ virtual void Dispatch (int lost, void * work) {
262
+ Activity * activity = (Activity *) work;
263
+ assert ( activity != nullptr );
264
+ boost::shared_lock<boost::shared_mutex> lock ( listener_change_mutex_async );
265
+ for (auto l = listeners_async.begin (); l != listeners_async.end () ; l++){
266
+ (*l)->NotifyAsync (lost, activity);
267
+ }
268
+ }
269
+
270
+ /* *
271
+ * Register Listener to sync path
272
+ *
273
+ * @param listener listener to be registered
274
+ */
54
275
virtual void registerListener ( ActivityMultiplexerListener * listener ){
55
276
boost::unique_lock<boost::shared_mutex> lock ( listener_change_mutex );
277
+ listeners.push_back (listener);
56
278
279
+ // snipped conserved for later use
57
280
// boost::upgrade_lock<boost::shared_mutex> lock( listener_change_mutex );
58
281
// if () {
59
282
// boost::upgrade_to_unique_lock<boost::shared_mutex> lock( listener_change_mutex );
60
283
// }
61
-
62
- listeners.push_back (listener);
63
284
}
64
285
286
+
287
+ /* *
288
+ * Unegister Listener from sync path
289
+ *
290
+ * @param listener listener to be unregistered
291
+ */
65
292
virtual void unregisterListener ( ActivityMultiplexerListener * listener ){
66
293
boost::unique_lock<boost::shared_mutex> lock ( listener_change_mutex );
67
294
listeners.remove (listener);
68
295
}
69
296
297
+ /* *
298
+ * Register Listener to async path
299
+ *
300
+ * @param listener listener to be registered
301
+ */
70
302
virtual void registerAsyncListener ( ActivityMultiplexerListenerAsync * listener ){
71
- boost::unique_lock<boost::shared_mutex> lock ( listener_change_mutex );
303
+ boost::unique_lock<boost::shared_mutex> lock ( listener_change_mutex_async );
72
304
listeners_async.push_back (listener);
73
305
}
74
306
307
+
308
+ /* *
309
+ * Unegister Listener from async path
310
+ *
311
+ * @param listener listener to be unregistered
312
+ */
75
313
virtual void unregisterAsyncListener ( ActivityMultiplexerListenerAsync * listener ){
76
- boost::unique_lock<boost::shared_mutex> lock ( listener_change_mutex );
314
+ boost::unique_lock<boost::shared_mutex> lock ( listener_change_mutex_async );
77
315
listeners_async.remove (listener);
78
316
}
79
317
318
+
319
+ /* Satisfy Component Requirements
320
+ */
80
321
ComponentOptions * AvailableOptions () {
81
322
return new ActivityMultiplexerAsyncOptions ();
82
323
}
83
324
84
325
void init () {
85
326
ActivityMultiplexerAsyncOptions & options = getOptions<ActivityMultiplexerAsyncOptions>();
327
+
328
+ queue = new ActivityMultiplexerQueue ();
329
+ notifier = new ActivityMultiplexerNotifier (dynamic_cast <Dispatcher *>(this ), queue);
86
330
}
87
331
};
88
332
0 commit comments