Ensō 0.4.6
Software API reference
Loading...
Searching...
No Matches
pipe.h
Go to the documentation of this file.
1/*
2 * Copyright (c) 2023, Carnegie Mellon University
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted (subject to the limitations in the disclaimer
6 * below) provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 *
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 *
15 * * Neither the name of the copyright holder nor the names of its
16 * contributors may be used to endorse or promote products derived from
17 * this software without specific prior written permission.
18 *
19 * NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
20 * THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
21 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT
22 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
23 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
30 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32
42#ifndef SOFTWARE_INCLUDE_ENSO_PIPE_H_
43#define SOFTWARE_INCLUDE_ENSO_PIPE_H_
44
45#include <enso/consts.h>
46#include <enso/helpers.h>
47#include <enso/internals.h>
48
49#include <array>
50#include <cassert>
51#include <functional>
52#include <iostream>
53#include <memory>
54#include <string>
55#include <vector>
56
57namespace enso {
58
59class RxPipe;
60class TxPipe;
61class RxTxPipe;
62
63class PktIterator;
64class PeekPktIterator;
65
// Free-function declaration only; the definition is not part of this listing.
// Takes a pipe's internal RX state, a notification buffer pair and an
// out-parameter `buf`, and returns a `uint32_t`.
// NOTE(review): judging by the name, this peeks the next batch from a queue
// and reports its size through the return value -- confirm against the
// definition.
66uint32_t external_peek_next_batch_from_queue(
67 struct RxEnsoPipeInternal* enso_pipe,
68 struct NotificationBufPair* notification_buf_pair, void** buf);
69
// A class that represents a device.
//
// Instances are created through the static `Create()` factory: the constructor
// is private and all copy/move operations are deleted. `RxPipe`, `TxPipe` and
// `RxTxPipe` are friends and may access the private members below.
82class Device {
83 public:
// Factory method to create a device. Both arguments default to an empty
// string; the new device is returned as a `std::unique_ptr<Device>`.
94 static std::unique_ptr<Device> Create(
95 const std::string& pcie_addr = "",
96 const std::string& huge_page_prefix = "") noexcept;
97
// A Device can be neither copied nor moved.
98 Device(const Device&) = delete;
99 Device& operator=(const Device&) = delete;
100 Device(Device&&) = delete;
101 Device& operator=(Device&&) = delete;
102
103 ~Device();
104
// Allocates an RX pipe. `fallback` defaults to false.
122 RxPipe* AllocateRxPipe(bool fallback = false) noexcept;
123
// Allocates a TX pipe. `buf` defaults to nullptr.
// NOTE(review): presumably `buf` is forwarded to the TxPipe constructor, which
// records `buf == nullptr` as "buffer is allocated internally" -- confirm in
// pipe.cpp.
133 TxPipe* AllocateTxPipe(uint8_t* buf = nullptr) noexcept;
134
// Retrieves the number of fallback queues for this device.
138 int GetNbFallbackQueues() noexcept;
139
// Allocates an RX/TX pipe. `fallback` defaults to false.
157 RxTxPipe* AllocateRxTxPipe(bool fallback = false) noexcept;
158
// NOTE(review): the declarations of `NextRxPipeToRecv()` and
// `NextRxTxPipeToRecv()` (described in the member index of this page as
// "Gets the next RxPipe/RxTxPipe that has data pending") appear to have been
// dropped from this listing around here; only the bare line numbers remain.
169
180
// Processes completions for all pipes associated with this device.
184 void ProcessCompletions();
185
// Enables hardware time stamping. `offset` defaults to `kDefaultRttOffset`.
201 int EnableTimeStamping(uint8_t offset = kDefaultRttOffset);
202
// NOTE(review): the declaration of `DisableTimeStamping()` ("Disables hardware
// time stamping") appears to be missing from this listing here.
212
// Enables hardware rate limiting.
// NOTE(review): `num`/`den` look like the numerator and denominator of a rate
// fraction -- confirm against the original documentation.
254 int EnableRateLimiting(uint16_t num, uint16_t den);
255
// NOTE(review): the declaration of `DisableRateLimiting()` ("Disables hardware
// rate limiting") appears to be missing from this listing here.
267
// Enables round robin of packets among the fallback pipes.
278 int EnableRoundRobin();
279
// Disables round robin of packets among the fallback pipes.
291 int DisableRoundRobin();
292
// Gets the round robin status for the device.
299 int GetRoundRobinStatus() noexcept;
300
// Sends the given config notification to the device.
307 int ApplyConfig(struct TxNotification* config_notification);
308
309 private:
// Record kept per TX request: the pipe it belongs to and its size in bytes.
310 struct TxPendingRequest {
311 uint32_t pipe_id;
312 uint32_t nb_bytes;
313 };
314
// Private constructor (use `Create()`). Stores the PCIe address and the huge
// page prefix; it performs no other initialization (see `Init()`).
318 Device(const std::string& pcie_addr, std::string huge_page_prefix) noexcept
319 : kPcieAddr(pcie_addr) {
// In builds without NDEBUG, warn on stderr that assertions are enabled.
320#ifndef NDEBUG
321 std::cerr << "Warning: assertions are enabled. Performance may be affected."
322 << std::endl;
323#endif // NDEBUG
// An empty prefix selects the default, `kHugePageDefaultPrefix`.
324 if (huge_page_prefix == "") {
325 huge_page_prefix = std::string(kHugePageDefaultPrefix);
326 }
327 huge_page_prefix_ = huge_page_prefix;
328 }
329
// Initialization step declared separately from the constructor; returns an
// int status. Defined outside this header.
335 int Init() noexcept;
336
// Private send primitive used by `TxPipe::SendAndFree()`: takes the TX pipe
// ID, the physical address of the data and the number of bytes.
346 void Send(uint32_t tx_enso_pipe_id, uint64_t phys_addr, uint32_t nb_bytes);
347
348 friend class RxPipe;
349 friend class TxPipe;
350 friend class RxTxPipe;
351
352 const std::string kPcieAddr;
353
354 struct NotificationBufPair notification_buf_pair_;
355 int16_t core_id_;
356 uint16_t bdf_;
357 std::string huge_page_prefix_;
358
// Pipes known to this device, stored as raw pointers.
// NOTE(review): the pipe destructors are private with `Device` as a friend,
// so the device presumably owns and deletes these -- confirm in pipe.cpp.
359 std::vector<RxPipe*> rx_pipes_;
360 std::vector<TxPipe*> tx_pipes_;
361 std::vector<RxTxPipe*> rx_tx_pipes_;
362
// Fixed-size lookup tables with `kMaxNbFlows` entries, value-initialized
// (all nullptr).
363 std::array<RxPipe*, kMaxNbFlows> rx_pipes_map_ = {};
364 std::array<RxTxPipe*, kMaxNbFlows> rx_tx_pipes_map_ = {};
365
366 int32_t next_pipe_id_ = -1;
367
// Head/tail indices and storage for pending TX requests. The array holds
// `kMaxPendingTxRequests + 1` entries; the static_assert below guarantees
// that this size is a power of two, so `kPendingTxRequestsBufMask` is an
// all-ones bit mask suitable for wrapping an index with `&`.
368 uint32_t tx_pr_head_ = 0;
369 uint32_t tx_pr_tail_ = 0;
370 std::array<TxPendingRequest, kMaxPendingTxRequests + 1> tx_pending_requests_;
371 static constexpr uint32_t kPendingTxRequestsBufMask = kMaxPendingTxRequests;
372 static_assert((kMaxPendingTxRequests & (kMaxPendingTxRequests + 1)) == 0,
373 "kMaxPendingTxRequests + 1 must be a power of 2");
374};
375
// A class that represents an RX Enso Pipe.
//
// The constructor and destructor are private and `Device` is a friend, so
// instances are only created and destroyed from `Device` code. Copy and move
// operations are deleted.
394class RxPipe {
395 public:
// A class that represents a batch of messages. `T` is the iterator type
// returned by `begin()`/`end()`; it is also a friend of the batch.
// NOTE(review): the `class MessageBatch {` line (pipe.h:403) is missing from
// this listing; only lines 402 and 404 around it are present.
402 template <typename T>
404 public:
// Instantiates an empty message batch (null buffer, zero bytes, zero limit).
408 constexpr MessageBatch() : MessageBatch(nullptr, 0, 0, nullptr) {}
409
410 MessageBatch(const MessageBatch&) = default;
411 MessageBatch(MessageBatch&&) = default;
412
413 MessageBatch& operator=(const MessageBatch&) = default;
414 MessageBatch& operator=(MessageBatch&&) = default;
415
// Iterator positioned at the start of the batch buffer.
// NOTE(review): the const overloads pass a `const MessageBatch*` as `this`,
// while the iterator constructors listed on this page take a non-const
// `RxPipe::MessageBatch<T>*` -- confirm the const overloads can actually be
// instantiated.
416 constexpr T begin() { return T(buf_, message_limit_, this); }
417 constexpr T begin() const { return T(buf_, message_limit_, this); }
418
// Iterator positioned `available_bytes_` past the start of the buffer.
419 constexpr T end() {
420 return T(buf_ + available_bytes_, message_limit_, this);
421 }
422 constexpr T end() const {
423 return T(buf_ + available_bytes_, message_limit_, this);
424 }
425
// Number of bytes processed by the iterator.
429 uint32_t processed_bytes() const { return processed_bytes_; }
430
// Notifies the batch that a given number of bytes have been processed; the
// count is accumulated into `processed_bytes_`.
437 inline void NotifyProcessedBytes(uint32_t nb_bytes) {
438 processed_bytes_ += nb_bytes;
439 }
440
// Returns number of bytes available in the batch.
451 uint32_t available_bytes() const { return available_bytes_; }
452
// Returns maximum number of messages in the batch.
458 int32_t message_limit() const { return message_limit_; }
459
// Returns a pointer to the start of the batch.
465 uint8_t* buf() const { return buf_; }
466
467 private:
// Private constructor used by `RxPipe` and the iterator type `T` (both
// friends): records the buffer, its size in bytes, the message limit and
// the pipe the batch came from.
476 constexpr MessageBatch(uint8_t* buf, uint32_t available_bytes,
477 int32_t message_limit, RxPipe* pipe)
478 : available_bytes_(available_bytes),
479 message_limit_(message_limit),
480 buf_(buf),
481 pipe_(pipe) {}
482
483 friend class RxPipe;
484 friend T;
485
486 uint32_t available_bytes_;
487 int32_t message_limit_;
488 uint8_t* buf_;
489 uint32_t processed_bytes_ = 0;
490 RxPipe* pipe_;
491 };
492
// An RxPipe can be neither copied nor moved.
493 RxPipe(const RxPipe&) = delete;
494 RxPipe& operator=(const RxPipe&) = delete;
495 RxPipe(RxPipe&&) = delete;
496 RxPipe& operator=(RxPipe&&) = delete;
497
// Binds the pipe to a given flow entry. Can be called multiple times to bind
// to multiple flows.
548 int Bind(uint16_t dst_port, uint16_t src_port, uint32_t dst_ip,
549 uint32_t src_ip, uint32_t protocol);
550
// Receives a batch of bytes. The start of the data is returned through `buf`.
559 uint32_t Recv(uint8_t** buf, uint32_t max_nb_bytes);
560
// Receives a batch of bytes without removing them from the queue.
569 uint32_t Peek(uint8_t** buf, uint32_t max_nb_bytes);
570
// Confirms a certain number of bytes have been received: advances the pipe's
// `rx_tail` by `nb_bytes / 64`, wrapping at `kEnsoPipeSize`. The integer
// division means any remainder below 64 bytes is ignored.
583 constexpr void ConfirmBytes(uint32_t nb_bytes) {
584 uint32_t rx_tail = internal_rx_pipe_.rx_tail;
585 rx_tail = (rx_tail + nb_bytes / 64) % kEnsoPipeSize;
586 internal_rx_pipe_.rx_tail = rx_tail;
587 }
588
// Returns the number of bytes allocated in the pipe, computed as the distance
// from `rx_head` to `rx_tail` (modulo `kEnsoPipeSize`) multiplied by 64.
595 constexpr uint32_t capacity() const {
596 uint32_t rx_head = internal_rx_pipe_.rx_head;
597 uint32_t rx_tail = internal_rx_pipe_.rx_tail;
598 return ((rx_head - rx_tail) % kEnsoPipeSize) * 64;
599 }
600
// Receives a batch of generic messages. The data is obtained with `Peek()`
// (with no byte limit: `~0`), not `Recv()`; whether bytes get confirmed is
// left to the iterator type `T` through its `OnAdvanceMessage()` hook.
// `max_nb_messages` defaults to -1.
613 template <typename T>
614 constexpr MessageBatch<T> RecvMessages(int32_t max_nb_messages = -1) {
615 uint8_t* buf = nullptr;
616 uint32_t recv = Peek(&buf, ~0);
617 return MessageBatch<T>((uint8_t*)buf, recv, max_nb_messages, this);
618 }
619
// Receives a batch of packets (iterated with `PktIterator`).
629 inline MessageBatch<PktIterator> RecvPkts(int32_t max_nb_pkts = -1) {
630 return RecvMessages<PktIterator>(max_nb_pkts);
631 }
632
// Receives a batch of packets without removing them from the queue (iterated
// with `PeekPktIterator`).
642 inline MessageBatch<PeekPktIterator> PeekPkts(int32_t max_nb_pkts = -1) {
643 return RecvMessages<PeekPktIterator>(max_nb_pkts);
644 }
645
// Prefetches the next batch of bytes to be received on the RxPipe.
655 void Prefetch();
656
// Frees a given number of bytes previously received on the RxPipe.
664 void Free(uint32_t nb_bytes);
665
// Frees all bytes previously received on the RxPipe.
671 void Clear();
672
// Returns the pipe's internal buffer.
678 inline uint8_t* buf() const { return (uint8_t*)internal_rx_pipe_.buf; }
679
// Returns the pipe's ID.
685 inline enso_pipe_id_t id() const { return id_; }
686
// Returns the context associated with the pipe.
// NOTE(review): `context_` has no in-class initializer and the constructor
// does not set it; unless `Init()` does, the value is indeterminate until
// `set_context()` is called -- confirm.
699 inline void* context() const { return context_; }
700
// Sets the context associated with the pipe.
706 inline void set_context(void* new_context) { context_ = new_context; }
707
// Granularity, in bytes, used by the pipe arithmetic above.
712 static constexpr uint32_t kQuantumSize = 64;
713
// Pipe size in bytes (`kEnsoPipeSize * 64`) minus one quantum.
718 static constexpr uint32_t kMaxCapacity = kEnsoPipeSize * 64 - kQuantumSize;
719
720 private:
// Private constructor: keeps a pointer to the device's notification buffer
// pair. Nothing else is initialized here (see `Init()`).
727 explicit RxPipe(Device* device) noexcept
728 : notification_buf_pair_(&(device->notification_buf_pair_)) {}
729
734 ~RxPipe();
735
// Initialization step declared separately from the constructor; returns an
// int status. Defined outside this header.
743 int Init(bool fallback) noexcept;
744
745 void SetAsNextPipe() noexcept { next_pipe_ = true; }
746
747 friend class Device;
748
749 bool next_pipe_ = false;
752 enso_pipe_id_t id_;
753 void* context_;
754 struct RxEnsoPipeInternal internal_rx_pipe_;
755 struct NotificationBufPair* notification_buf_pair_;
756};
757
// A class that represents a TX Enso Pipe.
//
// The constructor and destructor are private and `Device` is a friend, so
// instances are only created and destroyed from `Device` code. Copy and move
// operations are deleted.
782class TxPipe {
783 public:
784 TxPipe(const TxPipe&) = delete;
785 TxPipe& operator=(const TxPipe&) = delete;
786 TxPipe(TxPipe&&) = delete;
787 TxPipe& operator=(TxPipe&&) = delete;
788
// Allocates a buffer in the pipe: waits (via `ExtendBufToTarget()`) until at
// least `target_capacity` bytes are available, then returns the address of
// the first byte available to the application (`buf_ + app_begin_`).
// With the default `target_capacity` of 0 there is nothing to wait for.
819 uint8_t* AllocateBuf(uint32_t target_capacity = 0) {
820 ExtendBufToTarget(target_capacity);
821 return buf_ + app_begin_;
822 }
823
// Sends and deallocates a given number of bytes, starting at the current
// `app_begin_`. `nb_bytes` must not exceed `kMaxCapacity` and must be a
// multiple of `kQuantumSize`; both are checked with `assert` only.
841 inline void SendAndFree(uint32_t nb_bytes) {
// Capture the physical address before `app_begin_` is advanced below.
842 uint64_t phys_addr = buf_phys_addr_ + app_begin_;
843 assert(nb_bytes <= kMaxCapacity);
844 assert(nb_bytes / kQuantumSize * kQuantumSize == nb_bytes);
845
846 app_begin_ = (app_begin_ + nb_bytes) & kBufMask;
847
848 device_->Send(kId, phys_addr, nb_bytes);
849 }
850
// Explicitly requests a best-effort buffer extension: processes the device's
// completions once and returns the resulting capacity.
866 inline uint32_t TryExtendBuf() {
867 device_->ProcessCompletions();
868 return capacity();
869 }
870
// Explicitly requests a buffer extension with a target capacity. Keeps
// calling `TryExtendBuf()` in a loop until the capacity reaches
// `target_capacity`, so it does not return before that happens. Returns the
// capacity observed last. `target_capacity <= kMaxCapacity` is asserted.
887 inline uint32_t ExtendBufToTarget(uint32_t target_capacity) {
888 uint32_t _capacity = capacity();
889 assert(target_capacity <= kMaxCapacity);
890 while (_capacity < target_capacity) {
891 _capacity = TryExtendBuf();
892 }
893 return _capacity;
894 }
895
// Returns the allocated buffer's current available capacity, computed as the
// circular distance from `app_begin_` to `app_end_`, minus one.
904 inline uint32_t capacity() const {
905 return (app_end_ - app_begin_ - 1) & kBufMask;
906 }
907
// Returns the number of bytes that are currently being transmitted.
// NOTE(review): with the in-class initial values (`app_begin_ == app_end_ ==
// 0`) this expression evaluates to `kMaxCapacity`, not 0 -- confirm whether
// `Init()` adjusts `app_end_` or whether this formula needs revisiting.
913 inline uint32_t pending_transmission() const {
914 return kMaxCapacity - ((app_end_ - app_begin_) & kBufMask);
915 }
916
// Returns the pipe's internal buffer.
922 inline uint8_t* buf() const { return buf_; }
923
// Returns the pipe's ID.
929 inline enso_pipe_id_t id() const { return kId; }
930
// Returns the context associated with the pipe.
// NOTE(review): `context_` has no in-class initializer and the constructor
// does not set it; unless `Init()` does, the value is indeterminate until
// `set_context()` is called -- confirm.
941 inline void* context() const { return context_; }
942
// Sets the context associated with the pipe.
948 inline void set_context(void* new_context) { context_ = new_context; }
949
// Granularity, in bytes, required of `SendAndFree()` sizes.
954 static constexpr uint32_t kQuantumSize = 64;
955
// Pipe size in bytes (`kEnsoPipeSize * 64`) minus one quantum.
960 static constexpr uint32_t kMaxCapacity = kEnsoPipeSize * 64 - kQuantumSize;
961
962 private:
// Private constructor: records the pipe ID, the owning device and the
// optional user buffer. `internal_buf_` is set when no buffer was supplied.
972 explicit TxPipe(uint32_t id, Device* device, uint8_t* buf = nullptr) noexcept
973 : kId(id), device_(device), buf_(buf), internal_buf_(buf == nullptr) {}
974
979 ~TxPipe();
980
// Initialization step declared separately from the constructor; returns an
// int status. Defined outside this header.
986 int Init() noexcept;
987
// Advances `app_end_` by `nb_bytes` (wrapping with `kBufMask`), which grows
// the region reported by `capacity()`.
995 inline void NotifyCompletion(uint32_t nb_bytes) {
996 app_end_ = (app_end_ + nb_bytes) & kBufMask;
997 }
998
// Builds a per-pipe path: device huge page prefix + `kHugePagePathPrefix` +
// the decimal pipe ID.
999 inline std::string GetHugePageFilePath() const {
1000 return device_->huge_page_prefix_ + std::string(kHugePagePathPrefix) +
1001 std::to_string(kId);
1002 }
1003
1004 friend class Device;
1005
1006 const enso_pipe_id_t kId;
1007 void* context_;
1008 Device* device_;
1009 uint8_t* buf_;
1010 bool internal_buf_; // If true, the buffer is allocated internally.
1011 uint32_t app_begin_ = 0; // The next byte to be sent.
1012 uint32_t app_end_ = 0; // The next byte to be allocated.
1013 uint64_t buf_phys_addr_;
1014
// `kBufMask` is the buffer size in bytes minus one; the static_assert
// guarantees that size is a power of two so `& kBufMask` wraps offsets.
1015 static constexpr uint32_t kBufMask = (kMaxCapacity + kQuantumSize) - 1;
1016 static_assert((kBufMask & (kBufMask + 1)) == 0,
1017 "(kBufMask + 1) must be a power of 2");
1018
1019 // Buffer layout:
1020 //                                  | app_begin_          | app_end_
1021 //                                  v                     v
1022 // +---+------------------------+---+---------------------+--------+
1023 // |   |  Waiting transmission  |   |  Available to user  |        |
1024 // +---+------------------------+---+---------------------+--------+
1025 //     ^                        ^
1026 //     | hw_begin_              | hw_end_
1027 //
1028 // The area available to the user is between `app_begin_` and `app_end_`.
1029 // The area between `hw_begin_` and `hw_end_` is waiting to be transmitted.
1030 //
1031 // SendAndFree() will advance `app_begin_` by `nb_bytes`, reducing the
1032 // available space to the user. The size of the available region can be
1033 // obtained by calling `capacity()`.
1034 //
1035 // To reclaim space, the user must call `TryExtendBuf()` (or
1036 // `ExtendBufToTarget()`), which will check for completions and potentially
1037 // advance the `hw_begin_`. We can then advance `app_end_` to match
1038 // `hw_begin_`, increasing the available space to the user.
1039 //
1040 // All the buffer pointers (i.e., app_begin_, app_end_, hw_begin_, hw_end_)
1041 // are byte offsets into the buffer, expected to be multiples of 64 bytes
1042 // (`kQuantumSize`). The buffer itself is a circular buffer, so `app_end_`
// can be smaller than `app_begin_`. `hw_begin_` and `hw_end_` are conceptual
// only: this class declares no such members.
1043 //
1044 // Currently app_begin_ == hw_end_ and app_end_ == hw_begin_. But this may
1045 // change in the future. One reason it might be useful to change this is that
1046 // currently `Device::Send()` blocks when the notification buffer is full. If
1047 // we make sending non-blocking, it may be useful to let the hw_*_ and app_*_
1048 // pointers diverge.
1049};
1050
// A class that represents an RX/TX Enso Pipe.
//
// It wraps one `RxPipe` and one `TxPipe` (held as raw pointers) and forwards
// most calls to them. The constructor and destructor are private and `Device`
// is a friend.
// NOTE(review): the `class RxTxPipe {` line (pipe.h:1076) is missing from this
// listing; the block below starts at its `public:` section.
1077 public:
1078 RxTxPipe(const RxTxPipe&) = delete;
1079 RxTxPipe& operator=(const RxTxPipe&) = delete;
1080 RxTxPipe(RxTxPipe&&) = delete;
1081 RxTxPipe& operator=(RxTxPipe&&) = delete;
1082
// Binds the pipe to a given flow entry. Can be called multiple times to bind
// to multiple flows. Forwards to the RX pipe.
1086 inline int Bind(uint16_t dst_port, uint16_t src_port, uint32_t dst_ip,
1087 uint32_t src_ip, uint32_t protocol) {
1088 return rx_pipe_->Bind(dst_port, src_port, dst_ip, src_ip, protocol);
1089 }
1090
// Receives a batch of bytes. Device completions are processed first, then
// the call is forwarded to the RX pipe.
1094 inline uint32_t Recv(uint8_t** buf, uint32_t max_nb_bytes) {
1095 device_->ProcessCompletions();
1096 return rx_pipe_->Recv(buf, max_nb_bytes);
1097 }
1098
// Receives a batch of bytes without removing them from the queue. Device
// completions are processed first.
1102 inline uint32_t Peek(uint8_t** buf, uint32_t max_nb_bytes) {
1103 device_->ProcessCompletions();
1104 return rx_pipe_->Peek(buf, max_nb_bytes);
1105 }
1106
// Confirms a certain number of bytes have been received. Forwards to the RX
// pipe.
1110 inline void ConfirmBytes(uint32_t nb_bytes) {
1111 rx_pipe_->ConfirmBytes(nb_bytes);
1112 }
1113
// Receives a batch of generic messages. Device completions are processed
// first.
1117 template <typename T>
1118 inline RxPipe::MessageBatch<T> RecvMessages(int32_t max_nb_messages = -1) {
1119 device_->ProcessCompletions();
1120 return rx_pipe_->RecvMessages<T>(max_nb_messages);
1121 }
1122
// Receives a batch of packets. Device completions are processed first.
1126 inline RxPipe::MessageBatch<PktIterator> RecvPkts(int32_t max_nb_pkts = -1) {
1127 device_->ProcessCompletions();
1128 return rx_pipe_->RecvPkts(max_nb_pkts);
1129 }
1130
// Receives a batch of packets without removing them from the queue.
// NOTE(review): the first line of this definition (pipe.h:1134) is missing
// from the listing; per the member index on this page the signature is
// `RxPipe::MessageBatch<PeekPktIterator> PeekPkts(int32_t max_nb_pkts = -1)`.
1135 int32_t max_nb_pkts = -1) {
1136 device_->ProcessCompletions();
1137 return rx_pipe_->PeekPkts(max_nb_pkts);
1138 }
1139
// Prefetches the next batch of bytes to be received on the RxTxPipe.
1149 inline void Prefetch() { rx_pipe_->Prefetch(); }
1150
// Sends and deallocates a given number of bytes through the TX pipe, and
// lowers the cached TX capacity by the same amount so that
// `ProcessCompletions()` can later detect how much was reclaimed.
1160 inline void SendAndFree(uint32_t nb_bytes) {
1161 tx_pipe_->SendAndFree(nb_bytes);
1162 last_tx_pipe_capacity_ -= nb_bytes;
1163 }
1164
// Process completions for this pipe, potentially freeing up space to receive
// more data. This compares the TX pipe's current capacity with the cached
// value; it does not itself call `Device::ProcessCompletions()`.
1169 inline void ProcessCompletions() {
1170 uint32_t new_capacity = tx_pipe_->capacity();
1171
1172 // If the capacity has increased, we need to free up space in the RX pipe.
1173 if (new_capacity > last_tx_pipe_capacity_) {
1174 rx_pipe_->Free(new_capacity - last_tx_pipe_capacity_);
1175 last_tx_pipe_capacity_ = new_capacity;
1176 }
1177 }
1178
// Returns the pipe's internal buffer (the RX pipe's buffer).
1184 inline uint8_t* buf() const { return rx_pipe_->buf(); }
1185
// Return the pipe's RX ID.
1191 inline enso_pipe_id_t rx_id() const { return rx_pipe_->id(); }
1192
// Return the pipe's TX ID.
1198 inline enso_pipe_id_t tx_id() const { return tx_pipe_->id(); }
1199
// Returns the context associated with the pipe; it is stored in the RX pipe.
1212 inline void* context() const { return rx_pipe_->context(); }
1213
// Sets the context associated with the pipe; it is stored in the RX pipe.
1219 inline void set_context(void* new_context) {
1220 rx_pipe_->set_context(new_context);
1221 }
1222
1226 static constexpr uint32_t kQuantumSize = RxPipe::kQuantumSize;
1227
1231 static constexpr uint32_t kMaxCapacity = RxPipe::kMaxCapacity;
1232
// NOTE(review): the two lines below are the tails of two `static_assert`
// statements whose first lines (pipe.h:1233 and 1235) are missing from this
// listing; judging by the messages they compare these constants with the
// TxPipe equivalents -- confirm against the original header.
1234 "Quantum size mismatch");
1236 "Max capacity mismatch");
1237
1238 private:
// Half the pipe size in bytes. Not referenced anywhere in this listing.
1243 static constexpr uint32_t kCompletionsThreshold = kEnsoPipeSize * 64 / 2;
1244
// Private constructor: only records the device. `rx_pipe_`, `tx_pipe_` and
// `last_tx_pipe_capacity_` are left unset here.
// NOTE(review): presumably `Init()` assigns them -- confirm in pipe.cpp.
1252 explicit RxTxPipe(Device* device) noexcept : device_(device) {}
1253
// Defaulted destructor: the wrapped pipes are not deleted here.
1258 ~RxTxPipe() = default;
1259
// Initialization step declared separately from the constructor; returns an
// int status. Defined outside this header.
1267 int Init(bool fallback) noexcept;
1268
1269 friend class Device;
1270
1271 Device* device_;
1272 RxPipe* rx_pipe_;
1273 TxPipe* tx_pipe_;
// TX capacity as last seen by `SendAndFree()`/`ProcessCompletions()`.
1274 uint32_t last_tx_pipe_capacity_;
1275};
1276
// Base class to represent a message within a batch.
//
// CRTP base: `T` is the derived iterator type. The base calls
// `T::GetNextMessage(addr)` to find where the next message starts and
// `T::OnAdvanceMessage(nb_bytes)` every time the iterator is incremented.
// NOTE(review): the `class MessageIteratorBase {` line (pipe.h:1284) is
// missing from this listing.
1283template <typename T>
1285 public:
// Dereferencing yields the address of the current message.
1286 constexpr uint8_t* operator*() { return addr_; }
1287
1288 /*
1289 * This is a bit ugly but, as far as I know, there is no way to check for the
1290 * end of a range-based for loop without overloading the != operator.
1291 *
1292 * The issue is that it does not work as expected for actual inequality check.
1293 */
// "Not at end" test: true while the message budget is not exhausted
// (`missing_messages_ != 0`) and the end of the current message
// (`next_addr_`) does not go past `other.addr_`. A negative budget, such as
// the -1 default used by `RxPipe::RecvMessages()`, never reaches 0 by
// decrementing in practice, so only the address check ends the loop then.
1294 constexpr bool operator!=(const MessageIteratorBase& other) const {
1295 return (missing_messages_ != 0) && (next_addr_ <= other.addr_);
1296 }
1297
// Advances to the next message. The size of the message being left is
// `next_addr_ - addr_`; it is reported first to the derived class
// (`OnAdvanceMessage`) and, after the pointers are updated, to the batch
// (`NotifyProcessedBytes`).
1298 constexpr T& operator++() {
1299 T* child = static_cast<T*>(this);
1300
1301 uint32_t nb_bytes = next_addr_ - addr_;
1302
1303 child->OnAdvanceMessage(nb_bytes);
1304
1305 addr_ = next_addr_;
1306 next_addr_ = child->GetNextMessage(addr_);
1307
1308 --missing_messages_;
1309 batch_->NotifyProcessedBytes(nb_bytes);
1310
1311 return *child;
1312 }
1313
1314 protected:
// Default state: null addresses, no batch and a message budget of 0, so
// `operator!=` is always false for such an iterator.
1315 inline MessageIteratorBase()
1316 : addr_(nullptr),
1317 missing_messages_(0),
1318 batch_(nullptr),
1319 next_addr_(nullptr) {}
1320
// NOTE(review): pipe.h:1323 is missing from this listing (presumably the
// defaulted move constructor, by symmetry with the lines around it).
1321 MessageIteratorBase(const MessageIteratorBase&) = default;
1322 MessageIteratorBase& operator=(const MessageIteratorBase&) = default;
1324 MessageIteratorBase& operator=(MessageIteratorBase&&) = default;
1325
// Main constructor. NOTE(review): its second signature line (pipe.h:1332) is
// missing from this listing; per the member index on this page the third
// parameter is `RxPipe::MessageBatch<T>* batch`.
// `next_addr_` is computed by calling the derived class's `GetNextMessage()`
// from the base constructor, i.e., before the derived part is constructed.
1331 inline MessageIteratorBase(uint8_t* addr, int32_t message_limit,
1333 : addr_(addr),
1334 missing_messages_(message_limit),
1335 batch_(batch),
1336 next_addr_(static_cast<T*>(this)->GetNextMessage(addr)) {}
1337
// NOTE(review): the declaration of `batch_` (pipe.h:1340) is missing from
// this listing; it is initialized between `missing_messages_` and
// `next_addr_` in the constructors above.
1338 uint8_t* addr_;
1339 int32_t missing_messages_;
1341 uint8_t* next_addr_;
1342};
1343
// Packet iterator.
//
// Derives from `MessageIteratorBase<PktIterator>`. Unlike `PeekPktIterator`,
// advancing this iterator confirms the bytes of each packet on the RX pipe.
1350class PktIterator : public MessageIteratorBase<PktIterator> {
1351 public:
1352 inline PktIterator() : MessageIteratorBase() {}
1353
// NOTE(review): the second signature line (pipe.h:1358) is missing from this
// listing; per the member index on this page the third parameter is
// `RxPipe::MessageBatch<PktIterator>* batch`.
1357 inline PktIterator(uint8_t* addr, int32_t message_limit,
1359 : MessageIteratorBase(addr, message_limit, batch) {}
1360
1361 PktIterator(const PktIterator&) = default;
1362 PktIterator& operator=(const PktIterator&) = default;
1363 PktIterator(PktIterator&&) = default;
1364 PktIterator& operator=(PktIterator&&) = default;
1365
// Computes the next message address based on the current message, by
// delegating to `get_next_pkt()`.
1373 _enso_always_inline uint8_t* GetNextMessage(uint8_t* current_message) {
1374 return get_next_pkt(current_message);
1375 }
1376
// Called when the iterator is done processing a message: confirms
// `nb_bytes` on the RX pipe that produced the batch.
1382 constexpr void OnAdvanceMessage(uint32_t nb_bytes) {
1383 batch_->pipe_->ConfirmBytes(nb_bytes);
1384 }
1385};
1386
// Packet iterator that does not consume the packets from the pipe.
//
// Derives from `MessageIteratorBase<PeekPktIterator>`; its advance hook is
// empty, so no bytes are confirmed on the RX pipe while iterating.
1393class PeekPktIterator : public MessageIteratorBase<PeekPktIterator> {
1394 public:
// NOTE(review): the second signature line (pipe.h:1399) is missing from this
// listing; per the member index on this page the third parameter is
// `RxPipe::MessageBatch<PeekPktIterator>* batch`.
1398 inline PeekPktIterator(uint8_t* addr, int32_t message_limit,
1400 : MessageIteratorBase(addr, message_limit, batch) {}
1401
// Computes the next message address based on the current message, by
// delegating to `get_next_pkt()`.
1405 _enso_always_inline uint8_t* GetNextMessage(uint8_t* current_message) {
1406 return get_next_pkt(current_message);
1407 }
1408
// Called when the iterator is done processing a message. Intentionally does
// nothing.
1412 constexpr void OnAdvanceMessage([[maybe_unused]] uint32_t nb_bytes) {}
1413};
1414
1415} // namespace enso
1416
1417#endif // SOFTWARE_INCLUDE_ENSO_PIPE_H_
A class that represents a device.
Definition: pipe.h:82
static std::unique_ptr< Device > Create(const std::string &pcie_addr="", const std::string &huge_page_prefix="") noexcept
Factory method to create a device.
Definition: pipe.cpp:149
int GetNbFallbackQueues() noexcept
Retrieves the number of fallback queues for this device.
Definition: pipe.cpp:201
int GetRoundRobinStatus() noexcept
Gets the round robin status for the device.
Definition: pipe.cpp:420
int DisableRateLimiting()
Disables hardware rate limiting.
Definition: pipe.cpp:412
int ApplyConfig(struct TxNotification *config_notification)
Sends the given config notification to the device.
Definition: pipe.cpp:356
RxTxPipe * NextRxTxPipeToRecv()
Gets the next RxTxPipe that has data pending.
Definition: pipe.cpp:281
int EnableRateLimiting(uint16_t num, uint16_t den)
Enables hardware rate limiting.
Definition: pipe.cpp:408
void ProcessCompletions()
Processes completions for all pipes associated with this device.
Definition: pipe.cpp:383
int DisableRoundRobin()
Disables round robin of packets among the fallback pipes. Will use a hash of the five-tuple to direc...
Definition: pipe.cpp:424
int EnableRoundRobin()
Enables round robin of packets among the fallback pipes.
Definition: pipe.cpp:416
int EnableTimeStamping(uint8_t offset=kDefaultRttOffset)
Enables hardware time stamping.
Definition: pipe.cpp:400
RxPipe * NextRxPipeToRecv()
Gets the next RxPipe that has data pending.
Definition: pipe.cpp:241
RxPipe * AllocateRxPipe(bool fallback=false) noexcept
Allocates an RX pipe.
Definition: pipe.cpp:183
RxTxPipe * AllocateRxTxPipe(bool fallback=false) noexcept
Allocates an RX/TX pipe.
Definition: pipe.cpp:222
int DisableTimeStamping()
Disables hardware time stamping.
Definition: pipe.cpp:404
TxPipe * AllocateTxPipe(uint8_t *buf=nullptr) noexcept
Allocates a TX pipe.
Definition: pipe.cpp:205
Base class to represent a message within a batch.
Definition: pipe.h:1284
MessageIteratorBase(uint8_t *addr, int32_t message_limit, RxPipe::MessageBatch< T > *batch)
Definition: pipe.h:1331
Packet iterator that does not consume the packets from the pipe.
Definition: pipe.h:1393
PeekPktIterator(uint8_t *addr, int32_t message_limit, RxPipe::MessageBatch< PeekPktIterator > *batch)
Definition: pipe.h:1398
constexpr void OnAdvanceMessage(uint32_t nb_bytes)
Called when the iterator is done processing a message.
Definition: pipe.h:1412
_enso_always_inline uint8_t * GetNextMessage(uint8_t *current_message)
Computes the next message address based on the current message.
Definition: pipe.h:1405
Packet iterator.
Definition: pipe.h:1350
constexpr void OnAdvanceMessage(uint32_t nb_bytes)
Called when the iterator is done processing a message.
Definition: pipe.h:1382
PktIterator(uint8_t *addr, int32_t message_limit, RxPipe::MessageBatch< PktIterator > *batch)
Definition: pipe.h:1357
_enso_always_inline uint8_t * GetNextMessage(uint8_t *current_message)
Computes the next message address based on the current message.
Definition: pipe.h:1373
A class that represents a batch of messages.
Definition: pipe.h:403
constexpr MessageBatch()
Instantiates an empty message batch.
Definition: pipe.h:408
uint32_t processed_bytes() const
Number of bytes processed by the iterator.
Definition: pipe.h:429
uint8_t * buf() const
Returns a pointer to the start of the batch.
Definition: pipe.h:465
uint32_t available_bytes() const
Returns number of bytes available in the batch.
Definition: pipe.h:451
void NotifyProcessedBytes(uint32_t nb_bytes)
Notifies the batch that a given number of bytes have been processed.
Definition: pipe.h:437
int32_t message_limit() const
Returns maximum number of messages in the batch.
Definition: pipe.h:458
A class that represents an RX Enso Pipe.
Definition: pipe.h:394
constexpr MessageBatch< T > RecvMessages(int32_t max_nb_messages=-1)
Receives a batch of generic messages.
Definition: pipe.h:614
uint32_t Recv(uint8_t **buf, uint32_t max_nb_bytes)
Receives a batch of bytes.
Definition: pipe.cpp:70
void * context() const
Returns the context associated with the pipe.
Definition: pipe.h:699
void Clear()
Frees all bytes previously received on the RxPipe.
Definition: pipe.cpp:91
void set_context(void *new_context)
Sets the context associated with the pipe.
Definition: pipe.h:706
MessageBatch< PeekPktIterator > PeekPkts(int32_t max_nb_pkts=-1)
Receives a batch of packets without removing them from the queue.
Definition: pipe.h:642
static constexpr uint32_t kQuantumSize
Definition: pipe.h:712
constexpr void ConfirmBytes(uint32_t nb_bytes)
Confirms a certain number of bytes have been received.
Definition: pipe.h:583
uint8_t * buf() const
Returns the pipe's internal buffer.
Definition: pipe.h:678
constexpr uint32_t capacity() const
Returns the number of bytes allocated in the pipe, i.e., the number of bytes owned by the application...
Definition: pipe.h:595
MessageBatch< PktIterator > RecvPkts(int32_t max_nb_pkts=-1)
Receives a batch of packets.
Definition: pipe.h:629
int Bind(uint16_t dst_port, uint16_t src_port, uint32_t dst_ip, uint32_t src_ip, uint32_t protocol)
Binds the pipe to a given flow entry. Can be called multiple times to bind to multiple flows.
Definition: pipe.cpp:64
uint32_t Peek(uint8_t **buf, uint32_t max_nb_bytes)
Receives a batch of bytes without removing them from the queue.
Definition: pipe.cpp:76
void Free(uint32_t nb_bytes)
Frees a given number of bytes previously received on the RxPipe.
Definition: pipe.cpp:85
void Prefetch()
Prefetches the next batch of bytes to be received on the RxPipe.
Definition: pipe.cpp:89
static constexpr uint32_t kMaxCapacity
Definition: pipe.h:718
enso_pipe_id_t id() const
Returns the pipe's ID.
Definition: pipe.h:685
A class that represents an RX/TX Enso Pipe.
Definition: pipe.h:1076
RxPipe::MessageBatch< T > RecvMessages(int32_t max_nb_messages=-1)
Receives a batch of generic messages.
Definition: pipe.h:1118
uint32_t Peek(uint8_t **buf, uint32_t max_nb_bytes)
Receives a batch of bytes without removing them from the queue.
Definition: pipe.h:1102
int Bind(uint16_t dst_port, uint16_t src_port, uint32_t dst_ip, uint32_t src_ip, uint32_t protocol)
Binds the pipe to a given flow entry. Can be called multiple times to bind to multiple flows.
Definition: pipe.h:1086
RxPipe::MessageBatch< PeekPktIterator > PeekPkts(int32_t max_nb_pkts=-1)
Receives a batch of packets without removing them from the queue.
Definition: pipe.h:1134
void * context() const
Returns the context associated with the pipe.
Definition: pipe.h:1212
uint8_t * buf() const
Returns the pipe's internal buffer.
Definition: pipe.h:1184
void ConfirmBytes(uint32_t nb_bytes)
Confirms a certain number of bytes have been received.
Definition: pipe.h:1110
void SendAndFree(uint32_t nb_bytes)
Sends and deallocates a given number of bytes.
Definition: pipe.h:1160
RxPipe::MessageBatch< PktIterator > RecvPkts(int32_t max_nb_pkts=-1)
Receives a batch of packets.
Definition: pipe.h:1126
static constexpr uint32_t kQuantumSize
Definition: pipe.h:1226
static constexpr uint32_t kMaxCapacity
Definition: pipe.h:1231
void set_context(void *new_context)
Sets the context associated with the pipe.
Definition: pipe.h:1219
void ProcessCompletions()
Process completions for this pipe, potentially freeing up space to receive more data.
Definition: pipe.h:1169
uint32_t Recv(uint8_t **buf, uint32_t max_nb_bytes)
Receives a batch of bytes.
Definition: pipe.h:1094
void Prefetch()
Prefetches the next batch of bytes to be received on the RxTxPipe.
Definition: pipe.h:1149
enso_pipe_id_t tx_id() const
Return the pipe's TX ID.
Definition: pipe.h:1198
enso_pipe_id_t rx_id() const
Return the pipe's RX ID.
Definition: pipe.h:1191
A class that represents a TX Enso Pipe.
Definition: pipe.h:782
uint32_t ExtendBufToTarget(uint32_t target_capacity)
Explicitly requests a buffer extension with a target capacity.
Definition: pipe.h:887
void * context() const
Returns the context associated with the pipe.
Definition: pipe.h:941
static constexpr uint32_t kMaxCapacity
Definition: pipe.h:960
uint8_t * AllocateBuf(uint32_t target_capacity=0)
Allocates a buffer in the pipe.
Definition: pipe.h:819
void SendAndFree(uint32_t nb_bytes)
Sends and deallocates a given number of bytes.
Definition: pipe.h:841
uint32_t capacity() const
Returns the allocated buffer's current available capacity.
Definition: pipe.h:904
uint32_t TryExtendBuf()
Explicitly requests a best-effort buffer extension.
Definition: pipe.h:866
void set_context(void *new_context)
Sets the context associated with the pipe.
Definition: pipe.h:948
uint32_t pending_transmission() const
Returns the number of bytes that are currently being transmitted.
Definition: pipe.h:913
static constexpr uint32_t kQuantumSize
Definition: pipe.h:954
uint8_t * buf() const
Returns the pipe's internal buffer.
Definition: pipe.h:922
enso_pipe_id_t id() const
Returns the pipe's ID.
Definition: pipe.h:929
Constants used throughout the codebase. Some of these constants need to be kept in sync with the hard...
constexpr uint8_t kDefaultRttOffset
Default timestamp offset of the RTT when timestamp is enabled (in bytes). Uses bytes 4–7 of IPv4 head...
Definition: consts.h:118
Miscellaneous helper functions.
Definitions that are internal to Enso. They should not be exposed to applications.