ucx/buffer.c

changeset 115
e57ca2747782
parent 108
77254bd6dccb
equal deleted inserted replaced
114:3da24640513a 115:e57ca2747782
30 30
31 #include <stdio.h> 31 #include <stdio.h>
32 #include <string.h> 32 #include <string.h>
33 #include <errno.h> 33 #include <errno.h>
34 34
35 #ifdef _WIN32
36 #include <Windows.h>
37 #include <sysinfoapi.h>
38 static unsigned long system_page_size() {
39 static unsigned long ps = 0;
40 if (ps == 0) {
41 SYSTEM_INFO sysinfo;
42 GetSystemInfo(&sysinfo);
43 ps = sysinfo.dwPageSize;
44 }
45 return ps;
46 }
47 #define SYSTEM_PAGE_SIZE system_page_size()
48 #else
49 #include <unistd.h>
50 #define SYSTEM_PAGE_SIZE sysconf(_SC_PAGESIZE)
51 #endif
52
53 static int buffer_copy_on_write(CxBuffer* buffer) { 35 static int buffer_copy_on_write(CxBuffer* buffer) {
54 if (0 == (buffer->flags & CX_BUFFER_COPY_ON_WRITE)) return 0; 36 if (0 == (buffer->flags & CX_BUFFER_COPY_ON_WRITE)) return 0;
55 void *newspace = cxMalloc(buffer->allocator, buffer->capacity); 37 void *newspace = cxMalloc(buffer->allocator, buffer->capacity);
56 if (NULL == newspace) return -1; 38 if (NULL == newspace) return -1; // LCOV_EXCL_LINE
57 memcpy(newspace, buffer->space, buffer->size); 39 memcpy(newspace, buffer->space, buffer->size);
58 buffer->space = newspace; 40 buffer->space = newspace;
59 buffer->flags &= ~CX_BUFFER_COPY_ON_WRITE; 41 buffer->flags &= ~CX_BUFFER_COPY_ON_WRITE;
60 buffer->flags |= CX_BUFFER_FREE_CONTENTS; 42 buffer->flags |= CX_BUFFER_FREE_CONTENTS;
61 return 0; 43 return 0;
76 } 58 }
77 buffer->allocator = allocator; 59 buffer->allocator = allocator;
78 buffer->flags = flags; 60 buffer->flags = flags;
79 if (!space) { 61 if (!space) {
80 buffer->bytes = cxMalloc(allocator, capacity); 62 buffer->bytes = cxMalloc(allocator, capacity);
81 if (buffer->bytes == NULL) { 63 if (buffer->bytes == NULL) return -1; // LCOV_EXCL_LINE
82 return -1; // LCOV_EXCL_LINE
83 }
84 buffer->flags |= CX_BUFFER_FREE_CONTENTS; 64 buffer->flags |= CX_BUFFER_FREE_CONTENTS;
85 } else { 65 } else {
86 buffer->bytes = space; 66 buffer->bytes = space;
87 } 67 }
88 buffer->capacity = capacity; 68 buffer->capacity = capacity;
69 buffer->max_capacity = SIZE_MAX;
89 buffer->size = 0; 70 buffer->size = 0;
90 buffer->pos = 0; 71 buffer->pos = 0;
91 72
92 buffer->flush = NULL;
93
94 return 0; 73 return 0;
95 } 74 }
96 75
97 int cxBufferEnableFlushing(
98 CxBuffer *buffer,
99 CxBufferFlushConfig config
100 ) {
101 buffer->flush = cxMallocDefault(sizeof(CxBufferFlushConfig));
102 if (buffer->flush == NULL) return -1; // LCOV_EXCL_LINE
103 memcpy(buffer->flush, &config, sizeof(CxBufferFlushConfig));
104 return 0;
105 }
106
107 void cxBufferDestroy(CxBuffer *buffer) { 76 void cxBufferDestroy(CxBuffer *buffer) {
108 if (buffer->flags & CX_BUFFER_FREE_CONTENTS) { 77 if ((buffer->flags & (CX_BUFFER_FREE_CONTENTS | CX_BUFFER_DO_NOT_FREE))
78 == CX_BUFFER_FREE_CONTENTS) {
109 cxFree(buffer->allocator, buffer->bytes); 79 cxFree(buffer->allocator, buffer->bytes);
110 } 80 }
111 cxFreeDefault(buffer->flush);
112 memset(buffer, 0, sizeof(CxBuffer)); 81 memset(buffer, 0, sizeof(CxBuffer));
113 } 82 }
114 83
115 CxBuffer *cxBufferCreate( 84 CxBuffer *cxBufferCreate(
116 void *space, 85 void *space,
120 ) { 89 ) {
121 if (allocator == NULL) { 90 if (allocator == NULL) {
122 allocator = cxDefaultAllocator; 91 allocator = cxDefaultAllocator;
123 } 92 }
124 CxBuffer *buf = cxMalloc(allocator, sizeof(CxBuffer)); 93 CxBuffer *buf = cxMalloc(allocator, sizeof(CxBuffer));
125 if (buf == NULL) return NULL; 94 if (buf == NULL) return NULL; // LCOV_EXCL_LINE
126 if (0 == cxBufferInit(buf, space, capacity, allocator, flags)) { 95 if (0 == cxBufferInit(buf, space, capacity, allocator, flags)) {
127 return buf; 96 return buf;
128 } else { 97 } else {
129 // LCOV_EXCL_START 98 // LCOV_EXCL_START
130 cxFree(allocator, buf); 99 cxFree(allocator, buf);
181 return 0; 150 return 0;
182 } 151 }
183 152
184 } 153 }
185 154
155 size_t cxBufferPop(CxBuffer *buffer, size_t size, size_t nitems) {
156 size_t len;
157 if (cx_szmul(size, nitems, &len)) {
158 // LCOV_EXCL_START
159 errno = EOVERFLOW;
160 return 0;
161 // LCOV_EXCL_STOP
162 }
163 if (len == 0) return 0;
164 if (len > buffer->size) {
165 if (size == 1) {
166 // simple case: everything can be discarded
167 len = buffer->size;
168 } else {
169 // complicated case: misaligned bytes must stay
170 size_t misalignment = buffer->size % size;
171 len = buffer->size - misalignment;
172 }
173 }
174 buffer->size -= len;
175
176 // adjust position, if required
177 if (buffer->pos > buffer->size) {
178 buffer->pos = buffer->size;
179 }
180
181 return len / size;
182 }
183
186 void cxBufferClear(CxBuffer *buffer) { 184 void cxBufferClear(CxBuffer *buffer) {
187 if (0 == (buffer->flags & CX_BUFFER_COPY_ON_WRITE)) { 185 if (0 == (buffer->flags & CX_BUFFER_COPY_ON_WRITE)) {
188 memset(buffer->bytes, 0, buffer->size); 186 memset(buffer->bytes, 0, buffer->size);
189 } 187 }
190 buffer->size = 0; 188 buffer->size = 0;
198 196
199 bool cxBufferEof(const CxBuffer *buffer) { 197 bool cxBufferEof(const CxBuffer *buffer) {
200 return buffer->pos >= buffer->size; 198 return buffer->pos >= buffer->size;
201 } 199 }
202 200
203 int cxBufferMinimumCapacity( 201 int cxBufferReserve(CxBuffer *buffer, size_t newcap) {
204 CxBuffer *buffer, 202 if (newcap == buffer->capacity) {
205 size_t newcap 203 return 0;
206 ) { 204 }
207 if (newcap <= buffer->capacity) { 205 if (newcap > buffer->max_capacity) {
208 return 0; 206 return -1;
209 } 207 }
210
211 unsigned long pagesize = SYSTEM_PAGE_SIZE;
212 // if page size is larger than 64 KB - for some reason - truncate to 64 KB
213 if (pagesize > 65536) pagesize = 65536;
214 if (newcap < pagesize) {
215 // when smaller as one page, map to the next power of two
216 newcap--;
217 newcap |= newcap >> 1;
218 newcap |= newcap >> 2;
219 newcap |= newcap >> 4;
220 // last operation only needed for pages larger 4096 bytes
221 // but if/else would be more expensive than just doing this
222 newcap |= newcap >> 8;
223 newcap++;
224 } else {
225 // otherwise, map to a multiple of the page size
226 newcap -= newcap % pagesize;
227 newcap += pagesize;
228 // note: if newcap is already page aligned,
229 // this gives a full additional page (which is good)
230 }
231
232
233 const int force_copy_flags = CX_BUFFER_COPY_ON_WRITE | CX_BUFFER_COPY_ON_EXTEND; 208 const int force_copy_flags = CX_BUFFER_COPY_ON_WRITE | CX_BUFFER_COPY_ON_EXTEND;
234 if (buffer->flags & force_copy_flags) { 209 if (buffer->flags & force_copy_flags) {
235 void *newspace = cxMalloc(buffer->allocator, newcap); 210 void *newspace = cxMalloc(buffer->allocator, newcap);
236 if (NULL == newspace) return -1; 211 if (NULL == newspace) return -1;
237 memcpy(newspace, buffer->space, buffer->size); 212 memcpy(newspace, buffer->space, buffer->size);
240 buffer->flags &= ~force_copy_flags; 215 buffer->flags &= ~force_copy_flags;
241 buffer->flags |= CX_BUFFER_FREE_CONTENTS; 216 buffer->flags |= CX_BUFFER_FREE_CONTENTS;
242 return 0; 217 return 0;
243 } else if (cxReallocate(buffer->allocator, 218 } else if (cxReallocate(buffer->allocator,
244 (void **) &buffer->bytes, newcap) == 0) { 219 (void **) &buffer->bytes, newcap) == 0) {
220 buffer->flags |= CX_BUFFER_FREE_CONTENTS;
245 buffer->capacity = newcap; 221 buffer->capacity = newcap;
222 if (buffer->size > newcap) {
223 buffer->size = newcap;
224 }
246 return 0; 225 return 0;
247 } else { 226 } else {
248 return -1; // LCOV_EXCL_LINE 227 return -1; // LCOV_EXCL_LINE
249 } 228 }
229 }
230
231 int cxBufferMaximumCapacity(CxBuffer *buffer, size_t capacity) {
232 if (capacity < buffer->capacity) {
233 return -1;
234 }
235 buffer->max_capacity = capacity;
236 return 0;
237 }
238
239 int cxBufferMinimumCapacity(CxBuffer *buffer, size_t newcap) {
240 if (newcap <= buffer->capacity) {
241 return 0;
242 }
243 if (newcap > buffer->max_capacity) {
244 return -1;
245 }
246 if (newcap < buffer->max_capacity) {
247 unsigned long pagesize = cx_system_page_size();
248 // if page size is larger than 64 KB - for some reason - truncate to 64 KB
249 if (pagesize > 65536) pagesize = 65536;
250 if (newcap < pagesize) {
251 // when smaller as one page, map to the next power of two
252 newcap--;
253 newcap |= newcap >> 1;
254 newcap |= newcap >> 2;
255 newcap |= newcap >> 4;
256 // last operation only needed for pages larger 4096 bytes
257 // but if/else would be more expensive than just doing this
258 newcap |= newcap >> 8;
259 newcap++;
260 } else {
261 // otherwise, map to a multiple of the page size
262 newcap -= newcap % pagesize;
263 newcap += pagesize;
264 // note: if newcap is already page aligned,
265 // this gives a full additional page (which is good)
266 }
267 if (newcap > buffer->max_capacity) {
268 newcap = buffer->max_capacity;
269 }
270 }
271 return cxBufferReserve(buffer, newcap);
250 } 272 }
251 273
252 void cxBufferShrink( 274 void cxBufferShrink(
253 CxBuffer *buffer, 275 CxBuffer *buffer,
254 size_t reserve 276 size_t reserve
269 buffer->capacity = newCapacity; 291 buffer->capacity = newCapacity;
270 } 292 }
271 } 293 }
272 } 294 }
273 295
274 static size_t cx_buffer_flush_helper(
275 const CxBuffer *buffer,
276 const unsigned char *src,
277 size_t size,
278 size_t nitems
279 ) {
280 // flush data from an arbitrary source
281 // does not need to be the buffer's contents
282 size_t max_items = buffer->flush->blksize / size;
283 size_t fblocks = 0;
284 size_t flushed_total = 0;
285 while (nitems > 0 && fblocks < buffer->flush->blkmax) {
286 fblocks++;
287 size_t items = nitems > max_items ? max_items : nitems;
288 size_t flushed = buffer->flush->wfunc(
289 src, size, items, buffer->flush->target);
290 if (flushed > 0) {
291 flushed_total += flushed;
292 src += flushed * size;
293 nitems -= flushed;
294 } else {
295 // if no bytes can be flushed out anymore, we give up
296 break;
297 }
298 }
299 return flushed_total;
300 }
301
302 static size_t cx_buffer_flush_impl(CxBuffer *buffer, size_t size) {
303 // flush the current contents of the buffer
304 unsigned char *space = buffer->bytes;
305 size_t remaining = buffer->pos / size;
306 size_t flushed_total = cx_buffer_flush_helper(
307 buffer, space, size, remaining);
308
309 // shift the buffer left after flushing
310 // IMPORTANT: up to this point, copy on write must have been
311 // performed already, because we can't do error handling here
312 cxBufferShiftLeft(buffer, flushed_total*size);
313
314 return flushed_total;
315 }
316
317 size_t cxBufferFlush(CxBuffer *buffer) {
318 if (buffer_copy_on_write(buffer)) return 0;
319 return cx_buffer_flush_impl(buffer, 1);
320 }
321
322 size_t cxBufferWrite( 296 size_t cxBufferWrite(
323 const void *ptr, 297 const void *ptr,
324 size_t size, 298 size_t size,
325 size_t nitems, 299 size_t nitems,
326 CxBuffer *buffer 300 CxBuffer *buffer
327 ) { 301 ) {
302 // trivial case
303 if (size == 0 || nitems == 0) return 0;
304
328 // optimize for easy case 305 // optimize for easy case
329 if (size == 1 && (buffer->capacity - buffer->pos) >= nitems) { 306 if (size == 1 && (buffer->capacity - buffer->pos) >= nitems) {
330 if (buffer_copy_on_write(buffer)) return 0; 307 if (buffer_copy_on_write(buffer)) return 0;
331 memcpy(buffer->bytes + buffer->pos, ptr, nitems); 308 memcpy(buffer->bytes + buffer->pos, ptr, nitems);
332 buffer->pos += nitems; 309 buffer->pos += nitems;
334 buffer->size = buffer->pos; 311 buffer->size = buffer->pos;
335 } 312 }
336 return nitems; 313 return nitems;
337 } 314 }
338 315
339 size_t len, total_flushed = 0; 316 size_t len;
340 cx_buffer_write_retry:
341 if (cx_szmul(size, nitems, &len)) { 317 if (cx_szmul(size, nitems, &len)) {
342 errno = EOVERFLOW; 318 errno = EOVERFLOW;
343 return total_flushed; 319 return 0;
344 } 320 }
345 if (buffer->pos > SIZE_MAX - len) { 321 if (buffer->pos > SIZE_MAX - len) {
346 errno = EOVERFLOW; 322 errno = EOVERFLOW;
347 return total_flushed; 323 return 0;
348 } 324 }
349 325 const size_t required = buffer->pos + len;
350 size_t required = buffer->pos + len; 326
351 bool perform_flush = false; 327 // check if we need to auto-extend
352 if (required > buffer->capacity) { 328 if (required > buffer->capacity) {
353 if (buffer->flags & CX_BUFFER_AUTO_EXTEND) { 329 if (buffer->flags & CX_BUFFER_AUTO_EXTEND) {
354 if (buffer->flush != NULL && required > buffer->flush->threshold) { 330 size_t newcap = required < buffer->max_capacity
355 perform_flush = true; 331 ? required : buffer->max_capacity;
356 } else { 332 if (cxBufferMinimumCapacity(buffer, newcap)) {
357 if (cxBufferMinimumCapacity(buffer, required)) { 333 return 0; // LCOV_EXCL_LINE
358 return total_flushed; // LCOV_EXCL_LINE
359 }
360 } 334 }
361 } else { 335 }
362 if (buffer->flush != NULL) { 336 }
363 perform_flush = true; 337
364 } else { 338 // check again and truncate data if capacity is still not enough
365 // truncate data, if we can neither extend nor flush 339 if (required > buffer->capacity) {
366 len = buffer->capacity - buffer->pos; 340 len = buffer->capacity - buffer->pos;
367 if (size > 1) { 341 if (size > 1) {
368 len -= len % size; 342 len -= len % size;
369 } 343 }
370 nitems = len / size; 344 nitems = len / size;
371 }
372 }
373 } 345 }
374 346
375 // check here and not above because of possible truncation 347 // check here and not above because of possible truncation
376 if (len == 0) { 348 if (len == 0) {
377 return total_flushed; 349 return 0;
378 } 350 }
379 351
380 // check if we need to copy 352 // check if we need to copy
381 if (buffer_copy_on_write(buffer)) return 0; 353 if (buffer_copy_on_write(buffer)) return 0;
382 354
383 // perform the operation 355 // perform the operation
384 if (perform_flush) { 356 memcpy(buffer->bytes + buffer->pos, ptr, len);
385 size_t items_flushed; 357 buffer->pos += len;
386 if (buffer->pos == 0) { 358 if (buffer->pos > buffer->size) {
387 // if we don't have data in the buffer, but are instructed 359 buffer->size = buffer->pos;
388 // to flush, it means that we are supposed to relay the data 360 }
389 items_flushed = cx_buffer_flush_helper(buffer, ptr, size, nitems); 361 return nitems;
390 if (items_flushed == 0) {
391 // we needed to relay data, but could not flush anything
392 // i.e. we have to give up to avoid endless trying
393 return 0;
394 }
395 nitems -= items_flushed;
396 total_flushed += items_flushed;
397 if (nitems > 0) {
398 ptr = ((unsigned char*)ptr) + items_flushed * size;
399 goto cx_buffer_write_retry;
400 }
401 return total_flushed;
402 } else {
403 items_flushed = cx_buffer_flush_impl(buffer, size);
404 if (items_flushed == 0) {
405 // flush target is full, let's try to truncate
406 size_t remaining_space;
407 if (buffer->flags & CX_BUFFER_AUTO_EXTEND) {
408 remaining_space = buffer->flush->threshold > buffer->pos
409 ? buffer->flush->threshold - buffer->pos
410 : 0;
411 } else {
412 remaining_space = buffer->capacity > buffer->pos
413 ? buffer->capacity - buffer->pos
414 : 0;
415 }
416 nitems = remaining_space / size;
417 if (nitems == 0) {
418 return total_flushed;
419 }
420 }
421 goto cx_buffer_write_retry;
422 }
423 } else {
424 memcpy(buffer->bytes + buffer->pos, ptr, len);
425 buffer->pos += len;
426 if (buffer->pos > buffer->size) {
427 buffer->size = buffer->pos;
428 }
429 return total_flushed + nitems;
430 }
431 } 362 }
432 363
433 size_t cxBufferAppend( 364 size_t cxBufferAppend(
434 const void *ptr, 365 const void *ptr,
435 size_t size, 366 size_t size,
436 size_t nitems, 367 size_t nitems,
437 CxBuffer *buffer 368 CxBuffer *buffer
438 ) { 369 ) {
439 size_t pos = buffer->pos; 370 // trivial case
440 size_t append_pos = buffer->size; 371 if (size == 0 || nitems == 0) return 0;
441 buffer->pos = append_pos; 372
442 size_t written = cxBufferWrite(ptr, size, nitems, buffer); 373 const size_t pos = buffer->pos;
443 // the buffer might have been flushed 374 buffer->pos = buffer->size;
444 // we must compute a possible delta for the position 375 const size_t written = cxBufferWrite(ptr, size, nitems, buffer);
445 // expected: pos = append_pos + written 376 buffer->pos = pos;
446 // -> if this is not the case, there is a delta
447 size_t delta = append_pos + written*size - buffer->pos;
448 if (delta > pos) {
449 buffer->pos = 0;
450 } else {
451 buffer->pos = pos - delta;
452 }
453 return written; 377 return written;
454 } 378 }
455 379
456 int cxBufferPut( 380 int cxBufferPut(
457 CxBuffer *buffer, 381 CxBuffer *buffer,
465 return EOF; 389 return EOF;
466 } 390 }
467 } 391 }
468 392
469 int cxBufferTerminate(CxBuffer *buffer) { 393 int cxBufferTerminate(CxBuffer *buffer) {
470 if (0 == cxBufferPut(buffer, 0)) { 394 // try to extend / shrink the buffer
471 buffer->size = buffer->pos - 1; 395 if (buffer->pos >= buffer->capacity) {
472 return 0; 396 if ((buffer->flags & CX_BUFFER_AUTO_EXTEND) == 0) {
473 } else { 397 return -1;
474 return -1; 398 }
475 } 399 if (cxBufferReserve(buffer, buffer->pos + 1)) {
476 } 400 return -1; // LCOV_EXCL_LINE
477 401 }
478 size_t cxBufferPutString( 402 } else {
479 CxBuffer *buffer, 403 buffer->size = buffer->pos;
480 const char *str 404 cxBufferShrink(buffer, 1);
481 ) { 405 // set the capacity explicitly, in case shrink was skipped due to CoW
482 return cxBufferWrite(str, 1, strlen(str), buffer); 406 buffer->capacity = buffer->size + 1;
407 }
408
409 // check if we are still on read-only memory
410 if (buffer_copy_on_write(buffer)) return -1;
411
412 // write the terminator and exit
413 buffer->space[buffer->pos] = '\0';
414 return 0;
415 }
416
417 size_t cx_buffer_put_string(CxBuffer *buffer, cxstring str) {
418 return cxBufferWrite(str.ptr, 1, str.length, buffer);
419 }
420
421 size_t cx_buffer_append_string(CxBuffer *buffer, cxstring str) {
422 return cxBufferAppend(str.ptr, 1, str.length, buffer);
483 } 423 }
484 424
485 size_t cxBufferRead( 425 size_t cxBufferRead(
486 void *ptr, 426 void *ptr,
487 size_t size, 427 size_t size,

mercurial