ucx/buffer.c

changeset 431
bb7da585debc
parent 324
ce13a778654a
child 440
7c4b9cba09ca
--- a/ucx/buffer.c	Sun May 23 09:44:43 2021 +0200
+++ b/ucx/buffer.c	Sat Jan 04 16:38:48 2025 +0100
@@ -1,7 +1,7 @@
 /*
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
  *
- * Copyright 2017 Mike Becker, Olaf Wintermann All rights reserved.
+ * Copyright 2021 Mike Becker, Olaf Wintermann All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions are met:
@@ -26,90 +26,100 @@
  * POSSIBILITY OF SUCH DAMAGE.
  */
 
-#include "ucx/buffer.h"
+#include "cx/buffer.h"
+#include "cx/utils.h"
 
-#include <stdarg.h>
-#include <stdlib.h>
+#include <stdio.h>
 #include <string.h>
 
-UcxBuffer *ucx_buffer_new(void *space, size_t capacity, int flags) {
-    UcxBuffer *buffer = (UcxBuffer*) malloc(sizeof(UcxBuffer));
-    if (buffer) {
-        buffer->flags = flags;
-        if (!space) {
-            buffer->space = (char*)malloc(capacity);
-            if (!buffer->space) {
-                free(buffer);
-                return NULL;
-            }
-            memset(buffer->space, 0, capacity);
-            buffer->flags |= UCX_BUFFER_AUTOFREE;
-        } else {
-            buffer->space = (char*)space;
+int cxBufferInit(
+        CxBuffer *buffer,
+        void *space,
+        size_t capacity,
+        const CxAllocator *allocator,
+        int flags
+) {
+    if (allocator == NULL) allocator = cxDefaultAllocator;
+    buffer->allocator = allocator;
+    buffer->flags = flags;
+    if (!space) {
+        buffer->bytes = cxMalloc(allocator, capacity);
+        if (buffer->bytes == NULL) {
+            return 1;
         }
-        buffer->capacity = capacity;
-        buffer->size = 0;
-
-        buffer->pos = 0;
+        buffer->flags |= CX_BUFFER_FREE_CONTENTS;
+    } else {
+        buffer->bytes = space;
     }
+    buffer->capacity = capacity;
+    buffer->size = 0;
+    buffer->pos = 0;
 
-    return buffer;
-}
+    buffer->flush_func = NULL;
+    buffer->flush_target = NULL;
+    buffer->flush_blkmax = 0;
+    buffer->flush_blksize = 4096;
+    buffer->flush_threshold = SIZE_MAX;
 
-void ucx_buffer_free(UcxBuffer *buffer) {
-    if ((buffer->flags & UCX_BUFFER_AUTOFREE) == UCX_BUFFER_AUTOFREE) {
-        free(buffer->space);
-    }
-    free(buffer);
+    return 0;
 }
 
-UcxBuffer* ucx_buffer_extract(
-        UcxBuffer *src, size_t start, size_t length, int flags) {
-    if (src->size == 0 || length == 0 ||
-        ((size_t)-1) - start < length || start+length > src->capacity)
-    {
+void cxBufferDestroy(CxBuffer *buffer) {
+    if ((buffer->flags & CX_BUFFER_FREE_CONTENTS) == CX_BUFFER_FREE_CONTENTS) {
+        cxFree(buffer->allocator, buffer->bytes);
+    }
+}
+
+CxBuffer *cxBufferCreate(
+        void *space,
+        size_t capacity,
+        const CxAllocator *allocator,
+        int flags
+) {
+    CxBuffer *buf = cxMalloc(allocator, sizeof(CxBuffer));
+    if (buf == NULL) return NULL;
+    if (0 == cxBufferInit(buf, space, capacity, allocator, flags)) {
+        return buf;
+    } else {
+        cxFree(allocator, buf);
         return NULL;
     }
+}
 
-    UcxBuffer *dst = (UcxBuffer*) malloc(sizeof(UcxBuffer));
-    if (dst) {
-        dst->space = (char*)malloc(length);
-        if (!dst->space) {
-            free(dst);
-            return NULL;
-        }
-        dst->capacity = length;
-        dst->size = length;
-        dst->flags = flags | UCX_BUFFER_AUTOFREE;
-        dst->pos = 0;
-        memcpy(dst->space, src->space+start, length);
+void cxBufferFree(CxBuffer *buffer) {
+    if ((buffer->flags & CX_BUFFER_FREE_CONTENTS) == CX_BUFFER_FREE_CONTENTS) {
+        cxFree(buffer->allocator, buffer->bytes);
     }
-    return dst;
+    cxFree(buffer->allocator, buffer);
 }
 
-int ucx_buffer_seek(UcxBuffer *buffer, off_t offset, int whence) {
+int cxBufferSeek(
+        CxBuffer *buffer,
+        off_t offset,
+        int whence
+) {
     size_t npos;
     switch (whence) {
-    case SEEK_CUR:
-        npos = buffer->pos;
-        break;
-    case SEEK_END:
-        npos = buffer->size;
-        break;
-    case SEEK_SET:
-        npos = 0;
-        break;
-    default:
-        return -1;
+        case SEEK_CUR:
+            npos = buffer->pos;
+            break;
+        case SEEK_END:
+            npos = buffer->size;
+            break;
+        case SEEK_SET:
+            npos = 0;
+            break;
+        default:
+            return -1;
     }
 
     size_t opos = npos;
     npos += offset;
-    
+
     if ((offset > 0 && npos < opos) || (offset < 0 && npos > opos)) {
         return -1;
     }
-    
+
     if (npos >= buffer->size) {
         return -1;
     } else {
@@ -119,135 +129,242 @@
 
 }
 
-int ucx_buffer_eof(UcxBuffer *buffer) {
+void cxBufferClear(CxBuffer *buffer) {
+    memset(buffer->bytes, 0, buffer->size);
+    buffer->size = 0;
+    buffer->pos = 0;
+}
+
+void cxBufferReset(CxBuffer *buffer) {
+    buffer->size = 0;
+    buffer->pos = 0;
+}
+
+int cxBufferEof(const CxBuffer *buffer) {
     return buffer->pos >= buffer->size;
 }
 
-int ucx_buffer_extend(UcxBuffer *buffer, size_t len) {
-    size_t newcap = buffer->capacity;
-    
-    if (buffer->capacity + len < buffer->capacity) {
-        return -1;
+int cxBufferMinimumCapacity(
+        CxBuffer *buffer,
+        size_t newcap
+) {
+    if (newcap <= buffer->capacity) {
+        return 0;
     }
-    
-    while (buffer->capacity + len > newcap) {
-        newcap <<= 1;
-        if (newcap < buffer->capacity) {
-            return -1;
-        }
-    }
-    
-    char *newspace = (char*)realloc(buffer->space, newcap);
-    if (newspace) {
-        memset(newspace+buffer->size, 0, newcap-buffer->size);
-        buffer->space = newspace;
+
+    if (cxReallocate(buffer->allocator,
+                     (void **) &buffer->bytes, newcap) == 0) {
         buffer->capacity = newcap;
+        return 0;
     } else {
         return -1;
     }
-    
-    return 0;
 }
 
-size_t ucx_buffer_write(const void *ptr, size_t size, size_t nitems,
-        UcxBuffer *buffer) {
+/**
+ * Helps flushing data to the flush target of a buffer.
+ *
+ * @param buffer the buffer containing the config
+ * @param space the data to flush
+ * @param size the element size
+ * @param nitems the number of items
+ * @return the number of items flushed
+ */
+static size_t cx_buffer_write_flush_helper(
+        CxBuffer *buffer,
+        const unsigned char *space,
+        size_t size,
+        size_t nitems
+) {
+    size_t pos = 0;
+    size_t remaining = nitems;
+    size_t max_items = buffer->flush_blksize / size;
+    while (remaining > 0) {
+        size_t items = remaining > max_items ? max_items : remaining;
+        size_t flushed = buffer->flush_func(
+                space + pos,
+                size, items,
+                buffer->flush_target);
+        if (flushed > 0) {
+            pos += (flushed * size);
+            remaining -= flushed;
+        } else {
+            // if no bytes can be flushed out anymore, we give up
+            break;
+        }
+    }
+    return nitems - remaining;
+}
+
+size_t cxBufferWrite(
+        const void *ptr,
+        size_t size,
+        size_t nitems,
+        CxBuffer *buffer
+) {
+    // optimize for easy case
+    if (size == 1 && (buffer->capacity - buffer->pos) >= nitems) {
+        memcpy(buffer->bytes + buffer->pos, ptr, nitems);
+        buffer->pos += nitems;
+        if (buffer->pos > buffer->size) {
+            buffer->size = buffer->pos;
+        }
+        return nitems;
+    }
+
     size_t len;
-    if(ucx_szmul(size, nitems, &len)) {
+    size_t nitems_out = nitems;
+    if (cx_szmul(size, nitems, &len)) {
         return 0;
     }
     size_t required = buffer->pos + len;
     if (buffer->pos > required) {
         return 0;
     }
-    
+
+    bool perform_flush = false;
     if (required > buffer->capacity) {
-        if ((buffer->flags & UCX_BUFFER_AUTOEXTEND) == UCX_BUFFER_AUTOEXTEND) {
-            if (ucx_buffer_extend(buffer, required - buffer->capacity)) {
-                return 0;
+        if ((buffer->flags & CX_BUFFER_AUTO_EXTEND) == CX_BUFFER_AUTO_EXTEND && required) {
+            if (buffer->flush_blkmax > 0 && required > buffer->flush_threshold) {
+                perform_flush = true;
+            } else {
+                if (cxBufferMinimumCapacity(buffer, required)) {
+                    return 0;
+                }
             }
         } else {
-            len = buffer->capacity - buffer->pos;
-            if (size > 1) {
-                len -= len%size;
+            if (buffer->flush_blkmax > 0) {
+                perform_flush = true;
+            } else {
+                // truncate data to be written, if we can neither extend nor flush
+                len = buffer->capacity - buffer->pos;
+                if (size > 1) {
+                    len -= len % size;
+                }
+                nitems_out = len / size;
             }
         }
     }
-    
+
     if (len == 0) {
         return len;
     }
-    
-    memcpy(buffer->space + buffer->pos, ptr, len);
-    buffer->pos += len;
-    if(buffer->pos > buffer->size) {
-        buffer->size = buffer->pos;
+
+    if (perform_flush) {
+        size_t flush_max;
+        if (cx_szmul(buffer->flush_blkmax, buffer->flush_blksize, &flush_max)) {
+            return 0;
+        }
+        size_t flush_pos = buffer->flush_func == NULL || buffer->flush_target == NULL
+                           ? buffer->pos
+                           : cx_buffer_write_flush_helper(buffer, buffer->bytes, 1, buffer->pos);
+        if (flush_pos == buffer->pos) {
+            // entire buffer has been flushed, we can reset
+            buffer->size = buffer->pos = 0;
+
+            size_t items_flush; // how many items can also be directly flushed
+            size_t items_keep; // how many items have to be written to the buffer
+
+            items_flush = flush_max >= required ? nitems : (flush_max - flush_pos) / size;
+            if (items_flush > 0) {
+                items_flush = cx_buffer_write_flush_helper(buffer, ptr, size, items_flush / size);
+                // in case we could not flush everything, keep the rest
+            }
+            items_keep = nitems - items_flush;
+            if (items_keep > 0) {
+                // try again with the remaining stuff
+                const unsigned char *new_ptr = ptr;
+                new_ptr += items_flush * size;
+                // report the directly flushed items as written plus the remaining stuff
+                return items_flush + cxBufferWrite(new_ptr, size, items_keep, buffer);
+            } else {
+                // all items have been flushed - report them as written
+                return nitems;
+            }
+        } else if (flush_pos == 0) {
+            // nothing could be flushed at all, we immediately give up without writing any data
+            return 0;
+        } else {
+            // we were partially successful, we shift left and try again
+            cxBufferShiftLeft(buffer, flush_pos);
+            return cxBufferWrite(ptr, size, nitems, buffer);
+        }
+    } else {
+        memcpy(buffer->bytes + buffer->pos, ptr, len);
+        buffer->pos += len;
+        if (buffer->pos > buffer->size) {
+            buffer->size = buffer->pos;
+        }
+        return nitems_out;
     }
-    
-    return len / size;
+
 }
 
-size_t ucx_buffer_read(void *ptr, size_t size, size_t nitems,
-        UcxBuffer *buffer) {
+int cxBufferPut(
+        CxBuffer *buffer,
+        int c
+) {
+    c &= 0xFF;
+    unsigned char const ch = c;
+    if (cxBufferWrite(&ch, 1, 1, buffer) == 1) {
+        return c;
+    } else {
+        return EOF;
+    }
+}
+
+size_t cxBufferPutString(
+        CxBuffer *buffer,
+        const char *str
+) {
+    return cxBufferWrite(str, 1, strlen(str), buffer);
+}
+
+size_t cxBufferRead(
+        void *ptr,
+        size_t size,
+        size_t nitems,
+        CxBuffer *buffer
+) {
     size_t len;
-    if(ucx_szmul(size, nitems, &len)) {
+    if (cx_szmul(size, nitems, &len)) {
         return 0;
     }
     if (buffer->pos + len > buffer->size) {
         len = buffer->size - buffer->pos;
-        if (size > 1) len -= len%size;
+        if (size > 1) len -= len % size;
     }
-    
+
     if (len <= 0) {
         return len;
     }
-    
-    memcpy(ptr, buffer->space + buffer->pos, len);
+
+    memcpy(ptr, buffer->bytes + buffer->pos, len);
     buffer->pos += len;
-    
+
     return len / size;
 }
 
-int ucx_buffer_putc(UcxBuffer *buffer, int c) {
-    if(buffer->pos >= buffer->capacity) {
-        if ((buffer->flags & UCX_BUFFER_AUTOEXTEND) == UCX_BUFFER_AUTOEXTEND) {
-            if(ucx_buffer_extend(buffer, 1)) {
-                return EOF;
-            }
-        } else {
-            return EOF;
-        }
-    }
-    
-    c &= 0xFF;
-    buffer->space[buffer->pos] = (char) c;
-    buffer->pos++;
-    if(buffer->pos > buffer->size) {
-        buffer->size = buffer->pos;
-    }
-    return c;
-}
-
-int ucx_buffer_getc(UcxBuffer *buffer) {
-    if (ucx_buffer_eof(buffer)) {
+int cxBufferGet(CxBuffer *buffer) {
+    if (cxBufferEof(buffer)) {
         return EOF;
     } else {
-        int c = ((unsigned char*)buffer->space)[buffer->pos];
+        int c = buffer->bytes[buffer->pos];
         buffer->pos++;
         return c;
     }
 }
 
-size_t ucx_buffer_puts(UcxBuffer *buffer, const char *str) {
-    return ucx_buffer_write((const void*)str, 1, strlen(str), buffer);
-}
-
-int ucx_buffer_shift_left(UcxBuffer* buffer, size_t shift) {
+int cxBufferShiftLeft(
+        CxBuffer *buffer,
+        size_t shift
+) {
     if (shift >= buffer->size) {
         buffer->pos = buffer->size = 0;
     } else {
-        memmove(buffer->space, buffer->space + shift, buffer->size - shift);
+        memmove(buffer->bytes, buffer->bytes + shift, buffer->size - shift);
         buffer->size -= shift;
-        
+
         if (buffer->pos >= shift) {
             buffer->pos -= shift;
         } else {
@@ -257,14 +374,17 @@
     return 0;
 }
 
-int ucx_buffer_shift_right(UcxBuffer* buffer, size_t shift) {
+int cxBufferShiftRight(
+        CxBuffer *buffer,
+        size_t shift
+) {
     size_t req_capacity = buffer->size + shift;
     size_t movebytes;
-    
+
     // auto extend buffer, if required and enabled
     if (buffer->capacity < req_capacity) {
-        if ((buffer->flags & UCX_BUFFER_AUTOEXTEND) == UCX_BUFFER_AUTOEXTEND) {
-            if (ucx_buffer_extend(buffer, req_capacity - buffer->capacity)) {
+        if ((buffer->flags & CX_BUFFER_AUTO_EXTEND) == CX_BUFFER_AUTO_EXTEND) {
+            if (cxBufferMinimumCapacity(buffer, req_capacity)) {
                 return 1;
             }
             movebytes = buffer->size;
@@ -274,23 +394,26 @@
     } else {
         movebytes = buffer->size;
     }
-    
-    memmove(buffer->space + shift, buffer->space, movebytes);
-    buffer->size = shift+movebytes;
-    
+
+    memmove(buffer->bytes + shift, buffer->bytes, movebytes);
+    buffer->size = shift + movebytes;
+
     buffer->pos += shift;
     if (buffer->pos > buffer->size) {
         buffer->pos = buffer->size;
     }
-    
+
     return 0;
 }
 
-int ucx_buffer_shift(UcxBuffer* buffer, off_t shift) {
+int cxBufferShift(
+        CxBuffer *buffer,
+        off_t shift
+) {
     if (shift < 0) {
-        return ucx_buffer_shift_left(buffer, (size_t) (-shift));
+        return cxBufferShiftLeft(buffer, (size_t) (-shift));
     } else if (shift > 0) {
-        return ucx_buffer_shift_right(buffer, (size_t) shift);
+        return cxBufferShiftRight(buffer, (size_t) shift);
     } else {
         return 0;
     }

mercurial