Redis 内容篇
[TOC]
执行命令
上文回顾 && 执行逻辑
在前面我们分析了解析部分的内容,解析完所有的信息后,最终我们来到了 processCommand
这个函数的最后,
void processCommand(redisClient *c) {
// ...
// Exec the command
if (c->flags & REDIS_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd-proc != watchCommand) {
queueMultiCommand(c);
addReply(c, shared.queued);
}
else {
call(c, REDIS_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientBlockedOnLists();
}
}
从上面可以看到,最终执行命令,是由 call
这个调用来实现的,所以我们跟进去看看它到底做了什么。
void call(redisClient *c, int flags) {
long long dirty, start, duration;
int client_old_flags = c->flags;
// 如果有监控器在监控当前服务,则将当前要执行的命令信息发送给监控器
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & (REDIS_CMD_SKIP_MONITOR | REDIS_CMD_ADMIN))) {
replicationFeedMonitors(c, server.monitors, c->db->id, c->argv, c->argc);
}
// 清除除了 FORCE_AOF 跟 FORCE_REPL 以外的 flag
c->flags &= ~(REDIS_FORCE_AOF | REDIS_FORCE_REPL);
redisOpArrayInit(&server.also_propagete);
// 开始记录执行状态
dirty = server.dirti;
start = ustime();
c->cmd->proc();
duration = ustime() - start;
dirty = server.dirty - dirty;
if (dirty < 0) dirty = 0;
// ...
}
接下来保存调用状况,并且根据 flags 决定是否要执行一系列操作
-
slow log
- 根据调用的耗时,决定是否将耗时较长的命令记录到 Slow log 中
-
propagete
- 所谓的 propagete 就是:
- 是否执行 AOF 将当前命令的记录下来
- 是否需要将命令分发到 Slaves,让他们执行相同的命令,以保持主从一致的 状态
命令指针列表
上述分析完毕,我们开始进入具体的 Redis 命令,因为上文的 c->cmd->proc()
已经正式执行了 client
发送过来的命令。
首先我们看看共有哪些命令, 这些命令分别包含了些什么信息,从之前的分析我们首先可以得到,命令会说明自己的参数个数、函数指针等信息。
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
{"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
{"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
{"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
{"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
{"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
{"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
{"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
{"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0},
{"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0},
{"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0},
{"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
{"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
{"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0},
{"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0},
{"mget",mgetCommand,-2,"r",0,NULL,1,-1,1,0,0},
{"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
{"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
{"rpushx",rpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
{"lpushx",lpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
{"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
{"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0},
{"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
{"brpop",brpopCommand,-3,"ws",0,NULL,1,1,1,0,0},
{"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
{"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
{"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0},
{"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0},
{"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0},
{"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0},
{"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0},
{"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0},
// 由于太多,所以我们暂时只列出一部分...
};
上面包括了常用的部分 redis
命令,我们选第一个最常用的 get
命令开始说明,首先上面列表是 redisCommand
的数组,而 redisCommand
我们之前已经说明过了,这里再针对上面列出的各个信息进行简单说明。
struct redisCommand {
char *name;
redisCommandProc *proc;
int arity;
char *sflags;
int flags;
redisGetKeysProc *getkeys_proc;
int firstkey;
int lastkey;
int keystep;
long long microseconds, calls;
};
从上面的结构定义,再代入
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
即可得到:
get
是命令的名称,
getCommand
是具体调用的函数指针,
arity
这个命令需要 2 个参数,
rF
是他的标签,接下来的 0 是 flags 字段,他会由 redis 根据标签计算得出具体的值接下来是
firstkey
、lastkey
,keystep
三个字段,他们分别表示,从第几个参数索引开始是对应到命令的具体参数,最后一个索引又是多少,每隔多少个 argv 对应一个参数最后的 microseconds, calls 则是用来分析所用,他们保存了该命令总共调用了多少次,调用了这么多次共耗时多少。
最后,在开始执行对应的命令前,附上 redis
对 flag 的说明:
/*
* This is the meaning of the flags:
*
* w: write command (may modify the key space).
* r: read command (will never modify the key space).
* m: may increase memory usage once called. Don't allow if out of memory.
* a: admin command, like SAVE or SHUTDOWN.
* p: Pub/Sub related command.
* f: force replication of this command, regardless of server.dirty.
* s: command not allowed in scripts.
* R: random command. Command is not deterministic, that is, the same command
* with the same arguments, with the same key space, may have different
* results. For instance SPOP and RANDOMKEY are two random commands.
* S: Sort command output array if called from script, so that the output
* is deterministic.
* l: Allow command while loading the database.
* t: Allow command while a slave has stale data but is not allowed to
* server this data. Normally no command is accepted in this condition
* but just a few.
* M: Do not automatically propagate the command on MONITOR.
* k: Perform an implicit ASKING for this command, so the command will be
* accepted in cluster mode if the slot is marked as 'importing'.
* F: Fast command: O(1) or O(log(N)) command that should never delay
* its execution as long as the kernel scheduler is giving us time.
* Note that commands that may trigger a DEL as a side effect (like SET)
* are not fast commands.
*/
GET/SET
GET
{ "get", getCommand, 2, "rF", 0, NULL, 1, 1, 1, 0, 0 }
名称: get 调用: getCommand 参数个数: 2 标志: rF Read And Fast 参数从 1 开始,到 1 结束,步进为 1 后续的接口不再逐个说明,但会列出结构
get 命令从 redis
中根据 key 值获取对应的 value, key 跟 value 都必须是字符串。
void getCommand(redisClient *c) {
getGenericCommand(c);
}
int getGenericCommand(redisClient *c) {
robj *o;
if ((o == lookupKeyReadOrReply(c, c->argv[1], shared.nullbulk)) == NULL)
return REDIS_OK;
if (o->type != REDIS_STRING) {
addReply(c, shared.wrongtypeerr);
return REDIS_ERR;
}
else {
addReply(c, o);
return REDIS_OK;
}
}
从上面可以大致看出,搜索一个 key 的操作被封装在了 lookupKeyReadOrReply
,同时可以看到的是 argv[1] 确实是作为参数传递给了 lookupKeyReadOrReply
, 在搜完之后对得到的结果的类型进行验证,因为 Get 支持设置 String。接着是看看 lookupKeyReadOrReply
robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
robj *o = lookupKeyRead(c->db, key);
if (!o) addReply(c, reply);
return o;
}
int expireIfNeeded(redisDb *db, robj *key) {
mstime_t when = getExpire(db, key);
mstime_t now;
if (when < 0) return 0;
now = server.lua_caller ? server.lua_time_start : mstime();
if (server.masterhost != NULL) return now > when;
if (now <= when) return 0;
server.stat_expiredKeys++;
propagateExpire(db, key);
notifyKeyspaceEvent(REDIS_NOTIFY_EXPIRED,
"expired", key, db->id);
return dbDelete(db, key);
}
robj *lookupKeyRead(redisDb *db, robj *key) {
robj *val;
expireIfNeeded(db, key);
val = lookupKey(db, key);
if (val == NULL)
server.stat_keyspace_misses++;
else
server.stat_keyspace_hits++;
return val;
}
具体实现的 lookupKeyRead
首先检查了当前需要检索的 key 是否已经过期,如果还没过期或者根本没有设置过期则直接返回,否则将过期的 key 传播到其他的 slaves, 最后将其从数据库中删除。
在处理 propagateExpire
时还会根据是否开启了 AOF,如果开启了,则将操作记录到 AOF 文件中。
最后是具体的查找动作
robj *lookupKey(redisDb *db, robj *key) {
dictEntry *de = dictFind(db->dict, key->prt);
if (de) {
robj *val = dictGetVal(de);
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
val->lru = LRU_CLOCK();
return val;
}
else {
return NULL;
}
}
最终的查找动作落在了 dictFind
身上,也就是直接进行了一次 Hash 查找,具体的实现在 dict.h
我们已经详细说明过了。
在查到目标 value 后,如果当前没有在后台处理 rdb 或 aof 机制,则更新 value 对应的 lru,可用于内存满载时的调整机制。
REPLY Preview
在上面我们看到了 addReply
调用,这是 redis 用来回复信息给客户端使用的接口,在这里我们先简单的理解为,调用这个接口后,相应的数据就会回复给客户端即可,因为这里面涉及的东西较多,也跟具体的 get 逻辑无关,所以后面再另开篇章描述。
MGET
{ "mget", mgetCommand, -2, "r", 0, NULL, 1, -1, 1, 0, 0 }
跟 get 一样,但是支持同时获取多个 key 的值。其实就是重复实现 m 次 get 的逻辑,不同的是,对于这种多返回值的操作,redis 会先返回即将返回的值的个数,然后再逐个返回,简化了服务端的处理,也给了客户端较大的自由度来组织数据。
void mgetCommand(redisClient *c) {
int j;
addReplyMultiBulkLen(c, c->argc - 1);
for (j = 1; j < c->argc; j++) {
robj *o = lookupKeyRead(c->db, c->argv[j]);
if (o == NULL) {
addReply(c, shared.nullbulk);
}
else {
if (o->type != REDIS_STRING) {
addReply(c, shared.nullbulk);
}
else {
addReplyBulk(c, o);
}
}
}
}
SET
{ "set", setCommand, -3, "wm", 0, NULL, 1, 1, 1, 0, 0 }
set
命令将 key 对应的 value 保存到 redis 中。同时还支持在设置的同时,设置 NX XX EX PX 标签,分别用于指示:
EX 以秒为单位设置超时时间 PX 以毫秒为单位,设置超时时间 XX 当数据库中存在 key 时才进行 set 操作 NX 当数据库中不存在 key 时才进行 set 操作
void setCommand(redisClient *c) {
int j;
robj *expire = NULL;
int unit = UNIT_SECONDS;
int flags = REDIS_SET_NO_FLAGS;
// 获取额外的操作标签,即上面提到的 EX、PX 等
for (j = 3; j < c->argc; j++) {
char *a = c->argv[j]->ptr;
robj *next = (j == c->argc - 1) ? NULL : c->argv[j + 1];
// 处理 NX
if ((a[0] == 'n' || a[0] == 'N') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0') {
flags |= REDIS_SET_NX;
}
// 处理 XX
else if ((a[0] == 'x' || a[0] == 'X') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0') {
flags |= REDIS_SET_XX;
}
// 处理 EX 超时
else if ((a[0] == 'e' || a[0] == 'E') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && next) {
unit = UNIT_SECONDS;
expire = next;
j++;
}
else if ((a[0] == 'p' || a[0] == 'P') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && next) {
unit = UNIT_MILLSECONDS;
expire = next;
j++
}
else {
addReply(c, shared.syntaxerr);
return;
}
}
c->argv[2] = tryObjectEncoding(c->argv[2]);
setGenericCommand(c, flags, c->argv[1], c->argv[2], expire, unit, NULL, NULL);
}
这里在进入真正的 Set 命令前,尝试将 argv[2]
进行了一次编码,这里的编码是之前说明过的,尝试编码为数字,短字符串跟长字符串等。
然后我们进入 set 的逻辑
void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
long long milliseconds = 0;
if (expire) {
// 从 obj 中获取对应的数值
if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != REDIS_OK)
return;
if (milliseconds <= 0) {
addReplyErrorFormat(c, "invalid expire time in %s", c->cmd->name);
return;
}
// 如果是以 秒 为单位
if (unit == UNIT_SECONDS) milliseconds *= 1000;
}
// 根据标签进行检查
if ((flags & REDIS_SET_NX && lookupKeyWrite(c->db, key) != NULL) ||
(flags & REDIS_SET_XX && lookupKeyWriete(c->db, key) == NULL)) {
addReply(c, abort_reply ? abort_reply : shared.nullbulk);
return;
}
setKey(c->db, key, val);
// 从这里可以判断,dirty 是用来记录数据在上次静态化到现在之间被修改的次数
server.dirty++;
// 这里则可以窥探到 过期机制
if (expire) setExpire(c->db, key, mstime() + milliseconds);
notifyKeyspaceEvent(REDIS_NOTIFY_STRING, "set", key, c->db->id);
if (expire) notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,
"expire",
key,
c->db->id);
addReply(c, ok_reply ? ok_reply : shared.ok);
}
上面主要的逻辑是 setKey
它负责具体保存 key 到 redis 中,而这里的两个 notify 则会将状态改变的消息推送给订阅了对应的 key 状态提醒的用户。这里面主要只是构建一个推送的 key,然后通过 pubsub
接口推送,我们在介绍 pubsub 时再进行说明。
void setKey(redisDb *db, robj *key, robj *val) {
// 查找 key 是否存在,存在则覆盖,不存在则添加
// 这里的 lookupKeyWrite 跟之前的 lookupKeyRead 没什么区别
if (lookupKeyWrite(db, key) == NULL) {
dbAdd(db, key, val);
}
else {
dbOverwrite(db, key, val);
}
// 添加引用
incrRefCount(val);
// 如果已过期,则删除过期的 key
removeExpire(db, key);
singalModifiedKey(db, key);
}
void dbAdd(redisDb *db, robj *key, robj *val) {
sds copy = sdsdup(key->ptr);
int retval = dictAdd(db->dict, copy, val);
redisAsservWithInfo(NULL, key, retval == REDIS_OK);
if (val->type == REDIS_LIST) signalListAsReady(db, key);
if (server.cluster_enabled) slotToKeyAdd(key);
}
dbAdd
只是简单的调用了 dict 对应的函数,这里要注意的只有
signalListAsReady
, 因为 redis 提供了堵塞的相关接口,如 lpushx
, 他只会在 key 对应的 list 已经存在时才能 push 成功,所以这里的 signalListAsReady
则负责将当前的 key 加到 ready_keys 中,好让 redis 能通知客户端 lpushx 的结果。
接下来是覆盖操作
void dbOverwrite(redisDb *db, robj *key, robj *val) {
dictEntry *de = dictFind(db->dict, key->prt);
redisAssertWithInfo(NULL, key, de != NULL);
dictReplace(db->dict, key->prt, val);
}
也只是简单的调用了 dict 的接口,并做了一些 assert。
SETNX
跟 set 同效,但只有当对应的 key 不存在时,才会 set 成功,具体的实现跟 set 并无二致,只是在调用前设置了对应的 flag: REDIS_SET_NX
void setnxCommand(redisClient *c) {
c->argv[2] = tryObjectEncoding(c->argv[2]);
setGenericCommand(c, REDIS_SET_NX, c->argv[1], c->argv[2], NULL, 0, shared.cone, shared.czero);
}
SETEX
跟 set 同效,在设置 key 对应的 value 时,同时还会设置过期时间。
void setexCommand(redisClient *c) {
c->argv[3] = tryObjectEncoding(c->argv[3]);
setGenericCommand(c, REDIS_NO_FLAGS, c->argv[1], c->argv[3, c->argv[2], UNIT_SECONDS, NULL, NULL);
}
MSET / MSETNX
批量设置新的键值对
void msetCommand(redisClient *c) {
msetGenericCommand(c, 0);
}
void msetnxCommand(redisClient *c) {
msetGenericCommand(c, 1);
}
void msetGenericCommand(redisClient *c, int nx) {
int j, busykeys = 0;
if ((c->argc % 2) == 0) {
addReplyError(c, "wrong number of arguments for MSET");
return;
}
// 如果设置了 nx 标志,则说明所有需要设置的 key 都不可以存在于 redis 中
if (nx) {
for (j = 1; j < c->argc; j += 2) {
if (lookupKeyWrite(c->db, c->argv[j]) != NULL) {
busykeys++;
}
}
if (busykeys) {
addReply(c, shared.czero);
}
}
for (j = 1; j < c->argc; j += 2) {
c->argv[j + 1] = tryObjectEncoding(c->argv[j + 1]);
setKey(c->db, c->argv[j], c->argv[j + 1]);
notifyKeyspaceEvent(REDIS_NOTIFY_STRING, "set",
c->argv[j], d->db->id);
}
server.dirty += (c->argc - 1) / 2;
addReply(c, nx ? shared.cone : shared.ok);
}
上面的 msetCommand
转调的是 msetGenericCommand
,同时传递了 0 作为 nx 参数,而在 msetGenericCommand
中 nx 是作为一个标识符,用于指示即将插入的键值对能否已存在 redis 之中。 0 则表示不做限制。
而对于 msetnxCommand
来说则需要限制,所以传递了 1。
BASIC
下面列出的是跟类型无关的基础指令
DEL
删除 key 对应的 value,主要的实现为,接收变长的参数列表,列表中都为 redis 中的 key, 把所有的 key 都从 redis 中删掉,然后提醒订阅了这些 key 状态的客户端
void delCommand(redisClient *c) {
int deleted = 0, j;
for (j = 1; j < c->argc; j++) {
expireIfNeeded(c->db, c->argv[j]);
if (dbDelete(c->db, c->argv[j])) {
// 如果删除成功,则提醒订阅了这些 key 的客户端
signalModifiedKey(c->db, c->argv[j]);
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,
"del", c->argv[j], c->db->id);
server.dirty++;
deleted++;
}
}
addReplyLongLong(c, deleted);
}
EXISTS
判断对应的 key 是否存在于 redis 中。
void existsCommand(redisClient *c) {
long long count = 0;
int j;
for (j = 1; j < c->argc; j++) {
expireIfNeeded(c->db, c->argv[j]);
if (dbExists(c->db, c->argv[j])) count++;
}
addReplyLongLong(c, count);
}