当前位置: > Linux服务器 > nginx >

nginx连接池实现

时间:2014-12-11 23:27来源:csdn.net 作者:liujiyong7

1. 配置

worker_connections

模块:EventsModule

语法: worker_connections number

默认:

位于main section的指令worker_connections和worker_processes可以帮助你计算你能处理的最大并发

max clients = worker_processes * worker_connections

在一个反向代理环境中,最大并发数变成了

max clients = worker_processes * worker_connections/2

 

配置示例

 
  1. worker_processes  12;  
  2.   
  3. events {  
  4.   
  5.         use epoll;  
  6.   
  7.         worker_connections  2048000;  
  8.   
  9. }  


 

2. 工作原理

 

同一个进程,上游下游共用一个连接池,连接池大小,进程数都可在配置中指定。

3. 数据结构

在nginx中connection就是对tcp连接的封装,其中包括连接的socket,读事件,写事件。利用nginx封装的connection,我们可以很方便的使用nginx来处理与连接相关的事情,比如,建立连接,发送与接受数据等。结构体如下:

Src/core/ngx_connection.h

 
  1. struct ngx_connection_s {  
  2.   
  3.     void               *data;  
  4.   
  5.     ngx_event_t        *read;  
  6.   
  7.     ngx_event_t        *write;  
  8.   
  9. ngx_socket_t        fd;  
  10.   
  11. 。。。  
  12.   
  13. ngx_queue_t         queue;  
  14.   
  15. 。。。  
  16.   
  17. }  


连接池数据结构如下:

 

连接池cycle->connections采用数组单链表实现,空闲连接链表free_connections也指向了连接池头结点,实现的非常优美。

1、使用数组实现,不用队列或栈,实现简单,可以随机访问,初始化很方便

2、所有操作只操作链表头节点,没有内存拷贝,性能复杂度O(1)

 

创建初始化如上数据结构在src/event/ngx_event.c:ngx_event_process_init中

 
  1. ngx_event_process_init(ngx_cycle_t *cycle)  
  2.   
  3. {。。。  
  4.   
  5. cycle->files_n = (ngx_uint_t) rlmt.rlim_cur;  
  6.   
  7.        cycle->files = ngx_calloc(sizeof(ngx_connection_t *) * cycle->files_n, cycle->log);  
  8.   
  9. 。。。  
  10.   
  11.     cycle->connections =  
  12.   
  13.         ngx_alloc(sizeof(ngx_connection_t) * cycle->connection_n, cycle->log);  
  14.   
  15. 。。。  
  16.   
  17.     i = cycle->connection_n;  
  18.   
  19.     next = NULL;  
  20.   
  21.     do {  
  22.   
  23.         i--;  
  24.   
  25.         c[i].data = next;  
  26.   
  27.         c[i].read = &cycle->read_events[i];  
  28.   
  29.         c[i].write = &cycle->write_events[i];  
  30.   
  31.         c[i].fd = (ngx_socket_t) -1;  
  32.   
  33.         next = &c[i];  
  34.   
  35.     } while (i);  
  36.   
  37.     cycle->free_connections = next;  
  38.   
  39. cycle->free_connection_n = cycle->connection_n;  
  40.   
  41. 。。。  
  42.   
  43. }  


 

 

4. 基本操作

Src/core/ngx_connection.c

Ngx_get_connection

 从free_connections获取一个connection,然后初始化 

 

Free_connections指向链表下一个节点,返回头结点。

同时,cycle->files[fd]也指向返回的结点

 

Ngx_close_connection

 主要关闭一个connection,包括“善后”以及调用ngx_reusable_connection(c,0) ngx_free_connection来将连接放回free_connections

可以认为是ngx_get_connection的逆操作

 

 

ngx_reusable_connection

 ngx_reusable_connection(ngx_connection_t *c, ngx_uint_t reusable) 

 reusable=1 ,放进queue中 

 reusable=0 ,从queue中出来 

Ngx_free_connection

 将使用的连接放回free_connections 

 

 

Ngx_drain_connection

当ngx_get_connection获取不到连接时(即并发比较高的时候,连接都用完了),那么使用

ngx_drain_connections来释放长连接,将长连接从queue拿出来,放回到free_connections,然后再获取

 

 

ngx_event_connect_peer

当从upstream中获得一个后端时,就会调用ngx_event_connect_peer去进行连接,ngx_event_connect_peer会调用 Ngx_get_connection得到一个connection结构体,然后去执行connect操作。

 

 
  1. ngx_event_connect_peer(ngx_peer_connection_t *pc)  
  2. {  
  3. 。。。  
  4. s = ngx_socket(pc->sockaddr->sa_family, SOCK_STREAM, 0);  
  5. 。。。  
  6. c = ngx_get_connection(s, pc->log);  
  7. 。。。  
  8. rc = connect(s, pc->sockaddr, pc->socklen);  
  9. 。。。  
  10. }  


 

 

 

5. Nginxaccept_mutex锁机制

