
Redis Source Code Analysis 19: Master-Slave Replication

Source: 气吉美食网


First, the characteristics of Redis master-slave replication.

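The official ReplicationHowto document lists the following:
1. One master supports multiple slaves.
2. A slave can accept connections from other slaves and act as their master, which gives a multi-level master-slave structure.
3. Replication is non-blocking on the master side: the master keeps serving commands from other clients while it replicates to a slave. The slave, however, blocks during its first synchronization.
4. Replication is used for scalability. Slaves can provide data redundancy, slow commands (such as sort) can be sent to slaves so that they do not block the master, and a slave can take over persistence, which only requires commenting out the save directives in the master's configuration file.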

A client can connect to the master as a slave from the start, or it can issue the sync command later, while running, to establish the master-slave relationship.

Next we outline how replication works in Redis, first from the slave's point of view and then from the master's.

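When Redis runs as a slave, the global variable server.replstate takes one of three values: REDIS_REPL_NONE (not replicating), REDIS_REPL_CONNECT (must connect to the master), and REDIS_REPL_CONNECTED (connected to the master). After the slaveof directive is read from the configuration, or after a slaveof command is issued, server.replstate is set to REDIS_REPL_CONNECT. Once syncWithMaster has completed the first synchronization with the master, it becomes REDIS_REPL_CONNECTED.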
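The slave-side transitions just described can be summarized as a tiny state machine. This is only a sketch: the enum and function names below (slave_state, slave_event, slave_next) are made up for illustration and do not appear in the Redis source, which simply assigns to server.replstate in place.

```c
#include <assert.h>

/* Hypothetical mirror of the three slave-side states described above. */
typedef enum {
    REPL_NONE,      /* not replicating */
    REPL_CONNECT,   /* must connect to the master */
    REPL_CONNECTED  /* first synchronization finished */
} slave_state;

/* Illustrative events; Redis itself has no such enum. */
typedef enum {
    EV_SLAVEOF_HOST,   /* slaveof <host> <port> from config or command */
    EV_SYNC_OK,        /* syncWithMaster returned successfully */
    EV_SLAVEOF_NO_ONE  /* slaveof no one */
} slave_event;

/* Return the state that follows `s` when `ev` happens. */
slave_state slave_next(slave_state s, slave_event ev) {
    switch (ev) {
    case EV_SLAVEOF_HOST:   return REPL_CONNECT;
    case EV_SYNC_OK:        return (s == REPL_CONNECT) ? REPL_CONNECTED : s;
    case EV_SLAVEOF_NO_ONE: return REPL_NONE;
    }
    return s;
}
```

Feeding EV_SLAVEOF_HOST and then EV_SYNC_OK into slave_next walks a fresh instance from REPL_NONE to REPL_CONNECTED, which matches the order of events in the paragraph above.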

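When Redis runs as a master, each slave connection carries its own slave.replstate, which takes one of four values: REDIS_REPL_WAIT_BGSAVE_START (waiting for a bgsave to start), REDIS_REPL_WAIT_BGSAVE_END (waiting for the bgsave that produces its dump to finish, after which bulk transfer begins), REDIS_REPL_SEND_BULK (bulk transfer in progress), and REDIS_REPL_ONLINE (the initial bulk transfer is done and only updates need to be sent). A slave client, that is, one that has issued sync, starts in either REDIS_REPL_WAIT_BGSAVE_START or REDIS_REPL_WAIT_BGSAVE_END, depending on whether a bgsave it can use is already under way (syncCommand is covered in detail below). When the background dump finishes (backgroundSaveDoneHandler), updateSlavesWaitingBgsave moves slaves in REDIS_REPL_WAIT_BGSAVE_END to REDIS_REPL_SEND_BULK and installs sendBulkToSlave as the write event handler; slaves in REDIS_REPL_WAIT_BGSAVE_START are moved to REDIS_REPL_WAIT_BGSAVE_END and a new bgsave is started for them. Once sendBulkToSlave has sent the whole file, the state becomes REDIS_REPL_ONLINE, and from then on the master keeps calling replicationFeedSlaves to send new commands to slaves in the REDIS_REPL_ONLINE state.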

We start with the code that Redis runs on the master side.

A slave always synchronizes with the master by issuing the sync command. Its handler, syncCommand, is shown below.

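The comments in the function are clear enough. If the slave's client already has the REDIS_SLAVE flag set, the master has already handled it in syncCommand and the command is ignored. If the master still has unsent replies queued for this client, it returns an error. After that, if server.bgsavechildpid != -1 and some slave is in the REDIS_REPL_WAIT_BGSAVE_END state, a background dump is in progress and the master is already recording the changes made since the fork for that slave, so the new slave can share the same rdb file for its bulk transfer. Note that the reply list is copied: the master is non-blocking and may have executed commands in the meantime, and the call function invokes replicationFeedSlaves, which stores those commands in the slave's reply list. If server.bgsavechildpid != -1 but no slave is in the REDIS_REPL_WAIT_BGSAVE_END state, the running bgsave is not usable for replication because no changes have been recorded since it forked, so the slave must wait for the next bgsave (waiting slaves are handled when the current background process ends). If server.bgsavechildpid equals -1, a background process has to be started to dump the db. Finally, the client is added to the master's slaves list.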

static void syncCommand(redisClient *c) {
    /* ignore SYNC if already slave or in monitor mode */
    if (c->flags & REDIS_SLAVE) return;
    /* SYNC can't be issued when the server has pending data to send to
     * the client about already issued commands. We need a fresh reply
     * buffer registering the differences between the BGSAVE and the current
     * dataset, so that we can copy to other slaves if needed. */
    if (listLength(c->reply) != 0) {
        addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
        return;
    }
    redisLog(REDIS_NOTICE,"Slave ask for synchronization");
    /* Here we need to check if there is a background saving operation
     * in progress, or if it is required to start one */
    if (server.bgsavechildpid != -1) {
        /* Ok a background save is in progress. Let's check if it is a good
         * one for replication, i.e. if there is another slave that is
         * registering differences since the server forked to save */
        redisClient *slave;
        listNode *ln;
        listIter li;
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
        }
        if (ln) {
            /* Perfect, the server is already registering differences for
             * another slave. Set the right state, and copy the buffer. */
            listRelease(c->reply);
            c->reply = listDup(slave->reply);
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            /* No way, we need to wait for the next BGSAVE in order to
             * register differences */
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
        }
    } else {
        /* Ok we don't have a BGSAVE in progress, let's start one */
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
            return;
        }
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
    }
    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;
    c->slaveseldb = 0;
    listAddNodeTail(server.slaves,c);
    return;
}
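The three branches of syncCommand boil down to a small decision table. The sketch below restates it as a pure function so the outcomes are easy to compare side by side. The sync_plan struct and plan_sync function are hypothetical names introduced here, not part of Redis.

```c
#include <assert.h>
#include <stdbool.h>

/* The two states syncCommand can assign to a newly registered slave. */
typedef enum { WAIT_BGSAVE_START, WAIT_BGSAVE_END } initial_state;

/* Hypothetical summary of what the master does for one SYNC request. */
typedef struct {
    initial_state state;  /* state assigned to the new slave */
    bool start_bgsave;    /* fork a new background save right now? */
    bool copy_reply;      /* duplicate another slave's reply list? */
} sync_plan;

/* bgsave_running:          server.bgsavechildpid != -1
 * other_slave_waiting_end: some slave is already in WAIT_BGSAVE_END */
sync_plan plan_sync(bool bgsave_running, bool other_slave_waiting_end) {
    sync_plan p = { WAIT_BGSAVE_END, false, false };
    if (!bgsave_running) {
        p.start_bgsave = true;        /* no dump in progress: start one */
    } else if (other_slave_waiting_end) {
        p.copy_reply = true;          /* share the dump already in progress */
    } else {
        p.state = WAIT_BGSAVE_START;  /* wait for the next dump */
    }
    return p;
}
```

Only the last case leaves the slave in WAIT_BGSAVE_START; in the other two the slave is tied to a dump that is either starting now or already running.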

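From this point on, whether the slave is in REDIS_REPL_WAIT_BGSAVE_START or REDIS_REPL_WAIT_BGSAVE_END, it is only processed once the background dump process has finished. When that process ends, backgroundSaveDoneHandler runs, and it calls updateSlavesWaitingBgsave to handle the slaves.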

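Like syncCommand, updateSlavesWaitingBgsave drives several of the slave state transitions. Every slave that is waiting for a dump has already been placed on the server.slaves list. If slave->replstate == REDIS_REPL_WAIT_BGSAVE_START, the dump that just finished is not the one this slave needs: the slave is moved to REDIS_REPL_WAIT_BGSAVE_END and Redis starts a new background process to dump the db. If slave->replstate == REDIS_REPL_WAIT_BGSAVE_END, the dump that just finished is exactly the one this slave needs: the master opens the rdb file, records its size, sets the state to REDIS_REPL_SEND_BULK, and installs sendBulkToSlave as the slave's write event handler. If the bgsave failed, or the file cannot be opened, the slave is freed.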

static void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    int startbgsave = 0;
    listIter li;
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
            startbgsave = 1;
            slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
            struct redis_stat buf;
            if (bgsaveerr != REDIS_OK) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
                continue;
            }
            if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
                redis_fstat(slave->repldbfd,&buf) == -1) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
                continue;
            }
            slave->repldboff = 0;
            slave->repldbsize = buf.st_size;
            slave->replstate = REDIS_REPL_SEND_BULK;
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                freeClient(slave);
                continue;
            }
        }
    }
    if (startbgsave) {
        if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
            listIter li;
            listRewind(server.slaves,&li);
            redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
            while((ln = listNext(&li))) {
                redisClient *slave = ln->value;
                if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
                    freeClient(slave);
            }
        }
    }
}
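The per-slave bookkeeping in updateSlavesWaitingBgsave can be reduced to the following sketch, which keeps only the state changes and drops all I/O. The DROPPED value is a hypothetical stand-in for freeClient, and on_bgsave_done is an illustrative name, not a Redis function.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* DROPPED is not a Redis state; it stands in for freeClient(slave). */
typedef enum { WAIT_BGSAVE_START, WAIT_BGSAVE_END, SEND_BULK, DROPPED } repl_state;

/* Apply the transitions performed when a background save finishes.
 * Returns true when a new background save must be started because
 * at least one slave was still waiting for a dump to begin. */
bool on_bgsave_done(repl_state *slaves, size_t n, bool bgsave_ok) {
    bool need_new_bgsave = false;
    for (size_t i = 0; i < n; i++) {
        if (slaves[i] == WAIT_BGSAVE_START) {
            /* This dump was not made for this slave: wait for the next. */
            slaves[i] = WAIT_BGSAVE_END;
            need_new_bgsave = true;
        } else if (slaves[i] == WAIT_BGSAVE_END) {
            /* This dump is the one the slave was waiting for. */
            slaves[i] = bgsave_ok ? SEND_BULK : DROPPED;
        }
    }
    return need_new_bgsave;
}
```

Because the two branches are an if/else-if pair, a slave promoted from WAIT_BGSAVE_START to WAIT_BGSAVE_END is not also pushed to SEND_BULK in the same pass, which is the same property the original loop relies on.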

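The logic of sendBulkToSlave is not complicated. On its first call it writes the bulk length header. On every call it reads up to REDIS_IOBUF_LEN bytes of the dumped rdb file through slave->repldbfd, starting at offset slave->repldboff, and writes them to the slave, advancing the offset by the number of bytes actually written. When the whole file has been sent, it closes the file, replaces the write event handler with sendReplyToClient, and sets slave->replstate to REDIS_REPL_ONLINE. From then on, whenever the master receives a command, the call function uses replicationFeedSlaves to propagate the update to this slave. replicationFeedSlaves also walks the slaves list and sends the current command and its arguments to slaves in the REDIS_REPL_ONLINE state.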

static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *slave = privdata;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    char buf[REDIS_IOBUF_LEN];
    ssize_t nwritten, buflen;
    if (slave->repldboff == 0) {
        /* Write the bulk write count before to transfer the DB. In theory here
         * we don't know how much room there is in the output buffer of the
         * socket, but in practice SO_SNDLOWAT (the minimum count for output
         * operations) will never be smaller than the few bytes we need. */
        sds bulkcount;
        bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
            slave->repldbsize);
        if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
        {
            sdsfree(bulkcount);
            freeClient(slave);
            return;
        }
        sdsfree(bulkcount);
    }
    lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
    buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
    if (buflen <= 0) {
        redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
            (buflen == 0) ? "premature EOF" : strerror(errno));
        freeClient(slave);
        return;
    }
    if ((nwritten = write(fd,buf,buflen)) == -1) {
        redisLog(REDIS_VERBOSE,"Write error sending DB to slave: %s",
            strerror(errno));
        freeClient(slave);
        return;
    }
    slave->repldboff += nwritten;
    if (slave->repldboff == slave->repldbsize) {
        close(slave->repldbfd);
        slave->repldbfd = -1;
        aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
        slave->replstate = REDIS_REPL_ONLINE;
        if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
            sendReplyToClient, slave) == AE_ERR) {
            freeClient(slave);
            return;
        }
        addReplySds(slave,sdsempty());
        redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
    }
}
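The key idea in sendBulkToSlave is that each writable event sends at most one chunk, and the offset only advances by what was actually written, so a short write is simply retried from the new offset on the next event. The sketch below reproduces that pattern entirely in memory. Everything in it is a stand-in chosen for illustration: transfer, sink_write, send_step, and the tiny CHUNK size are not Redis names, and the `room` parameter simulates how many bytes the socket accepts on a given call.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define CHUNK 4  /* stand-in for REDIS_IOBUF_LEN, tiny for demonstration */

typedef struct {
    const char *src;  /* stand-in for the rdb file contents */
    size_t size;      /* plays the role of repldbsize */
    size_t off;       /* plays the role of repldboff */
    char sink[64];    /* stand-in for the slave's socket */
    size_t sink_len;
} transfer;

/* Simulated write(): accepts at most `room` bytes, like a socket
 * whose send buffer is nearly full. Returns the bytes accepted. */
static size_t sink_write(transfer *t, const char *buf, size_t len, size_t room) {
    size_t n = len < room ? len : room;
    if (n > sizeof(t->sink) - t->sink_len) n = sizeof(t->sink) - t->sink_len;
    memcpy(t->sink + t->sink_len, buf, n);
    t->sink_len += n;
    return n;
}

/* One writable event: send up to CHUNK bytes starting at t->off and
 * advance the offset by what was accepted. Returns 1 once the whole
 * source has been sent, 0 if more events are needed. */
int send_step(transfer *t, size_t room) {
    size_t want = t->size - t->off;
    if (want > CHUNK) want = CHUNK;
    t->off += sink_write(t, t->src + t->off, want, room);
    return t->off == t->size;
}
```

Calling send_step in a loop until it returns 1 corresponds to the event loop firing the write handler repeatedly; the point at which it returns 1 is where the original code switches the slave to REDIS_REPL_ONLINE.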

Next, let us look at how Redis runs as a slave.

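When Redis acts as a slave (an ordinary client can also play the slave role, in which case the behavior depends on that client's implementation), the master's location must be given in the configuration file. When loadServerConfig reads the configuration, it sets server.replstate to REDIS_REPL_CONNECT. A Redis instance in this state has to reach serverCron before it can call syncWithMaster to perform the initial synchronization with the master. Reading syncWithMaster shows that it, too, establishes the relationship by issuing the sync command to the master. It receives and sends data with syncRead and syncWrite, which are blocking functions, so a Redis slave also blocks while it establishes the initial master-slave relationship. The check in serverCron is: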

    /* Check if we should connect to a MASTER */
    if (server.replstate == REDIS_REPL_CONNECT && !(loops % 10)) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER...");
        if (syncWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
            if (server.appendonly) rewriteAppendOnlyFileBackground();
        }
    }

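The other command related to replication is slaveof, which switches a Redis instance between the master and slave roles. As the analysis above suggests, it only needs to change a few pieces of state: "slaveof no one" clears the master host and sets server.replstate to REDIS_REPL_NONE, while "slaveof host port" records the new master and sets server.replstate to REDIS_REPL_CONNECT, so that serverCron starts a new synchronization.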

static void slaveofCommand(redisClient *c) {
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        if (server.masterhost) {
            sdsfree(server.masterhost);
            server.masterhost = NULL;
            if (server.master) freeClient(server.master);
            server.replstate = REDIS_REPL_NONE;
            redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
        }
    } else {
        sdsfree(server.masterhost);
        server.masterhost = sdsdup(c->argv[1]->ptr);
        server.masterport = atoi(c->argv[2]->ptr);
        if (server.master) freeClient(server.master);
        server.replstate = REDIS_REPL_CONNECT;
        redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
            server.masterhost, server.masterport);
    }
    addReply(c,shared.ok);
}
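The argument handling in slaveofCommand can be tried out in isolation with the sketch below. It assumes a simplified target structure with a fixed-size host buffer; repl_target, eq_nocase, and apply_slaveof are hypothetical names, and the hand-written case-insensitive comparison is used here only to keep the example portable (the original calls strcasecmp).

```c
#include <assert.h>
#include <ctype.h>
#include <stdlib.h>
#include <string.h>

/* Simplified stand-in for the replication fields of the server struct. */
typedef struct {
    int is_slave;   /* 0 = master mode, 1 = must connect to a master */
    char host[64];
    int port;
} repl_target;

/* Case-insensitive string equality. */
static int eq_nocase(const char *a, const char *b) {
    while (*a && *b) {
        if (tolower((unsigned char)*a) != tolower((unsigned char)*b)) return 0;
        a++;
        b++;
    }
    return *a == *b;
}

/* Apply "slaveof <arg1> <arg2>": either "no one" or "<host> <port>". */
void apply_slaveof(repl_target *t, const char *arg1, const char *arg2) {
    if (eq_nocase(arg1, "no") && eq_nocase(arg2, "one")) {
        t->is_slave = 0;          /* back to master mode */
        t->host[0] = '\0';
        t->port = 0;
    } else {
        t->is_slave = 1;          /* corresponds to REDIS_REPL_CONNECT */
        strncpy(t->host, arg1, sizeof(t->host) - 1);
        t->host[sizeof(t->host) - 1] = '\0';
        t->port = atoi(arg2);
    }
}
```

As in the original, the "no one" pair is matched without regard to case, and any other pair of arguments is treated as a host and a port without further validation.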