Skip to content

Commit 548cedd

Browse files
addaleaxcodebytere
authored andcommitted
src: split out callback queue implementation from Environment
This isn’t conceptually tied to anything Node.js-specific at all. PR-URL: #33272 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Colin Ihrig <[email protected]>
1 parent ad86807 commit 548cedd

File tree

6 files changed

+179
-120
lines changed

6 files changed

+179
-120
lines changed

node.gyp

+2
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,8 @@
642642
'src/base_object.h',
643643
'src/base_object-inl.h',
644644
'src/base64.h',
645+
'src/callback_queue.h',
646+
'src/callback_queue-inl.h',
645647
'src/connect_wrap.h',
646648
'src/connection_wrap.h',
647649
'src/debug_utils.h',

src/callback_queue-inl.h

+97
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
#ifndef SRC_CALLBACK_QUEUE_INL_H_
2+
#define SRC_CALLBACK_QUEUE_INL_H_
3+
4+
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5+
6+
#include "callback_queue.h"
7+
8+
namespace node {
9+
10+
template <typename R, typename... Args>
11+
template <typename Fn>
12+
std::unique_ptr<typename CallbackQueue<R, Args...>::Callback>
13+
CallbackQueue<R, Args...>::CreateCallback(Fn&& fn, bool refed) {
14+
return std::make_unique<CallbackImpl<Fn>>(std::move(fn), refed);
15+
}
16+
17+
template <typename R, typename... Args>
18+
std::unique_ptr<typename CallbackQueue<R, Args...>::Callback>
19+
CallbackQueue<R, Args...>::Shift() {
20+
std::unique_ptr<Callback> ret = std::move(head_);
21+
if (ret) {
22+
head_ = ret->get_next();
23+
if (!head_)
24+
tail_ = nullptr; // The queue is now empty.
25+
}
26+
size_--;
27+
return ret;
28+
}
29+
30+
template <typename R, typename... Args>
31+
void CallbackQueue<R, Args...>::Push(std::unique_ptr<Callback> cb) {
32+
Callback* prev_tail = tail_;
33+
34+
size_++;
35+
tail_ = cb.get();
36+
if (prev_tail != nullptr)
37+
prev_tail->set_next(std::move(cb));
38+
else
39+
head_ = std::move(cb);
40+
}
41+
42+
template <typename R, typename... Args>
43+
void CallbackQueue<R, Args...>::ConcatMove(CallbackQueue<R, Args...>&& other) {
44+
size_ += other.size_;
45+
if (tail_ != nullptr)
46+
tail_->set_next(std::move(other.head_));
47+
else
48+
head_ = std::move(other.head_);
49+
tail_ = other.tail_;
50+
other.tail_ = nullptr;
51+
other.size_ = 0;
52+
}
53+
54+
template <typename R, typename... Args>
55+
size_t CallbackQueue<R, Args...>::size() const {
56+
return size_.load();
57+
}
58+
59+
template <typename R, typename... Args>
60+
CallbackQueue<R, Args...>::Callback::Callback(bool refed)
61+
: refed_(refed) {}
62+
63+
template <typename R, typename... Args>
64+
bool CallbackQueue<R, Args...>::Callback::is_refed() const {
65+
return refed_;
66+
}
67+
68+
template <typename R, typename... Args>
69+
std::unique_ptr<typename CallbackQueue<R, Args...>::Callback>
70+
CallbackQueue<R, Args...>::Callback::get_next() {
71+
return std::move(next_);
72+
}
73+
74+
template <typename R, typename... Args>
75+
void CallbackQueue<R, Args...>::Callback::set_next(
76+
std::unique_ptr<Callback> next) {
77+
next_ = std::move(next);
78+
}
79+
80+
template <typename R, typename... Args>
81+
template <typename Fn>
82+
CallbackQueue<R, Args...>::CallbackImpl<Fn>::CallbackImpl(
83+
Fn&& callback, bool refed)
84+
: Callback(refed),
85+
callback_(std::move(callback)) {}
86+
87+
template <typename R, typename... Args>
88+
template <typename Fn>
89+
R CallbackQueue<R, Args...>::CallbackImpl<Fn>::Call(Args... args) {
90+
return callback_(std::forward<Args>(args)...);
91+
}
92+
93+
} // namespace node
94+
95+
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
96+
97+
#endif // SRC_CALLBACK_QUEUE_INL_H_

