`
on__the__way
  • 浏览: 23986 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

memcache 网络驱动模型 (原创+转载)

 
阅读更多

memcache使用多线程模型程序,使用libevent事件模型,与nginx类似,由一个主线程和多个worker线程组成。

1,主线程初始化。

    memcache的主线程主要进行在监听套接字上注册事件监听网络中客户端的连接及处理和worker的初始化的工作。
    首先初始化主线程的libevent实例,然后初始化所有worker线程,并为每个worker建立通知的管道用于主线程与worker之间的通信,然后每个worker线程都会注册管道可读的libevent事件,再为每个worker建立自己的CQ队列用于存放主线程分配的客户端连接,最后启动所有worker线程,并在监听套接字上箭筒客户端连接的事件,启动主线程的循环监听。同时每个worker启动后也会进入循环监听。

    线程的初始化

void thread_init(int nthreads, struct event_base *main_base) {
 //。。。省略
   threads = malloc(sizeof(LIBEVENT_THREAD) * nthreads);// 申请内容空间
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    threads[0].base = main_base;
    threads[0].thread_id = pthread_self();
    
    // 为每个线程创建管道
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

    setup_thread(&threads[i]);
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 1; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }
}

    setup_thread主要做了三个事情,创建所有workers线程的libevent实例,注册所有workers线程的管道读端的libevent的读事件,初始化CQ队列。
create_worker真正启动了线程,进入循环监听。

 

2,几个结构体

    CQ_ITEM 是主线程accept客户端连接后,新分配的客户端套接字的封装,除了fd,还有state,缓冲区大小,协议类型等。

/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int     sfd;
    int     init_state;
    int     event_flags;
    int     read_buffer_size;
    int     is_udp;
    CQ_ITEM *next;
};

    CQ是由CQ_ITEM组成链表,称为CQ队列,每个worker维护自己的CQ队列,对应每个worker自己维护的客户端连接。

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
    pthread_cond_t  cond;
};

    每当主线程新分配一个客户端连接后都会通过round的方式选择一个worker线程,并将新的CQ_ITEM放入对应worker的CQ队列,同时为了通知worker及时的获悉这个队列,会在管道中写入'c',这样worker端注册的libevent可读事件触发,由于采取的水平触发,因此这里必须将'c'取出否则会一直触发。

typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    CQ  new_conn_queue;         /* queue of new connections to handle */
} LIBEVENT_THREAD;

    这是每个线程的封装,可知每个线程包含线程id,libevent实例,监听事件,管道的接收和发送id,CQ队列。

    CONN是对每个网络连接的封装,上文的CQ_ITEM是主线程对连接的封装,而CONN是worker线程对连接的封装。

typedef struct{
  int sfd;
  int state;
  struct event event;
  short which;
  char *rbuf;
  ... //省去很多状态标志和读写buf信息等
}conn;

   这里state是很重要的属性,因为主线程和worker都进入循环监听,作不同的处理就是根据state这个状态位来作区分。

3,事件处理

    示意图如下图

     当主线程监听套接字事件触发时,会将新的CQ_ITEM写入到worker的CQ队列,同时向管道写入触发worker在管道注册的读事件,通知worker,然后worker将新的客户端连接进行libevent事件注册,指定由该worker线程处理。

      worker线程拿到了这个连接之后,就应该是分配给这个连接一个结构体,包括这个连接所有的状态,都写buf等,这个结构体就是conn,然后这个worker线程会在它自己的event_base加入对这个新的连接的事件的监听。上面也说过了workerevent_base有两套处理逻辑,一个对notify_ receive_fd的,还有一套是对新连接的。这个notify_ receive_fd的处理逻辑就是处理2个事件,一个是建立连接,一个是改变锁的粒度

      a, 核心循环

        drive_machine是多线程环境执行的,主线程和workers都会执行drive_machine

static void drive_machine(conn *c) {
    bool stop = false;
    int sfd, flags = 1;
    socklen_t addrlen;
    struct sockaddr_storage addr;
    int res;

    assert(c != NULL);

    while (!stop) {

        switch(c->state) {
        case conn_listening:// 监听的fd的connection,初始状态就是这个,表示监听中,
                               有连接
            addrlen = sizeof(addr);
            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
                //回调到了这个地方,肯定有连接,这个函数直接得到连接的sfd
                break;
            }
            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
                perror("setting O_NONBLOCK");
                close(sfd);
                break;
            }
            // 这里把新的请求分配给相应的worker线程
            dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, false);
            break;

        case conn_read:// 客户端连接套接字,由worker处理
            if (try_read_command(c) != 0) {
                continue;
            }
	    ....//省略
     }     
 }

   drive_machine主要是通过当前连接的state来判断该进行何种处理,因为通过libevent注册了读写时间后回调的都是 这个核心函数,所以实际上我们在注册libevent相应事件时,会同时把事件状态写到该conn结构体里,libevent进行回调时会把 该conn结构作为参数传递过来,就是该方法的形参 。

    b, 状态机(转载)

1.listening:这个状态是主线程的connection默认状态,它只有这一个状态,它做的工作就是把接到连接分发到worker子线程。

2.conn_new_cmd:每个新连接的初始状态,这个状态会清空读写buf

3.conn_waiting:这个状态就是在event_base中设置读事件,然后状态机暂停,挂起当前connection(函数退出,回调函数的attachment会记录这个connection),等待有新的信息过来,然后通过回调函数的attachment重新找到这个connection,然后启动状态机。

4.conn_read:该状态从sfd中读取客户端的指令信息。

5.conn_parse_cmd:判断具体的指令,如果是update的指令,那么需要跳转到conn_nread中,因为需要在从网络中读取固定byte的数据,如果是查询之类的指令,就直接查询完成后,跳转到conn_mwrite中,返回数据

6.conn_nread:从网络中读取指定大小的数据,这个数据就是更新到item的数据,然后将数据更新到hashlru中去,然后跳转到conn_write

7.conn_write:这个状态主要是调用out_string函数会跳转到这个状态,一般都是提示信息和返回的状态信息,然后输出这些数据,然后根据write_to_go的状态,继续跳转

8.conn_mwrite:这个写是把connection中的msglist返回到客户端,这个msglist存的是item的数据,用于那种get等获得item的操作的返回数据。

9.conn_swallow:对于那种update操作,如果分配item失败,显然后面的nread,是无效的,客户端是不知道的,这样客户端继续发送特定的数量的数据,就需要把读到的这些数据忽略掉,然后如果把后面指定的数据都忽略掉了(set的两部提交,数据部分忽略掉),那么connection跳转到conn_new_cmd,如果读nread的那些特定数量的数据没有读到,直接跳转到conn_closing

10.conn_closing:服务器端主动关闭连接,调用close函数关闭文件描述符,同时把conn结构体放到空闲队列中,供新的连接重用这写conn结构体。

总结:memcached的网络模块的事件模型依赖于libevent的实现,memcachedfd关心的事件注册给libevent并注册了回调函数,libevent负责回调memcached,主线程把连接dispatch到具体的worker线程,同时把这个连接的描述符注册worker线程自己的一套libevent,这样worker就接收了这个连接,以后这个fd上的事件回调都是这个线程上的做的工作了,每个连接都有自己的一套状态机,如果接受到数据就通过状态机处理,状态扭转,如果暂时没有数据就把连接暂时挂起,等到有了数据继续执行状态机。

 

 

 

 

参考:

http://blog.chinaunix.net/uid-28345378-id-4239012.html

http://www.iteye.com/topic/344172

  • 大小: 169.1 KB
  • 大小: 39.9 KB
  • 大小: 42.9 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics