Redis 内容篇

[TOC]

解析请求

在前文我们看到了当客户端发送命令过来时,是由 ae 调用 readQueryFromClient 进行处理,接着我们来看看这个函数是如何处理客户端发送过来的数据的。

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    int nread, readlen;
    size_t qblen;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    server.current_client = c;
    readlen = REDIS_IOBUF_LEN;

    // 处理大数据包
    if (c->reqtype == REDIS_REQ_MULTIBULD && c->multibulklen &&
         c->bulklen != -1 && c->bulklen >= REDIS_MBULK_BIG_ARG) {
        int remaining = (unsigned)(c->bulklen + 2) - sdslen(c->querybuf);
        // 如果剩余的数据量少于默认的缓存数,则使用较小的缓存大小
        if (remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peek = qblen;
    // 扩展缓存空间,以存放即将读取的数据
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    // c->querybuf + qblen 是跳过已经使用的缓存内容
    nread = read(fd, c->querybuf + qblen, readlen);

    // 错误处理,如果因为信号中断,则设置 nread 为 0,并等待下次重读
    // 如果远程客户端断开,则服务端也断开连接
    if (nread == -1) {
        if (errno == EAGAIN) {
            nread = 0;
        }
        else {
            readLog(REDIS_VERBOSE, "Reading from client: %s", strerrno(errno));
            freeClient(c);
            return;
        }
    }
    else if(nread == 0) {
        readisLog(REDIS_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    }

    if (nread) {
        // 调整 querybuf 已使用的数据量
        sdsIncrLen(c->querybuf, nread);
        // 更新状态信息
        c->lastinteraction = server.unixtime;
        if (c->flags & REDIS_MASTER) c->reploff += nread;
        server.stat_net_input_bytes += nread;
    }
    else {
        server.current_client = NULL;
        return;
    }

    // 如果当前客户端请求的缓存大于系统设置的上限,则中断此连接, 并输出当前连接信息
    // 比如当前客户端是攻击者?
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(), c), bytes = sdsempty();
        bytes = sdscatrepr(bytes, c->querybuf, 64);
        redisLog(REDIS_WARNING, "Closing client that reached max query buffer length");
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }

    // 开始处理请求
    processInputBuffer(c);
    server.current_client = NULL;
}

上面使用了最正统的 socket 处理方式,读取数据,做出错误处理,验证客户端的安全性(缓存长度),然后把接收到的数据丢给 processInputBuffer 进行处理。 接下来我们看看它是怎么做的。

void processInputBuffer(redisClient *c) {
    // 处理直到 buf 中没有数据
    while (sdslen(c->querybuf)) {

        // 如果是从服务器,并且由于某些原因处于暂停状态,则不处理客户端的命令
        if (!(c->flags & REDIS_SLAVE) && clientsArePaused()) return;

        // 如果当前客户端正在某个处理步骤的中间,则不处理命令
        if (c->flags & REDIS_BLOCKED) return;

        // 如果当前客户端在回复后就要关闭,则不处理命令
        if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;

        // 确认当前客户端的请求模式, 单行或多行模式
        if (!c->reqtype) {
            if (c->querybuf[0] == '*') {
                c->reqtype = REDIS_REQ_MULTIBULK;
            }
            else {
                c->reqtype = REDIS_REQ_INLINE;
            }
        }

        // 然后再根据模式处理接收到的数据
        if (c->reqtype == REDIS_REQ_INLINE) {
            if (processInlineBuffer(c) != REDIS_OK) break;
        }
        else if(c->reqtype == REDIS_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != REDIS_OK) break;
        }
        else {
            redisPanic("Unknown request type");
        }

        // 如果是 Multibulk 模式,则可能会出现 argc <= 0 的情况
        if (c->argc == 0) {
            resetClient(c);
        }
        else {
            // 开始处理命令
            if (processCommand(c) == REDIS_OK)
                resetClient(c);
        }
    }
}

在进入处理具体命令的逻辑之前,我们先看看,是如何组织命令信息的,Inline 跟 MultiBulk 两种方式分别实现在 processInlineBufferprocessMultibulkBuffer 中,我们分别来看看。

解析单个命令请求

