Redis 内容篇
[TOC]
在经过了之前对 Redis 的几个基础类型的分析后,我们开始查看其具体实现,首先嘛,任何程序跟应用总是需要启动的,所以我们从其 redis.h/redis.c
开始。
#include "ae.h" /* Event driven programming library */
#include "sds.h" /* Dynamic safe strings */
#include "dict.h" /* Hash tables */
#include "adlist.h" /* Linked lists*/
#include "zmalloc.h" /* total memory usage aware version of malloc/free */
#include "anet.h" /* Networkiing the easy way */
#include "ziplist.h" /* Compact list date structure */
#include "intset.h" /* Compact integer set structure */
#include "version.h" /* Version macro */
#include "util.h" /* Misc function useful in many place*/
#include "latency.h" /* Latency monitor API */
#include "sparkline.h" /* ASII graphs API */
除去我们已经分析过的基础类型,其他的都是 Redis 相关逻辑实现,而注释也写的非常清楚
ae.h
是事件驱动 lib, 所有的 redis 操作应当都由此 lib 进行管理、调用anet.h
网络库的封装zmalloc.h
内存管理,方便 redis 对内存的使用量进行监控version.h
版本信息latency.h
延时管理器- 其他
util.h
使用函数sparkline.h
程序启动的图形输出
由上面的基本信息可知,核心流程基本是由 anet 来管理网络连接,在接收到客户端的请求后,将请求内容转发给 ae,ae 则根据请求的类型,调用相关的处理函数。
启动服务器
接下来看看 Redis 是如何启动的, 以下是经过简单整理的启动函数
int main (int argc, char **argv) {
struct timeval tv;
setlocale(LC_COLLATE, "");
zmalloc_enable_thread_safeness();
zmalloc_set_oom_handler(redisOutOfMemoryHandler);
srand(time(NULL)^getpid());
gettimeofday(&tv, NULL);
dictSetHashFunctionSeed(tv.tv_sec ^ tv.tv_usec ^ getpid());
server.sentinel_mode = checkForSentinelMode(argc, argv);
initSErverConfig();
if (server.sentinel_mode) {
initSentinelConfig();
initSentinel();
}
// 命令行处理...
// 略过
if (server.daemonize) daemonize();
// 初始化服务
initServer();
if (server.daemonize) createPidFile();
}
首先看看服务是怎么初始化的
void initServer(void) {
int j;
gignal(SIGHUP, SIG_IGN);
gignal(SIGPIPE, SIG_IGN);
setupSingalHandlers();
if (server.syslog_enabled) {
openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT, server.syslog_facility);
}
// 当前进程 ID
server.pid = getpid();
// 当前处理的客户端
server.current_client = NULL;
// 客户端列表
server.clients = listCreate();
// 待关闭的客户端
server.clients_to_close = listCreate();
// 从服务器
server.slaves = listCreate();
// 监控器
server.monitors = listCreate();
server.slaveseldb = -1;
// 非堵塞客户端
server.unblocked_clients = listCreate();
server.ready_keys = listCreate();
server.clients_waiting_acks = listCreate();
server.get_ack_from_slaves = 0;
server.clients_paused = 0;
// 创建共享变量,如命令的表示,静态字符串等
createShareObjects();
// 调整文件描述符限制, 实现是如果支持设置最大文件描述符,则从最大的值开始尝试,直到设置成功
adjustOpenFilesLimit();
// redis 的核心:事件驱动
server.el = aeCreateEventLoop(server.maxclients + REDIS_EVENTLOOP_FDSET_INCR);
// 保存具体数据的数据库
server.db = zmalloc(sizeof(redisDb) * server.dbnum);
// 开始监听指定的端口
if (server.port != 0 &&
listenToPort(server.port, server.ipfd, &server.ipfd_count) == REDIS_ERR)
exit(1);
if (server.unixsocket != NULL) {
// ... unix 域的 socket
}
// 如果没监听任何端口,则直接退出
if (server.ipfd_count == 0 && server.sofd < 0) {
redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting.");
exit(1);
}
// 初始化数据库
// 这里初始化各个 dict 的关键就是定义的各种 dictType, 里面描述了对应的 key value 的复制、删除等实现。说完这部分再看看大致的实现
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType, NULL);
server.db[j].expires = dictCreate(&keyptrDictType, NULL);
server.db[j].blocking_keys = dictCreate(&keyListDictType, NULL);
server.db[j].ready_keys = dictCreate(&setDictType, NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType, NULL);
server.db[j].eviction_pool = evictionPoolAlloc();
server.db[j].id = j;
server.db[j].avg_ttl = 0;
}
// 用于 pubsub 的 channels,
// 然后设置了对应 list 的处理函数
server.pubsub_channels = dictCreate(&keylistDictType, NULL);
server.pubsub_patterns = listCreate();
listSetFreeMethod(server.pubsub_patterns, freePubsubPattern);
listSetMatchMethod(server.pubsub_patterns, listMatchPubsubPattern);
// 然后是其他的配置信息
// 已经处理过 Cron 的次数
server.cronloops = 0;
// rdb 跟 aof 是 Redis 的两种静态化机制
server.rdb_child_pid = -1;
server.aof_child_pid = -1;
server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE;
aofRewriteBufferReset();
server.aof_buf = sdsempty();
server.lastsave = time(NULL);
server.lastbgsave_try = 0;
server.rdb_save_time_last = -1;
server.rdb_save_time_start = -1;
server.dirty = 0;
resetServerStata();
// redis 的状态信息,包括启动时间等
server.stat_starttime = time(NULL);
server.stat_peak_memory = 0;
server.resident_set_size = 0;
server.lastbgsave_status = REDIS_OK;
server.aof_last_write_status = REDIS_OK;
server.aof_last_write_errno = 0;
server.repl_good_slaves_count = 0;
updateCachedTime();
// 接下来启动定时器,定时触发 serverCron 操作
if (aeCreateTimeEvent(server.el, 1 server.cron, NULL, NLL) == AE_ERR) {
redisPanic("Can't craete the serverCron time event.");
exit(0);
}
// 然后绑定对应 fd 的网络事件,在 fd 可读时触发 acceptTcpHandler 函数
for (j = 0; server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipif[j], AE_READABLE, acceptTcpHandler, NULL) == AE_ERR) {
redisPainc("Unrecoverable error create server.ipfd file event");
}
}
// 这里绑定 unix 域的 fd 可读时触发 acceptUnixHandler 函数
if (server.sofd > 0 && aeCreateFileEvent(server.el,
server.sofd, AE_READABLE, acceptUnixHandler, NULL) == AE_ERR)
redisPainc("Unrecoverable error create server.sofd file event.");
// 如果启动了 aof 静态化机制,则打开 aof 对应的文件
if (server.aof_sate == REDIS_AOF_ON) {
server.aof_fd = open(server.aof_filename,
O_WRONLY | O_APPEND | O_CREAT, 0644);
if (server.aof_fd == -1) {
redisLog(REDIS_WARNING, "Can't open the append-only file: %s", strerrno(errno));
exit(1);
}
}
// 在 32 位的机器上,限制最多使用的内存量为 3GB
if (server.arch_bits == 32 && server.maxmemory == 0) {
redisLog(REDIS_WARNING, "Warning: 32bit instance detected but no memory limit set, Stting 3 GB maxmemory limit with 'noeviction' policy now.");
server.maxmemory = 3072LL * (1024 * 1024);
server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION;
}
// 如果启动了主从机制,则初始化主从信息
if (server.cluster_enabled) clusterInit();
// 初始化复制机制
replicationScriptCacheInit();
// 初始化 Lua 脚本环境
scriptingInit();
// 初始化延迟日志
slowLogInit();
// 初始化 latency 列表
latencyMonitorInit();
// 初始化 Backgrond IO,主要用于落地数据
bioInit();
}
在上面的初始化过程中,我们把当前应该关注的跟可以忽略的都写出来了,其实是为了留个印象,以后遇到的时候可以知道是用来干啥的。
除去 Cluster
、AOF
、RDB
等,剩余的基本都是 Redis 基础服务的核心,如初始化时通过 aeCreateTimeEvent
注册的的 serverCron
事件,以及注册到监听端口的处理函数 acceptTcpHandler
, 暂时可以把它理解为一切的开始:因为 Redis 的工作方式就是等待一条命令,处理命令然后返回结果。
建立连接
在这里我们先不管 ae 的实现,直接查看 acceptTcpHandler
,这里是作为 Redis 接收客户端链接的入口:
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[REDIS_IP_STR_LEN];
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
while (max--) {
// 以非堵塞的方式接受 tcp 连接,fd 是监听的端口 fd, cip 跟 cport 用来保存客户端对应的 fd 跟端口
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
redisLog(REDIS_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
redisLog(REDIS_VERBOSE, "Accepted %s:%d, cip, cport);
// 接收连接成功,将其加入 server 的客户端列表
acceptCommonHandler(cfd, 0);
}
}
上面的函数会在有新的客户端连接上来是触发,而具体的逻辑则负责把新的连接通过 acceptCommonHandler
创建一个 redisClient
实例加入 server 进行管理,接下来看看其实现及 redisClient
的定义
typedef struct redisClient {
uint64_t id; // 客户端的唯一标示符
int fd;
redisDb *db;
int dictid;
robj *name; // 客户端的名称
sds querybuf;
size_t querybuf_peak;
int argc;
robj **argv;
struct redisCommand *cmd, *lastcmd;
int reqtype;
int multibulklen;
long bulklen;
list *reply;
unsigned long reply_bytes;
int sentlen;
time_t ctime;
time_t lastinteraction;
time_t obuf_soft_limit_reached_time;
int flags; // REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI
int authenticated;
int replstate; // repl 的状态,只有在当前进程是 slave 时才有用
int repl_put_online_on_ack;
int repldbfd;
off_t repldboff;
off_t repldbsize;
sds replpreamble;
long long reploff;
long long repl_ack_off;
long long repl_ack_time;
long long psync_initial_offset;
char replrunid[REDIS_RUN_ID_SIZE + 1];
int slave_listening_port;
int slave_capa;
multiState mstate;
int btype;
blockingState bpop;
long long woff;
list *watched_keys;
dict *pubsub_channels;
list *pubsub_patterns;
sds peerid;
int bufpos;
char buf[REDIS_REPLY_CHUNK_BYTES];
} redisClient;
static void acceptCommonHandler(int fd, int flags) {
redisClient *c;
// 创建 client 示例
if ((c = createClient(fd)) == NULL) {
redisLog(REDIS_WARNING,
"Error registering fd event for the new client : %s (fd=%d)",
strerror(errno), fd);
close(fd);
return;
}
// 限制最大链接数
if (listLength(server.clients) > server.maxclients) {
char *err = "-ERR max number of clients reached\r\n";
if (write(c->fd, err, strlen(err)) == -1) {
}
server.stat_rejected_conn++;
freeClient(c);
return;
}
server.stat_numconnections++;
c->flags |= flags;
}
redisClient *createClient(int fd) {
redisClient *c = zmalloc(sizeof(redisClient));
if (fd != -1) {
// 将客户端设置为非堵塞的
anetNonBlock(NULL, fd);
// 不延迟发送客户端的回应
anetEnableTcpNoDelay(NULL, fd);
// 设置 keepalive 以检查客户端断开的状况
if (server.tcpkeepalive)
anetKeepAlive(NULL, fd, server.tcpkeepalive);
// 注册事件,当客户端有请求过来时,调用 readQueryFromClient
if (aeCreateFileEvent(server.el, fd, AE_READABLE,
readQueryFromClient, c) == AE_ERR) {
close(fd);
zfree(c);
return NULL;
}
// 接下来是对 client 的一些常规设置, 以下会忽略一些跟现在无关的选项
selectDb(c, 0); // 设置使用第一个 db
c->id = server.next_client_id++;
c->fd = fd;
c->name = NULL;
// ...
c->querybuf = sdsempty(); // 请求的缓冲
c->argc = 0;
c->argv = NULL; // 上面两个是保存具体操作的参数
c->cmd = c->lastcmd = NULL;
c->reply = listCreate(); // 给客户端的 response
c->reply_bytes = 0;
listSetFreeMethod(c->reply, decrRefCountVoid);
listSetDupMethod(c->reply, dupClientReplyValue);
// ...
// 最后添加 client 到 server 的 clients 进行管理
if (fd != -1) listAddNodeTail(server.clients, c);
initClientMultiState(c);
}
}
创建客户端的操作基本完成,在这个时候客户端已经与服务端建立连接,随时准备客户端发送请求。
Redis 的类型定义
如何处理请求留到下一篇章进行分析,接下来看一下上面遇到的各种 redis 的内置类型定义。
#define REDIS_LRU_BITS 24
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:REDIS_LRU_BITS;
int refcount;
void *ptr;
} robj;
typedef struct redisDb {
dict *dict; /* The keyspace for thsi DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys;
dict *ready_keys;
dict *watched_keys;
struct evictionPoolEntry *eviction_pool;
int id;
long long avg_ttl;
} redisDb;
typedef struct redisServer {
// 这里的东西太多,就不一一列出了,等要用到的时候再进行分析吧
} redisServer;