ucx/buffer.c

changeset 852
83fdf679df99
parent 816
839fefbdedc7
equal deleted inserted replaced
850:bbe2925eb590 852:83fdf679df99
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE. 26 * POSSIBILITY OF SUCH DAMAGE.
27 */ 27 */
28 28
29 #include "cx/buffer.h" 29 #include "cx/buffer.h"
30 #include "cx/utils.h"
31 30
32 #include <stdio.h> 31 #include <stdio.h>
33 #include <string.h> 32 #include <string.h>
33 #include <errno.h>
34
35 static int buffer_copy_on_write(CxBuffer* buffer) {
36 if (0 == (buffer->flags & CX_BUFFER_COPY_ON_WRITE)) return 0;
37 void *newspace = cxMalloc(buffer->allocator, buffer->capacity);
38 if (NULL == newspace) return -1;
39 memcpy(newspace, buffer->space, buffer->size);
40 buffer->space = newspace;
41 buffer->flags &= ~CX_BUFFER_COPY_ON_WRITE;
42 buffer->flags |= CX_BUFFER_FREE_CONTENTS;
43 return 0;
44 }
34 45
35 int cxBufferInit( 46 int cxBufferInit(
36 CxBuffer *buffer, 47 CxBuffer *buffer,
37 void *space, 48 void *space,
38 size_t capacity, 49 size_t capacity,
39 CxAllocator const *allocator, 50 const CxAllocator *allocator,
40 int flags 51 int flags
41 ) { 52 ) {
42 if (allocator == NULL) allocator = cxDefaultAllocator; 53 if (allocator == NULL) {
54 allocator = cxDefaultAllocator;
55 }
56 if (flags & CX_BUFFER_COPY_ON_EXTEND) {
57 flags |= CX_BUFFER_AUTO_EXTEND;
58 }
43 buffer->allocator = allocator; 59 buffer->allocator = allocator;
44 buffer->flags = flags; 60 buffer->flags = flags;
45 if (!space) { 61 if (!space) {
46 buffer->bytes = cxMalloc(allocator, capacity); 62 buffer->bytes = cxMalloc(allocator, capacity);
47 if (buffer->bytes == NULL) { 63 if (buffer->bytes == NULL) {
48 return 1; 64 return -1; // LCOV_EXCL_LINE
49 } 65 }
50 buffer->flags |= CX_BUFFER_FREE_CONTENTS; 66 buffer->flags |= CX_BUFFER_FREE_CONTENTS;
51 } else { 67 } else {
52 buffer->bytes = space; 68 buffer->bytes = space;
53 } 69 }
54 buffer->capacity = capacity; 70 buffer->capacity = capacity;
55 buffer->size = 0; 71 buffer->size = 0;
56 buffer->pos = 0; 72 buffer->pos = 0;
57 73
58 buffer->flush_func = NULL; 74 buffer->flush = NULL;
59 buffer->flush_target = NULL;
60 buffer->flush_blkmax = 0;
61 buffer->flush_blksize = 4096;
62 buffer->flush_threshold = SIZE_MAX;
63 75
64 return 0; 76 return 0;
65 } 77 }
66 78
79 int cxBufferEnableFlushing(
80 CxBuffer *buffer,
81 CxBufferFlushConfig config
82 ) {
83 buffer->flush = malloc(sizeof(CxBufferFlushConfig));
84 if (buffer->flush == NULL) return -1; // LCOV_EXCL_LINE
85 memcpy(buffer->flush, &config, sizeof(CxBufferFlushConfig));
86 return 0;
87 }
88
67 void cxBufferDestroy(CxBuffer *buffer) { 89 void cxBufferDestroy(CxBuffer *buffer) {
68 if ((buffer->flags & CX_BUFFER_FREE_CONTENTS) == CX_BUFFER_FREE_CONTENTS) { 90 if (buffer->flags & CX_BUFFER_FREE_CONTENTS) {
69 cxFree(buffer->allocator, buffer->bytes); 91 cxFree(buffer->allocator, buffer->bytes);
70 } 92 }
93 free(buffer->flush);
94 memset(buffer, 0, sizeof(CxBuffer));
71 } 95 }
72 96
73 CxBuffer *cxBufferCreate( 97 CxBuffer *cxBufferCreate(
74 void *space, 98 void *space,
75 size_t capacity, 99 size_t capacity,
76 CxAllocator const *allocator, 100 const CxAllocator *allocator,
77 int flags 101 int flags
78 ) { 102 ) {
103 if (allocator == NULL) {
104 allocator = cxDefaultAllocator;
105 }
79 CxBuffer *buf = cxMalloc(allocator, sizeof(CxBuffer)); 106 CxBuffer *buf = cxMalloc(allocator, sizeof(CxBuffer));
80 if (buf == NULL) return NULL; 107 if (buf == NULL) return NULL;
81 if (0 == cxBufferInit(buf, space, capacity, allocator, flags)) { 108 if (0 == cxBufferInit(buf, space, capacity, allocator, flags)) {
82 return buf; 109 return buf;
83 } else { 110 } else {
111 // LCOV_EXCL_START
84 cxFree(allocator, buf); 112 cxFree(allocator, buf);
85 return NULL; 113 return NULL;
114 // LCOV_EXCL_STOP
86 } 115 }
87 } 116 }
88 117
89 void cxBufferFree(CxBuffer *buffer) { 118 void cxBufferFree(CxBuffer *buffer) {
90 if ((buffer->flags & CX_BUFFER_FREE_CONTENTS) == CX_BUFFER_FREE_CONTENTS) { 119 if (buffer == NULL) return;
91 cxFree(buffer->allocator, buffer->bytes); 120 const CxAllocator *allocator = buffer->allocator;
92 } 121 cxBufferDestroy(buffer);
93 cxFree(buffer->allocator, buffer); 122 cxFree(allocator, buffer);
94 } 123 }
95 124
96 int cxBufferSeek( 125 int cxBufferSeek(
97 CxBuffer *buffer, 126 CxBuffer *buffer,
98 off_t offset, 127 off_t offset,
115 144
116 size_t opos = npos; 145 size_t opos = npos;
117 npos += offset; 146 npos += offset;
118 147
119 if ((offset > 0 && npos < opos) || (offset < 0 && npos > opos)) { 148 if ((offset > 0 && npos < opos) || (offset < 0 && npos > opos)) {
149 errno = EOVERFLOW;
120 return -1; 150 return -1;
121 } 151 }
122 152
123 if (npos >= buffer->size) { 153 if (npos > buffer->size) {
124 return -1; 154 return -1;
125 } else { 155 } else {
126 buffer->pos = npos; 156 buffer->pos = npos;
127 return 0; 157 return 0;
128 } 158 }
129 159
130 } 160 }
131 161
132 void cxBufferClear(CxBuffer *buffer) { 162 void cxBufferClear(CxBuffer *buffer) {
133 memset(buffer->bytes, 0, buffer->size); 163 if (0 == (buffer->flags & CX_BUFFER_COPY_ON_WRITE)) {
164 memset(buffer->bytes, 0, buffer->size);
165 }
134 buffer->size = 0; 166 buffer->size = 0;
135 buffer->pos = 0; 167 buffer->pos = 0;
136 } 168 }
137 169
138 void cxBufferReset(CxBuffer *buffer) { 170 void cxBufferReset(CxBuffer *buffer) {
139 buffer->size = 0; 171 buffer->size = 0;
140 buffer->pos = 0; 172 buffer->pos = 0;
141 } 173 }
142 174
143 int cxBufferEof(CxBuffer const *buffer) { 175 bool cxBufferEof(const CxBuffer *buffer) {
144 return buffer->pos >= buffer->size; 176 return buffer->pos >= buffer->size;
145 } 177 }
146 178
147 int cxBufferMinimumCapacity( 179 int cxBufferMinimumCapacity(
148 CxBuffer *buffer, 180 CxBuffer *buffer,
150 ) { 182 ) {
151 if (newcap <= buffer->capacity) { 183 if (newcap <= buffer->capacity) {
152 return 0; 184 return 0;
153 } 185 }
154 186
155 if (cxReallocate(buffer->allocator, 187 const int force_copy_flags = CX_BUFFER_COPY_ON_WRITE | CX_BUFFER_COPY_ON_EXTEND;
188 if (buffer->flags & force_copy_flags) {
189 void *newspace = cxMalloc(buffer->allocator, newcap);
190 if (NULL == newspace) return -1;
191 memcpy(newspace, buffer->space, buffer->size);
192 buffer->space = newspace;
193 buffer->capacity = newcap;
194 buffer->flags &= ~force_copy_flags;
195 buffer->flags |= CX_BUFFER_FREE_CONTENTS;
196 return 0;
197 } else if (cxReallocate(buffer->allocator,
156 (void **) &buffer->bytes, newcap) == 0) { 198 (void **) &buffer->bytes, newcap) == 0) {
157 buffer->capacity = newcap; 199 buffer->capacity = newcap;
158 return 0; 200 return 0;
159 } else { 201 } else {
160 return -1; 202 return -1; // LCOV_EXCL_LINE
161 } 203 }
162 } 204 }
163 205
164 /** 206 static size_t cx_buffer_flush_helper(
165 * Helps flushing data to the flush target of a buffer. 207 const CxBuffer *buffer,
166 *
167 * @param buffer the buffer containing the config
168 * @param space the data to flush
169 * @param size the element size
170 * @param nitems the number of items
171 * @return the number of items flushed
172 */
173 static size_t cx_buffer_write_flush_helper(
174 CxBuffer *buffer,
175 unsigned char const *space,
176 size_t size, 208 size_t size,
209 const unsigned char *src,
177 size_t nitems 210 size_t nitems
178 ) { 211 ) {
179 size_t pos = 0; 212 // flush data from an arbitrary source
180 size_t remaining = nitems; 213 // does not need to be the buffer's contents
181 size_t max_items = buffer->flush_blksize / size; 214 size_t max_items = buffer->flush->blksize / size;
182 while (remaining > 0) { 215 size_t fblocks = 0;
183 size_t items = remaining > max_items ? max_items : remaining; 216 size_t flushed_total = 0;
184 size_t flushed = buffer->flush_func( 217 while (nitems > 0 && fblocks < buffer->flush->blkmax) {
185 space + pos, 218 fblocks++;
186 size, items, 219 size_t items = nitems > max_items ? max_items : nitems;
187 buffer->flush_target); 220 size_t flushed = buffer->flush->wfunc(
221 src, size, items, buffer->flush->target);
188 if (flushed > 0) { 222 if (flushed > 0) {
189 pos += (flushed * size); 223 flushed_total += flushed;
190 remaining -= flushed; 224 src += flushed * size;
225 nitems -= flushed;
191 } else { 226 } else {
192 // if no bytes can be flushed out anymore, we give up 227 // if no bytes can be flushed out anymore, we give up
193 break; 228 break;
194 } 229 }
195 } 230 }
196 return nitems - remaining; 231 return flushed_total;
232 }
233
234 static size_t cx_buffer_flush_impl(CxBuffer *buffer, size_t size) {
235 // flush the current contents of the buffer
236 unsigned char *space = buffer->bytes;
237 size_t remaining = buffer->pos / size;
238 size_t flushed_total = cx_buffer_flush_helper(
239 buffer, size, space, remaining);
240
241 // shift the buffer left after flushing
242 // IMPORTANT: up to this point, copy on write must have been
243 // performed already, because we can't do error handling here
244 cxBufferShiftLeft(buffer, flushed_total*size);
245
246 return flushed_total;
247 }
248
249 size_t cxBufferFlush(CxBuffer *buffer) {
250 if (buffer_copy_on_write(buffer)) return 0;
251 return cx_buffer_flush_impl(buffer, 1);
197 } 252 }
198 253
199 size_t cxBufferWrite( 254 size_t cxBufferWrite(
200 void const *ptr, 255 const void *ptr,
201 size_t size, 256 size_t size,
202 size_t nitems, 257 size_t nitems,
203 CxBuffer *buffer 258 CxBuffer *buffer
204 ) { 259 ) {
205 // optimize for easy case 260 // optimize for easy case
206 if (size == 1 && (buffer->capacity - buffer->pos) >= nitems) { 261 if (size == 1 && (buffer->capacity - buffer->pos) >= nitems) {
262 if (buffer_copy_on_write(buffer)) return 0;
207 memcpy(buffer->bytes + buffer->pos, ptr, nitems); 263 memcpy(buffer->bytes + buffer->pos, ptr, nitems);
208 buffer->pos += nitems; 264 buffer->pos += nitems;
209 if (buffer->pos > buffer->size) { 265 if (buffer->pos > buffer->size) {
210 buffer->size = buffer->pos; 266 buffer->size = buffer->pos;
211 } 267 }
212 return nitems; 268 return nitems;
213 } 269 }
214 270
215 size_t len; 271 size_t len;
216 size_t nitems_out = nitems;
217 if (cx_szmul(size, nitems, &len)) { 272 if (cx_szmul(size, nitems, &len)) {
273 errno = EOVERFLOW;
274 return 0;
275 }
276 if (buffer->pos > SIZE_MAX - len) {
277 errno = EOVERFLOW;
218 return 0; 278 return 0;
219 } 279 }
220 size_t required = buffer->pos + len; 280 size_t required = buffer->pos + len;
221 if (buffer->pos > required) {
222 return 0;
223 }
224 281
225 bool perform_flush = false; 282 bool perform_flush = false;
226 if (required > buffer->capacity) { 283 if (required > buffer->capacity) {
227 if ((buffer->flags & CX_BUFFER_AUTO_EXTEND) == CX_BUFFER_AUTO_EXTEND && required) { 284 if (buffer->flags & CX_BUFFER_AUTO_EXTEND) {
228 if (buffer->flush_blkmax > 0 && required > buffer->flush_threshold) { 285 if (buffer->flush != NULL && required > buffer->flush->threshold) {
229 perform_flush = true; 286 perform_flush = true;
230 } else { 287 } else {
231 if (cxBufferMinimumCapacity(buffer, required)) { 288 if (cxBufferMinimumCapacity(buffer, required)) {
232 return 0; 289 return 0; // LCOV_EXCL_LINE
233 } 290 }
234 } 291 }
235 } else { 292 } else {
236 if (buffer->flush_blkmax > 0) { 293 if (buffer->flush != NULL) {
237 perform_flush = true; 294 perform_flush = true;
238 } else { 295 } else {
239 // truncate data to be written, if we can neither extend nor flush 296 // truncate data, if we can neither extend nor flush
240 len = buffer->capacity - buffer->pos; 297 len = buffer->capacity - buffer->pos;
241 if (size > 1) { 298 if (size > 1) {
242 len -= len % size; 299 len -= len % size;
243 } 300 }
244 nitems_out = len / size; 301 nitems = len / size;
245 } 302 }
246 } 303 }
247 } 304 }
248 305
306 // check here and not above because of possible truncation
249 if (len == 0) { 307 if (len == 0) {
250 return len; 308 return 0;
251 } 309 }
252 310
311 // check if we need to copy
312 if (buffer_copy_on_write(buffer)) return 0;
313
314 // perform the operation
253 if (perform_flush) { 315 if (perform_flush) {
254 size_t flush_max; 316 size_t items_flush;
255 if (cx_szmul(buffer->flush_blkmax, buffer->flush_blksize, &flush_max)) { 317 if (buffer->pos == 0) {
256 return 0; 318 // if we don't have data in the buffer, but are instructed
257 } 319 // to flush, it means that we are supposed to relay the data
258 size_t flush_pos = buffer->flush_func == NULL || buffer->flush_target == NULL 320 items_flush = cx_buffer_flush_helper(buffer, size, ptr, nitems);
259 ? buffer->pos 321 if (items_flush == 0) {
260 : cx_buffer_write_flush_helper(buffer, buffer->bytes, 1, buffer->pos); 322 // we needed to flush, but could not flush anything
261 if (flush_pos == buffer->pos) { 323 // give up and avoid endless trying
262 // entire buffer has been flushed, we can reset 324 return 0;
263 buffer->size = buffer->pos = 0;
264
265 size_t items_flush; // how many items can also be directly flushed
266 size_t items_keep; // how many items have to be written to the buffer
267
268 items_flush = flush_max >= required ? nitems : (flush_max - flush_pos) / size;
269 if (items_flush > 0) {
270 items_flush = cx_buffer_write_flush_helper(buffer, ptr, size, items_flush / size);
271 // in case we could not flush everything, keep the rest
272 } 325 }
273 items_keep = nitems - items_flush; 326 size_t ritems = nitems - items_flush;
274 if (items_keep > 0) { 327 const unsigned char *rest = ptr;
275 // try again with the remaining stuff 328 rest += items_flush * size;
276 unsigned char const *new_ptr = ptr; 329 return items_flush + cxBufferWrite(rest, size, ritems, buffer);
277 new_ptr += items_flush * size; 330 } else {
278 // report the directly flushed items as written plus the remaining stuff 331 items_flush = cx_buffer_flush_impl(buffer, size);
279 return items_flush + cxBufferWrite(new_ptr, size, items_keep, buffer); 332 if (items_flush == 0) {
280 } else { 333 return 0;
281 // all items have been flushed - report them as written
282 return nitems;
283 } 334 }
284 } else if (flush_pos == 0) {
285 // nothing could be flushed at all, we immediately give up without writing any data
286 return 0;
287 } else {
288 // we were partially successful, we shift left and try again
289 cxBufferShiftLeft(buffer, flush_pos);
290 return cxBufferWrite(ptr, size, nitems, buffer); 335 return cxBufferWrite(ptr, size, nitems, buffer);
291 } 336 }
292 } else { 337 } else {
293 memcpy(buffer->bytes + buffer->pos, ptr, len); 338 memcpy(buffer->bytes + buffer->pos, ptr, len);
294 buffer->pos += len; 339 buffer->pos += len;
295 if (buffer->pos > buffer->size) { 340 if (buffer->pos > buffer->size) {
296 buffer->size = buffer->pos; 341 buffer->size = buffer->pos;
297 } 342 }
298 return nitems_out; 343 return nitems;
299 } 344 }
300 345
346 }
347
348 size_t cxBufferAppend(
349 const void *ptr,
350 size_t size,
351 size_t nitems,
352 CxBuffer *buffer
353 ) {
354 size_t pos = buffer->pos;
355 buffer->pos = buffer->size;
356 size_t written = cxBufferWrite(ptr, size, nitems, buffer);
357 buffer->pos = pos;
358 return written;
301 } 359 }
302 360
303 int cxBufferPut( 361 int cxBufferPut(
304 CxBuffer *buffer, 362 CxBuffer *buffer,
305 int c 363 int c
308 unsigned char const ch = c; 366 unsigned char const ch = c;
309 if (cxBufferWrite(&ch, 1, 1, buffer) == 1) { 367 if (cxBufferWrite(&ch, 1, 1, buffer) == 1) {
310 return c; 368 return c;
311 } else { 369 } else {
312 return EOF; 370 return EOF;
371 }
372 }
373
374 int cxBufferTerminate(CxBuffer *buffer) {
375 bool success = 0 == cxBufferPut(buffer, 0);
376 if (success) {
377 buffer->pos--;
378 buffer->size--;
379 return 0;
380 } else {
381 return -1;
313 } 382 }
314 } 383 }
315 384
316 size_t cxBufferPutString( 385 size_t cxBufferPutString(
317 CxBuffer *buffer, 386 CxBuffer *buffer,
326 size_t nitems, 395 size_t nitems,
327 CxBuffer *buffer 396 CxBuffer *buffer
328 ) { 397 ) {
329 size_t len; 398 size_t len;
330 if (cx_szmul(size, nitems, &len)) { 399 if (cx_szmul(size, nitems, &len)) {
400 errno = EOVERFLOW;
331 return 0; 401 return 0;
332 } 402 }
333 if (buffer->pos + len > buffer->size) { 403 if (buffer->pos + len > buffer->size) {
334 len = buffer->size - buffer->pos; 404 len = buffer->size - buffer->pos;
335 if (size > 1) len -= len % size; 405 if (size > 1) len -= len % size;
360 size_t shift 430 size_t shift
361 ) { 431 ) {
362 if (shift >= buffer->size) { 432 if (shift >= buffer->size) {
363 buffer->pos = buffer->size = 0; 433 buffer->pos = buffer->size = 0;
364 } else { 434 } else {
435 if (buffer_copy_on_write(buffer)) return -1;
365 memmove(buffer->bytes, buffer->bytes + shift, buffer->size - shift); 436 memmove(buffer->bytes, buffer->bytes + shift, buffer->size - shift);
366 buffer->size -= shift; 437 buffer->size -= shift;
367 438
368 if (buffer->pos >= shift) { 439 if (buffer->pos >= shift) {
369 buffer->pos -= shift; 440 buffer->pos -= shift;
376 447
377 int cxBufferShiftRight( 448 int cxBufferShiftRight(
378 CxBuffer *buffer, 449 CxBuffer *buffer,
379 size_t shift 450 size_t shift
380 ) { 451 ) {
452 if (buffer->size > SIZE_MAX - shift) {
453 errno = EOVERFLOW;
454 return -1;
455 }
381 size_t req_capacity = buffer->size + shift; 456 size_t req_capacity = buffer->size + shift;
382 size_t movebytes; 457 size_t movebytes;
383 458
384 // auto extend buffer, if required and enabled 459 // auto extend buffer, if required and enabled
385 if (buffer->capacity < req_capacity) { 460 if (buffer->capacity < req_capacity) {
386 if ((buffer->flags & CX_BUFFER_AUTO_EXTEND) == CX_BUFFER_AUTO_EXTEND) { 461 if (buffer->flags & CX_BUFFER_AUTO_EXTEND) {
387 if (cxBufferMinimumCapacity(buffer, req_capacity)) { 462 if (cxBufferMinimumCapacity(buffer, req_capacity)) {
388 return 1; 463 return -1; // LCOV_EXCL_LINE
389 } 464 }
390 movebytes = buffer->size; 465 movebytes = buffer->size;
391 } else { 466 } else {
392 movebytes = buffer->capacity - shift; 467 movebytes = buffer->capacity - shift;
393 } 468 }
394 } else { 469 } else {
395 movebytes = buffer->size; 470 movebytes = buffer->size;
396 } 471 }
397 472
398 memmove(buffer->bytes + shift, buffer->bytes, movebytes); 473 if (movebytes > 0) {
399 buffer->size = shift + movebytes; 474 if (buffer_copy_on_write(buffer)) return -1;
475 memmove(buffer->bytes + shift, buffer->bytes, movebytes);
476 buffer->size = shift + movebytes;
477 }
400 478
401 buffer->pos += shift; 479 buffer->pos += shift;
402 if (buffer->pos > buffer->size) { 480 if (buffer->pos > buffer->size) {
403 buffer->pos = buffer->size; 481 buffer->pos = buffer->size;
404 } 482 }

mercurial