diff --git a/src/event/ngx_event_pipe.c b/src/event/ngx_event_pipe.c --- a/src/event/ngx_event_pipe.c +++ b/src/event/ngx_event_pipe.c @@ -392,8 +392,30 @@ ngx_event_pipe_read_upstream(ngx_event_p cl->buf->file_last - cl->buf->file_pos); } + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, + "pipe length: %O", p->length); + #endif + if (p->free_raw_bufs && p->length != -1) { + cl = p->free_raw_bufs; + + if (cl->buf->last - cl->buf->pos >= p->length) { + + /* STUB */ cl->buf->num = p->num++; + + if (p->input_filter(p, cl->buf) == NGX_ERROR) { + return NGX_ABORT; + } + + p->free_raw_bufs = cl->next; + } + } + + if (p->length == 0) { + p->upstream_done = 1; + } + if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) { /* STUB */ p->free_raw_bufs->buf->num = p->num++; @@ -848,6 +870,12 @@ ngx_event_pipe_copy_input_filter(ngx_eve } p->last_in = &cl->next; + if (p->length == -1) { + return NGX_OK; + } + + p->length -= b->last - b->pos; + return NGX_OK; } diff --git a/src/event/ngx_event_pipe.h b/src/event/ngx_event_pipe.h --- a/src/event/ngx_event_pipe.h +++ b/src/event/ngx_event_pipe.h @@ -65,6 +65,7 @@ struct ngx_event_pipe_s { ssize_t busy_size; off_t read_length; + off_t length; off_t max_temp_file_size; ssize_t temp_file_write_size; diff --git a/src/http/modules/ngx_http_fastcgi_module.c b/src/http/modules/ngx_http_fastcgi_module.c --- a/src/http/modules/ngx_http_fastcgi_module.c +++ b/src/http/modules/ngx_http_fastcgi_module.c @@ -77,6 +77,8 @@ typedef struct { #define NGX_HTTP_FASTCGI_RESPONDER 1 +#define NGX_HTTP_FASTCGI_KEEP_CONN 1 + #define NGX_HTTP_FASTCGI_BEGIN_REQUEST 1 #define NGX_HTTP_FASTCGI_ABORT_REQUEST 2 #define NGX_HTTP_FASTCGI_END_REQUEST 3 @@ -130,6 +132,7 @@ static ngx_int_t ngx_http_fastcgi_create static ngx_int_t ngx_http_fastcgi_create_request(ngx_http_request_t *r); static ngx_int_t ngx_http_fastcgi_reinit_request(ngx_http_request_t *r); static ngx_int_t ngx_http_fastcgi_process_header(ngx_http_request_t *r); +static ngx_int_t ngx_http_fastcgi_input_filter_init(void *data); static ngx_int_t ngx_http_fastcgi_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf); static ngx_int_t ngx_http_fastcgi_process_record(ngx_http_request_t *r, @@ -484,7 +487,7 @@ static ngx_http_fastcgi_request_start_t { 0, /* role_hi */ NGX_HTTP_FASTCGI_RESPONDER, /* role_lo */ - 0, /* NGX_HTTP_FASTCGI_KEEP_CONN */ /* flags */ + NGX_HTTP_FASTCGI_KEEP_CONN, /* flags */ { 0, 0, 0, 0, 0 } }, /* reserved[5] */ { 1, /* version */ @@ -600,6 +603,8 @@ ngx_http_fastcgi_handler(ngx_http_reques u->pipe->input_filter = ngx_http_fastcgi_input_filter; u->pipe->input_ctx = r; + u->input_filter_init = ngx_http_fastcgi_input_filter_init; + rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init); if (rc >= NGX_HTTP_SPECIAL_RESPONSE) { @@ -1566,6 +1571,17 @@ ngx_http_fastcgi_process_header(ngx_http static ngx_int_t +ngx_http_fastcgi_input_filter_init(void *data) +{ + ngx_http_request_t *r = data; + + r->upstream->pipe->length = sizeof(ngx_http_fastcgi_header_t); + + return NGX_OK; +} + + +static ngx_int_t ngx_http_fastcgi_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf) { u_char *m, *msg; @@ -1603,7 +1619,6 @@ ngx_http_fastcgi_input_filter(ngx_event_ if (f->type == NGX_HTTP_FASTCGI_STDOUT && f->length == 0) { f->state = ngx_http_fastcgi_st_version; - p->upstream_done = 1; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, p->log, 0, "http fastcgi closed stdout"); @@ -1614,6 +1629,7 @@ ngx_http_fastcgi_input_filter(ngx_event_ if (f->type == NGX_HTTP_FASTCGI_END_REQUEST) { f->state = ngx_http_fastcgi_st_version; p->upstream_done = 1; + r->upstream->keepalive = 1; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, p->log, 0, "http fastcgi sent end request"); @@ -1773,6 +1789,20 @@ ngx_http_fastcgi_input_filter(ngx_event_ } + /* set p->length, minimal amount of data we want to see */ + + if (f->state < ngx_http_fastcgi_st_data) { + p->length = 1; + + } else if (f->state == ngx_http_fastcgi_st_padding) { + p->length = f->padding; + + } else { + /* ngx_http_fastcgi_st_data */ + + p->length = f->length; + } + if (b) { b->shadow = buf; b->last_shadow = 1; diff --git a/src/http/modules/ngx_http_memcached_module.c b/src/http/modules/ngx_http_memcached_module.c --- a/src/http/modules/ngx_http_memcached_module.c +++ b/src/http/modules/ngx_http_memcached_module.c @@ -344,8 +344,8 @@ found: while (*p && *p++ != CR) { /* void */ } - r->headers_out.content_length_n = ngx_atoof(len, p - len - 1); - if (r->headers_out.content_length_n == -1) { + u->headers_in.content_length_n = ngx_atoof(len, p - len - 1); + if (u->headers_in.content_length_n == -1) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "memcached sent invalid length in response \"%V\" " "for key \"%V\"", @@ -366,6 +366,7 @@ found: u->headers_in.status_n = 404; u->state->status = 404; + u->keepalive = 1; return NGX_OK; } @@ -407,7 +408,7 @@ ngx_http_memcached_filter(void *data, ss u = ctx->request->upstream; b = &u->buffer; - if (u->length == ctx->rest) { + if (u->length == (ssize_t) ctx->rest) { if (ngx_strncmp(b->last, ngx_http_memcached_end + NGX_HTTP_MEMCACHED_END - ctx->rest, @@ -426,6 +427,10 @@ ngx_http_memcached_filter(void *data, ss u->length -= bytes; ctx->rest -= bytes; + if (u->length == 0) { + u->keepalive = 1; + } + return NGX_OK; } @@ -463,6 +468,13 @@ ngx_http_memcached_filter(void *data, ss if (ngx_strncmp(last, ngx_http_memcached_end, b->last - last) != 0) { ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0, "memcached sent invalid trailer"); + + b->last = last; + cl->buf->last = last; + u->length = 0; + ctx->rest = 0; + + return NGX_OK; } ctx->rest -= b->last - last; @@ -470,6 +482,10 @@ ngx_http_memcached_filter(void *data, ss cl->buf->last = last; u->length = ctx->rest; + if (u->length == 0) { + u->keepalive = 1; + } + return NGX_OK; } diff --git a/src/http/modules/ngx_http_proxy_module.c b/src/http/modules/ngx_http_proxy_module.c --- a/src/http/modules/ngx_http_proxy_module.c +++ b/src/http/modules/ngx_http_proxy_module.c @@ -71,6 +71,8 @@ typedef struct { ngx_flag_t redirect; + ngx_uint_t http_version; + ngx_uint_t headers_hash_max_size; ngx_uint_t headers_hash_bucket_size; } ngx_http_proxy_loc_conf_t; @@ -80,6 +82,12 @@ typedef struct { ngx_http_status_t status; ngx_http_proxy_vars_t vars; size_t internal_body_length; + + ngx_uint_t state; + off_t size; + off_t length; + + ngx_uint_t head; /* unsigned head:1 */ } ngx_http_proxy_ctx_t; @@ -92,6 +100,15 @@ static ngx_int_t ngx_http_proxy_create_r static ngx_int_t ngx_http_proxy_reinit_request(ngx_http_request_t *r); static ngx_int_t ngx_http_proxy_process_status_line(ngx_http_request_t *r); static ngx_int_t ngx_http_proxy_process_header(ngx_http_request_t *r); +static ngx_int_t ngx_http_proxy_input_filter_init(void *data); +static ngx_int_t ngx_http_proxy_copy_filter(ngx_event_pipe_t *p, + ngx_buf_t *buf); +static ngx_int_t ngx_http_proxy_chunked_filter(ngx_event_pipe_t *p, + ngx_buf_t *buf); +static ngx_int_t ngx_http_proxy_non_buffered_copy_filter(void *data, + ssize_t bytes); +static ngx_int_t ngx_http_proxy_non_buffered_chunked_filter(void *data, + ssize_t bytes); static void ngx_http_proxy_abort_request(ngx_http_request_t *r); static void ngx_http_proxy_finalize_request(ngx_http_request_t *r, ngx_int_t rc); @@ -157,6 +174,13 @@ static ngx_conf_bitmask_t ngx_http_prox }; +static ngx_conf_enum_t ngx_http_proxy_http_version[] = { + { ngx_string("1.0"), NGX_HTTP_VERSION_10 }, + { ngx_string("1.1"), NGX_HTTP_VERSION_11 }, + { ngx_null_string, 0 } +}; + + ngx_module_t ngx_http_proxy_module; @@ -432,6 +456,13 @@ static ngx_command_t ngx_http_proxy_com offsetof(ngx_http_proxy_loc_conf_t, upstream.ignore_headers), &ngx_http_upstream_ignore_headers_masks }, + { ngx_string("proxy_http_version"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_conf_set_enum_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_proxy_loc_conf_t, http_version), + &ngx_http_proxy_http_version }, + #if (NGX_HTTP_SSL) { ngx_string("proxy_ssl_session_reuse"), @@ -479,6 +510,7 @@ ngx_module_t ngx_http_proxy_module = { static char ngx_http_proxy_version[] = " HTTP/1.0" CRLF; +static char ngx_http_proxy_version_11[] = " HTTP/1.1" CRLF; static ngx_keyval_t ngx_http_proxy_headers[] = { @@ -486,6 +518,7 @@ static ngx_keyval_t ngx_http_proxy_head { ngx_string("Connection"), ngx_string("close") }, { ngx_string("Keep-Alive"), ngx_string("") }, { ngx_string("Expect"), ngx_string("") }, + { ngx_string("Upgrade"), ngx_string("") }, { ngx_null_string, ngx_null_string } }; @@ -610,7 +643,12 @@ ngx_http_proxy_handler(ngx_http_request_ return NGX_HTTP_INTERNAL_SERVER_ERROR; } - u->pipe->input_filter = ngx_event_pipe_copy_input_filter; + u->pipe->input_filter = ngx_http_proxy_copy_filter; + u->pipe->input_ctx = r; + + u->input_filter_init = ngx_http_proxy_input_filter_init; + u->input_filter = ngx_http_proxy_non_buffered_copy_filter; + u->input_filter_ctx = r; u->accel = 1; @@ -866,14 +904,20 @@ ngx_http_proxy_create_request(ngx_http_r method.len++; } + ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module); + + if (method.len == 5 + && ngx_strncasecmp(method.data, (u_char *) "HEAD ", 5) == 0) + { + ctx->head = 1; + } + len = method.len + sizeof(ngx_http_proxy_version) - 1 + sizeof(CRLF) - 1; escape = 0; loc_len = 0; unparsed_uri = 0; - ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module); - if (plcf->proxy_lengths) { uri_len = ctx->vars.uri.len; @@ -1009,8 +1053,14 @@ ngx_http_proxy_create_request(ngx_http_r u->uri.len = b->last - u->uri.data; - b->last = ngx_cpymem(b->last, ngx_http_proxy_version, - sizeof(ngx_http_proxy_version) - 1); + if (plcf->http_version == NGX_HTTP_VERSION_11) { + b->last = ngx_cpymem(b->last, ngx_http_proxy_version_11, + sizeof(ngx_http_proxy_version_11) - 1); + + } else { + b->last = ngx_cpymem(b->last, ngx_http_proxy_version, + sizeof(ngx_http_proxy_version) - 1); + } ngx_memzero(&e, sizeof(ngx_http_script_engine_t)); @@ -1159,8 +1209,11 @@ ngx_http_proxy_reinit_request(ngx_http_r ctx->status.count = 0; ctx->status.start = NULL; ctx->status.end = NULL; + ctx->state = 0; r->upstream->process_header = ngx_http_proxy_process_status_line; + r->upstream->pipe->input_filter = ngx_http_proxy_copy_filter; + r->upstream->input_filter = ngx_http_proxy_non_buffered_copy_filter; r->state = 0; return NGX_OK; @@ -1246,6 +1299,8 @@ ngx_http_proxy_process_header(ngx_http_r { ngx_int_t rc; ngx_table_elt_t *h; + ngx_http_upstream_t *u; + ngx_http_proxy_ctx_t *ctx; ngx_http_upstream_header_t *hh; ngx_http_upstream_main_conf_t *umcf; @@ -1341,6 +1396,23 @@ ngx_http_proxy_process_header(ngx_http_r h->lowcase_key = (u_char *) "date"; } + /* + * set u->keepalive if response has no body; this allows to keep + * connections alive in case of r->header_only or X-Accel-Redirect + */ + + u = r->upstream; + ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module); + + if (u->headers_in.status_n == NGX_HTTP_NO_CONTENT + || u->headers_in.status_n == NGX_HTTP_NOT_MODIFIED + || ctx->head + || (!u->headers_in.chunked + && u->headers_in.content_length_n == 0)) + { + u->keepalive = 1; + } + return NGX_OK; } @@ -1358,6 +1430,690 @@ ngx_http_proxy_process_header(ngx_http_r } +static ngx_int_t +ngx_http_proxy_input_filter_init(void *data) +{ + ngx_http_request_t *r = data; + ngx_http_upstream_t *u; + ngx_http_proxy_ctx_t *ctx; + + u = r->upstream; + ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module); + + ngx_log_debug4(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "http proxy filter init s:%d h:%d c:%d l:%O", + u->headers_in.status_n, ctx->head, u->headers_in.chunked, + u->headers_in.content_length_n); + + /* as per RFC2616, 4.4 Message Length */ + + if (u->headers_in.status_n == NGX_HTTP_NO_CONTENT + || u->headers_in.status_n == NGX_HTTP_NOT_MODIFIED + || ctx->head) + { + /* 1xx, 204, and 304 and replies to HEAD requests */ + /* no 1xx since we don't send Expect and Upgrade */ + + u->pipe->length = 0; + u->length = 0; + u->keepalive = 1; + + } else if (u->headers_in.chunked) { + /* chunked */ + + u->pipe->input_filter = ngx_http_proxy_chunked_filter; + u->pipe->length = 3; /* "0" LF LF */ + + u->input_filter = ngx_http_proxy_non_buffered_chunked_filter; + u->length = -1; + + } else if (u->headers_in.content_length_n == 0) { + /* empty body: special case as filter won't be called */ + + u->pipe->length = 0; + u->length = 0; + u->keepalive = 1; + + } else { + /* content length or connection close */ + + u->pipe->length = u->headers_in.content_length_n; + u->length = u->headers_in.content_length_n; + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_http_proxy_copy_filter(ngx_event_pipe_t *p, ngx_buf_t *buf) +{ + ngx_buf_t *b; + ngx_chain_t *cl; + ngx_http_request_t *r; + + if (buf->pos == buf->last) { + return NGX_OK; + } + + if (p->free) { + cl = p->free; + b = cl->buf; + p->free = cl->next; + ngx_free_chain(p->pool, cl); + + } else { + b = ngx_alloc_buf(p->pool); + if (b == NULL) { + return NGX_ERROR; + } + } + + ngx_memcpy(b, buf, sizeof(ngx_buf_t)); + b->shadow = buf; + b->tag = p->tag; + b->last_shadow = 1; + b->recycled = 1; + buf->shadow = b; + + cl = ngx_alloc_chain_link(p->pool); + if (cl == NULL) { + return NGX_ERROR; + } + + cl->buf = b; + cl->next = NULL; + + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf #%d", b->num); + + if (p->in) { + *p->last_in = cl; + } else { + p->in = cl; + } + p->last_in = &cl->next; + + if (p->length == -1) { + return NGX_OK; + } + + p->length -= b->last - b->pos; + + if (p->length == 0) { + r = p->input_ctx; + p->upstream_done = 1; + r->upstream->keepalive = 1; + + } else if (p->length < 0) { + r = p->input_ctx; + p->upstream_done = 1; + + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "upstream sent too many data"); + } + + return NGX_OK; +} + + +static ngx_inline ngx_int_t +ngx_http_proxy_parse_chunked(ngx_http_request_t *r, ngx_buf_t *buf) +{ + u_char *pos, ch, c; + ngx_int_t rc; + ngx_http_proxy_ctx_t *ctx; + enum { + sw_chunk_start = 0, + sw_chunk_size, + sw_chunk_extension, + sw_chunk_extension_almost_done, + sw_chunk_data, + sw_after_data, + sw_after_data_almost_done, + sw_last_chunk_extension, + sw_last_chunk_extension_almost_done, + sw_trailer, + sw_trailer_almost_done, + sw_trailer_header, + sw_trailer_header_almost_done + } state; + + ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module); + state = ctx->state; + + if (state == sw_chunk_data && ctx->size == 0) { + state = sw_after_data; + } + + rc = NGX_AGAIN; + + for (pos = buf->pos; pos < buf->last; pos++) { + + ch = *pos; + + ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "http proxy chunked byte: %02Xd s:%d", ch, state); + + switch (state) { + + case sw_chunk_start: + if (ch >= '0' && ch <= '9') { + state = sw_chunk_size; + ctx->size = ch - '0'; + break; + } + + c = (u_char) (ch | 0x20); + + if (c >= 'a' && c <= 'f') { + state = sw_chunk_size; + ctx->size = c - 'a' + 10; + break; + } + + goto invalid; + + case sw_chunk_size: + if (ch >= '0' && ch <= '9') { + ctx->size = ctx->size * 16 + (ch - '0'); + break; + } + + c = (u_char) (ch | 0x20); + + if (c >= 'a' && c <= 'f') { + ctx->size = ctx->size * 16 + (c - 'a' + 10); + break; + } + + if (ctx->size == 0) { + + switch (ch) { + case CR: + state = sw_last_chunk_extension_almost_done; + break; + case LF: + state = sw_trailer; + break; + case ';': + state = sw_last_chunk_extension; + break; + default: + goto invalid; + } + + break; + } + + switch (ch) { + case CR: + state = sw_chunk_extension_almost_done; + break; + case LF: + state = sw_chunk_data; + break; + case ';': + state = sw_chunk_extension; + break; + default: + goto invalid; + } + + break; + + case sw_chunk_extension: + switch (ch) { + case CR: + state = sw_chunk_extension_almost_done; + break; + case LF: + state = sw_chunk_data; + } + break; + + case sw_chunk_extension_almost_done: + if (ch == LF) { + state = sw_chunk_data; + break; + } + goto invalid; + + case sw_chunk_data: + rc = NGX_OK; + goto data; + + case sw_after_data: + switch (ch) { + case CR: + state = sw_after_data_almost_done; + break; + case LF: + state = sw_chunk_start; + } + break; + + case sw_after_data_almost_done: + if (ch == LF) { + state = sw_chunk_start; + break; + } + goto invalid; + + case sw_last_chunk_extension: + switch (ch) { + case CR: + state = sw_last_chunk_extension_almost_done; + break; + case LF: + state = sw_trailer; + } + break; + + case sw_last_chunk_extension_almost_done: + if (ch == LF) { + state = sw_trailer; + break; + } + goto invalid; + + case sw_trailer: + switch (ch) { + case CR: + state = sw_trailer_almost_done; + break; + case LF: + goto done; + default: + state = sw_trailer_header; + } + break; + + case sw_trailer_almost_done: + if (ch == LF) { + goto done; + } + goto invalid; + + case sw_trailer_header: + switch (ch) { + case CR: + state = sw_trailer_header_almost_done; + break; + case LF: + state = sw_trailer; + } + break; + + case sw_trailer_header_almost_done: + if (ch == LF) { + state = sw_trailer; + break; + } + goto invalid; + + } + } + +data: + + ctx->state = state; + buf->pos = pos; + + switch (state) { + + case sw_chunk_start: + ctx->length = 3 /* "0" LF LF */; + break; + case sw_chunk_size: + ctx->length = 2 /* LF LF */ + + (ctx->size ? ctx->size + 4 /* LF "0" LF LF */ : 0); + break; + case sw_chunk_extension: + case sw_chunk_extension_almost_done: + ctx->length = 1 /* LF */ + ctx->size + 4 /* LF "0" LF LF */; + break; + case sw_chunk_data: + ctx->length = ctx->size + 4 /* LF "0" LF LF */; + break; + case sw_after_data: + case sw_after_data_almost_done: + ctx->length = 4 /* LF "0" LF LF */; + break; + case sw_last_chunk_extension: + case sw_last_chunk_extension_almost_done: + ctx->length = 2 /* LF LF */; + break; + case sw_trailer: + case sw_trailer_almost_done: + ctx->length = 1 /* LF */; + break; + case sw_trailer_header: + case sw_trailer_header_almost_done: + ctx->length = 2 /* LF LF */; + break; + + } + + return rc; + +done: + + return NGX_DONE; + +invalid: + + ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0, + "upstream sent invalid chunked response"); + + return NGX_ERROR; +} + + +static ngx_int_t +ngx_http_proxy_chunked_filter(ngx_event_pipe_t *p, ngx_buf_t *buf) +{ + ngx_int_t rc; + ngx_buf_t *b, **prev; + ngx_chain_t *cl; + ngx_http_request_t *r; + ngx_http_proxy_ctx_t *ctx; + + if (buf->pos == buf->last) { + return NGX_OK; + } + + r = p->input_ctx; + ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module); + + b = NULL; + prev = &buf->shadow; + + for ( ;; ) { + + rc = ngx_http_proxy_parse_chunked(r, buf); + + if (rc == NGX_OK) { + + /* a chunk has been parsed successfully */ + + if (p->free) { + cl = p->free; + b = cl->buf; + p->free = cl->next; + ngx_free_chain(p->pool, cl); + + } else { + b = ngx_alloc_buf(p->pool); + if (b == NULL) { + return NGX_ERROR; + } + } + + ngx_memzero(b, sizeof(ngx_buf_t)); + + b->pos = buf->pos; + b->start = buf->start; + b->end = buf->end; + b->tag = p->tag; + b->temporary = 1; + b->recycled = 1; + + *prev = b; + prev = &b->shadow; + + cl = ngx_alloc_chain_link(p->pool); + if (cl == NULL) { + return NGX_ERROR; + } + + cl->buf = b; + cl->next = NULL; + + if (p->in) { + *p->last_in = cl; + } else { + p->in = cl; + } + p->last_in = &cl->next; + + /* STUB */ b->num = buf->num; + + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0, + "input buf #%d %p", b->num, b->pos); + + if (buf->last - buf->pos >= ctx->size) { + + buf->pos += ctx->size; + b->last = buf->pos; + ctx->size = 0; + + continue; + } + + ctx->size -= buf->last - buf->pos; + buf->pos = buf->last; + b->last = buf->last; + + continue; + } + + if (rc == NGX_DONE) { + + /* a whole response has been parsed successfully */ + + p->upstream_done = 1; /* or p->length = 0; ? */ + r->upstream->keepalive = 1; + + break; + } + + if (rc == NGX_AGAIN) { + + /* set p->length, minimal amount of data we want to see */ + + p->length = ctx->length; + + break; + } + + /* invalid response */ + + ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0, + "upstream sent invalid chunked response"); + + return NGX_ERROR; + } + + ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "http proxy chunked state %d, length %d", + ctx->state, p->length); + + if (b) { + b->shadow = buf; + b->last_shadow = 1; + + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0, + "input buf %p %z", b->pos, b->last - b->pos); + + return NGX_OK; + } + + /* there is no data record in the buf, add it to free chain */ + + if (ngx_event_pipe_add_free_buf(p, buf) != NGX_OK) { + return NGX_ERROR; + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_http_proxy_non_buffered_copy_filter(void *data, ssize_t bytes) +{ + ngx_http_request_t *r = data; + + ngx_buf_t *b; + ngx_chain_t *cl, **ll; + ngx_http_upstream_t *u; + + u = r->upstream; + + for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) { + ll = &cl->next; + } + + cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs); + if (cl == NULL) { + return NGX_ERROR; + } + + *ll = cl; + + cl->buf->flush = 1; + cl->buf->memory = 1; + + b = &u->buffer; + + cl->buf->pos = b->last; + b->last += bytes; + cl->buf->last = b->last; + cl->buf->tag = u->output.tag; + + if (u->length == -1) { + return NGX_OK; + } + + u->length -= bytes; + + if (u->length == 0) { + u->keepalive = 1; + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_http_proxy_non_buffered_chunked_filter(void *data, ssize_t bytes) +{ + ngx_http_request_t *r = data; + + ngx_int_t rc; + ngx_buf_t *b, *buf; + ngx_chain_t *cl, **ll; + ngx_http_upstream_t *u; + ngx_http_proxy_ctx_t *ctx; + + ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module); + u = r->upstream; + buf = &u->buffer; + + buf->pos = buf->last; + buf->last += bytes; + + for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) { + ll = &cl->next; + } + + for ( ;; ) { + + rc = ngx_http_proxy_parse_chunked(r, buf); + + if (rc == NGX_OK) { + + /* a chunk has been parsed successfully */ + + cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs); + if (cl == NULL) { + return NGX_ERROR; + } + + *ll = cl; + ll = &cl->next; + + b = cl->buf; + + b->flush = 1; + b->memory = 1; + + b->pos = buf->pos; + b->tag = u->output.tag; + + if (buf->last - buf->pos >= ctx->size) { + buf->pos += ctx->size; + b->last = buf->pos; + ctx->size = 0; + + } else { + ctx->size -= buf->last - buf->pos; + buf->pos = buf->last; + b->last = buf->last; + } + + ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "http proxy out buf %p %z", + b->pos, b->last - b->pos); + + continue; + } + + if (rc == NGX_DONE) { + + /* a whole response has been parsed successfully */ + + u->keepalive = 1; + u->length = 0; + + break; + } + + if (rc == NGX_AGAIN) { + break; + } + + /* invalid response */ + + ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0, + "upstream sent invalid chunked response"); + + return NGX_ERROR; + } + + /* provide continuous buffer for subrequests in memory */ + + if (r->subrequest_in_memory) { + + cl = u->out_bufs; + + if (cl) { + buf->pos = cl->buf->pos; + } + + buf->last = buf->pos; + + for (cl = u->out_bufs; cl; cl = cl->next) { + ngx_log_debug3(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "http proxy in memory %p-%p %uz", + cl->buf->pos, cl->buf->last, ngx_buf_size(cl->buf)); + + if (buf->last == cl->buf->pos) { + buf->last = cl->buf->last; + continue; + } + + buf->last = ngx_movemem(buf->last, cl->buf->pos, + cl->buf->last - cl->buf->pos); + + cl->buf->pos = buf->last - (cl->buf->last - cl->buf->pos); + cl->buf->last = buf->last; + } + } + + return NGX_OK; +} + + static void ngx_http_proxy_abort_request(ngx_http_request_t *r) { @@ -1706,6 +2462,8 @@ ngx_http_proxy_create_loc_conf(ngx_conf_ conf->redirect = NGX_CONF_UNSET; conf->upstream.change_buffering = 1; + conf->http_version = NGX_CONF_UNSET_UINT; + conf->headers_hash_max_size = NGX_CONF_UNSET_UINT; conf->headers_hash_bucket_size = NGX_CONF_UNSET_UINT; @@ -2009,6 +2767,9 @@ ngx_http_proxy_merge_loc_conf(ngx_conf_t } #endif + ngx_conf_merge_uint_value(conf->http_version, prev->http_version, + NGX_HTTP_VERSION_10); + ngx_conf_merge_uint_value(conf->headers_hash_max_size, prev->headers_hash_max_size, 512); diff --git a/src/http/ngx_http_upstream.c b/src/http/ngx_http_upstream.c --- a/src/http/ngx_http_upstream.c +++ b/src/http/ngx_http_upstream.c @@ -72,6 +72,8 @@ static void ngx_http_upstream_finalize_r static ngx_int_t ngx_http_upstream_process_header_line(ngx_http_request_t *r, ngx_table_elt_t *h, ngx_uint_t offset); +static ngx_int_t ngx_http_upstream_process_content_length(ngx_http_request_t *r, + ngx_table_elt_t *h, ngx_uint_t offset); static ngx_int_t ngx_http_upstream_process_set_cookie(ngx_http_request_t *r, ngx_table_elt_t *h, ngx_uint_t offset); static ngx_int_t @@ -89,6 +91,9 @@ static ngx_int_t ngx_http_upstream_proce ngx_table_elt_t *h, ngx_uint_t offset); static ngx_int_t ngx_http_upstream_process_charset(ngx_http_request_t *r, ngx_table_elt_t *h, ngx_uint_t offset); +static ngx_int_t + ngx_http_upstream_process_transfer_encoding(ngx_http_request_t *r, + ngx_table_elt_t *h, ngx_uint_t offset); static ngx_int_t ngx_http_upstream_copy_header_line(ngx_http_request_t *r, ngx_table_elt_t *h, ngx_uint_t offset); static ngx_int_t @@ -96,8 +101,6 @@ static ngx_int_t ngx_table_elt_t *h, ngx_uint_t offset); static ngx_int_t ngx_http_upstream_copy_content_type(ngx_http_request_t *r, ngx_table_elt_t *h, ngx_uint_t offset); -static ngx_int_t ngx_http_upstream_copy_content_length(ngx_http_request_t *r, - ngx_table_elt_t *h, ngx_uint_t offset); static ngx_int_t ngx_http_upstream_copy_last_modified(ngx_http_request_t *r, ngx_table_elt_t *h, ngx_uint_t offset); static ngx_int_t ngx_http_upstream_rewrite_location(ngx_http_request_t *r, @@ -149,9 +152,9 @@ ngx_http_upstream_header_t ngx_http_ups ngx_http_upstream_copy_content_type, 0, 1 }, { ngx_string("Content-Length"), - ngx_http_upstream_process_header_line, + ngx_http_upstream_process_content_length, offsetof(ngx_http_upstream_headers_in_t, content_length), - ngx_http_upstream_copy_content_length, 0, 0 }, + ngx_http_upstream_ignore_header_line, 0, 0 }, { ngx_string("Date"), ngx_http_upstream_process_header_line, @@ -247,6 +250,10 @@ ngx_http_upstream_header_t ngx_http_ups ngx_http_upstream_process_charset, 0, ngx_http_upstream_copy_header_line, 0, 0 }, + { ngx_string("Transfer-Encoding"), + ngx_http_upstream_process_transfer_encoding, 0, + ngx_http_upstream_ignore_header_line, 0, 0 }, + #if (NGX_HTTP_GZIP) { ngx_string("Content-Encoding"), ngx_http_upstream_process_header_line, @@ -396,6 +403,8 @@ ngx_http_upstream_create(ngx_http_reques r->cache = NULL; #endif + u->headers_in.content_length_n = -1; + return NGX_OK; } @@ -800,6 +809,7 @@ ngx_http_upstream_cache_send(ngx_http_re u->buffer.pos += c->header_start; ngx_memzero(&u->headers_in, sizeof(ngx_http_upstream_headers_in_t)); + u->headers_in.content_length_n = -1; if (ngx_list_init(&u->headers_in.headers, r->pool, 8, sizeof(ngx_table_elt_t)) @@ -1282,7 +1292,10 @@ ngx_http_upstream_reinit(ngx_http_reques return NGX_ERROR; } + u->keepalive = 0; + ngx_memzero(&u->headers_in, sizeof(ngx_http_upstream_headers_in_t)); + u->headers_in.content_length_n = -1; if (ngx_list_init(&u->headers_in.headers, r->pool, 8, sizeof(ngx_table_elt_t)) @@ -1924,14 +1937,9 @@ ngx_http_upstream_process_headers(ngx_ht r->headers_out.status = u->headers_in.status_n; r->headers_out.status_line = u->headers_in.status_line; - u->headers_in.content_length_n = r->headers_out.content_length_n; - - if (r->headers_out.content_length_n != -1) { - u->length = (size_t) r->headers_out.content_length_n; - - } else { - u->length = NGX_MAX_SIZE_T_VALUE; - } + r->headers_out.content_length_n = u->headers_in.content_length_n; + + u->length = u->headers_in.content_length_n; return NGX_OK; } @@ -1995,6 +2003,11 @@ ngx_http_upstream_process_body_in_memory } } + if (u->length == 0) { + ngx_http_upstream_finalize_request(r, u, 0); + return; + } + if (ngx_handle_read_event(rev, 0) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; @@ -2296,6 +2309,15 @@ ngx_http_upstream_send_response(ngx_http u->read_event_handler = ngx_http_upstream_process_upstream; r->write_event_handler = ngx_http_upstream_process_downstream; + p->length = -1; + + if (u->input_filter_init + && u->input_filter_init(p->input_ctx) != NGX_OK) + { + ngx_http_upstream_finalize_request(r, u, 0); + return; + } + ngx_http_upstream_process_upstream(r, u); } @@ -2388,6 +2410,10 @@ ngx_http_upstream_process_non_buffered_r if (u->busy_bufs == NULL) { + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "http upstream non buffered length:%O", + u->length); + if (u->length == 0 || upstream->read->eof || upstream->read->error) @@ -2403,10 +2429,6 @@ ngx_http_upstream_process_non_buffered_r size = b->end - b->last; - if (size > u->length) { - size = u->length; - } - if (size && upstream->read->ready) { n = upstream->recv(upstream, b->last, size); @@ -2503,12 +2525,16 @@ ngx_http_upstream_non_buffered_filter(vo cl->buf->last = b->last; cl->buf->tag = u->output.tag; - if (u->length == NGX_MAX_SIZE_T_VALUE) { + if (u->length == -1) { return NGX_OK; } u->length -= bytes; + if (u->length == 0) { + u->keepalive = 1; + } + return NGX_OK; } @@ -3060,6 +3086,21 @@ ngx_http_upstream_ignore_header_line(ngx static ngx_int_t +ngx_http_upstream_process_content_length(ngx_http_request_t *r, + ngx_table_elt_t *h, ngx_uint_t offset) +{ + ngx_http_upstream_t *u; + + u = r->upstream; + + u->headers_in.content_length = h; + u->headers_in.content_length_n = ngx_atoof(h->value.data, h->value.len); + + return NGX_OK; +} + + +static ngx_int_t ngx_http_upstream_process_set_cookie(ngx_http_request_t *r, ngx_table_elt_t *h, ngx_uint_t offset) { @@ -3321,6 +3362,23 @@ ngx_http_upstream_process_charset(ngx_ht static ngx_int_t +ngx_http_upstream_process_transfer_encoding(ngx_http_request_t *r, + ngx_table_elt_t *h, ngx_uint_t offset) +{ + r->upstream->headers_in.transfer_encoding = h; + + if (ngx_strlcasestrn(h->value.data, h->value.data + h->value.len, + (u_char *) "chunked", 7 - 1) + != NULL) + { + r->upstream->headers_in.chunked = 1; + } + + return NGX_OK; +} + + +static ngx_int_t ngx_http_upstream_copy_header_line(ngx_http_request_t *r, ngx_table_elt_t *h, ngx_uint_t offset) { @@ -3428,26 +3486,6 @@ ngx_http_upstream_copy_content_type(ngx_ static ngx_int_t -ngx_http_upstream_copy_content_length(ngx_http_request_t *r, ngx_table_elt_t *h, - ngx_uint_t offset) -{ - ngx_table_elt_t *ho; - - ho = ngx_list_push(&r->headers_out.headers); - if (ho == NULL) { - return NGX_ERROR; - } - - *ho = *h; - - r->headers_out.content_length = ho; - r->headers_out.content_length_n = ngx_atoof(h->value.data, h->value.len); - - return NGX_OK; -} - - -static ngx_int_t ngx_http_upstream_copy_last_modified(ngx_http_request_t *r, ngx_table_elt_t *h, ngx_uint_t offset) { diff --git a/src/http/ngx_http_upstream.h b/src/http/ngx_http_upstream.h --- a/src/http/ngx_http_upstream.h +++ b/src/http/ngx_http_upstream.h @@ -217,6 +217,7 @@ typedef struct { ngx_table_elt_t *location; ngx_table_elt_t *accept_ranges; ngx_table_elt_t *www_authenticate; + ngx_table_elt_t *transfer_encoding; #if (NGX_HTTP_GZIP) ngx_table_elt_t *content_encoding; @@ -225,6 +226,8 @@ typedef struct { off_t content_length_n; ngx_array_t cache_control; + + unsigned chunked:1; } ngx_http_upstream_headers_in_t; @@ -267,7 +270,7 @@ struct ngx_http_upstream_s { ngx_http_upstream_resolved_t *resolved; ngx_buf_t buffer; - size_t length; + off_t length; ngx_chain_t *out_bufs; ngx_chain_t *busy_bufs; @@ -308,6 +311,7 @@ struct ngx_http_upstream_s { #endif unsigned buffering:1; + unsigned keepalive:1; unsigned request_sent:1; unsigned header_sent:1;