Ensō 0.4.6
Software API reference
Loading...
Searching...
No Matches
pipe.cpp
Go to the documentation of this file.
1/*
2 * Copyright (c) 2023, Carnegie Mellon University
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted (subject to the limitations in the disclaimer
6 * below) provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 *
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 *
15 * * Neither the name of the copyright holder nor the names of its
16 * contributors may be used to endorse or promote products derived from
17 * this software without specific prior written permission.
18 *
19 * NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
20 * THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
21 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT
22 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
23 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
30 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32
40#include <enso/config.h>
41#include <enso/helpers.h>
42#include <enso/pipe.h>
43#include <sched.h>
44#include <sys/mman.h>
45#include <unistd.h>
46
47#include <algorithm>
48#include <cassert>
49#include <cstdio>
50#include <iostream>
51#include <memory>
52#include <string>
53
54#include "../pcie.h"
55
56namespace enso {
57
58uint32_t external_peek_next_batch_from_queue(
59 struct RxEnsoPipeInternal* enso_pipe,
60 struct NotificationBufPair* notification_buf_pair, void** buf) {
61 return peek_next_batch_from_queue(enso_pipe, notification_buf_pair, buf);
62}
63
64int RxPipe::Bind(uint16_t dst_port, uint16_t src_port, uint32_t dst_ip,
65 uint32_t src_ip, uint32_t protocol) {
66 return insert_flow_entry(notification_buf_pair_, dst_port, src_port, dst_ip,
67 src_ip, protocol, id_);
68}
69
70uint32_t RxPipe::Recv(uint8_t** buf, uint32_t max_nb_bytes) {
71 uint32_t ret = Peek(buf, max_nb_bytes);
72 ConfirmBytes(ret);
73 return ret;
74}
75
76inline uint32_t RxPipe::Peek(uint8_t** buf, uint32_t max_nb_bytes) {
77 if (!next_pipe_) {
78 get_new_tails(notification_buf_pair_);
79 }
80 uint32_t ret = peek_next_batch_from_queue(
81 &internal_rx_pipe_, notification_buf_pair_, (void**)buf);
82 return std::min(ret, max_nb_bytes);
83}
84
85void RxPipe::Free(uint32_t nb_bytes) {
86 advance_pipe(&internal_rx_pipe_, nb_bytes);
87}
88
89void RxPipe::Prefetch() { prefetch_pipe(&internal_rx_pipe_); }
90
91void RxPipe::Clear() { fully_advance_pipe(&internal_rx_pipe_); }
92
93RxPipe::~RxPipe() {
94 enso_pipe_free(notification_buf_pair_, &internal_rx_pipe_, id_);
95}
96
97int RxPipe::Init(bool fallback) noexcept {
98 int ret =
99 enso_pipe_init(&internal_rx_pipe_, notification_buf_pair_, fallback);
100 if (ret < 0) {
101 return ret;
102 }
103
104 id_ = ret;
105
106 return 0;
107}
108
109TxPipe::~TxPipe() {
110 if (internal_buf_) {
111 munmap(buf_, kMaxCapacity);
112 std::string path = GetHugePageFilePath();
113 unlink(path.c_str());
114 }
115}
116
117int TxPipe::Init() noexcept {
118 if (internal_buf_) {
119 std::string path = GetHugePageFilePath();
120 buf_ = (uint8_t*)get_huge_page(path, 0, true);
121 if (unlikely(!buf_)) {
122 return -1;
123 }
124 }
125
126 struct NotificationBufPair* notif_buf = &(device_->notification_buf_pair_);
127
128 buf_phys_addr_ = get_dev_addr_from_virt_addr(notif_buf, buf_);
129
130 return 0;
131}
132
133int RxTxPipe::Init(bool fallback) noexcept {
134 rx_pipe_ = device_->AllocateRxPipe(fallback);
135 if (rx_pipe_ == nullptr) {
136 return -1;
137 }
138
139 tx_pipe_ = device_->AllocateTxPipe(rx_pipe_->buf());
140 if (tx_pipe_ == nullptr) {
141 return -1;
142 }
143
144 last_tx_pipe_capacity_ = tx_pipe_->capacity();
145
146 return 0;
147}
148
149std::unique_ptr<Device> Device::Create(
150 const std::string& pcie_addr,
151 const std::string& huge_page_prefix) noexcept {
152 std::unique_ptr<Device> dev(new (std::nothrow)
153 Device(pcie_addr, huge_page_prefix));
154 if (unlikely(!dev)) {
155 return std::unique_ptr<Device>{};
156 }
157
158 if (dev->Init()) {
159 return std::unique_ptr<Device>{};
160 }
161
162 return dev;
163}
164
165Device::~Device() {
166 for (auto& pipe : rx_tx_pipes_) {
167 rx_tx_pipes_map_[pipe->rx_id()] = nullptr;
168 delete pipe;
169 }
170
171 for (auto& pipe : rx_pipes_) {
172 rx_pipes_map_[pipe->id()] = nullptr;
173 delete pipe;
174 }
175
176 for (auto& pipe : tx_pipes_) {
177 delete pipe;
178 }
179
180 notification_buf_free(&notification_buf_pair_);
181}
182
183RxPipe* Device::AllocateRxPipe(bool fallback) noexcept {
184 RxPipe* pipe(new (std::nothrow) RxPipe(this));
185
186 if (unlikely(!pipe)) {
187 return nullptr;
188 }
189
190 if (pipe->Init(fallback)) {
191 delete pipe;
192 return nullptr;
193 }
194
195 rx_pipes_.push_back(pipe);
196 rx_pipes_map_[pipe->id()] = pipe;
197
198 return pipe;
199}
200
202 return get_nb_fallback_queues(&notification_buf_pair_);
203}
204
205TxPipe* Device::AllocateTxPipe(uint8_t* buf) noexcept {
206 TxPipe* pipe(new (std::nothrow) TxPipe(tx_pipes_.size(), this, buf));
207
208 if (unlikely(!pipe)) {
209 return nullptr;
210 }
211
212 if (pipe->Init()) {
213 delete pipe;
214 return nullptr;
215 }
216
217 tx_pipes_.push_back(pipe);
218
219 return pipe;
220}
221
222RxTxPipe* Device::AllocateRxTxPipe(bool fallback) noexcept {
223 RxTxPipe* pipe(new (std::nothrow) RxTxPipe(this));
224
225 if (unlikely(!pipe)) {
226 return nullptr;
227 }
228
229 if (pipe->Init(fallback)) {
230 delete pipe;
231 return nullptr;
232 }
233
234 rx_tx_pipes_.push_back(pipe);
235 rx_tx_pipes_map_[pipe->rx_id()] = pipe;
236
237 return pipe;
238}
239
240// TODO(sadok): DRY this code.
242 // This function can only be used when there are **no** RxTx pipes.
243 assert(rx_tx_pipes_.size() == 0);
244
245 int32_t id;
246
247#ifdef LATENCY_OPT
248 // When LATENCY_OPT is enabled, we always prefetch the next pipe.
249 id = get_next_enso_pipe_id(&notification_buf_pair_);
250
251 while (id >= 0) {
252 RxPipe* rx_pipe = rx_pipes_map_[id];
253 assert(rx_pipe != nullptr);
254
255 RxEnsoPipeInternal& pipe = rx_pipe->internal_rx_pipe_;
256 uint32_t enso_pipe_head = pipe.rx_tail;
257 uint32_t enso_pipe_tail = notification_buf_pair_.pending_rx_pipe_tails[id];
258
259 if (enso_pipe_head != enso_pipe_tail) {
260 rx_pipe->Prefetch();
261 break;
262 }
263
264 id = get_next_enso_pipe_id(&notification_buf_pair_);
265 }
266
267#else // !LATENCY_OPT
268 id = get_next_enso_pipe_id(&notification_buf_pair_);
269
270#endif // LATENCY_OPT
271
272 if (id < 0) {
273 return nullptr;
274 }
275
276 RxPipe* rx_pipe = rx_pipes_map_[id];
277 rx_pipe->SetAsNextPipe();
278 return rx_pipe;
279}
280
283 // This function can only be used when there are only RxTx pipes.
284 assert(rx_pipes_.size() == rx_tx_pipes_.size());
285 int32_t id;
286
287#ifdef LATENCY_OPT
288 // When LATENCY_OPT is enabled, we always prefetch the next pipe.
289 id = get_next_enso_pipe_id(&notification_buf_pair_);
290
291 while (id >= 0) {
292 RxTxPipe* rx_tx_pipe = rx_tx_pipes_map_[id];
293 assert(rx_tx_pipe->rx_pipe_ != nullptr);
294
295 RxEnsoPipeInternal& pipe = rx_tx_pipe->rx_pipe_->internal_rx_pipe_;
296 uint32_t enso_pipe_head = pipe.rx_tail;
297 uint32_t enso_pipe_tail = notification_buf_pair_.pending_rx_pipe_tails[id];
298
299 if (enso_pipe_head != enso_pipe_tail) {
300 rx_tx_pipe->Prefetch();
301 break;
302 }
303
304 id = get_next_enso_pipe_id(&notification_buf_pair_);
305 }
306
307#else // !LATENCY_OPT
308 id = get_next_enso_pipe_id(&notification_buf_pair_);
309
310#endif // LATENCY_OPT
311
312 if (id < 0) {
313 return nullptr;
314 }
315
316 RxTxPipe* rx_tx_pipe = rx_tx_pipes_map_[id];
317 rx_tx_pipe->rx_pipe_->SetAsNextPipe();
318 return rx_tx_pipe;
319}
320
321int Device::Init() noexcept {
322 if (core_id_ < 0) {
323 core_id_ = sched_getcpu();
324 if (core_id_ < 0) {
325 // Error getting CPU ID.
326 return 1;
327 }
328 }
329
330 bdf_ = 0;
331 if (kPcieAddr != "") {
332 bdf_ = get_bdf_from_pcie_addr(kPcieAddr);
333
334 if (unlikely(bdf_ == 0)) {
335 // Invalid PCIe address.
336 return 2;
337 }
338 }
339
340 int bar = -1;
341
342 std::cerr << "Running with NOTIFICATION_BUF_SIZE: " << kNotificationBufSize
343 << std::endl;
344 std::cerr << "Running with ENSO_PIPE_SIZE: " << kEnsoPipeSize << std::endl;
345
346 int ret = notification_buf_init(bdf_, bar, &notification_buf_pair_,
347 huge_page_prefix_);
348 if (ret != 0) {
349 // Could not initialize notification buffer.
350 return 3;
351 }
352
353 return 0;
354}
355
356int Device::ApplyConfig(struct TxNotification* config_notification) {
357 return send_config(&notification_buf_pair_, config_notification);
358}
359
360void Device::Send(uint32_t tx_enso_pipe_id, uint64_t phys_addr,
361 uint32_t nb_bytes) {
362 // TODO(sadok): We might be able to improve performance by avoiding the wrap
363 // tracker currently used inside send_to_queue.
364 send_to_queue(&notification_buf_pair_, phys_addr, nb_bytes);
365
366 uint32_t nb_pending_requests =
367 (tx_pr_tail_ - tx_pr_head_) & kPendingTxRequestsBufMask;
368
369 // This will block until there is enough space to keep at least two requests.
370 // We need space for two requests because the request may be split into two
371 // if the bytes wrap around the end of the buffer.
372 while (unlikely(nb_pending_requests >= (kMaxPendingTxRequests - 2))) {
374 nb_pending_requests =
375 (tx_pr_tail_ - tx_pr_head_) & kPendingTxRequestsBufMask;
376 }
377
378 tx_pending_requests_[tx_pr_tail_].pipe_id = tx_enso_pipe_id;
379 tx_pending_requests_[tx_pr_tail_].nb_bytes = nb_bytes;
380 tx_pr_tail_ = (tx_pr_tail_ + 1) & kPendingTxRequestsBufMask;
381}
382
384 uint32_t tx_completions = get_unreported_completions(&notification_buf_pair_);
385 for (uint32_t i = 0; i < tx_completions; ++i) {
386 TxPendingRequest tx_req = tx_pending_requests_[tx_pr_head_];
387 tx_pr_head_ = (tx_pr_head_ + 1) & kPendingTxRequestsBufMask;
388
389 TxPipe* pipe = tx_pipes_[tx_req.pipe_id];
390 pipe->NotifyCompletion(tx_req.nb_bytes);
391 }
392
393 // RxTx pipes need to be explicitly notified so that they can free space for
394 // more incoming packets.
395 for (RxTxPipe* pipe : rx_tx_pipes_) {
396 pipe->ProcessCompletions();
397 }
398}
399
400int Device::EnableTimeStamping(uint8_t offset) {
401 return enable_timestamp(&notification_buf_pair_, offset);
402}
403
405 return disable_timestamp(&notification_buf_pair_);
406}
407
408int Device::EnableRateLimiting(uint16_t num, uint16_t den) {
409 return enable_rate_limit(&notification_buf_pair_, num, den);
410}
411
413 return disable_rate_limit(&notification_buf_pair_);
414}
415
417 return enable_round_robin(&notification_buf_pair_);
418}
419
421 return get_round_robin_status(&notification_buf_pair_);
422}
423
425 return disable_round_robin(&notification_buf_pair_);
426}
427
428} // namespace enso
A class that represents a device.
Definition: pipe.h:82
static std::unique_ptr< Device > Create(const std::string &pcie_addr="", const std::string &huge_page_prefix="") noexcept
Factory method to create a device.
Definition: pipe.cpp:149
int GetNbFallbackQueues() noexcept
Retrieves the number of fallback queues for this device.
Definition: pipe.cpp:201
int GetRoundRobinStatus() noexcept
Gets the round robin status for the device.
Definition: pipe.cpp:420
int DisableRateLimiting()
Disables hardware rate limiting.
Definition: pipe.cpp:412
int ApplyConfig(struct TxNotification *config_notification)
Sends the given config notification to the device.
Definition: pipe.cpp:356
RxTxPipe * NextRxTxPipeToRecv()
Gets the next RxTxPipe that has data pending.
Definition: pipe.cpp:281
int EnableRateLimiting(uint16_t num, uint16_t den)
Enables hardware rate limiting.
Definition: pipe.cpp:408
void ProcessCompletions()
Processes completions for all pipes associated with this device.
Definition: pipe.cpp:383
int DisableRoundRobin()
Disables round robing of packets among the fallback pipes. Will use a hash of the five-tuple to direc...
Definition: pipe.cpp:424
int EnableRoundRobin()
Enables round robing of packets among the fallback pipes.
Definition: pipe.cpp:416
int EnableTimeStamping(uint8_t offset=kDefaultRttOffset)
Enables hardware time stamping.
Definition: pipe.cpp:400
RxPipe * NextRxPipeToRecv()
Gets the next RxPipe that has data pending.
Definition: pipe.cpp:241
RxPipe * AllocateRxPipe(bool fallback=false) noexcept
Allocates an RX pipe.
Definition: pipe.cpp:183
RxTxPipe * AllocateRxTxPipe(bool fallback=false) noexcept
Allocates an RX/TX pipe.
Definition: pipe.cpp:222
int DisableTimeStamping()
Disables hardware time stamping.
Definition: pipe.cpp:404
TxPipe * AllocateTxPipe(uint8_t *buf=nullptr) noexcept
Allocates a TX pipe.
Definition: pipe.cpp:205
A class that represents an RX Enso Pipe.
Definition: pipe.h:394
uint32_t Recv(uint8_t **buf, uint32_t max_nb_bytes)
Receives a batch of bytes.
Definition: pipe.cpp:70
void Clear()
Frees all bytes previously received on the RxPipe.
Definition: pipe.cpp:91
constexpr void ConfirmBytes(uint32_t nb_bytes)
Confirms a certain number of bytes have been received.
Definition: pipe.h:583
uint8_t * buf() const
Returns the pipe's internal buffer.
Definition: pipe.h:678
int Bind(uint16_t dst_port, uint16_t src_port, uint32_t dst_ip, uint32_t src_ip, uint32_t protocol)
Binds the pipe to a given flow entry. Can be called multiple times to bind to multiple flows.
Definition: pipe.cpp:64
uint32_t Peek(uint8_t **buf, uint32_t max_nb_bytes)
Receives a batch of bytes without removing them from the queue.
Definition: pipe.cpp:76
void Free(uint32_t nb_bytes)
Frees a given number of bytes previously received on the RxPipe.
Definition: pipe.cpp:85
void Prefetch()
Prefetches the next batch of bytes to be received on the RxPipe.
Definition: pipe.cpp:89
enso_pipe_id_t id() const
Returns the pipe's ID.
Definition: pipe.h:685
A class that represents an RX/TX Enso Pipe.
Definition: pipe.h:1076
void Prefetch()
Prefetches the next batch of bytes to be received on the RxTxPipe.
Definition: pipe.h:1149
enso_pipe_id_t rx_id() const
Return the pipe's RX ID.
Definition: pipe.h:1191
A class that represents a TX Enso Pipe.
Definition: pipe.h:782
static constexpr uint32_t kMaxCapacity
Definition: pipe.h:960
Functions to configure the data plane.
int enable_timestamp(struct NotificationBufPair *notification_buf_pair, uint8_t offset=kDefaultRttOffset)
Enables hardware timestamping.
Definition: config.cpp:123
int enable_round_robin(struct NotificationBufPair *notification_buf_pair)
Enables packet round robin for the fallback pipes.
Definition: config.cpp:208
int disable_timestamp(struct NotificationBufPair *notification_buf_pair)
Disables hardware timestamping.
Definition: config.cpp:139
int enable_rate_limit(struct NotificationBufPair *notification_buf_pair, uint16_t num, uint16_t den)
Enables hardware rate limit.
Definition: config.cpp:149
int disable_rate_limit(struct NotificationBufPair *notification_buf_pair)
Disables hardware rate limit.
Definition: config.cpp:162
int insert_flow_entry(struct NotificationBufPair *notification_buf_pair, uint16_t dst_port, uint16_t src_port, uint32_t dst_ip, uint32_t src_ip, uint32_t protocol, uint32_t enso_pipe_id)
Inserts flow entry in the data plane flow table that will direct all packets matching the flow entry ...
Definition: config.cpp:97
int disable_round_robin(struct NotificationBufPair *notification_buf_pair)
Disables packet round robin for the fallback pipes. Using a hash of the packet's five tuple to select...
Definition: config.cpp:212
Miscellaneous helper functions.
void * get_huge_page(const std::string &path, size_t size=0, bool mirror=false)
Definition: ixy_helpers.cpp:82
int notification_buf_init(uint32_t bdf, int32_t bar, struct NotificationBufPair *notification_buf_pair, const std::string &huge_page_prefix)
Initializes the notification buffer pair.
Definition: pcie.cpp:75
void prefetch_pipe(struct RxEnsoPipeInternal *enso_pipe)
Prefetches a given Enso Pipe.
Definition: pcie.cpp:460
uint16_t get_new_tails(struct NotificationBufPair *notification_buf_pair)
Gets latest tails for the pipes associated with the given notification buffer.
Definition: pcie.cpp:353
void fully_advance_pipe(struct RxEnsoPipeInternal *enso_pipe)
Frees all the received bytes in the buffer associated with the socket_entry socket.
Definition: pcie.cpp:455
int32_t get_next_enso_pipe_id(struct NotificationBufPair *notification_buf_pair)
Get next Enso Pipe with pending data.
Definition: pcie.cpp:423
uint64_t get_dev_addr_from_virt_addr(struct NotificationBufPair *notification_buf_pair, void *virt_addr)
Converts an address in the application's virtual address space to an address that can be used by the ...
Definition: pcie.cpp:627
void notification_buf_free(struct NotificationBufPair *notification_buf_pair)
Frees the notification buffer pair.
Definition: pcie.cpp:635
int send_config(struct NotificationBufPair *notification_buf_pair, struct TxNotification *config_notification)
Sends configuration to the NIC.
Definition: pcie.cpp:569
uint32_t peek_next_batch_from_queue(struct RxEnsoPipeInternal *enso_pipe, struct NotificationBufPair *notification_buf_pair, void **buf)
Gets the next batch of data from the given Enso Pipe without consuming it. So the next call to get_ne...
Definition: pcie.cpp:391
uint32_t send_to_queue(struct NotificationBufPair *notification_buf_pair, uint64_t phys_addr, uint32_t len)
Sends data through a given queue.
Definition: pcie.cpp:515
uint32_t get_unreported_completions(struct NotificationBufPair *notification_buf_pair)
Returns the number of transmission requests that were completed since the last call to this function.
Definition: pcie.cpp:520
int get_nb_fallback_queues(struct NotificationBufPair *notification_buf_pair)
Get number of fallback queues currently in use.
Definition: pcie.cpp:608
int get_round_robin_status(struct NotificationBufPair *notification_buf_pair)
Gets the round robin status for the device.
Definition: pcie.cpp:621
void advance_pipe(struct RxEnsoPipeInternal *enso_pipe, size_t len)
Frees the next len bytes in the buffer associated with the socket_entry socket. If len is greater tha...
Definition: pcie.cpp:446
void enso_pipe_free(struct NotificationBufPair *notification_buf_pair, struct RxEnsoPipeInternal *enso_pipe, enso_pipe_id_t enso_pipe_id)
Frees the Enso Pipe.
Definition: pcie.cpp:661
Enso Pipe API. We define RX, TX, and RX/TX pipes as well as the Device class that manages them....