src/server/proxy/httpclient.c

changeset 717
2edcb361b8be
parent 715
30732c5b292c
child 718
9e98618464ae
equal deleted inserted replaced
716:0b3d0af5d74f 717:2edcb361b8be
37 #include <string.h> 37 #include <string.h>
38 #include <errno.h> 38 #include <errno.h>
39 39
40 static int client_connected(EventHandler *ev, Event *event); 40 static int client_connected(EventHandler *ev, Event *event);
41 static int client_io(EventHandler *ev, Event *event); 41 static int client_io(EventHandler *ev, Event *event);
42 static int client_ws_io(EventHandler *ev, Event *event);
42 static int client_process(HttpClient *client, Event *event); 43 static int client_process(HttpClient *client, Event *event);
44 static int client_ws_process(HttpClient *client, Event *event);
43 static int client_finished(EventHandler *ev, Event *event); 45 static int client_finished(EventHandler *ev, Event *event);
44 46
45 static int client_send_buf(HttpClient *client); 47 static int client_send_buf(HttpClient *client);
46 static int client_send_request_body(HttpClient *client); 48 static int client_send_request_body(HttpClient *client);
47 static int client_read_response_header(HttpClient *client); 49 static int client_read_response_header(HttpClient *client);
316 static int client_io(EventHandler *ev, Event *event) { 318 static int client_io(EventHandler *ev, Event *event) {
317 HttpClient *client = event->cookie; 319 HttpClient *client = event->cookie;
318 return client_process(client, event); 320 return client_process(client, event);
319 } 321 }
320 322
323 static int client_ws_io(EventHandler *ev, Event *event) {
324 HttpClient *client = event->cookie;
325 return client_ws_process(client, event);
326 }
327
321 static int client_process(HttpClient *client, Event *event) { 328 static int client_process(HttpClient *client, Event *event) {
322 client->last_event = event; 329 client->last_event = event;
323 if(client->stage < 0) { 330 if(client->stage < 0) {
324 return 0; 331 return 0;
325 } 332 }
362 if(client_read_response_header(client)) { 369 if(client_read_response_header(client)) {
363 return client->error == 0; 370 return client->error == 0;
364 } 371 }
365 int ret = 0; 372 int ret = 0;
366 if(client->stage == 2) { 373 if(client->stage == 2) {
367 // websocket: write message buffer 374 return client_ws_process(client, event);
368 ret = client_send_buf(client);
369 } 375 }
370 if(client_read_response_body(client)) { 376 if(client_read_response_body(client)) {
371 ret = 1; 377 ret = 1;
372 } 378 }
373 379
374 if(ret) { 380 if(ret) {
375 return client->error == 0; 381 return client->error == 0;
376 } 382 }
377 383
378 return 0; 384 return 0;
385 }
386
387 static int client_ws_process(HttpClient *client, Event *event) {
388 // send available data from the transfer buffer
389 int ret = client_send_buf(client);
390 if(client->error) {
391 return 0;
392 }
393 // readiness notification
394 if(ret == 0 && client->ws_msg_ready) {
395 if(client->ws_msg_ready(client, client->ws_msg_ready_userdata)) {
396 return 0;
397 }
398 }
399
400 // read message
401 char *buf = client->transfer2_buffer + client->transfer2_buffer_pos;
402 size_t available = client->transfer2_buffer_alloc - client->transfer2_buffer_len;
403 ssize_t r = -1;
404 while(available > 0) {
405 ssize_t r = net_read(client->stream, buf, available);
406 if(r <= 0) {
407 break;
408 }
409 client->transfer2_buffer_len += r;
410 if(client->ws_write) {
411 char *out = client->transfer2_buffer + client->transfer2_buffer_pos;
412 size_t nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos;
413 while(nbytes > 0) {
414 ssize_t w = client->ws_write(client, out, nbytes, client->ws_write_userdata);
415 if(w == HTTP_CLIENT_CALLBACK_WOULD_BLOCK) {
416 break;
417 } else if(w <= 0) {
418 client->error = 1;
419 return 0;
420 }
421 client->transfer2_buffer_pos += w;
422
423 // adjust buffer
424 out = client->transfer2_buffer + client->transfer2_buffer_pos;
425 nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos;
426 }
427 } else {
428 // noop
429 client->transfer2_buffer_pos = client->transfer2_buffer_len;
430 }
431
432 // adjust buffer
433 buf = client->transfer2_buffer + client->transfer2_buffer_pos;
434 available = client->transfer2_buffer_alloc - client->transfer2_buffer_len;
435 }
436
437 return r == 0 || client->error ? 0 : 1;
379 } 438 }
380 439
381 static int client_finished(EventHandler *ev, Event *event) { 440 static int client_finished(EventHandler *ev, Event *event) {
382 HttpClient *client = event->cookie; 441 HttpClient *client = event->cookie;
383 442
598 657
599 if(client->statuscode == 101) { 658 if(client->statuscode == 101) {
600 if(!cx_strcasecmp(hdr_upgrade, "websocket") && !cx_strcasecmp(hdr_connection, "upgrade")) { 659 if(!cx_strcasecmp(hdr_upgrade, "websocket") && !cx_strcasecmp(hdr_connection, "upgrade")) {
601 client->stage = 2; 660 client->stage = 2;
602 client->event.events = EVENT_POLLIN|EVENT_POLLOUT; 661 client->event.events = EVENT_POLLIN|EVENT_POLLOUT;
662 client->event.fn = client_ws_io;
603 663
604 // prepare IO buffers for websockets 664 // prepare IO buffers for websockets
665 // transfer_buffer is used for outgoing traffic
605 client->transfer_buffer_len = 0; 666 client->transfer_buffer_len = 0;
606 client->transfer_buffer_pos = 0; 667 client->transfer_buffer_pos = 0;
607 668
669 // transfer2_buffer is used for reading
608 client->transfer2_buffer = malloc(HTTP_CLIENT_BUFFER_SIZE); 670 client->transfer2_buffer = malloc(HTTP_CLIENT_BUFFER_SIZE);
609 if(!client->transfer2_buffer) { 671 if(!client->transfer2_buffer) {
610 client->error = 1; 672 client->error = 1;
611 return 1; 673 return 1;
612 } 674 }

mercurial