int processInlineBuffer(redisClient *c) {
    char *newline;
    int argc, j;
    sds *argv, aux;
    size_t querylen;

    // 获取当前命令行
    newline = strchr(c->querybuf, '\n');

    // 错误处理
    if (newline == NULL) {
        if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
            addReplyError(c, "Protocol error: too big inline request");
            setProtocoError(c, 0);
        }
    }

    // 处理 \r\n 的情况,如果 \n 不是在 querybuf 的第一个字节且 \n 的前一个字节为 \r, 则 newline 回退一个字节
    if (newline && newline != c->querybuf && *(newline - 1) == '\r')
        newline--;

        // 得到命令的长度
        querylen = newline - (c->querybuf);
        // 得到确切的命令, 按空格或 "" 进行分割,比如
        // cmd "hello world" im sinsay
        // 可得到 [ cmd, "hello world", im, sinsay ]
        aux = sdsnewlen(c->querybuf, querylen);
        argv = sdssplitargs(aux, &argc);
        sdsfree(aux);
        if (argv == NULL) {
            addReplyError(c, "Protocol error: unblanced quotes in request");
            setProtocolError(c, 0);
            return REDIS_ERR;
        }

        // 从 slave 发过来的空的 newline 表示要刷新 ack 信息
        if (querylen == 0 && c->flags & REDIS_SLAVE)
            c->repl_ack_time = server.unixtime;

        // 调整 querybuf, 使其跳过已获取到的 newline, 类似 substr 的操作
        sdsrange(c->querybuf, querylen + 2, -1);

        // 分配存放参数的内存块
        if (argc) {
            if (c->argc) zfree(c->argv);
            c->argv = zmalloc(sizeof(robj*) * argc);
        }

        // 这里创建的 argv 每个都是属于 robj 类型
        for (c->argc = 0, j = 0; j < argc; j++) {
            if (sdslen(argv[j])) {
                c->argv[c->argc] = createObject(REDIS_STRING, argv[j]);
                c->argc++;
            }
            else {
                sdsfree(argv[j]);
            }
        }

        zfree(argv);
        return REDIS_OK;
}

上面唯一要重视的就是,argv 数组中的参数都是通过 createObject 创建的 robj 类型。robj 的结构在上篇的末尾已经介绍过,但我们还不了解各个字段的含义,现在我们看看到底是如何创建这个结构的。

robj *createObject(int type, void *ptr) {
    robj *o = zmalloc(sizeof(*o));
    o->type = type;
    o->encoding = REDIS_ENCODING_RAW;
    o->ptr = ptr;
    o->refcount = 1;
    o->lru = LRU_CLOCK();
    return o;
}

从上面的调用方式基本可以得到:

type 表示 obj 的类型,从定义我们可得到大致上有以下几种类型

#define REDIS_STRING 0
#define REDIS_LIST   1
#define REDIS_SET    2
#define REDIST_ZSET  3
#define REDIS_HASH   4

这不就是 redis 支持的数据结构么,OK。

encoding 这里看到的是 REDIS_ENCODING_RAW, 确定不了什么,再看看相关定义:

/* Objects encoding. Some kind of objects like String and Hashes can be
 * internally represented in multiple ways. The `encoding` field of the object
 * is set to one of this fields for this object */
#define REDIS_ENCODING_RAW        0   // Raw representation
#define REDIS_ENCODING_INT        1   // Encoded as integer
#define REDIS_ENCODING_HT         2   // Encoded as hash table
#define REDIS_ENCODING_ZIPMAP     3   // Encoded as zipmap
#define REDIS_ENCODING_LINKEDLIST 4   // Encoded as regular linked list
#define REDIS_ENCODING_ZIPLIST    5   // Encoded as ziplist
#define REDIS_ENCODING_INTSET     6   // Encoded as intset
#define REDIS_ENCODING_SKIPLIST   7   // Encoded as skiplist
#define REDIS_ENCODING_EMBSTR     8   // Embedded sds string encoding

再次可以确定,encoding 表示的是当前 obj 的具体实现,也就是这个 obj 可以是 REDIS_LIST 的类型,但他的具体实现可以使用 ZIPLIST 或 LINKEDLIST 之类的(我猜的

而 ptr 就是具体实现的指针,具体能怎么使用这个 ptr, 取决于 type 跟 encoding.

refcountlru 就基本可以从字面意思看出,一个是用来共享变量的,一个是用来 Least Recently Used 算法,可能是用来排除长时间不使用的 key 之类的。

到了这一步, 已经解析好了所有由客户端发出来的命令,并打包到 client 的 argc 跟 argv 字段 中,时刻准备着处理 :)

解析多个命令请求

另外一种情况是 reqtype 是 MULTIBULK 的时候,需要处理多个命令,我们看看实现做了什么。

