--- a/ucx/buffer.c Sat Feb 22 18:10:36 2025 +0100 +++ b/ucx/buffer.c Sun Feb 23 14:28:47 2025 +0100 @@ -205,8 +205,8 @@ static size_t cx_buffer_flush_helper( const CxBuffer *buffer, + const unsigned char *src, size_t size, - const unsigned char *src, size_t nitems ) { // flush data from an arbitrary source @@ -236,7 +236,7 @@ unsigned char *space = buffer->bytes; size_t remaining = buffer->pos / size; size_t flushed_total = cx_buffer_flush_helper( - buffer, size, space, remaining); + buffer, space, size, remaining); // shift the buffer left after flushing // IMPORTANT: up to this point, copy on write must have been @@ -268,17 +268,18 @@ return nitems; } - size_t len; + size_t len, total_flushed = 0; +cx_buffer_write_retry: if (cx_szmul(size, nitems, &len)) { errno = EOVERFLOW; - return 0; + return total_flushed; } if (buffer->pos > SIZE_MAX - len) { errno = EOVERFLOW; - return 0; + return total_flushed; } + size_t required = buffer->pos + len; - bool perform_flush = false; if (required > buffer->capacity) { if (buffer->flags & CX_BUFFER_AUTO_EXTEND) { @@ -286,7 +287,7 @@ perform_flush = true; } else { if (cxBufferMinimumCapacity(buffer, required)) { - return 0; // LCOV_EXCL_LINE + return total_flushed; // LCOV_EXCL_LINE } } } else { @@ -305,7 +306,7 @@ // check here and not above because of possible truncation if (len == 0) { - return 0; + return total_flushed; } // check if we need to copy @@ -313,26 +314,43 @@ // perform the operation if (perform_flush) { - size_t items_flush; + size_t items_flushed; if (buffer->pos == 0) { // if we don't have data in the buffer, but are instructed // to flush, it means that we are supposed to relay the data - items_flush = cx_buffer_flush_helper(buffer, size, ptr, nitems); - if (items_flush == 0) { - // we needed to flush, but could not flush anything - // give up and avoid endless trying + items_flushed = cx_buffer_flush_helper(buffer, ptr, size, nitems); + if (items_flushed == 0) { + // we needed to relay data, but could not flush anything + // i.e. we have to give up to avoid endless trying return 0; } - size_t ritems = nitems - items_flush; - const unsigned char *rest = ptr; - rest += items_flush * size; - return items_flush + cxBufferWrite(rest, size, ritems, buffer); + nitems -= items_flushed; + total_flushed += items_flushed; + if (nitems > 0) { + ptr = ((unsigned char*)ptr) + items_flushed * size; + goto cx_buffer_write_retry; + } + return total_flushed; } else { - items_flush = cx_buffer_flush_impl(buffer, size); - if (items_flush == 0) { - return 0; + items_flushed = cx_buffer_flush_impl(buffer, size); + if (items_flushed == 0) { + // flush target is full, let's try to truncate + size_t remaining_space; + if (buffer->flags & CX_BUFFER_AUTO_EXTEND) { + remaining_space = buffer->flush->threshold > buffer->pos + ? buffer->flush->threshold - buffer->pos + : 0; + } else { + remaining_space = buffer->capacity > buffer->pos + ? buffer->capacity - buffer->pos + : 0; + } + nitems = remaining_space / size; + if (nitems == 0) { + return total_flushed; + } } - return cxBufferWrite(ptr, size, nitems, buffer); + goto cx_buffer_write_retry; } } else { memcpy(buffer->bytes + buffer->pos, ptr, len); @@ -340,9 +358,8 @@ if (buffer->pos > buffer->size) { buffer->size = buffer->pos; } - return nitems; + return total_flushed + nitems; } - } size_t cxBufferAppend( @@ -352,9 +369,19 @@ CxBuffer *buffer ) { size_t pos = buffer->pos; - buffer->pos = buffer->size; + size_t append_pos = buffer->size; + buffer->pos = append_pos; size_t written = cxBufferWrite(ptr, size, nitems, buffer); - buffer->pos = pos; + // the buffer might have been flushed + // we must compute a possible delta for the position + // expected: pos = append_pos + written + // -> if this is not the case, there is a delta + size_t delta = append_pos + written*size - buffer->pos; + if (delta > pos) { + buffer->pos = 0; + } else { + buffer->pos = pos - delta; + } return written; }