42#ifndef SOFTWARE_INCLUDE_ENSO_PIPE_H_
43#define SOFTWARE_INCLUDE_ENSO_PIPE_H_
66uint32_t external_peek_next_batch_from_queue(
67 struct RxEnsoPipeInternal* enso_pipe,
68 struct NotificationBufPair* notification_buf_pair,
void** buf);
94 static std::unique_ptr<Device>
Create(
95 const std::string& pcie_addr =
"",
96 const std::string& huge_page_prefix =
"")
noexcept;
307 int ApplyConfig(
struct TxNotification* config_notification);
310 struct TxPendingRequest {
318 Device(
const std::string& pcie_addr, std::string huge_page_prefix) noexcept
319 : kPcieAddr(pcie_addr) {
321 std::cerr <<
"Warning: assertions are enabled. Performance may be affected."
324 if (huge_page_prefix ==
"") {
325 huge_page_prefix = std::string(kHugePageDefaultPrefix);
327 huge_page_prefix_ = huge_page_prefix;
346 void Send(uint32_t tx_enso_pipe_id, uint64_t phys_addr, uint32_t nb_bytes);
352 const std::string kPcieAddr;
357 std::string huge_page_prefix_;
359 std::vector<RxPipe*> rx_pipes_;
360 std::vector<TxPipe*> tx_pipes_;
361 std::vector<RxTxPipe*> rx_tx_pipes_;
363 std::array<RxPipe*, kMaxNbFlows> rx_pipes_map_ = {};
364 std::array<RxTxPipe*, kMaxNbFlows> rx_tx_pipes_map_ = {};
366 int32_t next_pipe_id_ = -1;
368 uint32_t tx_pr_head_ = 0;
369 uint32_t tx_pr_tail_ = 0;
370 std::array<TxPendingRequest, kMaxPendingTxRequests + 1> tx_pending_requests_;
371 static constexpr uint32_t kPendingTxRequestsBufMask = kMaxPendingTxRequests;
372 static_assert((kMaxPendingTxRequests & (kMaxPendingTxRequests + 1)) == 0,
373 "kMaxPendingTxRequests + 1 must be a power of 2");
402 template <
typename T>
416 constexpr T begin() {
return T(buf_, message_limit_,
this); }
417 constexpr T begin()
const {
return T(buf_, message_limit_,
this); }
420 return T(buf_ + available_bytes_, message_limit_,
this);
422 constexpr T end()
const {
423 return T(buf_ + available_bytes_, message_limit_,
this);
438 processed_bytes_ += nb_bytes;
465 uint8_t*
buf()
const {
return buf_; }
486 uint32_t available_bytes_;
487 int32_t message_limit_;
489 uint32_t processed_bytes_ = 0;
493 RxPipe(
const RxPipe&) =
delete;
494 RxPipe& operator=(
const RxPipe&) =
delete;
495 RxPipe(RxPipe&&) =
delete;
496 RxPipe& operator=(RxPipe&&) =
delete;
548 int Bind(uint16_t dst_port, uint16_t src_port, uint32_t dst_ip,
549 uint32_t src_ip, uint32_t protocol);
559 uint32_t
Recv(uint8_t**
buf, uint32_t max_nb_bytes);
569 uint32_t
Peek(uint8_t**
buf, uint32_t max_nb_bytes);
584 uint32_t rx_tail = internal_rx_pipe_.rx_tail;
585 rx_tail = (rx_tail + nb_bytes / 64) % kEnsoPipeSize;
586 internal_rx_pipe_.rx_tail = rx_tail;
596 uint32_t rx_head = internal_rx_pipe_.rx_head;
597 uint32_t rx_tail = internal_rx_pipe_.rx_tail;
598 return ((rx_head - rx_tail) % kEnsoPipeSize) * 64;
613 template <
typename T>
615 uint8_t*
buf =
nullptr;
616 uint32_t recv =
Peek(&
buf, ~0);
630 return RecvMessages<PktIterator>(max_nb_pkts);
643 return RecvMessages<PeekPktIterator>(max_nb_pkts);
664 void Free(uint32_t nb_bytes);
678 inline uint8_t*
buf()
const {
return (uint8_t*)internal_rx_pipe_.buf; }
685 inline enso_pipe_id_t
id()
const {
return id_; }
699 inline void*
context()
const {
return context_; }
706 inline void set_context(
void* new_context) { context_ = new_context; }
728 : notification_buf_pair_(&(device->notification_buf_pair_)) {}
743 int Init(
bool fallback)
noexcept;
745 void SetAsNextPipe() noexcept { next_pipe_ =
true; }
749 bool next_pipe_ =
false;
754 struct RxEnsoPipeInternal internal_rx_pipe_;
755 struct NotificationBufPair* notification_buf_pair_;
821 return buf_ + app_begin_;
842 uint64_t phys_addr = buf_phys_addr_ + app_begin_;
846 app_begin_ = (app_begin_ + nb_bytes) & kBufMask;
848 device_->Send(kId, phys_addr, nb_bytes);
890 while (_capacity < target_capacity) {
905 return (app_end_ - app_begin_ - 1) & kBufMask;
914 return kMaxCapacity - ((app_end_ - app_begin_) & kBufMask);
922 inline uint8_t*
buf()
const {
return buf_; }
929 inline enso_pipe_id_t
id()
const {
return kId; }
941 inline void*
context()
const {
return context_; }
948 inline void set_context(
void* new_context) { context_ = new_context; }
972 explicit TxPipe(uint32_t
id,
Device* device, uint8_t*
buf =
nullptr) noexcept
973 : kId(
id), device_(device), buf_(
buf), internal_buf_(
buf ==
nullptr) {}
995 inline
void NotifyCompletion(uint32_t nb_bytes) {
996 app_end_ = (app_end_ + nb_bytes) & kBufMask;
999 inline std::string GetHugePageFilePath()
const {
1000 return device_->huge_page_prefix_ + std::string(kHugePagePathPrefix) +
1001 std::to_string(kId);
1004 friend class Device;
1006 const enso_pipe_id_t kId;
1011 uint32_t app_begin_ = 0;
1012 uint32_t app_end_ = 0;
1013 uint64_t buf_phys_addr_;
1016 static_assert((kBufMask & (kBufMask + 1)) == 0,
1017 "(kBufMask + 1) must be a power of 2");
1086 inline int Bind(uint16_t dst_port, uint16_t src_port, uint32_t dst_ip,
1087 uint32_t src_ip, uint32_t protocol) {
1088 return rx_pipe_->
Bind(dst_port, src_port, dst_ip, src_ip, protocol);
1094 inline uint32_t
Recv(uint8_t**
buf, uint32_t max_nb_bytes) {
1096 return rx_pipe_->
Recv(
buf, max_nb_bytes);
1102 inline uint32_t
Peek(uint8_t**
buf, uint32_t max_nb_bytes) {
1104 return rx_pipe_->
Peek(
buf, max_nb_bytes);
1117 template <
typename T>
1128 return rx_pipe_->
RecvPkts(max_nb_pkts);
1135 int32_t max_nb_pkts = -1) {
1137 return rx_pipe_->
PeekPkts(max_nb_pkts);
1162 last_tx_pipe_capacity_ -= nb_bytes;
1170 uint32_t new_capacity = tx_pipe_->
capacity();
1173 if (new_capacity > last_tx_pipe_capacity_) {
1174 rx_pipe_->
Free(new_capacity - last_tx_pipe_capacity_);
1175 last_tx_pipe_capacity_ = new_capacity;
1184 inline uint8_t*
buf()
const {
return rx_pipe_->
buf(); }
1191 inline enso_pipe_id_t
rx_id()
const {
return rx_pipe_->
id(); }
1198 inline enso_pipe_id_t
tx_id()
const {
return tx_pipe_->
id(); }
1234 "Quantum size mismatch");
1236 "Max capacity mismatch");
1243 static constexpr uint32_t kCompletionsThreshold = kEnsoPipeSize * 64 / 2;
1267 int Init(
bool fallback)
noexcept;
1274 uint32_t last_tx_pipe_capacity_;
1283template <
typename T>
1286 constexpr uint8_t* operator*() {
return addr_; }
1295 return (missing_messages_ != 0) && (next_addr_ <= other.addr_);
1298 constexpr T& operator++() {
1299 T* child =
static_cast<T*
>(
this);
1301 uint32_t nb_bytes = next_addr_ - addr_;
1303 child->OnAdvanceMessage(nb_bytes);
1306 next_addr_ = child->GetNextMessage(addr_);
1308 --missing_messages_;
1309 batch_->NotifyProcessedBytes(nb_bytes);
1317 missing_messages_(0),
1319 next_addr_(
nullptr) {}
1334 missing_messages_(message_limit),
1336 next_addr_(static_cast<T*>(this)->GetNextMessage(addr)) {}
1339 int32_t missing_messages_;
1341 uint8_t* next_addr_;
1374 return get_next_pkt(current_message);
1406 return get_next_pkt(current_message);
A class that represents a device.
static std::unique_ptr< Device > Create(const std::string &pcie_addr="", const std::string &huge_page_prefix="") noexcept
Factory method to create a device.
int GetNbFallbackQueues() noexcept
Retrieves the number of fallback queues for this device.
int GetRoundRobinStatus() noexcept
Gets the round robin status for the device.
int DisableRateLimiting()
Disables hardware rate limiting.
int ApplyConfig(struct TxNotification *config_notification)
Sends the given config notification to the device.
RxTxPipe * NextRxTxPipeToRecv()
Gets the next RxTxPipe that has data pending.
int EnableRateLimiting(uint16_t num, uint16_t den)
Enables hardware rate limiting.
void ProcessCompletions()
Processes completions for all pipes associated with this device.
int DisableRoundRobin()
Disables round-robin distribution of packets among the fallback pipes. Will use a hash of the five-tuple to direc...
int EnableRoundRobin()
Enables round-robin distribution of packets among the fallback pipes.
int EnableTimeStamping(uint8_t offset=kDefaultRttOffset)
Enables hardware time stamping.
RxPipe * NextRxPipeToRecv()
Gets the next RxPipe that has data pending.
RxPipe * AllocateRxPipe(bool fallback=false) noexcept
Allocates an RX pipe.
RxTxPipe * AllocateRxTxPipe(bool fallback=false) noexcept
Allocates an RX/TX pipe.
int DisableTimeStamping()
Disables hardware time stamping.
TxPipe * AllocateTxPipe(uint8_t *buf=nullptr) noexcept
Allocates a TX pipe.
Base class to represent a message within a batch.
MessageIteratorBase(uint8_t *addr, int32_t message_limit, RxPipe::MessageBatch< T > *batch)
Packet iterator that does not consume the packets from the pipe.
PeekPktIterator(uint8_t *addr, int32_t message_limit, RxPipe::MessageBatch< PeekPktIterator > *batch)
constexpr void OnAdvanceMessage(uint32_t nb_bytes)
Called when the iterator is done processing a message.
_enso_always_inline uint8_t * GetNextMessage(uint8_t *current_message)
Computes the next message address based on the current message.
constexpr void OnAdvanceMessage(uint32_t nb_bytes)
Called when the iterator is done processing a message.
PktIterator(uint8_t *addr, int32_t message_limit, RxPipe::MessageBatch< PktIterator > *batch)
_enso_always_inline uint8_t * GetNextMessage(uint8_t *current_message)
Computes the next message address based on the current message.
A class that represents a batch of messages.
constexpr MessageBatch()
Instantiates an empty message batch.
uint32_t processed_bytes() const
Number of bytes processed by the iterator.
uint8_t * buf() const
Returns a pointer to the start of the batch.
uint32_t available_bytes() const
Returns number of bytes available in the batch.
void NotifyProcessedBytes(uint32_t nb_bytes)
Notifies the batch that a given number of bytes have been processed.
int32_t message_limit() const
Returns maximum number of messages in the batch.
A class that represents an RX Enso Pipe.
constexpr MessageBatch< T > RecvMessages(int32_t max_nb_messages=-1)
Receives a batch of generic messages.
uint32_t Recv(uint8_t **buf, uint32_t max_nb_bytes)
Receives a batch of bytes.
void * context() const
Returns the context associated with the pipe.
void Clear()
Frees all bytes previously received on the RxPipe.
void set_context(void *new_context)
Sets the context associated with the pipe.
MessageBatch< PeekPktIterator > PeekPkts(int32_t max_nb_pkts=-1)
Receives a batch of packets without removing them from the queue.
static constexpr uint32_t kQuantumSize
constexpr void ConfirmBytes(uint32_t nb_bytes)
Confirms a certain number of bytes have been received.
uint8_t * buf() const
Returns the pipe's internal buffer.
constexpr uint32_t capacity() const
Returns the number of bytes allocated in the pipe, i.e., the number of bytes owned by the application...
MessageBatch< PktIterator > RecvPkts(int32_t max_nb_pkts=-1)
Receives a batch of packets.
int Bind(uint16_t dst_port, uint16_t src_port, uint32_t dst_ip, uint32_t src_ip, uint32_t protocol)
Binds the pipe to a given flow entry. Can be called multiple times to bind to multiple flows.
uint32_t Peek(uint8_t **buf, uint32_t max_nb_bytes)
Receives a batch of bytes without removing them from the queue.
void Free(uint32_t nb_bytes)
Frees a given number of bytes previously received on the RxPipe.
void Prefetch()
Prefetches the next batch of bytes to be received on the RxPipe.
static constexpr uint32_t kMaxCapacity
enso_pipe_id_t id() const
Returns the pipe's ID.
A class that represents an RX/TX Enso Pipe.
RxPipe::MessageBatch< T > RecvMessages(int32_t max_nb_messages=-1)
Receives a batch of generic messages.
uint32_t Peek(uint8_t **buf, uint32_t max_nb_bytes)
Receives a batch of bytes without removing them from the queue.
int Bind(uint16_t dst_port, uint16_t src_port, uint32_t dst_ip, uint32_t src_ip, uint32_t protocol)
Binds the pipe to a given flow entry. Can be called multiple times to bind to multiple flows.
RxPipe::MessageBatch< PeekPktIterator > PeekPkts(int32_t max_nb_pkts=-1)
Receives a batch of packets without removing them from the queue.
void * context() const
Returns the context associated with the pipe.
uint8_t * buf() const
Returns the pipe's internal buffer.
void ConfirmBytes(uint32_t nb_bytes)
Confirms a certain number of bytes have been received.
void SendAndFree(uint32_t nb_bytes)
Sends and deallocates a given number of bytes.
RxPipe::MessageBatch< PktIterator > RecvPkts(int32_t max_nb_pkts=-1)
Receives a batch of packets.
static constexpr uint32_t kQuantumSize
static constexpr uint32_t kMaxCapacity
void set_context(void *new_context)
Sets the context associated with the pipe.
void ProcessCompletions()
Processes completions for this pipe, potentially freeing up space to receive more data.
uint32_t Recv(uint8_t **buf, uint32_t max_nb_bytes)
Receives a batch of bytes.
void Prefetch()
Prefetches the next batch of bytes to be received on the RxTxPipe.
enso_pipe_id_t tx_id() const
Returns the pipe's TX ID.
enso_pipe_id_t rx_id() const
Returns the pipe's RX ID.
A class that represents a TX Enso Pipe.
uint32_t ExtendBufToTarget(uint32_t target_capacity)
Explicitly requests a buffer extension with a target capacity.
void * context() const
Returns the context associated with the pipe.
static constexpr uint32_t kMaxCapacity
uint8_t * AllocateBuf(uint32_t target_capacity=0)
Allocates a buffer in the pipe.
void SendAndFree(uint32_t nb_bytes)
Sends and deallocates a given number of bytes.
uint32_t capacity() const
Returns the allocated buffer's current available capacity.
uint32_t TryExtendBuf()
Explicitly requests a best-effort buffer extension.
void set_context(void *new_context)
Sets the context associated with the pipe.
uint32_t pending_transmission() const
Returns the number of bytes that are currently being transmitted.
static constexpr uint32_t kQuantumSize
uint8_t * buf() const
Returns the pipe's internal buffer.
enso_pipe_id_t id() const
Returns the pipe's ID.
Constants used throughout the codebase. Some of these constants need to be kept in sync with the hard...
constexpr uint8_t kDefaultRttOffset
Default timestamp offset of the RTT when timestamping is enabled (in bytes). Uses bytes 4–7 of IPv4 head...
Miscellaneous helper functions.
Definitions that are internal to Enso. They should not be exposed to applications.