Redis源码学习之BIO

BIO顾名思义,background IO,是redis中运行的后台IO。 网上千篇一律的说法是redis是单线程单进程。 实际上redis运行过程中并不是严格单进程单线程应用。 
Redis中的多进程: 
在写入备份(RDB,AOF)的时候,会fork出子进程进行备份文件的写入。 
Redis中的多线程:

  1. AOF的备份模式中,如果我们设置的是AOF_FSYNC_EVERYSEC(每秒备份一次,这个设置可理解为弱同步备份),redis会create一个backgroud线程,在这个线程中执行aof备份文件的写入。

  2. 新生成的AOF文件,在覆盖旧AOF文件时。 如果在此之前AOF备份已经开启,在执行该fd的close前,我们的Redis进程与旧的AOF文件存在引用, 旧的AOF文件不会真正被删除。 所以当我们执行close(oldfd)时,旧AOF文件的被打开该文件的进程数为0,即没有进程打开过这个文件,这时这个文件在执行close时会被真正删除。 而删除旧AOF文件可能会阻塞服务,所以我们将它放到另一个线程调用。

  3. 执行DEL操作,假如碰巧这个key对应有非常多对象,那么这个删除操作会阻塞服务器几秒钟时间, 所以将删除操作放到另一个线程执行。 具体可看这篇文章: Lazy Redis is better Redis

BIO

Redis将所有多线程操作封装到BIO中,在bio.c,bio.h中可以看到。 本文我们关注的不是具体的操作,而是Redis封装的BIO行为, 这个代码简洁,维护性好。 值得学习一下。 
BIO提供以下几个api:

void bioInit(void); //初始化BIOvoid bioCreateBackgroundJob(int type, void \*arg1, void \*arg2, void *arg3);   //新建一个BIO任务unsigned long long bioPendingJobsOfType(int type);  //获取当前BIO任务类型,队列中待执行的任务个数unsigned long long bioWaitStepOfType(int type); //阻塞等待某个类型的BIO任务的执行,返回等待任务个数void bioKillThreads(void); //中断所有BIO进程

BIO操作的类型:

/* Background job opcodes */#define BIO\_CLOSE\_FILE    0 /* 关闭文件*/#define BIO\_AOF\_FSYNC     1 /* AOF写入  */#define BIO\_LAZY\_FREE     2 /* 释放对象 */#define BIO\_NUM\_OPS       3 /\*BIO数\*/

BIO对象:

static pthread\_t bio\_threads\[BIO\_NUM\_OPS\];  //BIO线程static pthread\_mutex\_t bio\_mutex\[BIO\_NUM\_OPS\]; //BIO每个线程的mutex锁变量static pthread\_cond\_t bio\_newjob\_cond\[BIO\_NUM\_OPS\]; //BIO线程锁的条件变量, 监听这个条件变量唤起当前线程static pthread\_cond\_t bio\_step\_cond\[BIO\_NUM\_OPS\]; //BIO线程阻塞锁,bioWaitStepOfType监听这个条件变量被通知该操作的执行。static list *bio\_jobs\[BIO\_NUM\_OPS\];static unsigned long long bio\_pending\[BIO\_NUM_OPS\]; // BIO未执行的

我们先看初始化的时候执行的部分:

bioInit() {    for (j = 0; j < BIO\_NUM\_OPS; j++) {        void \*arg = (void\*)(unsigned long) j;        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { // 初始化线程
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");            exit(1);
        }
        bio_threads\[j\] = thread;
    }
}

主要功能分为两个部分:

  1. bioCreateBackgroundJob: 创建BIO任务,插入bio_jobs,并调用pthread_cond_signal,通知进程解锁。

  2. bioProcessBackgroundJobs: 执行BIO任务线程。 线程中通过pthread管理进程锁,当bioCreateBackgroundJob执行pthread_cond_signal通知到该任务对应的线程时,从bio_jobs读出上一个任务,并执行。

bioCreateBackgroundJob

