🛰️ Vostok.Cluster
Cluster 集群模块
提供 TCP Gossip + Seed 节点自动发现、节点信息缓存、集群广播、可靠 ACK 重试、去重窗口、共享密钥 HMAC 鉴权与运行时统计。首版按 <=100 节点规模设计,优先保证链路稳定性和生产可运维性。
能力概览
- 节点发现:通过
seedNodes入网,收到成员表后继续连接新发现节点,最终收敛为全连接。 - 节点缓存:本地缓存节点快照,可查询
self()、nodes()、node(nodeId)。 - 广播模式:默认
broadcast()为可靠广播;也支持broadcastBestEffort()。 - 传输安全:握手时校验
clusterName + clusterSecret(HMAC),不匹配立即拒绝。 - 高性能边界:二进制帧协议、有界队列、独立监听线程池,避免业务监听器阻塞 IO 线程。
首版边界
仅支持 topic + byte[] 广播,不做对象自动序列化;不引入 Redis 注册中心、不做 UDP Multicast、不做消息持久化恢复。
初始化
import yueyang.vostok.Vostok;
import yueyang.vostok.cluster.VKClusterConfig;
Vostok.Cluster.init(new VKClusterConfig()
.clusterName("prod-order")
.clusterSecret("replace-with-strong-shared-secret")
.nodeId("order-node-01")
.bindHost("0.0.0.0")
.bindPort(18888)
.advertiseHost("10.0.0.12")
.advertisePort(18888)
.seedNodes("10.0.0.10:18888", "10.0.0.11:18888")
.label("zone", "sh-a")
.label("service", "order")
);
boolean ready = Vostok.Cluster.awaitReady(5000);
System.out.println("cluster ready = " + ready);
System.out.println(Vostok.Cluster.self().getNodeId());
配置要求
nodeId 与 clusterSecret 必须显式配置。nodeId 需要在同一集群内唯一;不建议依赖随机值,否则节点重启后会造成身份漂移。
自动发现与节点状态
Cluster 运行时会缓存全部已知节点快照。节点状态按如下状态机推进:
JOINING -> ALIVE -> SUSPECT -> DEAD
\------> LEFT
- JOINING:握手刚完成,成员表还在收敛。
- ALIVE:心跳与成员同步正常。
- SUSPECT:超过
suspectTimeoutMs未见心跳,先进入怀疑态,避免短暂网络抖动直接判死。 - DEAD:超过
deadTimeoutMs仍未恢复。 - LEFT:节点主动关闭并广播离场帧。
List<VKClusterNode> snapshot = Vostok.Cluster.nodes();
for (VKClusterNode node : snapshot) {
System.out.println(node.getNodeId() + " -> " + node.getStatus());
}
VKClusterNode peer = Vostok.Cluster.node("order-node-02");
if (peer != null && peer.getStatus() == VKClusterNodeStatus.ALIVE) {
// 可以安全参与广播目标集合
}
广播与订阅
广播载荷固定为 topic + byte[]。订阅按 topic 精确匹配,不支持通配符。
import java.nio.charset.StandardCharsets;
VKClusterSubscription sub = Vostok.Cluster.on("order.created", message -> {
String body = new String(message.getPayload(), StandardCharsets.UTF_8);
System.out.println(message.getFromNodeId() + ": " + body);
});
VKClusterBroadcastResult result = Vostok.Cluster
.broadcastReliable("order.created", "{\"id\":1001}".getBytes(StandardCharsets.UTF_8))
.get();
System.out.println(result.getMode());
System.out.println(result.getTargetedNodes());
System.out.println(result.getAckedNodes());
System.out.println(result.getFailedNodes());
sub.cancel();
// 或者:Vostok.Cluster.off(sub);
// 或者:Vostok.Cluster.offAll("order.created");
两种广播模式
| 方法 | 语义 | 适用场景 |
|---|---|---|
broadcast() | 等价于 broadcastReliable() | 默认推荐,跨节点状态通知、轻量事件分发 |
broadcastReliable() | 至少一次;目标节点集合在发送瞬间冻结,使用 ACK + 重试 + 去重 | 要求尽量送达的广播 |
broadcastBestEffort() | 只发一次,不做 ACK,不等待远端处理结果 | 追求低延迟、可接受远端丢弃的场景 |
BEST_EFFORT 的返回值语义
broadcastBestEffort() 的 CompletableFuture 在本地成功入发送队列后就完成。failedNodes 只统计本地无法投递到连接层的目标,不代表远端业务监听器一定执行成功。若远端监听队列已满,远端会快速丢弃并把丢弃记入 queueDrops 统计,但发送端不会收到 ACK/NACK。
可靠广播 ACK 语义
可靠广播中的 ACK 表示“远端节点已经完成协议解析,并成功放入本地监听分发队列”。它不等价于业务回调执行成功,因此业务监听器内部仍应自行做好幂等与异常保护。
API 速查
| 方法 | 说明 |
|---|---|
init(config) | 启动当前节点 Cluster runtime |
awaitReady(timeoutMs) | 等待至少一次握手与成员同步完成 |
self() | 获取本节点快照 |
nodes() | 获取按 nodeId 排序的全量节点快照 |
node(nodeId) | 按节点 ID 查询快照 |
on(topic, listener) | 订阅 topic |
off(subscription) | 取消单个订阅 |
offAll(topic) | 清空某个 topic 的所有订阅 |
broadcast(...) | 默认可靠广播 |
broadcastReliable(...) | 可靠广播 |
broadcastBestEffort(...) | 尽力广播 |
stats() | 返回运行时统计快照 |
close() | 发送 LEAVE 并关闭所有连接 |
VKClusterConfig 关键配置
| 配置项 | 默认值 | 说明 |
|---|---|---|
clusterName | default | 集群名称,不同名称节点互相拒绝 |
nodeId | 无 | 节点唯一标识,必须显式配置 |
bindHost/bindPort | 127.0.0.1:18888 | 本地监听地址 |
advertiseHost/advertisePort | 默认回退到 bind | 对外广播给其他节点的地址 |
seedNodes | 空 | 种子节点列表,格式为 host:port |
clusterSecret | 无 | 握手 HMAC 密钥,必须统一 |
heartbeatIntervalMs | 2000 | 心跳间隔 |
suspectTimeoutMs | 6000 | 进入 SUSPECT 的超时 |
deadTimeoutMs | 15000 | 进入 DEAD 的超时 |
syncIntervalMs | 10000 | 成员表反熵同步周期 |
maxNodeCount | 100 | 首版设计上限 |
maxMessageBytes | 1048576 | 单条广播最大字节数 |
outboundQueueCapacity | 4096 | 每连接发送队列大小 |
listenerQueueCapacity | 8192 | 业务监听分发队列大小 |
reliableAckTimeoutMs | 1500 | 可靠广播 ACK 等待时间 |
reliableMaxRetries | 3 | 可靠广播最大重试次数 |
dedupeRetentionMs | 60000 | 消息去重窗口 |
includeSelfOnBroadcast | true | 广播时是否投递给本机监听器 |
运行时统计
Vostok.Cluster.stats() 返回当前节点的统计快照,可直接接入监控:
VKClusterStats stats = Vostok.Cluster.stats();
System.out.println(stats.getTotalNodes());
System.out.println(stats.getAliveNodes());
System.out.println(stats.getOpenConnections());
System.out.println(stats.getSentMessages());
System.out.println(stats.getReceivedMessages());
System.out.println(stats.getReliableRetries());
System.out.println(stats.getQueueDrops());
System.out.println(stats.getAuthFailures());
System.out.println(stats.getProtocolErrors());
- queueDrops:发送队列或监听分发队列满导致的快速丢弃次数。
- authFailures:握手 HMAC 或 clusterName 校验失败次数。
- protocolErrors:非法帧、协议版本不兼容、非法长度等协议异常次数。
生产建议
- 同一集群内节点数建议控制在 100 以内,首版按此规模优化。
clusterSecret建议使用高熵随机串,并与应用配置中心统一下发。advertiseHost必须配置为其他节点可达地址,不要直接复用容器内环回地址。- 业务监听器请尽量短平快;耗时逻辑建议在回调中再转交自己的业务线程池。
- 可靠广播是至少一次语义,业务层需要接受重复投递并保持幂等。
- 若只做弱一致通知,优先使用
broadcastBestEffort(),避免无意义重试。
错误码
| 错误码 | 枚举 | 场景 |
|---|---|---|
| CL-400 | INVALID_ARGUMENT | 空 topic、空 listener、非法参数 |
| CL-402 | CONFIG_ERROR | nodeId / clusterSecret / 端口配置错误 |
| CL-403 | STATE_ERROR | 模块未启动就调用运行时方法 |
| CL-404 | NOT_FOUND | 节点或资源不存在 |
| CL-451 | AUTH_ERROR | clusterName / HMAC 校验失败 |
| CL-500 | IO_ERROR | 监听端口绑定失败、连接 IO 异常 |
| CL-560 | PROTOCOL_ERROR | 非法帧、非法长度、版本不兼容 |
| CL-566 | LIMIT_EXCEEDED | 超过最大节点数、消息体超限 |
| CL-568 | BROADCAST_TIMEOUT | 预留给广播超时场景的错误码 |
关闭
Vostok.Cluster.close();
关闭时会尽量向已连接节点发送 LEAVE 帧,并等待短暂时间让对端将本节点状态推进到 LEFT。