通信
初始化完成之后,sentinel会主动和master、slave进行通信,获取他们的信息。
获取主服务器信息
首先,sentinel会和master建立两个连接,分别是命令连接和订阅连接(分别保存在sentinelRedisInstance的cc和pc字段中)。
代码如下 | 复制代码 |
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) { sentinelReconnectInstance(ri); ... } #define SENTINEL_HELLO_CHANNEL "__sentinel__:hello" |
命令连接用于后续获取master的信息(包括slave的信息),订阅(sentinel:hello频道)用于获取master的掉线状态等消息的推送。
连接完成之后,sentinel会定时发送消息:
代码如下 | 复制代码 |
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) { mstime_t now = mstime(); mstime_t info_period, ping_period; int retval; /* Return ASAP if we have already a PING or INFO already pending, or * in the case the instance is not properly connected. */ if (ri->flags & SRI_DISCONNECTED) return; /* For INFO, PING, PUBLISH that are not critical commands to send we * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't * want to use a lot of memory just because a link is not working * properly (note that anyway there is a redundant protection about this, * that is, the link will be disconnected and reconnected if a long * timeout condition is detected. */ if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return; /* If this is a slave of a master in O_DOWN condition we start sending * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD * period. In this state we want to closely monitor slaves in case they * are turned into masters by another Sentinel, or by the sysadmin. */ // INFO命令默认发送间隔为 10s #define SENTINEL_INFO_PERIOD 10000 // 如果为已经宕机的master的slave,改1s if ((ri->flags & SRI_SLAVE) && (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) { info_period = 1000; } else { info_period = SENTINEL_INFO_PERIOD; } /* We ping instances every time the last received pong is older than * the configured 'down-after-milliseconds' time, but every second * anyway if 'down-after-milliseconds' is greater than 1 second. */ // ping命令默认间隔1s #define SENTINEL_PING_PERIOD 1000 ping_period = ri->down_after_period; if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD; if ((ri->flags & SRI_SENTINEL) == 0 && (ri->info_refresh == 0 || (now - ri->info_refresh) > info_period)) { /* Send INFO to masters and slaves, not sentinels. */ retval = redisAsyncCommand(ri->cc, sentinelInfoReplyCallback, NULL, "INFO"); if (retval == REDIS_OK) ri->pending_commands++; } else if ((now - ri->last_pong_time) > ping_period) { /* Send PING to all the three kinds of instances. */ sentinelSendPing(ri); } else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) { /* PUBLISH hello messages to all the three kinds of instances. */ sentinelSendHello(ri); } } |
处理INFO消息的返回:
代码如下 | 复制代码 |
void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) { sentinelRedisInstance *ri = c->data; redisReply *r; REDIS_NOTUSED(privdata); if (ri) ri->pending_commands--; if (!reply || !ri) return; r = reply; if (r->type == REDIS_REPLY_STRING) { sentinelRefreshInstanceInfo(ri,r->str); } } |
这里针对向master发送的INFO,sentinel会:
1. 获取master run_id记录,检查master的role,更新自己维护的master列表
2. 通过复制字段,获取master对应的slave列表,更新自己维护的slave列表
获取slave信息
同样,sentinel也会以同样的频率向slave发送INFO命令,并且提取以下参数:
* run_id
* role
* master的host和port
* 主从服务器的连接状态(master_link_status)
* slave优先级(slave_priority)
* slave复制偏移量(slave_repl_offset)
并更新自己维护的sentinelRedisInstance结构。
发送和接受订阅信息
sentinel每秒通过命令连接向所有master和slave发送信息。
代码如下 | 复制代码 |
int sentinelSendHello(sentinelRedisInstance *ri) { char ip[REDIS_IP_STR_LEN]; char payload[REDIS_IP_STR_LEN+1024]; int retval; char *announce_ip; int announce_port; sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master; sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master); if (ri->flags & SRI_DISCONNECTED) return REDIS_ERR; /* Use the specified announce address if specified, otherwise try to * obtain our own IP address. */ if (sentinel.announce_ip) { announce_ip = sentinel.announce_ip; } else { if (anetSockName(ri->cc->c.fd,ip,sizeof(ip),NULL) == -1) return REDIS_ERR; announce_ip = ip; } announce_port = sentinel.announce_port ? sentinel.announce_port : server.port; /* Format and send the Hello message. */ snprintf(payload,sizeof(payload), "%s,%d,%s,%llu," /* Info about this sentinel. */ "%s,%s,%d,%llu", /* Info about current master. */ announce_ip, announce_port, server.runid, (unsigned long long) sentinel.current_epoch, /* --- */ master->name,master_addr->ip,master_addr->port, (unsigned long long) master->config_epoch); retval = redisAsyncCommand(ri->cc, sentinelPublishReplyCallback, NULL, "PUBLISH %s %s", SENTINEL_HELLO_CHANNEL,payload); if (retval != REDIS_OK) return REDIS_ERR; ri->pending_commands++; return REDIS_OK; } |
发送内容包括:
* sentinel ip(announce_ip)
* sentinel端口(announce_port)
* sentinel运行id(server.runid)
* sentinel配置纪元(sentinel.current_epoch)
* master名称(master->name)
* master IP(master_addr->ip)
* master端口(master_addr->port)
* master纪元(master->config_epoch)
同时,所有连接到这个master上的sentinel都会收到这个消息,然后做出回应:
* 更新sentinels字典
* 创建连接其他sentinel的命令连接