diff -r 3da24640513a -r e57ca2747782 ucx/buffer.c --- a/ucx/buffer.c Sun Dec 07 20:00:33 2025 +0100 +++ b/ucx/buffer.c Sat Dec 13 15:58:58 2025 +0100 @@ -32,28 +32,10 @@ #include #include -#ifdef _WIN32 -#include -#include -static unsigned long system_page_size() { - static unsigned long ps = 0; - if (ps == 0) { - SYSTEM_INFO sysinfo; - GetSystemInfo(&sysinfo); - ps = sysinfo.dwPageSize; - } - return ps; -} -#define SYSTEM_PAGE_SIZE system_page_size() -#else -#include -#define SYSTEM_PAGE_SIZE sysconf(_SC_PAGESIZE) -#endif - static int buffer_copy_on_write(CxBuffer* buffer) { if (0 == (buffer->flags & CX_BUFFER_COPY_ON_WRITE)) return 0; void *newspace = cxMalloc(buffer->allocator, buffer->capacity); - if (NULL == newspace) return -1; + if (NULL == newspace) return -1; // LCOV_EXCL_LINE memcpy(newspace, buffer->space, buffer->size); buffer->space = newspace; buffer->flags &= ~CX_BUFFER_COPY_ON_WRITE; @@ -78,37 +60,24 @@ buffer->flags = flags; if (!space) { buffer->bytes = cxMalloc(allocator, capacity); - if (buffer->bytes == NULL) { - return -1; // LCOV_EXCL_LINE - } + if (buffer->bytes == NULL) return -1; // LCOV_EXCL_LINE buffer->flags |= CX_BUFFER_FREE_CONTENTS; } else { buffer->bytes = space; } buffer->capacity = capacity; + buffer->max_capacity = SIZE_MAX; buffer->size = 0; buffer->pos = 0; - buffer->flush = NULL; - return 0; } -int cxBufferEnableFlushing( - CxBuffer *buffer, - CxBufferFlushConfig config -) { - buffer->flush = cxMallocDefault(sizeof(CxBufferFlushConfig)); - if (buffer->flush == NULL) return -1; // LCOV_EXCL_LINE - memcpy(buffer->flush, &config, sizeof(CxBufferFlushConfig)); - return 0; -} - void cxBufferDestroy(CxBuffer *buffer) { - if (buffer->flags & CX_BUFFER_FREE_CONTENTS) { + if ((buffer->flags & (CX_BUFFER_FREE_CONTENTS | CX_BUFFER_DO_NOT_FREE)) + == CX_BUFFER_FREE_CONTENTS) { cxFree(buffer->allocator, buffer->bytes); } - cxFreeDefault(buffer->flush); memset(buffer, 0, sizeof(CxBuffer)); } @@ -122,7 +91,7 @@ allocator = cxDefaultAllocator; } CxBuffer *buf = cxMalloc(allocator, sizeof(CxBuffer)); - if (buf == NULL) return NULL; + if (buf == NULL) return NULL; // LCOV_EXCL_LINE if (0 == cxBufferInit(buf, space, capacity, allocator, flags)) { return buf; } else { @@ -183,6 +152,35 @@ } +size_t cxBufferPop(CxBuffer *buffer, size_t size, size_t nitems) { + size_t len; + if (cx_szmul(size, nitems, &len)) { + // LCOV_EXCL_START + errno = EOVERFLOW; + return 0; + // LCOV_EXCL_STOP + } + if (len == 0) return 0; + if (len > buffer->size) { + if (size == 1) { + // simple case: everything can be discarded + len = buffer->size; + } else { + // complicated case: misaligned bytes must stay + size_t misalignment = buffer->size % size; + len = buffer->size - misalignment; + } + } + buffer->size -= len; + + // adjust position, if required + if (buffer->pos > buffer->size) { + buffer->pos = buffer->size; + } + + return len / size; +} + void cxBufferClear(CxBuffer *buffer) { if (0 == (buffer->flags & CX_BUFFER_COPY_ON_WRITE)) { memset(buffer->bytes, 0, buffer->size); @@ -200,36 +198,13 @@ return buffer->pos >= buffer->size; } -int cxBufferMinimumCapacity( - CxBuffer *buffer, - size_t newcap -) { - if (newcap <= buffer->capacity) { +int cxBufferReserve(CxBuffer *buffer, size_t newcap) { + if (newcap == buffer->capacity) { return 0; } - - unsigned long pagesize = SYSTEM_PAGE_SIZE; - // if page size is larger than 64 KB - for some reason - truncate to 64 KB - if (pagesize > 65536) pagesize = 65536; - if (newcap < pagesize) { - // when smaller as one page, map to the next power of two - newcap--; - newcap |= newcap >> 1; - newcap |= newcap >> 2; - newcap |= newcap >> 4; - // last operation only needed for pages larger 4096 bytes - // but if/else would be more expensive than just doing this - newcap |= newcap >> 8; - newcap++; - } else { - // otherwise, map to a multiple of the page size - newcap -= newcap % pagesize; - newcap += pagesize; - // note: if newcap is already page aligned, - // this gives a full additional page (which is good) + if (newcap > buffer->max_capacity) { + return -1; } - - const int force_copy_flags = CX_BUFFER_COPY_ON_WRITE | CX_BUFFER_COPY_ON_EXTEND; if (buffer->flags & force_copy_flags) { void *newspace = cxMalloc(buffer->allocator, newcap); @@ -242,13 +217,60 @@ return 0; } else if (cxReallocate(buffer->allocator, (void **) &buffer->bytes, newcap) == 0) { + buffer->flags |= CX_BUFFER_FREE_CONTENTS; buffer->capacity = newcap; + if (buffer->size > newcap) { + buffer->size = newcap; + } return 0; } else { return -1; // LCOV_EXCL_LINE } } +int cxBufferMaximumCapacity(CxBuffer *buffer, size_t capacity) { + if (capacity < buffer->capacity) { + return -1; + } + buffer->max_capacity = capacity; + return 0; +} + +int cxBufferMinimumCapacity(CxBuffer *buffer, size_t newcap) { + if (newcap <= buffer->capacity) { + return 0; + } + if (newcap > buffer->max_capacity) { + return -1; + } + if (newcap < buffer->max_capacity) { + unsigned long pagesize = cx_system_page_size(); + // if page size is larger than 64 KB - for some reason - truncate to 64 KB + if (pagesize > 65536) pagesize = 65536; + if (newcap < pagesize) { + // when smaller as one page, map to the next power of two + newcap--; + newcap |= newcap >> 1; + newcap |= newcap >> 2; + newcap |= newcap >> 4; + // last operation only needed for pages larger 4096 bytes + // but if/else would be more expensive than just doing this + newcap |= newcap >> 8; + newcap++; + } else { + // otherwise, map to a multiple of the page size + newcap -= newcap % pagesize; + newcap += pagesize; + // note: if newcap is already page aligned, + // this gives a full additional page (which is good) + } + if (newcap > buffer->max_capacity) { + newcap = buffer->max_capacity; + } + } + return cxBufferReserve(buffer, newcap); +} + void cxBufferShrink( CxBuffer *buffer, size_t reserve @@ -271,60 +293,15 @@ } } -static size_t cx_buffer_flush_helper( - const CxBuffer *buffer, - const unsigned char *src, - size_t size, - size_t nitems -) { - // flush data from an arbitrary source - // does not need to be the buffer's contents - size_t max_items = buffer->flush->blksize / size; - size_t fblocks = 0; - size_t flushed_total = 0; - while (nitems > 0 && fblocks < buffer->flush->blkmax) { - fblocks++; - size_t items = nitems > max_items ? max_items : nitems; - size_t flushed = buffer->flush->wfunc( - src, size, items, buffer->flush->target); - if (flushed > 0) { - flushed_total += flushed; - src += flushed * size; - nitems -= flushed; - } else { - // if no bytes can be flushed out anymore, we give up - break; - } - } - return flushed_total; -} - -static size_t cx_buffer_flush_impl(CxBuffer *buffer, size_t size) { - // flush the current contents of the buffer - unsigned char *space = buffer->bytes; - size_t remaining = buffer->pos / size; - size_t flushed_total = cx_buffer_flush_helper( - buffer, space, size, remaining); - - // shift the buffer left after flushing - // IMPORTANT: up to this point, copy on write must have been - // performed already, because we can't do error handling here - cxBufferShiftLeft(buffer, flushed_total*size); - - return flushed_total; -} - -size_t cxBufferFlush(CxBuffer *buffer) { - if (buffer_copy_on_write(buffer)) return 0; - return cx_buffer_flush_impl(buffer, 1); -} - size_t cxBufferWrite( const void *ptr, size_t size, size_t nitems, CxBuffer *buffer ) { + // trivial case + if (size == 0 || nitems == 0) return 0; + // optimize for easy case if (size == 1 && (buffer->capacity - buffer->pos) >= nitems) { if (buffer_copy_on_write(buffer)) return 0; @@ -336,98 +313,52 @@ return nitems; } - size_t len, total_flushed = 0; -cx_buffer_write_retry: + size_t len; if (cx_szmul(size, nitems, &len)) { errno = EOVERFLOW; - return total_flushed; + return 0; } if (buffer->pos > SIZE_MAX - len) { errno = EOVERFLOW; - return total_flushed; + return 0; } + const size_t required = buffer->pos + len; - size_t required = buffer->pos + len; - bool perform_flush = false; + // check if we need to auto-extend if (required > buffer->capacity) { if (buffer->flags & CX_BUFFER_AUTO_EXTEND) { - if (buffer->flush != NULL && required > buffer->flush->threshold) { - perform_flush = true; - } else { - if (cxBufferMinimumCapacity(buffer, required)) { - return total_flushed; // LCOV_EXCL_LINE - } - } - } else { - if (buffer->flush != NULL) { - perform_flush = true; - } else { - // truncate data, if we can neither extend nor flush - len = buffer->capacity - buffer->pos; - if (size > 1) { - len -= len % size; - } - nitems = len / size; + size_t newcap = required < buffer->max_capacity + ? required : buffer->max_capacity; + if (cxBufferMinimumCapacity(buffer, newcap)) { + return 0; // LCOV_EXCL_LINE } } } + // check again and truncate data if capacity is still not enough + if (required > buffer->capacity) { + len = buffer->capacity - buffer->pos; + if (size > 1) { + len -= len % size; + } + nitems = len / size; + } + // check here and not above because of possible truncation if (len == 0) { - return total_flushed; + return 0; } // check if we need to copy if (buffer_copy_on_write(buffer)) return 0; // perform the operation - if (perform_flush) { - size_t items_flushed; - if (buffer->pos == 0) { - // if we don't have data in the buffer, but are instructed - // to flush, it means that we are supposed to relay the data - items_flushed = cx_buffer_flush_helper(buffer, ptr, size, nitems); - if (items_flushed == 0) { - // we needed to relay data, but could not flush anything - // i.e. we have to give up to avoid endless trying - return 0; - } - nitems -= items_flushed; - total_flushed += items_flushed; - if (nitems > 0) { - ptr = ((unsigned char*)ptr) + items_flushed * size; - goto cx_buffer_write_retry; - } - return total_flushed; - } else { - items_flushed = cx_buffer_flush_impl(buffer, size); - if (items_flushed == 0) { - // flush target is full, let's try to truncate - size_t remaining_space; - if (buffer->flags & CX_BUFFER_AUTO_EXTEND) { - remaining_space = buffer->flush->threshold > buffer->pos - ? buffer->flush->threshold - buffer->pos - : 0; - } else { - remaining_space = buffer->capacity > buffer->pos - ? buffer->capacity - buffer->pos - : 0; - } - nitems = remaining_space / size; - if (nitems == 0) { - return total_flushed; - } - } - goto cx_buffer_write_retry; - } - } else { - memcpy(buffer->bytes + buffer->pos, ptr, len); - buffer->pos += len; - if (buffer->pos > buffer->size) { - buffer->size = buffer->pos; - } - return total_flushed + nitems; + memcpy(buffer->bytes + buffer->pos, ptr, len); + buffer->pos += len; + if (buffer->pos > buffer->size) { + buffer->size = buffer->pos; } + return nitems; } size_t cxBufferAppend( @@ -436,20 +367,13 @@ size_t nitems, CxBuffer *buffer ) { - size_t pos = buffer->pos; - size_t append_pos = buffer->size; - buffer->pos = append_pos; - size_t written = cxBufferWrite(ptr, size, nitems, buffer); - // the buffer might have been flushed - // we must compute a possible delta for the position - // expected: pos = append_pos + written - // -> if this is not the case, there is a delta - size_t delta = append_pos + written*size - buffer->pos; - if (delta > pos) { - buffer->pos = 0; - } else { - buffer->pos = pos - delta; - } + // trivial case + if (size == 0 || nitems == 0) return 0; + + const size_t pos = buffer->pos; + buffer->pos = buffer->size; + const size_t written = cxBufferWrite(ptr, size, nitems, buffer); + buffer->pos = pos; return written; } @@ -467,19 +391,35 @@ } int cxBufferTerminate(CxBuffer *buffer) { - if (0 == cxBufferPut(buffer, 0)) { - buffer->size = buffer->pos - 1; - return 0; + // try to extend / shrink the buffer + if (buffer->pos >= buffer->capacity) { + if ((buffer->flags & CX_BUFFER_AUTO_EXTEND) == 0) { + return -1; + } + if (cxBufferReserve(buffer, buffer->pos + 1)) { + return -1; // LCOV_EXCL_LINE + } } else { - return -1; + buffer->size = buffer->pos; + cxBufferShrink(buffer, 1); + // set the capacity explicitly, in case shrink was skipped due to CoW + buffer->capacity = buffer->size + 1; } + + // check if we are still on read-only memory + if (buffer_copy_on_write(buffer)) return -1; + + // write the terminator and exit + buffer->space[buffer->pos] = '\0'; + return 0; } -size_t cxBufferPutString( - CxBuffer *buffer, - const char *str -) { - return cxBufferWrite(str, 1, strlen(str), buffer); +size_t cx_buffer_put_string(CxBuffer *buffer, cxstring str) { + return cxBufferWrite(str.ptr, 1, str.length, buffer); +} + +size_t cx_buffer_append_string(CxBuffer *buffer, cxstring str) { + return cxBufferAppend(str.ptr, 1, str.length, buffer); } size_t cxBufferRead(