src/callback_queue.h

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
#ifndef SRC_CALLBACK_QUEUE_H_
2+
#define SRC_CALLBACK_QUEUE_H_
3+
4+
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5+
6+
#include <atomic>
7+
8+
namespace node {
9+
10+
// A queue of C++ functions that take Args... as arguments and return R
11+
// (this is similar to the signature of std::function).
12+
// New entries are added using `CreateCallback()`/`Push()`, and removed using
13+
// `Shift()`.
14+
// The `refed` flag is left for easier use in situations in which some of these
15+
// should be run even if nothing else is keeping the event loop alive.
16+
template <typename R, typename... Args>
17+
class CallbackQueue {
18+
public:
19+
class Callback {
20+
public:
21+
explicit inline Callback(bool refed);
22+
23+
virtual ~Callback() = default;
24+
virtual R Call(Args... args) = 0;
25+
26+
inline bool is_refed() const;
27+
28+
private:
29+
inline std::unique_ptr<Callback> get_next();
30+
inline void set_next(std::unique_ptr<Callback> next);
31+
32+
bool refed_;
33+
std::unique_ptr<Callback> next_;
34+
35+
friend class CallbackQueue;
36+
};
37+
38+
template <typename Fn>
39+
inline std::unique_ptr<Callback> CreateCallback(Fn&& fn, bool refed);
40+
41+
inline std::unique_ptr<Callback> Shift();
42+
inline void Push(std::unique_ptr<Callback> cb);
43+
// ConcatMove adds elements from 'other' to the end of this list, and clears
44+
// 'other' afterwards.
45+
inline void ConcatMove(CallbackQueue&& other);
46+
47+
// size() is atomic and may be called from any thread.
48+
inline size_t size() const;
49+
50+
private:
51+
template <typename Fn>
52+
class CallbackImpl final : public Callback {
53+
public:
54+
CallbackImpl(Fn&& callback, bool refed);
55+
R Call(Args... args) override;
56+
57+
private:
58+
Fn callback_;
59+
};
60+
61+
std::atomic<size_t> size_ {0};
62+
std::unique_ptr<Callback> head_;
63+
Callback* tail_ = nullptr;
64+
};
65+
66+
} // namespace node
67+
68+
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
69+
70+
#endif // SRC_CALLBACK_QUEUE_H_

src/env-inl.h

+6-74
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
2626

2727
#include "aliased_buffer.h"
28+
#include "callback_queue-inl.h"
2829
#include "env.h"
2930
#include "node.h"
3031
#include "util-inl.h"
@@ -705,50 +706,9 @@ inline void IsolateData::set_options(
705706
options_ = std::move(options);
706707
}
707708

708-
std::unique_ptr<Environment::NativeImmediateCallback>
709-
Environment::NativeImmediateQueue::Shift() {
710-
std::unique_ptr<Environment::NativeImmediateCallback> ret = std::move(head_);
711-
if (ret) {
712-
head_ = ret->get_next();
713-
if (!head_)
714-
tail_ = nullptr; // The queue is now empty.
715-
}
716-
size_--;
717-
return ret;
718-
}
719-
720-
void Environment::NativeImmediateQueue::Push(
721-
std::unique_ptr<Environment::NativeImmediateCallback> cb) {
722-
NativeImmediateCallback* prev_tail = tail_;
723-
724-
size_++;
725-
tail_ = cb.get();
726-
if (prev_tail != nullptr)
727-
prev_tail->set_next(std::move(cb));
728-
else
729-
head_ = std::move(cb);
730-
}
731-
732-
void Environment::NativeImmediateQueue::ConcatMove(
733-
NativeImmediateQueue&& other) {
734-
size_ += other.size_;
735-
if (tail_ != nullptr)
736-
tail_->set_next(std::move(other.head_));
737-
else
738-
head_ = std::move(other.head_);
739-
tail_ = other.tail_;
740-
other.tail_ = nullptr;
741-
other.size_ = 0;
742-
}
743-
744-
size_t Environment::NativeImmediateQueue::size() const {
745-
return size_.load();
746-
}
747-
748709
template <typename Fn>
749710
void Environment::CreateImmediate(Fn&& cb, bool ref) {
750-
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
751-
std::move(cb), ref);
711+
auto callback = native_immediates_.CreateCallback(std::move(cb), ref);
752712
native_immediates_.Push(std::move(callback));
753713
}
754714

