lwip: Optimize UDP RX path and introduce a limit

Store adddress in the memory immediately preceding the packet data.
It is guaranteed to have enough space (IP header is 20 bytes).

Introduce a limit on the length of the RX chain,
MG_LWIP_MAX_UDP_RX_CHAIN_LEN. Default is 20.
This commit is contained in:
Deomid Ryabkov 2021-09-01 22:03:02 +01:00
parent 38f694d071
commit 311fdb8bac
4 changed files with 94 additions and 72 deletions

6
.clang-format Normal file
View File

@ -0,0 +1,6 @@
BasedOnStyle: Google
AllowShortFunctionsOnASingleLine: false
SpaceAfterCStyleCast: true
PointerBindsToType: false
DerivePointerBinding: false
IncludeBlocks: Preserve

View File

@ -14897,11 +14897,12 @@ struct mg_lwip_conn_state {
struct pbuf *rx_chain; /* Chain of incoming data segments. */
size_t rx_offset; /* Offset within the first pbuf (if partially consumed) */
/* Last SSL write size, for retries. */
int last_ssl_write_size;
unsigned int last_ssl_write_size : 16;
unsigned int rx_chain_len : 8;
/* Whether MG_SIG_RECV is already pending for this connection */
int recv_pending;
unsigned int recv_pending : 1;
/* Whether the connection is about to close, just `rx_chain` needs to drain */
int draining_rx_chain;
unsigned int draining_rx_chain : 1;
};
enum mg_sig_type {
@ -14941,6 +14942,10 @@ void mg_lwip_mgr_schedule_poll(struct mg_mgr *mgr);
/* Amalgamated: #include "common/cs_dbg.h" */
#ifndef MG_LWIP_MAX_UDP_RX_CHAIN_LEN
#define MG_LWIP_MAX_UDP_RX_CHAIN_LEN 20
#endif
/*
* Newest versions of LWIP have ip_2_ip4, older have ipX_2_ip,
* even older have nothing.
@ -14961,7 +14966,7 @@ void mg_lwip_mgr_schedule_poll(struct mg_mgr *mgr);
#define TCP_NEW tcp_new_ip6
#define TCP_BIND tcp_bind_ip6
#define UDP_BIND udp_bind_ip6
#define IPADDR_NTOA(x) ip6addr_ntoa((const ip6_addr_t *)(x))
#define IPADDR_NTOA(x) ip6addr_ntoa((const ip6_addr_t *) (x))
#define SET_ADDR(dst, src) \
memcpy((dst)->sin6.sin6_addr.s6_addr, (src)->ip6.addr, \
sizeof((dst)->sin6.sin6_addr.s6_addr))
@ -15174,7 +15179,7 @@ void mg_lwip_if_connect_tcp(struct mg_connection *nc,
mg_lwip_netif_run_on_tcpip(mg_lwip_if_connect_tcp_tcpip, &ctx);
}
struct udp_info {
struct mg_lwip_udp_info {
union socket_address remote_addr;
uint32_t local_ip4;
};
@ -15192,15 +15197,15 @@ static void mg_lwip_udp_recv_cb(void *arg, struct udp_pcb *pcb, struct pbuf *p,
#endif
{
struct mg_connection *nc = (struct mg_connection *) arg;
/*
* Scribble address on the margins. There's enough room there,
* between pbuf header and payload - at a minimum, IP and UDP headers.
*/
struct mg_lwip_udp_info *ui =
(struct mg_lwip_udp_info *) ((((uintptr_t) p->payload) &
(~((uintptr_t) 3))) -
sizeof(*ui));
DBG(("%p %s:%u %p %u %u", nc, IPADDR_NTOA(addr), port, p, p->ref, p->len));
/* Put address in a separate pbuf and tack it onto the packet. */
struct pbuf *sap =
pbuf_alloc(PBUF_RAW, sizeof(struct udp_info), PBUF_RAM);
if (sap == NULL) {
pbuf_free(p);
return;
}
struct udp_info *ui = (struct udp_info *) sap->payload;
struct netif *nif = ip_current_netif();
#if ((LWIP_VERSION_MAJOR << 8) | LWIP_VERSION_MINOR) >= 0x0105
ui->remote_addr.sin.sin_addr.s_addr = ip_2_ip4(addr)->addr;
@ -15210,12 +15215,8 @@ static void mg_lwip_udp_recv_cb(void *arg, struct udp_pcb *pcb, struct pbuf *p,
ui->local_ip4 = nif->ip_addr.addr;
#endif
ui->remote_addr.sin.sin_port = htons(port);
/* Logic in the recv handler requires that there be exactly one data pbuf. */
p = pbuf_coalesce(p, PBUF_RAW);
pbuf_chain(sap, p);
mgos_lock();
mg_lwip_recv_common(nc, sap);
mg_lwip_recv_common(nc, p);
mgos_unlock();
(void) pcb;
}
@ -15224,8 +15225,14 @@ static void mg_lwip_recv_common(struct mg_connection *nc, struct pbuf *p) {
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
if (cs->rx_chain == NULL) {
cs->rx_chain = p;
} else {
cs->rx_chain_len = 1;
} else if (!(nc->flags & MG_F_UDP) ||
cs->rx_chain_len < MG_LWIP_MAX_UDP_RX_CHAIN_LEN) {
pbuf_chain(cs->rx_chain, p);
cs->rx_chain_len++;
} else {
/* Drop it on the floor. */
pbuf_free(p);
}
if (!cs->recv_pending) {
cs->recv_pending = 1;
@ -15235,32 +15242,32 @@ static void mg_lwip_recv_common(struct mg_connection *nc, struct pbuf *p) {
static int mg_lwip_if_udp_recv(struct mg_connection *nc, void *buf, size_t len,
union socket_address *sa, size_t *sa_len) {
/*
* For UDP, RX chain consists of interleaved address and packet bufs:
* Address pbuf followed by exactly one data pbuf (recv_cb took care of that).
*/
int res = 0;
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
if (nc->sock == INVALID_SOCKET) return -1;
mgos_lock();
if (cs->rx_chain != NULL) {
struct pbuf *ap = cs->rx_chain;
struct pbuf *dp = ap->next;
cs->rx_chain = pbuf_dechain(dp);
res = MIN(dp->len, len);
pbuf_copy_partial(dp, buf, res, 0);
pbuf_free(dp);
const struct udp_info *ui = (struct udp_info *) ap->payload;
struct pbuf *p = cs->rx_chain;
cs->rx_chain = pbuf_dechain(p);
cs->rx_chain_len--;
mgos_unlock();
struct mg_lwip_udp_info *ui =
(struct mg_lwip_udp_info *) ((((uintptr_t) p->payload) &
(~((uintptr_t) 3))) -
sizeof(*ui));
*sa = ui->remote_addr;
/* Store local interface address for incoming packets.
* Only for listener because outgoing connections may use priv_2 for DNS. */
if (nc->listener == NULL) {
nc->priv_2 = (void *) (uintptr_t) ui->local_ip4;
}
pbuf_copy_partial(ap, sa, MIN(*sa_len, ap->len), 0);
pbuf_free(ap);
res = MIN(p->len, len);
pbuf_copy_partial(p, buf, res, 0);
pbuf_free(p);
} else {
mgos_unlock();
}
mgos_unlock();
(void) sa_len;
return res;
}
@ -15439,8 +15446,8 @@ static void mg_lwip_tcp_write_tcpip(void *arg) {
cs->err = tcp_write(tpcb, ctx->data, len, TCP_WRITE_FLAG_COPY);
unsent = (tpcb->unsent != NULL ? tpcb->unsent->len : 0);
unacked = (tpcb->unacked != NULL ? tpcb->unacked->len : 0);
DBG(("%p tcp_write %u = %d, %u %u",
tpcb, len, (int) cs->err, unsent, unacked));
DBG(("%p tcp_write %u = %d, %u %u", tpcb, len, (int) cs->err, unsent,
unacked));
if (cs->err != ERR_OK) {
/*
* We ignore ERR_MEM because memory will be freed up when the data is sent
@ -15502,7 +15509,7 @@ static int mg_lwip_if_udp_send(struct mg_connection *nc, const void *data,
memcpy(p->payload, data, len);
struct udp_sendto_ctx ctx = {.upcb = upcb, .p = p, .ip = &ip, .port = port};
if (ip_addr_ismulticast(&ip)) {
ctx.mcast_ip4 = (uint32_t) (uintptr_t) nc->priv_2;
ctx.mcast_ip4 = (uint32_t)(uintptr_t) nc->priv_2;
}
mg_lwip_netif_run_on_tcpip(udp_sendto_tcpip, &ctx);
cs->err = ctx.ret;
@ -15560,6 +15567,7 @@ static int mg_lwip_if_tcp_recv(struct mg_connection *nc, void *buf,
cs->rx_offset += copy_len;
if (cs->rx_offset == cs->rx_chain->len) {
cs->rx_chain = pbuf_dechain(cs->rx_chain);
cs->rx_chain_len--;
pbuf_free(seg);
cs->rx_offset = 0;
}

View File

@ -17,6 +17,10 @@
#include "common/cs_dbg.h"
#ifndef MG_LWIP_MAX_UDP_RX_CHAIN_LEN
#define MG_LWIP_MAX_UDP_RX_CHAIN_LEN 20
#endif
/*
* Newest versions of LWIP have ip_2_ip4, older have ipX_2_ip,
* even older have nothing.
@ -37,7 +41,7 @@
#define TCP_NEW tcp_new_ip6
#define TCP_BIND tcp_bind_ip6
#define UDP_BIND udp_bind_ip6
#define IPADDR_NTOA(x) ip6addr_ntoa((const ip6_addr_t *)(x))
#define IPADDR_NTOA(x) ip6addr_ntoa((const ip6_addr_t *) (x))
#define SET_ADDR(dst, src) \
memcpy((dst)->sin6.sin6_addr.s6_addr, (src)->ip6.addr, \
sizeof((dst)->sin6.sin6_addr.s6_addr))
@ -250,7 +254,7 @@ void mg_lwip_if_connect_tcp(struct mg_connection *nc,
mg_lwip_netif_run_on_tcpip(mg_lwip_if_connect_tcp_tcpip, &ctx);
}
struct udp_info {
struct mg_lwip_udp_info {
union socket_address remote_addr;
uint32_t local_ip4;
};
@ -268,15 +272,15 @@ static void mg_lwip_udp_recv_cb(void *arg, struct udp_pcb *pcb, struct pbuf *p,
#endif
{
struct mg_connection *nc = (struct mg_connection *) arg;
/*
* Scribble address on the margins. There's enough room there,
* between pbuf header and payload - at a minimum, IP and UDP headers.
*/
struct mg_lwip_udp_info *ui =
(struct mg_lwip_udp_info *) ((((uintptr_t) p->payload) &
(~((uintptr_t) 3))) -
sizeof(*ui));
DBG(("%p %s:%u %p %u %u", nc, IPADDR_NTOA(addr), port, p, p->ref, p->len));
/* Put address in a separate pbuf and tack it onto the packet. */
struct pbuf *sap =
pbuf_alloc(PBUF_RAW, sizeof(struct udp_info), PBUF_RAM);
if (sap == NULL) {
pbuf_free(p);
return;
}
struct udp_info *ui = (struct udp_info *) sap->payload;
struct netif *nif = ip_current_netif();
#if ((LWIP_VERSION_MAJOR << 8) | LWIP_VERSION_MINOR) >= 0x0105
ui->remote_addr.sin.sin_addr.s_addr = ip_2_ip4(addr)->addr;
@ -286,12 +290,8 @@ static void mg_lwip_udp_recv_cb(void *arg, struct udp_pcb *pcb, struct pbuf *p,
ui->local_ip4 = nif->ip_addr.addr;
#endif
ui->remote_addr.sin.sin_port = htons(port);
/* Logic in the recv handler requires that there be exactly one data pbuf. */
p = pbuf_coalesce(p, PBUF_RAW);
pbuf_chain(sap, p);
mgos_lock();
mg_lwip_recv_common(nc, sap);
mg_lwip_recv_common(nc, p);
mgos_unlock();
(void) pcb;
}
@ -300,8 +300,14 @@ static void mg_lwip_recv_common(struct mg_connection *nc, struct pbuf *p) {
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
if (cs->rx_chain == NULL) {
cs->rx_chain = p;
} else {
cs->rx_chain_len = 1;
} else if (!(nc->flags & MG_F_UDP) ||
cs->rx_chain_len < MG_LWIP_MAX_UDP_RX_CHAIN_LEN) {
pbuf_chain(cs->rx_chain, p);
cs->rx_chain_len++;
} else {
/* Drop it on the floor. */
pbuf_free(p);
}
if (!cs->recv_pending) {
cs->recv_pending = 1;
@ -311,32 +317,32 @@ static void mg_lwip_recv_common(struct mg_connection *nc, struct pbuf *p) {
static int mg_lwip_if_udp_recv(struct mg_connection *nc, void *buf, size_t len,
union socket_address *sa, size_t *sa_len) {
/*
* For UDP, RX chain consists of interleaved address and packet bufs:
* Address pbuf followed by exactly one data pbuf (recv_cb took care of that).
*/
int res = 0;
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
if (nc->sock == INVALID_SOCKET) return -1;
mgos_lock();
if (cs->rx_chain != NULL) {
struct pbuf *ap = cs->rx_chain;
struct pbuf *dp = ap->next;
cs->rx_chain = pbuf_dechain(dp);
res = MIN(dp->len, len);
pbuf_copy_partial(dp, buf, res, 0);
pbuf_free(dp);
const struct udp_info *ui = (struct udp_info *) ap->payload;
struct pbuf *p = cs->rx_chain;
cs->rx_chain = pbuf_dechain(p);
cs->rx_chain_len--;
mgos_unlock();
struct mg_lwip_udp_info *ui =
(struct mg_lwip_udp_info *) ((((uintptr_t) p->payload) &
(~((uintptr_t) 3))) -
sizeof(*ui));
*sa = ui->remote_addr;
/* Store local interface address for incoming packets.
* Only for listener because outgoing connections may use priv_2 for DNS. */
if (nc->listener == NULL) {
nc->priv_2 = (void *) (uintptr_t) ui->local_ip4;
}
pbuf_copy_partial(ap, sa, MIN(*sa_len, ap->len), 0);
pbuf_free(ap);
res = MIN(p->len, len);
pbuf_copy_partial(p, buf, res, 0);
pbuf_free(p);
} else {
mgos_unlock();
}
mgos_unlock();
(void) sa_len;
return res;
}
@ -515,8 +521,8 @@ static void mg_lwip_tcp_write_tcpip(void *arg) {
cs->err = tcp_write(tpcb, ctx->data, len, TCP_WRITE_FLAG_COPY);
unsent = (tpcb->unsent != NULL ? tpcb->unsent->len : 0);
unacked = (tpcb->unacked != NULL ? tpcb->unacked->len : 0);
DBG(("%p tcp_write %u = %d, %u %u",
tpcb, len, (int) cs->err, unsent, unacked));
DBG(("%p tcp_write %u = %d, %u %u", tpcb, len, (int) cs->err, unsent,
unacked));
if (cs->err != ERR_OK) {
/*
* We ignore ERR_MEM because memory will be freed up when the data is sent
@ -578,7 +584,7 @@ static int mg_lwip_if_udp_send(struct mg_connection *nc, const void *data,
memcpy(p->payload, data, len);
struct udp_sendto_ctx ctx = {.upcb = upcb, .p = p, .ip = &ip, .port = port};
if (ip_addr_ismulticast(&ip)) {
ctx.mcast_ip4 = (uint32_t) (uintptr_t) nc->priv_2;
ctx.mcast_ip4 = (uint32_t)(uintptr_t) nc->priv_2;
}
mg_lwip_netif_run_on_tcpip(udp_sendto_tcpip, &ctx);
cs->err = ctx.ret;
@ -636,6 +642,7 @@ static int mg_lwip_if_tcp_recv(struct mg_connection *nc, void *buf,
cs->rx_offset += copy_len;
if (cs->rx_offset == cs->rx_chain->len) {
cs->rx_chain = pbuf_dechain(cs->rx_chain);
cs->rx_chain_len--;
pbuf_free(seg);
cs->rx_offset = 0;
}

View File

@ -24,11 +24,12 @@ struct mg_lwip_conn_state {
struct pbuf *rx_chain; /* Chain of incoming data segments. */
size_t rx_offset; /* Offset within the first pbuf (if partially consumed) */
/* Last SSL write size, for retries. */
int last_ssl_write_size;
unsigned int last_ssl_write_size : 16;
unsigned int rx_chain_len : 8;
/* Whether MG_SIG_RECV is already pending for this connection */
int recv_pending;
unsigned int recv_pending : 1;
/* Whether the connection is about to close, just `rx_chain` needs to drain */
int draining_rx_chain;
unsigned int draining_rx_chain : 1;
};
enum mg_sig_type {