Redis 内容篇

[TOC]

在经过了之前对 Redis 的几个基础类型的分析后,我们开始查看其具体实现,首先嘛,任何程序跟应用总是需要启动的,所以我们从其 redis.h/redis.c 开始。

#include "ae.h"        /* Event driven programming library */
#include "sds.h"       /* Dynamic safe strings */
#include "dict.h"      /* Hash tables */
#include "adlist.h"    /* Linked lists*/
#include "zmalloc.h"   /* total memory usage aware version of malloc/free */
#include "anet.h"      /* Networkiing the easy way */
#include "ziplist.h"   /* Compact list date structure */
#include "intset.h"    /* Compact integer set structure */
#include "version.h"   /* Version macro */
#include "util.h"      /* Misc function useful in many place*/
#include "latency.h"   /* Latency monitor API */
#include "sparkline.h" /* ASII graphs API */

除去我们已经分析过的基础类型,其他的都是 Redis 相关逻辑实现,而注释也写的非常清楚

  • ae.h 是事件驱动 lib, 所有的 redis 操作应当都由此 lib 进行管理、调用
  • anet.h 网络库的封装
  • zmalloc.h 内存管理,方便 redis 对内存的使用量进行监控
  • version.h 版本信息
  • latency.h 延时管理器
  • 其他
    • util.h 使用函数
    • sparkline.h 程序启动的图形输出

由上面的基本信息可知,核心流程基本是由 anet 来管理网络连接,在接收到客户端的请求后,将请求内容转发给 ae,ae 则根据请求的类型,调用相关的处理函数。

启动服务器

接下来看看 Redis 是如何启动的, 以下是经过简单整理的启动函数

int main (int argc, char **argv) {
    struct timeval tv;
    
    setlocale(LC_COLLATE, "");
    zmalloc_enable_thread_safeness();
    zmalloc_set_oom_handler(redisOutOfMemoryHandler);
    srand(time(NULL)^getpid());
    gettimeofday(&tv, NULL);
    dictSetHashFunctionSeed(tv.tv_sec ^ tv.tv_usec ^ getpid());
    server.sentinel_mode = checkForSentinelMode(argc, argv);
    initSErverConfig();
    
    if (server.sentinel_mode) {
        initSentinelConfig();
        initSentinel();
    }
    
    // 命令行处理...
    // 略过
    if (server.daemonize) daemonize();
    // 初始化服务
    initServer();
    if (server.daemonize) createPidFile();    
}

首先看看服务是怎么初始化的

