如果让我评选一个最喜欢的C/C++开源项目,那一定是Redis
对于问题“你认为Redis有如此高的性能原因是什么?”在今年上半年你依然可以回答说:
- 单线程无锁操作
- epoll
- 纯内存
- …
但自从Redis 6.0版本发布后,Redis迎来了第一个多线程版本,今天来分析学习一下Redis 6.0版本多线程相关的代码。
redis 6.0 中默认是不启用多线程网络 IO,可以通过修改 redis.conf 的相关配置项打开,打开方法如下所示:
# So for instance if you have a four cores boxes, try to use 2 or 3 I/O
# threads, if you have a 8 cores, try to use 6 threads. In order to
# enable I/O threads use the following configuration directive:
#
# io-threads 4
#
# Setting io-threads to 1 will just use the main thread as usually.
# When I/O threads are enabled, we only use threads for writes, that is
# to thread the write(2) syscall and transfer the client buffers to the
# socket. However it is also possible to enable threading of reads and
# protocol parsing using the following configuration directive, by setting
# it to yes:
#
# io-threads-do-reads no
#
将 io-threads 打开(去掉前面的 # )设置成你期望的线程数目,io-threads-do-reads 配置也要打开(去掉前面的 # ),其值改为 yes。
修改了这两个配置项后,我们使用 gdb 命令 set args "../redis.conf"
给 redis-server 设置参数,然后重启 redis-server。
(gdb) set args "../redis.conf"
(gdb) r
The program being debugged has been started already.
Start it from the beginning? (y or n) y
Starting program: /root/redis-6.0.3/src/redis-server "../redis.conf"
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib64/libthread_db.so.1".
然后按 Ctrl + C 将程序中断下来,使用 info threads
命令查看此时的线程状况:
(gdb) info threads
Id Target Id Frame
* 1 Thread 0x7ffff7feb740 (LWP 11992) "redis-server" 0x00007ffff71e2603 in epoll_wait () from /usr/lib64/libc.so.6
2 Thread 0x7ffff0bb9700 (LWP 11993) "bio_close_file" 0x00007ffff74bc965 in pthread_cond_wait@@GLIBC_2.3.2 () from /usr/lib64/libpthread.so.0
3 Thread 0x7ffff03b8700 (LWP 11994) "bio_aof_fsync" 0x00007ffff74bc965 in pthread_cond_wait@@GLIBC_2.3.2 () from /usr/lib64/libpthread.so.0
4 Thread 0x7fffefbb7700 (LWP 11995) "bio_lazy_free" 0x00007ffff74bc965 in pthread_cond_wait@@GLIBC_2.3.2 () from /usr/lib64/libpthread.so.0
5 Thread 0x7fffef3b6700 (LWP 11996) "io_thd_1" 0x00007ffff74bf4ed in __lll_lock_wait () from /usr/lib64/libpthread.so.0
6 Thread 0x7fffeebb5700 (LWP 11997) "io_thd_2" 0x00007ffff74bf4ed in __lll_lock_wait () from /usr/lib64/libpthread.so.0
7 Thread 0x7fffee3b4700 (LWP 11998) "io_thd_3" 0x00007ffff74bf4ed in __lll_lock_wait () from /usr/lib64/libpthread.so.0
(gdb)
与未开启多线程网络 IO 的线程情况相比,多了线程名为 io_thd_1、io_thd_2、io_thd_3 线程,加上主线程一共四个 IO 线程(io-threads = 4),我们重点来看下这三个 IO 工作线程,这三个工作线程的逻辑一样,我们以 io_thd_1 为例。使用 thread 5
命令切换到 io_thd_1 线程,使用 bt 命令查看这个线程的调用堆栈:
(gdb) bt
#0 0x00007ffff74bf4ed in __lll_lock_wait () from /usr/lib64/libpthread.so.0
#1 0x00007ffff74badcb in _L_lock_883 () from /usr/lib64/libpthread.so.0
#2 0x00007ffff74bac98 in pthread_mutex_lock () from /usr/lib64/libpthread.so.0
#3 0x0000000000447907 in IOThreadMain (myid=0x1) at networking.c:2921
#4 0x00007ffff74b8dd5 in start_thread () from /usr/lib64/libpthread.so.0
#5 0x00007ffff71e202d in clone () from /usr/lib64/libc.so.6
堆栈 #3 处的代码如下:
//networking.c 2903行
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
* used by the thread to just manipulate a single sub-array of clients. */
long id = (unsigned long)myid;
char thdname[16];
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);
redisSetCpuAffinity(server.server_cpulist);
while(1) {
/* Wait for start */
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
/* Give the main thread a chance to stop this thread. */
if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
serverAssert(io_threads_pending[id] != 0);
if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
/* Process: note that the main thread will never touch our list
* before we drop the pending count to 0. */
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
if (tio_debug) printf("[%ld] Done\n", id);
}
}
IOThreadMain 函数是工作线程函数,主要逻辑是一些初始化工作和一个主要的 while 循环,初始化工作主要逻辑是设置线程的名称:
//networking.c 2906行
long id = (unsigned long)myid;
char thdname[16];
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);
这就是在 gdb 中看到线程名为 io_thd_1、io_thd_2、io_thd_3 的原因。工作线程 id 是主线程创建线程时通过线程参数传递过来的,从 1 开始,0 号 IO 线程是主线程。主线程在 main 函数中调用 InitServerLast 函数,InitServerLast 函数中调用 initThreadedIO 函数,initThreadedIO 函数中根据配置文件中的线程数量创建对应数量的 IO 工作线程数量。我们可以给 initThreadedIO 函数加个断点,然后重启 gdb,就可以看到对应的调用关系和相应的代码位置:
Thread 1 "redis-server" hit Breakpoint 2, initThreadedIO () at networking.c:2954
2954 io_threads_active = 0; /* We start with threads not active. */
(gdb) bt
#0 initThreadedIO () at networking.c:2954
#1 0x0000000000431aa8 in InitServerLast () at server.c:2954
#2 0x0000000000437195 in main (argc=2, argv=0x7fffffffe308) at server.c:5142
(gdb)
initThreadedIO 函数定义如下:
//networking.c 2953行
void initThreadedIO(void) {
io_threads_active = 0; /* We start with threads not active. */
/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
if (server.io_threads_num == 1) return;
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
/* Spawn and initialize the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
io_threads_list[i] = listCreate();
//编号为 0 时是主线程
if (i == 0) continue; /* Thread 0 is the main thread. */
/* Things we do only for the additional threads. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
io_threads_pending[i] = 0;
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
通过上述代码段,我们可以得到两个结论:
- redis 最大允许 IO 工作线程数目为 128个(IO_THREADS_MAX_NUM 宏);
- 序号为 0 的线程是主线程,因此实际的工作线程数目是 io-threads - 1。
创建新的 IO 线程之前,为每个线程创建一个存储代表客户端的 client 对象链表 io_threads_list[i],它们在存储在全局数组对象 io_threads_list 中,与线程序号一一对应;同时创建相应数量的整型变量(unsigned long)存储于另外一个全局数组 io_threads_pending 中,同样与线程序号一一对应,这些整型变量和 另外一组 Linux 互斥体对象(存储在 io_threads_mutex 数组中)一起让主线程可以控制工作线程的启动与停止,控制逻辑如下:
- 将 io_threads_pending[i] 设置为 0;
- 在上述循环中,初始化 io_threads_mutex[i] 对象后,立刻调用 pthread_mutex_lock(&io_threads_mutex[i]) 将这些互斥体锁定;
- 接着开始创建对应的 IO 工作线程,在 IO 工作线程函数 IOThreadMain 中有如下代码:
//networking.c 2903行
void *IOThreadMain(void *myid) {
//...省略部分代码...
while(1) {
/* Wait for start */
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
/* Give the main thread a chance to stop this thread. */
if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
//...省略部分代码...
/* Process: note that the main thread will never touch our list
* before we drop the pending count to 0. */
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
}
}
工作线程执行上述代码 pthread_mutex_lock(&io_threads_mutex[id])
行时,由于 io_threads_mutex[id] 这个互斥体已经被主线程加锁了,因此工作线程阻塞在这里。如果想启用这些 IO 工作线程,可以调用 startThreadedIO 函数,startThreadedIO 函数实现如下:
//networking.c 2985行
void startThreadedIO(void) {
//...省略部分代码...
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_unlock(&io_threads_mutex[j]);
io_threads_active = 1;
}
startThreadedIO 对相应的互斥体 io_threads_mutex[id] 进行解锁,同时设置启用 IO 线程的标志变量 io_threads_active,这个变量将在下文介绍。有读者可能会注意到:即使解锁 io_threads_mutex[id] 互斥体后,continue 之后,下一轮循环由于 io_threads_pending[id] 仍然为 0,循环会继续加锁解锁再 continue,仍然不能执行 IOThreadMain 处理由 client 对象组成的链表对象。确实如此,因此除了解锁 io_threads_mutex[id] 互斥体还必须将 io_threads_pending[id] 设置为非 0 值,才能执行 IO 工作线程的主要逻辑。那么 io_threads_pending[id] 在什么地方被设置成非 0 值呢?
在 beforeSleep 函数中分别调用了 handleClientsWithPendingReadsUsingThreads 和 handleClientsWithPendingWritesUsingThreads() ,这两个函数分别对应读和写的情况。
//server.c 2106行
void beforeSleep(struct aeEventLoop *eventLoop) {
//...省略部分代码...
/* We should handle pending reads clients ASAP after event loop. */
handleClientsWithPendingReadsUsingThreads();
//...省略部分代码...
/* Handle writes with pending output buffers. */
handleClientsWithPendingWritesUsingThreads();
//...省略部分代码...
}
先来看读的情况,handleClientsWithPendingReadsUsingThreads 函数定义如下:
//networking 3126行
int handleClientsWithPendingReadsUsingThreads(void) {
if (!io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);
/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
//主线程给工作线程分配client对象的策略
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O READ All threads finshed\n");
/* Run the list of clients again to process the new buffers. */
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
listDelNode(server.clients_pending_read,ln);
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
* to the next. */
continue;
}
}
processInputBuffer(c);
}
return processed;
}
上述代码先通过 io_threads_active 和 server.io_threads_do_reads 两个标志判断是否开启了 IO 线程,如果没开启则直接退出该函数,所有的 IO 操作在主线程中处理。如果开启了 IO 线程,第一个 while 循环处是主线程给 IO 线程分配 client 对象的策略,这里的策略也很简单,即所谓的 Round-Robin(轮询策略),根据当前处理序号与线程数量求余,分别将对应的 client 对象放入相应的线程(包括主线程)存储 client 的链表中。假设现在包括主线程一共有 4 个 IO 线程,则第 0 个 client 对象分配给主线程,第 1 个分配给 1 号工作线程,第 2 个分配 2 号工作线程,第 3 个 分配给 3 号线程,第 4 个再次分配给主线程,第 5 个分配给 1 号线程,第 6 个分配给 2 号线程……以此类推。
分配号 client 对象到相应的 IO 线程的链表中后,设置与这些工作线程相对应的 io_threads_pending[j] 变量值为非 0 值,这里实际设置的值是对应的工作线程的链表的长度,因为在 client 对象少于 IO 线程数量的情况下,某些IO 线程的链表长度为 0,此时就没必要唤醒该工作线程。
//networking.c 3147行
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
主线程给 IO 工作线程分配好相应的 client 对象、并设置唤醒标志(io_threads_pending[j])后,由于主线程自己也参与了分配,因此接下来需要处理自己被分配到的 client 对象,然后开始遍历自己的链表挨个处理:
//networking.c 3153行
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
上述代码,主线程从自己的链表(io_threads_list[0])中挨个取出各个 client 对象,然后调用 readQueryFromClient 读取数据和解包,这个流程在上文已经介绍过了。处理完毕后,将自己的链表清空。
同样的道理,IO 工作线程在处理自己的链表时也是一样的操作:
//networking.c 2903行
void *IOThreadMain(void *myid) {
//...省略部分代码...
while(1) {
//...省略部分代码...
/* Give the main thread a chance to stop this thread. */
if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
//...省略部分代码...
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
//处理完成后将自己的清空自己的链表
listEmpty(io_threads_list[id]);
//重置状态标志值
io_threads_pending[id] = 0;
//...省略部分代码...
}
}
IO 线程在处理完自己链表的 client 对象后也会清空自己的链表并重置 io_threads_pending[id] 标志。而此时主线程的利用一个无限循环等待 IO 工作线程将自己链表中的 client 处理完毕:
//networking.c 3126行
int handleClientsWithPendingReadsUsingThreads(void) {
//...省略部分代码...
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
//...省略部分代码...
}
由于每个 IO 工作线程在处理完自己的链表中的 client 对象后,会将自己的 io_threads_pending[id] 重置为 0,这样最终主线程的 for 循环的 pending 值会变为 0,退出这个 while 无限循环。
以上就是 redis 6.0 之后如何利用 IO 工作线程对读事件的处理。但是如果读者仔细研究源码会发现两个问题:
- 要想让主线程在 handleClientsWithPendingReadsUsingThreads 函数中给 IO 工作线程分配含有读事件的 client 对象,必须满足 io_threads_active 和 server.io_threads_do_reads 这两个标志都是非 0 值。
//networking.c 3126行
int handleClientsWithPendingReadsUsingThreads(void) {
if (!io_threads_active || !server.io_threads_do_reads) return 0;
//...省略部分代码...
}
server.io_threads_do_reads 的值只要在配置文件中配置一下就可以了,前文已经介绍了;但是 io_threads_active 标志默认值是 0:
//networking.c 2953行
void initThreadedIO(void) {
io_threads_active = 0; /* We start with threads not active. */
//...省略部分代码...
}
io_threads_active 标志只会在 startThreadedIO 函数中被设置为非 0 值:
//networking.c 2985行
void startThreadedIO(void) {
//...省略部分代码...
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_unlock(&io_threads_mutex[j]);
io_threads_active = 1;
}
但是 handleClientsWithPendingReadsUsingThreads 函数中并没有调用 startThreadedIO 函数,这是问题一。
问题二是根据工作线程中的逻辑,必须同时满足两个条件才能执行处理自己链表中的 client 对象。
条件一:io_threads_pending[id] 不等于 0,这在 handleClientsWithPendingReadsUsingThreads 给工作 IO 线程分配 client 对象后就已经满足;
条件二:工作线程创建后,阻塞在 pthread_mutex_lock(&io_threads_mutex[id]);
处,必须解锁 io_threads_mutex[id] 这个互斥体才能让线程继续往下运行。
void *IOThreadMain(void *myid) {
//...省略部分代码...
while(1) {
//...省略部分代码...
/* Give the main thread a chance to stop this thread. */
if (io_threads_pending[id] == 0) {
//工作线程创建后阻塞在这里
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
//...省略部分代码...
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
//...省略部分代码...
}
}
综上所述,只有调用了 startThreadedIO 先解锁 io_threads_mutex[id] 并设置 io_threads_active 标志非0,然后 handleClientsWithPendingReadsUsingThreads 函数中才会分配 client 对象给 IO 工作线程,并设置 io_threads_pending[id] 为非 0 值,这样 IO 工作线程才能处理分配的 client 对象。所以大的前提是 startThreadedIO 函数被调用,但是通过分析流程发现,在主循环第一次调用 beforeSleep,beforeSleep 第一次调用 handleClientsWithPendingReadsUsingThreads 函数时 startThreadedIO 未被调用,也就是说,第一次是不会使用 IO 工作线程处理 client 对象上的读事件的。那么 startThreadedIO 函数在哪里被调用的呢?
答案是 beforeSleep 函数中除了调用了 handleClientsWithPendingReadsUsingThreads 处理读事件,还有另外一个函数 handleClientsWithPendingWritesUsingThreads 用于处理写事件:
//server.c 2106行
void beforeSleep(struct aeEventLoop *eventLoop) {
//...省略部分代码...
/* We should handle pending reads clients ASAP after event loop. */
handleClientsWithPendingReadsUsingThreads();
//...省略部分代码...
/* Handle writes with pending output buffers. */
handleClientsWithPendingWritesUsingThreads();
//...省略部分代码...
}
handleClientsWithPendingWritesUsingThreads 函数定义如下:
//networking.c 3031行
int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0; /* Return ASAP if there are no clients. */
/* If I/O threads are disabled or we have few clients to serve, don't
* use I/O threads, but thejboring synchronous code. */
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
/* Start threads if needed. */
if (!io_threads_active) startThreadedIO();
if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);
/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O WRITE All threads finshed\n");
/* Run the list of clients again to install the write handler where
* needed. */
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
/* Install the write handler if there are pending writes in some
* of the clients. */
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
{
freeClientAsync(c);
}
}
listEmpty(server.clients_pending_write);
return processed;
}
handleClientsWithPendingWritesUsingThreads 有几段关键逻辑,我们来挨个看一下:
逻辑一
按需停止 IO 线程,在 IO 线程停止的情况下,直接调用 handleClientsWithPendingWrites 函数处理客户端可写事件。
//networking.c 3037行
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
stopThreadedIOIfNeeded 函数中按需停止 IO 工作线程:
//networking.c 3017行
int stopThreadedIOIfNeeded(void) {
int pending = listLength(server.clients_pending_write);
/* Return ASAP if IO threads are disabled (single threaded mode). */
if (server.io_threads_num == 1) return 1;
if (pending < (server.io_threads_num*2)) {
if (io_threads_active) stopThreadedIO();
return 1;
} else {
return 0;
}
}
停止 IO 工作线程的时机是当前需要处理写事件的客户端数量少于 IO 线程数的两倍,例如如果 IO 线程数量为 4(包括主线程),则当前需要处理写事件的客户端数量少于 8 个的时候,则停止 IO 工作线程。停止 IO 线程的方法是调用 stopThreadedIO() 函数,该函数中会给 io_threads_mutex[id] 加锁,并将 io_threads_active 设置为 0。stopThreadedIO 函数定义如下:
//networking.c 2994行
void stopThreadedIO(void) {
/* We may have still clients with pending reads when this function
* is called: handle them before stopping the threads. */
handleClientsWithPendingReadsUsingThreads();
//...省略部分代码...
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_lock(&io_threads_mutex[j]);
io_threads_active = 0;
}
在停止 IO 工作线程之前必须调用 handleClientsWithPendingReadsUsingThreads(),这是需要注意的地方,因为一旦 IO 工作线程之前分配给各个 IO 工作线程有读事件的 client 对象可能还没来得及处理,所以必须在停止 IO 线程之前主动调用一次 handleClientsWithPendingReadsUsingThreads() 函数将这些 client 对象的读事件处理掉。
我们回到 handleClientsWithPendingWritesUsingThreads 主流程上来。
逻辑二
如果逻辑一中未导致退出 handleClientsWithPendingWritesUsingThreads 函数,则根据 io_threads_active 标志值启动网络 IO 工作线程。
//networking.c 3031行
int handleClientsWithPendingWritesUsingThreads(void) {
//...省略部分代码...
/* If I/O threads are disabled or we have few clients to serve, don't
* use I/O threads, but thejboring synchronous code. */
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
//如果是单线程模式或者停止 IO 工作线程,则执行流在此处直接退出
return handleClientsWithPendingWrites();
}
/* Start threads if needed. */
if (!io_threads_active) startThreadedIO();
//...省略部分代码...
}
根据上面的分析,开启 IO 工作线程需要满足如下三个条件:
- 配置文件中配置了工作线程数目大于 1;
- 待处理写事件的客户端数目达到所有 IO 线程数目的两倍或更多;
- IO 工作线程还没处于激活状态(全局变量 io_threads_active 记录该状态)。
逻辑三
handleClientsWithPendingWritesUsingThreads 函数接下来的几个逻辑就和 handleClientsWithPendingReadsUsingThreads 中一样了。
- 使用 round-robin 策略将有读事件分配给各个工作线程;
- 设置io_threads_pending[id]标志值,以激活工作线程;
- 主线程处理分配给自己链表中的client对象的写事件;
- 等待工作线程处理完对应的链表中所有的写事件;
- 所有写事件处理完毕之后,仍存在一些 client 存在未发送完的数据,则继续给这些 client 注册写事件。
//networking.c 3031行
int handleClientsWithPendingWritesUsingThreads(void) {
//...省略部分代码...
//使用 round-robin 策略将有读事件分配给各个工作线程
/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
//设置io_threads_pending[id]标志值,以激活工作线程
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
//主线程处理分配给自己链表中的client对象的写事件
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);
//等待工作线程处理完对应的链表中所有的写事件
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O WRITE All threads finshed\n");
//所有写事件处理完毕之后,仍存在一些 client 存在未发送完的数据,则继续给这些 client 注册写事件
/* Run the list of clients again to install the write handler where
* needed. */
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
/* Install the write handler if there are pending writes in some
* of the clients. */
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
{
freeClientAsync(c);
}
}
listEmpty(server.clients_pending_write);
return processed;
}
无论读写事件,工作线程处理完自己的链表中含有读写事件的 client 对象后立刻挂起自己,下一次在是情况被主线程再次唤醒。
本质上来看,redis 6.0 中新增的多线程 IO 只是在 client 较多的情况下,利用多个 IO 工作线程加快处理速度,在工作线程处理这些 client 期间,主线程必须等待直到所有工作线程处理完毕。
我们将 redis 6.0 多线程网络 IO 处理逻辑绘制成如下流程图: