Redis 6.0 Source Code Study

"多线程网络IO模型"

Posted by Simon on November 9, 2020

If I had to pick my favorite C/C++ open-source project, it would have to be Redis.

As recently as the first half of this year, you could still answer the question "What do you think makes Redis so fast?" with:

  1. single-threaded, lock-free operation
  2. epoll
  3. pure in-memory storage

But with the release of Redis 6.0, Redis has its first multi-threaded version, so today let's study the multi-threading-related code in Redis 6.0.

Multi-threaded network IO is disabled by default in redis 6.0; it can be enabled by changing the relevant settings in redis.conf, as shown below:

# So for instance if you have a four cores boxes, try to use 2 or 3 I/O
# threads, if you have a 8 cores, try to use 6 threads. In order to
# enable I/O threads use the following configuration directive:
#
# io-threads 4
#
# Setting io-threads to 1 will just use the main thread as usually.
# When I/O threads are enabled, we only use threads for writes, that is
# to thread the write(2) syscall and transfer the client buffers to the
# socket. However it is also possible to enable threading of reads and
# protocol parsing using the following configuration directive, by setting
# it to yes:
#
# io-threads-do-reads no
#

Uncomment io-threads (remove the leading #) and set it to the desired number of threads; also uncomment io-threads-do-reads (remove the leading #) and change its value to yes.
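Once both directives are enabled, the effective lines in redis.conf look like this (assuming 4 IO threads, as used in the rest of this article):

io-threads 4
io-threads-do-reads yes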

After changing these two settings, we use the gdb command set args "../redis.conf" to pass the argument to redis-server, then restart redis-server.

(gdb) set args "../redis.conf"
(gdb) r
The program being debugged has been started already.
Start it from the beginning? (y or n) y
Starting program: /root/redis-6.0.3/src/redis-server "../redis.conf"
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib64/libthread_db.so.1".

Then press Ctrl + C to break into the program, and use the info threads command to inspect the current threads:

(gdb) info threads
  Id   Target Id                                          Frame 
* 1    Thread 0x7ffff7feb740 (LWP 11992) "redis-server"   0x00007ffff71e2603 in epoll_wait () from /usr/lib64/libc.so.6
  2    Thread 0x7ffff0bb9700 (LWP 11993) "bio_close_file" 0x00007ffff74bc965 in pthread_cond_wait@@GLIBC_2.3.2 () from /usr/lib64/libpthread.so.0
  3    Thread 0x7ffff03b8700 (LWP 11994) "bio_aof_fsync"  0x00007ffff74bc965 in pthread_cond_wait@@GLIBC_2.3.2 () from /usr/lib64/libpthread.so.0
  4    Thread 0x7fffefbb7700 (LWP 11995) "bio_lazy_free"  0x00007ffff74bc965 in pthread_cond_wait@@GLIBC_2.3.2 () from /usr/lib64/libpthread.so.0
  5    Thread 0x7fffef3b6700 (LWP 11996) "io_thd_1"       0x00007ffff74bf4ed in __lll_lock_wait () from /usr/lib64/libpthread.so.0
  6    Thread 0x7fffeebb5700 (LWP 11997) "io_thd_2"       0x00007ffff74bf4ed in __lll_lock_wait () from /usr/lib64/libpthread.so.0
  7    Thread 0x7fffee3b4700 (LWP 11998) "io_thd_3"       0x00007ffff74bf4ed in __lll_lock_wait () from /usr/lib64/libpthread.so.0
(gdb)

Compared with the thread list when multi-threaded network IO is disabled, there are three extra threads named io_thd_1, io_thd_2 and io_thd_3; together with the main thread, that makes four IO threads (io-threads = 4). Let's focus on the three IO worker threads. They all run the same logic, so we take io_thd_1 as an example. Use the thread 5 command to switch to io_thd_1, then bt to view its call stack:

(gdb) bt
#0  0x00007ffff74bf4ed in __lll_lock_wait () from /usr/lib64/libpthread.so.0
#1  0x00007ffff74badcb in _L_lock_883 () from /usr/lib64/libpthread.so.0
#2  0x00007ffff74bac98 in pthread_mutex_lock () from /usr/lib64/libpthread.so.0
#3  0x0000000000447907 in IOThreadMain (myid=0x1) at networking.c:2921
#4  0x00007ffff74b8dd5 in start_thread () from /usr/lib64/libpthread.so.0
#5  0x00007ffff71e202d in clone () from /usr/lib64/libc.so.6

The code at frame #3 is as follows:

//networking.c line 2903
void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;
    char thdname[16];

    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    redisSetCpuAffinity(server.server_cpulist);

    while(1) {
        /* Wait for start */
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        /* Give the main thread a chance to stop this thread. */
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        serverAssert(io_threads_pending[id] != 0);

        if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));

        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;

        if (tio_debug) printf("[%ld] Done\n", id);
    }
}

IOThreadMain is the worker thread function. Its body is some initialization work followed by the main while loop; the initialization mainly sets the thread name:

//networking.c line 2906
long id = (unsigned long)myid;
char thdname[16];

snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);

This is why gdb shows threads named io_thd_1, io_thd_2 and io_thd_3. A worker thread's id is passed in as the thread argument when the main thread creates it, starting from 1; IO thread number 0 is the main thread itself. The main thread calls InitServerLast from main, InitServerLast calls initThreadedIO, and initThreadedIO creates as many IO worker threads as the configuration file specifies. We can set a breakpoint on initThreadedIO and restart gdb to see the call chain and the corresponding code locations:

Thread 1 "redis-server" hit Breakpoint 2, initThreadedIO () at networking.c:2954
2954        io_threads_active = 0; /* We start with threads not active. */
(gdb) bt
#0  initThreadedIO () at networking.c:2954
#1  0x0000000000431aa8 in InitServerLast () at server.c:2954
#2  0x0000000000437195 in main (argc=2, argv=0x7fffffffe308) at server.c:5142
(gdb)

initThreadedIO is defined as follows:

//networking.c line 2953
void initThreadedIO(void) {
    io_threads_active = 0; /* We start with threads not active. */

    /* Don't spawn any thread if the user selected a single thread:
     * we'll handle I/O directly from the main thread. */
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    /* Spawn and initialize the I/O threads. */
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        io_threads_list[i] = listCreate();
        //index 0 is the main thread
        if (i == 0) continue; /* Thread 0 is the main thread. */

        /* Things we do only for the additional threads. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}

From the code above we can draw two conclusions:

  • redis allows at most 128 IO worker threads (the IO_THREADS_MAX_NUM macro);
  • thread number 0 is the main thread, so the actual number of worker threads is io-threads - 1 (the relevant globals are sketched below).
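For reference, this bookkeeping lives in a few global arrays in networking.c, all indexed by thread number. Here is a sketch of the declarations (a paraphrase with my own comments; in the actual 6.0 source the pending counters are declared _Atomic):

//networking.c (sketch of the relevant globals)
#define IO_THREADS_MAX_NUM 128

pthread_t io_threads[IO_THREADS_MAX_NUM];             /* worker thread handles */
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM]; /* per-thread start/stop gates */
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM]; /* clients assigned per thread */
int io_threads_active;  /* are the IO threads currently activated? */
int io_threads_op;      /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ */
list *io_threads_list[IO_THREADS_MAX_NUM];            /* client list per thread */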

Before creating the new IO threads, a linked list io_threads_list[i] holding client objects (each representing a connected client) is created for each thread; these lists are stored in the global array io_threads_list, matched one to one with thread numbers. At the same time, one integer (unsigned long) per thread is kept in another global array, io_threads_pending, again matched one to one with thread numbers. Together with a matching set of mutexes (stored in the io_threads_mutex array), these integers let the main thread control starting and stopping the worker threads. The control logic is as follows:

  1. Set io_threads_pending[i] to 0;
  2. In the loop above, immediately after initializing io_threads_mutex[i], call pthread_mutex_lock(&io_threads_mutex[i]) to lock the mutex;
  3. Then create the corresponding IO worker thread. The worker thread function IOThreadMain contains the following code:
//networking.c line 2903
void *IOThreadMain(void *myid) {
    //... some code omitted ...

    while(1) {
        /* Wait for start */
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        /* Give the main thread a chance to stop this thread. */
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        //... some code omitted ...

        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;
    }
}

When a worker thread reaches the line pthread_mutex_lock(&io_threads_mutex[id]) above, it blocks there, because the main thread has already locked io_threads_mutex[id]. To activate the IO worker threads, the startThreadedIO function is called; it is implemented as follows:

//networking.c line 2985
void startThreadedIO(void) {
    //... some code omitted ...
    for (int j = 1; j < server.io_threads_num; j++)
        pthread_mutex_unlock(&io_threads_mutex[j]);
    io_threads_active = 1;
}

startThreadedIO unlocks the corresponding io_threads_mutex[id] mutexes and sets io_threads_active, the flag that records whether the IO threads are enabled (this variable is discussed further below). A careful reader may notice that even with io_threads_mutex[id] unlocked, after the continue the next loop iteration still sees io_threads_pending[id] == 0, so the loop just locks, unlocks and continues again, and IOThreadMain still never gets to process its linked list of client objects. That is exactly right: besides unlocking the io_threads_mutex[id] mutex, io_threads_pending[id] must also be set to a non-zero value before the IO worker thread runs its main logic. So where does io_threads_pending[id] get set to a non-zero value?

beforeSleep calls handleClientsWithPendingReadsUsingThreads() and handleClientsWithPendingWritesUsingThreads(), which handle the read and write cases respectively.

//server.c line 2106
void beforeSleep(struct aeEventLoop *eventLoop) {
    //... some code omitted ...

    /* We should handle pending reads clients ASAP after event loop. */
    handleClientsWithPendingReadsUsingThreads();

    //... some code omitted ...

    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWritesUsingThreads();

    //... some code omitted ...
}

Let's look at reads first. handleClientsWithPendingReadsUsingThreads is defined as follows:

//networking.c line 3126
int handleClientsWithPendingReadsUsingThreads(void) {
    if (!io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    //the main thread's strategy for assigning client objects to worker threads
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O READ All threads finshed\n");

    /* Run the list of clients again to process the new buffers. */
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);

        if (c->flags & CLIENT_PENDING_COMMAND) {
            c->flags &= ~CLIENT_PENDING_COMMAND;
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid
                 * processing the client later. So we just go
                 * to the next. */
                continue;
            }
        }
        processInputBuffer(c);
    }
    return processed;
}

The code above first uses the io_threads_active and server.io_threads_do_reads flags to decide whether IO threads are enabled; if not, the function returns immediately and all IO is handled in the main thread. If IO threads are enabled, the first while loop is where the main thread assigns client objects to the IO threads. The strategy is simple: the so-called Round-Robin scheme, which takes the current item number modulo the number of threads and appends each client object to the list of the corresponding thread (the main thread included). Suppose there are 4 IO threads in total, including the main thread: client 0 is assigned to the main thread, client 1 to worker 1, client 2 to worker 2, client 3 to worker 3, client 4 to the main thread again, client 5 to worker 1, client 6 to worker 2, and so on.

After the client objects have been assigned to the IO threads' lists, the corresponding io_threads_pending[j] values are set to non-zero; the value actually stored is the length of each worker thread's list, because when there are fewer client objects than IO threads some lists have length 0, and there is no need to wake those worker threads.

//networking.c line 3147
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
    int count = listLength(io_threads_list[j]);
    io_threads_pending[j] = count;
}

Having assigned the client objects to the IO worker threads and set the wake-up flags (io_threads_pending[j]), the main thread, which also took part in the distribution, next has to handle the client objects assigned to itself, so it iterates over its own list and processes them one by one:

//networking.c line 3153
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
    client *c = listNodeValue(ln);
    readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);

In the code above, the main thread takes the client objects from its own list (io_threads_list[0]) one by one and calls readQueryFromClient to read the data and parse it into packets, a flow already covered earlier. When finished, it empties its own list.

The IO worker threads do exactly the same thing with their own lists:

//networking.c line 2903
void *IOThreadMain(void *myid) {
    //... some code omitted ...

    while(1) {
        //... some code omitted ...

        /* Give the main thread a chance to stop this thread. */
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        //... some code omitted ...

        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        //after processing, empty our own list
        listEmpty(io_threads_list[id]);
        //reset the status flag
        io_threads_pending[id] = 0;

        //... some code omitted ...
    }
}

After processing the client objects in its list, an IO worker thread likewise empties its list and resets its io_threads_pending[id] flag. Meanwhile, the main thread uses an infinite loop to wait for the IO worker threads to finish the clients in their lists:

//networking.c line 3126
int handleClientsWithPendingReadsUsingThreads(void) {
    //... some code omitted ...

    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }

    //... some code omitted ...
}

Since every IO worker thread resets its io_threads_pending[id] to 0 once it finishes its list, the pending value summed by the main thread's for loop eventually becomes 0 and the infinite while loop exits.
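To see this start/stop protocol in isolation, here is a minimal standalone sketch (my own demo, not Redis code) of the same pattern with a single worker: the main thread parks the worker by holding a mutex, publishes work by setting an atomic pending counter, and busy-waits until the counter drops back to 0. Compile with -pthread:

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <unistd.h>

static pthread_mutex_t gate = PTHREAD_MUTEX_INITIALIZER;
static atomic_ulong pending = 0;

static void *worker(void *arg) {
    (void)arg;
    while (1) {
        /* Spin briefly waiting for work, like IOThreadMain does. */
        for (int j = 0; j < 1000000; j++)
            if (atomic_load(&pending) != 0) break;

        if (atomic_load(&pending) == 0) {
            /* No work: block here while the main thread holds the gate. */
            pthread_mutex_lock(&gate);
            pthread_mutex_unlock(&gate);
            continue;
        }

        printf("worker: handling %lu client(s)\n", atomic_load(&pending));
        atomic_store(&pending, 0);  /* done: let the main thread proceed */
    }
    return NULL;
}

int main(void) {
    pthread_mutex_lock(&gate);      /* park the worker (threads not active) */
    pthread_t tid;
    pthread_create(&tid, NULL, worker, NULL);

    sleep(1);                       /* ... event loop runs single-threaded ... */
    pthread_mutex_unlock(&gate);    /* like startThreadedIO: open the gate */
    atomic_store(&pending, 3);      /* assign work, like io_threads_pending[j] */

    while (atomic_load(&pending) != 0)
        ;                           /* main thread busy-waits, like Redis */
    printf("main: worker finished\n");

    pthread_mutex_lock(&gate);      /* like stopThreadedIO: park the worker again */
    return 0;
}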

That is how redis 6.0 uses IO worker threads to handle read events. But if you study the source carefully, two questions come up:

  1. For the main thread to assign client objects carrying read events to the IO worker threads in handleClientsWithPendingReadsUsingThreads, the io_threads_active and server.io_threads_do_reads flags must both be non-zero.
//networking.c line 3126
int handleClientsWithPendingReadsUsingThreads(void) {
    if (!io_threads_active || !server.io_threads_do_reads) return 0;

    //... some code omitted ...
}

server.io_threads_do_reads only needs to be set in the configuration file, as described earlier; but io_threads_active defaults to 0:

//networking.c line 2953
void initThreadedIO(void) {
    io_threads_active = 0; /* We start with threads not active. */

    //... some code omitted ...
}

io_threads_active is only set to a non-zero value in startThreadedIO:

//networking.c line 2985
void startThreadedIO(void) {
    //... some code omitted ...

    for (int j = 1; j < server.io_threads_num; j++)
        pthread_mutex_unlock(&io_threads_mutex[j]);
    io_threads_active = 1;
}

Yet handleClientsWithPendingReadsUsingThreads never calls startThreadedIO; that is question one.

Question two: according to the worker thread logic, two conditions must both be met before a worker processes the client objects in its own list.

Condition one: io_threads_pending[id] is non-zero; this is already satisfied once handleClientsWithPendingReadsUsingThreads has assigned client objects to the IO worker threads;

Condition two: after creation, a worker thread blocks at pthread_mutex_lock(&io_threads_mutex[id]); the io_threads_mutex[id] mutex must be unlocked before the thread can continue.

void *IOThreadMain(void *myid) {
    //... some code omitted ...

    while(1) {
        //... some code omitted ...

        /* Give the main thread a chance to stop this thread. */
        if (io_threads_pending[id] == 0) {
            //after creation, the worker thread blocks here
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        //... some code omitted ...

        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;

        //... some code omitted ...
    }
}

To sum up: only after startThreadedIO has been called, first unlocking io_threads_mutex[id] and setting io_threads_active to non-zero, will handleClientsWithPendingReadsUsingThreads assign client objects to the IO worker threads and set io_threads_pending[id] to non-zero values, so that the IO workers can process the clients assigned to them. The big precondition is therefore that startThreadedIO gets called. But tracing through the flow shows that when the main loop calls beforeSleep for the first time, and beforeSleep calls handleClientsWithPendingReadsUsingThreads for the first time, startThreadedIO has not yet been called; in other words, the first round never uses the IO worker threads to handle read events on client objects. So where does startThreadedIO get called?

The answer: besides calling handleClientsWithPendingReadsUsingThreads to handle read events, beforeSleep calls another function, handleClientsWithPendingWritesUsingThreads, to handle write events:

//server.c line 2106
void beforeSleep(struct aeEventLoop *eventLoop) {
    //... some code omitted ...

    /* We should handle pending reads clients ASAP after event loop. */
    handleClientsWithPendingReadsUsingThreads();

    //... some code omitted ...

    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWritesUsingThreads();

    //... some code omitted ...
}

handleClientsWithPendingWritesUsingThreads is defined as follows:

//networking.c line 3031
int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but thejboring synchronous code. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    /* Start threads if needed. */
    if (!io_threads_active) startThreadedIO();

    if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O WRITE All threads finshed\n");

    /* Run the list of clients again to install the write handler where
     * needed. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);
    return processed;
}

handleClientsWithPendingWritesUsingThreads contains several key pieces of logic; let's walk through them one by one.

Logic 1

Stop the IO threads on demand; when the IO threads are stopped, call handleClientsWithPendingWrites directly to handle clients' writable events.

//networking.c line 3037
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
    return handleClientsWithPendingWrites();
}

stopThreadedIOIfNeeded stops the IO worker threads on demand:

//networking.c line 3017
int stopThreadedIOIfNeeded(void) {
    int pending = listLength(server.clients_pending_write);

    /* Return ASAP if IO threads are disabled (single threaded mode). */
    if (server.io_threads_num == 1) return 1;

    if (pending < (server.io_threads_num*2)) {
        if (io_threads_active) stopThreadedIO();
        return 1;
    } else {
        return 0;
    }
}

The IO worker threads are stopped when the number of clients with pending write events drops below twice the number of IO threads. For example, with 4 IO threads (including the main thread), the IO workers are stopped once fewer than 8 clients currently need write handling. Stopping is done by calling stopThreadedIO(), which locks each io_threads_mutex[id] and sets io_threads_active to 0. stopThreadedIO is defined as follows:

//networking.c line 2994
void stopThreadedIO(void) {
    /* We may have still clients with pending reads when this function
     * is called: handle them before stopping the threads. */
    handleClientsWithPendingReadsUsingThreads();

    //... some code omitted ...

    for (int j = 1; j < server.io_threads_num; j++)
        pthread_mutex_lock(&io_threads_mutex[j]);
    io_threads_active = 0;
}

Note that handleClientsWithPendingReadsUsingThreads() must be called before stopping the IO worker threads: client objects with read events that were previously distributed to the IO workers may not have been processed yet, so handleClientsWithPendingReadsUsingThreads() is invoked once more to drain those read events before the IO threads are stopped.

Now back to the main flow of handleClientsWithPendingWritesUsingThreads.

Logic 2

If logic 1 did not make handleClientsWithPendingWritesUsingThreads return, start the network IO worker threads based on the io_threads_active flag.

//networking.c line 3031
int handleClientsWithPendingWritesUsingThreads(void) {
    //... some code omitted ...

    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but thejboring synchronous code. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        //in single-threaded mode, or when the IO worker threads are stopped, execution returns here
        return handleClientsWithPendingWrites();
    }

    /* Start threads if needed. */
    if (!io_threads_active) startThreadedIO();

    //... some code omitted ...
}

From the analysis above, starting the IO worker threads requires the following three conditions (condensed in the sketch after this list):

  1. the number of worker threads configured in the configuration file is greater than 1;
  2. the number of clients with pending write events is at least twice the total number of IO threads;
  3. the IO worker threads are not yet active (tracked by the global variable io_threads_active).
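Condensed, the decision at the top of handleClientsWithPendingWritesUsingThreads looks roughly like this (a paraphrase of the two code paths above, not actual Redis code):

/* paraphrase of stopThreadedIOIfNeeded() plus its caller; pending is
 * listLength(server.clients_pending_write) */
if (server.io_threads_num == 1 || pending < server.io_threads_num * 2) {
    if (io_threads_active) stopThreadedIO();  /* lock the gates, deactivate */
    return handleClientsWithPendingWrites();  /* plain single-threaded path */
}
if (!io_threads_active) startThreadedIO();    /* unlock the gates, activate */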

Logic 3

The next few steps of handleClientsWithPendingWritesUsingThreads are the same as in handleClientsWithPendingReadsUsingThreads:

  1. distribute the clients with pending write events across the worker threads using the round-robin strategy;
  2. set the io_threads_pending[id] flag values to activate the worker threads;
  3. the main thread handles the write events of the client objects in its own list;
  4. wait for the worker threads to finish all the write events in their lists;
  5. after all write events have been processed, register a write handler for any clients that still have unsent data.
//networking.c line 3031
int handleClientsWithPendingWritesUsingThreads(void) {
    //... some code omitted ...

    //distribute the clients with pending write events to the worker threads using round-robin
    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    //set the io_threads_pending[id] flag values to activate the worker threads
    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    //the main thread handles the write events of the client objects in its own list
    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    //wait for the worker threads to finish all the write events in their lists
    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O WRITE All threads finshed\n");

    //after all write events are processed, register a write handler for clients that still have unsent data
    /* Run the list of clients again to install the write handler where
     * needed. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);
    return processed;
}

For both reads and writes, once a worker thread has finished the client objects with read or write events in its list, it immediately parks itself until the main thread wakes it up the next time there is work.

Essentially, the multi-threaded IO added in redis 6.0 just uses multiple IO worker threads to speed up processing when there are many clients; while the worker threads are processing those clients, the main thread must wait until all of them have finished.

The redis 6.0 multi-threaded network IO processing logic can be drawn as the following flowchart:

[flowchart: redis 6.0 multi-threaded network IO processing]