forked from mariadb-corporation/mariadb-columnstore-engine
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrowstorage.h
370 lines (314 loc) · 10.8 KB
/
rowstorage.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
/* Copyright (C) 2021 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#pragma once
#include "resourcemanager.h"
#include "rowgroup.h"
#include "idbcompress.h"
#include <random>
#include <sys/stat.h>
#include <unistd.h>
namespace rowgroup
{
/** @brief Declaration only; the definition is not part of this header.
 *
 * NOTE(review): judging by the parameter names this derives a bucket count from
 * the available memory, the thread count and the input/output row sizes —
 * confirm against the definition.
 */
uint32_t calcNumberOfBuckets(ssize_t availMem, uint32_t numOfThreads, uint32_t numOfBuckets,
uint32_t groupsPerThread, uint32_t inRowSize, uint32_t outRowSize,
bool enabledDiskAggr);
// Forward declarations: these types are only used through pointers and smart
// pointers in this header, so their full definitions are not needed here.
class MemManager;
class RowPosHashStorage;
using RowPosHashStoragePtr = std::unique_ptr<RowPosHashStorage>;
class RowGroupStorage;
/** @brief Declaration only; the definition is not part of this header.
 *
 * NOTE(review): presumably hashes the columns of `r` up to `lastCol` — confirm
 * against the definition.
 */
uint64_t hashRow(const rowgroup::Row& r, std::size_t lastCol);
// Size limit for constant strings (2048).
constexpr const size_t MaxConstStrSize = 2048ULL;
// Buffer size limit: twice MaxConstStrSize (left shift by one bit).
constexpr const size_t MaxConstStrBufSize = MaxConstStrSize << 1;
/** @brief Storage that maps input rows to aggregation result rows.
 *
 * Result rows are found or created through getTargetRow(); the data can be
 * dumped to disk (dump()) and drained with getNextRGData(). The private part
 * holds the inline helpers of the hash index (a per-slot "info" byte array plus
 * row positions/hashes), derived from a robin hood hash map.
 */
class RowAggStorage
{
 public:
  RowAggStorage(const std::string& tmpDir, RowGroup* rowGroupOut, RowGroup* keysRowGroup, uint32_t keyCount,
                joblist::ResourceManager* rm = nullptr, boost::shared_ptr<int64_t> sessLimit = {},
                bool enabledDiskAgg = false, bool allowGenerations = false,
                compress::CompressInterface* compressor = nullptr);

  /** @brief Convenience constructor that uses rowGroupOut as the keys row group as well.
   */
  RowAggStorage(const std::string& tmpDir, RowGroup* rowGroupOut, uint32_t keyCount,
                joblist::ResourceManager* rm = nullptr, boost::shared_ptr<int64_t> sessLimit = {},
                bool enabledDiskAgg = false, bool allowGenerations = false,
                compress::CompressInterface* compressor = nullptr)
   : RowAggStorage(tmpDir, rowGroupOut, rowGroupOut, keyCount, rm, std::move(sessLimit), enabledDiskAgg,
                   allowGenerations, compressor)
  {
  }

  ~RowAggStorage();

  /** @brief Row count limit: 8192 when disk aggregation is enabled, 256 otherwise.
   *
   * @param enabledDiskAgg(in) whether disk-based aggregation is enabled
   */
  static uint16_t getMaxRows(bool enabledDiskAgg)
  {
    return (enabledDiskAgg ? 8192 : 256);
  }

  static size_t getBucketSize();

  /** @brief Find or create resulting row.
   *
   * Create "aggregation key" row if necessary.
   * NB! Using getTargetRow() after append() is UB!
   *
   * @param row(in)     input row
   * @param rowOut(out) row to aggregate data from input row
   *
   * @returns true if new row created, false otherwise
   */
  bool getTargetRow(const Row& row, Row& rowOut);

  /** @brief Same as above, but takes the hash of the input row as `row_hash`.
   */
  bool getTargetRow(const Row& row, uint64_t row_hash, Row& rowOut);

  /** @brief Dump some RGDatas to disk and release memory for further use.
   */
  void dump();

  /** @brief Append RGData from other RowAggStorage and clear it.
   *
   * NB! Any operation except getNextRGData() or append() is UB!
   *
   * @param other(in) donor storage
   */
  void append(RowAggStorage& other);

  /** @brief Remove last RGData from internal RGData storage and return it.
   *
   * @returns pointer to the next RGData or nullptr if empty
   */
  std::unique_ptr<RGData> getNextRGData();

  /** @brief Finalize the storage.
   *
   * NOTE(review): the implementation is not visible in this header; presumably
   * it merges previously dumped generations using mergeFunc — confirm.
   *
   * @param mergeFunc callback invoked with a Row&
   * @param rowOut    row object handed to the implementation
   */
  void finalize(std::function<void(Row&)> mergeFunc, Row& rowOut);

  /** @brief Calculate maximum size of hash assuming 80% fullness.
   *
   * @param elems(in) number of elements
   * @returns calculated size
   */
  inline static size_t calcMaxSize(size_t elems) noexcept
  {
    // Multiply first while elems * 80 cannot overflow; otherwise divide first
    // and accept the small loss of precision.
    if (LIKELY(elems <= std::numeric_limits<size_t>::max() / 100))
      return elems * 80 / 100;

    return (elems / 100) * 80;
  }

  /** @brief Number of slots including the trailing overflow buffer.
   *
   * The buffer is maxSize slots, capped at 255.
   *
   * @param elems(in)   number of elements
   * @param maxSize(in) maximum fill size (see calcMaxSize())
   * @returns elems plus the buffer length
   */
  inline static size_t calcSizeWithBuffer(size_t elems, size_t maxSize) noexcept
  {
    // The explicit template argument keeps this compiling where size_t is not
    // `unsigned long` (std::min needs both arguments to have the same type).
    return elems + std::min<size_t>(maxSize, 0xFF);
  }

  /** @brief Same as above with maxSize derived from elems through calcMaxSize().
   */
  inline static size_t calcSizeWithBuffer(size_t elems) noexcept
  {
    return calcSizeWithBuffer(elems, calcMaxSize(elems));
  }

 private:
  struct Data;

  /** @brief Create new RowAggStorage with the same params and load dumped data
   *
   * @param gen(in) generation number
   * @return pointer to a new RowAggStorage
   */
  RowAggStorage* clone(uint16_t gen) const;

  /** @brief Free any internal data
   */
  void freeData();

  /** @brief Move internal data & row position inside [insIdx, startIdx] up by 1.
   *
   * @param startIdx(in) last element's index to move
   * @param insIdx(in)   first element's index to move
   */
  void shiftUp(size_t startIdx, size_t insIdx);

  /// (info, index) pair produced from a row hash.
  using InfoIdxType = std::pair<uint32_t, size_t>;

  /** @brief Convert a row hash into the initial info value and slot index.
   *
   * @param h(in)              row hash
   * @param mask(in)           index mask
   * @param hashMultiplier(in) multiplier used to mix the hash
   * @param infoInc(in)        info increment (base info value)
   * @param infoHashShift(in)  right shift applied to the hash bits kept in info
   * @returns pair of (info, index)
   */
  inline InfoIdxType rowHashToIdx(uint64_t h, const size_t mask, const uint64_t hashMultiplier,
                                  const uint32_t infoInc, const uint32_t infoHashShift) const
  {
    // An addition from the original robin hood HM.
    h *= hashMultiplier;
    h ^= h >> 33U;
    // The low INIT_INFO_BITS bits of the mixed hash go into info, the bits
    // above them select the slot.
    const uint32_t info = infoInc + static_cast<uint32_t>((h & INFO_MASK) >> infoHashShift);
    const size_t idx = (h >> INIT_INFO_BITS) & mask;
    return {info, idx};
  }

  /** @brief Same as above, using the parameters of the current data (fCurData).
   */
  inline InfoIdxType rowHashToIdx(uint64_t h) const
  {
    return rowHashToIdx(h, fCurData->fMask, fCurData->hashMultiplier_, fCurData->fInfoInc,
                        fCurData->fInfoHashShift);
  }

