ucx/buffer.c

changeset 471
063a9f29098c
parent 440
7c4b9cba09ca
--- a/ucx/buffer.c	Sat Feb 22 18:10:36 2025 +0100
+++ b/ucx/buffer.c	Sun Feb 23 14:28:47 2025 +0100
@@ -205,8 +205,8 @@
 
 static size_t cx_buffer_flush_helper(
         const CxBuffer *buffer,
+        const unsigned char *src,
         size_t size,
-        const unsigned char *src,
         size_t nitems
 ) {
     // flush data from an arbitrary source
@@ -236,7 +236,7 @@
     unsigned char *space = buffer->bytes;
     size_t remaining = buffer->pos / size;
     size_t flushed_total = cx_buffer_flush_helper(
-        buffer, size, space, remaining);
+        buffer, space, size, remaining);
 
     // shift the buffer left after flushing
     // IMPORTANT: up to this point, copy on write must have been
@@ -268,17 +268,18 @@
         return nitems;
     }
 
-    size_t len;
+    size_t len, total_flushed = 0;
+cx_buffer_write_retry:
     if (cx_szmul(size, nitems, &len)) {
         errno = EOVERFLOW;
-        return 0;
+        return total_flushed;
     }
     if (buffer->pos > SIZE_MAX - len) {
         errno = EOVERFLOW;
-        return 0;
+        return total_flushed;
     }
+
     size_t required = buffer->pos + len;
-
     bool perform_flush = false;
     if (required > buffer->capacity) {
         if (buffer->flags & CX_BUFFER_AUTO_EXTEND) {
@@ -286,7 +287,7 @@
                 perform_flush = true;
             } else {
                 if (cxBufferMinimumCapacity(buffer, required)) {
-                    return 0; // LCOV_EXCL_LINE
+                    return total_flushed; // LCOV_EXCL_LINE
                 }
             }
         } else {
@@ -305,7 +306,7 @@
 
     // check here and not above because of possible truncation
     if (len == 0) {
-        return 0;
+        return total_flushed;
     }
 
     // check if we need to copy
@@ -313,26 +314,43 @@
 
     // perform the operation
     if (perform_flush) {
-        size_t items_flush;
+        size_t items_flushed;
         if (buffer->pos == 0) {
             // if we don't have data in the buffer, but are instructed
             // to flush, it means that we are supposed to relay the data
-            items_flush = cx_buffer_flush_helper(buffer, size, ptr, nitems);
-            if (items_flush == 0) {
-                // we needed to flush, but could not flush anything
-                // give up and avoid endless trying
+            items_flushed = cx_buffer_flush_helper(buffer, ptr, size, nitems);
+            if (items_flushed == 0) {
+                // we needed to relay data, but could not flush anything
+                // i.e. we have to give up to avoid endless trying
                 return 0;
             }
-            size_t ritems = nitems - items_flush;
-            const unsigned char *rest = ptr;
-            rest += items_flush * size;
-            return items_flush + cxBufferWrite(rest, size, ritems, buffer);
+            nitems -= items_flushed;
+            total_flushed += items_flushed;
+            if (nitems > 0) {
+                ptr = ((unsigned char*)ptr) + items_flushed * size;
+                goto cx_buffer_write_retry;
+            }
+            return total_flushed;
         } else {
-            items_flush = cx_buffer_flush_impl(buffer, size);
-            if (items_flush == 0) {
-                return 0;
+            items_flushed = cx_buffer_flush_impl(buffer, size);
+            if (items_flushed == 0) {
+                // flush target is full, let's try to truncate
+                size_t remaining_space;
+                if (buffer->flags & CX_BUFFER_AUTO_EXTEND) {
+                    remaining_space = buffer->flush->threshold > buffer->pos
+                                          ? buffer->flush->threshold - buffer->pos
+                                          : 0;
+                } else {
+                    remaining_space = buffer->capacity > buffer->pos
+                                          ? buffer->capacity - buffer->pos
+                                          : 0;
+                }
+                nitems = remaining_space / size;
+                if (nitems == 0) {
+                    return total_flushed;
+                }
             }
-            return cxBufferWrite(ptr, size, nitems, buffer);
+            goto cx_buffer_write_retry;
         }
     } else {
         memcpy(buffer->bytes + buffer->pos, ptr, len);
@@ -340,9 +358,8 @@
         if (buffer->pos > buffer->size) {
             buffer->size = buffer->pos;
         }
-        return nitems;
+        return total_flushed + nitems;
     }
-
 }
 
 size_t cxBufferAppend(
@@ -352,9 +369,19 @@
         CxBuffer *buffer
 ) {
     size_t pos = buffer->pos;
-    buffer->pos = buffer->size;
+    size_t append_pos = buffer->size;
+    buffer->pos = append_pos;    
     size_t written = cxBufferWrite(ptr, size, nitems, buffer);
-    buffer->pos = pos;
+    // the buffer might have been flushed
+    // we must compute a possible delta for the position
+    // expected: pos = append_pos + written
+    // -> if this is not the case, there is a delta
+    size_t delta = append_pos + written*size - buffer->pos;
+    if (delta > pos) {
+        buffer->pos = 0;
+    } else {
+        buffer->pos = pos - delta;
+    }
     return written;
 }
 

mercurial