redis源码解读(十一):数据持久化

虽然 redis 是一个内存数据库,但也提供了数据持久化的解决方案。redis 的作者antirez大神对 redis 的持久化做了一个系统性的论述。在了解实现细节之前,建议先看看作者的论述。

原文:Redis persistence demystified
译文:解密Redis持久化

持久化方案

redis 提供了两种持久化方案,分别是RDB和AOF。RDB是全量数据持久化,通过遍历所有数据库中的所有键值对,全量落地为2进制文件。AOF全称为append only file,是 redis 命令的增量记录。其他一些细节就不扯了,antirez大神的文章里都有,去找吧!

RDB

触发方式

RDB有两种触发方式,首先是通过client命令手动触发,有SAVE和BGSAVE两种方式;还有一种是被动触发,通过配置一定条件,自动触发BGSAVE命令。

自动保存

先看自动保存的配置方式:在config文件中添加save配置,例如save 900 10,表示距离上次保存已超过900s、且期间对数据库进行了至少10次修改时,触发一次BGSAVE。redis支持同时配置多个save条件,任意一个条件满足都会触发BGSAVE。

redisServer持有一个saveparam的数组保存自动触发配置:

/* One automatic-save rule ("save <seconds> <changes>").
 * serverCron compares these values against server.dirty and the time
 * elapsed since server.lastsave to decide whether to start a BGSAVE. */
struct saveparam {
    time_t seconds;   /* Elapsed time since the last save must exceed this. */
    int changes;      /* server.dirty must be at least this large. */
};

/* Excerpt of the global server state; only the RDB persistence fields are
 * shown here, the rest of the struct is elided ("// ..."). */
struct redisServer {
    // ...
    /* RDB persistence */
    long long dirty;                /* Changes to DB from the last save */  // number of DB modifications
    long long dirty_before_bgsave;  /* Used to restore dirty on failed BGSAVE */
    pid_t rdb_child_pid;            /* PID of RDB saving child */
    struct saveparam *saveparams;   /* Save points array for RDB */ // configured RDB save rules
    int saveparamslen;              /* Number of saving points */
    char *rdb_filename;             /* Name of RDB file */
    int rdb_compression;            /* Use compression in RDB? */
    int rdb_checksum;               /* Use RDB checksum? */
    time_t lastsave;                /* Unix time of last successful save */ // time of the previous save
    time_t lastbgsave_try;          /* Unix time of last attempted bgsave */
    time_t rdb_save_time_last;      /* Time used by last RDB save run. */
    time_t rdb_save_time_start;     /* Current RDB save start time. */
    int rdb_bgsave_scheduled;       /* BGSAVE when possible if true. */
    int rdb_child_type;             /* Type of save by active child. */
    int lastbgsave_status;          /* C_OK or C_ERR */
    int stop_writes_on_bgsave_err;  /* Don't allow writes if can't BGSAVE */
    int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */
    int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */
    // ...
}

之前的文章中有提到 redis 的事件循环中有定期执行的时间事件,如果没有正在执行的bgsave或aof rewrite,就会对saveparams中所有的配置进行检测,是否需要进行BGSAVE。

/* Excerpt of the periodic time-event handler. Only the part that decides
 * whether to start an automatic BGSAVE is shown; elided code is marked
 * with "// ...", so the braces in this excerpt do not balance. */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { // Redis periodic task; per the article, it runs 10 times per second by default
    // ...
    /* Check if a background saving or AOF rewrite in progress terminated. */
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
        ldbPendingChildren())
    {   // a BGSAVE or AOF child process exists
        // ...
        }
    } else {
         /* No child is running: test every configured save point. */
         for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;
            // Check whether this saveparam's trigger condition is met:
            // enough changes, enough time since the last save, and either
            // the last BGSAVE succeeded or the retry delay has passed.
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 CONFIG_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == C_OK))
            {   
                serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                rdbSaveBackground(server.rdb_filename);  // start a BGSAVE
                break;  /* The first matching save point is enough. */
            }
         }
         // ...
    }
    // ...
}

rdbSaveBackground就是BGSAVE命令调用的函数,该函数会fork一个子进程执行SAVE操作,使服务主进程不被阻塞。并且由于linux copy-on-write的特性,正常情况下不会出现内存使用翻倍的情况。

/* Start an RDB dump of the dataset to `filename` in a forked child so the
 * calling process is not blocked by the save itself.
 *
 * Returns C_ERR if an RDB or AOF child is already recorded in `server`, or
 * if fork() fails; returns C_OK once the child has been started and its
 * pid/type have been recorded in `server`. */
int rdbSaveBackground(char *filename) {
    pid_t pid;
    long long fork_start;

    /* Only one background child at a time. */
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) return C_ERR;

    server.dirty_before_bgsave = server.dirty;
    server.lastbgsave_try = time(NULL);

    fork_start = ustime();
    pid = fork();

    if (pid == 0) {
        /* Child */
        int saved;

        closeListeningSockets(0);   /* drop the listening sockets in the child */
        redisSetProcTitle("redis-rdb-bgsave");
        saved = rdbSave(filename);  /* write the RDB file */
        if (saved == C_OK) {
            size_t cow_bytes = zmalloc_get_private_dirty();

            if (cow_bytes != 0) {
                serverLog(LL_NOTICE,
                    "RDB: %zu MB of memory used by copy-on-write",
                    cow_bytes/(1024*1024));
            }
        }
        /* Exit status 0 on success, 1 on failure. */
        exitFromChild(saved == C_OK ? 0 : 1);
        return C_OK; /* unreached */
    }

    /* Parent (this path is also taken when fork() returned -1):
     * record how long fork() took and the resulting fork rate. */
    server.stat_fork_time = ustime()-fork_start;
    server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
    latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);

    if (pid == -1) {
        /* fork() failed */
        server.lastbgsave_status = C_ERR;
        serverLog(LL_WARNING,"Can't save in background: fork: %s",
            strerror(errno));
        return C_ERR;
    }

    /* Record the running child in the server state. */
    serverLog(LL_NOTICE,"Background saving started by pid %d",pid);
    server.rdb_save_time_start = time(NULL);
    server.rdb_child_pid = pid;
    server.rdb_child_type = RDB_CHILD_TYPE_DISK;
    updateDictResizePolicy();
    return C_OK;
}

