前面我们介绍了key在惰性删除时同步删除过程的实现,具体可见:

本篇文章,我们将继续探索惰性删除时,key的异步删除过程的实现。

6.1.2 异步删除

直接看异步删除的代码:

#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
    // 第一步,进行expireDict的删除,仍旧保留dataDict中的dictEntry
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
		// 第二步,进行unlink操作,只是简单的把dataDict内部hash桶里的链表解链
  	// 内部调用的dictGenericDelete函数在同步删除代码中分析过,区别是此处nofree传入的值为1,即不进行内存释放
    dictEntry *de = dictUnlink(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);
      	// 计算当前释放value内存需要的开销,根据value的类型会得到不同的开销
        size_t free_effort = lazyfreeGetFreeEffort(val);

       	// 第三步,如果开销过大,并且当前引用计数为1,添加任务调度进行异步释放value
        if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
            atomicIncr(lazyfree_objects,1);
            bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
            dictSetVal(db->dict,de,NULL);
        }
    }

    // 如果此时dictEntry仍未被释放/或是开销不大,触发兜底机制,直接调用dictFreeUnlinkedEntry进行key和value以及dictEntry的释放
  	// dictFreeUnlinkedEntry内部仍旧调用dictFreeKey,dictFreeVal,zfree,在同步删除中已经分析过
    if (de) {
        dictFreeUnlinkedEntry(db->dict,de);
      	//集群模式下,寻找对应的slot进行remove操作
        if (server.cluster_enabled) slotToKeyDel(key);
        return 1;
    } else {
        return 0;
    }
}

可以看到异步删除的过程中,添加异步任务的前置条件仍旧是引用计数,至于添加异步调度任务,核心代码在bioCreateBackgroundJob中,实际上就是调用了操作系统提供的标准库函数加锁,成功加锁后往类型为释放内存的任务队列中添加一个新的任务:

void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
  	//构造一个job
    struct bio_job *job = zmalloc(sizeof(*job));
		//为当前的任务设置任务创建时间
    job->time = time(NULL);
    //任务参数赋值
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
  	//获取互斥锁 
    pthread_mutex_lock(&bio_mutex[type]);
  	//bio_jobs为二级指针,可以理解为二维数组,由任务类型定位到具体存储的队列,将任务添加至队尾
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
  	//唤醒阻塞线程,阻塞的线程会从cond_wait队列切换至mutex_lock队列,尝试争抢互斥锁
    pthread_cond_signal(&bio_newjob_cond[type]);
    //解锁
    pthread_mutex_unlock(&bio_mutex[type]);
}

那么,既然调度任务进入队列,就一定会有出队的时候。bioCreateBackgroundJob所在的文件为bio.c,bio全称background io,作为redis的后台io线程,支撑着redis-server运行时所有的异步任务调度。bio.c中存在这样一个函数bioProcessBackgroundJobs,就是它从任务队列中拿出创建的调度任务并执行,这个函数在redis-server启动时就会被调用,redis-server启动的入口,就在server.c中的main函数:

int main(int argc, char **argv) {
    ......
    // 检查是否启用哨兵模式
    server.sentinel_mode = checkForSentinelMode(argc,argv);
  	// 加载配置项
    initServerConfig();
   	......


  	// server初始化,分配hashtable的内存,初始化loop线程
    initServer();
    ......
    if (!server.sentinel_mode) {
      	......
        InitServerLast();
      	//从磁盘恢复数据,模式可以为RDB或AOF
        loadDataFromDisk();
        if (server.cluster_enabled) {
            if (verifyClusterConfigWithData() == C_ERR) {
                serverLog(LL_WARNING,
                    "You can't have keys in a DB different than DB 0 when in "
                    "Cluster mode. Exiting.");
                exit(1);
            }
        }
        if (server.ipfd_count > 0)
            serverLog(LL_NOTICE,"Ready to accept connections");
        if (server.sofd > 0)
            serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
    } else {
        InitServerLast();
        sentinelIsRunning();
    }

    if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
        serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
    }

    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
    aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}

不论是单机模式还是哨兵集群模式,都会调用InitServerLast进行server初始化:

void InitServerLast() {
    bioInit();
    server.initial_memory_usage = zmalloc_used_memory();
}

void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;

    //BIO_NUM_OPS默认值为3,也就意味着通过pthread孵化了三组调度线程模型,它们分别对应着三种不同类型的调度任务
  	//每一组线程调度模型有着自己的互斥锁、任务队列为空等待条件、线程唤醒条件,分别对应bio_mutex、bio_newjob_cond、bio_step_cond
  	//bio_jobs为具体的任务队列,由listCreate创建的一个含有头尾指针的链表
  	//bio_pending存储每种类型的任务等待处理的数量
    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        pthread_cond_init(&bio_step_cond[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }

    ......
      
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
      	//每种类型创建一个loop线程,调用bioProcessBackgroundJobs进行任务调度
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
} 

到了bioProcessBackgroundJobs这一层,开始真正的任务调度过程:

void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
  	//type为任务的类型
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;

    //任务类型校验合法性
    if (type >= BIO_NUM_OPS) {
        serverLog(LL_WARNING,
            "Warning: bio thread started with wrong type %lu",type);
        return NULL;
    }

  	//设置线程属性,让线程可以被中断
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
		//获取互斥锁
    pthread_mutex_lock(&bio_mutex[type]);

    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        serverLog(LL_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

    while(1) {
        listNode *ln;

        
        if (listLength(bio_jobs[type]) == 0) {
          	//如果当前type对应的任务队列为空,当前线程占有互斥锁并等待
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }
        //直到任务队列不为空,获取头节点,占有互斥锁的线程将在这一步获得job
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        
      	//获取到任务后释放互斥锁
        pthread_mutex_unlock(&bio_mutex[type]);

        //根据类型处理任务调度,这里我们传入的type为BIO_LAZY_FREE,代表释放内存类型的任务
        if (type == BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == BIO_AOF_FSYNC) {
            redis_fsync((long)job->arg1);
        } else if (type == BIO_LAZY_FREE) {
            //根据参数来调用不同的释放函数
            if (job->arg1)
              	//释放指针指向的对象内存
                lazyfreeFreeObjectFromBioThread(job->arg1);
            else if (job->arg2 && job->arg3)
              	//清除一个dict中的两个hashTable,即dataDict和expireDict,也就是ht[2]
                lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
            else if (job->arg3)
              	//清除zset的底层实现:skipList
                lazyfreeFreeSlotsMapFromBioThread(job->arg3);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
      	//释放调度任务占用的内存
        zfree(job);

        //再次获取互斥锁,配合循环中空任务队列等待条件使用
        pthread_mutex_lock(&bio_mutex[type]);
      	//任务队列出队
        listDelNode(bio_jobs[type],ln);
      	//正在等待处理的任务数-1
        bio_pending[type]--;

        pthread_cond_broadcast(&bio_step_cond[type]);
    }
}

在异步删除的代码中很明显看到只传入了一个arg1的值:

bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);

因此可以推断出在任务调度过程中会调用lazyfreeFreeObjectFromBioThread方法进行内存释放:

void lazyfreeFreeObjectFromBioThread(robj *o) {
    decrRefCount(o);
    atomicDecr(lazyfree_objects,1);
}

也是调用了decrRefCount,针对不同类型的value值,调用不同的释放内存函数。

所以不论是同步删除还是异步删除的方式,二者都是针对key所在的sds内存释放,和value不同类型的内存释放,底层最终都是会调用zfree进行内存释放,zfree封装了操作系统提供的free接口。

至此,惰性删除的流程,以及底层函数的调用已经分析完毕。