Ensō 0.4.6
Software API reference
Loading...
Searching...
No Matches
queue.h
Go to the documentation of this file.
1/*
2 * Copyright (c) 2023, Carnegie Mellon University
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted (subject to the limitations in the disclaimer
6 * below) provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 *
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 *
15 * * Neither the name of the copyright holder nor the names of its
16 * contributors may be used to endorse or promote products derived from
17 * this software without specific prior written permission.
18 *
19 * NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
20 * THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
21 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT
22 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
23 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
30 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32
40#ifndef SOFTWARE_INCLUDE_ENSO_QUEUE_H_
41#define SOFTWARE_INCLUDE_ENSO_QUEUE_H_
42
43#include <enso/consts.h>
44#include <enso/helpers.h>
45#include <sys/mman.h>
46#include <unistd.h>
47
48#include <cstdint>
49#include <cstring>
50#include <filesystem>
51#include <iostream>
52#include <memory>
53#include <optional>
54#include <string>
55
56namespace enso {
57
58// This implementation assumes AVX512.
59#ifndef __AVX512F__
60#error "Need support for AVX512F to use Queue."
61#endif // __AVX512F__
62
63template <typename T>
64static constexpr T align_cache_power_two(T value) {
65 T cache_aligned = (value + kCacheLineSize - 1) & ~(kCacheLineSize - 1);
66 T power_two = 1;
67 while (power_two < cache_aligned) {
68 power_two <<= 1;
69 }
70 return power_two;
71}
72
97template <typename T, typename Subclass>
98class Queue {
99 public:
100 static constexpr size_t kElementMetaSize = sizeof(T) + 8;
101 static constexpr size_t kElementPadding =
102 align_cache_power_two(kElementMetaSize) - kElementMetaSize;
103
104 struct alignas(kCacheLineSize) Element {
105 uint64_t signal;
106 T data;
107 };
108
109 static_assert(sizeof(Element) == kCacheLineSize,
110 "Element must fit in a cache line");
111
112 static_assert((sizeof(Element) & (sizeof(Element) - 1)) == 0,
113 "Element size must be a power of two");
114
115 ~Queue() noexcept {
116 if (buf_addr_ != nullptr) {
117 munmap(buf_addr_, size_);
118 if (created_queue_) {
119 unlink(huge_page_path_.c_str());
120 }
121 }
122 }
123
124 Queue(Queue&& other) = default;
125 Queue& operator=(Queue&& other) = default;
126
141 static std::unique_ptr<Subclass> Create(
142 const std::string& queue_name, size_t size = 0,
143 bool join_if_exists = true, std::string huge_page_prefix = "") noexcept {
144 if (huge_page_prefix == "") {
145 huge_page_prefix = kHugePageDefaultPrefix;
146 }
147
148 std::unique_ptr<Subclass> queue(
149 new (std::nothrow) Subclass(queue_name, size, huge_page_prefix));
150
151 if (queue == nullptr) {
152 return std::unique_ptr<Subclass>{};
153 }
154
155 if (queue->Init(join_if_exists)) {
156 return std::unique_ptr<Subclass>{};
157 }
158
159 return queue;
160 }
161
166 inline Element* buf_addr() const noexcept { return buf_addr_; }
167
172 inline size_t size() const noexcept { return size_; }
173
178 inline uint32_t capacity() const noexcept { return capacity_; }
179
180 static_assert(std::is_trivially_copyable<T>::value,
181 "T must be trivially copyable");
182
183 protected:
184 explicit Queue(const std::string& queue_name, size_t size,
185 const std::string& huge_page_prefix) noexcept
186 : size_(size),
187 queue_name_(queue_name),
188 huge_page_prefix_(huge_page_prefix) {}
189
198 int Init(bool join_if_exists) noexcept {
199 if (size_ == 0) {
200 size_ = kBufPageSize;
201 }
202
203 if ((size_ & (size_ - 1)) != 0) {
204 std::cerr << "Queue size must be a power of two" << std::endl;
205 return -1;
206 }
207
208 if (size_ < sizeof(struct Element)) {
209 std::cerr << "Queue size must be at least " << sizeof(struct Element)
210 << " bytes" << std::endl;
211 return -1;
212 }
213
214 capacity_ = size_ / sizeof(struct Element);
215 index_mask_ = capacity_ - 1;
216
217 // Keep path so that we can unlink it later if needed.
218 huge_page_path_ =
219 huge_page_prefix_ + std::string(kHugePageQueuePathPrefix) + queue_name_;
220
221 // Check if a file starting with huge_page_path_ exists.
222 std::filesystem::path path = std::filesystem::path(huge_page_path_);
223 std::filesystem::path dir_path = path.parent_path();
224 std::string file_name = path.filename();
225
226 bool create_queue = true;
227 for (const auto& entry : std::filesystem::directory_iterator(dir_path)) {
228 if (!entry.is_regular_file()) {
229 continue;
230 }
231
232 std::string entry_name = entry.path().filename().string();
233
234 // File starting with file_name exists.
235 if (entry_name.find(file_name) == 0) {
236 std::string size_str = entry_name.substr(file_name.size());
237
238 // Check if the string is a number.
239 if (size_str.find_first_not_of("0123456789") != std::string::npos) {
240 std::cerr << "Found existing queue with invalid size: " << size_str
241 << std::endl;
242 return -1;
243 }
244
245 size_t existing_size = std::stoul(size_str);
246
247 if (existing_size != size_) {
248 std::cerr << "Found existing queue with different size: "
249 << existing_size << std::endl;
250 return -1;
251 }
252
253 create_queue = false;
254 }
255 }
256
257 if (!join_if_exists && !create_queue) {
258 std::cerr << "Queue already exists" << std::endl;
259 return -1;
260 }
261
262 created_queue_ = create_queue;
263 huge_page_path_ += std::to_string(size_);
264
265 void* addr = get_huge_page(huge_page_path_, size_);
266 if (addr == nullptr) {
267 std::cerr << "Failed to allocate shared memory" << std::endl;
268 return -1;
269 }
270 buf_addr_ = reinterpret_cast<Element*>(addr);
271
272 if (create_queue) {
273 memset(buf_addr_, 0, size_);
274 }
275
276 return 0;
277 }
278
279 inline bool created_queue() const noexcept { return created_queue_; }
280
281 inline uint32_t index_mask() const noexcept { return index_mask_; }
282
283 private:
284 Queue(const Queue& other) = delete;
285 Queue& operator=(const Queue& other) = delete;
286
287 size_t size_;
288 uint32_t capacity_; // In number of elements.
289 uint32_t index_mask_;
290 Element* buf_addr_ = nullptr;
291 std::string huge_page_path_;
292 bool created_queue_ = false;
293 std::string queue_name_;
294 std::string huge_page_prefix_;
295};
296
297template <typename T>
298class QueueProducer : public Queue<T, QueueProducer<T>> {
299 public:
306 inline int Push(const T& data) {
307 struct Parent::Element* current_element = &(Parent::buf_addr()[tail_]);
308 if (unlikely(current_element->signal)) {
309 return -1; // Queue is full.
310 }
311
312 __m512i tmp_element_raw;
313 struct Parent::Element* tmp_element =
314 (struct Parent::Element*)(&tmp_element_raw);
315 tmp_element->signal = 1;
316 tmp_element->data = data;
317
318 _mm512_storeu_si512((__m512i*)current_element, tmp_element_raw);
319
320 tail_ = (tail_ + 1) & Parent::index_mask();
321
322 return 0;
323 }
324
325 protected:
326 explicit QueueProducer(const std::string& queue_name, size_t size,
327 const std::string& huge_page_prefix) noexcept
328 : Queue<T, QueueProducer<T>>(queue_name, size, huge_page_prefix) {}
329
338 int Init(bool join_if_exists) noexcept {
339 if (Parent::Init(join_if_exists)) {
340 return -1;
341 }
342
343 if (Parent::created_queue()) {
344 return 0;
345 }
346
347 // Synchronize the pointer in case the queue is not empty.
348 struct Parent::Element* buf = Parent::buf_addr();
349 for (uint32_t i = 0; i < Parent::capacity(); ++i) {
350 if (buf[i].signal) {
351 tail_ = (i + 1) & Parent::index_mask();
352 }
353
354 if (buf[tail_].signal == 0) {
355 break;
356 }
357 }
358
359 if (tail_ == 0 && buf[0].signal) {
360 std::cerr << "Cannot synchronize a full queue" << std::endl;
361 return -1;
362 }
363
364 return 0;
365 }
366
367 private:
368 using Parent = Queue<T, QueueProducer<T>>;
369 friend Parent;
370
371 uint32_t tail_ = 0;
372};
373
374template <typename T>
375class QueueConsumer : public Queue<T, QueueConsumer<T>> {
376 public:
377 ~QueueConsumer() noexcept {}
378
385 inline T* Front() {
386 struct Parent::Element* current_element = &(Parent::buf_addr()[head_]);
387 if (!current_element->signal) {
388 return nullptr; // Queue is empty.
389 }
390 return &(current_element->data);
391 }
392
398 inline std::optional<T> Pop() {
399 struct Parent::Element* current_element = &(Parent::buf_addr()[head_]);
400 if (!current_element->signal) {
401 return {}; // Queue is empty.
402 }
403
404 T data = current_element->data;
405 current_element->signal = 0;
406
407 head_ = (head_ + 1) & Parent::index_mask();
408
409 return data;
410 }
411
412 protected:
413 explicit QueueConsumer(const std::string& queue_name, size_t size,
414 const std::string& huge_page_prefix) noexcept
415 : Queue<T, QueueConsumer<T>>(queue_name, size, huge_page_prefix) {}
416
425 int Init(bool join_if_exists) noexcept {
426 if (Parent::Init(join_if_exists)) {
427 return -1;
428 }
429
430 if (Parent::created_queue()) {
431 return 0;
432 }
433
434 // Synchronize the pointer in case the queue is not empty.
435 struct Parent::Element* buf = Parent::buf_addr();
436 for (uint32_t i = Parent::capacity(); i > 0; --i) {
437 if (buf[i - 1].signal) {
438 head_ = i - 1;
439 }
440
441 uint32_t prev_element = (head_ - 1) & Parent::index_mask();
442 if (buf[prev_element].signal == 0) {
443 break;
444 }
445 }
446
447 if (head_ == 0 && buf[0].signal) {
448 std::cerr << "Cannot synchronize a full queue" << std::endl;
449 return -1;
450 }
451
452 return 0;
453 }
454
455 private:
456 using Parent = Queue<T, QueueConsumer<T>>;
457 friend Parent;
458
459 uint32_t head_ = 0;
460};
461
462} // namespace enso
463
464#endif // SOFTWARE_INCLUDE_ENSO_QUEUE_H_
T * Front()
Returns the data at the front of the queue without popping it.
Definition: queue.h:385
int Init(bool join_if_exists) noexcept
Initializes the Queue object.
Definition: queue.h:425
std::optional< T > Pop()
Pops data from the queue.
Definition: queue.h:398
int Init(bool join_if_exists) noexcept
Initializes the Queue object.
Definition: queue.h:338
int Push(const T &data)
Pushes data to the queue.
Definition: queue.h:306
Queue parent class.
Definition: queue.h:98
Element * buf_addr() const noexcept
Returns the address of the internal buffer.
Definition: queue.h:166
uint32_t capacity() const noexcept
Returns the capacity of the queue.
Definition: queue.h:178
int Init(bool join_if_exists) noexcept
Initializes the Queue object.
Definition: queue.h:198
static std::unique_ptr< Subclass > Create(const std::string &queue_name, size_t size=0, bool join_if_exists=true, std::string huge_page_prefix="") noexcept
Factory method to create a Queue object.
Definition: queue.h:141
size_t size() const noexcept
Returns the size of the internal buffer.
Definition: queue.h:172
Constants used throughout the codebase. Some of these constants need to be kept in sync with the hard...
Miscellaneous helper functions.
void * get_huge_page(const std::string &path, size_t size=0, bool mirror=false)
Definition: ixy_helpers.cpp:82