5分钟
Redis存储原理剖析<三>:key的惰性删除–异步删除策略
前面我们介绍了key在惰性删除时同步删除过程的实现,具体可见:
本篇文章,我们将继续探索惰性删除时,key的异步删除过程的实现。
6.1.2 异步删除
直接看异步删除的代码:
#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
// 第一步,进行expireDict的删除,仍旧保留dataDict中的dictEntry
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
// 第二步,进行unlink操作,只是简单的把dataDict内部hash桶里的链表解链
// 内部调用的dictGenericDelete函数在同步删除代码中分析过,区别是此处nofree传入的值为1,即不进行内存释放
dictEntry *de = dictUnlink(db->dict,key->ptr);
if (de) {
robj *val = dictGetVal(de);
// 计算当前释放value内存需要的开销,根据value的类型会得到不同的开销
size_t free_effort = lazyfreeGetFreeEffort(val);
// 第三步,如果开销过大,并且当前引用计数为1,添加任务调度进行异步释放value
if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
atomicIncr(lazyfree_objects,1);
bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
dictSetVal(db->dict,de,NULL);
}
}
// 如果此时dictEntry仍未被释放/或是开销不大,触发兜底机制,直接调用dictFreeUnlinkedEntry进行key和value以及dictEntry的释放
// dictFreeUnlinkedEntry内部仍旧调用dictFreeKey,dictFreeVal,zfree,在同步删除中已经分析过
if (de) {
dictFreeUnlinkedEntry(db->dict,de);
//集群模式下,寻找对应的slot进行remove操作
if (server.cluster_enabled) slotToKeyDel(key);
return 1;
} else {
return 0;
}
}
可以看到异步删除的过程中,添加异步任务的前置条件仍旧是引用计数,至于添加异步调度任务,核心代码在bioCreateBackgroundJob中,实际上就是调用了操作系统提供的标准库函数加锁,成功加锁后往类型为释放内存的任务队列中添加一个新的任务:
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
//构造一个job
struct bio_job *job = zmalloc(sizeof(*job));
//为当前的任务设置任务创建时间
job->time = time(NULL);
//任务参数赋值
job->arg1 = arg1;
job->arg2 = arg2;
job->arg3 = arg3;
//获取互斥锁
pthread_mutex_lock(&bio_mutex[type]);
//bio_jobs为二级指针,可以理解为二维数组,由任务类型定位到具体存储的队列,将任务添加至队尾
listAddNodeTail(bio_jobs[type],job);
bio_pending[type]++;
//唤醒阻塞线程,阻塞的线程会从cond_wait队列切换至mutex_lock队列,尝试争抢互斥锁
pthread_cond_signal(&bio_newjob_cond[type]);
//解锁
pthread_mutex_unlock(&bio_mutex[type]);
}
那么,既然调度任务进入队列,就一定会有出队的时候。bioCreateBackgroundJob所在的文件为bio.c,bio全称background io,作为redis的后台io线程,支撑着redis-server运行时所有的异步任务调度。bio.c中存在这样一个函数bioProcessBackgroundJobs,就是它从任务队列中拿出创建的调度任务并执行,这个函数在redis-server启动时就会被调用,redis-server启动的入口,就在server.c中的main函数:
int main(int argc, char **argv) {
......
// 检查是否启用哨兵模式
server.sentinel_mode = checkForSentinelMode(argc,argv);
// 加载配置项
initServerConfig();
......
// server初始化,分配hashtable的内存,初始化loop线程
initServer();
......
if (!server.sentinel_mode) {
......
InitServerLast();
//从磁盘恢复数据,模式可以为RDB或AOF
loadDataFromDisk();
if (server.cluster_enabled) {
if (verifyClusterConfigWithData() == C_ERR) {
serverLog(LL_WARNING,
"You can't have keys in a DB different than DB 0 when in "
"Cluster mode. Exiting.");
exit(1);
}
}
if (server.ipfd_count > 0)
serverLog(LL_NOTICE,"Ready to accept connections");
if (server.sofd > 0)
serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
} else {
InitServerLast();
sentinelIsRunning();
}
if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
}
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);
return 0;
}
不论是单机模式还是哨兵集群模式,都会调用InitServerLast进行server初始化:
void InitServerLast() {
bioInit();
server.initial_memory_usage = zmalloc_used_memory();
}
void bioInit(void) {
pthread_attr_t attr;
pthread_t thread;
size_t stacksize;
int j;
//BIO_NUM_OPS默认值为3,也就意味着通过pthread孵化了三组调度线程模型,它们分别对应着三种不同类型的调度任务
//每一组线程调度模型有着自己的互斥锁、任务队列为空等待条件、线程唤醒条件,分别对应bio_mutex、bio_newjob_cond、bio_step_cond
//bio_jobs为具体的任务队列,由listCreate创建的一个含有头尾指针的链表
//bio_pending存储每种类型的任务等待处理的数量
for (j = 0; j < BIO_NUM_OPS; j++) {
pthread_mutex_init(&bio_mutex[j],NULL);
pthread_cond_init(&bio_newjob_cond[j],NULL);
pthread_cond_init(&bio_step_cond[j],NULL);
bio_jobs[j] = listCreate();
bio_pending[j] = 0;
}
......
for (j = 0; j < BIO_NUM_OPS; j++) {
void *arg = (void*)(unsigned long) j;
//每种类型创建一个loop线程,调用bioProcessBackgroundJobs进行任务调度
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
exit(1);
}
bio_threads[j] = thread;
}
}
到了bioProcessBackgroundJobs这一层,开始真正的任务调度过程:
void *bioProcessBackgroundJobs(void *arg) {
struct bio_job *job;
//type为任务的类型
unsigned long type = (unsigned long) arg;
sigset_t sigset;
//任务类型校验合法性
if (type >= BIO_NUM_OPS) {
serverLog(LL_WARNING,
"Warning: bio thread started with wrong type %lu",type);
return NULL;
}
//设置线程属性,让线程可以被中断
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
//获取互斥锁
pthread_mutex_lock(&bio_mutex[type]);
sigemptyset(&sigset);
sigaddset(&sigset, SIGALRM);
if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
serverLog(LL_WARNING,
"Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
while(1) {
listNode *ln;
if (listLength(bio_jobs[type]) == 0) {
//如果当前type对应的任务队列为空,当前线程占有互斥锁并等待
pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
continue;
}
//直到任务队列不为空,获取头节点,占有互斥锁的线程将在这一步获得job
ln = listFirst(bio_jobs[type]);
job = ln->value;
//获取到任务后释放互斥锁
pthread_mutex_unlock(&bio_mutex[type]);
//根据类型处理任务调度,这里我们传入的type为BIO_LAZY_FREE,代表释放内存类型的任务
if (type == BIO_CLOSE_FILE) {
close((long)job->arg1);
} else if (type == BIO_AOF_FSYNC) {
redis_fsync((long)job->arg1);
} else if (type == BIO_LAZY_FREE) {
//根据参数来调用不同的释放函数
if (job->arg1)
//释放指针指向的对象内存
lazyfreeFreeObjectFromBioThread(job->arg1);
else if (job->arg2 && job->arg3)
//清除一个dict中的两个hashTable,即dataDict和expireDict,也就是ht[2]
lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
else if (job->arg3)
//清除zset的底层实现:skipList
lazyfreeFreeSlotsMapFromBioThread(job->arg3);
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}
//释放调度任务占用的内存
zfree(job);
//再次获取互斥锁,配合循环中空任务队列等待条件使用
pthread_mutex_lock(&bio_mutex[type]);
//任务队列出队
listDelNode(bio_jobs[type],ln);
//正在等待处理的任务数-1
bio_pending[type]--;
pthread_cond_broadcast(&bio_step_cond[type]);
}
}
在异步删除的代码中很明显看到只传入了一个arg1的值:
bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
因此可以推断出在任务调度过程中会调用lazyfreeFreeObjectFromBioThread方法进行内存释放:
void lazyfreeFreeObjectFromBioThread(robj *o) {
decrRefCount(o);
atomicDecr(lazyfree_objects,1);
}
也是调用了decrRefCount,针对不同类型的value值,调用不同的释放内存函数。
所以不论是同步删除还是异步删除的方式,二者都是针对key所在的sds内存释放,和value不同类型的内存释放,底层最终都是会调用zfree进行内存释放,zfree封装了操作系统提供的free接口。
至此,惰性删除的流程,以及底层函数的调用已经分析完毕。