Redis源码学习之BIO
BIO顾名思义,background IO,是redis中运行的后台IO。 网上千篇一律的说法是redis是单线程单进程。 实际上redis运行过程中并不是严格单进程单线程应用。
Redis中的多进程:
在写入备份(RDB,AOF)的时候,会fork出子进程进行备份文件的写入。
Redis中的多线程:
-
AOF的备份模式中,如果我们设置的是AOF_FSYNC_EVERYSEC(每秒备份一次,这个设置可理解为弱同步备份),redis会create一个backgroud线程,在这个线程中执行aof备份文件的写入。
-
新生成的AOF文件,在覆盖旧AOF文件时。 如果在此之前AOF备份已经开启,在执行该fd的close前,我们的Redis进程与旧的AOF文件存在引用, 旧的AOF文件不会真正被删除。 所以当我们执行close(oldfd)时,旧AOF文件的被打开该文件的进程数为0,即没有进程打开过这个文件,这时这个文件在执行close时会被真正删除。 而删除旧AOF文件可能会阻塞服务,所以我们将它放到另一个线程调用。
-
执行DEL操作,假如碰巧这个key对应有非常多对象,那么这个删除操作会阻塞服务器几秒钟时间, 所以将删除操作放到另一个线程执行。 具体可看这篇文章: Lazy Redis is better Redis。
BIO
Redis将所有多线程操作封装到BIO中,在bio.c,bio.h中可以看到。 本文我们关注的不是具体的操作,而是Redis封装的BIO行为, 这个代码简洁,维护性好。 值得学习一下。
BIO提供以下几个api:
void bioInit(void); //初始化BIOvoid bioCreateBackgroundJob(int type, void \*arg1, void \*arg2, void *arg3); //新建一个BIO任务unsigned long long bioPendingJobsOfType(int type); //获取当前BIO任务类型,队列中待执行的任务个数unsigned long long bioWaitStepOfType(int type); //阻塞等待某个类型的BIO任务的执行,返回等待任务个数void bioKillThreads(void); //中断所有BIO进程
BIO操作的类型:
/* Background job opcodes */#define BIO\_CLOSE\_FILE 0 /* 关闭文件*/#define BIO\_AOF\_FSYNC 1 /* AOF写入 */#define BIO\_LAZY\_FREE 2 /* 释放对象 */#define BIO\_NUM\_OPS 3 /\*BIO数\*/
BIO对象:
static pthread\_t bio\_threads\[BIO\_NUM\_OPS\]; //BIO线程static pthread\_mutex\_t bio\_mutex\[BIO\_NUM\_OPS\]; //BIO每个线程的mutex锁变量static pthread\_cond\_t bio\_newjob\_cond\[BIO\_NUM\_OPS\]; //BIO线程锁的条件变量, 监听这个条件变量唤起当前线程static pthread\_cond\_t bio\_step\_cond\[BIO\_NUM\_OPS\]; //BIO线程阻塞锁,bioWaitStepOfType监听这个条件变量被通知该操作的执行。static list *bio\_jobs\[BIO\_NUM\_OPS\];static unsigned long long bio\_pending\[BIO\_NUM_OPS\]; // BIO未执行的
我们先看初始化的时候执行的部分:
bioInit() { for (j = 0; j < BIO\_NUM\_OPS; j++) { void \*arg = (void\*)(unsigned long) j; if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { // 初始化线程
serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs."); exit(1);
}
bio_threads\[j\] = thread;
}
}
主要功能分为两个部分:
-
bioCreateBackgroundJob: 创建BIO任务,插入bio_jobs,并调用pthread_cond_signal,通知进程解锁。
-
bioProcessBackgroundJobs: 执行BIO任务线程。 线程中通过pthread管理进程锁,当bioCreateBackgroundJob执行pthread_cond_signal通知到该任务对应的线程时,从bio_jobs读出上一个任务,并执行。
bioCreateBackgroundJob
void bioCreateBackgroundJob(int type, void \*arg1, void \*arg2, void \*arg3) { struct bio_job \*job = zmalloc(sizeof(*job));
job->time = time(NULL);
job->arg1 = arg1;
job->arg2 = arg2;
job->arg3 = arg3;
pthread\_mutex\_lock(&bio\_mutex\[type\]); // 加锁 保护bio\_jobs和bio_pending的一致性
listAddNodeTail(bio_jobs\[type\],job); //插入到任务队列中
bio_pending\[type\]++;
pthread\_cond\_signal(&bio\_newjob\_cond\[type\]); //通知preocess线程,执行任务
pthread\_mutex\_unlock(&bio_mutex\[type\]); //解锁}
bioProcessBackgroundJobs
void \*bioProcessBackgroundJobs(void \*arg) {
// 使进程可以被手动kill
pthread\_setcancelstate(PTHREAD\_CANCEL_ENABLE, NULL);
pthread\_setcanceltype(PTHREAD\_CANCEL_ASYNCHRONOUS, NULL);
// 加锁 确保不会有两个进程使用pthread\_cond\_wait监听同一个锁
pthread\_mutex\_lock(&bio_mutex\[type\]); while(1) {
listNode *ln;
/* The loop always starts with the lock hold. */ if (listLength(bio_jobs\[type\]) == 0) {
pthread\_cond\_wait(&bio\_newjob\_cond\[type\],&bio_mutex\[type\]); // 等待bioCreateBackgroundJob通知解锁 continue;
}
//取队列中第一个任务
ln = listFirst(bio_jobs\[type\]);
job = ln->value;
/* It is now possible to unlock the background system as we know have
* a stand alone job structure to process.*/
pthread\_mutex\_unlock(&bio_mutex\[type\]); //解锁
// 根据type执行任务
// do somethings...
pthread\_cond\_broadcast(&bio\_step\_cond\[type\]); // 广播解锁,用于解bioWaitStepOfType中的锁, 接触阻塞。
pthread\_mutex\_lock(&bio\_mutex\[type\]); // 为下面的操作加锁,且用于下一个循环的pthread\_cond_wait阻塞。
listDelNode(bio\_jobs\[type\],ln); // 操作bio\_jobs 和 bio_pending 标志这个任务已完成。
bio_pending\[type\]--;
}
}
pthread
整个BIO就是通过锁进行的阻塞后台IO。 如果我们梳理一下这个锁过程:
-
bioInit,新建线程,执行bioProcessBackgroundJobs。
-
bioProcessBackgroundJobs 中,pthread_mutex_lock(&bio_mutex[type]),给该任务的锁变量加锁。
-
进入while循环, 调用pthread_cond_wait, 等待解锁。 由于mutex锁是“sleep-lock”,线程会sleep,等待唤醒。
-
主线程调用创建BIO任务, 调用bioCreateBackgroundJob。
-
bioCreateBackgroundJob中 pthread_mutex_lock(&bio_mutex[type]); 又对bio_mutex[type]加锁
-
bioCreateBackgroundJob中pthread_cond_signal(&bio_newjob_cond[type]) //发送信号,通知BIO线程继续执行。
-
bioCreateBackgroundJob中pthread_mutex_unlock(&bio_mutex[type]); //解锁
-
bioProcessBackgroundJobs 中被唤醒继续进行。
-
执行任务完毕后,pthread_mutex_unlock解锁, pthread_cond_broadcast广播解锁。
-
再pthread_mutex_lock加锁 。 用于下一次while循环。
在梳理的时候,我发现一个奇怪的地方,我们第2步在BIO线程中加锁,第5步调用bioCreateBackgroundJob在主线程中又对mutex进行了一次加锁。 而在他们之间并没有pthread_mutex_unlock执行。 为什么bioCreateBackgroundJob没有被mutex的锁阻塞?
一切的关键都在pthread_cond_wait这个函数中。 按照我原来的理解,pthread_cond_wait应该只是进行了一次信号等待, 等到某个信号后,将mutex[type]解锁。 为什么在信号发送前,pthread_mutex_lock没有将主线程的bioCreateBackgroundJob阻塞住。 所以我猜测, pthread_cond_wait不不仅仅是一次wait signal,而是unlock+wait。
为了验证这个猜想,我们进去看pthread_cond_wait的实现:
glibc中的pthread_cond_wait
// line 93int \_\_pthread\_cond_wait (cond, mutex)// line 110
err = \_\_pthread\_mutex\_unlock\_usercnt (mutex, 0); //解锁mutexdo {// line 155 lll\_futex\_wait (&cond->\_\_data.\_\_futex, futex\_val, pshared); // wait signal} while (val == seq || cond->\_\_data.\_\_woken\_seq == val);// line 193return \_\_pthread\_mutex\_cond\_lock (mutex);
可以看到, pthread_cond_wait 实际上就是一次 Unlock -> Wait -> Lock。