这次主要来看upstream的几个相关的hook函数。
首先要知道,对于upstream,同时有两个连接,一个时client和nginx,一个是nginx和upstream,这个时候就会有两个回调,然后上篇blog中,我们能看到在upstream中,会改变read_event_handler和write_event_handler,不过这里有三个条件,分别是
1 没有使用cache,
2 不忽略client的提前终止
3 不是post_action
1
2
3
4
5
6
|
//条件赋值
if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
//然后给读写handler赋值
r->read_event_handler = ngx_http_upstream_rd_check_broken_connection;
r->write_event_handler = ngx_http_upstream_wr_check_broken_connection;
}
|
然后我们来看这个两个函数,这两个都会调用ngx_http_upstream_check_broken_connection,因此我们就先来详细分析这个函数。
这个函数主要是用来检测client的连接是否完好。因此它使用了MSG_PEEK这个参数,也就是预读,然后通过recv的返回值来判断是否连接已经断开。
这里的代码分为两部分,第一部分是本身连接在进入这个回调函数之前连接都已经有错误了,这个时候如果是水平触发,则删除事件,然后finalize这个upstream(没有cache’的情况下),否则就直接finalize这个upstream。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
c = r->connection;
u = r->upstream;
//如果连接已经出现错误。
if (c->error) {
//如果是水平触发
if ((ngx_event_flags & NGX_USE_LEVEL_EVENT) && ev->active) {
event = ev->write ? NGX_WRITE_EVENT : NGX_READ_EVENT;
//删除事件
if (ngx_del_event(ev, event, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
}
if (!u->cacheable) {
//清理upstream request
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_CLIENT_CLOSED_REQUEST);
}
return;
}
|
紧接着就是第二部分,这部分的工作就是预读取1个字节,然后来判断是否连接已经被client断掉。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
//读取1个字节
n = recv(c->fd, buf, 1, MSG_PEEK);
err = ngx_socket_errno;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ev->log, err,
"http upstream recv(): %d", n);
if (ev->write && (n >= 0 || err == NGX_EAGAIN)) {
return;
}
//如果水平触发则删除事件
if ((ngx_event_flags & NGX_USE_LEVEL_EVENT) && ev->active) {
event = ev->write ? NGX_WRITE_EVENT : NGX_READ_EVENT;
if (ngx_del_event(ev, event, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
}
//如果还有数据,则直接返回
if (n > 0) {
return;
}
if (n == -1) {
if (err == NGX_EAGAIN) {
return;
}
ev->error = 1;
} else { /* n == 0 */
err = 0;
}
//到达这里说明有错误产生了
ev->eof = 1;
//设置错误,可以看到这个值在函数一开始有检测.
c->error = 1;
//如果没有cache,则finalize upstream request
if (!u->cacheable && u->peer.connection) {
ngx_log_error(NGX_LOG_INFO, ev->log, err,
"client closed prematurely connection, "
"so upstream connection is closed too");
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_CLIENT_CLOSED_REQUEST);
return;
}
ngx_log_error(NGX_LOG_INFO, ev->log, err,
"client closed prematurely connection");
//如果有cache,并且后端的upstream还在处理,则此时继续处理upstream,忽略对端的错误.
if (u->peer.connection == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_CLIENT_CLOSED_REQUEST);
}
|
然后我们来看nginx如何连接后端的upstream,在上篇blog的结束的时候,我们看到最终会调用ngx_http_upstream_connect来进入连接upstream的处理,因此我们来详细分析这个函数以及相关的函数。
函数一开始是初始化请求开始事件一些参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
..........................................................
//取得upstream的状态
u->state = ngx_array_push(r->upstream_states);
if (u->state == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));
tp = ngx_timeofday();
//初始化时间
u->state->response_sec = tp->sec;
u->state->response_msec = tp->msec;
|
然后是调用ngx_event_connect_peer开始连接后端upstream.并且对返回值进行处理,等下会详细分析ngx_event_connect_peer这个函数.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
//连接后端
rc = ngx_event_connect_peer(&u->peer);
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http upstream connect: %i", rc);
if (rc == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
//这个是很关键的一个结构peer,后面的blog会详细分析
u->state->peer = u->peer.name;
if (rc == NGX_BUSY) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no live upstreams");
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_NOLIVE);
return;
}
if (rc == NGX_DECLINED) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
|
当返回值为NGX_OK或者NGX_AGAIN的话,就说明连接成功或者暂时异步的连接还没成功,所以需要挂载upstream端的回调函数.这里要注意就是NGX_AGAIN的情况,因为是异步的connect,所以可能会连接不成功。所以如果返回NGX_AGAIN的话,需要挂载写函数.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
/* rc == NGX_OK || rc == NGX_AGAIN */
c = u->peer.connection;
c->data = r;
c->write->handler = ngx_http_upstream_handler;
c->read->handler = ngx_http_upstream_handler;
//开始挂载回调函数,一个是读,一个是写。
u->write_event_handler = ngx_http_upstream_send_request_handler;
u->read_event_handler = ngx_http_upstream_process_header;
c->sendfile &= r->connection->sendfile;
u->output.sendfile = c->sendfile;
c->pool = r->pool;
c->log = r->connection->log;
c->read->log = c->log;
c->write->log = c->log;
/* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */
u->writer.out = NULL;
u->writer.last = &u->writer.out;
u->writer.connection = c;
u->writer.limit = 0;
|
然后时对request_body的一些处理以及如果request_sent已经设置,也就是这个upstream已经发送过一部分数据了,此时需要重新初始化upstream.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
if (u->request_sent) {
//重新初始化upstream
if (ngx_http_upstream_reinit(r, u) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
}
//如果request_body存在的话,保存request_body
if (r->request_body
&& r->request_body->buf
&& r->request_body->temp_file
&& r == r->main)
{
/*
* the r->request_body->buf can be reused for one request only,
* the subrequests should allocate their own temporay bufs
*/
u->output.free = ngx_alloc_chain_link(r->pool);
if (u->output.free == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
//保存到output
u->output.free->buf = r->request_body->buf;
u->output.free->next = NULL;
u->output.allocated = 1;
//重置request_body
r->request_body->buf->pos = r->request_body->buf->start;
r->request_body->buf->last = r->request_body->buf->start;
r->request_body->buf->tag = u->output.tag;
}
|
最后则是先判断rc的返回值,如果是NGX_AGAIN,则说明连接没有返回,则设置定时器,然后返回,否则说明连接成功,这时就需要发送请求到后端。
1
2
3
4
5
6
7
|
if (rc == NGX_AGAIN) {
//添加定时器
ngx_add_timer(c->write, u->conf->connect_timeout);
return;
}
ngx_http_upstream_send_request(r, u);
|
紧接着我们来看最后的两个函数,分别是上面的ngx_event_connect_peer和ngx_http_upstream_send_request,我们来一个个看。
先来看ngx_event_connect_peer。它主要是用来连接后端,函数比较长,一部分一部分来看。
下面这部分主要是建立socket,然后设置属性,从连接池取出来connection.这里后面的一部分和我们前面client请求上来之后,我们初始化connect类似.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
//取得我们将要发送的upstream对端
rc = pc->get(pc, pc->data);
if (rc != NGX_OK) {
return rc;
}
//新建socket
s = ngx_socket(pc->sockaddr->sa_family, SOCK_STREAM, 0);
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0, "socket %d", s);
if (s == -1) {
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
ngx_socket_n " failed");
return NGX_ERROR;
}
//取得连接
c = ngx_get_connection(s, pc->log);
if (c == NULL) {
if (ngx_close_socket(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
ngx_close_socket_n "failed");
}
return NGX_ERROR;
}
//设置rcvbuf的大小
if (pc->rcvbuf) {
if (setsockopt(s, SOL_SOCKET, SO_RCVBUF,
(const void *) &pc->rcvbuf, sizeof(int)) == -1)
{
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
"setsockopt(SO_RCVBUF) failed");
goto failed;
}
}
//设置非阻塞
if (ngx_nonblocking(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
ngx_nonblocking_n " failed");
goto failed;
}
if (pc->local) {
if (bind(s, pc->local->sockaddr, pc->local->socklen) == -1) {
ngx_log_error(NGX_LOG_CRIT, pc->log, ngx_socket_errno,
"bind(%V) failed", &pc->local->name);
goto failed;
}
}
//开始挂载对应的读写函数.
c->recv = ngx_recv;
c->send = ngx_send;
c->recv_chain = ngx_recv_chain;
c->send_chain = ngx_send_chain;
c->sendfile = 1;
c->log_error = pc->log_error;
if (pc->sockaddr->sa_family != AF_INET) {
c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;
c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;
#if (NGX_SOLARIS)
/* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */
c->sendfile = 0;
#endif
}
rev = c->read;
wev = c->write;
rev->log = pc->log;
wev->log = pc->log;
pc->connection = c;
c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
#if (NGX_THREADS)
/* TODO: lock event when call completion handler */
rev->lock = pc->lock;
wev->lock = pc->lock;
rev->own_lock = &c->lock;
wev->own_lock = &c->lock;
#endif
if (ngx_add_conn) {
//添加读写事件
if (ngx_add_conn(c) == NGX_ERROR) {
goto failed;
}
}
|
等socket设置完毕,nginx就开始连接后端的upstream,这段代码可以学习一个好的代码是如何处理错误的,
下面这段主要是处理当返回值为-1,并且err不等于NGX_EINPROGRESS的时候,而NGX_EINPROGRESS表示非阻塞的socket,然后connect,然后连接还没有完成,可是提前返回,就回设置这个errno.这个error不算出错,因此需要过滤掉.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
rc = connect(s, pc->sockaddr, pc->socklen);
if (rc == -1) {
err = ngx_socket_errno;
//判断错误号
if (err != NGX_EINPROGRESS
#if (NGX_WIN32)
/* Winsock returns WSAEWOULDBLOCK (NGX_EAGAIN) */
&& err != NGX_EAGAIN
#endif
)
{
if (err == NGX_ECONNREFUSED
#if (NGX_LINUX)
/*
* Linux returns EAGAIN instead of ECONNREFUSED
* for unix sockets if listen queue is full
*/
|| err == NGX_EAGAIN
#endif
|| err == NGX_ECONNRESET
|| err == NGX_ENETDOWN
|| err == NGX_ENETUNREACH
|| err == NGX_EHOSTDOWN
|| err == NGX_EHOSTUNREACH)
{
level = NGX_LOG_ERR;
} else {
level = NGX_LOG_CRIT;
}
ngx_log_error(level, c->log, err, "connect() to %V failed",
pc->name);
//返回declined
return NGX_DECLINED;
}
}
|
然后就是下面的部门就是处理连接成功和错误号为NGX_EINPROGRESS的情况,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
//如果当前的事件模型支持add_conn,则事件在开始已经加好了,因此如果rc==-1则直接返回
if (ngx_add_conn) {
if (rc == -1) {
/* NGX_EINPROGRESS */
return NGX_AGAIN;
}
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");
wev->ready = 1;
return NGX_OK;
}
..............................................
//添加可读事件
if (ngx_add_event(rev, NGX_READ_EVENT, event) != NGX_OK) {
goto failed;
}
if (rc == -1) {
/* NGX_EINPROGRESS */
//如果错误号是 EINPROGRES 添加可写事件
if (ngx_add_event(wev, NGX_WRITE_EVENT, event) != NGX_OK) {
goto failed;
}
return NGX_AGAIN;
}
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");
wev->ready = 1;
return NGX_OK;
..............................................
|
最后我们来看下ngx_http_upstream_send_request的实现,这个函数是用来发送数据到后端的upstream,然后这里有一个需要注意的地方,那就是在linux下当非阻塞的connect,然后没有连接完成,如果挂载写事件,此时如果写事件上报上来,并不代表连接成功,此时还需要调用getsockopt来判断SO_ERROR,如果没有错误才能保证连接成功。
SOL_SOCKET
to determine whether connect() completed successfully (SO_ERROR is zero) or unsuccessfully (SO_ERROR is one of the usual error codes listed
here, explaining the reason for the failure).
这里我看了下内核的代码,就是如果连接失败,比如对端不可达,内核会设置sock->sk_soft_err,而在tcp_poll中只会检测sk_err , 对应的SO_ERROR会检测这两个错误。在内核里面的注释是这样子的
* @sk_err: last error
* @sk_err_soft: errors that don’t cause failure but are the cause of a
* persistent failure not just ‘timed out’
这个按照我的理解,内核里面的sk_err 表示4层的错误,而sk_err_soft下层的错误.
在nginx中是在ngx_http_upstream_test_connect中对连接是否断开进行判断的(调用getsockopt).
然后发送数据则是调用ngx_output_chain,不过这里我们知道在ngx_output_chain中会依次调用filter链,可是upstream明显不需要调用filter链,那么nginx是怎么做的呢,是这样子的,在upstream的初始化的时候,已经讲u->output.output_filter改成ngx_chain_writer了:
1
|
u->output.output_filter = ngx_chain_writer;
|
最后就是一些对错误的处理,我们来看代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
static void
ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ngx_int_t rc;
ngx_connection_t *c;
c = u->peer.connection;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream send request");
//如果test connect失败,则说明连接失败,于是跳到下一个upstream,然后返回
if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
c->log->action = "sending request to upstream";
//发送数据,这里的u->output.output_filter已经被修改过了
rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs);
u->request_sent = 1;
........................................................
//和request的处理类似,如果again,则说明数据没有发送完毕,此时挂载写事件.
if (rc == NGX_AGAIN) {
ngx_add_timer(c->write, u->conf->send_timeout);
if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
return;
}
/* rc == NGX_OK */
//设置tcp_cork,远离和前面的keepalive部分的处理类似
if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
if (ngx_tcp_push(c->fd) == NGX_ERROR) {
ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno,
ngx_tcp_push_n " failed");
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
c->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
}
ngx_add_timer(c->read, u->conf->read_timeout);
#if 1
//如果读也可以了,则开始解析头
if (c->read->ready) {
/* post aio operation */
/*
* TODO comment
* although we can post aio operation just in the end
* of ngx_http_upstream_connect() CHECK IT !!!
* it's better to do here because we postpone header buffer allocation
*/
ngx_http_upstream_process_header(r, u);
return;
}
#endif
...........................................
}
|
在下一篇blog里面,我会详细的分析nginx对后端来的数据如何解析以及如何发送数据到client.
(责任编辑:IT) |