Redis 源码分析 (一) 基本定义

[TOC]

Redis Object

Redis 自身使用了一套对象系统来构建整个架构,比如我们

set "name" "sinsay"

会构建两个字符串对象,一个保存 key 一个保存 value,然后会再构建一个 HASH 对象,用来保存对应的键值对。 各种对象里面根据当前保存的信息选择对应的编码,例如之前实现过的,当我们要保存一个列表时,会首先创建一个 LIST 对象,这个对象明确了他可以做 LIST 对应的操作,但具体的实现则取决于这个 LIST 对象里到底保存了什么元素,如果里面保存的是少量的数字类型,那 Redis 就会使用 INTSET 编码来作为元素的容器,以达到节省内存的目的,如果保存的是大量的字符串,则会转换为 LINKEDLIST 编码,用链表来保存所有的元素。

具体的编码跟对象类型如下所示:

对象类型

对象名对象类型
字符串对象REDIS_STRING
列表对象REDIS_LIST
哈希对象REDIS_HASH
集合对象REDIS_SET
有序集合对象REDIS_ZSET

对象编码

编码名编码类型
原始字符串REDIS_ENCODING_RAW
数字REDIS_ENCODING_INT
哈希表REDIS_ENCODING_HT
压缩表REDIS_ENCODING_ZIPMAP
链表REDIS_ENCODING_LINKEDLIST
压缩列表REDIS_ENCODING_ZIPLIST
整数集合REDIS_ENCODING_INTSET
跳跃表REDIS_ENCODING_SKIPLIST
压缩字符串REDIS_ENCODING_EMBSTR

具体实现

对象跟编码的定义

#define REDIS_STRING 0
#define REDIS_LIST   1
#define REDIS_SET    2
#define REDIS_ZSET   3
#define REDIS_HASH   4

#define REDISENCODING_RAW        0
#define REDISENCODING_INT        1
#define REDISENCODING_HT         2
#define REDISENCODING_ZIPMAP     3
#define REDISENCODING_LINKEDLIST 4
#define REDISENCODING_ZIPLIST    5
#define REDISENCODING_INTSET     6
#define REDISENCODING_SKIPLIST   7
#define REDISENCODING_EMBSTR     8

typedef struct redisObject {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:REDIS_LRU_BITS;
    int refcount;
    void *ptr;
} robj;

服务器信息

Redis 在服务器上体现为一个 redisServer 结构,其中保存了 redisDb 的指针用于实现同时多个 redis 的数据库并存。 我们先看看具体定义

struct redisServer {
    // 这里分成了很多部分,我们先简单的看看通用部分
    pid_t pid;              // 主进程 id
    char *configfile;       // 配置文件
    int hz;                 // 后面解释,服务器后台检查的频率
    redisDb *db;            // 最重要的存储 DB
    dict *commands;         // 命令表
    dict *orig_commands;    // 原始命令表
    aeEventLoop *el;        // 事件循环句柄
    unsigned lruclock:REDIS_LRU_BITS; // lru 
    int shutdown_asap;      // ???
    int activerehashing;    // rehash 标示符
    char *requirepass;      // 传递给 AUTH 命令的信息
    char *pidfile;          // PID 文件
    int arch_bits;          // 架构标示符(32位 OR 64位)
    int cronloops;          // 后台 Cron 运行次数
    char runid[REDIS_RUN_ID_SIZE+1]; // 每次执行 Redis 都会产生不同的 id
    int sentinel_mode;      // 是否启动哨兵模式
    
    // 配置信息
    dbnum;                  // 启用的 Db 个数
};

Ok, 以上即是通用部分,是我们当前最关心的部分,主要包括了一些主要的执行信息,比如进程信息,后台逻辑检查频率,以及最重要的 Db 信息。

接下来我们看看服务器的启动步骤。

