40#ifndef SOFTWARE_INCLUDE_ENSO_QUEUE_H_
41#define SOFTWARE_INCLUDE_ENSO_QUEUE_H_
60#error "Need support for AVX512F to use Queue."
64static constexpr T align_cache_power_two(T value) {
65 T cache_aligned = (value + kCacheLineSize - 1) & ~(kCacheLineSize - 1);
67 while (power_two < cache_aligned) {
97template <
typename T,
typename Sub
class>
100 static constexpr size_t kElementMetaSize =
sizeof(T) + 8;
101 static constexpr size_t kElementPadding =
102 align_cache_power_two(kElementMetaSize) - kElementMetaSize;
109 static_assert(
sizeof(
Element) == kCacheLineSize,
110 "Element must fit in a cache line");
113 "Element size must be a power of two");
116 if (buf_addr_ !=
nullptr) {
117 munmap(buf_addr_, size_);
118 if (created_queue_) {
119 unlink(huge_page_path_.c_str());
124 Queue(Queue&& other) =
default;
125 Queue& operator=(Queue&& other) =
default;
142 const std::string& queue_name,
size_t size = 0,
143 bool join_if_exists =
true, std::string huge_page_prefix =
"") noexcept {
144 if (huge_page_prefix ==
"") {
145 huge_page_prefix = kHugePageDefaultPrefix;
148 std::unique_ptr<Subclass> queue(
149 new (std::nothrow) Subclass(queue_name, size, huge_page_prefix));
151 if (queue ==
nullptr) {
152 return std::unique_ptr<Subclass>{};
155 if (queue->Init(join_if_exists)) {
156 return std::unique_ptr<Subclass>{};
172 inline size_t size() const noexcept {
return size_; }
178 inline uint32_t
capacity() const noexcept {
return capacity_; }
180 static_assert(std::is_trivially_copyable<T>::value,
181 "T must be trivially copyable");
184 explicit Queue(
const std::string& queue_name,
size_t size,
185 const std::string& huge_page_prefix) noexcept
187 queue_name_(queue_name),
188 huge_page_prefix_(huge_page_prefix) {}
198 int Init(
bool join_if_exists)
noexcept {
200 size_ = kBufPageSize;
203 if ((size_ & (size_ - 1)) != 0) {
204 std::cerr <<
"Queue size must be a power of two" << std::endl;
208 if (size_ <
sizeof(
struct Element)) {
209 std::cerr <<
"Queue size must be at least " <<
sizeof(
struct Element)
210 <<
" bytes" << std::endl;
214 capacity_ = size_ / sizeof(struct Element);
215 index_mask_ = capacity_ - 1;
219 huge_page_prefix_ + std::string(kHugePageQueuePathPrefix) + queue_name_;
222 std::filesystem::path path = std::filesystem::path(huge_page_path_);
223 std::filesystem::path dir_path = path.parent_path();
224 std::string file_name = path.filename();
226 bool create_queue = true;
227 for (const auto& entry : std::filesystem::directory_iterator(dir_path)) {
228 if (!entry.is_regular_file()) {
232 std::string entry_name = entry.path().filename().string();
235 if (entry_name.find(file_name) == 0) {
236 std::string size_str = entry_name.substr(file_name.size());
239 if (size_str.find_first_not_of(
"0123456789") != std::string::npos) {
240 std::cerr <<
"Found existing queue with invalid size: " << size_str
245 size_t existing_size = std::stoul(size_str);
247 if (existing_size != size_) {
248 std::cerr <<
"Found existing queue with different size: "
249 << existing_size << std::endl;
253 create_queue =
false;
257 if (!join_if_exists && !create_queue) {
258 std::cerr <<
"Queue already exists" << std::endl;
262 created_queue_ = create_queue;
263 huge_page_path_ += std::to_string(size_);
266 if (addr ==
nullptr) {
267 std::cerr <<
"Failed to allocate shared memory" << std::endl;
270 buf_addr_ =
reinterpret_cast<Element*
>(addr);
273 memset(buf_addr_, 0, size_);
279 inline bool created_queue()
const noexcept {
return created_queue_; }
281 inline uint32_t index_mask()
const noexcept {
return index_mask_; }
285 Queue& operator=(
const Queue& other) =
delete;
289 uint32_t index_mask_;
291 std::string huge_page_path_;
292 bool created_queue_ =
false;
293 std::string queue_name_;
294 std::string huge_page_prefix_;
306 inline int Push(
const T& data) {
307 struct Parent::Element* current_element = &(Parent::buf_addr()[tail_]);
308 if (unlikely(current_element->signal)) {
312 __m512i tmp_element_raw;
313 struct Parent::Element* tmp_element =
314 (
struct Parent::Element*)(&tmp_element_raw);
315 tmp_element->signal = 1;
316 tmp_element->data = data;
318 _mm512_storeu_si512((__m512i*)current_element, tmp_element_raw);
320 tail_ = (tail_ + 1) & Parent::index_mask();
326 explicit QueueProducer(
const std::string& queue_name,
size_t size,
327 const std::string& huge_page_prefix) noexcept
338 int Init(
bool join_if_exists)
noexcept {
339 if (Parent::Init(join_if_exists)) {
343 if (Parent::created_queue()) {
348 struct Parent::Element* buf = Parent::buf_addr();
349 for (uint32_t i = 0; i < Parent::capacity(); ++i) {
351 tail_ = (i + 1) & Parent::index_mask();
354 if (buf[tail_].signal == 0) {
359 if (tail_ == 0 && buf[0].signal) {
360 std::cerr <<
"Cannot synchronize a full queue" << std::endl;
386 struct Parent::Element* current_element = &(Parent::buf_addr()[head_]);
387 if (!current_element->signal) {
390 return &(current_element->data);
398 inline std::optional<T>
Pop() {
399 struct Parent::Element* current_element = &(Parent::buf_addr()[head_]);
400 if (!current_element->signal) {
404 T data = current_element->data;
405 current_element->signal = 0;
407 head_ = (head_ + 1) & Parent::index_mask();
413 explicit QueueConsumer(
const std::string& queue_name,
size_t size,
414 const std::string& huge_page_prefix) noexcept
425 int Init(
bool join_if_exists)
noexcept {
426 if (Parent::Init(join_if_exists)) {
430 if (Parent::created_queue()) {
435 struct Parent::Element* buf = Parent::buf_addr();
436 for (uint32_t i = Parent::capacity(); i > 0; --i) {
437 if (buf[i - 1].signal) {
441 uint32_t prev_element = (head_ - 1) & Parent::index_mask();
442 if (buf[prev_element].signal == 0) {
447 if (head_ == 0 && buf[0].signal) {
448 std::cerr <<
"Cannot synchronize a full queue" << std::endl;
T * Front()
Returns the data at the front of the queue without popping it.
int Init(bool join_if_exists) noexcept
Initializes the Queue object.
std::optional< T > Pop()
Pops data from the queue.
int Init(bool join_if_exists) noexcept
Initializes the Queue object.
int Push(const T &data)
Pushes data to the queue.
Element * buf_addr() const noexcept
Returns the address of the internal buffer.
uint32_t capacity() const noexcept
Returns the capacity of the queue.
int Init(bool join_if_exists) noexcept
Initializes the Queue object.
static std::unique_ptr< Subclass > Create(const std::string &queue_name, size_t size=0, bool join_if_exists=true, std::string huge_page_prefix="") noexcept
Factory method to create a Queue object.
size_t size() const noexcept
Returns the size of the internal buffer.
Constants used throughout the codebase. Some of these constants need to be kept in sync with the hard...
Miscellaneous helper functions.
void * get_huge_page(const std::string &path, size_t size=0, bool mirror=false)