Skip to content

Commit 22020dc

Browse files
committed
Add lock free message queue
1 parent 1d79787 commit 22020dc

File tree

3 files changed

+152
-3
lines changed

3 files changed

+152
-3
lines changed

Diff for: LICENSE

+32
Original file line numberDiff line numberDiff line change
@@ -1056,3 +1056,35 @@ The externally maintained libraries used by Node.js are:
10561056
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
10571057
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
10581058
"""
1059+
1060+
- ProducerConsumerQueue, located at src/producer-consumer-queue.h. The folly::ProducerConsumerQueue class is a
1061+
one-producer one-consumer queue with very low synchronization overhead.
1062+
ProducerConsumerQueue's license follows:
1063+
"""
1064+
Copyright 2015 Facebook, Inc.
1065+
1066+
Licensed under the Apache License, Version 2.0 (the "License");
1067+
you may not use this file except in compliance with the License.
1068+
You may obtain a copy of the License at
1069+
1070+
http://www.apache.org/licenses/LICENSE-2.0
1071+
1072+
Unless required by applicable law or agreed to in writing, software
1073+
distributed under the License is distributed on an "AS IS" BASIS,
1074+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1075+
See the License for the specific language governing permissions and
1076+
limitations under the License.
1077+
"""
1078+
1079+
Significant changes made to the software:
1080+
1081+
- Removed Boost dependency
1082+
- Removed support for storing values directly
1083+
- Removed construction and destruction of the queue items feature
1084+
- Added initialization of all values to nullptr
1085+
- Made size a template parameter
1086+
- Crash instead of throw if malloc fails in constructor
1087+
- Changed namespace from folly to node
1088+
- Removed sizeGuess(), isFull(), isEmpty(), popFront() and frontPtr() methods
1089+
- Renamed write() to PushBack(), read() to PopFront()
1090+
- Added padding to fields

Diff for: common.gypi

+4-3
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@
279279
'libraries': [ '-llog' ],
280280
}],
281281
['OS=="mac"', {
282-
'defines': ['_DARWIN_USE_64_BIT_INODE=1'],
282+
'defines': ['_DARWIN_USE_64_BIT_INODE=1', 'NODE_OS_MACOSX'],
283283
'xcode_settings': {
284284
'ALWAYS_SEARCH_USER_PATHS': 'NO',
285285
'GCC_CW_ASM_SYNTAX': 'NO', # No -fasm-blocks
@@ -290,7 +290,7 @@
290290
'GCC_ENABLE_PASCAL_STRINGS': 'NO', # No -mpascal-strings
291291
'GCC_THREADSAFE_STATICS': 'NO', # -fno-threadsafe-statics
292292
'PREBINDING': 'NO', # No -Wl,-prebind
293-
'MACOSX_DEPLOYMENT_TARGET': '10.5', # -mmacosx-version-min=10.5
293+
'MACOSX_DEPLOYMENT_TARGET': '10.7', # -mmacosx-version-min=10.7
294294
'USE_HEADERMAP': 'NO',
295295
'OTHER_CFLAGS': [
296296
'-fno-strict-aliasing',
@@ -317,7 +317,8 @@
317317
['clang==1', {
318318
'xcode_settings': {
319319
'GCC_VERSION': 'com.apple.compilers.llvm.clang.1_0',
320-
'CLANG_CXX_LANGUAGE_STANDARD': 'gnu++0x', # -std=gnu++0x
320+
'CLANG_CXX_LANGUAGE_STANDARD': 'c++11', # -std=c++11
321+
'CLANG_CXX_LIBRARY': 'libc++', #-stdlib=libc++
321322
},
322323
}],
323324
],

Diff for: src/producer-consumer-queue.h

+116
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright 2015 Facebook, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* Significant changes made to the software:
17+
*
18+
* - Removed Boost dependency
19+
* - Removed support for storing values directly
20+
* - Removed construction and destruction of the queue items feature
21+
* - Added initialization of all values to nullptr
22+
* - Made size a template parameter
23+
* - Crash instead of throw if malloc fails in constructor
24+
* - Changed namespace from folly to node
25+
* - Removed sizeGuess(), isFull(), isEmpty(), popFront() and frontPtr() methods
26+
* - Renamed write() to PushBack(), read() to PopFront()
27+
* - Added padding to fields
28+
*
29+
*/
30+
31+
// @author Bo Hu ([email protected])
32+
// @author Jordan DeLong ([email protected])
33+
34+
#ifndef SRC_PRODUCER_CONSUMER_QUEUE_H_
35+
#define SRC_PRODUCER_CONSUMER_QUEUE_H_
36+
37+
#include "util.h"
38+
#include <atomic>
39+
#include <string.h>
40+
41+
namespace node {
42+
43+
/*
44+
* ProducerConsumerQueue is a one producer and one consumer queue
45+
* without locks.
46+
*/
47+
template<uint32_t size, class T>
48+
class ProducerConsumerQueue {
49+
public:
50+
// size must be >= 2.
51+
//
52+
// Also, note that the number of usable slots in the queue at any
53+
// given time is actually (size-1), so if you start with an empty queue,
54+
// PushBack will fail after size-1 insertions.
55+
ProducerConsumerQueue() : readIndex_(0), writeIndex_(0) {
56+
STATIC_ASSERT(size >= 2);
57+
memset(&records_, 0, sizeof(records_[0]) * size);
58+
}
59+
60+
~ProducerConsumerQueue() {
61+
while (T* record = PopFront())
62+
delete record;
63+
}
64+
65+
// Returns false if the queue is full, cannot insert nullptr
66+
bool PushBack(T* item) {
67+
CHECK_NE(item, nullptr);
68+
auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
69+
auto nextRecord = currentWrite + 1;
70+
if (nextRecord == size) {
71+
nextRecord = 0;
72+
}
73+
74+
if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
75+
records_[currentWrite] = item;
76+
writeIndex_.store(nextRecord, std::memory_order_release);
77+
return true;
78+
}
79+
80+
// queue is full
81+
return false;
82+
}
83+
84+
// Returns nullptr if the queue is empty.
85+
T* PopFront() {
86+
auto const currentRead = readIndex_.load(std::memory_order_relaxed);
87+
if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
88+
// queue is empty
89+
return nullptr;
90+
}
91+
92+
auto nextRecord = currentRead + 1;
93+
if (nextRecord == size) {
94+
nextRecord = 0;
95+
}
96+
T* ret = records_[currentRead];
97+
readIndex_.store(nextRecord, std::memory_order_release);
98+
CHECK_NE(ret, nullptr);
99+
return ret;
100+
}
101+
private:
102+
static const size_t kCacheLineLength = 128;
103+
typedef char padding[kCacheLineLength];
104+
padding padding1_;
105+
T* records_[size];
106+
padding padding2_;
107+
std::atomic<unsigned int> readIndex_;
108+
padding padding3_;
109+
std::atomic<unsigned int> writeIndex_;
110+
padding padding4_;
111+
DISALLOW_COPY_AND_ASSIGN(ProducerConsumerQueue);
112+
};
113+
114+
} // namespace node
115+
116+
#endif // SRC_PRODUCER_CONSUMER_QUEUE_H_

0 commit comments

Comments
 (0)