From 515e438d4f28e2a7a6d9d37665ba4c02e6cd3574 Mon Sep 17 00:00:00 2001 From: cpq Date: Sun, 12 Feb 2023 20:30:18 +0000 Subject: [PATCH] Refactor queue --- .github/workflows/test.yml | 2 +- Makefile | 8 +- examples/multi-threaded/Makefile | 2 +- examples/multi-threaded/main.c | 19 ++-- mongoose.c | 166 +++++++++++++++++++------------ mongoose.h | 56 +++++------ src/arch_newlib.h | 1 - src/arch_rp2040.h | 1 - src/config.h | 8 +- src/printf.c | 2 +- src/queue.c | 92 ++++++++++++----- src/queue.h | 42 ++++---- src/sha1.c | 2 +- src/util.h | 6 ++ test/mip_tap_test.c | 1 - test/mip_test.c | 1 - test/unit_test.c | 117 +++++++--------------- 17 files changed, 279 insertions(+), 247 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6818ee63..b9ac4190 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -71,7 +71,7 @@ jobs: - if: ${{ env.GO == 1 }} run: make test upload-coverage TFLAGS=-DNO_SNTP_CHECK SSL=OPENSSL ASAN_OPTIONS= OPENSSL=`echo /usr/local/Cellar/openssl*/*` - if: ${{ env.GO == 1 }} - run: make test SSL=MBEDTLS TFLAGS="-DNO_SNTP_CHECK -DMG_ENABLE_ATOMIC=1" ASAN_OPTIONS= MBEDTLS=`echo /usr/local/Cellar/mbedtls*/*` + run: make test SSL=MBEDTLS TFLAGS=-DNO_SNTP_CHECK ASAN_OPTIONS= MBEDTLS=`echo /usr/local/Cellar/mbedtls*/*` - if: ${{ env.GO == 1 }} run: make mg_prefix windows: diff --git a/Makefile b/Makefile index 84466ac4..dd954698 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ SRCS = mongoose.c test/unit_test.c test/packed_fs.c HDRS = $(wildcard src/*.h) $(wildcard src/tcpip/*.h) -DEFS ?= -DMG_MAX_HTTP_HEADERS=7 -DMG_ENABLE_LINES -DMG_ENABLE_PACKED_FS=1 -DMG_ENABLE_SSI=1 +DEFS ?= -DMG_MAX_HTTP_HEADERS=7 -DMG_ENABLE_LINES -DMG_ENABLE_PACKED_FS=1 -DMG_ENABLE_SSI=1 -DMG_ENABLE_ASSERT=1 WARN ?= -pedantic -W -Wall -Werror -Wshadow -Wdouble-promotion -fno-common -Wconversion -Wundef OPTS ?= -O3 -g3 INCS ?= -Isrc -I. @@ -15,10 +15,10 @@ ASAN_OPTIONS ?= detect_leaks=1 EXAMPLES := $(dir $(wildcard examples/*/Makefile)) PREFIX ?= /usr/local VERSION ?= $(shell cut -d'"' -f2 src/version.h) -COMMON_CFLAGS ?= $(C_WARN) $(WARN) $(INCS) $(DEFS) -DMG_ENABLE_IPV6=$(IPV6) $(TFLAGS) -CFLAGS ?= $(OPTS) $(ASAN) $(COMMON_CFLAGS) -pthread +COMMON_CFLAGS ?= $(C_WARN) $(WARN) $(INCS) $(DEFS) -DMG_ENABLE_IPV6=$(IPV6) $(TFLAGS) -pthread +CFLAGS ?= $(OPTS) $(ASAN) $(COMMON_CFLAGS) VALGRIND_CFLAGS ?= $(OPTS) $(COMMON_CFLAGS) -VALGRIND_RUN ?= valgrind --tool=memcheck --gen-suppressions=all --leak-check=full --show-leak-kinds=all --leak-resolution=high --track-origins=yes --error-exitcode=1 --exit-on-first-error=yes +VALGRIND_RUN ?= valgrind --tool=memcheck --gen-suppressions=all --leak-check=full --show-leak-kinds=all --leak-resolution=high --track-origins=yes --error-exitcode=1 --exit-on-first-error=yes --fair-sched=yes .PHONY: examples test valgrind mip_test ifeq "$(findstring ++,$(CC))" "" diff --git a/examples/multi-threaded/Makefile b/examples/multi-threaded/Makefile index 0abba701..9e09b59d 100644 --- a/examples/multi-threaded/Makefile +++ b/examples/multi-threaded/Makefile @@ -2,7 +2,7 @@ SOURCES = main.c mongoose.c # Source code files CFLAGS = -W -Wall -Wextra -g -I. # Build options # Mongoose build options. See https://mongoose.ws/documentation/#build-options -CFLAGS_MONGOOSE += -DMG_ENABLE_ATOMIC=1 +CFLAGS_MONGOOSE += ifeq ($(OS),Windows_NT) # Windows settings. Assume MinGW compiler. To use VC: make CC=cl CFLAGS=/MD diff --git a/examples/multi-threaded/main.c b/examples/multi-threaded/main.c index e584cbf0..f1d0d0a8 100644 --- a/examples/multi-threaded/main.c +++ b/examples/multi-threaded/main.c @@ -15,7 +15,7 @@ struct thread_data { struct mg_str body; // Copy of message body }; -static void start_thread(void (*f)(void *), void *p) { +static void start_thread(void *(*f)(void *), void *p) { #ifdef _WIN32 #define usleep(x) Sleep((x) / 1000) _beginthread((void(__cdecl *)(void *)) f, 0, p); @@ -25,18 +25,17 @@ static void start_thread(void (*f)(void *), void *p) { pthread_attr_t attr; (void) pthread_attr_init(&attr); (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - pthread_create(&thread_id, &attr, (void *(*) (void *) ) f, p); + pthread_create(&thread_id, &attr, f, p); pthread_attr_destroy(&attr); #endif } -static void worker_thread(void *param) { +static void *worker_thread(void *param) { struct thread_data *d = (struct thread_data *) param; char buf[100]; // On-stack buffer for the message queue - d->queue.buf = buf; // Caller passed us an empty queue with NULL - d->queue.len = sizeof(buf); // buffer. Initialise it now - usleep(1 * 1000 * 1000); // Simulate long execution time + mg_queue_init(&d->queue, buf, sizeof(buf)); // Init queue + usleep(1 * 1000 * 1000); // Simulate long execution time // Send a response to the connection if (d->body.len == 0) { @@ -48,9 +47,10 @@ static void worker_thread(void *param) { } // Wait until connection reads our message, then it is safe to quit - while (mg_queue_next(&d->queue, NULL) != MG_QUEUE_EMPTY) usleep(1000); + while (d->queue.tail != d->queue.head) usleep(1000); MG_INFO(("done, cleaning up...")); free(d); + return NULL; } // HTTP request callback @@ -68,10 +68,11 @@ static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) { size_t len; char *buf; // Check if we have a message from the worker - if (d != NULL && (len = mg_queue_next(&d->queue, &buf)) != MG_QUEUE_EMPTY) { + if (d != NULL && (len = mg_queue_next(&d->queue, &buf)) > 0) { // Got message from worker. Send a response and cleanup mg_http_reply(c, 200, "", "%.*s\n", (int) len, buf); - mg_queue_del(&d->queue); // Delete message: signal worker that we're done + mg_queue_del(&d->queue, len); // Delete message + *(void **) c->data = NULL; // Forget about thread data } } (void) fn_data; diff --git a/mongoose.c b/mongoose.c index be28e9f8..4df20e81 100644 --- a/mongoose.c +++ b/mongoose.c @@ -3497,7 +3497,7 @@ void mg_mgr_init(struct mg_mgr *mgr) { size_t mg_queue_vprintf(struct mg_queue *q, const char *fmt, va_list *ap) { size_t len = mg_snprintf(NULL, 0, fmt, ap); char *buf; - if (mg_queue_space(q, &buf) < len + 1) { + if (len == 0 || mg_queue_book(q, &buf, len + 1) < len + 1) { len = 0; // Nah. Not enough space } else { len = mg_vsnprintf((char *)buf, len + 1, fmt, ap); @@ -3613,43 +3613,85 @@ size_t mg_print_mac(void (*out)(char, void *), void *arg, va_list *ap) { #endif -// Every message in the queue is prepended by the message length (ML) -// ML is sizeof(size_t) in size -// Tail points to the message data -// -// |------| ML | message1 | ML | message2 |--- free space ---| -// ^ ^ ^ ^ -// buf tail head len -size_t mg_queue_space(struct mg_queue *q, char **buf) { - size_t ofs; - if (q->head > 0 && q->tail >= q->head) { // All messages read? - q->head = 0; // Yes. Reset head first - q->tail = 0; // Now reset the tail +#if defined(__GNUC__) || defined(__clang__) +#define MG_MEMORY_BARRIER() __sync_synchronize() +#elif defined(_MSC_VER) && _MSC_VER >= 1700 +#define MG_MEMORY_BARRIER() MemoryBarrier() +#elif !defined(MG_MEMORY_BARRIER) +#define MG_MEMORY_BARRIER() +#endif + +// Every message in a queue is prepended by a 32-bit message length (ML). +// If ML is 0, then it is the end, and reader must wrap to the beginning. +// +// Queue when q->tail <= q->head: +// |----- free -----| ML | message1 | ML | message2 | ----- free ------| +// ^ ^ ^ ^ +// buf tail head len +// +// Queue when q->tail > q->head: +// | ML | message2 |----- free ------| ML | message1 | 0 |---- free ----| +// ^ ^ ^ ^ +// buf head tail len + +void mg_queue_init(struct mg_queue *q, char *buf, size_t size) { + q->size = size; + q->buf = buf; + q->head = q->tail = 0; +} + +static size_t mg_queue_read_len(struct mg_queue *q) { + uint32_t n = 0; + MG_MEMORY_BARRIER(); + memcpy(&n, q->buf + q->tail, sizeof(n)); + assert(q->tail + n + sizeof(n) <= q->size); + return n; +} + +static void mg_queue_write_len(struct mg_queue *q, size_t len) { + uint32_t n = (uint32_t) len; + memcpy(q->buf + q->head, &n, sizeof(n)); + MG_MEMORY_BARRIER(); +} + +size_t mg_queue_book(struct mg_queue *q, char **buf, size_t len) { + size_t space = 0, hs = sizeof(uint32_t) * 2; // *2 is for the 0 marker + if (q->head >= q->tail && q->head + len + hs <= q->size) { + space = q->size - q->head - hs; // There is enough space + } else if (q->head >= q->tail && q->tail > hs) { + mg_queue_write_len(q, 0); // Not enough space ahead + q->head = 0; // Wrap head to the beginning } - ofs = q->head + sizeof(size_t); - if (buf != NULL) *buf = q->buf + ofs; - return ofs > q->len ? 0 : q->len - ofs; + if (q->head + hs + len < q->tail) space = q->tail - q->head - hs; + if (buf != NULL) *buf = q->buf + q->head + sizeof(uint32_t); + return space; } size_t mg_queue_next(struct mg_queue *q, char **buf) { - size_t len = MG_QUEUE_EMPTY; - if (q->tail < q->head) memcpy(&len, &q->buf[q->tail], sizeof(len)); - if (buf != NULL) *buf = &q->buf[q->tail + sizeof(len)]; + size_t len = 0; + if (q->tail != q->head) { + len = mg_queue_read_len(q); + if (len == 0) { // Zero (head wrapped) ? + q->tail = 0; // Reset tail to the start + if (q->head > q->tail) len = mg_queue_read_len(q); // Read again + } + } + if (buf != NULL) *buf = q->buf + q->tail + sizeof(uint32_t); + assert(q->tail + len <= q->size); return len; } void mg_queue_add(struct mg_queue *q, size_t len) { - size_t head = q->head + len + (size_t) sizeof(head); // New head - if (head <= q->len) { // Have space ? - memcpy(q->buf + q->head, &len, sizeof(len)); // Yes. Store ML - q->head = head; // Advance head - } + assert(len > 0); + mg_queue_write_len(q, len); + assert(q->head + sizeof(uint32_t) * 2 + len <= q->size); + q->head += len + sizeof(uint32_t); } -void mg_queue_del(struct mg_queue *q) { - size_t len = mg_queue_next(q, NULL), tail = q->tail + len + sizeof(size_t); - if (len != MG_QUEUE_EMPTY) q->tail = tail; +void mg_queue_del(struct mg_queue *q, size_t len) { + q->tail += len + sizeof(uint32_t); + assert(q->tail + sizeof(uint32_t) <= q->size); } #ifdef MG_ENABLE_LINES @@ -3807,7 +3849,7 @@ static uint32_t blk0(union char64long16 *block, int i) { w = rol(w, 30); static void mg_sha1_transform(uint32_t state[5], - const unsigned char buffer[64]) { + const unsigned char *buffer) { uint32_t a, b, c, d, e; union char64long16 block[1]; @@ -6035,14 +6077,14 @@ volatile uint32_t RESERVED0, EIR, EIMR, RESERVED1, RDAR, TDAR, RESERVED2[3], ECR const uint32_t EIMR_RX_ERR = 0x2400000; // Intr mask RXF+EBERR void ETH_IRQHandler(void); -static bool mg_tcpip_driver_imxrt1020_init(struct mip_if *ifp); +static bool mg_tcpip_driver_imxrt1020_init(struct mg_tcpip_if *ifp); static void wait_phy_complete(void); -static struct mip_if *s_ifp; // MIP interface +static struct mg_tcpip_if *s_ifp; // MIP interface -static size_t mg_tcpip_driver_imxrt1020_tx(const void *, size_t , struct mip_if *); -static bool mg_tcpip_driver_imxrt1020_up(struct mip_if *ifp); +static size_t mg_tcpip_driver_imxrt1020_tx(const void *, size_t , struct mg_tcpip_if *); +static bool mg_tcpip_driver_imxrt1020_up(struct mg_tcpip_if *ifp); -enum { PHY_ADDR = 0x02, PHY_BCR = 0, PHY_BSR = 1 }; // PHY constants +enum { IMXRT1020_PHY_ADDR = 0x02, IMXRT1020_PHY_BCR = 0, IMXRT1020_PHY_BSR = 1 }; // PHY constants void delay(uint32_t); void delay (uint32_t di) { @@ -6061,7 +6103,7 @@ static void wait_phy_complete(void) { ENET->EIR |= BIT(23); // MII interrupt clear } -static uint32_t eth_read_phy(uint8_t addr, uint8_t reg) { +static uint32_t imxrt1020_eth_read_phy(uint8_t addr, uint8_t reg) { ENET->EIR |= BIT(23); // MII interrupt clear uint32_t mask_phy_adr_reg = 0x1f; // 0b00011111: Ensure we write 5 bits (Phy address & register) uint32_t phy_transaction = 0x00; @@ -6077,7 +6119,7 @@ static uint32_t eth_read_phy(uint8_t addr, uint8_t reg) { return (ENET->MMFR & 0x0000ffff); } -static void eth_write_phy(uint8_t addr, uint8_t reg, uint32_t val) { +static void imxrt1020_eth_write_phy(uint8_t addr, uint8_t reg, uint32_t val) { ENET->EIR |= BIT(23); // MII interrupt clear uint8_t mask_phy_adr_reg = 0x1f; // 0b00011111: Ensure we write 5 bits (Phy address & register) uint32_t mask_phy_data = 0x0000ffff; // Ensure we write 16 bits (data) @@ -6116,7 +6158,7 @@ uint8_t tx_data_buffer[(ENET_TXBD_NUM)][((unsigned int)(((ENET_TXBUFF_SIZE)) + ( // Initialise driver imx_rt1020 // static bool mg_tcpip_driver_imxrt1020_init(uint8_t *mac, void *data) { // VO -static bool mg_tcpip_driver_imxrt1020_init(struct mip_if *ifp) { +static bool mg_tcpip_driver_imxrt1020_init(struct mg_tcpip_if *ifp) { struct mg_tcpip_driver_imxrt1020_data *d = (struct mg_tcpip_driver_imxrt1020_data *) ifp->driver_data; s_ifp = ifp; @@ -6131,17 +6173,17 @@ static bool mg_tcpip_driver_imxrt1020_init(struct mip_if *ifp) { // Setup MII/RMII MDC clock divider (<= 2.5MHz). ENET->MSCR = 0x130; // HOLDTIME 2 clk, Preamble enable, MDC MII_Speed Div 0x30 - eth_write_phy(PHY_ADDR, PHY_BCR, 0x8000); // PHY W @0x00 D=0x8000 Soft reset - while (eth_read_phy(PHY_ADDR, PHY_BSR) & BIT(15)) {delay(0x5000);} // Wait finished poll 10ms + imxrt1020_eth_write_phy(IMXRT1020_PHY_ADDR, IMXRT1020_PHY_BCR, 0x8000); // PHY W @0x00 D=0x8000 Soft reset + while (imxrt1020_eth_read_phy(IMXRT1020_PHY_ADDR, IMXRT1020_PHY_BSR) & BIT(15)) {delay(0x5000);} // Wait finished poll 10ms // PHY: Start Link { - eth_write_phy(PHY_ADDR, PHY_BCR, 0x1200); // PHY W @0x00 D=0x1200 Autonego enable + start - eth_write_phy(PHY_ADDR, 0x1f, 0x8180); // PHY W @0x1f D=0x8180 Ref clock 50 MHz at XI input + imxrt1020_eth_write_phy(IMXRT1020_PHY_ADDR, IMXRT1020_PHY_BCR, 0x1200); // PHY W @0x00 D=0x1200 Autonego enable + start + imxrt1020_eth_write_phy(IMXRT1020_PHY_ADDR, 0x1f, 0x8180); // PHY W @0x1f D=0x8180 Ref clock 50 MHz at XI input - uint32_t bcr = eth_read_phy(PHY_ADDR, PHY_BCR); + uint32_t bcr = imxrt1020_eth_read_phy(IMXRT1020_PHY_ADDR, IMXRT1020_PHY_BCR); bcr &= ~BIT(10); // Isolation -> Normal - eth_write_phy(PHY_ADDR, PHY_BCR, bcr); + imxrt1020_eth_write_phy(IMXRT1020_PHY_ADDR, IMXRT1020_PHY_BCR, bcr); } // Disable ENET @@ -6199,23 +6241,23 @@ static bool mg_tcpip_driver_imxrt1020_init(struct mip_if *ifp) { } // Transmit frame -static uint32_t s_txno; +static uint32_t s_rt1020_txno; -static size_t mg_tcpip_driver_imxrt1020_tx(const void *buf, size_t len, struct mip_if *ifp) { +static size_t mg_tcpip_driver_imxrt1020_tx(const void *buf, size_t len, struct mg_tcpip_if *ifp) { if (len > sizeof(tx_data_buffer[ENET_TXBD_NUM])) { // MG_ERROR(("Frame too big, %ld", (long) len)); len = 0; // Frame is too big - } else if ((tx_buffer_descriptor[s_txno].control & BIT(15))) { + } else if ((tx_buffer_descriptor[s_rt1020_txno].control & BIT(15))) { MG_ERROR(("No free descriptors")); // printf("D0 %lx SR %lx\n", (long) s_txdesc[0][0], (long) ETH->DMASR); len = 0; // All descriptors are busy, fail } else { - memcpy(tx_data_buffer[s_txno], buf, len); // Copy data - tx_buffer_descriptor[s_txno].length = (uint16_t) len; // Set data len - tx_buffer_descriptor[s_txno].control |= (uint16_t)(BIT(10)); // TC (transmit CRC) - // tx_buffer_descriptor[s_txno].control &= (uint16_t)(BIT(14) | BIT(12)); // Own doesn't affect HW - tx_buffer_descriptor[s_txno].control |= (uint16_t)(BIT(15) | BIT(11)); // R+L (ready+last) + memcpy(tx_data_buffer[s_rt1020_txno], buf, len); // Copy data + tx_buffer_descriptor[s_rt1020_txno].length = (uint16_t) len; // Set data len + tx_buffer_descriptor[s_rt1020_txno].control |= (uint16_t)(BIT(10)); // TC (transmit CRC) + // tx_buffer_descriptor[s_rt1020_txno].control &= (uint16_t)(BIT(14) | BIT(12)); // Own doesn't affect HW + tx_buffer_descriptor[s_rt1020_txno].control |= (uint16_t)(BIT(15) | BIT(11)); // R+L (ready+last) ENET->TDAR = BIT(24); // Descriptor updated. Hand over to DMA. // INFO // Relevant Descriptor bits: 15(R) Ready @@ -6223,36 +6265,34 @@ static size_t mg_tcpip_driver_imxrt1020_tx(const void *buf, size_t len, struct m // 10(TC) transmis CRC // __DSB(); // ARM errata 838869 Cortex-M4, M4F, M7, M7F: "store immediate overlapping // exception" return might vector to incorrect interrupt. - if (++s_txno >= ENET_TXBD_NUM) s_txno = 0; + if (++s_rt1020_txno >= ENET_TXBD_NUM) s_rt1020_txno = 0; } (void) ifp; return len; } // IRQ (RX) -static uint32_t s_rxno; +static uint32_t s_rt1020_rxno; void ENET_IRQHandler(void) { ENET->EIMR = 0; // Mask interrupts. uint32_t eir = ENET->EIR; // Read EIR ENET->EIR = 0xffffffff; // Clear interrupts - qp_mark(QP_IRQTRIGGERED, 0); - if (eir & EIMR_RX_ERR) // Global mask used { - if (rx_buffer_descriptor[s_rxno].control & BIT(15)) { + if (rx_buffer_descriptor[s_rt1020_rxno].control & BIT(15)) { ENET->EIMR = EIMR_RX_ERR; // Enable interrupts return; // Empty? -> exit. } // Read inframes else { // Frame received, loop for (uint32_t i = 0; i < 10; i++) { // read as they arrive but not forever - if (rx_buffer_descriptor[s_rxno].control & BIT(15)) break; // exit when done - uint32_t len = (rx_buffer_descriptor[s_rxno].length); - mg_tcpip_qwrite(rx_buffer_descriptor[s_rxno].buffer, len > 4 ? len - 4 : len, s_ifp); - rx_buffer_descriptor[s_rxno].control |= BIT(15); // Inform DMA RX is empty - if (++s_rxno >= ENET_RXBD_NUM) s_rxno = 0; + if (rx_buffer_descriptor[s_rt1020_rxno].control & BIT(15)) break; // exit when done + uint32_t len = (rx_buffer_descriptor[s_rt1020_rxno].length); + mg_tcpip_qwrite(rx_buffer_descriptor[s_rt1020_rxno].buffer, len > 4 ? len - 4 : len, s_ifp); + rx_buffer_descriptor[s_rt1020_rxno].control |= BIT(15); // Inform DMA RX is empty + if (++s_rt1020_rxno >= ENET_RXBD_NUM) s_rt1020_rxno = 0; } } } @@ -6260,15 +6300,15 @@ void ENET_IRQHandler(void) { } // Up/down status -static bool mg_tcpip_driver_imxrt1020_up(struct mip_if *ifp) { - uint32_t bsr = eth_read_phy(PHY_ADDR, PHY_BSR); +static bool mg_tcpip_driver_imxrt1020_up(struct mg_tcpip_if *ifp) { + uint32_t bsr = imxrt1020_eth_read_phy(IMXRT1020_PHY_ADDR, IMXRT1020_PHY_BSR); (void) ifp; return bsr & BIT(2) ? 1 : 0; } // API struct mg_tcpip_driver mg_tcpip_driver_imxrt1020 = { - mg_tcpip_driver_imxrt1020_init, mg_tcpip_driver_imxrt1020_tx, mip_driver_rx, + mg_tcpip_driver_imxrt1020_init, mg_tcpip_driver_imxrt1020_tx, mg_tcpip_driver_rx, mg_tcpip_driver_imxrt1020_up}; #endif diff --git a/mongoose.h b/mongoose.h index d66bbb32..6b6cfd56 100644 --- a/mongoose.h +++ b/mongoose.h @@ -223,7 +223,6 @@ static inline int mg_mkdir(const char *path, mode_t mode) { #define MG_PATH_MAX 100 #define MG_ENABLE_SOCKET 0 #define MG_ENABLE_DIRLIST 0 -#define MG_ENABLE_ATOMIC 1 #endif @@ -240,7 +239,6 @@ static inline int mg_mkdir(const char *path, mode_t mode) { #include int mkdir(const char *, mode_t); -#define MG_ENABLE_ATOMIC 1 #endif @@ -599,10 +597,6 @@ struct timeval { #define MG_ENABLE_TCPIP 0 // Mongoose built-in network stack #endif -#ifndef MG_ENABLE_ATOMIC -#define MG_ENABLE_ATOMIC 0 // Required by mg_queue -#endif - #ifndef MG_ENABLE_LWIP #define MG_ENABLE_LWIP 0 // lWIP network stack #endif @@ -676,6 +670,10 @@ struct timeval { #define MG_ENABLE_PACKED_FS 0 #endif +#ifndef MG_ENABLE_ASSERT +#define MG_ENABLE_ASSERT 0 +#endif + #ifndef MG_IO_SIZE #define MG_IO_SIZE 2048 // Granularity of the send/recv IO buffer growth #endif @@ -792,40 +790,32 @@ char *mg_remove_double_dots(char *s); - -#if MG_ENABLE_ATOMIC -#include -#elif !defined(_Atomic) -#define _Atomic -#endif - // Single producer, single consumer non-blocking queue // // Producer: -// void *buf; -// while (mg_queue_space(q, &buf) < len) WAIT(); // Wait for free space -// memcpy(buf, data, len); // Copy data to the queue -// mg_queue_add(q, len); // Advance q->head +// char *buf; +// while (mg_queue_book(q, &buf) < len) WAIT(); // Wait for space +// memcpy(buf, my_data, len); // Copy data to the queue +// mg_queue_add(q, len); // // Consumer: -// void *buf; -// while ((len = mg_queue_next(q, &buf)) == MG_QUEUE_EMPTY) WAIT(); -// mg_hexdump(buf, len); // Handle message -// mg_queue_del(q); // Delete message +// char *buf; +// while ((len = mg_queue_get(q, &buf)) == 0) WAIT(); +// mg_hexdump(buf, len); // Handle message +// mg_queue_del(q, len); // struct mg_queue { char *buf; - size_t len; - volatile _Atomic size_t tail; - volatile _Atomic size_t head; + size_t size; + volatile size_t tail; + volatile size_t head; }; -#define MG_QUEUE_EMPTY ((size_t) ~0ul) // Next message size when queue is empty - -void mg_queue_add(struct mg_queue *, size_t len); // Advance head -void mg_queue_del(struct mg_queue *); // Advance tail -size_t mg_queue_space(struct mg_queue *, char **); // Get free space -size_t mg_queue_next(struct mg_queue *, char **); // Get next message size +void mg_queue_init(struct mg_queue *, char *, size_t); // Init queue +size_t mg_queue_book(struct mg_queue *, char **buf, size_t); // Reserve space +void mg_queue_add(struct mg_queue *, size_t); // Add new message +size_t mg_queue_next(struct mg_queue *, char **); // Get oldest message +void mg_queue_del(struct mg_queue *, size_t); // Delete oldest message @@ -958,6 +948,12 @@ bool mg_file_printf(struct mg_fs *fs, const char *path, const char *fmt, ...); +#if MG_ENABLE_ASSERT +#include +#elif !defined(assert) +#define assert(x) +#endif + void mg_random(void *buf, size_t len); char *mg_random_str(char *buf, size_t len); uint16_t mg_ntohs(uint16_t net); diff --git a/src/arch_newlib.h b/src/arch_newlib.h index 6618e9bd..bb0281b9 100644 --- a/src/arch_newlib.h +++ b/src/arch_newlib.h @@ -19,6 +19,5 @@ #define MG_PATH_MAX 100 #define MG_ENABLE_SOCKET 0 #define MG_ENABLE_DIRLIST 0 -#define MG_ENABLE_ATOMIC 1 #endif diff --git a/src/arch_rp2040.h b/src/arch_rp2040.h index d5c28eba..ace069bf 100644 --- a/src/arch_rp2040.h +++ b/src/arch_rp2040.h @@ -12,5 +12,4 @@ #include int mkdir(const char *, mode_t); -#define MG_ENABLE_ATOMIC 1 #endif diff --git a/src/config.h b/src/config.h index f0e38d26..80012f47 100644 --- a/src/config.h +++ b/src/config.h @@ -8,10 +8,6 @@ #define MG_ENABLE_TCPIP 0 // Mongoose built-in network stack #endif -#ifndef MG_ENABLE_ATOMIC -#define MG_ENABLE_ATOMIC 0 // Required by mg_queue -#endif - #ifndef MG_ENABLE_LWIP #define MG_ENABLE_LWIP 0 // lWIP network stack #endif @@ -85,6 +81,10 @@ #define MG_ENABLE_PACKED_FS 0 #endif +#ifndef MG_ENABLE_ASSERT +#define MG_ENABLE_ASSERT 0 +#endif + #ifndef MG_IO_SIZE #define MG_IO_SIZE 2048 // Granularity of the send/recv IO buffer growth #endif diff --git a/src/printf.c b/src/printf.c index 79a86ff8..5d0cd9c2 100644 --- a/src/printf.c +++ b/src/printf.c @@ -6,7 +6,7 @@ size_t mg_queue_vprintf(struct mg_queue *q, const char *fmt, va_list *ap) { size_t len = mg_snprintf(NULL, 0, fmt, ap); char *buf; - if (mg_queue_space(q, &buf) < len + 1) { + if (len == 0 || mg_queue_book(q, &buf, len + 1) < len + 1) { len = 0; // Nah. Not enough space } else { len = mg_vsnprintf((char *)buf, len + 1, fmt, ap); diff --git a/src/queue.c b/src/queue.c index faf26d6f..7eac0d32 100644 --- a/src/queue.c +++ b/src/queue.c @@ -1,40 +1,82 @@ #include "queue.h" +#include "util.h" -// Every message in the queue is prepended by the message length (ML) -// ML is sizeof(size_t) in size -// Tail points to the message data +#if defined(__GNUC__) || defined(__clang__) +#define MG_MEMORY_BARRIER() __sync_synchronize() +#elif defined(_MSC_VER) && _MSC_VER >= 1700 +#define MG_MEMORY_BARRIER() MemoryBarrier() +#elif !defined(MG_MEMORY_BARRIER) +#define MG_MEMORY_BARRIER() +#endif + +// Every message in a queue is prepended by a 32-bit message length (ML). +// If ML is 0, then it is the end, and reader must wrap to the beginning. // -// |------| ML | message1 | ML | message2 |--- free space ---| -// ^ ^ ^ ^ -// buf tail head len +// Queue when q->tail <= q->head: +// |----- free -----| ML | message1 | ML | message2 | ----- free ------| +// ^ ^ ^ ^ +// buf tail head len +// +// Queue when q->tail > q->head: +// | ML | message2 |----- free ------| ML | message1 | 0 |---- free ----| +// ^ ^ ^ ^ +// buf head tail len -size_t mg_queue_space(struct mg_queue *q, char **buf) { - size_t ofs; - if (q->head > 0 && q->tail >= q->head) { // All messages read? - q->head = 0; // Yes. Reset head first - q->tail = 0; // Now reset the tail +void mg_queue_init(struct mg_queue *q, char *buf, size_t size) { + q->size = size; + q->buf = buf; + q->head = q->tail = 0; +} + +static size_t mg_queue_read_len(struct mg_queue *q) { + uint32_t n = 0; + MG_MEMORY_BARRIER(); + memcpy(&n, q->buf + q->tail, sizeof(n)); + assert(q->tail + n + sizeof(n) <= q->size); + return n; +} + +static void mg_queue_write_len(struct mg_queue *q, size_t len) { + uint32_t n = (uint32_t) len; + memcpy(q->buf + q->head, &n, sizeof(n)); + MG_MEMORY_BARRIER(); +} + +size_t mg_queue_book(struct mg_queue *q, char **buf, size_t len) { + size_t space = 0, hs = sizeof(uint32_t) * 2; // *2 is for the 0 marker + if (q->head >= q->tail && q->head + len + hs <= q->size) { + space = q->size - q->head - hs; // There is enough space + } else if (q->head >= q->tail && q->tail > hs) { + mg_queue_write_len(q, 0); // Not enough space ahead + q->head = 0; // Wrap head to the beginning } - ofs = q->head + sizeof(size_t); - if (buf != NULL) *buf = q->buf + ofs; - return ofs > q->len ? 0 : q->len - ofs; + if (q->head + hs + len < q->tail) space = q->tail - q->head - hs; + if (buf != NULL) *buf = q->buf + q->head + sizeof(uint32_t); + return space; } size_t mg_queue_next(struct mg_queue *q, char **buf) { - size_t len = MG_QUEUE_EMPTY; - if (q->tail < q->head) memcpy(&len, &q->buf[q->tail], sizeof(len)); - if (buf != NULL) *buf = &q->buf[q->tail + sizeof(len)]; + size_t len = 0; + if (q->tail != q->head) { + len = mg_queue_read_len(q); + if (len == 0) { // Zero (head wrapped) ? + q->tail = 0; // Reset tail to the start + if (q->head > q->tail) len = mg_queue_read_len(q); // Read again + } + } + if (buf != NULL) *buf = q->buf + q->tail + sizeof(uint32_t); + assert(q->tail + len <= q->size); return len; } void mg_queue_add(struct mg_queue *q, size_t len) { - size_t head = q->head + len + (size_t) sizeof(head); // New head - if (head <= q->len) { // Have space ? - memcpy(q->buf + q->head, &len, sizeof(len)); // Yes. Store ML - q->head = head; // Advance head - } + assert(len > 0); + mg_queue_write_len(q, len); + assert(q->head + sizeof(uint32_t) * 2 + len <= q->size); + q->head += len + sizeof(uint32_t); } -void mg_queue_del(struct mg_queue *q) { - size_t len = mg_queue_next(q, NULL), tail = q->tail + len + sizeof(size_t); - if (len != MG_QUEUE_EMPTY) q->tail = tail; +void mg_queue_del(struct mg_queue *q, size_t len) { + q->tail += len + sizeof(uint32_t); + assert(q->tail + sizeof(uint32_t) <= q->size); } diff --git a/src/queue.h b/src/queue.h index fbd905ab..ff37d6d4 100644 --- a/src/queue.h +++ b/src/queue.h @@ -1,38 +1,30 @@ #pragma once -#include "arch.h" // For size_t -#include "config.h" // For MG_ENABLE_ATOMIC - -#if MG_ENABLE_ATOMIC -#include -#elif !defined(_Atomic) -#define _Atomic -#endif +#include "arch.h" // For size_t // Single producer, single consumer non-blocking queue // // Producer: -// void *buf; -// while (mg_queue_space(q, &buf) < len) WAIT(); // Wait for free space -// memcpy(buf, data, len); // Copy data to the queue -// mg_queue_add(q, len); // Advance q->head +// char *buf; +// while (mg_queue_book(q, &buf) < len) WAIT(); // Wait for space +// memcpy(buf, my_data, len); // Copy data to the queue +// mg_queue_add(q, len); // // Consumer: -// void *buf; -// while ((len = mg_queue_next(q, &buf)) == MG_QUEUE_EMPTY) WAIT(); -// mg_hexdump(buf, len); // Handle message -// mg_queue_del(q); // Delete message +// char *buf; +// while ((len = mg_queue_get(q, &buf)) == 0) WAIT(); +// mg_hexdump(buf, len); // Handle message +// mg_queue_del(q, len); // struct mg_queue { char *buf; - size_t len; - volatile _Atomic size_t tail; - volatile _Atomic size_t head; + size_t size; + volatile size_t tail; + volatile size_t head; }; -#define MG_QUEUE_EMPTY ((size_t) ~0ul) // Next message size when queue is empty - -void mg_queue_add(struct mg_queue *, size_t len); // Advance head -void mg_queue_del(struct mg_queue *); // Advance tail -size_t mg_queue_space(struct mg_queue *, char **); // Get free space -size_t mg_queue_next(struct mg_queue *, char **); // Get next message size +void mg_queue_init(struct mg_queue *, char *, size_t); // Init queue +size_t mg_queue_book(struct mg_queue *, char **buf, size_t); // Reserve space +void mg_queue_add(struct mg_queue *, size_t); // Add new message +size_t mg_queue_next(struct mg_queue *, char **); // Get oldest message +void mg_queue_del(struct mg_queue *, size_t); // Delete oldest message diff --git a/src/sha1.c b/src/sha1.c index 10be1c74..dc0c6bf6 100644 --- a/src/sha1.c +++ b/src/sha1.c @@ -48,7 +48,7 @@ static uint32_t blk0(union char64long16 *block, int i) { w = rol(w, 30); static void mg_sha1_transform(uint32_t state[5], - const unsigned char buffer[64]) { + const unsigned char *buffer) { uint32_t a, b, c, d, e; union char64long16 block[1]; diff --git a/src/util.h b/src/util.h index 4b5cc696..ccce7d9b 100644 --- a/src/util.h +++ b/src/util.h @@ -6,6 +6,12 @@ #include "net.h" #include "str.h" +#if MG_ENABLE_ASSERT +#include +#elif !defined(assert) +#define assert(x) +#endif + void mg_random(void *buf, size_t len); char *mg_random_str(char *buf, size_t len); uint16_t mg_ntohs(uint16_t net); diff --git a/test/mip_tap_test.c b/test/mip_tap_test.c index 91843b06..97333c4d 100644 --- a/test/mip_tap_test.c +++ b/test/mip_tap_test.c @@ -4,7 +4,6 @@ #define MG_ENABLE_PACKED_FS 0 #define MG_ENABLE_LINES 1 -#include #include #ifndef __OpenBSD__ #include diff --git a/test/mip_test.c b/test/mip_test.c index 8b6e0112..626d6d79 100644 --- a/test/mip_test.c +++ b/test/mip_test.c @@ -3,7 +3,6 @@ #define MG_ENABLE_TCPIP 1 #define MG_ENABLE_PACKED_FS 0 -#include #include "mongoose.c" #include "driver_mock.c" diff --git a/test/unit_test.c b/test/unit_test.c index d0e0430b..abb04e69 100644 --- a/test/unit_test.c +++ b/test/unit_test.c @@ -2686,32 +2686,41 @@ static void test_poll(void) { mg_mgr_free(&mgr); } -#if MG_ENABLE_ATOMIC -#define NMESSAGES 49999 +#define NMESSAGES 99999 static uint32_t s_qcrc = 0; +static int s_out, s_in; static void producer(void *param) { struct mg_queue *q = (struct mg_queue *) param; - volatile size_t i, n, len; - for (i = 0; i < NMESSAGES; i++) { - char buf[100]; - char *p; - mg_random(buf, sizeof(buf)); - n = ((unsigned char *) buf)[0] % sizeof(buf); - while ((len = mg_queue_space(q, &p)) < n) (void) 0; - memcpy(p, buf, n); - mg_queue_add(q, n); - s_qcrc = mg_crc32(s_qcrc, buf, n); + char tmp[64 * 1024], *buf; + size_t len, ofs = sizeof(tmp); + for (s_out = 0; s_out < NMESSAGES; s_out++) { + if (ofs >= sizeof(tmp)) mg_random(tmp, sizeof(tmp)), ofs = 0; + len = ((uint8_t *) tmp)[ofs] % 55 + 1; + if (ofs + len > sizeof(tmp)) len = sizeof(tmp) - ofs; + while ((mg_queue_book(q, &buf, len)) < len) (void) 0; + memcpy(buf, &tmp[ofs], len); + s_qcrc = mg_crc32(s_qcrc, buf, len); + ofs += len; +#if 0 + fprintf(stderr, "-->prod %3d %8x %-3lu %zu/%zu/%lu\n", s_out, s_qcrc, len, q->tail, + q->head, buf - q->buf); +#endif + mg_queue_add(q, len); } } static uint32_t consumer(struct mg_queue *q) { - uint32_t i, crc = 0; - for (i = 0; i < NMESSAGES; i++) { - char *p; - volatile size_t len; - while ((len = mg_queue_next(q, &p)) == MG_QUEUE_EMPTY) (void) 0; - crc = mg_crc32(crc, (char *) p, len); - mg_queue_del(q); + uint32_t crc = 0; + for (s_in = 0; s_in < NMESSAGES; s_in++) { + char *buf; + size_t len; + while ((len = mg_queue_next(q, &buf)) == 0) (void) 0; + crc = mg_crc32(crc, buf, len); +#if 0 + fprintf(stderr, "-->cons %3u %8x %-3lu %zu/%zu/%lu\n", s_in, crc, len, q->tail, + q->head, buf - q->buf); +#endif + mg_queue_del(q, len); } return crc; } @@ -2739,68 +2748,18 @@ static void start_thread(void (*f)(void *), void *p) { (void) f, (void) p; } #endif -#endif static void test_queue(void) { - char buf[512], *p; - size_t size = sizeof(size); - struct mg_queue queue, *q = &queue; - - memset(q, 0, sizeof(*q)); - q->buf = buf; - q->len = sizeof(buf); - - ASSERT(mg_queue_next(q, &p) == MG_QUEUE_EMPTY); - - // Write "hi" - ASSERT(mg_queue_space(q, &p) == sizeof(buf) - size); - ASSERT(mg_queue_printf(q, "hi") == 2); - ASSERT(q->head == size + 2); - ASSERT(mg_queue_next(q, &p) == 2); - - // Write zero-length message - ASSERT(mg_queue_space(q, &p) == sizeof(buf) - 2 - 2 * size); - ASSERT(mg_queue_printf(q, "") == 0); - ASSERT(q->head == size * 2 + 2); - ASSERT(mg_queue_next(q, &p) == 2); - - // Write "dude" - ASSERT(mg_queue_space(q, &p) == sizeof(buf) - 2 - 3 * size); - ASSERT(mg_queue_printf(q, "dude") == 4); - ASSERT(q->head == size * 3 + 2 + 4); - ASSERT(mg_queue_next(q, &p) == 2); - - // Read "hi" - ASSERT(mg_queue_next(q, &p) == 2); - ASSERT(memcmp(p, "hi", 2) == 0); - mg_queue_del(q); - ASSERT(q->head == size * 3 + 2 + 4); - ASSERT(q->tail == size + 2); - - // Read empty message - ASSERT(mg_queue_next(q, &p) == 0); - mg_queue_del(q); - ASSERT(q->tail == size * 2 + 2); - - ASSERT(mg_queue_next(q, &p) == 4); - ASSERT(memcmp(p, "dude", 4) == 0); - mg_queue_del(q); - ASSERT(q->tail == q->head); - - ASSERT(mg_queue_space(q, &p) == sizeof(buf) - size); - ASSERT(q->tail == 0 && q->head == 0); - q->tail = q->head = 0; - -#if MG_ENABLE_ATOMIC - { - // Test concurrent queue access - uint32_t crc; - start_thread(producer, q); - crc = consumer(q); - MG_INFO(("DONE. %x %x", s_qcrc, crc)); - ASSERT(s_qcrc == crc); - } -#endif + char buf[512]; + struct mg_queue queue; + uint32_t crc; + memset(buf, 0x55, sizeof(buf)); + mg_queue_init(&queue, buf, sizeof(buf)); + start_thread(producer, &queue); // Start producer in a separate thread + crc = consumer(&queue); // Consumer eats data in this thread + MG_INFO(("CRC1 %8x", s_qcrc)); // Show CRCs + MG_INFO(("CRC2 %8x", crc)); + ASSERT(s_qcrc == crc); } int main(void) {