int processMultibulkBuffer(redisClient *c) {
    chr *newline = NULL;
    int pos = 0, ok;
    long long ll;

    // 获取当前请求是由多少个命令组成的
    if (c->multibulklen == 0) {
        redisAssertWithInfo(c, NULL, c->argc == 0);

        newline = strchr(c->querybuf, '\r');
        if (newline == NULL) {
            // 异常处理
            return REDIS_ERR;
        }

        // querybuf 应该还包含 \n,所以 -2, 这是确认长度是正确的
        if (newline - (c->querybuf) > ((signed)sdslen(c->querybuf) - 2)) {
            return REDIS_ERR;
        }

        redisAssertWithInfo(c, NULL, c->querybuf[0] == '*');
        ok = string2ll(c->querybuf + 1, newline - (c->querybuf + 1), &ll);

        ...
    }
}

Ok, 在继续下去之前,我们从最后一句得到一个信息,也就是命令的格式:

    *NUMBER\r\n????

以 * 开头,以 \r\n 结尾的中间段,保存的是一个数字,用来表示之后的长度,具体是什么长度呢,继续分析。

        // ...
        // 长度不能超过 1024 * 1024
        if (!ok || ll > 1024 * 1024) {
            // 异常处理
            return REDIS_ERR;
        }

        // pos 指向下一个命令的开始位置
        pos = (newline - c->querybuf) + 2;

        // 如果解析出来的命令数为0,则截断缓存后直接返回
        if (ll <= 0) {
            sdsrange(c->querybuf, pos, -1);
            return REDIS_OK;
        }

        c->multibulklen = ll;

        // 分配内存用于存放命令参数
        if (c->argv) zfree(c->argv);
        c->argv = zmalloc(sizeof(robj *) * c->multibulklen);
    }

    redisAssertWithInfo(c, NULL, c->multibulklen > 0);
    // 更具刚刚得到的命令数,逐个获取命令信息
    while (c->multibulklen) {
        // 获取当前命令的长度
        if (c->bulklen == -1) {
            newline = strchr(c->querybuf + pos, '\r');
            if (newline == NULL) {
                // 错误处理
                if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
                    return REDIS_ERR;
                }
                break;
            }

            // 如果 newline 位于 querybuf 的开头,则说明没东西可以处理了
            if (newline - (c->querybuf) > ((signed)sdslen(c->queryuf) - 2)) {
                break;
            }

            // pos 现在当前命令的开始
            // 确认当前命令以 $ 开头
            if (c->querybuf[pos] != '$') {
                // 错误处理
                return REDIS_ERR;
            }

            ok = string2ll(c->querybuf + pos + 1, newline - (c->querybuf + pos + 1, &ll);
            if (!ok || ll < 0 || ll > 512 * 1024 * 1024) {
                return REDIS_ERR;
            }

            // 获取到长度信息,更新 pos,指向下一个命令的位置
            pos += newline - (c->querybuf + pos) + 2;
            if (ll >= REDIS_MBULK_BIG_ARG) {
                size_t qblen;

                // 如果现有的空间不够存放接下来的命令信息,则扩容 sds
                sdsrange(c->querybuf, pos, -1);
                pos = 0;
                qblen = sdslen(c->querybuf);
                if (qblen < (size_t)ll + 2)
                    c->querybuf = sdsMakeRoomFor(c->querybuf, ll + 2 - qblen);
            }

            c->bulklen = ll;
        }
    }

再次暂停,从上面的分析我们基本得到了 BULK 模式下的消息结构

    *命令个数\r\n命令长度\r\n命令....

ok,可以继续了。

    // 如果 buffer 的长度不足以构成一个包,则跳出处理流程
    if (sdslen(c->querybuf) - pos < (unsigned)(c->bulklen + 2)) {
        break;
    }
    else {
        // 如果已经处理到了最后
        if (pos == 0 && c->bulklen >= REDIS_MBULK_BIG_ARG &&
            (signed) sdslen(c->querybuf) == c->bulklen + 2) {
            c->argc[c->argc++] = createObject(REDIS_STRING, c->querybuf);
            sdsIncrLen(c->querybuf, -2);
            c->querybuf = sdsempty();
            c->querybuf = sdsMakeRoomFor(c->querybuf,c->bulklen + 2);
            pos = 0;
        }
        else {
            c->argv[c->argc++] =
                createStringObject(c->querybuf + pos, c->bulklen);
            pos += c->bulklen + 2;
        }
        c->bulklen = -1;
        c->multibulklen--;
    }

    if (pos) sdsrange(c->querybuf, pos, -1);
    if (c->multibulklen == 0) return REDIS_OK;

    return REDIS_ERR;

跟单行的请求一样,只是这里会同时处理多个请求,接下来我们可以真正的进入处理请求的流程了。

