58uint32_t external_peek_next_batch_from_queue(
59 struct RxEnsoPipeInternal* enso_pipe,
60 struct NotificationBufPair* notification_buf_pair,
void** buf) {
64int RxPipe::Bind(uint16_t dst_port, uint16_t src_port, uint32_t dst_ip,
65 uint32_t src_ip, uint32_t protocol) {
67 src_ip, protocol, id_);
71 uint32_t ret =
Peek(
buf, max_nb_bytes);
76inline uint32_t
RxPipe::Peek(uint8_t** buf, uint32_t max_nb_bytes) {
81 &internal_rx_pipe_, notification_buf_pair_, (
void**)
buf);
82 return std::min(ret, max_nb_bytes);
97int RxPipe::Init(
bool fallback)
noexcept {
99 enso_pipe_init(&internal_rx_pipe_, notification_buf_pair_, fallback);
112 std::string path = GetHugePageFilePath();
113 unlink(path.c_str());
117int TxPipe::Init() noexcept {
119 std::string path = GetHugePageFilePath();
121 if (unlikely(!buf_)) {
126 struct NotificationBufPair* notif_buf = &(device_->notification_buf_pair_);
133int RxTxPipe::Init(
bool fallback)
noexcept {
134 rx_pipe_ = device_->AllocateRxPipe(fallback);
135 if (rx_pipe_ ==
nullptr) {
139 tx_pipe_ = device_->AllocateTxPipe(rx_pipe_->buf());
140 if (tx_pipe_ ==
nullptr) {
144 last_tx_pipe_capacity_ = tx_pipe_->capacity();
150 const std::string& pcie_addr,
151 const std::string& huge_page_prefix)
noexcept {
152 std::unique_ptr<Device> dev(
new (std::nothrow)
153 Device(pcie_addr, huge_page_prefix));
154 if (unlikely(!dev)) {
155 return std::unique_ptr<Device>{};
159 return std::unique_ptr<Device>{};
166 for (
auto& pipe : rx_tx_pipes_) {
167 rx_tx_pipes_map_[pipe->rx_id()] =
nullptr;
171 for (
auto& pipe : rx_pipes_) {
172 rx_pipes_map_[pipe->id()] =
nullptr;
176 for (
auto& pipe : tx_pipes_) {
186 if (unlikely(!pipe)) {
190 if (pipe->Init(fallback)) {
195 rx_pipes_.push_back(pipe);
196 rx_pipes_map_[pipe->
id()] = pipe;
206 TxPipe* pipe(
new (std::nothrow)
TxPipe(tx_pipes_.size(),
this, buf));
208 if (unlikely(!pipe)) {
217 tx_pipes_.push_back(pipe);
225 if (unlikely(!pipe)) {
229 if (pipe->Init(fallback)) {
234 rx_tx_pipes_.push_back(pipe);
235 rx_tx_pipes_map_[pipe->
rx_id()] = pipe;
243 assert(rx_tx_pipes_.size() == 0);
252 RxPipe* rx_pipe = rx_pipes_map_[id];
253 assert(rx_pipe !=
nullptr);
256 uint32_t enso_pipe_head = pipe.rx_tail;
257 uint32_t enso_pipe_tail = notification_buf_pair_.pending_rx_pipe_tails[id];
259 if (enso_pipe_head != enso_pipe_tail) {
276 RxPipe* rx_pipe = rx_pipes_map_[id];
277 rx_pipe->SetAsNextPipe();
284 assert(rx_pipes_.size() == rx_tx_pipes_.size());
292 RxTxPipe* rx_tx_pipe = rx_tx_pipes_map_[id];
293 assert(rx_tx_pipe->rx_pipe_ !=
nullptr);
296 uint32_t enso_pipe_head = pipe.rx_tail;
297 uint32_t enso_pipe_tail = notification_buf_pair_.pending_rx_pipe_tails[id];
299 if (enso_pipe_head != enso_pipe_tail) {
316 RxTxPipe* rx_tx_pipe = rx_tx_pipes_map_[id];
317 rx_tx_pipe->rx_pipe_->SetAsNextPipe();
321int Device::Init() noexcept {
323 core_id_ = sched_getcpu();
331 if (kPcieAddr !=
"") {
332 bdf_ = get_bdf_from_pcie_addr(kPcieAddr);
334 if (unlikely(bdf_ == 0)) {
342 std::cerr <<
"Running with NOTIFICATION_BUF_SIZE: " << kNotificationBufSize
344 std::cerr <<
"Running with ENSO_PIPE_SIZE: " << kEnsoPipeSize << std::endl;
357 return send_config(&notification_buf_pair_, config_notification);
360void Device::Send(uint32_t tx_enso_pipe_id, uint64_t phys_addr,
366 uint32_t nb_pending_requests =
367 (tx_pr_tail_ - tx_pr_head_) & kPendingTxRequestsBufMask;
372 while (unlikely(nb_pending_requests >= (kMaxPendingTxRequests - 2))) {
374 nb_pending_requests =
375 (tx_pr_tail_ - tx_pr_head_) & kPendingTxRequestsBufMask;
378 tx_pending_requests_[tx_pr_tail_].pipe_id = tx_enso_pipe_id;
379 tx_pending_requests_[tx_pr_tail_].nb_bytes = nb_bytes;
380 tx_pr_tail_ = (tx_pr_tail_ + 1) & kPendingTxRequestsBufMask;
385 for (uint32_t i = 0; i < tx_completions; ++i) {
386 TxPendingRequest tx_req = tx_pending_requests_[tx_pr_head_];
387 tx_pr_head_ = (tx_pr_head_ + 1) & kPendingTxRequestsBufMask;
389 TxPipe* pipe = tx_pipes_[tx_req.pipe_id];
390 pipe->NotifyCompletion(tx_req.nb_bytes);
395 for (
RxTxPipe* pipe : rx_tx_pipes_) {
396 pipe->ProcessCompletions();
A class that represents a device.
static std::unique_ptr< Device > Create(const std::string &pcie_addr="", const std::string &huge_page_prefix="") noexcept
Factory method to create a device.
int GetNbFallbackQueues() noexcept
Retrieves the number of fallback queues for this device.
int GetRoundRobinStatus() noexcept
Gets the round robin status for the device.
int DisableRateLimiting()
Disables hardware rate limiting.
int ApplyConfig(struct TxNotification *config_notification)
Sends the given config notification to the device.
RxTxPipe * NextRxTxPipeToRecv()
Gets the next RxTxPipe that has data pending.
int EnableRateLimiting(uint16_t num, uint16_t den)
Enables hardware rate limiting.
void ProcessCompletions()
Processes completions for all pipes associated with this device.
int DisableRoundRobin()
Disables round robin of packets among the fallback pipes. Will use a hash of the five-tuple to direc...
int EnableRoundRobin()
Enables round robin of packets among the fallback pipes.
int EnableTimeStamping(uint8_t offset=kDefaultRttOffset)
Enables hardware time stamping.
RxPipe * NextRxPipeToRecv()
Gets the next RxPipe that has data pending.
RxPipe * AllocateRxPipe(bool fallback=false) noexcept
Allocates an RX pipe.
RxTxPipe * AllocateRxTxPipe(bool fallback=false) noexcept
Allocates an RX/TX pipe.
int DisableTimeStamping()
Disables hardware time stamping.
TxPipe * AllocateTxPipe(uint8_t *buf=nullptr) noexcept
Allocates a TX pipe.
A class that represents an RX Enso Pipe.
uint32_t Recv(uint8_t **buf, uint32_t max_nb_bytes)
Receives a batch of bytes.
void Clear()
Frees all bytes previously received on the RxPipe.
constexpr void ConfirmBytes(uint32_t nb_bytes)
Confirms a certain number of bytes have been received.
uint8_t * buf() const
Returns the pipe's internal buffer.
int Bind(uint16_t dst_port, uint16_t src_port, uint32_t dst_ip, uint32_t src_ip, uint32_t protocol)
Binds the pipe to a given flow entry. Can be called multiple times to bind to multiple flows.
uint32_t Peek(uint8_t **buf, uint32_t max_nb_bytes)
Receives a batch of bytes without removing them from the queue.
void Free(uint32_t nb_bytes)
Frees a given number of bytes previously received on the RxPipe.
void Prefetch()
Prefetches the next batch of bytes to be received on the RxPipe.
enso_pipe_id_t id() const
Returns the pipe's ID.
A class that represents an RX/TX Enso Pipe.
void Prefetch()
Prefetches the next batch of bytes to be received on the RxTxPipe.
enso_pipe_id_t rx_id() const
Returns the pipe's RX ID.
A class that represents a TX Enso Pipe.
static constexpr uint32_t kMaxCapacity
Functions to configure the data plane.
int enable_timestamp(struct NotificationBufPair *notification_buf_pair, uint8_t offset=kDefaultRttOffset)
Enables hardware timestamping.
int enable_round_robin(struct NotificationBufPair *notification_buf_pair)
Enables packet round robin for the fallback pipes.
int disable_timestamp(struct NotificationBufPair *notification_buf_pair)
Disables hardware timestamping.
int enable_rate_limit(struct NotificationBufPair *notification_buf_pair, uint16_t num, uint16_t den)
Enables hardware rate limit.
int disable_rate_limit(struct NotificationBufPair *notification_buf_pair)
Disables hardware rate limit.
int insert_flow_entry(struct NotificationBufPair *notification_buf_pair, uint16_t dst_port, uint16_t src_port, uint32_t dst_ip, uint32_t src_ip, uint32_t protocol, uint32_t enso_pipe_id)
Inserts flow entry in the data plane flow table that will direct all packets matching the flow entry ...
int disable_round_robin(struct NotificationBufPair *notification_buf_pair)
Disables packet round robin for the fallback pipes. Using a hash of the packet's five tuple to select...
Miscellaneous helper functions.
void * get_huge_page(const std::string &path, size_t size=0, bool mirror=false)
int notification_buf_init(uint32_t bdf, int32_t bar, struct NotificationBufPair *notification_buf_pair, const std::string &huge_page_prefix)
Initializes the notification buffer pair.
void prefetch_pipe(struct RxEnsoPipeInternal *enso_pipe)
Prefetches a given Enso Pipe.
uint16_t get_new_tails(struct NotificationBufPair *notification_buf_pair)
Gets latest tails for the pipes associated with the given notification buffer.
void fully_advance_pipe(struct RxEnsoPipeInternal *enso_pipe)
Frees all the received bytes in the buffer associated with the given Enso Pipe.
int32_t get_next_enso_pipe_id(struct NotificationBufPair *notification_buf_pair)
Gets the ID of the next Enso Pipe with pending data.
uint64_t get_dev_addr_from_virt_addr(struct NotificationBufPair *notification_buf_pair, void *virt_addr)
Converts an address in the application's virtual address space to an address that can be used by the ...
void notification_buf_free(struct NotificationBufPair *notification_buf_pair)
Frees the notification buffer pair.
int send_config(struct NotificationBufPair *notification_buf_pair, struct TxNotification *config_notification)
Sends configuration to the NIC.
uint32_t peek_next_batch_from_queue(struct RxEnsoPipeInternal *enso_pipe, struct NotificationBufPair *notification_buf_pair, void **buf)
Gets the next batch of data from the given Enso Pipe without consuming it. So the next call to get_ne...
uint32_t send_to_queue(struct NotificationBufPair *notification_buf_pair, uint64_t phys_addr, uint32_t len)
Sends data through a given queue.
uint32_t get_unreported_completions(struct NotificationBufPair *notification_buf_pair)
Returns the number of transmission requests that were completed since the last call to this function.
int get_nb_fallback_queues(struct NotificationBufPair *notification_buf_pair)
Gets the number of fallback queues currently in use.
int get_round_robin_status(struct NotificationBufPair *notification_buf_pair)
Gets the round robin status for the device.
void advance_pipe(struct RxEnsoPipeInternal *enso_pipe, size_t len)
Frees the next len bytes in the buffer associated with the given Enso Pipe. If len is greater tha...
void enso_pipe_free(struct NotificationBufPair *notification_buf_pair, struct RxEnsoPipeInternal *enso_pipe, enso_pipe_id_t enso_pipe_id)
Frees the Enso Pipe.
Enso Pipe API. We define RX, TX, and RX/TX pipes as well as the Device class that manages them....