likes
comments
collection
share

redis源码学习|watch

作者站长头像
站长
· 阅读数 74

最近想深入学习一下redis,那阅读源码是最好的学习方式了,正好最近pika社区在讨论 事务 的实现,实现事务的基础就是数据的一致性 虽然redis的事务没有像 关系型数据库那样,支持数据回滚。但是redis的事务也可以保证数据的一致性,如何保证数据 一致性,就是靠 watch 这个功能来实现的。说白了,redis的watch功能就是一个乐观锁

乐观锁

我的理解是,所谓的乐观锁,对数据加锁不会阻止别的人修改数据,但是别人修改过的数据,加锁的人能知道这个数据被修改过

redis 实现

本文中的源码基于redis 7.0.11,不同的版本实现可能会不同

先放一张watch的数据结构图

redis源码学习|watch

在redis的 db 结构中,有一个map,用来记录所有被watch的key,所有watch这个key的client都使用链表串联在一起

源码

/* Redis database representation. There are multiple databases identified
 * by integers from 0 (the default database) up to the max configured
 * database. The database number is the 'id' field in the structure. */
typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS 这个map就是记录被watch的key*/
    int id;                     /* Database ID */
    long long avg_ttl;          /* Average TTL, just for stats */
    unsigned long expires_cursor; /* Cursor of the active expire cycle. */
    list *defrag_later;         /* List of key names to attempt to defrag one by one, gradually. */
    clusterSlotToKeyMapping *slots_to_keys; /* Array of slots to keys. Only used in cluster mode (db 0). */
} redisDb;

在redis的 client中也有一个链表,记录了当前client watch的所有key

还有一个flag记录当前client的状态,这个flag可以记录很多状态,在watch功能里,可以记录这个当前client watch的key,是否被别的client修改过

源码

typedef struct client {
    uint64_t flags;         /* Client flags: CLIENT_* macros. */
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
};

因为redis不同的db是互相隔离的,所以在db层面做watch就可以了

watch key

现在来看一下如何watch一个key

源码

void watchForKey(client *c, robj *key) {
    list *clients = NULL;
    listIter li;
    listNode *ln;
    watchedKey *wk;

    /* Check if we are already watching for this key */
    listRewind(c->watched_keys,&li);
    //遍历当前client已经watch的key中,是否包含当前要watch的key
    while((ln = listNext(&li))) {
        wk = listNodeValue(ln);
        //因为redis不同的db中,数据是隔离的,所以要判断一下,db是否相同
        //比如db0和db1都有key1这个key
        if (wk->db == c->db && equalStringObjects(key,wk->key))
            //如果这个key已经被watch了,直接rerun就好了
            return; /* Key already watched */
    }
    /* This key is not already watched in this DB. Let's add it */
    //去db 的 `watched_keys`这个map中,找到watch这个key的所有client
    clients = dictFetchValue(c->db->watched_keys,key);
    if (!clients) {
        //如果没有client watch 这个key,返回的链表是空指针,这时候初始化一个链表
        clients = listCreate();
        //把链表赋值给map
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    /* Add the new key to the list of keys watched by this client */
    wk = zmalloc(sizeof(*wk));
    wk->key = key;
    wk->client = c;
    wk->db = c->db;
    //记录一下,在watch时,这个key是否过期了,如果在watch前就已经过期了,在执行事务的时候,就忽略这个key
    wk->expired = keyIsExpired(c->db, key);
    incrRefCount(key);
    //把当前的client加到链表中
    listAddNodeTail(c->watched_keys,wk);
    //把watch的key 加入的client watch链表中
    listAddNodeTail(clients,wk);
}

watch一个key前,会检查一下这个key是否已经被watch了,避免重复watch

找到watch这个key的所有client,并使用尾插法把当前client加入链表,并且记录一下watch这个key之前,这个key是否过期了

到这里,watch的过程就完成了,通过源码可以发现,watch的过程还是挺好理解的,就是在记录一下

执行事务

当redis的client在执行 EXEC命令时,会把当前事务所有的命令一起执行,在执行命令前,会先检查一下watch的key是否被修改了,如果被修改了, 就会放弃执行命令,返回失败

源码

void execCommand(client *c) {
    int j;
    robj **orig_argv;
    int orig_argc, orig_argv_len;
    struct redisCommand *orig_cmd;

    //先检查一下,如果client在此之前没有执行过`MULTI`命令,就执行`EXEC`,返回错误
    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }

    /* EXEC with expired watched key is disallowed*/
    //检查一下被watch的key是否过期了,如果过期了,事务也会失败
    //如果这个key在watch之前就过期了,那么这个key会被忽略
    if (isWatchedKeyExpired(c)) {
        c->flags |= (CLIENT_DIRTY_CAS);
    }

    /* Check if we need to abort the EXEC because:
     * 1) Some WATCHed key was touched.
     * 2) There was a previous error while queueing commands.
     * A failed EXEC in the first case returns a multi bulk nil object
     * (technically it is not an error but a special behavior), while
     * in the second an EXECABORT error is returned. */
    //这里就是判断这个key是否被其他client改动了,如果key被别的client改动了,或者事务出错了,那么本次事务都会失败
    if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC)) {
        if (c->flags & CLIENT_DIRTY_EXEC) {
            addReplyErrorObject(c, shared.execaborterr);
        } else {
            addReply(c, shared.nullarray[c->resp]);
        }

        discardTransaction(c);
        return;
    }
    uint64_t old_flags = c->flags;

    /* we do not want to allow blocking commands inside multi */
    c->flags |= CLIENT_DENY_BLOCKING;

    /* Exec all the queued commands */
    //取消watch的所有key
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
    .....................................................
    省略
}