处理请求

上面的处理已经把所有的请求信息,都放进了当前 client 的 argc 跟 argv 中,接下来只需要遍历 argv 即可获取对应的参数跟数据进行处理了。

int processCommand(redisClient *c) {
    // 如果客户端发过来的是 quit 命令,则直接回复退出成功,并把客户端标示为 REDIS_CLOSE_AFTER_REPLY
    if (!strcasecmp(c->argv[0]->ptr, "quit")) {
        addReply(c, shared.ok);
        c->flags |= REDIS_CLOSE_AFTER_REPLY;
    }
}

这时候我们可以确定刚刚处理命令时遇到的对 REDIS_CLOSE_AFTER_REPLY 的处理了,也就是说,如果客户已明确标示要断开连接了,那接下来所有的操作我们都可以直接忽略。 OK 继续。

    // 根据用户发送的命令,获取具体的 cmd
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    // 然后是一系列的错误检查,如果没有找到对应的命令,则说明命令格式错误
    // 然后如果参数的数量不一致的话,也视为出错
    if (!c->cmd) {
        flagTransction(c);
        addReplyErrorFormat(c, "unknown command '%s'",
            (char *)c->argv[0]->ptr);
        return REDIS_OK;
    }
    else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
                (c->argc < -c->cmd->arity)) {
        flagTransaction(c);
        addReplyErrorFormat(c, "wrong number of arguments for '%s' command",
            c->cmd->name);
        return REDIS_OK;
    }

接下来就是具体处理命令了,在开始处理之前我们先看看 redisCommand 的具体定义

struct redisCommand {
    // 命令名
    char *name;
    // 命令的函数指针
    redisCommandProc *proc;
    // 命令的参数数
    int arity;
    // 命令的标志,字符串表示形式,每个字符一个标示
    char *sflags;
    // 命令的标志,位模式,由 sflags 决定
    int flags;
    // 用于 Cluster 的转发,暂时不管
    redisGetKeysProc *getkeys_proc;
    // 以下几个不明,待分析
    int firstkey;
    int lastkey;
    int keystep;
    long long microseconds, calls;
}

typedef void redisCommandProc(redisClient *c);

