src/server/safs/service.c

changeset 193
aa8393527b1e
parent 192
6a145e13d933
child 199
d62f2016cfe5
equal deleted inserted replaced
183:f33974f0dce0 193:aa8393527b1e
40 #include "../util/strbuf.h" 40 #include "../util/strbuf.h"
41 #include <ucx/string.h> 41 #include <ucx/string.h>
42 #include <ucx/utils.h> 42 #include <ucx/utils.h>
43 43
44 #include <errno.h> 44 #include <errno.h>
45
46 45
47 /* 46 /*
48 * prepares servicing a file 47 * prepares servicing a file
49 * 48 *
50 * adds content-length header 49 * adds content-length header
275 } 274 }
276 275
277 return 0; 276 return 0;
278 } 277 }
279 278
279
280 static void send_range_cleanup(AsyncSendRange *asr) {
281 WSBool error = asr->error;
282 Session *sn = asr->sn;
283 Request *rq = asr->rq;
284
285 pool_handle_t *pool = asr->sn->pool;
286 vfs_close(asr->in);
287 pool_free(pool, asr->aio->buf);
288 pool_free(pool, asr->aio);
289 pool_free(pool, asr->readev);
290 pool_free(pool, asr->writeev);
291 pool_free(pool, asr);
292
293 int ret = REQ_PROCEED;
294 if(error) {
295 rq->rq_attr.keep_alive = 0;
296 ret = REQ_ABORTED;
297 }
298 // return to nsapi loop
299 nsapi_function_return(sn, rq, ret);
300 }
301
302 static int send_buf(
303 SYS_NETFD out,
304 char *restrict buf,
305 size_t len,
306 size_t *restrict pos)
307 {
308 while(*pos < len) {
309 ssize_t w = net_write(out, buf + *pos, len - *pos);
310 if(w <= 0) {
311 return -1;
312 }
313 *pos += w;
314 }
315 return 0;
316 }
317
318 static int send_bytes(AsyncSendRange *asr, WSBool *completed) {
319 *completed = FALSE;
320 if(asr->header) {
321 if(send_buf(asr->out, asr->header, asr->headerlen, &asr->headerpos)) {
322 if(net_errno(asr->out) == EAGAIN) {
323 return 0;
324 } else {
325 asr->error = TRUE;
326 return 1;
327 }
328 }
329 if(asr->headerpos >= asr->headerlen) {
330 asr->header = NULL;
331 }
332 }
333
334 if(send_buf(asr->out, asr->aio->buf, asr->aio->result, &asr->wpos)) {
335 if(net_errno(asr->out) == EAGAIN) {
336 return 0;
337 } else {
338 asr->error = TRUE;
339 return 1;
340 }
341 }
342
343 if(!asr->read_complete) {
344 // write completed => new asynchronous read
345 asr->aio->offset += asr->aio->result;
346 size_t length = asr->end - asr->offset;
347 asr->aio->nbytes = AIO_BUF_SIZE < length ? AIO_BUF_SIZE : length;
348 asr->read_inprogress = TRUE;
349 if(system_aio_read(asr->aio)) {
350 asr->error = TRUE;
351 return 1;
352 }
353 }
354 *completed = TRUE;
355 return 0;
356 }
357
358 static int send_range_readevent(EventHandler *ev, Event *event) {
359 AsyncSendRange *asr = event->cookie;
360 asr->read_inprogress = FALSE;
361 asr->wpos = 0;
362 asr->offset += asr->aio->result;
363 if(asr->error || asr->aio->result < 0) {
364 return 0;
365 }
366
367 int ret = 1;
368 if(asr->aio->result == 0 || asr->offset >= asr->end) {
369 asr->read_complete = TRUE;
370 ret = 0;
371 }
372
373 WSBool completed;
374 if(send_bytes(asr, &completed)) {
375 return 0;
376 }
377 if(!completed && !asr->write_inprogress) {
378 asr->write_inprogress = TRUE;
379 if(event_pollout(ev, asr->out, asr->writeev)) {
380 asr->error = TRUE;
381 return 0;
382 }
383 }
384
385 return ret;
386 }
387
388 static int send_range_writeevent(EventHandler *ev, Event *event) {
389 AsyncSendRange *asr = event->cookie;
390 if(asr->error) {
391 return 1;
392 }
393
394 WSBool completed;
395 if(send_bytes(asr, &completed)) {
396 return 1;
397 }
398
399 if(completed) {
400 return 0;
401 }
402
403 return 1;
404 }
405
406 static int send_range_aio_finish(EventHandler *ev, Event *event) {
407 AsyncSendRange *asr = event->cookie;
408 if(!asr->write_inprogress) {
409 send_range_cleanup(asr);
410 }
411 asr->read_inprogress = FALSE;
412 return 0;
413 }
414
415 static int send_range_poll_finish(EventHandler *ev, Event *event) {
416 AsyncSendRange *asr = event->cookie;
417 if(!asr->read_inprogress) {
418 send_range_cleanup(asr);
419 }
420 asr->write_inprogress = FALSE;
421 return 0;
422 }
423
424 static int send_range_aio(Session *sn, Request *rq, SYS_FILE fd, off_t offset, off_t length, char *header, int headerlen) {
425 net_setnonblock(sn->csd, TRUE);
426
427 // try to send the header
428 ssize_t hw = net_write(sn->csd, header, headerlen);
429 if(hw < 0) {
430 if(net_errno(sn->csd) == EAGAIN) {
431 hw = 0;
432 } else {
433 return REQ_ABORTED;
434 }
435 }
436
437 AsyncSendRange *asr = pool_malloc(sn->pool, sizeof(AsyncSendRange));
438 asr->sn = sn;
439 asr->rq = rq;
440 asr->in = fd;
441 asr->out = sn->csd;
442 asr->offset = offset;
443 asr->end = offset + length;
444 //asr->length = length;
445 asr->pos = offset;
446 asr->read_complete = FALSE;
447 asr->read_inprogress = FALSE;
448 asr->write_inprogress = FALSE;
449 asr->error = FALSE;
450 if(hw == headerlen) {
451 asr->header = NULL;
452 asr->headerlen = 0;
453 asr->headerpos = 0;
454 } else {
455 asr->header = header;
456 asr->headerlen = headerlen;
457 asr->headerpos = hw;
458 }
459
460 Event *readev = pool_malloc(sn->pool, sizeof(Event));
461 ZERO(readev, sizeof(Event));
462 readev->cookie = asr;
463 readev->fn = send_range_readevent;
464 readev->finish = send_range_aio_finish;
465
466 Event *writeev = pool_malloc(sn->pool, sizeof(Event));
467 ZERO(writeev, sizeof(Event));
468 writeev->cookie = asr;
469 writeev->fn = send_range_writeevent;
470 writeev->finish = send_range_poll_finish;
471
472 asr->readev = readev;
473 asr->writeev = writeev;
474
475 aiocb_s *aio = pool_malloc(sn->pool, sizeof(aiocb_s));
476 aio->buf = pool_malloc(sn->pool, AIO_BUF_SIZE);
477 aio->nbytes = AIO_BUF_SIZE < length ? AIO_BUF_SIZE : length;
478 aio->filedes = fd;
479 aio->offset = offset;
480 aio->evhandler = sn->ev;
481 aio->event = readev;
482
483 asr->aio = aio;
484 asr->wpos = 0;
485
486 asr->read_inprogress = TRUE;
487 if(system_aio_read(aio)) {
488 send_range_cleanup(asr);
489 return REQ_ABORTED;
490 }
491 asr->read_inprogress = TRUE;
492
493 return REQ_PROCESSING;
494 }
495
280 struct multi_range_elm { 496 struct multi_range_elm {
281 sstr_t header; 497 sstr_t header;
282 off_t offset; 498 off_t offset;
283 off_t length; 499 off_t length;
284 }; 500 };
414 } else { 630 } else {
415 offset = 0; 631 offset = 0;
416 length = s.st_size; 632 length = s.st_size;
417 } 633 }
418 634
635 int ret = REQ_NOACTION;
419 if(single_range) { 636 if(single_range) {
420 // send response header 637 // send response header
421 http_start_response(sn, rq); 638 http_start_response(sn, rq);
422 // send content 639 // send content
640 ret = send_range_aio(sn, rq, fd, offset, length, NULL, 0);
641 if(ret == REQ_PROCESSING) {
642 return ret;
643 }
644 /*
423 if(send_range(sn, fd, offset, length, NULL, 0)) { 645 if(send_range(sn, fd, offset, length, NULL, 0)) {
424 // TODO: error 646 // TODO: error
425 } 647 }
648 //*/
426 } else { 649 } else {
427 if(send_multi_range(sn, rq, fd, s.st_size, range)) { 650 ret = send_multi_range(sn, rq, fd, s.st_size, range);
428 // TODO: error 651 // TODO: error
429 }
430 } 652 }
431 653
432 // cleanup 654 // cleanup
433 vfs_close(fd); 655 vfs_close(fd);
434 free_range(sn, range); 656 free_range(sn, range);
435 657
436 return REQ_PROCEED; 658 return ret;
437 } 659 }
438 660
439 661
440 662
441 int service_hello(pblock *pb, Session *sn, Request *rq) { 663 int service_hello(pblock *pb, Session *sn, Request *rq) {

mercurial