Redis 内容篇

[TOC]

执行命令

上文回顾 && 执行逻辑

在前面我们分析了解析部分的内容,解析完所有的信息后,最终我们来到了 processCommand 这个函数的最后,

void processCommand(redisClient *c) {
    // ...
    // Exec the command
    if (c->flags & REDIS_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd-proc != watchCommand) {
        queueMultiCommand(c);
        addReply(c, shared.queued);   
    }
    else {
        call(c, REDIS_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientBlockedOnLists();
    }
}

从上面可以看到,最终执行命令,是由 call 这个调用来实现的,所以我们跟进去看看它到底做了什么。

void call(redisClient *c, int flags) {
    long long dirty, start, duration;
    int client_old_flags = c->flags;
    
    // 如果有监控器在监控当前服务,则将当前要执行的命令信息发送给监控器
    if (listLength(server.monitors) &&
        !server.loading &&
        !(c->cmd->flags & (REDIS_CMD_SKIP_MONITOR | REDIS_CMD_ADMIN))) {
        replicationFeedMonitors(c, server.monitors, c->db->id, c->argv, c->argc);   
    }
    
    // 清除除了 FORCE_AOF 跟 FORCE_REPL 以外的 flag
    c->flags &= ~(REDIS_FORCE_AOF | REDIS_FORCE_REPL);
    redisOpArrayInit(&server.also_propagete);
    // 开始记录执行状态
    dirty = server.dirti;
    start = ustime();
    
    c->cmd->proc();
    
    duration = ustime() - start;
    dirty = server.dirty - dirty;
    if (dirty < 0) dirty = 0;
 
    // ...   
}

接下来保存调用状况,并且根据 flags 决定是否要执行一系列操作

  • slow log

    • 根据调用的耗时,决定是否将耗时较长的命令记录到 Slow log 中
  • propagete

    • 所谓的 propagete 就是:
    • 是否执行 AOF 将当前命令的记录下来
    • 是否需要将命令分发到 Slaves,让他们执行相同的命令,以保持主从一致的 状态

命令指针列表

上述分析完毕,我们开始进入具体的 Redis 命令,因为上文的 c->cmd->proc() 已经正式执行了 client 发送过来的命令。 首先我们看看共有哪些命令, 这些命令分别包含了些什么信息,从之前的分析我们首先可以得到,命令会说明自己的参数个数、函数指针等信息。

struct redisCommand redisCommandTable[] = {
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
    {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
    {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
    {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0},
    {"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0},
    {"mget",mgetCommand,-2,"r",0,NULL,1,-1,1,0,0},
    {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"rpushx",rpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"lpushx",lpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
    {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0},
    {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
    {"brpop",brpopCommand,-3,"ws",0,NULL,1,1,1,0,0},
    {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
    {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
    {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0},
    {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0},
    {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0},
    // 由于太多,所以我们暂时只列出一部分...
};

上面包括了常用的部分 redis 命令,我们选第一个最常用的 get 命令开始说明,首先上面列表是 redisCommand 的数组,而 redisCommand 我们之前已经说明过了,这里再针对上面列出的各个信息进行简单说明。

struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;
    char *sflags;
    int flags;
    redisGetKeysProc *getkeys_proc;
    int firstkey;
    int lastkey;
    int keystep;
    long long microseconds, calls;
};

从上面的结构定义,再代入

{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0}, 

即可得到:

get 是命令的名称,

getCommand 是具体调用的函数指针,

arity 这个命令需要 2 个参数,

rF 是他的标签,接下来的 0 是 flags 字段,他会由 redis 根据标签计算得出具体的值

接下来是 firstkeylastkey, keystep 三个字段,他们分别表示,从第几个参数索引开始是对应到命令的具体参数,最后一个索引又是多少,每隔多少个 argv 对应一个参数

最后的 microseconds, calls 则是用来分析所用,他们保存了该命令总共调用了多少次,调用了这么多次共耗时多少。

最后,在开始执行对应的命令前,附上 redis 对 flag 的说明:

 /*
 * This is the meaning of the flags:
 *
 * w: write command (may modify the key space).
 * r: read command  (will never modify the key space).
 * m: may increase memory usage once called. Don't allow if out of memory.
 * a: admin command, like SAVE or SHUTDOWN.
 * p: Pub/Sub related command.
 * f: force replication of this command, regardless of server.dirty.
 * s: command not allowed in scripts.
 * R: random command. Command is not deterministic, that is, the same command
 *    with the same arguments, with the same key space, may have different
 *    results. For instance SPOP and RANDOMKEY are two random commands.
 * S: Sort command output array if called from script, so that the output
 *    is deterministic.
 * l: Allow command while loading the database.
 * t: Allow command while a slave has stale data but is not allowed to
 *    server this data. Normally no command is accepted in this condition
 *    but just a few.
 * M: Do not automatically propagate the command on MONITOR.
 * k: Perform an implicit ASKING for this command, so the command will be
 *    accepted in cluster mode if the slot is marked as 'importing'.
 * F: Fast command: O(1) or O(log(N)) command that should never delay
 *    its execution as long as the kernel scheduler is giving us time.
 *    Note that commands that may trigger a DEL as a side effect (like SET)
 *    are not fast commands.
 */

GET/SET

GET

{ "get", getCommand, 2, "rF", 0, NULL, 1, 1, 1, 0, 0 } 名称: get 调用: getCommand 参数个数: 2 标志: rF Read And Fast 参数从 1 开始,到 1 结束,步进为 1 后续的接口不再逐个说明,但会列出结构

get 命令从 redis 中根据 key 值获取对应的 value, key 跟 value 都必须是字符串。

void getCommand(redisClient *c) {
    getGenericCommand(c);
}

int getGenericCommand(redisClient *c) {
    robj *o;
    
    if ((o == lookupKeyReadOrReply(c, c->argv[1], shared.nullbulk)) == NULL)
        return REDIS_OK;
        
    if (o->type != REDIS_STRING) {
        addReply(c, shared.wrongtypeerr);
        return REDIS_ERR;
    }
    else {
        addReply(c, o);
        return REDIS_OK;
    }
}

从上面可以大致看出,搜索一个 key 的操作被封装在了 lookupKeyReadOrReply,同时可以看到的是 argv[1] 确实是作为参数传递给了 lookupKeyReadOrReply, 在搜完之后对得到的结果的类型进行验证,因为 Get 支持设置 String。接着是看看 lookupKeyReadOrReply

robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
    robj *o = lookupKeyRead(c->db, key);
    if (!o) addReply(c, reply);
    return o;
}

int expireIfNeeded(redisDb *db, robj *key) {
    mstime_t when = getExpire(db, key);
    mstime_t now;
    
    if (when < 0) return 0;
    now = server.lua_caller ? server.lua_time_start : mstime();
    
    if (server.masterhost != NULL) return now > when;
    
    if (now <= when) return 0;
    
    server.stat_expiredKeys++;
    propagateExpire(db, key);
    notifyKeyspaceEvent(REDIS_NOTIFY_EXPIRED,
        "expired", key, db->id);
    return dbDelete(db, key);
}

robj *lookupKeyRead(redisDb *db, robj *key) {
    robj *val;
    expireIfNeeded(db, key);
    val = lookupKey(db, key);
    if (val == NULL)
        server.stat_keyspace_misses++;
    else
        server.stat_keyspace_hits++;
    return val;
}

具体实现的 lookupKeyRead 首先检查了当前需要检索的 key 是否已经过期,如果还没过期或者根本没有设置过期则直接返回,否则将过期的 key 传播到其他的 slaves, 最后将其从数据库中删除。 在处理 propagateExpire 时还会根据是否开启了 AOF,如果开启了,则将操作记录到 AOF 文件中。 最后是具体的查找动作

robj *lookupKey(redisDb *db, robj *key) {
    dictEntry *de = dictFind(db->dict, key->prt);
    if (de) {
        robj *val = dictGetVal(de);
        
        if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
            val->lru = LRU_CLOCK();
        return val;
    }
    else {
        return NULL;
    }
}

最终的查找动作落在了 dictFind 身上,也就是直接进行了一次 Hash 查找,具体的实现在 dict.h 我们已经详细说明过了。 在查到目标 value 后,如果当前没有在后台处理 rdb 或 aof 机制,则更新 value 对应的 lru,可用于内存满载时的调整机制。

REPLY Preview

在上面我们看到了 addReply 调用,这是 redis 用来回复信息给客户端使用的接口,在这里我们先简单的理解为,调用这个接口后,相应的数据就会回复给客户端即可,因为这里面涉及的东西较多,也跟具体的 get 逻辑无关,所以后面再另开篇章描述。

MGET

{ "mget", mgetCommand, -2, "r", 0, NULL, 1, -1, 1, 0, 0 }

跟 get 一样,但是支持同时获取多个 key 的值。其实就是重复实现 m 次 get 的逻辑,不同的是,对于这种多返回值的操作,redis 会先返回即将返回的值的个数,然后再逐个返回,简化了服务端的处理,也给了客户端较大的自由度来组织数据。

void mgetCommand(redisClient *c) {
    int j;
    addReplyMultiBulkLen(c, c->argc - 1);
    for (j = 1; j < c->argc; j++) {
        robj *o = lookupKeyRead(c->db, c->argv[j]);
        if (o == NULL) {
            addReply(c, shared.nullbulk);
        }
        else {
            if (o->type != REDIS_STRING) {
                addReply(c, shared.nullbulk);
            }
            else {
                addReplyBulk(c, o);
            }
        }
    }
}

SET

{ "set", setCommand, -3, "wm", 0, NULL, 1, 1, 1, 0, 0 }

set 命令将 key 对应的 value 保存到 redis 中。同时还支持在设置的同时,设置 NX XX EX PX 标签,分别用于指示:

EX 以秒为单位设置超时时间 PX 以毫秒为单位,设置超时时间 XX 当数据库中存在 key 时才进行 set 操作 NX 当数据库中不存在 key 时才进行 set 操作

void setCommand(redisClient *c) {
    int j;
    robj *expire = NULL;
    int unit = UNIT_SECONDS;
    int flags = REDIS_SET_NO_FLAGS;
    
    // 获取额外的操作标签,即上面提到的 EX、PX 等
    for (j = 3; j < c->argc; j++) {
        char *a = c->argv[j]->ptr;
        robj *next = (j == c->argc - 1) ? NULL : c->argv[j + 1];
        // 处理 NX
        if ((a[0] == 'n' || a[0] == 'N') &&
            (a[1] == 'x' || a[1] == 'X') && a[2] == '\0') {
            flags |= REDIS_SET_NX;
        }
        // 处理 XX
        else if ((a[0] == 'x' || a[0] == 'X') &&
                 (a[1] == 'x' || a[1] == 'X') && a[2] == '\0') {
            flags |= REDIS_SET_XX;       
        }
        // 处理 EX 超时
        else if ((a[0] == 'e' || a[0] == 'E') &&
                 (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && next) {
            unit = UNIT_SECONDS;
            expire = next;
            j++;        
        }
        else if ((a[0] == 'p' || a[0] == 'P') &&
                 (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && next) {
            unit = UNIT_MILLSECONDS;
            expire = next;
            j++       
        }
        else {
            addReply(c, shared.syntaxerr);
            return;
        }        
    }   
    
    c->argv[2] = tryObjectEncoding(c->argv[2]);
    setGenericCommand(c, flags, c->argv[1], c->argv[2], expire, unit, NULL, NULL);
}

这里在进入真正的 Set 命令前,尝试将 argv[2] 进行了一次编码,这里的编码是之前说明过的,尝试编码为数字,短字符串跟长字符串等。 然后我们进入 set 的逻辑

void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
    long long milliseconds = 0;
    
    if (expire) {
        // 从 obj 中获取对应的数值
        if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != REDIS_OK)
            return;
        if (milliseconds <= 0) {
            addReplyErrorFormat(c,  "invalid expire time in %s", c->cmd->name);
            return;
        }
        // 如果是以 秒 为单位
        if (unit == UNIT_SECONDS) milliseconds *= 1000;
    }
    
    // 根据标签进行检查
    if ((flags & REDIS_SET_NX && lookupKeyWrite(c->db, key) != NULL) ||
        (flags & REDIS_SET_XX && lookupKeyWriete(c->db, key) == NULL)) {
        addReply(c, abort_reply ? abort_reply : shared.nullbulk);
        return;   
    }
    
    setKey(c->db, key, val);
    
    // 从这里可以判断,dirty 是用来记录数据在上次静态化到现在之间被修改的次数
    server.dirty++;
    // 这里则可以窥探到 过期机制
    if (expire) setExpire(c->db, key, mstime() + milliseconds);    
    notifyKeyspaceEvent(REDIS_NOTIFY_STRING, "set", key, c->db->id);
    if (expire) notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,
                    "expire",
                    key,
                    c->db->id);
    addReply(c, ok_reply ? ok_reply : shared.ok);
}

上面主要的逻辑是 setKey 它负责具体保存 key 到 redis 中,而这里的两个 notify 则会将状态改变的消息推送给订阅了对应的 key 状态提醒的用户。这里面主要只是构建一个推送的 key,然后通过 pubsub 接口推送,我们在介绍 pubsub 时再进行说明。

void setKey(redisDb *db, robj *key, robj *val) {
    // 查找 key 是否存在,存在则覆盖,不存在则添加
    // 这里的 lookupKeyWrite 跟之前的 lookupKeyRead 没什么区别
    if (lookupKeyWrite(db, key) == NULL) {
        dbAdd(db, key, val);
    }
    else {
        dbOverwrite(db, key, val);
    }
    
    // 添加引用
    incrRefCount(val);
    // 如果已过期,则删除过期的 key
    removeExpire(db, key);
    singalModifiedKey(db, key);
}

void dbAdd(redisDb *db, robj *key, robj *val) {
    sds copy = sdsdup(key->ptr);
    int retval = dictAdd(db->dict, copy, val);
    
    redisAsservWithInfo(NULL, key, retval == REDIS_OK);
    if (val->type == REDIS_LIST) signalListAsReady(db, key);
    if (server.cluster_enabled) slotToKeyAdd(key);
}

dbAdd 只是简单的调用了 dict 对应的函数,这里要注意的只有 signalListAsReady, 因为 redis 提供了堵塞的相关接口,如 lpushx, 他只会在 key 对应的 list 已经存在时才能 push 成功,所以这里的 signalListAsReady 则负责将当前的 key 加到 ready_keys 中,好让 redis 能通知客户端 lpushx 的结果。

接下来是覆盖操作

void dbOverwrite(redisDb *db, robj *key, robj *val) {
    dictEntry *de = dictFind(db->dict, key->prt);
    redisAssertWithInfo(NULL, key, de != NULL);
    dictReplace(db->dict, key->prt, val);
}

也只是简单的调用了 dict 的接口,并做了一些 assert。

SETNX

跟 set 同效,但只有当对应的 key 不存在时,才会 set 成功,具体的实现跟 set 并无二致,只是在调用前设置了对应的 flag: REDIS_SET_NX

void setnxCommand(redisClient *c) {
    c->argv[2] = tryObjectEncoding(c->argv[2]);
    setGenericCommand(c, REDIS_SET_NX, c->argv[1], c->argv[2], NULL, 0, shared.cone, shared.czero);
}

SETEX

跟 set 同效,在设置 key 对应的 value 时,同时还会设置过期时间。

void setexCommand(redisClient *c) {
    c->argv[3] = tryObjectEncoding(c->argv[3]);
    setGenericCommand(c, REDIS_NO_FLAGS, c->argv[1], c->argv[3, c->argv[2], UNIT_SECONDS, NULL, NULL);
}

MSET / MSETNX

批量设置新的键值对

void msetCommand(redisClient *c) {
    msetGenericCommand(c, 0);
}

void msetnxCommand(redisClient *c) {
    msetGenericCommand(c, 1);
}

void msetGenericCommand(redisClient *c, int nx) { 
    int j, busykeys = 0;
    
    if ((c->argc % 2) == 0) {
        addReplyError(c, "wrong number of arguments for MSET");
        return;
    }
    
    // 如果设置了 nx 标志,则说明所有需要设置的 key 都不可以存在于 redis 中
    if (nx) {
        for (j = 1; j < c->argc; j += 2) {
            if (lookupKeyWrite(c->db, c->argv[j]) != NULL) {
                busykeys++;
            }
        }
        if (busykeys) {
            addReply(c, shared.czero);
        }
    }
    
    for (j = 1; j < c->argc; j += 2) {
        c->argv[j + 1] = tryObjectEncoding(c->argv[j + 1]);
        setKey(c->db, c->argv[j], c->argv[j + 1]);
        notifyKeyspaceEvent(REDIS_NOTIFY_STRING, "set",
            c->argv[j], d->db->id);
    }
    
    server.dirty += (c->argc - 1) / 2;
    addReply(c, nx ? shared.cone : shared.ok);
}

上面的 msetCommand 转调的是 msetGenericCommand,同时传递了 0 作为 nx 参数,而在 msetGenericCommand 中 nx 是作为一个标识符,用于指示即将插入的键值对能否已存在 redis 之中。 0 则表示不做限制。

而对于 msetnxCommand 来说则需要限制,所以传递了 1。

BASIC

下面列出的是跟类型无关的基础指令

DEL

删除 key 对应的 value,主要的实现为,接收变长的参数列表,列表中都为 redis 中的 key, 把所有的 key 都从 redis 中删掉,然后提醒订阅了这些 key 状态的客户端

void delCommand(redisClient *c) {
    int deleted = 0, j;
    
    for (j = 1; j < c->argc; j++) {
        expireIfNeeded(c->db, c->argv[j]);
        if (dbDelete(c->db, c->argv[j])) {
            // 如果删除成功,则提醒订阅了这些 key 的客户端
            signalModifiedKey(c->db, c->argv[j]);
            notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,
                "del", c->argv[j], c->db->id);
            server.dirty++;
            deleted++;
        }
    }
    addReplyLongLong(c, deleted);
}

EXISTS

判断对应的 key 是否存在于 redis 中。

void existsCommand(redisClient *c) {
    long long count = 0;
    int j;
    
    for (j = 1; j < c->argc; j++) {
        expireIfNeeded(c->db, c->argv[j]);
        if (dbExists(c->db, c->argv[j])) count++;
    }
    
    addReplyLongLong(c, count);
}

PING

ECHO

AUTO

SHUTDOWN

BGSAVE

LIST

LPUSH/LPOP (LPUSHX/LPOPX)

RPUSH/RPOP (RPUSHX/RPOPX)

LINSERT

LLEN

LINDEX

LRANGE

LTRIM

LSET

STRING

SET