@@ -768,8 +728,8 @@ void Environment::SetUnrefImmediate(Fn&& cb) {
768728

769729
template <typename Fn>
770730
void Environment::SetImmediateThreadsafe(Fn&& cb) {
771-
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
772-
std::move(cb), false);
731+
auto callback =
732+
native_immediates_threadsafe_.CreateCallback(std::move(cb), false);
773733
{
774734
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
775735
native_immediates_threadsafe_.Push(std::move(callback));
@@ -780,8 +740,8 @@ void Environment::SetImmediateThreadsafe(Fn&& cb) {
780740

781741
template <typename Fn>
782742
void Environment::RequestInterrupt(Fn&& cb) {
783-
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
784-
std::move(cb), false);
743+
auto callback =
744+
native_immediates_interrupts_.CreateCallback(std::move(cb), false);
785745
{
786746
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
787747
native_immediates_interrupts_.Push(std::move(callback));
@@ -791,34 +751,6 @@ void Environment::RequestInterrupt(Fn&& cb) {
791751
RequestInterruptFromV8();
792752
}
793753

794-
Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed)
795-
: refed_(refed) {}
796-
797-
bool Environment::NativeImmediateCallback::is_refed() const {
798-
return refed_;
799-
}
800-
801-
std::unique_ptr<Environment::NativeImmediateCallback>
802-
Environment::NativeImmediateCallback::get_next() {
803-
return std::move(next_);
804-
}
805-
806-
void Environment::NativeImmediateCallback::set_next(
807-
std::unique_ptr<NativeImmediateCallback> next) {
808-
next_ = std::move(next);
809-
}
810-
811-
template <typename Fn>
812-
Environment::NativeImmediateCallbackImpl<Fn>::NativeImmediateCallbackImpl(
813-
Fn&& callback, bool refed)
814-
: NativeImmediateCallback(refed),
815-
callback_(std::move(callback)) {}
816-
817-
template <typename Fn>
818-
void Environment::NativeImmediateCallbackImpl<Fn>::Call(Environment* env) {
819-
callback_(env);
820-
}
821-
822754
inline bool Environment::can_call_into_js() const {
823755
return can_call_into_js_ && !is_stopping();
824756
}

src/env.cc

+2-3
Original file line numberDiff line numberDiff line change
@@ -729,7 +729,7 @@ void Environment::RunAndClearInterrupts() {
729729
}
730730
DebugSealHandleScope seal_handle_scope(isolate());
731731

732-
while (std::unique_ptr<NativeImmediateCallback> head = queue.Shift())
732+
while (auto head = queue.Shift())
733733
head->Call(this);
734734
}
735735
}
@@ -755,8 +755,7 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) {
755755
auto drain_list = [&]() {
756756
TryCatchScope try_catch(this);
757757
DebugSealHandleScope seal_handle_scope(isolate());
758-
while (std::unique_ptr<NativeImmediateCallback> head =
759-
native_immediates_.Shift()) {
758+
while (auto head = native_immediates_.Shift()) {
760759
if (head->is_refed())
761760
ref_count++;
762761

src/env.h

+2-43
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "inspector_agent.h"
3030
#include "inspector_profiler.h"
3131
#endif
32+
#include "callback_queue.h"
3233
#include "debug_utils.h"
3334
#include "handle_wrap.h"
3435
#include "node.h"
@@ -1365,49 +1366,7 @@ class Environment : public MemoryRetainer {
13651366

13661367
std::list<ExitCallback> at_exit_functions_;
13671368

1368-
class NativeImmediateCallback {
1369-
public:
1370-
explicit inline NativeImmediateCallback(bool refed);
1371-
1372-
virtual ~NativeImmediateCallback() = default;
1373-
virtual void Call(Environment* env) = 0;
1374-
1375-
inline bool is_refed() const;
1376-
inline std::unique_ptr<NativeImmediateCallback> get_next();
1377-
inline void set_next(std::unique_ptr<NativeImmediateCallback> next);
1378-
1379-
private:
1380-
bool refed_;
1381-
std::unique_ptr<NativeImmediateCallback> next_;
1382-
};
1383-
1384-
template <typename Fn>
1385-
class NativeImmediateCallbackImpl final : public NativeImmediateCallback {
1386-
public:
1387-
NativeImmediateCallbackImpl(Fn&& callback, bool refed);
1388-
void Call(Environment* env) override;
1389-
1390-
private:
1391-
Fn callback_;
1392-
};
1393-
1394-
class NativeImmediateQueue {
1395-
public:
1396-
inline std::unique_ptr<NativeImmediateCallback> Shift();
1397-
inline void Push(std::unique_ptr<NativeImmediateCallback> cb);
1398-
// ConcatMove adds elements from 'other' to the end of this list, and clears
1399-
// 'other' afterwards.
1400-
inline void ConcatMove(NativeImmediateQueue&& other);
1401-
1402-
// size() is atomic and may be called from any thread.
1403-
inline size_t size() const;
1404-
1405-
private:
1406-
std::atomic<size_t> size_ {0};
1407-
std::unique_ptr<NativeImmediateCallback> head_;
1408-
NativeImmediateCallback* tail_ = nullptr;
1409-
};
1410-
1369+
typedef CallbackQueue<void, Environment*> NativeImmediateQueue;
14111370
NativeImmediateQueue native_immediates_;
14121371
Mutex native_immediates_threadsafe_mutex_;
14131372
NativeImmediateQueue native_immediates_threadsafe_;

0 commit comments

Comments
 (0)