  /** @brief Iterate over internal info until info with less-or-equal distance
   *         from the best position was found.
   *
   * @param info(in,out) info data
   * @param idx(in,out)  index
   * @param curData(in)  data to iterate over
   */
  inline void nextWhileLess(uint32_t& info, size_t& idx, const Data* curData) const noexcept
  {
    while (info < curData->fInfo[idx])
    {
      next(info, idx, curData);
    }
  }

  inline void nextWhileLess(uint32_t& info, size_t& idx) const noexcept
  {
    nextWhileLess(info, idx, fCurData);
  }

  /** @brief Get next index and corresponding info
   *
   * Advances idx by one and info by curData->fInfoInc.
   */
  inline void next(uint32_t& info, size_t& idx, const Data* curData) const noexcept
  {
    ++idx;
    info += curData->fInfoInc;
  }

  inline void next(uint32_t& info, size_t& idx) const noexcept
  {
    next(info, idx, fCurData);
  }

  /** @brief Get index and info of the next non-empty entry
   *
   * Scans the info bytes of the current data eight at a time, starting at idx.
   * There is no bounds check here. NOTE(review): this presumably relies on a
   * non-zero sentinel at the end of the info array (calcBytes() reserves
   * sizeof(uint64_t) extra bytes) — confirm in initData().
   *
   * @param info(out)   info byte of the entry found
   * @param idx(in,out) start index on entry, index of the entry found on exit
   */
  inline void nextExisting(uint32_t& info, size_t& idx) const noexcept
  {
    // Converts a bit offset inside the 64-bit word into a byte offset.
    constexpr unsigned kBitsPerByte = 8;

    uint64_t data;
    while (true)
    {
      // memcpy instead of a cast: idx is not necessarily 8-byte aligned.
      memcpy(&data, fCurData->fInfo.get() + idx, sizeof(data));
      if (data != 0)
      {
        break;
      }
      idx += sizeof(data);
    }

    // Locate the first non-zero byte in memory order. The compiler-predefined
    // macros are used because BYTE_ORDER/BIG_ENDIAN need <endian.h>; when they
    // are undefined `BYTE_ORDER == BIG_ENDIAN` evaluates to `0 == 0` and would
    // select the big-endian branch on every host.
#if defined(__BYTE_ORDER__) && defined(__ORDER_BIG_ENDIAN__) && (__BYTE_ORDER__ == __ORDER_BIG_ENDIAN__)
    const uint64_t n = __builtin_clzll(data) / kBitsPerByte;
#else
    const uint64_t n = __builtin_ctzll(data) / kBitsPerByte;
#endif
    idx += n;
    info = fCurData->fInfo[idx];
  }

  /** @brief Increase internal data size if needed
   */
  void increaseSize();

  /** @brief Increase distance capacity of info removing 1 bit of the hash.
   *
   * @returns success
   */
  bool tryIncreaseInfo();

  /** @brief Reserve space for number of elements (power of two)
   *
   * This function re-inserts all data.
   *
   * @param elems(in) new size
   */
  void rehashPowerOfTwo(size_t elems);

  /** @brief Move elements from old one into rehashed data.
   *
   * It's mostly the same algo as in getTargetRow(), but returns nothing
   * and skips some checks because it's guaranteed that there are no duplicates.
   *
   * @param oldIdx(in)    index of "old" data
   * @param oldHashes(in) old storage of row positions and hashes
   */
  void insertSwap(size_t oldIdx, RowPosHashStorage* oldHashes);

  /** @brief (Re)Initialize internal data of specified size.
   *
   * @param elems(in)     number of elements
   * @param oldHashes(in) previous storage of row positions and hashes
   *                      (NOTE(review): how it is used is not visible here)
   */
  void initData(size_t elems, const RowPosHashStorage* oldHashes);

  /** @brief Calculate memory size of info data
   *
   * One byte per element plus sizeof(uint64_t) extra bytes.
   *
   * @param elems(in) number of elements
   * @returns size in bytes
   */
  inline static size_t calcBytes(size_t elems) noexcept
  {
    return elems + sizeof(uint64_t);
  }

