ucx/buffer.c

changeset 102
64ded9f6a6c6
parent 101
7b3a3130be44
equal deleted inserted replaced
101:7b3a3130be44 102:64ded9f6a6c6
203 } 203 }
204 } 204 }
205 205
206 static size_t cx_buffer_flush_helper( 206 static size_t cx_buffer_flush_helper(
207 const CxBuffer *buffer, 207 const CxBuffer *buffer,
208 const unsigned char *src,
208 size_t size, 209 size_t size,
209 const unsigned char *src,
210 size_t nitems 210 size_t nitems
211 ) { 211 ) {
212 // flush data from an arbitrary source 212 // flush data from an arbitrary source
213 // does not need to be the buffer's contents 213 // does not need to be the buffer's contents
214 size_t max_items = buffer->flush->blksize / size; 214 size_t max_items = buffer->flush->blksize / size;
234 static size_t cx_buffer_flush_impl(CxBuffer *buffer, size_t size) { 234 static size_t cx_buffer_flush_impl(CxBuffer *buffer, size_t size) {
235 // flush the current contents of the buffer 235 // flush the current contents of the buffer
236 unsigned char *space = buffer->bytes; 236 unsigned char *space = buffer->bytes;
237 size_t remaining = buffer->pos / size; 237 size_t remaining = buffer->pos / size;
238 size_t flushed_total = cx_buffer_flush_helper( 238 size_t flushed_total = cx_buffer_flush_helper(
239 buffer, size, space, remaining); 239 buffer, space, size, remaining);
240 240
241 // shift the buffer left after flushing 241 // shift the buffer left after flushing
242 // IMPORTANT: up to this point, copy on write must have been 242 // IMPORTANT: up to this point, copy on write must have been
243 // performed already, because we can't do error handling here 243 // performed already, because we can't do error handling here
244 cxBufferShiftLeft(buffer, flushed_total*size); 244 cxBufferShiftLeft(buffer, flushed_total*size);
266 buffer->size = buffer->pos; 266 buffer->size = buffer->pos;
267 } 267 }
268 return nitems; 268 return nitems;
269 } 269 }
270 270
271 size_t len; 271 size_t len, total_flushed = 0;
272 cx_buffer_write_retry:
272 if (cx_szmul(size, nitems, &len)) { 273 if (cx_szmul(size, nitems, &len)) {
273 errno = EOVERFLOW; 274 errno = EOVERFLOW;
274 return 0; 275 return total_flushed;
275 } 276 }
276 if (buffer->pos > SIZE_MAX - len) { 277 if (buffer->pos > SIZE_MAX - len) {
277 errno = EOVERFLOW; 278 errno = EOVERFLOW;
278 return 0; 279 return total_flushed;
279 } 280 }
281
280 size_t required = buffer->pos + len; 282 size_t required = buffer->pos + len;
281
282 bool perform_flush = false; 283 bool perform_flush = false;
283 if (required > buffer->capacity) { 284 if (required > buffer->capacity) {
284 if (buffer->flags & CX_BUFFER_AUTO_EXTEND) { 285 if (buffer->flags & CX_BUFFER_AUTO_EXTEND) {
285 if (buffer->flush != NULL && required > buffer->flush->threshold) { 286 if (buffer->flush != NULL && required > buffer->flush->threshold) {
286 perform_flush = true; 287 perform_flush = true;
287 } else { 288 } else {
288 if (cxBufferMinimumCapacity(buffer, required)) { 289 if (cxBufferMinimumCapacity(buffer, required)) {
289 return 0; // LCOV_EXCL_LINE 290 return total_flushed; // LCOV_EXCL_LINE
290 } 291 }
291 } 292 }
292 } else { 293 } else {
293 if (buffer->flush != NULL) { 294 if (buffer->flush != NULL) {
294 perform_flush = true; 295 perform_flush = true;
303 } 304 }
304 } 305 }
305 306
306 // check here and not above because of possible truncation 307 // check here and not above because of possible truncation
307 if (len == 0) { 308 if (len == 0) {
308 return 0; 309 return total_flushed;
309 } 310 }
310 311
311 // check if we need to copy 312 // check if we need to copy
312 if (buffer_copy_on_write(buffer)) return 0; 313 if (buffer_copy_on_write(buffer)) return 0;
313 314
314 // perform the operation 315 // perform the operation
315 if (perform_flush) { 316 if (perform_flush) {
316 size_t items_flush; 317 size_t items_flushed;
317 if (buffer->pos == 0) { 318 if (buffer->pos == 0) {
318 // if we don't have data in the buffer, but are instructed 319 // if we don't have data in the buffer, but are instructed
319 // to flush, it means that we are supposed to relay the data 320 // to flush, it means that we are supposed to relay the data
320 items_flush = cx_buffer_flush_helper(buffer, size, ptr, nitems); 321 items_flushed = cx_buffer_flush_helper(buffer, ptr, size, nitems);
321 if (items_flush == 0) { 322 if (items_flushed == 0) {
322 // we needed to flush, but could not flush anything 323 // we needed to relay data, but could not flush anything
323 // give up and avoid endless trying 324 // i.e. we have to give up to avoid endless trying
324 return 0; 325 return 0;
325 } 326 }
326 size_t ritems = nitems - items_flush; 327 nitems -= items_flushed;
327 const unsigned char *rest = ptr; 328 total_flushed += items_flushed;
328 rest += items_flush * size; 329 if (nitems > 0) {
329 return items_flush + cxBufferWrite(rest, size, ritems, buffer); 330 ptr = ((unsigned char*)ptr) + items_flushed * size;
331 goto cx_buffer_write_retry;
332 }
333 return total_flushed;
330 } else { 334 } else {
331 items_flush = cx_buffer_flush_impl(buffer, size); 335 items_flushed = cx_buffer_flush_impl(buffer, size);
332 if (items_flush == 0) { 336 if (items_flushed == 0) {
333 return 0; 337 // flush target is full, let's try to truncate
338 size_t remaining_space;
339 if (buffer->flags & CX_BUFFER_AUTO_EXTEND) {
340 remaining_space = buffer->flush->threshold > buffer->pos
341 ? buffer->flush->threshold - buffer->pos
342 : 0;
343 } else {
344 remaining_space = buffer->capacity > buffer->pos
345 ? buffer->capacity - buffer->pos
346 : 0;
347 }
348 nitems = remaining_space / size;
349 if (nitems == 0) {
350 return total_flushed;
351 }
334 } 352 }
335 return cxBufferWrite(ptr, size, nitems, buffer); 353 goto cx_buffer_write_retry;
336 } 354 }
337 } else { 355 } else {
338 memcpy(buffer->bytes + buffer->pos, ptr, len); 356 memcpy(buffer->bytes + buffer->pos, ptr, len);
339 buffer->pos += len; 357 buffer->pos += len;
340 if (buffer->pos > buffer->size) { 358 if (buffer->pos > buffer->size) {
341 buffer->size = buffer->pos; 359 buffer->size = buffer->pos;
342 } 360 }
343 return nitems; 361 return total_flushed + nitems;
344 } 362 }
345
346 } 363 }
347 364
348 size_t cxBufferAppend( 365 size_t cxBufferAppend(
349 const void *ptr, 366 const void *ptr,
350 size_t size, 367 size_t size,
351 size_t nitems, 368 size_t nitems,
352 CxBuffer *buffer 369 CxBuffer *buffer
353 ) { 370 ) {
354 size_t pos = buffer->pos; 371 size_t pos = buffer->pos;
355 buffer->pos = buffer->size; 372 size_t append_pos = buffer->size;
373 buffer->pos = append_pos;
356 size_t written = cxBufferWrite(ptr, size, nitems, buffer); 374 size_t written = cxBufferWrite(ptr, size, nitems, buffer);
357 buffer->pos = pos; 375 // the buffer might have been flushed
376 // we must compute a possible delta for the position
377 // expected: pos = append_pos + written
378 // -> if this is not the case, there is a delta
379 size_t delta = append_pos + written*size - buffer->pos;
380 if (delta > pos) {
381 buffer->pos = 0;
382 } else {
383 buffer->pos = pos - delta;
384 }
358 return written; 385 return written;
359 } 386 }
360 387
361 int cxBufferPut( 388 int cxBufferPut(
362 CxBuffer *buffer, 389 CxBuffer *buffer,

mercurial