有个小细节,在退出子进程的时候,redis 采用的是_exit而不是exit,因为父进程可能正在对文件对象进行操作,而exit会清理IO缓冲,可能会对父进程造成影响。

save

殊途同归,不论是自动触发还是SAVE和BGSAVE,最终都会走到rdbSave函数:

int rdbSave(char *filename) {   // 将db中的数据保存到rdb文件中
    char tmpfile[256];
    char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
    FILE *fp;
    rio rdb;
    int error = 0;

    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());    // 创建一个temp文件
    fp = fopen(tmpfile,"w");
    if (!fp) {
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        serverLog(LL_WARNING,
            "Failed opening the RDB file %s (in server root dir %s) "
            "for saving: %s",
            filename,
            cwdp ? cwdp : "unknown",
            strerror(errno));
        return C_ERR;
    }

    rioInitWithFile(&rdb,fp);   // 将rio初始化为文件类型
    if (rdbSaveRio(&rdb,&error) == C_ERR) { // 保存rdb文件
        errno = error;
        goto werr;
    }

    /* Make sure data will not remain on the OS's output buffers */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    if (rename(tmpfile,filename) == -1) {   // 原子操作 重命名
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        serverLog(LL_WARNING,
            "Error moving temp DB file %s on the final "
            "destination %s (in server root dir %s): %s",
            tmpfile,
            filename,
            cwdp ? cwdp : "unknown",
            strerror(errno));
        unlink(tmpfile);
        return C_ERR;
    }

    serverLog(LL_NOTICE,"DB saved on disk");
    server.dirty = 0;
    server.lastsave = time(NULL);
    server.lastbgsave_status = C_OK;
    return C_OK;

werr:
    serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
    fclose(fp);
    unlink(tmpfile);
    return C_ERR;
}

上述代码大部分为流程分支处理,要点有二:

  1. 通过先创建临时文件,写入后再原子性的rename,确保rdb文件都是完整可用的
  2. 出现了一个叫做rio的数据类型,并且被初始化为file类型

rio

rio 是 redis 对io的封装,所有socket、file、buffer的io都封装在rio中,rdb就是将rio初始化为file类型,进行文件的读写操作。除了 rio,redis 还有对后台io操作进行封装的bio。

/* rio: a small I/O abstraction. Each instance carries function pointers for
 * the backend operations plus bookkeeping fields, and a union holding the
 * state of the selected backend (memory buffer, stdio file, or fd set). */
struct _rio {   // the rio structure
    size_t (*read)(struct _rio *, void *buf, size_t len); // read method
    size_t (*write)(struct _rio *, const void *buf, size_t len); // write method
    off_t (*tell)(struct _rio *);
    int (*flush)(struct _rio *);
    /* The update_cksum method if not NULL is used to compute the checksum of
     * all the data that was read or written so far. The method should be
     * designed so that can be called with the current checksum, and the buf
     * and len fields pointing to the new block of data to add to the checksum
     * computation. */
    void (*update_cksum)(struct _rio *, const void *buf, size_t len);

    /* The current checksum */
    uint64_t cksum;

    /* number of bytes read or written */
    size_t processed_bytes; // cumulative bytes read or written

    /* maximum single read or write chunk size */
    size_t max_processing_chunk;    // maximum length of a single I/O

    /* Backend-specific vars. */
    union {
        /* In-memory buffer target. */
        struct {
            sds ptr;
            off_t pos;
        } buffer;
        /* Stdio file pointer target. */
        struct {
            FILE *fp;
            off_t buffered; /* Bytes written since last fsync. */
            off_t autosync; /* fsync after 'autosync' bytes written. */
        } file;
        /* Multiple FDs target (used to write to N sockets). */
        struct {
            int *fds;       /* File descriptors. */
            int *state;     /* Error state of each fd. 0 (if ok) or errno. */
            int numfds;
            off_t pos;
            sds buf;
        } fdset;
    } io;
};

/* Convenience alias used throughout the rio API. */
typedef struct _rio rio;

以file类型的rio为例:
首先实例化一个rio对象会调用rioInitWithFile进行初始化:

/* Set up `r` as a file-backed rio that operates on the stdio stream `fp`.
 * The object is first overwritten with the rioFileIO template, then the
 * file-specific state is filled in: no bytes buffered yet and an autosync
 * value of 0. */
void rioInitWithFile(rio *r, FILE *fp) {
    /* Copy the file backend template (methods and zeroed counters). */
    *r = rioFileIO;

    /* Backend-specific state for the file target. */
    r->io.file.autosync = 0;
    r->io.file.buffered = 0;
    r->io.file.fp = fp;
}

rioFileIO是一个定义了文件IO的结构体:

static const rio rioFileIO = {
    rioFileRead,
    rioFileWrite,
    rioFileTell,
    rioFileFlush,
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};

AOF