2
2
#include " node_internals.h"
3
3
4
4
#include " env-inl.h"
5
+ #include " debug_utils.h"
5
6
#include " util.h"
6
7
#include < algorithm>
7
8
@@ -25,7 +26,127 @@ static void BackgroundRunner(void* data) {
25
26
}
26
27
}
27
28
29
+ class BackgroundTaskRunner ::DelayedTaskScheduler {
30
+ public:
31
+ explicit DelayedTaskScheduler (TaskQueue<Task>* tasks)
32
+ : pending_worker_tasks_(tasks) {}
33
+
34
+ std::unique_ptr<uv_thread_t > Start () {
35
+ auto start_thread = [](void * data) {
36
+ static_cast <DelayedTaskScheduler*>(data)->Run ();
37
+ };
38
+ std::unique_ptr<uv_thread_t > t { new uv_thread_t () };
39
+ uv_sem_init (&ready_, 0 );
40
+ CHECK_EQ (0 , uv_thread_create (t.get (), start_thread, this ));
41
+ uv_sem_wait (&ready_);
42
+ uv_sem_destroy (&ready_);
43
+ return t;
44
+ }
45
+
46
+ void PostDelayedTask (std::unique_ptr<Task> task, double delay_in_seconds) {
47
+ tasks_.Push (std::unique_ptr<Task>(new ScheduleTask (this , std::move (task),
48
+ delay_in_seconds)));
49
+ uv_async_send (&flush_tasks_);
50
+ }
51
+
52
+ void Stop () {
53
+ tasks_.Push (std::unique_ptr<Task>(new StopTask (this )));
54
+ uv_async_send (&flush_tasks_);
55
+ }
56
+
57
+ private:
58
+ void Run () {
59
+ TRACE_EVENT_METADATA1 (" __metadata" , " thread_name" , " name" ,
60
+ " WorkerThreadsTaskRunner::DelayedTaskScheduler" );
61
+ loop_.data = this ;
62
+ CHECK_EQ (0 , uv_loop_init (&loop_));
63
+ flush_tasks_.data = this ;
64
+ CHECK_EQ (0 , uv_async_init (&loop_, &flush_tasks_, FlushTasks));
65
+ uv_sem_post (&ready_);
66
+
67
+ uv_run (&loop_, UV_RUN_DEFAULT);
68
+ CheckedUvLoopClose (&loop_);
69
+ }
70
+
71
+ static void FlushTasks (uv_async_t * flush_tasks) {
72
+ DelayedTaskScheduler* scheduler =
73
+ ContainerOf (&DelayedTaskScheduler::loop_, flush_tasks->loop );
74
+ while (std::unique_ptr<Task> task = scheduler->tasks_ .Pop ())
75
+ task->Run ();
76
+ }
77
+
78
+ class StopTask : public Task {
79
+ public:
80
+ explicit StopTask (DelayedTaskScheduler* scheduler): scheduler_(scheduler) {}
81
+
82
+ void Run () override {
83
+ std::vector<uv_timer_t *> timers;
84
+ for (uv_timer_t * timer : scheduler_->timers_ )
85
+ timers.push_back (timer);
86
+ for (uv_timer_t * timer : timers)
87
+ scheduler_->TakeTimerTask (timer);
88
+ uv_close (reinterpret_cast <uv_handle_t *>(&scheduler_->flush_tasks_ ),
89
+ [](uv_handle_t * handle) {});
90
+ }
91
+
92
+ private:
93
+ DelayedTaskScheduler* scheduler_;
94
+ };
95
+
96
+ class ScheduleTask : public Task {
97
+ public:
98
+ ScheduleTask (DelayedTaskScheduler* scheduler,
99
+ std::unique_ptr<Task> task,
100
+ double delay_in_seconds)
101
+ : scheduler_(scheduler),
102
+ task_ (std::move(task)),
103
+ delay_in_seconds_(delay_in_seconds) {}
104
+
105
+ void Run () override {
106
+ uint64_t delay_millis =
107
+ static_cast <uint64_t >(delay_in_seconds_ + 0.5 ) * 1000 ;
108
+ std::unique_ptr<uv_timer_t > timer (new uv_timer_t ());
109
+ CHECK_EQ (0 , uv_timer_init (&scheduler_->loop_ , timer.get ()));
110
+ timer->data = task_.release ();
111
+ CHECK_EQ (0 , uv_timer_start (timer.get (), RunTask, delay_millis, 0 ));
112
+ scheduler_->timers_ .insert (timer.release ());
113
+ }
114
+
115
+ private:
116
+ DelayedTaskScheduler* scheduler_;
117
+ std::unique_ptr<Task> task_;
118
+ double delay_in_seconds_;
119
+ };
120
+
121
+ static void RunTask (uv_timer_t * timer) {
122
+ DelayedTaskScheduler* scheduler =
123
+ ContainerOf (&DelayedTaskScheduler::loop_, timer->loop );
124
+ scheduler->pending_worker_tasks_ ->Push (scheduler->TakeTimerTask (timer));
125
+ }
126
+
127
+ std::unique_ptr<Task> TakeTimerTask (uv_timer_t * timer) {
128
+ std::unique_ptr<Task> task (static_cast <Task*>(timer->data ));
129
+ uv_timer_stop (timer);
130
+ uv_close (reinterpret_cast <uv_handle_t *>(timer), [](uv_handle_t * handle) {
131
+ delete reinterpret_cast <uv_timer_t *>(handle);
132
+ });
133
+ timers_.erase (timer);
134
+ return task;
135
+ }
136
+
137
+ uv_sem_t ready_;
138
+ TaskQueue<v8::Task>* pending_worker_tasks_;
139
+
140
+ TaskQueue<v8::Task> tasks_;
141
+ uv_loop_t loop_;
142
+ uv_async_t flush_tasks_;
143
+ std::unordered_set<uv_timer_t *> timers_;
144
+ };
145
+
28
146
BackgroundTaskRunner::BackgroundTaskRunner (int thread_pool_size) {
147
+ delayed_task_scheduler_.reset (
148
+ new DelayedTaskScheduler (&background_tasks_));
149
+ threads_.push_back (delayed_task_scheduler_->Start ());
29
150
for (int i = 0 ; i < thread_pool_size; i++) {
30
151
std::unique_ptr<uv_thread_t > t { new uv_thread_t () };
31
152
if (uv_thread_create (t.get (), BackgroundRunner, &background_tasks_) != 0 )
@@ -44,7 +165,7 @@ void BackgroundTaskRunner::PostIdleTask(std::unique_ptr<v8::IdleTask> task) {
44
165
45
166
void BackgroundTaskRunner::PostDelayedTask (std::unique_ptr<v8::Task> task,
46
167
double delay_in_seconds) {
47
- UNREACHABLE ( );
168
+ delayed_task_scheduler_-> PostDelayedTask ( std::move (task), delay_in_seconds );
48
169
}
49
170
50
171
void BackgroundTaskRunner::BlockingDrain () {
@@ -53,6 +174,7 @@ void BackgroundTaskRunner::BlockingDrain() {
53
174
54
175
void BackgroundTaskRunner::Shutdown () {
55
176
background_tasks_.Stop ();
177
+ delayed_task_scheduler_->Stop ();
56
178
for (size_t i = 0 ; i < threads_.size (); i++) {
57
179
CHECK_EQ (0 , uv_thread_join (threads_[i].get ()));
58
180
}
0 commit comments