void bioCreateBackgroundJob(int type, void \*arg1, void \*arg2, void \*arg3) {    struct bio_job \*job = zmalloc(sizeof(*job));
    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    pthread\_mutex\_lock(&bio\_mutex\[type\]); // 加锁 保护bio\_jobs和bio_pending的一致性
    listAddNodeTail(bio_jobs\[type\],job);  //插入到任务队列中
    bio_pending\[type\]++;  
    pthread\_cond\_signal(&bio\_newjob\_cond\[type\]);  //通知preocess线程,执行任务
    pthread\_mutex\_unlock(&bio_mutex\[type\]);  //解锁}

bioProcessBackgroundJobs

void \*bioProcessBackgroundJobs(void \*arg) {
    // 使进程可以被手动kill
    pthread\_setcancelstate(PTHREAD\_CANCEL_ENABLE, NULL);
    pthread\_setcanceltype(PTHREAD\_CANCEL_ASYNCHRONOUS, NULL); 
    // 加锁 确保不会有两个进程使用pthread\_cond\_wait监听同一个锁
    pthread\_mutex\_lock(&bio_mutex\[type\]);    while(1) {
        listNode *ln;
        /* The loop always starts with the lock hold. */        if (listLength(bio_jobs\[type\]) == 0) {
            pthread\_cond\_wait(&bio\_newjob\_cond\[type\],&bio_mutex\[type\]); // 等待bioCreateBackgroundJob通知解锁            continue;
        }
        //取队列中第一个任务
        ln = listFirst(bio_jobs\[type\]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread\_mutex\_unlock(&bio_mutex\[type\]); //解锁
        // 根据type执行任务
        // do somethings...
        pthread\_cond\_broadcast(&bio\_step\_cond\[type\]); // 广播解锁,用于解bioWaitStepOfType中的锁, 接触阻塞。
        pthread\_mutex\_lock(&bio\_mutex\[type\]); // 为下面的操作加锁,且用于下一个循环的pthread\_cond_wait阻塞。
        listDelNode(bio\_jobs\[type\],ln);  // 操作bio\_jobs 和 bio_pending  标志这个任务已完成。
        bio_pending\[type\]--;
    }
}

pthread

整个BIO就是通过锁进行的阻塞后台IO。 如果我们梳理一下这个锁过程:

  1. bioInit,新建线程,执行bioProcessBackgroundJobs。

  2. bioProcessBackgroundJobs 中,pthread_mutex_lock(&bio_mutex[type]),给该任务的锁变量加锁。

  3. 进入while循环, 调用pthread_cond_wait, 等待解锁。 由于mutex锁是“sleep-lock”,线程会sleep,等待唤醒。

  4. 主线程调用创建BIO任务, 调用bioCreateBackgroundJob。

  5. bioCreateBackgroundJob中 pthread_mutex_lock(&bio_mutex[type]); 又对bio_mutex[type]加锁

  6. bioCreateBackgroundJob中pthread_cond_signal(&bio_newjob_cond[type]) //发送信号,通知BIO线程继续执行。

  7. bioCreateBackgroundJob中pthread_mutex_unlock(&bio_mutex[type]); //解锁

  8. bioProcessBackgroundJobs 中被唤醒继续进行。

  9. 执行任务完毕后,pthread_mutex_unlock解锁, pthread_cond_broadcast广播解锁。

  10. 再pthread_mutex_lock加锁 。 用于下一次while循环。

在梳理的时候,我发现一个奇怪的地方,我们第2步在BIO线程中加锁,第5步调用bioCreateBackgroundJob在主线程中又对mutex进行了一次加锁。 而在他们之间并没有pthread_mutex_unlock执行。 为什么bioCreateBackgroundJob没有被mutex的锁阻塞? 
一切的关键都在pthread_cond_wait这个函数中。 按照我原来的理解,pthread_cond_wait应该只是进行了一次信号等待, 等到某个信号后,将mutex[type]解锁。 为什么在信号发送前,pthread_mutex_lock没有将主线程的bioCreateBackgroundJob阻塞住。 所以我猜测, pthread_cond_wait不不仅仅是一次wait signal,而是unlock+wait。 
为了验证这个猜想,我们进去看pthread_cond_wait的实现: 
glibc中的pthread_cond_wait

// line 93int \_\_pthread\_cond_wait (cond, mutex)// line 110
  err = \_\_pthread\_mutex\_unlock\_usercnt (mutex, 0);  //解锁mutexdo {// line 155    lll\_futex\_wait (&cond->\_\_data.\_\_futex, futex\_val, pshared);  // wait signal} while (val == seq || cond->\_\_data.\_\_woken\_seq == val);// line 193return \_\_pthread\_mutex\_cond\_lock (mutex);

可以看到, pthread_cond_wait 实际上就是一次 Unlock -> Wait -> Lock。

  1. Redis源码

  2. Redis源码注释