src/ucx/buffer.c

changeset 579
e10457d74fe1
parent 490
d218607f5a7e
equal deleted inserted replaced
578:eb48f716b31c 579:e10457d74fe1
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE. 26 * POSSIBILITY OF SUCH DAMAGE.
27 */ 27 */
28 28
29 #include "cx/buffer.h" 29 #include "cx/buffer.h"
30 #include "cx/utils.h"
31 30
32 #include <stdio.h> 31 #include <stdio.h>
33 #include <string.h> 32 #include <string.h>
33 #include <errno.h>
34
35 static int buffer_copy_on_write(CxBuffer* buffer) {
36 if (0 == (buffer->flags & CX_BUFFER_COPY_ON_WRITE)) return 0;
37 void *newspace = cxMalloc(buffer->allocator, buffer->capacity);
38 if (NULL == newspace) return -1;
39 memcpy(newspace, buffer->space, buffer->size);
40 buffer->space = newspace;
41 buffer->flags &= ~CX_BUFFER_COPY_ON_WRITE;
42 buffer->flags |= CX_BUFFER_FREE_CONTENTS;
43 return 0;
44 }
34 45
35 int cxBufferInit( 46 int cxBufferInit(
36 CxBuffer *buffer, 47 CxBuffer *buffer,
37 void *space, 48 void *space,
38 size_t capacity, 49 size_t capacity,
39 CxAllocator const *allocator, 50 const CxAllocator *allocator,
40 int flags 51 int flags
41 ) { 52 ) {
42 if (allocator == NULL) allocator = cxDefaultAllocator; 53 if (allocator == NULL) {
54 allocator = cxDefaultAllocator;
55 }
56 if (flags & CX_BUFFER_COPY_ON_EXTEND) {
57 flags |= CX_BUFFER_AUTO_EXTEND;
58 }
43 buffer->allocator = allocator; 59 buffer->allocator = allocator;
44 buffer->flags = flags; 60 buffer->flags = flags;
45 if (!space) { 61 if (!space) {
46 buffer->bytes = cxMalloc(allocator, capacity); 62 buffer->bytes = cxMalloc(allocator, capacity);
47 if (buffer->bytes == NULL) { 63 if (buffer->bytes == NULL) {
48 return 1; 64 return -1; // LCOV_EXCL_LINE
49 } 65 }
50 buffer->flags |= CX_BUFFER_FREE_CONTENTS; 66 buffer->flags |= CX_BUFFER_FREE_CONTENTS;
51 } else { 67 } else {
52 buffer->bytes = space; 68 buffer->bytes = space;
53 } 69 }
54 buffer->capacity = capacity; 70 buffer->capacity = capacity;
55 buffer->size = 0; 71 buffer->size = 0;
56 buffer->pos = 0; 72 buffer->pos = 0;
57 73
58 buffer->flush_func = NULL; 74 buffer->flush = NULL;
59 buffer->flush_target = NULL;
60 buffer->flush_blkmax = 0;
61 buffer->flush_blksize = 4096;
62 buffer->flush_threshold = SIZE_MAX;
63 75
64 return 0; 76 return 0;
65 } 77 }
66 78
79 int cxBufferEnableFlushing(
80 CxBuffer *buffer,
81 CxBufferFlushConfig config
82 ) {
83 buffer->flush = malloc(sizeof(CxBufferFlushConfig));
84 if (buffer->flush == NULL) return -1; // LCOV_EXCL_LINE
85 memcpy(buffer->flush, &config, sizeof(CxBufferFlushConfig));
86 return 0;
87 }
88
67 void cxBufferDestroy(CxBuffer *buffer) { 89 void cxBufferDestroy(CxBuffer *buffer) {
68 if ((buffer->flags & CX_BUFFER_FREE_CONTENTS) == CX_BUFFER_FREE_CONTENTS) { 90 if (buffer->flags & CX_BUFFER_FREE_CONTENTS) {
69 cxFree(buffer->allocator, buffer->bytes); 91 cxFree(buffer->allocator, buffer->bytes);
70 } 92 }
93 free(buffer->flush);
94 memset(buffer, 0, sizeof(CxBuffer));
71 } 95 }
72 96
73 CxBuffer *cxBufferCreate( 97 CxBuffer *cxBufferCreate(
74 void *space, 98 void *space,
75 size_t capacity, 99 size_t capacity,
76 CxAllocator const *allocator, 100 const CxAllocator *allocator,
77 int flags 101 int flags
78 ) { 102 ) {
103 if (allocator == NULL) {
104 allocator = cxDefaultAllocator;
105 }
79 CxBuffer *buf = cxMalloc(allocator, sizeof(CxBuffer)); 106 CxBuffer *buf = cxMalloc(allocator, sizeof(CxBuffer));
80 if (buf == NULL) return NULL; 107 if (buf == NULL) return NULL;
81 if (0 == cxBufferInit(buf, space, capacity, allocator, flags)) { 108 if (0 == cxBufferInit(buf, space, capacity, allocator, flags)) {
82 return buf; 109 return buf;
83 } else { 110 } else {
111 // LCOV_EXCL_START
84 cxFree(allocator, buf); 112 cxFree(allocator, buf);
85 return NULL; 113 return NULL;
114 // LCOV_EXCL_STOP
86 } 115 }
87 } 116 }
88 117
89 void cxBufferFree(CxBuffer *buffer) { 118 void cxBufferFree(CxBuffer *buffer) {
90 if ((buffer->flags & CX_BUFFER_FREE_CONTENTS) == CX_BUFFER_FREE_CONTENTS) { 119 if (buffer == NULL) return;
91 cxFree(buffer->allocator, buffer->bytes); 120 const CxAllocator *allocator = buffer->allocator;
92 } 121 cxBufferDestroy(buffer);
93 cxFree(buffer->allocator, buffer); 122 cxFree(allocator, buffer);
94 } 123 }
95 124
96 int cxBufferSeek( 125 int cxBufferSeek(
97 CxBuffer *buffer, 126 CxBuffer *buffer,
98 off_t offset, 127 off_t offset,
115 144
116 size_t opos = npos; 145 size_t opos = npos;
117 npos += offset; 146 npos += offset;
118 147
119 if ((offset > 0 && npos < opos) || (offset < 0 && npos > opos)) { 148 if ((offset > 0 && npos < opos) || (offset < 0 && npos > opos)) {
149 errno = EOVERFLOW;
120 return -1; 150 return -1;
121 } 151 }
122 152
123 if (npos >= buffer->size) { 153 if (npos > buffer->size) {
124 return -1; 154 return -1;
125 } else { 155 } else {
126 buffer->pos = npos; 156 buffer->pos = npos;
127 return 0; 157 return 0;
128 } 158 }
129 159
130 } 160 }
131 161
132 void cxBufferClear(CxBuffer *buffer) { 162 void cxBufferClear(CxBuffer *buffer) {
133 memset(buffer->bytes, 0, buffer->size); 163 if (0 == (buffer->flags & CX_BUFFER_COPY_ON_WRITE)) {
164 memset(buffer->bytes, 0, buffer->size);
165 }
134 buffer->size = 0; 166 buffer->size = 0;
135 buffer->pos = 0; 167 buffer->pos = 0;
136 } 168 }
137 169
138 int cxBufferEof(CxBuffer const *buffer) { 170 void cxBufferReset(CxBuffer *buffer) {
171 buffer->size = 0;
172 buffer->pos = 0;
173 }
174
175 bool cxBufferEof(const CxBuffer *buffer) {
139 return buffer->pos >= buffer->size; 176 return buffer->pos >= buffer->size;
140 } 177 }
141 178
142 int cxBufferMinimumCapacity( 179 int cxBufferMinimumCapacity(
143 CxBuffer *buffer, 180 CxBuffer *buffer,
145 ) { 182 ) {
146 if (newcap <= buffer->capacity) { 183 if (newcap <= buffer->capacity) {
147 return 0; 184 return 0;
148 } 185 }
149 186
150 if (cxReallocate(buffer->allocator, 187 const int force_copy_flags = CX_BUFFER_COPY_ON_WRITE | CX_BUFFER_COPY_ON_EXTEND;
188 if (buffer->flags & force_copy_flags) {
189 void *newspace = cxMalloc(buffer->allocator, newcap);
190 if (NULL == newspace) return -1;
191 memcpy(newspace, buffer->space, buffer->size);
192 buffer->space = newspace;
193 buffer->capacity = newcap;
194 buffer->flags &= ~force_copy_flags;
195 buffer->flags |= CX_BUFFER_FREE_CONTENTS;
196 return 0;
197 } else if (cxReallocate(buffer->allocator,
151 (void **) &buffer->bytes, newcap) == 0) { 198 (void **) &buffer->bytes, newcap) == 0) {
152 buffer->capacity = newcap; 199 buffer->capacity = newcap;
153 return 0; 200 return 0;
154 } else { 201 } else {
155 return -1; 202 return -1; // LCOV_EXCL_LINE
156 } 203 }
157 } 204 }
158 205
159 /** 206 static size_t cx_buffer_flush_helper(
160 * Helps flushing data to the flush target of a buffer. 207 const CxBuffer *buffer,
161 * 208 const unsigned char *src,
162 * @param buffer the buffer containing the config
163 * @param space the data to flush
164 * @param size the element size
165 * @param nitems the number of items
166 * @return the number of items flushed
167 */
168 static size_t cx_buffer_write_flush_helper(
169 CxBuffer *buffer,
170 unsigned char const *space,
171 size_t size, 209 size_t size,
172 size_t nitems 210 size_t nitems
173 ) { 211 ) {
174 size_t pos = 0; 212 // flush data from an arbitrary source
175 size_t remaining = nitems; 213 // does not need to be the buffer's contents
176 size_t max_items = buffer->flush_blksize / size; 214 size_t max_items = buffer->flush->blksize / size;
177 while (remaining > 0) { 215 size_t fblocks = 0;
178 size_t items = remaining > max_items ? max_items : remaining; 216 size_t flushed_total = 0;
179 size_t flushed = buffer->flush_func( 217 while (nitems > 0 && fblocks < buffer->flush->blkmax) {
180 space + pos, 218 fblocks++;
181 size, items, 219 size_t items = nitems > max_items ? max_items : nitems;
182 buffer->flush_target); 220 size_t flushed = buffer->flush->wfunc(
221 src, size, items, buffer->flush->target);
183 if (flushed > 0) { 222 if (flushed > 0) {
184 pos += (flushed * size); 223 flushed_total += flushed;
185 remaining -= flushed; 224 src += flushed * size;
225 nitems -= flushed;
186 } else { 226 } else {
187 // if no bytes can be flushed out anymore, we give up 227 // if no bytes can be flushed out anymore, we give up
188 break; 228 break;
189 } 229 }
190 } 230 }
191 return nitems - remaining; 231 return flushed_total;
232 }
233
234 static size_t cx_buffer_flush_impl(CxBuffer *buffer, size_t size) {
235 // flush the current contents of the buffer
236 unsigned char *space = buffer->bytes;
237 size_t remaining = buffer->pos / size;
238 size_t flushed_total = cx_buffer_flush_helper(
239 buffer, space, size, remaining);
240
241 // shift the buffer left after flushing
242 // IMPORTANT: up to this point, copy on write must have been
243 // performed already, because we can't do error handling here
244 cxBufferShiftLeft(buffer, flushed_total*size);
245
246 return flushed_total;
247 }
248
249 size_t cxBufferFlush(CxBuffer *buffer) {
250 if (buffer_copy_on_write(buffer)) return 0;
251 return cx_buffer_flush_impl(buffer, 1);
192 } 252 }
193 253
194 size_t cxBufferWrite( 254 size_t cxBufferWrite(
195 void const *ptr, 255 const void *ptr,
196 size_t size, 256 size_t size,
197 size_t nitems, 257 size_t nitems,
198 CxBuffer *buffer 258 CxBuffer *buffer
199 ) { 259 ) {
200 // optimize for easy case 260 // optimize for easy case
201 if (size == 1 && (buffer->capacity - buffer->pos) >= nitems) { 261 if (size == 1 && (buffer->capacity - buffer->pos) >= nitems) {
262 if (buffer_copy_on_write(buffer)) return 0;
202 memcpy(buffer->bytes + buffer->pos, ptr, nitems); 263 memcpy(buffer->bytes + buffer->pos, ptr, nitems);
203 buffer->pos += nitems; 264 buffer->pos += nitems;
204 if (buffer->pos > buffer->size) { 265 if (buffer->pos > buffer->size) {
205 buffer->size = buffer->pos; 266 buffer->size = buffer->pos;
206 } 267 }
207 return nitems; 268 return nitems;
208 } 269 }
209 270
210 size_t len; 271 size_t len, total_flushed = 0;
211 size_t nitems_out = nitems; 272 cx_buffer_write_retry:
212 if (cx_szmul(size, nitems, &len)) { 273 if (cx_szmul(size, nitems, &len)) {
213 return 0; 274 errno = EOVERFLOW;
214 } 275 return total_flushed;
276 }
277 if (buffer->pos > SIZE_MAX - len) {
278 errno = EOVERFLOW;
279 return total_flushed;
280 }
281
215 size_t required = buffer->pos + len; 282 size_t required = buffer->pos + len;
216 if (buffer->pos > required) {
217 return 0;
218 }
219
220 bool perform_flush = false; 283 bool perform_flush = false;
221 if (required > buffer->capacity) { 284 if (required > buffer->capacity) {
222 if ((buffer->flags & CX_BUFFER_AUTO_EXTEND) == CX_BUFFER_AUTO_EXTEND && required) { 285 if (buffer->flags & CX_BUFFER_AUTO_EXTEND) {
223 if (buffer->flush_blkmax > 0 && required > buffer->flush_threshold) { 286 if (buffer->flush != NULL && required > buffer->flush->threshold) {
224 perform_flush = true; 287 perform_flush = true;
225 } else { 288 } else {
226 if (cxBufferMinimumCapacity(buffer, required)) { 289 if (cxBufferMinimumCapacity(buffer, required)) {
227 return 0; 290 return total_flushed; // LCOV_EXCL_LINE
228 } 291 }
229 } 292 }
230 } else { 293 } else {
231 if (buffer->flush_blkmax > 0) { 294 if (buffer->flush != NULL) {
232 perform_flush = true; 295 perform_flush = true;
233 } else { 296 } else {
234 // truncate data to be written, if we can neither extend nor flush 297 // truncate data, if we can neither extend nor flush
235 len = buffer->capacity - buffer->pos; 298 len = buffer->capacity - buffer->pos;
236 if (size > 1) { 299 if (size > 1) {
237 len -= len % size; 300 len -= len % size;
238 } 301 }
239 nitems_out = len / size; 302 nitems = len / size;
240 } 303 }
241 } 304 }
242 } 305 }
243 306
307 // check here and not above because of possible truncation
244 if (len == 0) { 308 if (len == 0) {
245 return len; 309 return total_flushed;
246 } 310 }
247 311
312 // check if we need to copy
313 if (buffer_copy_on_write(buffer)) return 0;
314
315 // perform the operation
248 if (perform_flush) { 316 if (perform_flush) {
249 size_t flush_max; 317 size_t items_flushed;
250 if (cx_szmul(buffer->flush_blkmax, buffer->flush_blksize, &flush_max)) { 318 if (buffer->pos == 0) {
251 return 0; 319 // if we don't have data in the buffer, but are instructed
252 } 320 // to flush, it means that we are supposed to relay the data
253 size_t flush_pos = buffer->flush_func == NULL || buffer->flush_target == NULL 321 items_flushed = cx_buffer_flush_helper(buffer, ptr, size, nitems);
254 ? buffer->pos 322 if (items_flushed == 0) {
255 : cx_buffer_write_flush_helper(buffer, buffer->bytes, 1, buffer->pos); 323 // we needed to relay data, but could not flush anything
256 if (flush_pos == buffer->pos) { 324 // i.e. we have to give up to avoid endless trying
257 // entire buffer has been flushed, we can reset 325 return 0;
258 buffer->size = buffer->pos = 0;
259
260 size_t items_flush; // how many items can also be directly flushed
261 size_t items_keep; // how many items have to be written to the buffer
262
263 items_flush = flush_max >= required ? nitems : (flush_max - flush_pos) / size;
264 if (items_flush > 0) {
265 items_flush = cx_buffer_write_flush_helper(buffer, ptr, size, items_flush / size);
266 // in case we could not flush everything, keep the rest
267 } 326 }
268 items_keep = nitems - items_flush; 327 nitems -= items_flushed;
269 if (items_keep > 0) { 328 total_flushed += items_flushed;
270 // try again with the remaining stuff 329 if (nitems > 0) {
271 unsigned char const *new_ptr = ptr; 330 ptr = ((unsigned char*)ptr) + items_flushed * size;
272 new_ptr += items_flush * size; 331 goto cx_buffer_write_retry;
273 // report the directly flushed items as written plus the remaining stuff
274 return items_flush + cxBufferWrite(new_ptr, size, items_keep, buffer);
275 } else {
276 // all items have been flushed - report them as written
277 return nitems;
278 } 332 }
279 } else if (flush_pos == 0) { 333 return total_flushed;
280 // nothing could be flushed at all, we immediately give up without writing any data
281 return 0;
282 } else { 334 } else {
283 // we were partially successful, we shift left and try again 335 items_flushed = cx_buffer_flush_impl(buffer, size);
284 cxBufferShiftLeft(buffer, flush_pos); 336 if (items_flushed == 0) {
285 return cxBufferWrite(ptr, size, nitems, buffer); 337 // flush target is full, let's try to truncate
338 size_t remaining_space;
339 if (buffer->flags & CX_BUFFER_AUTO_EXTEND) {
340 remaining_space = buffer->flush->threshold > buffer->pos
341 ? buffer->flush->threshold - buffer->pos
342 : 0;
343 } else {
344 remaining_space = buffer->capacity > buffer->pos
345 ? buffer->capacity - buffer->pos
346 : 0;
347 }
348 nitems = remaining_space / size;
349 if (nitems == 0) {
350 return total_flushed;
351 }
352 }
353 goto cx_buffer_write_retry;
286 } 354 }
287 } else { 355 } else {
288 memcpy(buffer->bytes + buffer->pos, ptr, len); 356 memcpy(buffer->bytes + buffer->pos, ptr, len);
289 buffer->pos += len; 357 buffer->pos += len;
290 if (buffer->pos > buffer->size) { 358 if (buffer->pos > buffer->size) {
291 buffer->size = buffer->pos; 359 buffer->size = buffer->pos;
292 } 360 }
293 return nitems_out; 361 return total_flushed + nitems;
294 } 362 }
295 363 }
364
365 size_t cxBufferAppend(
366 const void *ptr,
367 size_t size,
368 size_t nitems,
369 CxBuffer *buffer
370 ) {
371 size_t pos = buffer->pos;
372 size_t append_pos = buffer->size;
373 buffer->pos = append_pos;
374 size_t written = cxBufferWrite(ptr, size, nitems, buffer);
375 // the buffer might have been flushed
376 // we must compute a possible delta for the position
377 // expected: pos = append_pos + written
378 // -> if this is not the case, there is a delta
379 size_t delta = append_pos + written*size - buffer->pos;
380 if (delta > pos) {
381 buffer->pos = 0;
382 } else {
383 buffer->pos = pos - delta;
384 }
385 return written;
296 } 386 }
297 387
298 int cxBufferPut( 388 int cxBufferPut(
299 CxBuffer *buffer, 389 CxBuffer *buffer,
300 int c 390 int c
303 unsigned char const ch = c; 393 unsigned char const ch = c;
304 if (cxBufferWrite(&ch, 1, 1, buffer) == 1) { 394 if (cxBufferWrite(&ch, 1, 1, buffer) == 1) {
305 return c; 395 return c;
306 } else { 396 } else {
307 return EOF; 397 return EOF;
398 }
399 }
400
401 int cxBufferTerminate(CxBuffer *buffer) {
402 bool success = 0 == cxBufferPut(buffer, 0);
403 if (success) {
404 buffer->pos--;
405 buffer->size--;
406 return 0;
407 } else {
408 return -1;
308 } 409 }
309 } 410 }
310 411
311 size_t cxBufferPutString( 412 size_t cxBufferPutString(
312 CxBuffer *buffer, 413 CxBuffer *buffer,
321 size_t nitems, 422 size_t nitems,
322 CxBuffer *buffer 423 CxBuffer *buffer
323 ) { 424 ) {
324 size_t len; 425 size_t len;
325 if (cx_szmul(size, nitems, &len)) { 426 if (cx_szmul(size, nitems, &len)) {
427 errno = EOVERFLOW;
326 return 0; 428 return 0;
327 } 429 }
328 if (buffer->pos + len > buffer->size) { 430 if (buffer->pos + len > buffer->size) {
329 len = buffer->size - buffer->pos; 431 len = buffer->size - buffer->pos;
330 if (size > 1) len -= len % size; 432 if (size > 1) len -= len % size;
355 size_t shift 457 size_t shift
356 ) { 458 ) {
357 if (shift >= buffer->size) { 459 if (shift >= buffer->size) {
358 buffer->pos = buffer->size = 0; 460 buffer->pos = buffer->size = 0;
359 } else { 461 } else {
462 if (buffer_copy_on_write(buffer)) return -1;
360 memmove(buffer->bytes, buffer->bytes + shift, buffer->size - shift); 463 memmove(buffer->bytes, buffer->bytes + shift, buffer->size - shift);
361 buffer->size -= shift; 464 buffer->size -= shift;
362 465
363 if (buffer->pos >= shift) { 466 if (buffer->pos >= shift) {
364 buffer->pos -= shift; 467 buffer->pos -= shift;
371 474
372 int cxBufferShiftRight( 475 int cxBufferShiftRight(
373 CxBuffer *buffer, 476 CxBuffer *buffer,
374 size_t shift 477 size_t shift
375 ) { 478 ) {
479 if (buffer->size > SIZE_MAX - shift) {
480 errno = EOVERFLOW;
481 return -1;
482 }
376 size_t req_capacity = buffer->size + shift; 483 size_t req_capacity = buffer->size + shift;
377 size_t movebytes; 484 size_t movebytes;
378 485
379 // auto extend buffer, if required and enabled 486 // auto extend buffer, if required and enabled
380 if (buffer->capacity < req_capacity) { 487 if (buffer->capacity < req_capacity) {
381 if ((buffer->flags & CX_BUFFER_AUTO_EXTEND) == CX_BUFFER_AUTO_EXTEND) { 488 if (buffer->flags & CX_BUFFER_AUTO_EXTEND) {
382 if (cxBufferMinimumCapacity(buffer, req_capacity)) { 489 if (cxBufferMinimumCapacity(buffer, req_capacity)) {
383 return 1; 490 return -1; // LCOV_EXCL_LINE
384 } 491 }
385 movebytes = buffer->size; 492 movebytes = buffer->size;
386 } else { 493 } else {
387 movebytes = buffer->capacity - shift; 494 movebytes = buffer->capacity - shift;
388 } 495 }
389 } else { 496 } else {
390 movebytes = buffer->size; 497 movebytes = buffer->size;
391 } 498 }
392 499
393 memmove(buffer->bytes + shift, buffer->bytes, movebytes); 500 if (movebytes > 0) {
394 buffer->size = shift + movebytes; 501 if (buffer_copy_on_write(buffer)) return -1;
502 memmove(buffer->bytes + shift, buffer->bytes, movebytes);
503 buffer->size = shift + movebytes;
504 }
395 505
396 buffer->pos += shift; 506 buffer->pos += shift;
397 if (buffer->pos > buffer->size) { 507 if (buffer->pos > buffer->size) {
398 buffer->pos = buffer->size; 508 buffer->pos = buffer->size;
399 } 509 }

mercurial