在执行事务前,会检查一下当前client是否开启了事务(是否执行了 MULTI 命令),没有开启事务,这次事务会失败

再检查这个client watch的key是否被修改了。如果这个key被其他的client修改了,则这个事务会执行失败

watch检查没有问题后,会清除当前client watch的所有key

到这,事务执行前的检查就完成了,后面就是事务相关的判断和操作了

修改watch的key

如果一个被watch的key被修改了,那么所有watch这个key的client都会知道,具体实现如下

源码

所有修改命令(比如set,hset等等)在执行后,都会调用这个函数,把watch的key标记为修改状态

void touchWatchedKey(redisDb *db, robj *key) {
    list *clients;
    listIter li;
    listNode *ln;

    //如果记录key的map为空或者client链表是空,直接return
    if (dictSize(db->watched_keys) == 0) return;
    clients = dictFetchValue(db->watched_keys, key);
    if (!clients) return;

    /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
    /* Check if we are already watching for this key */
    listRewind(clients,&li);
    //遍历watch这个key的client链表,把所有client的flag修改为 `CLIENT_DIRTY_CAS` 状态
    while((ln = listNext(&li))) {
        watchedKey *wk = listNodeValue(ln);
        client *c = wk->client;

        if (wk->expired) {
            /* The key was already expired when WATCH was called. */
            if (db == wk->db &&
                equalStringObjects(key, wk->key) &&
                dictFind(db->dict, key->ptr) == NULL)
            {
                //如果在这个client在watch前,这个key就已经过期了 && 是这个db的 && key是同一个 && 内存里没有这个key了
                //就清除 watch就过期的 flag
                /* Already expired key is deleted, so logically no change. Clear
                 * the flag. Deleted keys are not flagged as expired. */
                wk->expired = 0;
                goto skip_client;
            }
            //否则就结束
            break;
        }

        //把这个client的flag修改为 被其他client修改了
        c->flags |= CLIENT_DIRTY_CAS;
        /* As the client is marked as dirty, there is no point in getting here
         * again in case that key (or others) are modified again (or keep the
         * memory overhead till EXEC). */
        //如果这个client watch的key被标记为以修改,那么就把这个client watch的key都删掉
        unwatchAllKeys(c);

    skip_client:
        continue;
    }
}

//删除client watch的key
void unwatchAllKeys(client *c) {
    listIter li;
    listNode *ln;

    if (listLength(c->watched_keys) == 0) return;
    listRewind(c->watched_keys,&li);
    //变量当前client watch的key 把这些key在对应的db的`watched_keys`中删除
    while((ln = listNext(&li))) {
        list *clients;
        watchedKey *wk;

        /* Lookup the watched key -> clients list and remove the client's wk
         * from the list */
        wk = listNodeValue(ln);
        clients = dictFetchValue(wk->db->watched_keys, wk->key);
        serverAssertWithInfo(c,NULL,clients != NULL);
        listDelNode(clients,listSearchKey(clients,wk));
        /* Kill the entry at all if this was the only client */
        if (listLength(clients) == 0)
            dictDelete(wk->db->watched_keys, wk->key);
        /* Remove this watched key from the client->watched list */
        listDelNode(c->watched_keys,ln);
        decrRefCount(wk->key);
        zfree(wk);
    }
}

任意client在修改key后,都会调用 touchWatchedKey 把watch这个key的client的flag标记为被修改状态,watch这个key的client,会遍历自己的watch 的链表,把所有的key都删掉。后续在执行事务时会失败

在执行 UNWATCH 命令时,最终调用的也是 unwatchAllKeys 这个函数

总结

redis通过 watch 命令实现乐观锁,保证了事务中数据的一致性。

redis的乐观锁在集群模式下并不适用,在集群模式下,还是要使用 SETEX KEY_NAME TIMEOUT VALUE 这种方式加锁来保证数据一致

刚开始学习redis源码,对一些概念理解还不是很深,如果有错误的地方,还请批评指正

转载自:https://juejin.cn/post/7237725850677411897
评论
请登录