  /** @brief Reserve place sufficient for elems
   *
   * @param elems(in) number of elements
   */
  void reserve(size_t elems);

  /** @brief Start new aggregation generation
   *
   * Dump all the data on disk, including internal info data, positions & row
   * hashes, and the rowgroups themselves.
   */
  void startNewGeneration();

  /** @brief Save internal info data on disk */
  void dumpInternalData() const;

  /** @brief Load previously dumped data from disk
   *
   * @param gen(in) generation number
   */
  void loadGeneration(uint16_t gen);

  /** @brief Load previously dumped data into the tmp storage */
  void loadGeneration(uint16_t gen, size_t& size, size_t& mask, size_t& maxSize, size_t& hashMultiplier,
                      uint32_t& infoInc, uint32_t& infoHashShift, std::unique_ptr<uint8_t[]>& info);

  /** @brief Remove temporary data files */
  void cleanup();
  void cleanup(uint16_t gen);

  /** @brief Remove all temporary data files */
  void cleanupAll() noexcept;

  std::string makeDumpFilename(int32_t gen = -1) const;

 private:
  static constexpr size_t INIT_SIZE{sizeof(uint64_t)};
  // Number of low hash bits initially kept in the info byte.
  static constexpr uint32_t INIT_INFO_BITS{5};
  static constexpr uint8_t INIT_INFO_INC{1U << INIT_INFO_BITS};
  // Mask selecting the low INIT_INFO_BITS bits of a hash.
  static constexpr size_t INFO_MASK{INIT_INFO_INC - 1U};
  static constexpr uint8_t INIT_INFO_HASH_SHIFT{0};
  static constexpr uint16_t MAX_INMEMORY_GENS{4};

  // This is SplitMix64 implementation borrowed from here
  // https://thompsonsed.co.uk/random-number-generators-for-c-performance-tested
  inline uint64_t nextRandom()
  {
    uint64_t z = (fRandom += UINT64_C(0x9E3779B97F4A7C15));
    z = (z ^ (z >> 30)) * UINT64_C(0xBF58476D1CE4E5B9);
    z = (z ^ (z >> 27)) * UINT64_C(0x94D049BB133111EB);
    return z ^ (z >> 31);
  }

  /// Next pseudo-random value reduced to the range [0, 99].
  inline uint64_t nextRandDistib()
  {
    return nextRandom() % 100;
  }

  /// State of one hash index: info bytes, row positions/hashes and parameters.
  struct Data
  {
    RowPosHashStoragePtr fHashes;
    std::unique_ptr<uint8_t[]> fInfo;
    // This is a power of 2 that controls a potential number of hash buckets
    // w/o rehashing
    size_t fSize{0};
    size_t fMask{0};
    size_t fMaxSize{0};
    uint64_t hashMultiplier_{0xc4ceb9fe1a85ec53ULL};
    uint32_t fInfoInc{INIT_INFO_INC};
    uint32_t fInfoHashShift{INIT_INFO_HASH_SHIFT};
  };

  std::vector<std::unique_ptr<Data>> fGens;
  Data* fCurData;
  uint32_t fMaxRows;
  const bool fExtKeys;

  std::unique_ptr<RowGroupStorage> fStorage;
  std::unique_ptr<RowGroupStorage> fRealKeysStorage;
  RowGroupStorage* fKeysStorage;

  uint32_t fLastKeyCol;

  uint16_t fGeneration{0};
  void* fUniqId;

  Row fKeyRow;

  std::unique_ptr<MemManager> fMM;
  uint32_t fNumOfInputRGPerThread;
  bool fAggregated = true;
  bool fAllowGenerations;
  bool fEnabledDiskAggregation;
  std::unique_ptr<compress::CompressInterface> fCompressor;
  std::string fTmpDir;
  bool fInitialized{false};
  rowgroup::RowGroup* fRowGroupOut;
  rowgroup::RowGroup* fKeysRowGroup;
  uint64_t fRandom = 0xc4ceb9fe1a85ec53ULL;  // initial integer to set PRNG up
};
} // namespace rowgroup