Skip to content

Commit

Permalink
Save work on support
Browse files Browse the repository at this point in the history
  • Loading branch information
anujkaliaiitd committed Jan 29, 2021
1 parent e65673c commit f6104ea
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 74 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ Some highlights:

## Running eRPC over DPDK on Microsoft Azure VMs

* eRPC works well on Azure VMs with accelerated networking. For now, eRPC
supports only one RPC ID per machine on Azure.
* eRPC works well on Azure VMs with accelerated networking.

* Configure two Ubuntu 18.04 VMs as below. Use the same resource group and
availability zone for both VMs.
Expand Down
2 changes: 1 addition & 1 deletion apps/small_rpc_tput/config
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
--concurrency 60
--msg_size 32
--num_processes 2
--num_threads 1
--num_threads 4
--numa_0_ports 0
--numa_1_ports 1,3
2 changes: 1 addition & 1 deletion scripts/autorun_app_file
Original file line number Diff line number Diff line change
@@ -1 +1 @@
latency
small_rpc_tput
137 changes: 97 additions & 40 deletions src/transport_impl/dpdk/dpdk_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <iomanip>
#include <stdexcept>

#include <rte_thash.h>
#include <rte_version.h>
#include <set>
#include "dpdk_transport.h"
Expand All @@ -12,6 +13,7 @@
namespace erpc {

constexpr size_t DpdkTransport::kMaxDataPerPkt;
static_assert(sizeof(eth_routing_info_t) <= Transport::kMaxRoutingInfoSize, "");

static volatile bool port_initialized[RTE_MAX_ETHPORTS]; // Uses dpdk_lock

Expand All @@ -21,6 +23,14 @@ static std::set<size_t> used_qp_ids[RTE_MAX_ETHPORTS];
/// mempool_arr[i][j] is the mempool to use for port i, queue j
rte_mempool *mempool_arr[RTE_MAX_ETHPORTS][DpdkTransport::kMaxQueuesPerPort];

/// Key used for RSS hashing
static constexpr uint8_t default_rss_key[] = {
0x2c, 0xc6, 0x81, 0xd1, 0x5b, 0xdb, 0xf4, 0xf7, 0xfc, 0xa2,
0x83, 0x19, 0xdb, 0x1a, 0x3e, 0x94, 0x6b, 0x9e, 0x38, 0xd9,
0x2c, 0x9c, 0x03, 0xd1, 0xad, 0x99, 0x44, 0xa7, 0xd9, 0x56,
0x3d, 0x59, 0x06, 0x3c, 0x25, 0xf3, 0xfc, 0x1f, 0xdc, 0x2a,
};

// Initialize the protection domain, queue pair, and memory registration and
// deregistration functions. RECVs will be initialized later when the hugepage
// allocator is provided.
Expand Down Expand Up @@ -116,42 +126,19 @@ void DpdkTransport::setup_phy_port() {
rte_eth_conf eth_conf;
memset(&eth_conf, 0, sizeof(eth_conf));

eth_conf.rxmode.mq_mode = ETH_MQ_RX_NONE;
eth_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN;
eth_conf.rxmode.offloads = 0;
eth_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
eth_conf.rx_adv_conf.rss_conf.rss_key =
const_cast<uint8_t *>(default_rss_key);
eth_conf.rx_adv_conf.rss_conf.rss_key_len = 40;
eth_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_UDP;

eth_conf.txmode.mq_mode = ETH_MQ_TX_NONE;
eth_conf.txmode.offloads = kOffloads;

eth_conf.fdir_conf.mode = RTE_FDIR_MODE_PERFECT;
eth_conf.fdir_conf.pballoc = RTE_FDIR_PBALLOC_64K;
eth_conf.fdir_conf.status = RTE_FDIR_NO_REPORT_STATUS;
eth_conf.fdir_conf.mask.dst_port_mask = 0xffff;
eth_conf.fdir_conf.drop_queue = 0;

int ret = rte_eth_dev_configure(phy_port, kMaxQueuesPerPort,
kMaxQueuesPerPort, &eth_conf);
rt_assert(ret == 0, "Ethdev configuration error: ", strerror(-1 * ret));

// Set flow director fields if flow director is supported. It's OK if the
// FILTER_SET command fails (e.g., on ConnectX-4 NICs).
if (kInstallFlowRules &&
rte_eth_dev_filter_supported(phy_port, RTE_ETH_FILTER_FDIR) == 0) {
struct rte_eth_fdir_filter_info fi;
memset(&fi, 0, sizeof(fi));
fi.info_type = RTE_ETH_FDIR_FILTER_INPUT_SET_SELECT;
fi.info.input_set_conf.flow_type = RTE_ETH_FLOW_NONFRAG_IPV4_UDP;
fi.info.input_set_conf.inset_size = 2;
fi.info.input_set_conf.field[0] = RTE_ETH_INPUT_SET_L3_DST_IP4;
fi.info.input_set_conf.field[1] = RTE_ETH_INPUT_SET_L4_UDP_DST_PORT;
fi.info.input_set_conf.op = RTE_ETH_INPUT_SET_SELECT;
ret = rte_eth_dev_filter_ctrl(phy_port, RTE_ETH_FILTER_FDIR,
RTE_ETH_FILTER_SET, &fi);
if (ret != 0) {
ERPC_WARN("Failed to set flow director fields. Could be survivable...\n");
}
}

// Set up all RX and TX queues and start the device. This can't be done later
// on a per-thread basis since we must start the device to use any queue.
// Once the device is started, more queues cannot be added without stopping
Expand Down Expand Up @@ -190,11 +177,6 @@ void DpdkTransport::setup_phy_port() {
ret = rte_eth_tx_queue_setup(phy_port, i, kNumTxRingDesc, numa_node,
&eth_tx_conf);
rt_assert(ret == 0, "Failed to setup TX queue: " + std::to_string(i));

if (kInstallFlowRules) {
install_flow_rule(phy_port, i, get_port_ipv4_addr(phy_port),
udp_port_for_queue(phy_port, i));
}
}

rte_eth_dev_start(phy_port);
Expand Down Expand Up @@ -225,6 +207,24 @@ void DpdkTransport::resolve_phy_port() {

resolve.ipv4_addr = get_port_ipv4_addr(phy_port);

// Resolve RSS indirection table size
struct rte_eth_dev_info dev_info;
rte_eth_dev_info_get(phy_port, &dev_info);

rt_assert(std::string(dev_info.driver_name) == "net_mlx4" or
std::string(dev_info.driver_name) == "net_mlx5",
"eRPC supports only mlx4 or mlx5 devices with DPDK");
if (std::string(dev_info.driver_name) == "net_mlx4") {
// MLX4 NICs report a reta size of zero, but they use 128 internally
rt_assert(dev_info.reta_size == 0,
"Unexpected RETA size for MLX4 NIC (expected zero)");
resolve.reta_size = 128;
} else {
resolve.reta_size = dev_info.reta_size;
rt_assert(resolve.reta_size >= kMaxQueuesPerPort,
"Too few entries in NIC RSS indirection table");
}

// Resolve bandwidth
struct rte_eth_link link;
rte_eth_link_get(static_cast<uint8_t>(phy_port), &link);
Expand All @@ -244,10 +244,12 @@ void DpdkTransport::resolve_phy_port() {
resolve.bandwidth = 10.0 * (1000 * 1000 * 1000) / 8.0;
}

ERPC_INFO("Resolved port %u: MAC %s, IPv4 %s, bandwidth %.1f Gbps\n",
phy_port, mac_to_string(resolve.mac_addr).c_str(),
ipv4_to_string(htonl(resolve.ipv4_addr)).c_str(),
resolve.bandwidth * 8.0 / (1000 * 1000 * 1000));
ERPC_INFO(
"Resolved port %u: MAC %s, IPv4 %s, RETA size %zu entries, bandwidth "
"%.1f Gbps\n",
phy_port, mac_to_string(resolve.mac_addr).c_str(),
ipv4_to_string(htonl(resolve.ipv4_addr)).c_str(), resolve.reta_size,
resolve.bandwidth * 8.0 / (1000 * 1000 * 1000));
}

void DpdkTransport::fill_local_routing_info(RoutingInfo *routing_info) const {
Expand All @@ -256,17 +258,71 @@ void DpdkTransport::fill_local_routing_info(RoutingInfo *routing_info) const {
memcpy(ri->mac, resolve.mac_addr, 6);
ri->ipv4_addr = resolve.ipv4_addr;
ri->udp_port = rx_flow_udp_port;
ri->rxq_id = qp_id;
ri->reta_size = resolve.reta_size;
}

/**
* @brief Return a source UDP port for which the RSS target queue at the remote
* receiver for the ntuple {src_ip, dst_ip, src_port, dst_port} will be
* remote_queue_id.
*
* All ntuple arguments and return value is in host-byte order
*
* @param remote_queue_id The remote NIC RX queue to target
* @param remote_reta_size The number of entries in the remote NIC's RSS
* indirection table
* @param src_ip This NIC's IPv4 address
* @param dst_ip The remote NIC's IPv4 address
* @param dst_port The UDP port the remote endpoint is listening on
* @return The source UDP port this endpoint should use for targeting
*/
static uint16_t get_udp_src_port_for_target_queue(size_t remote_queue_id,
size_t remote_reta_size,
uint32_t src_ip,
uint32_t dst_ip,
uint16_t dst_port) {
uint16_t src_port = kBaseEthUDPPort;
for (; src_port < UINT16_MAX; src_port++) {
union rte_thash_tuple tuple;
tuple.v4.src_addr = src_ip;
tuple.v4.dst_addr = dst_ip;
tuple.v4.sport = src_port;
tuple.v4.dport = dst_port;
uint32_t rss_l3l4 = rte_softrss(reinterpret_cast<uint32_t *>(&tuple),
RTE_THASH_V4_L4_LEN, default_rss_key);

size_t target_queue =
(rss_l3l4 % remote_reta_size) % DpdkTransport::kMaxQueuesPerPort;
if (target_queue == remote_queue_id) break;
}

if (src_port == UINT16_MAX) {
ERPC_ERROR(
"Failed to find src port that targets remote queue %zu. "
"Remote RETA size = %zu.\n",
remote_queue_id, remote_reta_size);
rt_assert(false);
}

return src_port;
}

// Generate most fields of the L2--L4 headers now to avoid recomputation.
bool DpdkTransport::resolve_remote_routing_info(
RoutingInfo *routing_info) const {
auto *ri = reinterpret_cast<eth_routing_info_t *>(routing_info);

// Save/use info from routing_info before we overwrite it
uint8_t remote_mac[6];
memcpy(remote_mac, ri->mac, 6);
uint32_t remote_ipv4_addr = ri->ipv4_addr;
uint16_t remote_udp_port = ri->udp_port;
const uint32_t remote_ipv4_addr = ri->ipv4_addr;
const uint16_t remote_udp_port = ri->udp_port;
const uint16_t udp_src_port = get_udp_src_port_for_target_queue(
ri->rxq_id, ri->reta_size, resolve.ipv4_addr, remote_ipv4_addr,
remote_udp_port);

// Overwrite routing_info by constructing the packet header in place
static_assert(kMaxRoutingInfoSize >= kInetHdrsTotSize, "");

auto *eth_hdr = reinterpret_cast<eth_hdr_t *>(ri);
Expand All @@ -276,7 +332,8 @@ bool DpdkTransport::resolve_remote_routing_info(
gen_ipv4_header(ipv4_hdr, resolve.ipv4_addr, remote_ipv4_addr, 0);

auto *udp_hdr = reinterpret_cast<udp_hdr_t *>(&ipv4_hdr[1]);
gen_udp_header(udp_hdr, rx_flow_udp_port, remote_udp_port, 0);
gen_udp_header(udp_hdr, udp_src_port, remote_udp_port, 0);

return true;
}

Expand Down
16 changes: 7 additions & 9 deletions src/transport_impl/dpdk/dpdk_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,11 @@ class DpdkTransport : public Transport {
// Transport-specific constants
static constexpr TransportType kTransportType = TransportType::kDPDK;
static constexpr size_t kMTU = 1024;
static constexpr size_t kMaxQueuesPerPort = kIsAzure ? 1 : 16;
static constexpr size_t kMaxQueuesPerPort = 16;

static constexpr size_t kNumTxRingDesc = 128;
static constexpr size_t kPostlist = 32;

// If true, install flow steering rules into the NIC
static constexpr bool kInstallFlowRules = kIsAzure ? false : true;

// The PMD may inline internally, but this class doesn't do it
static constexpr size_t kMaxInline = 0;

Expand Down Expand Up @@ -204,18 +201,19 @@ class DpdkTransport : public Transport {

size_t rx_ring_head = 0, rx_ring_tail = 0;

uint16_t rx_flow_udp_port = 0;
size_t qp_id = SIZE_MAX; ///< The RX/TX queue pair for this Transport
uint16_t rx_flow_udp_port = 0; ///< The UDP port this transport listens on
size_t qp_id = SIZE_MAX; ///< The RX/TX queue pair for this Transport

// We don't use DPDK's lcore threads, so a shared mempool with per-lcore
// cache won't work. Instead, we use per-thread pools with zero cached mbufs.
rte_mempool *mempool;

/// Info resolved from \p phy_port, must be filled by constructor.
struct {
uint32_t ipv4_addr; ///< The port's IPv4 address in host-byte order
uint8_t mac_addr[6]; ///< The port's MAC address
size_t bandwidth = 0; ///< Link bandwidth in bytes per second
uint32_t ipv4_addr; // The port's IPv4 address in host-byte order
uint8_t mac_addr[6]; // The port's MAC address
size_t bandwidth; // Link bandwidth in bytes per second
size_t reta_size; // Number of entries in NIC RX indirection table
} resolve;
};

Expand Down
5 changes: 3 additions & 2 deletions src/transport_impl/dpdk/dpdk_transport_datapath.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#ifdef ERPC_DPDK

#include <rte_thash.h>
#include "dpdk_transport.h"
#include "util/huge_alloc.h"

Expand All @@ -19,8 +20,8 @@ static void format_pkthdr(pkthdr_t *pkthdr,
memset(&eth_hdr->dst_mac, 0, sizeof(eth_hdr->dst_mac));
}

// On most bare-metal clusters, a zero IP checksum works fine.
// But on Azure VMs we need a valid checksum.
// On most bare-metal clusters, a zero IP checksum works fine. But on Azure
// VMs we need a valid checksum.
ipv4_hdr_t *ipv4_hdr = pkthdr->get_ipv4_hdr();
ipv4_hdr->tot_len = htons(pkt_size - sizeof(eth_hdr_t));
ipv4_hdr->check = get_ipv4_checksum(ipv4_hdr);
Expand Down
46 changes: 27 additions & 19 deletions src/transport_impl/eth_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,6 @@ static std::string ipv4_to_string(uint32_t ipv4_addr) {
return str;
}

/// eRPC session endpoint routing info for Ethernet-based transports. The MAC
/// address is in the byte order retrived from the driver. The IPv4 address and
/// UDP port are in host-byte order.
struct eth_routing_info_t {
uint8_t mac[6];
uint32_t ipv4_addr;
uint16_t udp_port;

std::string to_string() {
std::ostringstream ret;
ret << "[MAC " << mac_to_string(mac) << ", IP " << ipv4_to_string(ipv4_addr)
<< ", UDP port " << std::to_string(udp_port) << "]";

return std::string(ret.str());
}
// This must be smaller than Transport::kMaxRoutingInfoSize, but a static
// assert here causes a circular dependency.
};

struct eth_hdr_t {
uint8_t dst_mac[6];
uint8_t src_mac[6];
Expand Down Expand Up @@ -136,6 +117,33 @@ static constexpr size_t kInetHdrsTotSize =
sizeof(eth_hdr_t) + sizeof(ipv4_hdr_t) + sizeof(udp_hdr_t);
static_assert(kInetHdrsTotSize == 42, "");

/// eRPC session endpoint routing info for Ethernet-based transports. The MAC
/// address is in the byte order retrived from the driver. The IPv4 address and
/// UDP port are in host-byte order.
struct eth_routing_info_t {
uint8_t mac[6];
uint32_t ipv4_addr; // The IPv4 address for this endpoint
uint16_t udp_port; // The UDP port this endpoint listens on
uint16_t rxq_id = UINT16_MAX; // The NIC RX queue ID this endpoint listens on

// Number of entries in this endpoint's NIC RSS indirection table
uint16_t reta_size = UINT16_MAX;

std::string to_string() const {
std::ostringstream ret;
ret << "[MAC " << mac_to_string(mac) << ", IP " << ipv4_to_string(ipv4_addr)
<< ", UDP port " << std::to_string(udp_port) << ", RQ queue ID "
<< (rxq_id == UINT16_MAX ? " N/A " : std::to_string(rxq_id))
<< ", RETA size "
<< ((reta_size == UINT16_MAX) ? " N/A" : std::to_string(reta_size))
<< "]";

return std::string(ret.str());
}
// This must be smaller than Transport::kMaxRoutingInfoSize, but a static
// assert here causes a circular dependency.
};

static std::string frame_header_to_string(uint8_t* buf) {
auto* eth_hdr = reinterpret_cast<eth_hdr_t*>(buf);
auto* ipv4_hdr = reinterpret_cast<ipv4_hdr_t*>(&eth_hdr[1]);
Expand Down

0 comments on commit f6104ea

Please sign in to comment.