对于web server来说,必须能够监听到客户端的连接才能与之通信,这篇文章就看一下nginx是如何实现连接的建立。监听到新的连接实际上就是监听socket上的读事件,此时监听socket的已完成连接队列是非空的,可以非阻塞的调用accpet获取新到的连接。在nginx中每个socket都会被封装成一个连接结构,就是ngx_connection_t类型。每个ngx_connection_t结构具有读写事件read和write,它们是ngx_event_t类型的,有一个handler回调函数指针,在发生读写事件时被调用。在ngx_event_process_init函数中为每个监听socket分配连接,并将这些连接的read的handler初始化为ngx_event_accept,也就是说说在监听到连接时会调用,用于初始化连接等,最后将其添加到事件循环中。 在事件循环中,已经介绍过为了防止惊群(新到的一个连接会唤醒所有阻塞的worker进程),只有获取accept锁的worker进程才能accept新的连接,接下来才会去调用ngx_event_accept函数处理。下面就看看具体过程. ngx_connection_s数据结构: truct ngx_connection_s { void *data; ngx_event_t *read; ngx_event_t *write; ngx_socket_t fd; ngx_recv_pt recv; ngx_send_pt send; ngx_recv_chain_pt recv_chain; ngx_send_chain_pt send_chain; ngx_listening_t *listening; off_t sent; ngx_log_t *log; ngx_pool_t *pool; struct sockaddr *sockaddr; socklen_t socklen; ngx_str_t addr_text; ngx_buf_t *buffer; ngx_queue_t queue; ngx_atomic_uint_t number; ngx_uint_t requests; } struct ngx_http_request_s { uint32_t signature; /* "HTTP" */ ngx_connection_t *connection; void **ctx; void **main_conf; void **srv_conf; void **loc_conf; ngx_http_event_handler_pt read_event_handler; ngx_http_event_handler_pt write_event_handler; ngx_http_upstream_t *upstream; ngx_array_t *upstream_states; /* of ngx_http_upstream_state_t */ ngx_pool_t *pool; ngx_buf_t *header_in; ngx_http_headers_in_t headers_in; ngx_http_headers_out_t headers_out; ngx_http_request_body_t *request_body; time_t lingering_time; time_t start_sec; ngx_msec_t start_msec; ngx_uint_t method; ngx_uint_t http_version; ngx_str_t request_line; ngx_str_t uri; ngx_str_t args; ngx_str_t exten; ngx_str_t unparsed_uri; ngx_str_t method_name; ngx_str_t http_protocol; ngx_chain_t *out; ngx_http_request_t *main; ngx_http_request_t *parent; ngx_http_postponed_request_t *postponed; ngx_http_post_subrequest_t *post_subrequest; ngx_http_posted_request_t *posted_requests; ngx_http_virtual_names_t *virtual_names; ngx_int_t phase_handler; ngx_http_handler_pt content_handler; ngx_uint_t access_code; ngx_http_variable_value_t *variables; ......... } 1. ngx_event_accept : ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module); if (ngx_event_flags & NGX_USE_RTSIG_EVENT) { ev->available = 1; } else if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) { ev->available = ecf->multi_accept; } lc = ev->data; ls = lc->listening; ev->ready = 0; ngx_event_t的data属性是该事件所在的连接,对于监听socket的连接,可以通过listening属性获取对应的监听socket(ngx_listening_t)。接下来的while循环用于迭代ev->available次数,获取对应的连接。在while循环一开始的部分就是调用accept获取连接socket。
ngx_accept_disabled = ngx_cycle->connection_n / 8
- ngx_cycle->free_connection_n;
提到过ngx_accept_disabled主要用于实现worker进程的简单的负载均衡,在一个worker进程的空闲连接的个数小于连接池大小的1/8时,该进程会放弃竞争accept锁,ngx_accept_disabled在ngx_process_events_and_timers函数中使用。 c = ngx_get_connection(s, ev->log); if (c == NULL) { if (ngx_close_socket(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_close_socket_n " failed"); } s是连接socket的文件描述符,从连接池中获取一个连接并分配给该socket。 if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) { if (ngx_add_conn(c) == NGX_ERROR) { ngx_close_accepted_connection(c); return; } } 调用ngx_add_conn将新建的连接加入nginx的事件循环。在使用epoll时,实际上会调用ngx_epoll_add_connection函数,最终调用epoll_ctl添加事件,这样后续就会监听到来自该socket的数据。 ls->handler(c); //回调函数 ngx_http_init_connection ls是监听socket(ngx_listening_t),handler在accept到新连接时调用,注释已经说的很清楚,就是调用ngx_http_init_connection完成连接的初始化,其中最主要的就是为读事件设置handler,下面看一下这个函数。 2 ngx_http_init_connection void ngx_http_init_connection(ngx_connection_t *c) { ngx_event_t *rev; ngx_http_log_ctx_t *ctx; ctx = ngx_palloc(c->pool, sizeof(ngx_http_log_ctx_t)); if (ctx == NULL) { ngx_http_close_connection(c); return; } ctx->connection = c; ctx->request = NULL; ctx->current_request = NULL; c->log->connection = c->number; c->log->handler = ngx_http_log_error; c->log->data = ctx; c->log->action = "reading client request line"; c->log_error = NGX_ERROR_INFO; rev = c->read; rev->handler = ngx_http_init_request; // 这段代码是最核心的,将读事件的handler设置为ngx_http_init_request,在客户端向服务器发送数据时会被调用,用于初始化并处理客户端请求 c->write->handler = ngx_http_empty_handler; ///如果接收准备好了,则直接调用ngx_http_init_request if (rev->ready) { /* the deferred accept(), rtsig, aio, iocp */ if (ngx_use_accept_mutex) { ngx_post_event(rev, &ngx_posted_events);//如果使用了mutex锁,则post 这个event,然后返回。 return; } ngx_http_init_request(rev); return; } ngx_add_timer(rev, c->listening->post_accept_timeout);//添加定时器 if (ngx_handle_read_event(rev, 0) != NGX_OK) {////将事件挂载到事件处理器 ngx_http_close_connection(c); return; } } 在ngx_http_init_connection函数中,这个函数主要是设置当前句柄的读handler,如果数据可读,则直接调用request handler,如果数据不可读,则设置定时器(超时定时器),并将这个句柄挂载到事件处理器上。这里有一个需要注意的地方,那就是如果使用了ngx_use_accept_mutex锁的话,那么就不能够立即处理request,因为处理request是一个非常耗时的操作,而现在在锁里面,所以此时之需要将这个读事件挂载到ngx_posted_events队列,等退出锁之后再进行处理。 而一般来说默认都会使用mutex锁,因此此时就将rev加到post_events队列中,然后直接返回. 当我们从ngx_http_init_connection返回时,我们回到函数ngx_process_event_and_timers函数中: if (ngx_posted_events) {//如果post_event链表存在,则会进入链表的处理 if (ngx_threaded) { ngx_wakeup_worker_thread(cycle); } else { ngx_event_process_posted(cycle, &ngx_posted_events);//处理post event } } 此时我们已经让出mutex,所以我们此时能处理整个的request。接下来,让我们来看一下ngx_event_process_posted这个函数: void ngx_event_process_posted(ngx_cycle_t *cycle, ngx_thread_volatile ngx_event_t **posted) { ngx_event_t *ev; for ( ;; ) { ev = (ngx_event_t *) *posted; ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "posted event %p", ev); if (ev == NULL) { return; } ngx_delete_posted_event(ev); ev->handler(ev); //调用event的回调函数 } } 这个函数很简单,就是遍历event的队列,调用event的handler函数。 然后让我们来看一下ngx_http_init_request函数,进入这个函数说明客户端有请求进来,此时我们就进入了http协议解析的部分了,因此这个函数主要时初始化request结构,初始化完毕后,就进入解析处理http请求. static void ngx_http_init_request(ngx_event_t *rev) { .......... //ngx_event_t *rev , ngx_http_request_t *r; rev->handler = ngx_http_process_request_line; //设置回调 ngx_http_process_request_line r->read_event_handler = ngx_http_block_reading; ........... rev->handler(rev); //调用ngx_http_process_request_line函数
}
ngx_http_process_request_line,这个函数主要解析request line。我们知道nginx 使用的是epoll的ET模式,而et模式的话,就需要能够判断这次读取的数据是否读完,这里nginx是这样判断的,那就是根据协议来判断,也就是协议驱动,由协议来判断是否有读取完毕。
(责任编辑:IT) |