Redis 源码分析 (一) 基本定义
[TOC]
Redis Object
Redis 自身使用了一套对象系统来构建整个架构,比如我们
set "name" "sinsay"
会构建两个字符串对象,一个保存 key 一个保存 value,然后会再构建一个 HASH 对象,用来保存对应的键值对。 各种对象里面根据当前保存的信息选择对应的编码,例如之前实现过的,当我们要保存一个列表时,会首先创建一个 LIST 对象,这个对象明确了他可以做 LIST 对应的操作,但具体的实现则取决于这个 LIST 对象里到底保存了什么元素,如果里面保存的是少量的数字类型,那 Redis 就会使用 INTSET 编码来作为元素的容器,以达到节省内存的目的,如果保存的是大量的字符串,则会转换为 LINKEDLIST 编码,用链表来保存所有的元素。
具体的编码跟对象类型如下所示:
对象类型
对象名 | 对象类型 |
---|---|
字符串对象 | REDIS_STRING |
列表对象 | REDIS_LIST |
哈希对象 | REDIS_HASH |
集合对象 | REDIS_SET |
有序集合对象 | REDIS_ZSET |
对象编码
编码名 | 编码类型 |
---|---|
原始字符串 | REDIS_ENCODING_RAW |
数字 | REDIS_ENCODING_INT |
哈希表 | REDIS_ENCODING_HT |
压缩表 | REDIS_ENCODING_ZIPMAP |
链表 | REDIS_ENCODING_LINKEDLIST |
压缩列表 | REDIS_ENCODING_ZIPLIST |
整数集合 | REDIS_ENCODING_INTSET |
跳跃表 | REDIS_ENCODING_SKIPLIST |
压缩字符串 | REDIS_ENCODING_EMBSTR |
具体实现
对象跟编码的定义
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4
#define REDISENCODING_RAW 0
#define REDISENCODING_INT 1
#define REDISENCODING_HT 2
#define REDISENCODING_ZIPMAP 3
#define REDISENCODING_LINKEDLIST 4
#define REDISENCODING_ZIPLIST 5
#define REDISENCODING_INTSET 6
#define REDISENCODING_SKIPLIST 7
#define REDISENCODING_EMBSTR 8
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:REDIS_LRU_BITS;
int refcount;
void *ptr;
} robj;
服务器信息
Redis 在服务器上体现为一个 redisServer
结构,其中保存了 redisDb
的指针用于实现同时多个 redis
的数据库并存。
我们先看看具体定义
struct redisServer {
// 这里分成了很多部分,我们先简单的看看通用部分
pid_t pid; // 主进程 id
char *configfile; // 配置文件
int hz; // 后面解释,服务器后台检查的频率
redisDb *db; // 最重要的存储 DB
dict *commands; // 命令表
dict *orig_commands; // 原始命令表
aeEventLoop *el; // 事件循环句柄
unsigned lruclock:REDIS_LRU_BITS; // lru
int shutdown_asap; // ???
int activerehashing; // rehash 标示符
char *requirepass; // 传递给 AUTH 命令的信息
char *pidfile; // PID 文件
int arch_bits; // 架构标示符(32位 OR 64位)
int cronloops; // 后台 Cron 运行次数
char runid[REDIS_RUN_ID_SIZE+1]; // 每次执行 Redis 都会产生不同的 id
int sentinel_mode; // 是否启动哨兵模式
// 配置信息
dbnum; // 启用的 Db 个数
};
Ok, 以上即是通用部分,是我们当前最关心的部分,主要包括了一些主要的执行信息,比如进程信息,后台逻辑检查频率,以及最重要的 Db 信息。
接下来我们看看服务器的启动步骤。
int main (int argc, char **argv) {
struct timeval tv;
#ifdef INIT_SETPROCTITLE_REPLACEMENT
spt_init(argc, argv);
#endif
// 设置区域信息
setlocale(LC_COLLATE, "");
// 设置内存分配信息
zmalloc_enable_thread_safeness();
zmalloc_set_oom_handler(redisOutOfMemoryHandler);
// 初始化随机数种子
srand(time(NULL)^getpid());
gettimeofday(&tv, NULL);
// 初始化字典种子
dictSetHashFunctionSeed(tv.tv_sec ^ tv.tv_usec ^ getpid());
// 初始化服务器配置, 包括监听端口,DB 数等
server.sentinel = checkForSentinelMode(argc, argv);
initServerConfig();
// 如果设置了 sentinel 模式,则设置配置信息
if (server.sentinel_mode) {
initSentinelConfig();
initSentinel();
}
// 处理命令行参数信息,包括读取设置配置文件信息,获取版本信息等
if (argc >= 2) {
// ...
}
// 是否以后台进程运行
if (server.daemonize) daemonize();
// 初始化服务
initServer();
// 如果是以后台进程运行,则创建 Pid File
if (server.daemonize) createPidFile();
// 设置一些启动信息
redisSetProcTitle(argv[0]);
redisAsciiArc();
checkTcpBacklogSettings();
// 正常启动
if (!server.sentinel_mode) {
redislLog(REDIS_WARNING, "...");
// 读取静态化的数据
loadDataFromDisk();
// 如果开启了分布式模式,这里我们先只考虑单机部分
if (server.cluster_enabled) {
if(verifyClusterConfigWithData() == REDIS_ERR) {
redisLog("...");
exit(1);
}
}
// 如果绑定的 ip 数已大于零,说明已经可以监听客户端的连接了
if (server.ipfd_count > 0) {
redisLog("...");
}
// unixsocket 模式
if (server.sofd > 0) {
}
}
else {
// 哨兵模式启动
sentinelIsRunning()
}
// 到这里已经初始化完成,开始进入监听模式,等候客户端发过来的命令
aeSetBeforeSleepProc(server.el, beforeSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);
return 0;
}
启动过程完毕,从 aeMain 开始,服务器已经初始化完毕,并且开始监听对应的端口,ae 的具体细节我们就不分析了,这是网络库的一部分,具体的实现就是:
在客户端连上来时,创建一个 redisClient
,并且绑定了一个处理事件:当这个 redisClient
可读时调用,这个事件是 readQueryFromClient
。
大致的代码为:
static void acceptCommonHandler(int fd, int flags) {
redisClient *c;
// 当连接进来时,调用 acceptCommonHandler,然后创建一个
// redisClient 实例,并在 createClient 中绑定了读取事件
if ((c = createClient(fd)) == NULL) {
// process err
}
// ...
}
redisClient *createClient(int fd) {
redisClient *c = zmalloc(sizeof(redisClient));
if (fd != -1) {
// ...
// 将 fd 绑定到当前 client, 然后绑定 readQueryFromCleint,
// 当 fd 可读时触发
if (aeCreateFileEvent(server.el, fd, AE_READABLE,
readQueryFromClient, c) == AE_ERR) {
// process error
}
}
}
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = (redisClient *) privdata;
// process multi bulk request
if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= REDIS_MBULK_BIG_ARG) {
int remaining = (unsigned)(c->bulklen + 2) - (sdslen(c->querybuf);
if (remaining < readlen) readlen = remaining;
}
// 从客户端读取内容,并保存到 redisClient.querybuf 中
qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
nread = read(fd, c->querybuf + qblen, readlen);
// 然后是一系列的错误处理
// ...
// 读取完毕后,处理 buff
processInputBuffer(c);
server.current_client = NULL;
}
在 ProcessInputBuffer
中,将 buffer 中的命令解析出来,创建为 redisObject
,并放到 redisClient
的 argc 跟 argv 中