Tue, 13 Aug 2024 19:59:42 +0200
new linux event_send implementation, replace event pipes with eventfd
/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 *
 * Copyright 2021 Mike Becker, Olaf Wintermann All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   1. Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *   2. Redistributions in binary form must reproduce the above copyright
 *      notice, this list of conditions and the following disclaimer in the
 *      documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "cx/buffer.h"
#include "cx/utils.h"

#include <stdio.h>
#include <string.h>

/**
 * Initializes a buffer structure.
 *
 * When @p space is NULL, @p capacity bytes are allocated with the allocator
 * and the CX_BUFFER_FREE_CONTENTS flag is added so that the memory is released
 * again by cxBufferDestroy() / cxBufferFree().
 *
 * @param buffer the buffer to initialize
 * @param space the memory to use, or NULL to allocate it
 * @param capacity the capacity of the buffer in bytes
 * @param allocator the allocator to use (NULL selects cxDefaultAllocator)
 * @param flags buffer flags
 * @return zero on success, non-zero if the allocation failed
 */
int cxBufferInit(
        CxBuffer *buffer,
        void *space,
        size_t capacity,
        CxAllocator const *allocator,
        int flags
) {
    if (allocator == NULL) allocator = cxDefaultAllocator;
    buffer->allocator = allocator;
    buffer->flags = flags;
    if (!space) {
        buffer->bytes = cxMalloc(allocator, capacity);
        if (buffer->bytes == NULL) {
            return 1;
        }
        buffer->flags |= CX_BUFFER_FREE_CONTENTS;
    } else {
        buffer->bytes = space;
    }
    buffer->capacity = capacity;
    buffer->size = 0;
    buffer->pos = 0;

    // flushing is disabled until the caller configures it
    buffer->flush_func = NULL;
    buffer->flush_target = NULL;
    buffer->flush_blkmax = 0;
    buffer->flush_blksize = 4096;
    buffer->flush_threshold = SIZE_MAX;

    return 0;
}

/**
 * Releases the contents of a buffer if the buffer owns them.
 *
 * The buffer structure itself is not freed.
 *
 * @param buffer the buffer to destroy
 */
void cxBufferDestroy(CxBuffer *buffer) {
    if ((buffer->flags & CX_BUFFER_FREE_CONTENTS) == CX_BUFFER_FREE_CONTENTS) {
        cxFree(buffer->allocator, buffer->bytes);
    }
}

/**
 * Allocates and initializes a new buffer.
 *
 * @param space the memory to use, or NULL to allocate it
 * @param capacity the capacity of the buffer in bytes
 * @param allocator the allocator for the structure and, if required, the
 * contents (NULL selects cxDefaultAllocator)
 * @param flags buffer flags
 * @return the new buffer, or NULL if an allocation failed
 */
CxBuffer *cxBufferCreate(
        void *space,
        size_t capacity,
        CxAllocator const *allocator,
        int flags
) {
    // resolve the default first: the structure itself is allocated (and on
    // failure freed) with this allocator, so it must not be NULL here
    if (allocator == NULL) allocator = cxDefaultAllocator;

    CxBuffer *buf = cxMalloc(allocator, sizeof(CxBuffer));
    if (buf == NULL) return NULL;
    if (0 == cxBufferInit(buf, space, capacity, allocator, flags)) {
        return buf;
    } else {
        cxFree(allocator, buf);
        return NULL;
    }
}

/**
 * Releases the contents of a buffer (if owned) and the buffer structure.
 *
 * @param buffer the buffer to free
 */
void cxBufferFree(CxBuffer *buffer) {
    if ((buffer->flags & CX_BUFFER_FREE_CONTENTS) == CX_BUFFER_FREE_CONTENTS) {
        cxFree(buffer->allocator, buffer->bytes);
    }
    cxFree(buffer->allocator, buffer);
}

/**
 * Moves the position of the buffer.
 *
 * The new position is computed relative to the start (SEEK_SET), the current
 * position (SEEK_CUR) or the current size (SEEK_END). The position is only
 * changed when the result is strictly less than the buffer size.
 *
 * @param buffer the buffer
 * @param offset the offset relative to @p whence
 * @param whence SEEK_SET, SEEK_CUR or SEEK_END
 * @return zero on success, -1 if @p whence is invalid, the computation
 * wrapped around, or the new position is not within the buffer contents
 */
int cxBufferSeek(
        CxBuffer *buffer,
        off_t offset,
        int whence
) {
    size_t npos;
    switch (whence) {
        case SEEK_CUR:
            npos = buffer->pos;
            break;
        case SEEK_END:
            npos = buffer->size;
            break;
        case SEEK_SET:
            npos = 0;
            break;
        default:
            return -1;
    }

    size_t opos = npos;
    npos += offset;
    // detect wrap-around of the unsigned position in either direction
    if ((offset > 0 && npos < opos) || (offset < 0 && npos > opos)) {
        return -1;
    }

    if (npos >= buffer->size) {
        return -1;
    } else {
        buffer->pos = npos;
        return 0;
    }
}

/**
 * Zeroes the used part of the buffer and resets size and position.
 *
 * @param buffer the buffer to clear
 */
void cxBufferClear(CxBuffer *buffer) {
    memset(buffer->bytes, 0, buffer->size);
    buffer->size = 0;
    buffer->pos = 0;
}

/**
 * Tests whether the position has reached the end of the buffer contents.
 *
 * @param buffer the buffer
 * @return non-zero if the position is at or beyond the size
 */
int cxBufferEof(CxBuffer const *buffer) {
    return buffer->pos >= buffer->size;
}

/**
 * Ensures that the buffer has at least the specified capacity.
 *
 * The buffer never shrinks; if the capacity already suffices nothing happens.
 *
 * @param buffer the buffer
 * @param newcap the minimum required capacity in bytes
 * @return zero on success, -1 if the reallocation failed
 */
int cxBufferMinimumCapacity(
        CxBuffer *buffer,
        size_t newcap
) {
    if (newcap <= buffer->capacity) {
        return 0;
    }

    if (cxReallocate(buffer->allocator,
                     (void **) &buffer->bytes, newcap) == 0) {
        buffer->capacity = newcap;
        return 0;
    } else {
        return -1;
    }
}

/**
 * Helps flushing data to the flush target of a buffer.
 *
 * The data is handed to the flush function in chunks of at most
 * flush_blksize bytes.
 *
 * @param buffer the buffer containing the config
 * @param space the data to flush
 * @param size the element size (must not be zero)
 * @param nitems the number of items
 * @return the number of items flushed
 */
static size_t cx_buffer_write_flush_helper(
        CxBuffer *buffer,
        unsigned char const *space,
        size_t size,
        size_t nitems
) {
    size_t pos = 0;
    size_t remaining = nitems;
    size_t max_items = buffer->flush_blksize / size;
    while (remaining > 0) {
        size_t items = remaining > max_items ? max_items : remaining;
        size_t flushed = buffer->flush_func(
                space + pos,
                size, items,
                buffer->flush_target);
        if (flushed > 0) {
            pos += (flushed * size);
            remaining -= flushed;
        } else {
            // if no bytes can be flushed out anymore, we give up
            break;
        }
    }
    return nitems - remaining;
}

/**
 * Writes data to the buffer at its current position.
 *
 * If the data does not fit, the buffer is either extended (auto-extend flag),
 * flushed (flush_blkmax greater than zero), or the data is truncated to the
 * largest number of whole items that fit.
 *
 * @param ptr the data to write
 * @param size the size of one item in bytes
 * @param nitems the number of items
 * @param buffer the buffer to write to
 * @return the number of items written (or flushed directly to the target)
 */
size_t cxBufferWrite(
        void const *ptr,
        size_t size,
        size_t nitems,
        CxBuffer *buffer
) {
    // optimize for easy case
    if (size == 1 && (buffer->capacity - buffer->pos) >= nitems) {
        memcpy(buffer->bytes + buffer->pos, ptr, nitems);
        buffer->pos += nitems;
        if (buffer->pos > buffer->size) {
            buffer->size = buffer->pos;
        }
        return nitems;
    }

    size_t len;
    size_t nitems_out = nitems;
    if (cx_szmul(size, nitems, &len)) {
        return 0;
    }
    size_t required = buffer->pos + len;
    if (buffer->pos > required) {
        // pos + len wrapped around
        return 0;
    }

    bool perform_flush = false;
    if (required > buffer->capacity) {
        if ((buffer->flags & CX_BUFFER_AUTO_EXTEND) == CX_BUFFER_AUTO_EXTEND && required) {
            if (buffer->flush_blkmax > 0 && required > buffer->flush_threshold) {
                perform_flush = true;
            } else {
                if (cxBufferMinimumCapacity(buffer, required)) {
                    return 0;
                }
            }
        } else {
            if (buffer->flush_blkmax > 0) {
                perform_flush = true;
            } else {
                // truncate data to be written, if we can neither extend nor flush
                len = buffer->capacity - buffer->pos;
                if (size > 1) {
                    len -= len % size;
                }
                nitems_out = len / size;
            }
        }
    }

    if (len == 0) {
        return len;
    }

    if (perform_flush) {
        size_t flush_max;
        if (cx_szmul(buffer->flush_blkmax, buffer->flush_blksize, &flush_max)) {
            return 0;
        }
        // NOTE(review): without a flush function or target the current
        // contents are treated as flushed, yet the direct flush below still
        // invokes the flush function - confirm the intended behavior
        size_t flush_pos = buffer->flush_func == NULL || buffer->flush_target == NULL
                           ? buffer->pos
                           : cx_buffer_write_flush_helper(buffer, buffer->bytes, 1, buffer->pos);
        if (flush_pos == buffer->pos) {
            // entire buffer has been flushed, we can reset
            buffer->size = buffer->pos = 0;

            size_t items_flush; // how many items can also be directly flushed
            size_t items_keep; // how many items have to be written to the buffer

            if (flush_max >= required) {
                items_flush = nitems;
            } else if (flush_max > flush_pos) {
                items_flush = (flush_max - flush_pos) / size;
            } else {
                // the flush budget is already exhausted; subtracting would
                // wrap around and yield more items than the caller supplied
                items_flush = 0;
            }
            if (items_flush > 0) {
                // items_flush is already an item count, pass it unchanged
                items_flush = cx_buffer_write_flush_helper(buffer, ptr, size, items_flush);
                // in case we could not flush everything, keep the rest
            }
            items_keep = nitems - items_flush;
            if (items_keep > 0) {
                if (items_flush == 0 && flush_pos == 0) {
                    // the buffer was already empty and nothing could be
                    // flushed directly: a retry would be called with the
                    // exact same state and recurse forever, so give up
                    return 0;
                }
                // try again with the remaining stuff
                unsigned char const *new_ptr = ptr;
                new_ptr += items_flush * size;
                // report the directly flushed items as written plus the remaining stuff
                return items_flush + cxBufferWrite(new_ptr, size, items_keep, buffer);
            } else {
                // all items have been flushed - report them as written
                return nitems;
            }
        } else if (flush_pos == 0) {
            // nothing could be flushed at all, we immediately give up without writing any data
            return 0;
        } else {
            // we were partially successful, we shift left and try again
            cxBufferShiftLeft(buffer, flush_pos);
            return cxBufferWrite(ptr, size, nitems, buffer);
        }
    } else {
        memcpy(buffer->bytes + buffer->pos, ptr, len);
        buffer->pos += len;
        if (buffer->pos > buffer->size) {
            buffer->size = buffer->pos;
        }
        return nitems_out;
    }
}

/**
 * Writes a single byte to the buffer.
 *
 * @param buffer the buffer to write to
 * @param c the byte to write (only the lowest eight bits are used)
 * @return the byte written, or EOF if it could not be written
 */
int cxBufferPut(
        CxBuffer *buffer,
        int c
) {
    c &= 0xFF;
    unsigned char const ch = c;
    if (cxBufferWrite(&ch, 1, 1, buffer) == 1) {
        return c;
    } else {
        return EOF;
    }
}

/**
 * Writes a zero-terminated string to the buffer (without the terminator).
 *
 * @param buffer the buffer to write to
 * @param str the string to write
 * @return the number of bytes written
 */
size_t cxBufferPutString(
        CxBuffer *buffer,
        const char *str
) {
    return cxBufferWrite(str, 1, strlen(str), buffer);
}

/**
 * Reads data from the buffer at its current position.
 *
 * At most the bytes between the position and the size are read, truncated to
 * whole items.
 *
 * @param ptr the destination memory
 * @param size the size of one item in bytes
 * @param nitems the number of items
 * @param buffer the buffer to read from
 * @return the number of items read
 */
size_t cxBufferRead(
        void *ptr,
        size_t size,
        size_t nitems,
        CxBuffer *buffer
) {
    size_t len;
    if (cx_szmul(size, nitems, &len)) {
        return 0;
    }

    // compare against the remaining bytes instead of computing pos + len,
    // which could wrap around and bypass the bounds check
    size_t const avail = buffer->pos < buffer->size
                         ? buffer->size - buffer->pos : 0;
    if (len > avail) {
        len = avail;
        if (size > 1) len -= len % size;
    }

    if (len == 0) {
        return 0;
    }

    memcpy(ptr, buffer->bytes + buffer->pos, len);
    buffer->pos += len;

    return len / size;
}

/**
 * Reads a single byte from the buffer.
 *
 * @param buffer the buffer to read from
 * @return the byte read, or EOF if the end of the contents is reached
 */
int cxBufferGet(CxBuffer *buffer) {
    if (cxBufferEof(buffer)) {
        return EOF;
    } else {
        int c = buffer->bytes[buffer->pos];
        buffer->pos++;
        return c;
    }
}

/**
 * Shifts the buffer contents to the left, discarding the first bytes.
 *
 * Size and position are reduced accordingly; the position does not drop
 * below zero.
 *
 * @param buffer the buffer
 * @param shift the number of bytes to shift
 * @return always zero
 */
int cxBufferShiftLeft(
        CxBuffer *buffer,
        size_t shift
) {
    if (shift >= buffer->size) {
        buffer->pos = buffer->size = 0;
    } else {
        memmove(buffer->bytes, buffer->bytes + shift, buffer->size - shift);
        buffer->size -= shift;

        if (buffer->pos >= shift) {
            buffer->pos -= shift;
        } else {
            buffer->pos = 0;
        }
    }
    return 0;
}

/**
 * Shifts the buffer contents to the right.
 *
 * If the shifted contents exceed the capacity, the buffer is extended when
 * auto-extend is enabled; otherwise the bytes that no longer fit are
 * discarded and the size becomes the capacity. The bytes in front of the
 * shifted contents are left unchanged.
 *
 * @param buffer the buffer
 * @param shift the number of bytes to shift
 * @return zero on success, non-zero if the buffer could not be extended
 */
int cxBufferShiftRight(
        CxBuffer *buffer,
        size_t shift
) {
    size_t movebytes; // number of content bytes that survive the shift
    size_t newsize; // size of the buffer after the shift

    // overflow-safe form of: size + shift <= capacity
    bool const fits = shift <= buffer->capacity
                      && buffer->size <= buffer->capacity - shift;

    if (fits) {
        movebytes = buffer->size;
        newsize = buffer->size + shift;
    } else if ((buffer->flags & CX_BUFFER_AUTO_EXTEND) == CX_BUFFER_AUTO_EXTEND) {
        // auto extend buffer
        if (shift > SIZE_MAX - buffer->size) {
            // the required capacity is not representable
            return 1;
        }
        size_t req_capacity = buffer->size + shift;
        if (cxBufferMinimumCapacity(buffer, req_capacity)) {
            return 1;
        }
        movebytes = buffer->size;
        newsize = req_capacity;
    } else {
        // a shift of at least the capacity pushes out the entire contents;
        // computing capacity - shift would wrap around in that case
        movebytes = shift < buffer->capacity ? buffer->capacity - shift : 0;
        newsize = buffer->capacity;
    }

    if (movebytes > 0) {
        memmove(buffer->bytes + shift, buffer->bytes, movebytes);
    }
    buffer->size = newsize;

    // advance the position together with the contents, clamped to the size
    if (buffer->pos > newsize || shift > newsize - buffer->pos) {
        buffer->pos = newsize;
    } else {
        buffer->pos += shift;
    }

    return 0;
}

/**
 * Shifts the buffer contents left (negative shift) or right (positive shift).
 *
 * @param buffer the buffer
 * @param shift the number of bytes to shift; zero does nothing
 * @return the result of cxBufferShiftLeft() or cxBufferShiftRight(),
 * zero if @p shift is zero
 */
int cxBufferShift(
        CxBuffer *buffer,
        off_t shift
) {
    if (shift < 0) {
        // negate in unsigned arithmetic: -shift overflows for the most
        // negative off_t value
        return cxBufferShiftLeft(buffer, (size_t) 0 - (size_t) shift);
    } else if (shift > 0) {
        return cxBufferShiftRight(buffer, (size_t) shift);
    } else {
        return 0;
    }
}