然后继续处理命令

    // 检查服务是否需要验证,如果需要验证,则检查是否已通过验证,因为还没通过验证的客户端,只接受验证命令 authCommand
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) {
        flagTranscation(c);
        addReply(c, shared.noautherr);
        return REDIS_OK;
    }

     // 如果处于 Cluster 模式,则只处理以下两种情况
     //     1. 命令是从 Master 发过来的
     //     2. 命令没有参数
     if (server.cluster_enabled &&
         !(c->flags & REDIS_MASTER) &&
         !(c->flags & REDIS_LUA_CLIENT &&
           server.lua_caller->flags & REDIS_MASTER) &&
         !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey = 0)) {
        int hashslot;

        // 如果当前的 Cluster 模式是正常的,执行 Redirect 操作,
        // 否则处理错误
        if (server.cluster->state != REDIS_CLUSTER_OK) {
            flagTranscation(c);
            clusterRedirectClient(c, NULL, REDIS_CLUSTER_REDIR_DOWN_STATE);
            return REDIS_OK;
        }
        else {
            int error_code;
            clusterNode *n = getNodeByQuery(c, c->cmd, c->argc,  &hashslot, &error_code);
            if (n == NULL || n != server.cluster->myself) {
                flagTranscation(c);
                clusterRedirectClient(c, n, hashslot, error_node);
                return REDIS_OK;
            }
        }
     }

     // 如果设置了内存限制,则检查内存使用状况
     if (server.maxmemory) {
        // 这里开始执行内存回收策略,遍历所有的 DB,根据 DB 配置的策略选择对应的 key 进行回收
        int retval = freeMemoryIfNeeded();
        if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
            flagTranscation(c);
            addReply(c, shared.oomerr);
            return REDIS_OK;
        }
     }

     // 当 bgsave 或者 aof 机制出错时,拒绝执行 "Write" 操作,也就是拒绝执行任何会导致数据写入磁盘的操作(包括 PING)
     if (((server.stop_writes_on_bgsave_err &&
           server.saveparamslen > 0 &&
           server.lastbgsave_status == REDIS_ERR) ||
           server.aof_last_write_status == REDIS_ERR) &&
         server.masterhost == NULL &&
         (c->cmd->flags & REDIS_CMS_WRIETE ||
          c->cmd->proc == pingCommand)) {

          flagTranscation(c);
          if (server.aof_last_write_status == REIDS_OK)
            addReply(c, shared.bgsaveerr);
          else
            addReplySds(c,
                sdscatprintf(sdsempty(),
                "-MISCONF Errors writeing to the AOF fiel: %s\r\n",
                strerror(server.aof_last_write_errno)));
          return REDIS_OK;
      }

      // 如果设置了 min_slaves_to_write 最小从机数并且现在可用的从机少于这个数字,则拒绝 Write 操作
      if (server.masterhost == NULL &&
          server.repl_min_slaves_to_write &&
          server.repl_min_slaves_max_log &&
          c->cmd->flags & REDIS_CMD_WRITE &&
          server.repl_good_slaves_count < server.repl_min_slaves_to_write) {
          flagTranscation(c);
          addReply(c, shared.noreplicaserr);
          return REDIS_OK;
      }

      // 如果是从机,并且是只读的从机,则拒绝执行 Write 操作
      if (server.masterhost && server.repl_slave_ro &&
          !(c->flags & REDIS_MASTER) &&
          c->cmd->flags & REDIS_CMD_WRITE) {
          addReply(c, shared.roslaveerr);
          return REDIS_OK;
      }

      // 如果当前客户端是 PUBSUB 模式,则只接受跟 pubsub 相关的操作
      if (c->flags & REDIS_PUBSUB &&
          c->cmd->proc != pingCommand &&
          c->cmd->proc != subscribeCommand &&
          c->cmd->proc != unsubscribeCommand &&
          c->cmd->proc != psubscribeCommand &&
          c->cmd->proc != punsubscribeCommand) {
          addReplyError(c, "only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context");
          return REDIS_OK;
      }

      // 如果从机的状态未跟主机同步,则只接受 INFO 跟 SLAVEOF 操作
      if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED &&
          server.repl_server_state_data == 0 &&
          !(c->cmd->flags & REDIS_CMD_STALE)) {
          flagTranscation(c);
          addReply(c, shared.masterdownerr);
          return REDIS_OK;
      }

      // 如果 Redis 服务仍在初始化阶段,则回复对应的信息给客户端
      if (server.loading && !(c->cmd->flags & REDIS_CMD_LOADING)) {
        addReply(c, shared.loadingerr);
        return REDIS_OK;
      }

      // 如果执行的 lua 脚本太慢了,则限制只能执行有限的操作
      if (server.lua_timeout &&
          c->cmd->proc != authCommand &&
          c->cmd->proc != replconfCommand &&
          !(c->cmd->proc == shutdownCommand &&
            c->argc == 2 &&
            tolower(((char *)c->argv[1]->prt)[0] == 'n') &&
          !(c->cmd->proc == scriptCommand &&
            c->argc == 2 &&
            tolower(((char *)c->argv[1]->ptr)[0] == 'k')) {

            flagTranscation(c);
            addReply(c, shared.slowscripterr);
            return REDIS_OK;
        }

        // 正式执行命令。。。
        // 不容易啊。。。。。
        // 如果当前处于多命令 MULTI 状态,则如果不是要求执行或取消已经放入队列的命令的话,将新命令加入执行队列
        if (c->flags & REDIS_MULTI &&
            c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
            c->cmp->proc != multiCommand && c->cmd->proc != watchCommand) {
            queueMultiCommand(c);
            addReply(c, shared.queued);
        }
        else {
            call(c, REDIS_CALL_FULL);
            c->woff = server.master_repl_offset;
            if (listLength(server.ready_keys))
                handleClientBlockedOnLists();
        }

        return REDIS_OK;
}

到了这里,终于把接收请求跟对请求跟命令进行分析部分写完了,最后我们再来看看,执行 Multi 模式的命令是怎么加入队列的。

void queueMultiCommand(redisClient *c) {
    multiCmd *mc;
    int j;
    c->mstat.commands = zrealloc(c->mstate.commands,
        sizeof(multiCmd) * (c->mstate.count + 1));
    mc = c->mstate.commands + c->mstate.count;
    mc->cmd = c->cmd;
    mc->argc = c->argc;
    mc->argv = zmalloc(sizeof(robj *) * c->argc;
    memcpy(mc->argv, c->argv, sizeof(robj *) * c->argc);
    for (j = 0; j < c->argc; j++)
        increRefCount(mc->argv[j]);
    c->mstate.count++;
}

这里可以看到,client 会维护一个 mstate 来保存当前命令的列表,在有新的命令进来时,将其加入命令列表,并增加对应对象的引用信息。引用信息是 robj 用来共享变量所使用的。

最后剩下的,就是命令具体的执行方式了。