为了全面理解,我们先捋一下这个过程

1.master进程bind端口,listen后,生成了监听套接字

2.Master进程fork出work进程,work进程继承了监听套接字,执行accept获得请求

所有的work进程继承了同样的监听套接字,那么一个连接过来的时候,多个空闲的进程,会竞争这个连接,会是哪个work进程来处理这个连接呢?

如果某个进程得到accept的机会比较多,它的空闲连接很 快就用完了,如果不提前做一些控制,当accept到一个新的tcp连接后,因为无法得到空闲连接,而且无法将此连接转交给其它进程,最终会导致此tcp 连接得不到处理,就中止掉了。这是不公平的。

我们必须设计一种机制,让一个请求,有且仅有一个work进程处理,也就是说只能有一个进程accept到这个连接。同时每个进程有大致均等的机会来处理连接。

 

 

我们看nginx是怎么做的?

在master进程中已经建立了listen套接字,

Src/core/ngx_cycle.c:ngx_init_cycle

 
  1. {  
  2.   
  3. ...  
  4.   
  5. ngx_queue_init(&cycle->reusable_connections_queue);  
  6.   
  7. ...  
  8.   
  9.     if (ngx_open_listening_sockets(cycle) != NGX_OK) {  
  10.   
  11.         goto failed;  
  12.   
  13.     }  
  14.   
  15. }  
  16.   
  17. 在src/core/ngx_connection.c:ngx_open_listening_sockets中  
  18.   
  19. {  
  20.   
  21. ls = cycle->listening.elts;  
  22.   
  23. s = ngx_socket(ls[i].sockaddr->sa_family, ls[i].type, 0);  
  24.   
  25. if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,  
  26.   
  27.                            (const void *) &reuseaddr, sizeof(int))  
  28.   
  29.                 == -1)  
  30.   
  31.             {...  
  32.   
  33.     
  34.   
  35. if (bind(s, ls[i].sockaddr, ls[i].socklen) == -1) {...  
  36.   
  37. if (listen(s, ls[i].backlog) == -1) {...  
  38.   
  39. ls[i].listen = 1;  
  40.   
  41. ls[i].fd = s;  
  42.   
  43. }  


Work进程继承了master进程的listen套接字后,循环监听网络事件

Src/os/unix/ngx_process_cycle.c:ngx_worker_process_cycle

 
  1. {  
  2.   
  3. ...  
  4.   
  5. ngx_process_events_and_timers(cycle);  
  6.   
  7. ...  
  8.   
  9. }  
  10.   
  11. Src/event/ngx_event.c:ngx_process_events_andtimers(cycle)  
  12.   
  13. {...  
  14.   
  15. if(ngx_use_accept_mutex) {  
  16.   
  17. if(ngx_accept_disabled > 0) {  
  18.   
  19. ngx_accept_disabled--;  
  20.   
  21.       
  22.   
  23. //在src/event/nginx_event_accept.c:ngx_event_accept()中计算:ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n;   初始值为0,以后每来新建一个连接,都会更新一下值。   
  24.   
  25.     
  26.   
  27. //当剩余连接数小于最大连接数的1/8的时候为正,表示连接有点多了,于是放弃一次争锁定机会    
  28.   
  29.     
  30.   
  31. }else{  
  32.   
  33. if(ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {  
  34.   
  35.     
  36.   
  37. //这里ngx_trylock_accept_mutex函数就是争锁定函数,成功争得了锁则将全局变量ngx_accept_mutex_held置为1,否则置0    
  38.   
  39. return;  
  40.   
  41. }  
  42.   
  43.     
  44.   
  45. if(ngx_accept_mutex_held) {  
  46.   
  47.   flags |= NGX_POST_EVENTS;  
  48.   
  49.     
  50.   
  51. //占用了accept锁的进程在处理事件的时候是先将事件放入队列,后续慢慢处理,以便尽快走到下面释放锁。    
  52.   
  53.     
  54.   
  55. }else{  
  56.   
  57.     
  58.   
  59. //没争得锁的进程不需要分两步处理事件,但是把处理事件的timer更新为ngx_accept_mutex_delay    
  60.   
  61. if(timer == NGX_TIMER_INFINITE  
  62.   
  63.   || timer > ngx_accept_mutex_delay)  
  64.   
  65. {  
  66.   
  67.   timer = ngx_accept_mutex_delay;  
  68.   
  69. }  
  70.   
  71. }  
  72.   
  73. }  
  74.   
  75. }  
  76.   
  77.     
  78.   
  79. delta = ngx_current_msec;  
  80.   
  81.    


惊群

http://www.usenix.org/event/usenix2000/freenix/full_papers/molloy/molloy.pdf 

http://simohayha.iteye.com/blog/561424

http://blog.csdn.net/benbendy1984/article/details/5620482

http://blog.csdn.net/russell_tao/article/details/7204260

(责任编辑:IT)
------分隔线----------------------------
栏目列表
推荐内容