RocketMQ broker停写功能源码分析
这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党
背景
在我们要平滑升级broker的时候,无损升级的最佳实践应该是
所以我们本次就是来分析如何完成broker
的停写
源码入口
其实通过查看源码,我们发现有两种方式可以停写broker
两种方式没有区别,都是调用DefaultMQAdminExt
的wipeWritePermOfBroker
方法
源码分析
client
我们这里直接进去到最底层的实现类代码
public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName,
final long timeoutMillis) throws RemotingCommandException,
RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
WipeWritePermOfBrokerRequestHeader requestHeader = new WipeWritePermOfBrokerRequestHeader();
requestHeader.setBrokerName(brokerName);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.WIPE_WRITE_PERM_OF_BROKER, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
WipeWritePermOfBrokerResponseHeader responseHeader =
(WipeWritePermOfBrokerResponseHeader) response.decodeCommandCustomHeader(WipeWritePermOfBrokerResponseHeader.class);
return responseHeader.getWipeTopicCount();
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}
客户端的代码我们没什么好看的,我们还是通过请求状态码RequestCode.WIPE_WRITE_PERM_OF_BROKER
查看实际的Nameserver
的处理逻辑
NameServer
DefaultRequestProcessor
的方法wipeWritePermOfBroker
可以看到实际的代码逻辑在这一行
int wipeTopicCnt = this.namesrvController.getRouteInfoManager().wipeWritePermOfBrokerByLock(requestHeader.getBrokerName());
我们进入到这个方法看看
可以看到实际的逻辑还在更下层,我们再进去看看
private int operateWritePermOfBroker(final String brokerName, final int requestCode) {
int topicCnt = 0;
for (Entry entry : this.topicQueueTable.entrySet()) {
Map qdMap = entry.getValue();
final QueueData qd = qdMap.get(brokerName);
if (qd == null) {
continue;
}
int perm = qd.getPerm();
switch (requestCode) {
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
perm &= ~PermName.PERM_WRITE;
break;
case RequestCode.ADD_WRITE_PERM_OF_BROKER:
perm = PermName.PERM_READ | PermName.PERM_WRITE;
break;
}
qd.setPerm(perm);
topicCnt++;
}
return topicCnt;
}
这里的topicQueueTable
我们可以看看他实际的数据结构
可以看到这里他的perm
是7
如果我们去broker
的 config的topics.json
查看相关的配置信息
可以看到刚好可以对得上
这里我们解释一下perm
的权限问题
PERM_PRIORITY = 0x1