src/server/safs/proxy.c

changeset 720
8c7d08d3be2e
parent 707
5fb102d2c745
child 721
482f4c153636
equal deleted inserted replaced
719:c4c2b8e8ddc5 720:8c7d08d3be2e
65 * event structure for sn->csd 65 * event structure for sn->csd
66 */ 66 */
67 Event event; 67 Event event;
68 68
69 /* 69 /*
70 * websocket read buffer
71 */
72 char *read_buf;
73 size_t read_buf_alloc;
74 size_t read_buf_size;
75 size_t read_buf_pos;
76
77 /*
70 * Has the response started (proxy_response_start called) 78 * Has the response started (proxy_response_start called)
71 */ 79 */
72 int response_started; 80 int response_started;
73 81
74 /* 82 /*
78 } ProxyRequest; 86 } ProxyRequest;
79 87
/*
 * Drops one reference to the proxy request.
 * When the reference counter reaches zero, the HttpClient, the websocket
 * read buffer and the ProxyRequest itself are freed.
 */
80 static void proxy_unref(ProxyRequest *proxy) { 88 static void proxy_unref(ProxyRequest *proxy) {
81 if(--proxy->ref == 0) { 89 if(--proxy->ref == 0) {
82 http_client_free(proxy->client); 90 http_client_free(proxy->client);
// read_buf is NULL unless websocket mode was initialized; free(NULL) is a no-op
91 free(proxy->read_buf);
83 free(proxy); 92 free(proxy);
84 } 93 }
85 } 94 }
86 95
87 static int proxy_response_start(HttpClient *client, int status, char *message, void *userdata) { 96 static int proxy_response_start(HttpClient *client, int status, char *message, void *userdata) {
88 ProxyRequest *proxy = userdata; 97 ProxyRequest *proxy = userdata;
98
99 if(status == 101) {
100 pblock_fr("connection", proxy->response_header_rewrite, TRUE);
101 proxy->rq->rq_attr.keep_alive = 0;
102 }
89 103
90 HeaderArray *headers = client->response_headers; 104 HeaderArray *headers = client->response_headers;
91 while(headers) { 105 while(headers) {
92 for(int i=0;i<headers->len;i++) { 106 for(int i=0;i<headers->len;i++) {
93 cxmutstr name = headers->headers[i].name; 107 cxmutstr name = headers->headers[i].name;
115 } 129 }
116 130
117 protocol_status(proxy->sn, proxy->rq, status, message); 131 protocol_status(proxy->sn, proxy->rq, status, message);
118 protocol_start_response(proxy->sn, proxy->rq); 132 protocol_start_response(proxy->sn, proxy->rq);
119 proxy->response_started = 1; 133 proxy->response_started = 1;
120 134
121 return 0; 135 return 0;
122 } 136 }
123 137
124 static void proxy_response_finished(HttpClient *client, void *userdata) { 138 static void proxy_response_finished(HttpClient *client, void *userdata) {
125 ProxyRequest *proxy = userdata; 139 ProxyRequest *proxy = userdata;
225 } 239 }
226 } 240 }
227 return ret; 241 return ret;
228 } 242 }
229 243
/*
 * HttpClient ws_write callback: writes nbytes from buf to the session
 * socket (proxy->sn->csd) with net_write.
 *
 * userdata is the ProxyRequest; the client parameter is not used.
 *
 * Returns the net_write result if it is not negative. If net_write fails,
 * HTTP_CLIENT_CALLBACK_WOULD_BLOCK is returned when the stream's io_errno
 * is EWOULDBLOCK, otherwise HTTP_CLIENT_CALLBACK_ERROR.
 */
244 static ssize_t proxy_ws_write(HttpClient *client, void *buf, size_t nbytes, void *userdata) {
245 ProxyRequest *proxy = userdata;
246 ssize_t w = net_write(proxy->sn->csd, buf, nbytes);
247 if(w < 0) {
// the error code is taken from the stream object, not from the global errno
248 IOStream *stream = proxy->sn->csd;
249 return stream->io_errno == EWOULDBLOCK ? HTTP_CLIENT_CALLBACK_WOULD_BLOCK : HTTP_CLIENT_CALLBACK_ERROR;
250 }
251 return w;
252 }
253
/*
 * HttpClient ws_msg_ready callback: moves data from the session socket
 * (proxy->sn->csd) into the HttpClient message buffer.
 *
 * On the first call (read_buf == NULL) the stream is switched to
 * IO_MODE_RAW, a 64 KiB read buffer is allocated and a poll-in event for
 * the session socket is registered.
 *
 * After that, the function alternates between two steps:
 *   1. pass pending read buffer content to http_client_add_message
 *   2. refill the read buffer with net_read
 * It calls itself to repeat these steps and stops when
 * http_client_add_message accepts no more data or net_read returns no data.
 *
 * userdata is the ProxyRequest.
 *
 * Returns 0 if no further progress is possible at the moment, 1 on error
 * (allocation failure, event registration failure, or a failed read with
 * io_errno != EWOULDBLOCK).
 */
254 static int proxy_ws_msg_ready(HttpClient *client, void *userdata) {
255 ProxyRequest *proxy = userdata;
256 IOStream *st = proxy->sn->csd;
257
258 if(proxy->read_buf == NULL) {
259 // proxy not initialized for websockets yet
260
261 st->setmode(st, IO_MODE_RAW);
262
263 proxy->read_buf_alloc = 1024*64;
264 proxy->read_buf = malloc(proxy->read_buf_alloc);
265 if(!proxy->read_buf) {
266 return 1;
267 }
268
// register for read events on the session socket
269 proxy->event.cookie = proxy;
270 proxy->event.fn = proxy_request_read_event;
271 proxy->event.finish = proxy_request_read_finished;
272 if(event_pollin(client->ev, st, &proxy->event)) {
// NOTE(review): read_buf stays allocated here, so a later call would skip
// this init block without a registered event - confirm callers stop after 1
273 return 1;
274 }
// presumably the registered event holds this reference - TODO confirm
275 proxy->ref++;
276 }
277
278 // is there still data in the read buffer that needs to be transferred to
279 // the HttpClient message buffer?
280 while(proxy->read_buf_pos < proxy->read_buf_size) {
281 char *msg = proxy->read_buf + proxy->read_buf_pos;
282 size_t msglen = proxy->read_buf_size - proxy->read_buf_pos;
283 int n = http_client_add_message(client, msg, msglen);
284 if(n <= 0) {
285 return 0; // message buffer not ready, unsent data stays in read_buf
286 }
287 proxy->read_buf_pos += n;
288 }
289 // readbuf flushed, reset
290 proxy->read_buf_size = 0;
291 proxy->read_buf_pos = 0;
292
// refill the read buffer until it is full or net_read returns no data
293 while(proxy->read_buf_size < proxy->read_buf_alloc) {
294 ssize_t r = net_read(proxy->sn->csd, proxy->read_buf + proxy->read_buf_size, proxy->read_buf_alloc - proxy->read_buf_size);
// NOTE(review): r == 0 is handled like r < 0; whether io_errno is meaningful
// in that case depends on net_read - confirm
295 if(r <= 0) {
296 if(proxy->read_buf_size > 0) {
297 // we have read some data, the next proxy_ws_msg_ready will
298 // transfer this to the message buffer
299 return proxy_ws_msg_ready(client, userdata);
300 } else {
301 // no data read, return
302 return st->io_errno == EWOULDBLOCK ? 0 : 1; // return 1 in case of an error
303 }
304 }
305 proxy->read_buf_size += r;
306 }
307
308 // do this again, to add the now filled buffer to the HttpClient message buffer
309 // http_client_add_message or net_read should fail at some point
310 return proxy_ws_msg_ready(client, userdata);
311 }
312
230 int http_reverse_proxy_service(pblock *param, Session *sn, Request *rq) { 313 int http_reverse_proxy_service(pblock *param, Session *sn, Request *rq) {
231 EventHandler *ev = sn->ev; 314 EventHandler *ev = sn->ev;
232 const char *method = pblock_findkeyval(pb_key_method, rq->reqpb); 315 const char *method = pblock_findkeyval(pb_key_method, rq->reqpb);
233 const char *uri = pblock_findkeyval(pb_key_uri, rq->reqpb); 316 const char *uri = pblock_findkeyval(pb_key_uri, rq->reqpb);
234 const char *query = pblock_findkeyval(pb_key_query, rq->reqpb); 317 const char *query = pblock_findkeyval(pb_key_query, rq->reqpb);
268 char srvport[16]; 351 char srvport[16];
269 snprintf(srvport, 16, "%d", (int)srv_url.port); 352 snprintf(srvport, 16, "%d", (int)srv_url.port);
270 353
271 char srvhost_static[256]; 354 char srvhost_static[256];
272 char *srvhost; 355 char *srvhost;
273 if(srv_url.hostlen < 255) { 356 if(srv_url.hostlen + 10 < 256) {
274 memcpy(srvhost_static, srv_url.host, srv_url.hostlen); 357 memcpy(srvhost_static, srv_url.host, srv_url.hostlen);
275 srvhost_static[srv_url.hostlen] = 0; 358 srvhost_static[srv_url.hostlen] = 0;
276 srvhost = srvhost_static; 359 srvhost = srvhost_static;
277 } else { 360 } else {
278 srvhost = pool_malloc(sn->pool, srv_url.hostlen + 1); 361 srvhost = pool_malloc(sn->pool, srv_url.hostlen + 10);
279 if(!srvhost) { 362 if(!srvhost) {
280 return REQ_ABORTED; 363 return REQ_ABORTED;
281 } 364 }
282 memcpy(srvhost, srv_url.host, srv_url.hostlen); 365 memcpy(srvhost, srv_url.host, srv_url.hostlen);
283 srvhost[srv_url.hostlen] = 0; 366 srvhost[srv_url.hostlen] = 0;
321 proxy->sn = sn; 404 proxy->sn = sn;
322 proxy->rq = rq; 405 proxy->rq = rq;
323 proxy->request_header_rewrite = pblock_create_pool(sn->pool, 16); 406 proxy->request_header_rewrite = pblock_create_pool(sn->pool, 16);
324 proxy->response_header_rewrite = pblock_create_pool(sn->pool, 16); 407 proxy->response_header_rewrite = pblock_create_pool(sn->pool, 16);
325 proxy->response_started = 0; 408 proxy->response_started = 0;
409 proxy->read_buf = NULL;
410 proxy->read_buf_alloc = 0;
411 proxy->read_buf_size = 0;
412 proxy->read_buf_pos = 0;
326 proxy->ref = 1; 413 proxy->ref = 1;
327 414
328 // Some request/response headers should be removed or altered 415 // Some request/response headers should be removed or altered
329 // An empty string means, the header should be removed 416 // An empty string means, the header should be removed
330 pblock_nvinsert("host", "", proxy->request_header_rewrite); 417 pblock_nvinsert("host", "", proxy->request_header_rewrite);
331 pblock_nvinsert("connection", "", proxy->request_header_rewrite); 418 //pblock_nvinsert("connection", "", proxy->request_header_rewrite);
332 pblock_nvinsert("transfer-encoding", "", proxy->request_header_rewrite); 419 pblock_nvinsert("transfer-encoding", "", proxy->request_header_rewrite);
333 pblock_nvinsert("content-length", "", proxy->request_header_rewrite); 420 pblock_nvinsert("content-length", "", proxy->request_header_rewrite);
334 pblock_nvinsert("server", "", proxy->response_header_rewrite); 421 pblock_nvinsert("server", "", proxy->response_header_rewrite);
335 pblock_nvinsert("connection", "", proxy->response_header_rewrite); 422 pblock_nvinsert("connection", "", proxy->response_header_rewrite);
336 423
374 461
375 addr = addr->ai_next; 462 addr = addr->ai_next;
376 } 463 }
377 freeaddrinfo(srv_addr); 464 freeaddrinfo(srv_addr);
378 if(!addr_set) { 465 if(!addr_set) {
466 http_client_free(client);
467 return REQ_ABORTED;
468 }
469
470 // add host header
471 if(!((srv_url.scheme_num == WS_URI_HTTP && srv_url.port == 80) ||
472 (srv_url.scheme_num == WS_URI_HTTPS && srv_url.port == 443)))
473 {
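474 // srvhost was sized as hostlen + 10, enough for ':' plus the port digits
// NOTE(review): the snprintf below starts at hostlen and therefore overwrites
// the ':' just written; it probably should start at hostlen + 1 - confirm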
475 srvhost[srv_url.hostlen] = ':';
476 snprintf(srvhost + srv_url.hostlen, 8, "%d", (int)srv_url.port);
477 }
478 if(http_client_add_request_header(client, cx_mutstr("host"), cx_mutstr(srvhost))) {
379 http_client_free(client); 479 http_client_free(client);
380 return REQ_ABORTED; 480 return REQ_ABORTED;
381 } 481 }
382 482
383 // add request headers to the client 483 // add request headers to the client
432 client->response_start_userdata = proxy; 532 client->response_start_userdata = proxy;
433 client->response_body_write = proxy_response_write; 533 client->response_body_write = proxy_response_write;
434 client->response_body_write_userdata = proxy; 534 client->response_body_write_userdata = proxy;
435 client->response_finished = proxy_response_finished; 535 client->response_finished = proxy_response_finished;
436 client->response_finished_userdata = proxy; 536 client->response_finished_userdata = proxy;
537 client->ws_write = proxy_ws_write;
538 client->ws_write_userdata = proxy;
539 client->ws_msg_ready = proxy_ws_msg_ready;
540 client->ws_msg_ready_userdata = proxy;
437 541
438 net_setnonblock(sn->csd, 1); 542 net_setnonblock(sn->csd, 1);
439 if(http_client_start(client)) { 543 if(http_client_start(client)) {
440 net_setnonblock(sn->csd, 0); 544 net_setnonblock(sn->csd, 0);
441 http_client_free(client); 545 http_client_free(client);

mercurial