Redis 内容篇
[TOC]
解析请求
在前文我们看到了当客户端发送命令过来时,是由 ae
调用 readQueryFromClient
进行处理,接着我们来看看这个函数是如何处理客户端发送过来的数据的。
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
int nread, readlen;
size_t qblen;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
server.current_client = c;
readlen = REDIS_IOBUF_LEN;
// 处理大数据包
if (c->reqtype == REDIS_REQ_MULTIBULD && c->multibulklen &&
c->bulklen != -1 && c->bulklen >= REDIS_MBULK_BIG_ARG) {
int remaining = (unsigned)(c->bulklen + 2) - sdslen(c->querybuf);
// 如果剩余的数据量少于默认的缓存数,则使用较小的缓存大小
if (remaining < readlen) readlen = remaining;
}
qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen) c->querybuf_peek = qblen;
// 扩展缓存空间,以存放即将读取的数据
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
// c->querybuf + qblen 是跳过已经使用的缓存内容
nread = read(fd, c->querybuf + qblen, readlen);
// 错误处理,如果因为信号中断,则设置 nread 为 0,并等待下次重读
// 如果远程客户端断开,则服务端也断开连接
if (nread == -1) {
if (errno == EAGAIN) {
nread = 0;
}
else {
readLog(REDIS_VERBOSE, "Reading from client: %s", strerrno(errno));
freeClient(c);
return;
}
}
else if(nread == 0) {
readisLog(REDIS_VERBOSE, "Client closed connection");
freeClient(c);
return;
}
if (nread) {
// 调整 querybuf 已使用的数据量
sdsIncrLen(c->querybuf, nread);
// 更新状态信息
c->lastinteraction = server.unixtime;
if (c->flags & REDIS_MASTER) c->reploff += nread;
server.stat_net_input_bytes += nread;
}
else {
server.current_client = NULL;
return;
}
// 如果当前客户端请求的缓存大于系统设置的上限,则中断此连接, 并输出当前连接信息
// 比如当前客户端是攻击者?
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(), c), bytes = sdsempty();
bytes = sdscatrepr(bytes, c->querybuf, 64);
redisLog(REDIS_WARNING, "Closing client that reached max query buffer length");
sdsfree(ci);
sdsfree(bytes);
freeClient(c);
return;
}
// 开始处理请求
processInputBuffer(c);
server.current_client = NULL;
}
上面使用了最正统的 socket 处理方式,读取数据,做出错误处理,验证客户端的安全性(缓存长度),然后把接收到的数据丢给 processInputBuffer
进行处理。
接下来我们看看它是怎么做的。
void processInputBuffer(redisClient *c) {
// 处理直到 buf 中没有数据
while (sdslen(c->querybuf)) {
// 如果是从服务器,并且由于某些原因处于暂停状态,则不处理客户端的命令
if (!(c->flags & REDIS_SLAVE) && clientsArePaused()) return;
// 如果当前客户端正在某个处理步骤的中间,则不处理命令
if (c->flags & REDIS_BLOCKED) return;
// 如果当前客户端在回复后就要关闭,则不处理命令
if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
// 确认当前客户端的请求模式, 单行或多行模式
if (!c->reqtype) {
if (c->querybuf[0] == '*') {
c->reqtype = REDIS_REQ_MULTIBULK;
}
else {
c->reqtype = REDIS_REQ_INLINE;
}
}
// 然后再根据模式处理接收到的数据
if (c->reqtype == REDIS_REQ_INLINE) {
if (processInlineBuffer(c) != REDIS_OK) break;
}
else if(c->reqtype == REDIS_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != REDIS_OK) break;
}
else {
redisPanic("Unknown request type");
}
// 如果是 Multibulk 模式,则可能会出现 argc <= 0 的情况
if (c->argc == 0) {
resetClient(c);
}
else {
// 开始处理命令
if (processCommand(c) == REDIS_OK)
resetClient(c);
}
}
}
在进入处理具体命令的逻辑之前,我们先看看,是如何组织命令信息的,Inline 跟 MultiBulk 两种方式分别实现在 processInlineBuffer
跟 processMultibulkBuffer
中,我们分别来看看。
解析单个命令请求
int processInlineBuffer(redisClient *c) {
char *newline;
int argc, j;
sds *argv, aux;
size_t querylen;
// 获取当前命令行
newline = strchr(c->querybuf, '\n');
// 错误处理
if (newline == NULL) {
if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
addReplyError(c, "Protocol error: too big inline request");
setProtocoError(c, 0);
}
}
// 处理 \r\n 的情况,如果 \n 不是在 querybuf 的第一个字节且 \n 的前一个字节为 \r, 则 newline 回退一个字节
if (newline && newline != c->querybuf && *(newline - 1) == '\r')
newline--;
// 得到命令的长度
querylen = newline - (c->querybuf);
// 得到确切的命令, 按空格或 "" 进行分割,比如
// cmd "hello world" im sinsay
// 可得到 [ cmd, "hello world", im, sinsay ]
aux = sdsnewlen(c->querybuf, querylen);
argv = sdssplitargs(aux, &argc);
sdsfree(aux);
if (argv == NULL) {
addReplyError(c, "Protocol error: unblanced quotes in request");
setProtocolError(c, 0);
return REDIS_ERR;
}
// 从 slave 发过来的空的 newline 表示要刷新 ack 信息
if (querylen == 0 && c->flags & REDIS_SLAVE)
c->repl_ack_time = server.unixtime;
// 调整 querybuf, 使其跳过已获取到的 newline, 类似 substr 的操作
sdsrange(c->querybuf, querylen + 2, -1);
// 分配存放参数的内存块
if (argc) {
if (c->argc) zfree(c->argv);
c->argv = zmalloc(sizeof(robj*) * argc);
}
// 这里创建的 argv 每个都是属于 robj 类型
for (c->argc = 0, j = 0; j < argc; j++) {
if (sdslen(argv[j])) {
c->argv[c->argc] = createObject(REDIS_STRING, argv[j]);
c->argc++;
}
else {
sdsfree(argv[j]);
}
}
zfree(argv);
return REDIS_OK;
}
上面唯一要重视的就是,argv
数组中的参数都是通过 createObject
创建的 robj
类型。robj
的结构在上篇的末尾已经介绍过,但我们还不了解各个字段的含义,现在我们看看到底是如何创建这个结构的。
robj *createObject(int type, void *ptr) {
robj *o = zmalloc(sizeof(*o));
o->type = type;
o->encoding = REDIS_ENCODING_RAW;
o->ptr = ptr;
o->refcount = 1;
o->lru = LRU_CLOCK();
return o;
}
从上面的调用方式基本可以得到:
type 表示 obj 的类型,从定义我们可得到大致上有以下几种类型
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIST_ZSET 3
#define REDIS_HASH 4
这不就是 redis 支持的数据结构么,OK。
encoding 这里看到的是 REDIS_ENCODING_RAW, 确定不了什么,再看看相关定义:
/* Objects encoding. Some kind of objects like String and Hashes can be
* internally represented in multiple ways. The `encoding` field of the object
* is set to one of this fields for this object */
#define REDIS_ENCODING_RAW 0 // Raw representation
#define REDIS_ENCODING_INT 1 // Encoded as integer
#define REDIS_ENCODING_HT 2 // Encoded as hash table
#define REDIS_ENCODING_ZIPMAP 3 // Encoded as zipmap
#define REDIS_ENCODING_LINKEDLIST 4 // Encoded as regular linked list
#define REDIS_ENCODING_ZIPLIST 5 // Encoded as ziplist
#define REDIS_ENCODING_INTSET 6 // Encoded as intset
#define REDIS_ENCODING_SKIPLIST 7 // Encoded as skiplist
#define REDIS_ENCODING_EMBSTR 8 // Embedded sds string encoding
再次可以确定,encoding 表示的是当前 obj 的具体实现,也就是这个 obj 可以是 REDIS_LIST 的类型,但他的具体实现可以使用 ZIPLIST 或 LINKEDLIST 之类的(我猜的)
而 ptr 就是具体实现的指针,具体能怎么使用这个 ptr, 取决于 type 跟 encoding.
refcount
跟 lru
就基本可以从字面意思看出,一个是用来共享变量的,一个是用来 Least Recently Used 算法,可能是用来排除长时间不使用的 key 之类的。
到了这一步, 已经解析好了所有由客户端发出来的命令,并打包到 client 的 argc 跟 argv 字段 中,时刻准备着处理 :)
解析多个命令请求
另外一种情况是 reqtype 是 MULTIBULK 的时候,需要处理多个命令,我们看看实现做了什么。
int processMultibulkBuffer(redisClient *c) {
chr *newline = NULL;
int pos = 0, ok;
long long ll;
// 获取当前请求是由多少个命令组成的
if (c->multibulklen == 0) {
redisAssertWithInfo(c, NULL, c->argc == 0);
newline = strchr(c->querybuf, '\r');
if (newline == NULL) {
// 异常处理
return REDIS_ERR;
}
// querybuf 应该还包含 \n,所以 -2, 这是确认长度是正确的
if (newline - (c->querybuf) > ((signed)sdslen(c->querybuf) - 2)) {
return REDIS_ERR;
}
redisAssertWithInfo(c, NULL, c->querybuf[0] == '*');
ok = string2ll(c->querybuf + 1, newline - (c->querybuf + 1), &ll);
...
}
}
Ok, 在继续下去之前,我们从最后一句得到一个信息,也就是命令的格式:
*NUMBER\r\n????
以 * 开头,以 \r\n 结尾的中间段,保存的是一个数字,用来表示之后的长度,具体是什么长度呢,继续分析。
// ...
// 长度不能超过 1024 * 1024
if (!ok || ll > 1024 * 1024) {
// 异常处理
return REDIS_ERR;
}
// pos 指向下一个命令的开始位置
pos = (newline - c->querybuf) + 2;
// 如果解析出来的命令数为0,则截断缓存后直接返回
if (ll <= 0) {
sdsrange(c->querybuf, pos, -1);
return REDIS_OK;
}
c->multibulklen = ll;
// 分配内存用于存放命令参数
if (c->argv) zfree(c->argv);
c->argv = zmalloc(sizeof(robj *) * c->multibulklen);
}
redisAssertWithInfo(c, NULL, c->multibulklen > 0);
// 更具刚刚得到的命令数,逐个获取命令信息
while (c->multibulklen) {
// 获取当前命令的长度
if (c->bulklen == -1) {
newline = strchr(c->querybuf + pos, '\r');
if (newline == NULL) {
// 错误处理
if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
return REDIS_ERR;
}
break;
}
// 如果 newline 位于 querybuf 的开头,则说明没东西可以处理了
if (newline - (c->querybuf) > ((signed)sdslen(c->queryuf) - 2)) {
break;
}
// pos 现在当前命令的开始
// 确认当前命令以 $ 开头
if (c->querybuf[pos] != '$') {
// 错误处理
return REDIS_ERR;
}
ok = string2ll(c->querybuf + pos + 1, newline - (c->querybuf + pos + 1, &ll);
if (!ok || ll < 0 || ll > 512 * 1024 * 1024) {
return REDIS_ERR;
}
// 获取到长度信息,更新 pos,指向下一个命令的位置
pos += newline - (c->querybuf + pos) + 2;
if (ll >= REDIS_MBULK_BIG_ARG) {
size_t qblen;
// 如果现有的空间不够存放接下来的命令信息,则扩容 sds
sdsrange(c->querybuf, pos, -1);
pos = 0;
qblen = sdslen(c->querybuf);
if (qblen < (size_t)ll + 2)
c->querybuf = sdsMakeRoomFor(c->querybuf, ll + 2 - qblen);
}
c->bulklen = ll;
}
}
再次暂停,从上面的分析我们基本得到了 BULK 模式下的消息结构
*命令个数\r\n命令长度\r\n命令....
ok,可以继续了。
// 如果 buffer 的长度不足以构成一个包,则跳出处理流程
if (sdslen(c->querybuf) - pos < (unsigned)(c->bulklen + 2)) {
break;
}
else {
// 如果已经处理到了最后
if (pos == 0 && c->bulklen >= REDIS_MBULK_BIG_ARG &&
(signed) sdslen(c->querybuf) == c->bulklen + 2) {
c->argc[c->argc++] = createObject(REDIS_STRING, c->querybuf);
sdsIncrLen(c->querybuf, -2);
c->querybuf = sdsempty();
c->querybuf = sdsMakeRoomFor(c->querybuf,c->bulklen + 2);
pos = 0;
}
else {
c->argv[c->argc++] =
createStringObject(c->querybuf + pos, c->bulklen);
pos += c->bulklen + 2;
}
c->bulklen = -1;
c->multibulklen--;
}
if (pos) sdsrange(c->querybuf, pos, -1);
if (c->multibulklen == 0) return REDIS_OK;
return REDIS_ERR;
跟单行的请求一样,只是这里会同时处理多个请求,接下来我们可以真正的进入处理请求的流程了。
处理请求
上面的处理已经把所有的请求信息,都放进了当前 client 的 argc 跟 argv 中,接下来只需要遍历 argv 即可获取对应的参数跟数据进行处理了。
int processCommand(redisClient *c) {
// 如果客户端发过来的是 quit 命令,则直接回复退出成功,并把客户端标示为 REDIS_CLOSE_AFTER_REPLY
if (!strcasecmp(c->argv[0]->ptr, "quit")) {
addReply(c, shared.ok);
c->flags |= REDIS_CLOSE_AFTER_REPLY;
}
}
这时候我们可以确定刚刚处理命令时遇到的对 REDIS_CLOSE_AFTER_REPLY 的处理了,也就是说,如果客户已明确标示要断开连接了,那接下来所有的操作我们都可以直接忽略。 OK 继续。
// 根据用户发送的命令,获取具体的 cmd
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
// 然后是一系列的错误检查,如果没有找到对应的命令,则说明命令格式错误
// 然后如果参数的数量不一致的话,也视为出错
if (!c->cmd) {
flagTransction(c);
addReplyErrorFormat(c, "unknown command '%s'",
(char *)c->argv[0]->ptr);
return REDIS_OK;
}
else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
flagTransaction(c);
addReplyErrorFormat(c, "wrong number of arguments for '%s' command",
c->cmd->name);
return REDIS_OK;
}
接下来就是具体处理命令了,在开始处理之前我们先看看 redisCommand 的具体定义
struct redisCommand {
// 命令名
char *name;
// 命令的函数指针
redisCommandProc *proc;
// 命令的参数数
int arity;
// 命令的标志,字符串表示形式,每个字符一个标示
char *sflags;
// 命令的标志,位模式,由 sflags 决定
int flags;
// 用于 Cluster 的转发,暂时不管
redisGetKeysProc *getkeys_proc;
// 以下几个不明,待分析
int firstkey;
int lastkey;
int keystep;
long long microseconds, calls;
}
typedef void redisCommandProc(redisClient *c);
然后继续处理命令
// 检查服务是否需要验证,如果需要验证,则检查是否已通过验证,因为还没通过验证的客户端,只接受验证命令 authCommand
if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) {
flagTranscation(c);
addReply(c, shared.noautherr);
return REDIS_OK;
}
// 如果处于 Cluster 模式,则只处理以下两种情况
// 1. 命令是从 Master 发过来的
// 2. 命令没有参数
if (server.cluster_enabled &&
!(c->flags & REDIS_MASTER) &&
!(c->flags & REDIS_LUA_CLIENT &&
server.lua_caller->flags & REDIS_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey = 0)) {
int hashslot;
// 如果当前的 Cluster 模式是正常的,执行 Redirect 操作,
// 否则处理错误
if (server.cluster->state != REDIS_CLUSTER_OK) {
flagTranscation(c);
clusterRedirectClient(c, NULL, REDIS_CLUSTER_REDIR_DOWN_STATE);
return REDIS_OK;
}
else {
int error_code;
clusterNode *n = getNodeByQuery(c, c->cmd, c->argc, &hashslot, &error_code);
if (n == NULL || n != server.cluster->myself) {
flagTranscation(c);
clusterRedirectClient(c, n, hashslot, error_node);
return REDIS_OK;
}
}
}
// 如果设置了内存限制,则检查内存使用状况
if (server.maxmemory) {
// 这里开始执行内存回收策略,遍历所有的 DB,根据 DB 配置的策略选择对应的 key 进行回收
int retval = freeMemoryIfNeeded();
if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
flagTranscation(c);
addReply(c, shared.oomerr);
return REDIS_OK;
}
}
// 当 bgsave 或者 aof 机制出错时,拒绝执行 "Write" 操作,也就是拒绝执行任何会导致数据写入磁盘的操作(包括 PING)
if (((server.stop_writes_on_bgsave_err &&
server.saveparamslen > 0 &&
server.lastbgsave_status == REDIS_ERR) ||
server.aof_last_write_status == REDIS_ERR) &&
server.masterhost == NULL &&
(c->cmd->flags & REDIS_CMS_WRIETE ||
c->cmd->proc == pingCommand)) {
flagTranscation(c);
if (server.aof_last_write_status == REIDS_OK)
addReply(c, shared.bgsaveerr);
else
addReplySds(c,
sdscatprintf(sdsempty(),
"-MISCONF Errors writeing to the AOF fiel: %s\r\n",
strerror(server.aof_last_write_errno)));
return REDIS_OK;
}
// 如果设置了 min_slaves_to_write 最小从机数并且现在可用的从机少于这个数字,则拒绝 Write 操作
if (server.masterhost == NULL &&
server.repl_min_slaves_to_write &&
server.repl_min_slaves_max_log &&
c->cmd->flags & REDIS_CMD_WRITE &&
server.repl_good_slaves_count < server.repl_min_slaves_to_write) {
flagTranscation(c);
addReply(c, shared.noreplicaserr);
return REDIS_OK;
}
// 如果是从机,并且是只读的从机,则拒绝执行 Write 操作
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & REDIS_MASTER) &&
c->cmd->flags & REDIS_CMD_WRITE) {
addReply(c, shared.roslaveerr);
return REDIS_OK;
}
// 如果当前客户端是 PUBSUB 模式,则只接受跟 pubsub 相关的操作
if (c->flags & REDIS_PUBSUB &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
addReplyError(c, "only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context");
return REDIS_OK;
}
// 如果从机的状态未跟主机同步,则只接受 INFO 跟 SLAVEOF 操作
if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED &&
server.repl_server_state_data == 0 &&
!(c->cmd->flags & REDIS_CMD_STALE)) {
flagTranscation(c);
addReply(c, shared.masterdownerr);
return REDIS_OK;
}
// 如果 Redis 服务仍在初始化阶段,则回复对应的信息给客户端
if (server.loading && !(c->cmd->flags & REDIS_CMD_LOADING)) {
addReply(c, shared.loadingerr);
return REDIS_OK;
}
// 如果执行的 lua 脚本太慢了,则限制只能执行有限的操作
if (server.lua_timeout &&
c->cmd->proc != authCommand &&
c->cmd->proc != replconfCommand &&
!(c->cmd->proc == shutdownCommand &&
c->argc == 2 &&
tolower(((char *)c->argv[1]->prt)[0] == 'n') &&
!(c->cmd->proc == scriptCommand &&
c->argc == 2 &&
tolower(((char *)c->argv[1]->ptr)[0] == 'k')) {
flagTranscation(c);
addReply(c, shared.slowscripterr);
return REDIS_OK;
}
// 正式执行命令。。。
// 不容易啊。。。。。
// 如果当前处于多命令 MULTI 状态,则如果不是要求执行或取消已经放入队列的命令的话,将新命令加入执行队列
if (c->flags & REDIS_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmp->proc != multiCommand && c->cmd->proc != watchCommand) {
queueMultiCommand(c);
addReply(c, shared.queued);
}
else {
call(c, REDIS_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientBlockedOnLists();
}
return REDIS_OK;
}
到了这里,终于把接收请求跟对请求跟命令进行分析部分写完了,最后我们再来看看,执行 Multi
模式的命令是怎么加入队列的。
void queueMultiCommand(redisClient *c) {
multiCmd *mc;
int j;
c->mstat.commands = zrealloc(c->mstate.commands,
sizeof(multiCmd) * (c->mstate.count + 1));
mc = c->mstate.commands + c->mstate.count;
mc->cmd = c->cmd;
mc->argc = c->argc;
mc->argv = zmalloc(sizeof(robj *) * c->argc;
memcpy(mc->argv, c->argv, sizeof(robj *) * c->argc);
for (j = 0; j < c->argc; j++)
increRefCount(mc->argv[j]);
c->mstate.count++;
}
这里可以看到,client 会维护一个 mstate 来保存当前命令的列表,在有新的命令进来时,将其加入命令列表,并增加对应对象的引用信息。引用信息是 robj 用来共享变量所使用的。
最后剩下的,就是命令具体的执行方式了。