ucx/buffer.c

changeset 30
d33eaaec15da
parent 23
b26390e77237
equal deleted inserted replaced
29:b8c826c720f3 30:d33eaaec15da
30 30
31 #include <stdio.h> 31 #include <stdio.h>
32 #include <string.h> 32 #include <string.h>
33 #include <errno.h> 33 #include <errno.h>
34 34
35 #ifdef _WIN32
36 #include <Windows.h>
37 #include <sysinfoapi.h>
38 static unsigned long system_page_size(void) {
39 static unsigned long ps = 0;
40 if (ps == 0) {
41 SYSTEM_INFO sysinfo;
42 GetSystemInfo(&sysinfo);
43 ps = sysinfo.dwPageSize;
44 }
45 return ps;
46 }
47 #else
48 #include <unistd.h>
49 static unsigned long system_page_size(void) {
50 static unsigned long ps = 0;
51 if (ps == 0) {
52 long sc = sysconf(_SC_PAGESIZE);
53 if (sc < 0) {
54 // fallback for systems which do not report a value here
55 ps = 4096; // LCOV_EXCL_LINE
56 } else {
57 ps = (unsigned long) sc;
58 }
59 }
60 return ps;
61 }
62 #endif
63
64 static int buffer_copy_on_write(CxBuffer* buffer) { 35 static int buffer_copy_on_write(CxBuffer* buffer) {
65 if (0 == (buffer->flags & CX_BUFFER_COPY_ON_WRITE)) return 0; 36 if (0 == (buffer->flags & CX_BUFFER_COPY_ON_WRITE)) return 0;
66 void *newspace = cxMalloc(buffer->allocator, buffer->capacity); 37 void *newspace = cxMalloc(buffer->allocator, buffer->capacity);
67 if (NULL == newspace) return -1; // LCOV_EXCL_LINE 38 if (NULL == newspace) return -1; // LCOV_EXCL_LINE
68 memcpy(newspace, buffer->space, buffer->size); 39 memcpy(newspace, buffer->space, buffer->size);
93 buffer->flags |= CX_BUFFER_FREE_CONTENTS; 64 buffer->flags |= CX_BUFFER_FREE_CONTENTS;
94 } else { 65 } else {
95 buffer->bytes = space; 66 buffer->bytes = space;
96 } 67 }
97 buffer->capacity = capacity; 68 buffer->capacity = capacity;
69 buffer->max_capacity = SIZE_MAX;
98 buffer->size = 0; 70 buffer->size = 0;
99 buffer->pos = 0; 71 buffer->pos = 0;
100 72
101 buffer->flush = NULL;
102
103 return 0;
104 }
105
106 int cxBufferEnableFlushing(
107 CxBuffer *buffer,
108 CxBufferFlushConfig config
109 ) {
110 buffer->flush = cxMallocDefault(sizeof(CxBufferFlushConfig));
111 if (buffer->flush == NULL) return -1; // LCOV_EXCL_LINE
112 memcpy(buffer->flush, &config, sizeof(CxBufferFlushConfig));
113 return 0; 73 return 0;
114 } 74 }
115 75
116 void cxBufferDestroy(CxBuffer *buffer) { 76 void cxBufferDestroy(CxBuffer *buffer) {
117 if (buffer->flags & CX_BUFFER_FREE_CONTENTS) { 77 if (buffer->flags & CX_BUFFER_FREE_CONTENTS) {
118 cxFree(buffer->allocator, buffer->bytes); 78 cxFree(buffer->allocator, buffer->bytes);
119 } 79 }
120 cxFreeDefault(buffer->flush);
121 memset(buffer, 0, sizeof(CxBuffer)); 80 memset(buffer, 0, sizeof(CxBuffer));
122 } 81 }
123 82
124 CxBuffer *cxBufferCreate( 83 CxBuffer *cxBufferCreate(
125 void *space, 84 void *space,
237 bool cxBufferEof(const CxBuffer *buffer) { 196 bool cxBufferEof(const CxBuffer *buffer) {
238 return buffer->pos >= buffer->size; 197 return buffer->pos >= buffer->size;
239 } 198 }
240 199
241 int cxBufferReserve(CxBuffer *buffer, size_t newcap) { 200 int cxBufferReserve(CxBuffer *buffer, size_t newcap) {
242 if (newcap <= buffer->capacity) { 201 if (newcap == buffer->capacity) {
243 return 0; 202 return 0;
203 }
204 if (newcap > buffer->max_capacity) {
205 return -1;
244 } 206 }
245 const int force_copy_flags = CX_BUFFER_COPY_ON_WRITE | CX_BUFFER_COPY_ON_EXTEND; 207 const int force_copy_flags = CX_BUFFER_COPY_ON_WRITE | CX_BUFFER_COPY_ON_EXTEND;
246 if (buffer->flags & force_copy_flags) { 208 if (buffer->flags & force_copy_flags) {
247 void *newspace = cxMalloc(buffer->allocator, newcap); 209 void *newspace = cxMalloc(buffer->allocator, newcap);
248 if (NULL == newspace) return -1; 210 if (NULL == newspace) return -1;
252 buffer->flags &= ~force_copy_flags; 214 buffer->flags &= ~force_copy_flags;
253 buffer->flags |= CX_BUFFER_FREE_CONTENTS; 215 buffer->flags |= CX_BUFFER_FREE_CONTENTS;
254 return 0; 216 return 0;
255 } else if (cxReallocate(buffer->allocator, 217 } else if (cxReallocate(buffer->allocator,
256 (void **) &buffer->bytes, newcap) == 0) { 218 (void **) &buffer->bytes, newcap) == 0) {
219 buffer->flags |= CX_BUFFER_FREE_CONTENTS;
257 buffer->capacity = newcap; 220 buffer->capacity = newcap;
221 if (buffer->size > newcap) {
222 buffer->size = newcap;
223 }
258 return 0; 224 return 0;
259 } else { 225 } else {
260 return -1; // LCOV_EXCL_LINE 226 return -1; // LCOV_EXCL_LINE
261 } 227 }
262 } 228 }
263 229
264 static size_t cx_buffer_calculate_minimum_capacity(size_t mincap) { 230 int cxBufferMaximumCapacity(CxBuffer *buffer, size_t capacity) {
265 unsigned long pagesize = system_page_size(); 231 if (capacity < buffer->capacity) {
266 // if page size is larger than 64 KB - for some reason - truncate to 64 KB 232 return -1;
267 if (pagesize > 65536) pagesize = 65536; 233 }
268 if (mincap < pagesize) { 234 buffer->max_capacity = capacity;
269 // when smaller than one page, map to the next power of two 235 return 0;
270 mincap--;
271 mincap |= mincap >> 1;
272 mincap |= mincap >> 2;
273 mincap |= mincap >> 4;
274 // last operation only needed for pages larger than 4096 bytes
275 // but if/else would be more expensive than just doing this
276 mincap |= mincap >> 8;
277 mincap++;
278 } else {
279 // otherwise, map to a multiple of the page size
280 mincap -= mincap % pagesize;
281 mincap += pagesize;
282 // note: if mincap is already page aligned,
283 // this gives a full additional page (which is good)
284 }
285 return mincap;
286 } 236 }
287 237
288 int cxBufferMinimumCapacity(CxBuffer *buffer, size_t newcap) { 238 int cxBufferMinimumCapacity(CxBuffer *buffer, size_t newcap) {
289 if (newcap <= buffer->capacity) { 239 if (newcap <= buffer->capacity) {
290 return 0; 240 return 0;
291 } 241 }
292 newcap = cx_buffer_calculate_minimum_capacity(newcap); 242 if (newcap > buffer->max_capacity) {
243 return -1;
244 }
245 if (newcap < buffer->max_capacity) {
246 unsigned long pagesize = cx_system_page_size();
247 // if page size is larger than 64 KB - for some reason - truncate to 64 KB
248 if (pagesize > 65536) pagesize = 65536;
249 if (newcap < pagesize) {
250 // when smaller than one page, map to the next power of two
251 newcap--;
252 newcap |= newcap >> 1;
253 newcap |= newcap >> 2;
254 newcap |= newcap >> 4;
255 // last operation only needed for pages larger than 4096 bytes
256 // but if/else would be more expensive than just doing this
257 newcap |= newcap >> 8;
258 newcap++;
259 } else {
260 // otherwise, map to a multiple of the page size
261 newcap -= newcap % pagesize;
262 newcap += pagesize;
263 // note: if newcap is already page aligned,
264 // this gives a full additional page (which is good)
265 }
266 if (newcap > buffer->max_capacity) {
267 newcap = buffer->max_capacity;
268 }
269 }
293 return cxBufferReserve(buffer, newcap); 270 return cxBufferReserve(buffer, newcap);
294 } 271 }
295 272
296 void cxBufferShrink( 273 void cxBufferShrink(
297 CxBuffer *buffer, 274 CxBuffer *buffer,
311 if (newCapacity < buffer->capacity) { 288 if (newCapacity < buffer->capacity) {
312 if (0 == cxReallocate(buffer->allocator, &buffer->bytes, newCapacity)) { 289 if (0 == cxReallocate(buffer->allocator, &buffer->bytes, newCapacity)) {
313 buffer->capacity = newCapacity; 290 buffer->capacity = newCapacity;
314 } 291 }
315 } 292 }
316 }
317
318 static size_t cx_buffer_flush_helper(
319 const CxBuffer *buffer,
320 const unsigned char *src,
321 size_t size,
322 size_t nitems
323 ) {
324 // flush data from an arbitrary source
325 // does not need to be the buffer's contents
326 size_t max_items = buffer->flush->blksize / size;
327 size_t fblocks = 0;
328 size_t flushed_total = 0;
329 while (nitems > 0 && fblocks < buffer->flush->blkmax) {
330 fblocks++;
331 size_t items = nitems > max_items ? max_items : nitems;
332 size_t flushed = buffer->flush->wfunc(
333 src, size, items, buffer->flush->target);
334 if (flushed > 0) {
335 flushed_total += flushed;
336 src += flushed * size;
337 nitems -= flushed;
338 } else {
339 // if no bytes can be flushed out anymore, we give up
340 break;
341 }
342 }
343 return flushed_total;
344 }
345
346 static size_t cx_buffer_flush_impl(CxBuffer *buffer, size_t size) {
347 // flush the current contents of the buffer
348 unsigned char *space = buffer->bytes;
349 size_t remaining = buffer->pos / size;
350 size_t flushed_total = cx_buffer_flush_helper(
351 buffer, space, size, remaining);
352
353 // shift the buffer left after flushing
354 // IMPORTANT: up to this point, copy on write must have been
355 // performed already, because we can't do error handling here
356 cxBufferShiftLeft(buffer, flushed_total*size);
357
358 return flushed_total;
359 }
360
361 size_t cxBufferFlush(CxBuffer *buffer) {
362 if (buffer_copy_on_write(buffer)) return 0;
363 return cx_buffer_flush_impl(buffer, 1);
364 } 293 }
365 294
366 size_t cxBufferWrite( 295 size_t cxBufferWrite(
367 const void *ptr, 296 const void *ptr,
368 size_t size, 297 size_t size,
378 buffer->size = buffer->pos; 307 buffer->size = buffer->pos;
379 } 308 }
380 return nitems; 309 return nitems;
381 } 310 }
382 311
383 size_t len, total_flushed = 0; 312 size_t len;
384 cx_buffer_write_retry:
385 if (cx_szmul(size, nitems, &len)) { 313 if (cx_szmul(size, nitems, &len)) {
386 errno = EOVERFLOW; 314 errno = EOVERFLOW;
387 return total_flushed; 315 return 0;
388 } 316 }
389 if (buffer->pos > SIZE_MAX - len) { 317 if (buffer->pos > SIZE_MAX - len) {
390 errno = EOVERFLOW; 318 errno = EOVERFLOW;
391 return total_flushed; 319 return 0;
392 } 320 }
393 321 const size_t required = buffer->pos + len;
394 size_t required = buffer->pos + len; 322
395 bool perform_flush = false; 323 // check if we need to auto-extend
396 if (required > buffer->capacity) { 324 if (required > buffer->capacity) {
397 if (buffer->flags & CX_BUFFER_AUTO_EXTEND) { 325 if (buffer->flags & CX_BUFFER_AUTO_EXTEND) {
398 if (buffer->flush != NULL) { 326 size_t newcap = required < buffer->max_capacity
399 size_t newcap = cx_buffer_calculate_minimum_capacity(required); 327 ? required : buffer->max_capacity;
400 if (newcap > buffer->flush->threshold) { 328 if (cxBufferMinimumCapacity(buffer, newcap)) {
401 newcap = buffer->flush->threshold; 329 return 0; // LCOV_EXCL_LINE
402 }
403 if (cxBufferReserve(buffer, newcap)) {
404 return total_flushed; // LCOV_EXCL_LINE
405 }
406 if (required > newcap) {
407 perform_flush = true;
408 }
409 } else {
410 if (cxBufferMinimumCapacity(buffer, required)) {
411 return total_flushed; // LCOV_EXCL_LINE
412 }
413 } 330 }
414 } else { 331 }
415 if (buffer->flush != NULL) { 332 }
416 perform_flush = true; 333
417 } else { 334 // check again and truncate data if capacity is still not enough
418 // truncate data, if we can neither extend nor flush 335 if (required > buffer->capacity) {
419 len = buffer->capacity - buffer->pos; 336 len = buffer->capacity - buffer->pos;
420 if (size > 1) { 337 if (size > 1) {
421 len -= len % size; 338 len -= len % size;
422 } 339 }
423 nitems = len / size; 340 nitems = len / size;
424 }
425 }
426 } 341 }
427 342
428 // check here and not above because of possible truncation 343 // check here and not above because of possible truncation
429 if (len == 0) { 344 if (len == 0) {
430 return total_flushed; 345 return 0;
431 } 346 }
432 347
433 // check if we need to copy 348 // check if we need to copy
434 if (buffer_copy_on_write(buffer)) return 0; 349 if (buffer_copy_on_write(buffer)) return 0;
435 350
436 // perform the operation 351 // perform the operation
437 if (perform_flush) { 352 memcpy(buffer->bytes + buffer->pos, ptr, len);
438 size_t items_flushed; 353 buffer->pos += len;
439 if (buffer->pos == 0) { 354 if (buffer->pos > buffer->size) {
440 // if we don't have data in the buffer, but are instructed 355 buffer->size = buffer->pos;
441 // to flush, it means that we are supposed to relay the data 356 }
442 items_flushed = cx_buffer_flush_helper(buffer, ptr, size, nitems); 357 return nitems;
443 if (items_flushed == 0) {
444 // we needed to relay data, but could not flush anything
445 // i.e. we have to give up to avoid endless trying
446 return 0;
447 }
448 nitems -= items_flushed;
449 total_flushed += items_flushed;
450 if (nitems > 0) {
451 ptr = ((unsigned char*)ptr) + items_flushed * size;
452 goto cx_buffer_write_retry;
453 }
454 return total_flushed;
455 } else {
456 items_flushed = cx_buffer_flush_impl(buffer, size);
457 if (items_flushed == 0) {
458 // flush target is full, let's try to truncate
459 size_t remaining_space;
460 if (buffer->flags & CX_BUFFER_AUTO_EXTEND) {
461 remaining_space = buffer->flush->threshold > buffer->pos
462 ? buffer->flush->threshold - buffer->pos
463 : 0;
464 } else {
465 remaining_space = buffer->capacity > buffer->pos
466 ? buffer->capacity - buffer->pos
467 : 0;
468 }
469 nitems = remaining_space / size;
470 if (nitems == 0) {
471 return total_flushed;
472 }
473 }
474 goto cx_buffer_write_retry;
475 }
476 } else {
477 memcpy(buffer->bytes + buffer->pos, ptr, len);
478 buffer->pos += len;
479 if (buffer->pos > buffer->size) {
480 buffer->size = buffer->pos;
481 }
482 return total_flushed + nitems;
483 }
484 } 358 }
485 359
486 size_t cxBufferAppend( 360 size_t cxBufferAppend(
487 const void *ptr, 361 const void *ptr,
488 size_t size, 362 size_t size,

mercurial