初始化(可选)
零配置启动(默认行为)
// 无需任何初始化——直接发布/订阅即可。
// 首次操作时自动以默认配置启动。
Vostok.Event.publish(new OrderCreatedEvent(42L));
显式初始化
// 使用内置默认配置
Vostok.Event.init();
// 自定义配置
Vostok.Event.init(new VKEventConfig()
.asyncCoreThreads(4)
.asyncMaxThreads(16)
.asyncQueueCapacity(4096)
.asyncKeepAliveMs(60000)
.listenerErrorStrategy(VKEventListenerErrorStrategy.CONTINUE)
.rejectionPolicy(VKEventRejectionPolicy.CALLER_RUNS)
.shutdownWaitMs(3000)
);
零配置启动
Event 模块无需显式初始化。任何
on()、
publish() 等调用都会在首次使用时自动以默认配置启动线程池。
如需自定义线程数或异常策略,在第一次订阅/发布前调用
init(VKEventConfig) 即可。
init() 是幂等的:若已初始化则不重复执行。如需替换配置,请使用
reinit()。
定义事件类
// 任意 POJO、Record 或类均可作为事件,无需继承任何基类
public record OrderCreatedEvent(Long orderId, String userId, BigDecimal amount) {}
public record UserLoginEvent(String userId, String ip, Instant time) {}
// 支持多态匹配:注册父类/接口的监听器,可以接收所有子类事件
public class BaseEvent {}
public class OrderEvent extends BaseEvent {}
public class PaymentEvent extends BaseEvent {}
订阅事件
同步监听器(默认 NORMAL 优先级)
VKEventSubscription sub = Vostok.Event.on(OrderCreatedEvent.class, event -> {
Vostok.Log.info("order {} created", event.orderId());
});
异步监听器
Vostok.Event.on(OrderCreatedEvent.class, VKListenerMode.ASYNC, event -> {
// 在异步线程池中执行,不阻塞发布者线程
sendEmail(event.userId(), "Your order is confirmed");
});
指定优先级(仅指定优先级,默认同步)
// 优先级:HIGHEST > HIGH > NORMAL > LOW > LOWEST
// 同一 publish 中,高优先级监听器先执行;同优先级按注册顺序执行
Vostok.Event.on(UserLoginEvent.class, VKEventPriority.HIGH, event -> {
auditLog(event.userId(), event.ip());
});
指定模式 + 优先级
Vostok.Event.on(UserLoginEvent.class,
VKListenerMode.SYNC,
VKEventPriority.HIGH,
event -> auditLog(event.userId(), event.ip()));
带 Predicate 过滤器(全参数版本)
// 过滤器为 null 表示不过滤;过滤器不满足时跳过(不消耗 once token)
Vostok.Event.on(OrderCreatedEvent.class,
VKListenerMode.ASYNC,
VKEventPriority.NORMAL,
event -> event.amount().compareTo(BigDecimal.valueOf(1000)) > 0, // 只处理大额订单
event -> notifyRiskSystem(event));
一次性订阅(触发一次后自动注销)
// 同步一次性监听器(默认 NORMAL 优先级)
VKEventSubscription sub = Vostok.Event.once(UserLoginEvent.class, event -> {
System.out.println("first login: " + event.userId());
});
// 异步一次性监听器
// 注意:CAS 保证并发 publish 场景下仍只触发一次
Vostok.Event.once(UserLoginEvent.class, VKListenerMode.ASYNC, event -> {
// 仅异步执行一次
});
多态监听(父类/接口监听器接收子类事件)
// 注册 BaseEvent 的监听器,OrderEvent 和 PaymentEvent 发布时均会触发
Vostok.Event.on(BaseEvent.class, event -> {
Vostok.Log.info("received base event: {}", event.getClass().getSimpleName());
});
注解式注册(@VKEventHandler + scan())
在 Bean 类的方法上添加 @VKEventHandler 注解,然后调用 scan(bean) 批量注册。被注解的方法必须恰好有 1 个参数,参数类型即为监听的事件类型。扫描时会遍历本类及所有父类(不含 Object)。
public class OrderEventHandler {
// 同步监听,HIGH 优先级
@VKEventHandler(priority = VKEventPriority.HIGH)
public void onOrderCreated(OrderCreatedEvent e) {
Vostok.Log.info("order {} created, amount={}", e.orderId(), e.amount());
}
// 异步监听,默认 NORMAL 优先级
@VKEventHandler(mode = VKListenerMode.ASYNC)
public void onOrderCreatedAsync(OrderCreatedEvent e) {
sendNotification(e.userId());
}
// 一次性监听,触发后自动注销
@VKEventHandler(mode = VKListenerMode.ASYNC, once = true)
public void onFirstPayment(PaymentEvent e) {
sendWelcomeBonus(e.userId());
}
// 也支持 private 方法
@VKEventHandler
private void onUserLogin(UserLoginEvent e) {
auditLog(e.userId(), e.ip());
}
}
// 批量注册,返回所有订阅句柄(可统一取消)
List<VKEventSubscription> subs = Vostok.Event.scan(new OrderEventHandler());
@VKEventHandler 属性说明:
| 属性 | 类型 | 默认值 | 说明 |
mode | VKListenerMode | SYNC | 执行模式:SYNC(在 publish 调用线程执行)或 ASYNC(提交到线程池) |
priority | VKEventPriority | NORMAL | 监听器优先级,高优先级在同一次 publish 中先执行 |
once | boolean | false | 是否一次性监听,触发一次后自动注销;CAS 保证并发安全 |
发布事件
同步发布
// 同步发布:在当前线程依次执行所有同步监听器;异步监听器提交到线程池后立即返回(不等待)
// 返回本次发布的执行统计
VKEventPublishResult result = Vostok.Event.publish(new OrderCreatedEvent(42L, "user-1", amount));
System.out.printf("matched=%d, syncOk=%d, syncFail=%d, asyncSubmit=%d, costNs=%d%n",
result.getMatchedListeners(),
result.getSyncExecuted(),
result.getSyncFailed(),
result.getAsyncSubmitted(),
result.getCostNs());
异步发布(等待所有异步监听器完成)
// publishAsync 等待所有 ASYNC 监听器 future 全部完成后,结果 CompletableFuture 才 complete
// 可通过 .join() / .get() 确认所有异步监听器执行完毕,asyncFailed 反映执行失败数
CompletableFuture<VKEventPublishResult> future =
Vostok.Event.publishAsync(new OrderCreatedEvent(42L, "user-1", amount));
// 非阻塞回调
future.thenAccept(result -> {
Vostok.Log.info("all async listeners done, failed={}", result.getAsyncFailed());
});
// 或阻塞等待(测试场景)
VKEventPublishResult result = future.join();
publish vs publishAsync 的区别
publish():在当前线程同步执行所有 SYNC 监听器;ASYNC 监听器提交到线程池后立即返回,不等待异步结果,asyncFailed 固定为 0。
publishAsync():同样立即提交异步任务,但返回一个 CompletableFuture,该 Future 在所有异步监听器完成后才 complete,asyncFailed 反映实际失败数。
VKEventPublishResult 发布结果
| 方法 | 返回值 | 说明 |
getMatchedListeners() | int | 本次发布匹配到的监听器总数(含被过滤器拦截的) |
getSyncExecuted() | int | 同步监听器成功执行数 |
getSyncFailed() | int | 同步监听器执行异常数 |
getAsyncSubmitted() | int | 成功提交到线程池的异步任务数 |
getAsyncRejected() | int | 被线程池拒绝的异步任务数(队列满) |
getAsyncFailed() | int | 异步监听器执行失败数(publishAsync() 专用;publish() 固定为 0) |
getCostNs() | long | 发布耗时(纳秒,System.nanoTime 精度) |
死信处理器
当发布的事件没有任何匹配的监听器时,死信处理器被调用。全局唯一,后注册覆盖前注册,用于监控未被消费的事件。
Vostok.Event.onDeadLetter(event -> {
Vostok.Log.warn("unhandled event: {}", event.getClass().getSimpleName());
// 可投递到告警队列、监控系统等
metrics.increment("event.dead_letter");
});
死信处理器注意事项
死信处理器在
publish() 的调用线程上执行(同步),应保持低延迟,避免阻塞 IO。如需网络操作,请投递到独立队列异步处理。
取消订阅
// 方式一:通过订阅句柄的 cancel() 方法(等价于 Vostok.Event.off(sub))
VKEventSubscription sub = Vostok.Event.on(OrderCreatedEvent.class, event -> { /* … */ });
sub.cancel(); // 线程安全,原子操作
// 方式二:通过门面方法
Vostok.Event.off(sub);
// 移除某事件类型的全部监听器
Vostok.Event.offAll(OrderCreatedEvent.class);
// 批量取消(scan() 返回的订阅列表)
List<VKEventSubscription> subs = Vostok.Event.scan(new MyHandler());
subs.forEach(VKEventSubscription::cancel);
生命周期
// 检查是否已初始化
boolean running = Vostok.Event.started();
// 获取当前配置快照(返回副本,不可修改运行时)
VKEventConfig cfg = Vostok.Event.config();
// 重新初始化:替换配置,重建线程池,清空所有已注册监听器
Vostok.Event.reinit(new VKEventConfig().asyncCoreThreads(8));
// 关闭事件总线:等待线程池任务完成(最长 shutdownWaitMs),清空所有监听器
Vostok.Event.close();
reinit() 说明
reinit() 会清空所有已注册的监听器并重建线程池,等待旧线程池任务在
shutdownWaitMs 内完成后关闭。
调用后需重新注册所有监听器。与
init() 不同,
reinit() 在已初始化时仍会执行。
完整使用示例
Web 请求事件追踪
// 定义事件
public record RequestCompletedEvent(String path, int status, long elapsedMs) {}
// 注册监听器
Vostok.Event.on(RequestCompletedEvent.class,
VKListenerMode.ASYNC,
VKEventPriority.LOW,
event -> metrics.record(event.path(), event.status(), event.elapsedMs()));
// 慢请求告警(过滤器:只处理超过 1 秒的请求)
Vostok.Event.on(RequestCompletedEvent.class,
VKListenerMode.ASYNC,
VKEventPriority.NORMAL,
event -> event.elapsedMs() > 1000,
event -> alertQueue.offer("slow: " + event.path()));
// 在 handler 中发布
long start = System.currentTimeMillis();
handle(ctx);
Vostok.Event.publish(new RequestCompletedEvent(
ctx.path(), ctx.status(), System.currentTimeMillis() - start));
注解式注册(与 scan() 结合)
public class AuditHandler {
@VKEventHandler(priority = VKEventPriority.HIGHEST)
public void onUserLogin(UserLoginEvent e) {
Vostok.Log.info("[AUDIT] login: user={} ip={}", e.userId(), e.ip());
}
@VKEventHandler(mode = VKListenerMode.ASYNC, priority = VKEventPriority.LOW)
public void onOrderCreated(OrderCreatedEvent e) {
writeAuditLog(e.orderId(), e.userId(), e.amount());
}
}
// 应用启动时注册
List<VKEventSubscription> auditSubs = Vostok.Event.scan(new AuditHandler());
// 关闭时批量注销
auditSubs.forEach(VKEventSubscription::cancel);
配置参数
| 参数 | 类型 | 默认值 | 说明 |
| enabled | boolean | true | 是否启用事件总线;false 时 publish/publishAsync 立即返回空结果 |
| asyncCoreThreads | int | CPU/2(最小 1) | 异步线程池核心线程数;设置时自动保证不超过 asyncMaxThreads |
| asyncMaxThreads | int | CPU*2(最小 1) | 异步线程池最大线程数;设置时自动保证不小于 asyncCoreThreads |
| asyncQueueCapacity | int | 4096 | 异步任务队列容量(ArrayBlockingQueue);队列满时触发 rejectionPolicy |
| asyncKeepAliveMs | long | 60000 | 空闲线程存活时间(ms);超时后多余线程被回收 |
| rejectionPolicy | VKEventRejectionPolicy | CALLER_RUNS | 线程池队列满时的拒绝策略,见枚举说明 |
| listenerErrorStrategy | VKEventListenerErrorStrategy | CONTINUE | 监听器抛出异常时的处理策略,见枚举说明 |
| shutdownWaitMs | long | 3000 | 关闭(close/reinit)时等待线程池任务完成的最长时间(ms);超时后强制 shutdownNow |
枚举说明
VKEventPriority
优先级数值越小,执行越靠前。同优先级的监听器按注册顺序(ID 升序)执行。
| 枚举值 | 数值 | 说明 |
HIGHEST | 1 | 最高优先级,最先执行 |
HIGH | 2 | 高优先级 |
NORMAL | 3 | 默认优先级 |
LOW | 4 | 低优先级 |
LOWEST | 5 | 最低优先级,最后执行 |
VKListenerMode
| 枚举值 | 行为 |
SYNC | 在 publish() 的调用线程上同步执行,阻塞直到该监听器完成 |
ASYNC | 提交到内部线程池异步执行,不阻塞发布者线程 |
VKEventRejectionPolicy
| 枚举值 | 行为 |
CALLER_RUNS | 队列满时在调用者线程(发布者线程)同步执行该任务(默认) |
ABORT | 队列满时抛出 RejectedExecutionException,发布方可捕获 |
DISCARD | 队列满时静默丢弃该异步任务,asyncRejected 计数加 1 |
VKEventListenerErrorStrategy
| 枚举值 | 行为 |
CONTINUE | 某监听器抛出异常后,记录错误日志并继续执行后续监听器(默认) |
FAIL_FAST | 同步监听器异常时立即重新抛出(RuntimeException),终止后续执行;异步监听器异常时将 future 标记为失败(publishAsync 的调用方可检测到 asyncFailed > 0) |
API 速查
生命周期
| 方法 | 返回值 | 说明 |
init() | void | 以默认配置显式初始化(已初始化时幂等) |
init(VKEventConfig) | void | 以自定义配置显式初始化(已初始化时幂等) |
reinit(VKEventConfig) | void | 重新初始化,替换配置并清空所有监听器(已初始化时强制执行) |
started() | boolean | 返回是否已初始化 |
config() | VKEventConfig | 返回当前配置的副本快照 |
close() | void | 关闭事件总线,等待线程池任务完成,清空所有监听器 |
订阅(on / once)
| 方法 | 返回值 | 说明 |
on(Class, listener) | VKEventSubscription | 注册同步监听器,默认 NORMAL 优先级 |
on(Class, mode, listener) | VKEventSubscription | 指定执行模式,默认 NORMAL 优先级 |
on(Class, priority, listener) | VKEventSubscription | 同步监听器,指定优先级 |
on(Class, mode, priority, listener) | VKEventSubscription | 指定执行模式和优先级 |
on(Class, mode, priority, filter, listener) | VKEventSubscription | 全参数版本,含 Predicate 过滤器(null 表示不过滤) |
once(Class, listener) | VKEventSubscription | 一次性同步监听器,触发一次后自动注销 |
once(Class, mode, listener) | VKEventSubscription | 一次性监听器,指定执行模式 |
scan(bean) | List<VKEventSubscription> | 扫描 bean 中 @VKEventHandler 注解方法并批量注册 |
取消订阅
| 方法 | 说明 |
subscription.cancel() | 取消该订阅(线程安全,通过 ConcurrentHashMap.compute 原子移除) |
off(VKEventSubscription) | 取消指定订阅,等价于 subscription.cancel() |
offAll(Class) | 移除指定事件类型的全部监听器 |
发布
| 方法 | 返回值 | 说明 |
publish(event) | VKEventPublishResult | 同步发布;SYNC 监听器在当前线程执行;ASYNC 监听器提交后立即返回(不等待);asyncFailed 固定为 0 |
publishAsync(event) | CompletableFuture<VKEventPublishResult> | 发布并等待所有 ASYNC 监听器完成;asyncFailed 反映实际失败数 |
死信与 VKEventSubscription
| 方法 | 说明 |
onDeadLetter(VKEventDeadLetterHandler) | 注册死信处理器(全局唯一,后注册覆盖);事件无匹配监听器时调用 |
subscription.getId() | 返回订阅的唯一 ID(long,单调递增) |
subscription.getEventType() | 返回订阅时指定的事件类型 |
subscription.getMode() | 返回监听器执行模式(SYNC/ASYNC) |
subscription.getPriority() | 返回监听器优先级 |