void initServer(void) {
    int j;
    
    gignal(SIGHUP, SIG_IGN);
    gignal(SIGPIPE, SIG_IGN);
    setupSingalHandlers();
    
    if (server.syslog_enabled) {
        openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT, server.syslog_facility);
    }
    
    // 当前进程 ID
    server.pid = getpid();
    // 当前处理的客户端
    server.current_client = NULL;
    // 客户端列表
    server.clients = listCreate();
    // 待关闭的客户端
    server.clients_to_close = listCreate();
    // 从服务器
    server.slaves = listCreate();
    // 监控器
    server.monitors = listCreate();
    server.slaveseldb = -1;
    // 非堵塞客户端
    server.unblocked_clients = listCreate();
    server.ready_keys = listCreate();
    server.clients_waiting_acks = listCreate();
    server.get_ack_from_slaves = 0;
    server.clients_paused = 0;
    
    
    // 创建共享变量,如命令的表示,静态字符串等
    createShareObjects();
    // 调整文件描述符限制, 实现是如果支持设置最大文件描述符,则从最大的值开始尝试,直到设置成功
    adjustOpenFilesLimit();
    
    // redis 的核心:事件驱动
    server.el = aeCreateEventLoop(server.maxclients + REDIS_EVENTLOOP_FDSET_INCR);
    
    // 保存具体数据的数据库
    server.db = zmalloc(sizeof(redisDb) * server.dbnum);
    
    // 开始监听指定的端口
    if (server.port != 0 && 
        listenToPort(server.port, server.ipfd, &server.ipfd_count) == REDIS_ERR)
        exit(1);
        
    if (server.unixsocket != NULL) {
        // ... unix 域的 socket
    }
    
    // 如果没监听任何端口,则直接退出
    if (server.ipfd_count == 0 && server.sofd < 0) {
        redisLog(REDIS_WARNING,  "Configured to not listen anywhere, exiting.");
        exit(1);
    }
    
    // 初始化数据库
    // 这里初始化各个 dict 的关键就是定义的各种 dictType, 里面描述了对应的 key value 的复制、删除等实现。说完这部分再看看大致的实现
    for (j = 0; j < server.dbnum; j++) {
        server.db[j].dict = dictCreate(&dbDictType, NULL);
        server.db[j].expires = dictCreate(&keyptrDictType, NULL);
        server.db[j].blocking_keys = dictCreate(&keyListDictType, NULL);
        server.db[j].ready_keys = dictCreate(&setDictType, NULL);
        server.db[j].watched_keys = dictCreate(&keylistDictType, NULL);
        server.db[j].eviction_pool = evictionPoolAlloc();
        server.db[j].id = j;
        server.db[j].avg_ttl = 0;
    }
    
    // 用于 pubsub 的 channels,
    // 然后设置了对应 list 的处理函数
    server.pubsub_channels = dictCreate(&keylistDictType, NULL);
    server.pubsub_patterns = listCreate();
    listSetFreeMethod(server.pubsub_patterns, freePubsubPattern);
    listSetMatchMethod(server.pubsub_patterns, listMatchPubsubPattern);
    
    // 然后是其他的配置信息
    // 已经处理过 Cron 的次数
    server.cronloops = 0;
    // rdb 跟 aof 是 Redis 的两种静态化机制
    server.rdb_child_pid = -1;
    server.aof_child_pid = -1;
    server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE;
    aofRewriteBufferReset();
    server.aof_buf = sdsempty();
    server.lastsave = time(NULL);
    server.lastbgsave_try = 0;
    server.rdb_save_time_last = -1;
    server.rdb_save_time_start = -1;
    server.dirty = 0;
    resetServerStata();
    
    // redis 的状态信息,包括启动时间等
    server.stat_starttime = time(NULL);
    server.stat_peak_memory = 0;
    server.resident_set_size = 0;
    server.lastbgsave_status = REDIS_OK;
    server.aof_last_write_status = REDIS_OK;
    server.aof_last_write_errno = 0;
    server.repl_good_slaves_count = 0;
    updateCachedTime();
    
    // 接下来启动定时器,定时触发 serverCron 操作
    if (aeCreateTimeEvent(server.el, 1 server.cron, NULL, NLL) == AE_ERR) {
        redisPanic("Can't craete the serverCron time event.");
        exit(0);
    }
    
    // 然后绑定对应 fd 的网络事件,在 fd 可读时触发 acceptTcpHandler 函数
    for (j = 0; server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipif[j], AE_READABLE, acceptTcpHandler, NULL) == AE_ERR) {
            redisPainc("Unrecoverable error create server.ipfd file event");
        }
    }
    
    // 这里绑定 unix 域的 fd 可读时触发 acceptUnixHandler 函数
    if (server.sofd > 0 && aeCreateFileEvent(server.el, 
        server.sofd, AE_READABLE, acceptUnixHandler, NULL) == AE_ERR)
        redisPainc("Unrecoverable error create server.sofd file event.");
     
    // 如果启动了 aof 静态化机制,则打开 aof 对应的文件
    if (server.aof_sate == REDIS_AOF_ON) {
        server.aof_fd = open(server.aof_filename,
            O_WRONLY | O_APPEND | O_CREAT, 0644);
        if (server.aof_fd == -1) {
            redisLog(REDIS_WARNING, "Can't open the append-only file: %s", strerrno(errno));
            exit(1);
        }
    }
    
    // 在 32 位的机器上,限制最多使用的内存量为 3GB
    if (server.arch_bits == 32 && server.maxmemory == 0) {
        redisLog(REDIS_WARNING, "Warning: 32bit instance detected but no memory limit set, Stting 3 GB maxmemory limit with 'noeviction' policy now.");
        server.maxmemory = 3072LL * (1024 * 1024);
        server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION;
    }
    
    // 如果启动了主从机制,则初始化主从信息
    if (server.cluster_enabled) clusterInit();
    // 初始化复制机制
    replicationScriptCacheInit();
    // 初始化 Lua 脚本环境
    scriptingInit();
    // 初始化延迟日志
    slowLogInit();
    // 初始化 latency 列表 
    latencyMonitorInit();
    // 初始化 Backgrond IO,主要用于落地数据
    bioInit();
}

在上面的初始化过程中,我们把当前应该关注的跟可以忽略的都写出来了,其实是为了留个印象,以后遇到的时候可以知道是用来干啥的。 除去 ClusterAOFRDB 等,剩余的基本都是 Redis 基础服务的核心,如初始化时通过 aeCreateTimeEvent 注册的的 serverCron 事件,以及注册到监听端口的处理函数 acceptTcpHandler, 暂时可以把它理解为一切的开始:因为 Redis 的工作方式就是等待一条命令,处理命令然后返回结果。

建立连接

