能力概览

首版边界
仅支持 topic + byte[] 广播,不做对象自动序列化;不引入 Redis 注册中心、不做 UDP Multicast、不做消息持久化恢复。

初始化

import yueyang.vostok.Vostok;
import yueyang.vostok.cluster.VKClusterConfig;

Vostok.Cluster.init(new VKClusterConfig()
    .clusterName("prod-order")
    .clusterSecret("replace-with-strong-shared-secret")
    .nodeId("order-node-01")
    .bindHost("0.0.0.0")
    .bindPort(18888)
    .advertiseHost("10.0.0.12")
    .advertisePort(18888)
    .seedNodes("10.0.0.10:18888", "10.0.0.11:18888")
    .label("zone", "sh-a")
    .label("service", "order")
);

boolean ready = Vostok.Cluster.awaitReady(5000);
System.out.println("cluster ready = " + ready);
System.out.println(Vostok.Cluster.self().getNodeId());
配置要求
nodeIdclusterSecret 必须显式配置。nodeId 需要在同一集群内唯一;不建议依赖随机值,否则节点重启后会造成身份漂移。

自动发现与节点状态

Cluster 运行时会缓存全部已知节点快照。节点状态按如下状态机推进:

JOINING -> ALIVE -> SUSPECT -> DEAD
                     \------> LEFT
List<VKClusterNode> snapshot = Vostok.Cluster.nodes();
for (VKClusterNode node : snapshot) {
    System.out.println(node.getNodeId() + " -> " + node.getStatus());
}

VKClusterNode peer = Vostok.Cluster.node("order-node-02");
if (peer != null && peer.getStatus() == VKClusterNodeStatus.ALIVE) {
    // 可以安全参与广播目标集合
}

广播与订阅

广播载荷固定为 topic + byte[]。订阅按 topic 精确匹配,不支持通配符。

import java.nio.charset.StandardCharsets;

VKClusterSubscription sub = Vostok.Cluster.on("order.created", message -> {
    String body = new String(message.getPayload(), StandardCharsets.UTF_8);
    System.out.println(message.getFromNodeId() + ": " + body);
});

VKClusterBroadcastResult result = Vostok.Cluster
    .broadcastReliable("order.created", "{\"id\":1001}".getBytes(StandardCharsets.UTF_8))
    .get();

System.out.println(result.getMode());
System.out.println(result.getTargetedNodes());
System.out.println(result.getAckedNodes());
System.out.println(result.getFailedNodes());

sub.cancel();
// 或者:Vostok.Cluster.off(sub);
// 或者:Vostok.Cluster.offAll("order.created");

两种广播模式

方法语义适用场景
broadcast()等价于 broadcastReliable()默认推荐,跨节点状态通知、轻量事件分发
broadcastReliable()至少一次;目标节点集合在发送瞬间冻结,使用 ACK + 重试 + 去重要求尽量送达的广播
broadcastBestEffort()只发一次,不做 ACK,不等待远端处理结果追求低延迟、可接受远端丢弃的场景
BEST_EFFORT 的返回值语义
broadcastBestEffort()CompletableFuture本地成功入发送队列后就完成。failedNodes 只统计本地无法投递到连接层的目标,不代表远端业务监听器一定执行成功。若远端监听队列已满,远端会快速丢弃并把丢弃记入 queueDrops 统计,但发送端不会收到 ACK/NACK。

可靠广播 ACK 语义

可靠广播中的 ACK 表示“远端节点已经完成协议解析,并成功放入本地监听分发队列”。它不等价于业务回调执行成功,因此业务监听器内部仍应自行做好幂等与异常保护。

API 速查

方法说明
init(config)启动当前节点 Cluster runtime
awaitReady(timeoutMs)等待至少一次握手与成员同步完成
self()获取本节点快照
nodes()获取按 nodeId 排序的全量节点快照
node(nodeId)按节点 ID 查询快照
on(topic, listener)订阅 topic
off(subscription)取消单个订阅
offAll(topic)清空某个 topic 的所有订阅
broadcast(...)默认可靠广播
broadcastReliable(...)可靠广播
broadcastBestEffort(...)尽力广播
stats()返回运行时统计快照
close()发送 LEAVE 并关闭所有连接

VKClusterConfig 关键配置

配置项默认值说明
clusterNamedefault集群名称,不同名称节点互相拒绝
nodeId节点唯一标识,必须显式配置
bindHost/bindPort127.0.0.1:18888本地监听地址
advertiseHost/advertisePort默认回退到 bind对外广播给其他节点的地址
seedNodes种子节点列表,格式为 host:port
clusterSecret握手 HMAC 密钥,必须统一
heartbeatIntervalMs2000心跳间隔
suspectTimeoutMs6000进入 SUSPECT 的超时
deadTimeoutMs15000进入 DEAD 的超时
syncIntervalMs10000成员表反熵同步周期
maxNodeCount100首版设计上限
maxMessageBytes1048576单条广播最大字节数
outboundQueueCapacity4096每连接发送队列大小
listenerQueueCapacity8192业务监听分发队列大小
reliableAckTimeoutMs1500可靠广播 ACK 等待时间
reliableMaxRetries3可靠广播最大重试次数
dedupeRetentionMs60000消息去重窗口
includeSelfOnBroadcasttrue广播时是否投递给本机监听器

运行时统计

Vostok.Cluster.stats() 返回当前节点的统计快照,可直接接入监控:

VKClusterStats stats = Vostok.Cluster.stats();
System.out.println(stats.getTotalNodes());
System.out.println(stats.getAliveNodes());
System.out.println(stats.getOpenConnections());
System.out.println(stats.getSentMessages());
System.out.println(stats.getReceivedMessages());
System.out.println(stats.getReliableRetries());
System.out.println(stats.getQueueDrops());
System.out.println(stats.getAuthFailures());
System.out.println(stats.getProtocolErrors());

生产建议

错误码

错误码枚举场景
CL-400INVALID_ARGUMENT空 topic、空 listener、非法参数
CL-402CONFIG_ERRORnodeId / clusterSecret / 端口配置错误
CL-403STATE_ERROR模块未启动就调用运行时方法
CL-404NOT_FOUND节点或资源不存在
CL-451AUTH_ERRORclusterName / HMAC 校验失败
CL-500IO_ERROR监听端口绑定失败、连接 IO 异常
CL-560PROTOCOL_ERROR非法帧、非法长度、版本不兼容
CL-566LIMIT_EXCEEDED超过最大节点数、消息体超限
CL-568BROADCAST_TIMEOUT预留给广播超时场景的错误码

关闭

Vostok.Cluster.close();

关闭时会尽量向已连接节点发送 LEAVE 帧,并等待短暂时间让对端将本节点状态推进到 LEFT