From 9eacdbcc98b1021cf755c6780efb9352786752d4 Mon Sep 17 00:00:00 2001 From: Nick Gasson Date: Mon, 21 Nov 2022 18:48:05 +0000 Subject: [PATCH] Add a streaming mode to fbuf_t --- src/fbuf.c | 92 ++++++++++++++++++++++++++++++++++++++---------- src/fbuf.h | 6 ++++ src/util.c | 18 +++++++++- src/util.h | 1 + test/test_misc.c | 36 ++++++++++++++++++- 5 files changed, 133 insertions(+), 20 deletions(-) diff --git a/src/fbuf.c b/src/fbuf.c index bc68c030..00f3db75 100644 --- a/src/fbuf.c +++ b/src/fbuf.c @@ -74,6 +74,7 @@ struct _fbuf { fbuf_t *next; fbuf_t *prev; cs_state_t checksum; + fbuf_zip_t zip; }; static fbuf_t *open_list = NULL; @@ -176,11 +177,17 @@ static void fbuf_write_raw(fbuf_t *f, const uint8_t *bytes, size_t count) fatal_errno("%s: fwrite", f->fname); } +static void fbuf_read_raw(fbuf_t *f, uint8_t *bytes, size_t count) +{ + if (fread(bytes, count, 1, f->file) != 1) + fatal_errno("%s: fread", f->fname); +} + static void fbuf_write_header(fbuf_t *f) { const uint8_t header[16] = { 'F', 'B', 'U', 'F', // Magic number "FBUF" - 'F', // Compression format (FastLZ) + f->zip, // Compression format f->checksum.algo, // Checksum algorithm 0, 0, // Unused 0, 0, 0, 0, // Decompressed length @@ -191,7 +198,14 @@ static void fbuf_write_header(fbuf_t *f) static void fbuf_update_header(fbuf_t *f, uint32_t checksum) { - if (fseek(f->file, 8, SEEK_SET) != 0) + struct stat buf; + if (fstat(fileno(f->file), &buf) != 0) + fatal_errno("fstat"); + + if (S_ISFIFO(buf.st_mode)) { + // Streaming mode: length and checksum is appended instead + } + else if (fseek(f->file, 8, SEEK_SET) != 0) fatal_errno("%s: fseek", f->fname); const uint8_t bytes[8] = { PACK_BE32(f->wtotal), PACK_BE32(checksum) }; @@ -200,13 +214,8 @@ static void fbuf_update_header(fbuf_t *f, uint32_t checksum) static void fbuf_decompress(fbuf_t *f) { - struct stat buf; - if (fstat(fileno(f->file), &buf) != 0) - fatal_errno("fstat"); - - void *rmap = map_file(fileno(f->file), buf.st_size); - - const uint8_t *header = rmap; + uint8_t header[16]; + fbuf_read_raw(f, header, sizeof(header)); if (memcmp(header, "FBUF", 4)) fatal("%s: file created with an older version of NVC", f->fname); @@ -219,6 +228,34 @@ static void fbuf_decompress(fbuf_t *f) fatal("%s has was created with unexpected checksum algorithm %c", f->fname, header[5]); + struct stat buf; + if (fstat(fileno(f->file), &buf) != 0) + fatal_errno("fstat"); + + size_t bufsz; + uint8_t *rmap = NULL; + if (S_ISFIFO(buf.st_mode)) { + rmap = xmalloc((bufsz = 16384)); + memcpy(rmap, header, sizeof(header)); + + size_t wptr = sizeof(header); + for (;;) { + const int nr = fread(rmap + wptr, 1, bufsz - wptr, f->file); + if (nr < 0) + fatal_errno("%s", f->fname); + else if (nr == 0) + break; + else if (wptr + nr == bufsz) + rmap = xrealloc(rmap, (bufsz *= 2)); + + wptr += nr; + } + + memcpy(header + 8, rmap + wptr - 8, 8); // Update header + } + else + rmap = map_file(fileno(f->file), (bufsz = buf.st_size)); + const uint32_t len = UNPACK_BE32(header + 8); const uint32_t checksum = UNPACK_BE32(header + 12); @@ -233,7 +270,7 @@ static void fbuf_decompress(fbuf_t *f) src += sizeof(uint32_t); - if (src + blksz > (uint8_t *)rmap + buf.st_size) + if (src + blksz > (uint8_t *)rmap + bufsz) fatal_trace("read past end of compressed file %s", f->fname); const int ret = fastlz_decompress(src, blksz, dst, SPILL_SIZE); @@ -246,20 +283,21 @@ static void fbuf_decompress(fbuf_t *f) src += blksz; } - unmap_file(rmap, buf.st_size); + if (S_ISFIFO(buf.st_mode)) + free(rmap); + else + unmap_file(rmap, buf.st_size); } -fbuf_t *fbuf_open(const char *file, fbuf_mode_t mode, fbuf_cs_t csum) +static fbuf_t *fbuf_new(FILE *file, char *fname, fbuf_mode_t mode, + fbuf_cs_t csum, fbuf_zip_t zip) { - FILE *h = fopen(file, mode == FBUF_OUT ? "wb" : "rb"); - if (h == NULL) - return NULL; - fbuf_t *f = xcalloc(sizeof(struct _fbuf)); - f->file = h; - f->fname = xstrdup(file); + f->file = file; + f->fname = fname; f->mode = mode; f->next = open_list; + f->zip = zip; checksum_init(&(f->checksum), csum); @@ -276,6 +314,24 @@ fbuf_t *fbuf_open(const char *file, fbuf_mode_t mode, fbuf_cs_t csum) return (open_list = f); } +fbuf_t *fbuf_open(const char *file, fbuf_mode_t mode, fbuf_cs_t csum) +{ + FILE *h = fopen(file, mode == FBUF_OUT ? "wb" : "rb"); + if (h == NULL) + return NULL; + + return fbuf_new(h, xstrdup(file), mode, csum, FBUF_ZIP_FASTLZ); +} + +fbuf_t *fbuf_fdopen(int fd, fbuf_mode_t mode, fbuf_cs_t csum) +{ + FILE *h = fdopen(fd, mode == FBUF_OUT ? "wb" : "rb"); + if (h == NULL) + return NULL; + + return fbuf_new(h, xasprintf("", fd), mode, csum, FBUF_ZIP_FASTLZ); +} + const char *fbuf_file_name(fbuf_t *f) { return f->fname; diff --git a/src/fbuf.h b/src/fbuf.h index 3108d6e8..1ea8e0c9 100644 --- a/src/fbuf.h +++ b/src/fbuf.h @@ -34,7 +34,13 @@ typedef enum { FBUF_CS_ADLER32 = 'A', } fbuf_cs_t; +typedef enum { + FBUF_ZIP_NONE = '-', + FBUF_ZIP_FASTLZ = 'F', +} fbuf_zip_t; + fbuf_t *fbuf_open(const char *file, fbuf_mode_t mode, fbuf_cs_t csum); +fbuf_t *fbuf_fdopen(int fd, fbuf_mode_t mode, fbuf_cs_t csum); void fbuf_close(fbuf_t *f, uint32_t *checksum); void fbuf_cleanup(void); const char *fbuf_file_name(fbuf_t *f); diff --git a/src/util.c b/src/util.c index 7d1bf73d..00a4b71a 100644 --- a/src/util.c +++ b/src/util.c @@ -22,6 +22,7 @@ #include #include #include +#include #endif #include "util.h" @@ -47,6 +48,7 @@ #include #include #include +#include #include #include @@ -64,7 +66,6 @@ #include #include #include -#include #include #endif @@ -1587,6 +1588,21 @@ uint64_t get_timestamp_us() #endif } +void open_pipe(int *rfd, int *wfd) +{ + int fds[2]; +#ifdef __MINGW32__ + const int rc = _pipe(fds, 4096, _O_BINARY); +#else + const int rc = pipe(fds) < 0; +#endif + if (rc < 0) + fatal_errno("failed to create pipe"); + + *rfd = fds[0]; + *wfd = fds[1]; +} + #if defined _WIN32 || defined __CYGWIN__ static struct { char illegal; diff --git a/src/util.h b/src/util.h index 1e87a71e..f569c086 100644 --- a/src/util.h +++ b/src/util.h @@ -272,6 +272,7 @@ char *search_path(const char *name); void get_libexec_dir(text_buf_t *tb); void get_lib_dir(text_buf_t *tb); bool get_exe_path(text_buf_t *tb); +void open_pipe(int *rfd, int *wfd); struct cpu_state; void capture_registers(struct cpu_state *cpu); diff --git a/test/test_misc.c b/test/test_misc.c index d8858831..9f546f49 100644 --- a/test/test_misc.c +++ b/test/test_misc.c @@ -16,9 +16,11 @@ // #include "test_util.h" +#include "fbuf.h" #include "hash.h" -#include "mask.h" #include "ident.h" +#include "mask.h" +#include "opt.h" #include "rt/heap.h" #include "thread.h" @@ -28,6 +30,7 @@ #include #include #include +#include #define VOIDP(x) ((void *)(uintptr_t)x) @@ -477,7 +480,34 @@ START_TEST(test_threads) thread_join(threads[i]); ck_assert_int_eq(counter, N * 10000); +} +END_TEST + +START_TEST(test_fbuf_pipe) +{ + opt_set_int(OPT_ERROR_LIMIT, -1); + int rfd, wfd; + open_pipe(&rfd, &wfd); + + fbuf_t *w = fbuf_fdopen(wfd, FBUF_OUT, FBUF_CS_ADLER32); + fbuf_put_int(w, 42); + for (int i = 0; i < 10000; i++) + fbuf_put_int(w, i); + + uint32_t wcsum; + fbuf_close(w, &wcsum); + + fbuf_t *r = fbuf_fdopen(rfd, FBUF_IN, FBUF_CS_ADLER32); + ck_assert_int_eq(fbuf_get_int(r), 42); + for (int i = 0; i < 10000; i++) + ck_assert_int_eq(fbuf_get_int(r), i); + + uint32_t rcsum; + fbuf_close(r, &rcsum); + + ck_assert_int_eq(rcsum, wcsum); + ck_assert_int_eq(close(rfd), -1); } END_TEST @@ -522,5 +552,9 @@ Suite *get_misc_tests(void) tcase_add_test(tc_heap, test_threads); suite_add_tcase(s, tc_thread); + TCase *tc_fbuf = tcase_create("fbuf"); + tcase_add_test(tc_fbuf, test_fbuf_pipe); + suite_add_tcase(s, tc_fbuf); + return s; } -- 2.39.2