int main (int argc, char **argv) {
    struct timeval tv;

#ifdef INIT_SETPROCTITLE_REPLACEMENT
    spt_init(argc, argv);
#endif
    // 设置区域信息
    setlocale(LC_COLLATE, "");
    
    // 设置内存分配信息
    zmalloc_enable_thread_safeness();
    zmalloc_set_oom_handler(redisOutOfMemoryHandler);
    
    // 初始化随机数种子
    srand(time(NULL)^getpid());
    gettimeofday(&tv, NULL);
    
    // 初始化字典种子
    dictSetHashFunctionSeed(tv.tv_sec ^ tv.tv_usec ^ getpid());
    
    // 初始化服务器配置, 包括监听端口,DB 数等
    server.sentinel = checkForSentinelMode(argc, argv);
    initServerConfig();
    
    // 如果设置了 sentinel 模式,则设置配置信息
    if (server.sentinel_mode) {
        initSentinelConfig();
        initSentinel();
    }
    
    // 处理命令行参数信息,包括读取设置配置文件信息,获取版本信息等
    if (argc >= 2) {
        // ...
    }
    
    // 是否以后台进程运行
    if (server.daemonize) daemonize();
    
    // 初始化服务
    initServer();
    
    // 如果是以后台进程运行,则创建 Pid File
    if (server.daemonize) createPidFile();
    
    // 设置一些启动信息
    redisSetProcTitle(argv[0]);
    redisAsciiArc();
    checkTcpBacklogSettings();
    
    // 正常启动
    if (!server.sentinel_mode) {
        redislLog(REDIS_WARNING, "...");
        
        // 读取静态化的数据
        loadDataFromDisk();
        
        // 如果开启了分布式模式,这里我们先只考虑单机部分
        if (server.cluster_enabled) {
            if(verifyClusterConfigWithData() == REDIS_ERR) {
                redisLog("...");
                exit(1);
            }
        }
        // 如果绑定的 ip 数已大于零,说明已经可以监听客户端的连接了
        if (server.ipfd_count > 0) {
            redisLog("...");
        }
        // unixsocket 模式
        if (server.sofd > 0) {
        }
    }
    else {
        // 哨兵模式启动
        sentinelIsRunning()
    }
    
    // 到这里已经初始化完成,开始进入监听模式,等候客户端发过来的命令
    aeSetBeforeSleepProc(server.el, beforeSleep);
    aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}

启动过程完毕,从 aeMain 开始,服务器已经初始化完毕,并且开始监听对应的端口,ae 的具体细节我们就不分析了,这是网络库的一部分,具体的实现就是: 在客户端连上来时,创建一个 redisClient,并且绑定了一个处理事件:当这个 redisClient 可读时调用,这个事件是 readQueryFromClient。 大致的代码为:

static void acceptCommonHandler(int fd, int flags) {
    redisClient *c;
    // 当连接进来时,调用 acceptCommonHandler,然后创建一个
    // redisClient 实例,并在 createClient 中绑定了读取事件
    if ((c = createClient(fd)) == NULL) {
        // process err
    }
    // ...
}

redisClient *createClient(int fd) {
    redisClient *c = zmalloc(sizeof(redisClient));
    if (fd != -1) {
        // ...
        // 将 fd 绑定到当前 client, 然后绑定 readQueryFromCleint,
        // 当 fd 可读时触发
        if (aeCreateFileEvent(server.el, fd, AE_READABLE,
            readQueryFromClient, c) == AE_ERR) {
            // process error   
        }
    }
}

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient *) privdata;
    
    // process multi bulk request
    if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= REDIS_MBULK_BIG_ARG) {
        int remaining = (unsigned)(c->bulklen + 2) - (sdslen(c->querybuf);
        if (remaining < readlen) readlen = remaining;
    }
    
    // 从客户端读取内容,并保存到 redisClient.querybuf 中
    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf + qblen, readlen);
    
    // 然后是一系列的错误处理
    // ...
    
    // 读取完毕后,处理 buff
    processInputBuffer(c);
    server.current_client = NULL;
}

ProcessInputBuffer 中,将 buffer 中的命令解析出来,创建为 redisObject,并放到 redisClient 的 argc 跟 argv 中