在这里我们先不管 ae 的实现,直接查看 acceptTcpHandler,这里是作为 Redis 接收客户端链接的入口:

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[REDIS_IP_STR_LEN];
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);
    
    while (max--) {
        // 以非堵塞的方式接受 tcp 连接,fd 是监听的端口 fd, cip 跟 cport 用来保存客户端对应的 fd 跟端口
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                redisLog(REDIS_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        
        redisLog(REDIS_VERBOSE, "Accepted %s:%d, cip, cport);
        // 接收连接成功,将其加入 server 的客户端列表
        acceptCommonHandler(cfd, 0);
    }
}

上面的函数会在有新的客户端连接上来是触发,而具体的逻辑则负责把新的连接通过 acceptCommonHandler 创建一个 redisClient 实例加入 server 进行管理,接下来看看其实现及 redisClient 的定义


typedef struct redisClient {
    uint64_t id;  // 客户端的唯一标示符
    int fd;
    redisDb *db;
    int dictid;
    robj *name;   // 客户端的名称
    sds querybuf;
    size_t querybuf_peak;
    int argc;
    robj **argv;
    struct redisCommand *cmd, *lastcmd;
    int reqtype;
    int multibulklen;
    long bulklen;
    list *reply;
    unsigned long reply_bytes;
    int sentlen;
    
    time_t ctime;
    time_t lastinteraction;
    time_t obuf_soft_limit_reached_time;
    int flags;    // REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI
    int authenticated;
    
    int replstate;  // repl 的状态,只有在当前进程是 slave 时才有用
    int repl_put_online_on_ack;
    int repldbfd;
    off_t repldboff;
    off_t repldbsize;
    sds replpreamble;
    long long reploff;
    long long repl_ack_off;
    long long repl_ack_time;
    long long psync_initial_offset;
    
    char replrunid[REDIS_RUN_ID_SIZE + 1];
    int slave_listening_port;
    int slave_capa;
    multiState mstate;
    int btype;
    blockingState bpop;
    long long woff;
    list *watched_keys;
    dict *pubsub_channels;
    list *pubsub_patterns;
    sds peerid;
    
    int bufpos;
    char buf[REDIS_REPLY_CHUNK_BYTES];
} redisClient;

static void acceptCommonHandler(int fd, int flags) {
    redisClient *c;
    // 创建 client 示例
    if ((c = createClient(fd)) == NULL) {
        redisLog(REDIS_WARNING,
            "Error registering fd event for the new client : %s (fd=%d)",
            strerror(errno), fd);
        close(fd);
        return;
    }

    // 限制最大链接数    
    if (listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";
        
        if (write(c->fd, err, strlen(err)) == -1) {
        }
        server.stat_rejected_conn++;
        freeClient(c);
        return;
    }
    server.stat_numconnections++;
    c->flags |= flags;
}

redisClient *createClient(int fd) {
    redisClient *c = zmalloc(sizeof(redisClient));
    
    if (fd != -1) {
        // 将客户端设置为非堵塞的
        anetNonBlock(NULL, fd);
        // 不延迟发送客户端的回应
        anetEnableTcpNoDelay(NULL, fd);
        // 设置 keepalive 以检查客户端断开的状况
        if (server.tcpkeepalive)
            anetKeepAlive(NULL, fd, server.tcpkeepalive);
            
        // 注册事件,当客户端有请求过来时,调用 readQueryFromClient
        if (aeCreateFileEvent(server.el, fd, AE_READABLE,
                readQueryFromClient, c) == AE_ERR) {
            close(fd);
            zfree(c);
            return NULL;       
        }
        
        // 接下来是对 client 的一些常规设置, 以下会忽略一些跟现在无关的选项
        selectDb(c, 0); // 设置使用第一个 db
        c->id = server.next_client_id++;
        c->fd = fd;
        c->name = NULL;
        // ...
        c->querybuf = sdsempty(); // 请求的缓冲
        c->argc = 0;
        c->argv = NULL; // 上面两个是保存具体操作的参数
        c->cmd = c->lastcmd = NULL;
        c->reply = listCreate(); // 给客户端的 response
        c->reply_bytes = 0;
        listSetFreeMethod(c->reply, decrRefCountVoid);
        listSetDupMethod(c->reply, dupClientReplyValue);
        // ...
        // 最后添加 client 到 server 的 clients 进行管理
        if (fd != -1) listAddNodeTail(server.clients, c);
        initClientMultiState(c);
        
    }
}

创建客户端的操作基本完成,在这个时候客户端已经与服务端建立连接,随时准备客户端发送请求。

Redis 的类型定义

如何处理请求留到下一篇章进行分析,接下来看一下上面遇到的各种 redis 的内置类型定义。

#define REDIS_LRU_BITS 24

typedef struct redisObject {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:REDIS_LRU_BITS;
    int refcount;
    void *ptr;
} robj;

typedef struct redisDb {
    dict *dict;         /* The keyspace for thsi DB */
    dict *expires;      /* Timeout of keys with a timeout set */
    dict *blocking_keys;
    dict *ready_keys;
    dict *watched_keys;
    struct evictionPoolEntry *eviction_pool;
    int id;
    long long avg_ttl;
} redisDb;

typedef struct redisServer {
    // 这里的东西太多,就不一一列出了,等要用到的时候再进行分析吧
} redisServer;