src/server/safs/service.c

branch
aio
changeset 172
5580517faafc
parent 127
84e206063b64
child 184
a2a15ad6e4b9
equal deleted inserted replaced
170:711d00eeed25 172:5580517faafc
275 } 275 }
276 276
277 return 0; 277 return 0;
278 } 278 }
279 279
280
281 static void send_range_cleanup(AsyncSendRange *asr) {
282 WSBool error = asr->error;
283 Session *sn = asr->sn;
284 Request *rq = asr->rq;
285
286 pool_handle_t *pool = asr->sn->pool;
287 vfs_close(asr->in);
288 pool_free(pool, asr->aio->buf);
289 pool_free(pool, asr->aio);
290 pool_free(pool, asr->readev);
291 pool_free(pool, asr->writeev);
292 pool_free(pool, asr);
293
294 int ret = REQ_PROCEED;
295 if(error) {
296 rq->rq_attr.keep_alive = 0;
297 ret = REQ_ABORTED;
298 }
299 // return to nsapi loop
300 nsapi_function_return(sn, rq, ret);
301 }
302
303 static int send_buf(
304 SYS_NETFD out,
305 char *restrict buf,
306 size_t len,
307 size_t *restrict pos)
308 {
309 while(*pos < len) {
310 ssize_t w = net_write(out, buf + *pos, len - *pos);
311 if(w <= 0) {
312 return -1;
313 }
314 *pos += w;
315 }
316 return 0;
317 }
318
319 static int send_bytes(AsyncSendRange *asr, WSBool *completed) {
320 *completed = FALSE;
321 if(send_buf(asr->out, asr->header, asr->headerlen, &asr->headerpos)) {
322 if(net_errno(asr->out) == EAGAIN) {
323 return 0;
324 } else {
325 asr->error = TRUE;
326 return 1;
327 }
328 }
329
330 if(send_buf(asr->out, asr->aio->buf, asr->aio->result, &asr->wpos)) {
331 if(net_errno(asr->out) == EAGAIN) {
332 return 0;
333 } else {
334 asr->error = TRUE;
335 return 1;
336 }
337 }
338
339 if(!asr->read_complete) {
340 // write completed => new asynchronous read
341 asr->aio->offset += asr->aio->result;
342 if(system_aio_read(asr->aio)) {
343 asr->error = TRUE;
344 return 1;
345 }
346 }
347 *completed = TRUE;
348 return 0;
349 }
350
351 static int send_range_readevent(EventHandler *ev, Event *event) {
352 AsyncSendRange *asr = event->cookie;
353 asr->read_inprogress = FALSE;
354 asr->wpos = 0;
355 if(asr->error) {
356 return 0;
357 }
358
359 int ret = 1;
360 if(asr->aio->result == 0) {
361 asr->read_complete = TRUE;
362 ret = 0;
363 }
364
365 WSBool completed;
366 if(send_bytes(asr, &completed)) {
367 return 0;
368 }
369 if(!completed && !asr->write_inprogress) {
370 asr->write_inprogress = TRUE;
371 if(event_pollout(ev, asr->out, asr->writeev)) {
372 asr->error = TRUE;
373 return 0;
374 }
375 }
376
377 return ret;
378 }
379
380 static int send_range_writeevent(EventHandler *ev, Event *event) {
381 AsyncSendRange *asr = event->cookie;
382 if(asr->error) {
383 return 0;
384 }
385
386 WSBool completed;
387 if(send_bytes(asr, &completed)) {
388 return 0;
389 }
390
391 if(completed && asr->read_complete) {
392 // everything completed
393 return 0;
394 }
395
396 return 1;
397 }
398
399 static int send_range_aio_finish(EventHandler *ev, Event *event) {
400 AsyncSendRange *asr = event->cookie;
401 if(!asr->write_inprogress) {
402 send_range_cleanup(asr);
403 }
404 asr->read_inprogress = FALSE;
405 return 0;
406 }
407
408 static int send_range_poll_finish(EventHandler *ev, Event *event) {
409 AsyncSendRange *asr = event->cookie;
410 if(!asr->read_inprogress) {
411 send_range_cleanup(asr);
412 }
413 asr->write_inprogress = FALSE;
414 return 0;
415 }
416
417 static int send_range_aio(Session *sn, Request *rq, SYS_FILE fd, off_t offset, off_t length, char *header, int headerlen) {
418 net_setnonblock(sn->csd, TRUE);
419
420 // try to send the header
421 ssize_t hw = net_write(sn->csd, header, headerlen);
422 if(hw < 0) {
423 if(net_errno(sn->csd) == EAGAIN) {
424 hw = 0;
425 } else {
426 return REQ_ABORTED;
427 }
428 }
429
430 AsyncSendRange *asr = pool_malloc(sn->pool, sizeof(AsyncSendRange));
431 asr->sn = sn;
432 asr->rq = rq;
433 asr->in = fd;
434 asr->out = sn->csd;
435 asr->offset = offset;
436 asr->length = length;
437 asr->pos = offset;
438 asr->read_complete = FALSE;
439 asr->read_inprogress = FALSE;
440 asr->write_inprogress = FALSE;
441 asr->error = FALSE;
442 if(hw == headerlen) {
443 asr->header = NULL;
444 asr->headerlen = 0;
445 asr->headerpos = 0;
446 } else {
447 asr->header = header;
448 asr->headerlen = headerlen;
449 asr->headerpos = hw;
450 }
451
452 Event *readev = pool_malloc(sn->pool, sizeof(Event));
453 ZERO(readev, sizeof(Event));
454 readev->cookie = asr;
455 readev->fn = send_range_readevent;
456 readev->finish = send_range_aio_finish;
457
458 Event *writeev = pool_malloc(sn->pool, sizeof(Event));
459 ZERO(writeev, sizeof(Event));
460 writeev->cookie = asr;
461 writeev->fn = send_range_writeevent;
462 writeev->finish = send_range_poll_finish;
463
464 asr->readev = readev;
465 asr->writeev = writeev;
466
467 aiocb_s *aio = pool_malloc(sn->pool, sizeof(aiocb_s));
468 aio->buf = pool_malloc(sn->pool, AIO_BUF_SIZE);
469 aio->nbytes = AIO_BUF_SIZE < length ? AIO_BUF_SIZE : length;
470 aio->filedes = fd;
471 aio->offset = offset;
472 aio->evhandler = sn->ev;
473 aio->event = readev;
474
475 asr->aio = aio;
476 asr->wpos = 0;
477
478 if(system_aio_read(aio)) {
479 send_range_cleanup(asr);
480 return REQ_ABORTED;
481 }
482 asr->read_inprogress = TRUE;
483
484 return REQ_PROCESSING;
485 }
486
/*
 * NOTE(review): judging by the name, one entry of a multi-range response
 * (used by send_multi_range) - confirm against its users.
 */
struct multi_range_elm {
    sstr_t header;  // NOTE(review): presumably the part header sent before this range's data - confirm
    off_t offset;   // NOTE(review): presumably the file offset where the range starts - confirm
    off_t length;   // NOTE(review): presumably the number of bytes in the range - confirm
};
418 625
419 if(single_range) { 626 if(single_range) {
420 // send response header 627 // send response header
421 http_start_response(sn, rq); 628 http_start_response(sn, rq);
422 // send content 629 // send content
630 int ret = send_range_aio(sn, rq, fd, offset, length, NULL, 0);
631 if(ret == REQ_PROCESSING) {
632 return ret;
633 }
634 /*
423 if(send_range(sn, fd, offset, length, NULL, 0)) { 635 if(send_range(sn, fd, offset, length, NULL, 0)) {
424 // TODO: error 636 // TODO: error
425 } 637 }
638 */
426 } else { 639 } else {
427 if(send_multi_range(sn, rq, fd, s.st_size, range)) { 640 if(send_multi_range(sn, rq, fd, s.st_size, range)) {
428 // TODO: error 641 // TODO: error
429 } 642 }
430 } 643 }

mercurial