初始化(可选)

零配置启动(默认行为)

// 无需任何初始化——直接发布/订阅即可。
// 首次操作时自动以默认配置启动。
Vostok.Event.publish(new OrderCreatedEvent(42L));

显式初始化

// 使用内置默认配置
Vostok.Event.init();

// 自定义配置
Vostok.Event.init(new VKEventConfig()
    .asyncCoreThreads(4)
    .asyncMaxThreads(16)
    .asyncQueueCapacity(4096)
    .asyncKeepAliveMs(60000)
    .listenerErrorStrategy(VKEventListenerErrorStrategy.CONTINUE)
    .rejectionPolicy(VKEventRejectionPolicy.CALLER_RUNS)
    .shutdownWaitMs(3000)
);
零配置启动
Event 模块无需显式初始化。任何 on()publish() 等调用都会在首次使用时自动以默认配置启动线程池。 如需自定义线程数或异常策略,在第一次订阅/发布前调用 init(VKEventConfig) 即可。 init() 是幂等的:若已初始化则不重复执行。如需替换配置,请使用 reinit()

定义事件类

// 任意 POJO、Record 或类均可作为事件,无需继承任何基类
public record OrderCreatedEvent(Long orderId, String userId, BigDecimal amount) {}
public record UserLoginEvent(String userId, String ip, Instant time) {}

// 支持多态匹配:注册父类/接口的监听器,可以接收所有子类事件
public class BaseEvent {}
public class OrderEvent extends BaseEvent {}
public class PaymentEvent extends BaseEvent {}

订阅事件

同步监听器(默认 NORMAL 优先级)

VKEventSubscription sub = Vostok.Event.on(OrderCreatedEvent.class, event -> {
    Vostok.Log.info("order {} created", event.orderId());
});

异步监听器

Vostok.Event.on(OrderCreatedEvent.class, VKListenerMode.ASYNC, event -> {
    // 在异步线程池中执行,不阻塞发布者线程
    sendEmail(event.userId(), "Your order is confirmed");
});

指定优先级(仅指定优先级,默认同步)

// 优先级:HIGHEST > HIGH > NORMAL > LOW > LOWEST
// 同一 publish 中,高优先级监听器先执行;同优先级按注册顺序执行
Vostok.Event.on(UserLoginEvent.class, VKEventPriority.HIGH, event -> {
    auditLog(event.userId(), event.ip());
});

指定模式 + 优先级

Vostok.Event.on(UserLoginEvent.class,
    VKListenerMode.SYNC,
    VKEventPriority.HIGH,
    event -> auditLog(event.userId(), event.ip()));

带 Predicate 过滤器(全参数版本)

// 过滤器为 null 表示不过滤;过滤器不满足时跳过(不消耗 once token)
Vostok.Event.on(OrderCreatedEvent.class,
    VKListenerMode.ASYNC,
    VKEventPriority.NORMAL,
    event -> event.amount().compareTo(BigDecimal.valueOf(1000)) > 0,  // 只处理大额订单
    event -> notifyRiskSystem(event));

一次性订阅(触发一次后自动注销)

// 同步一次性监听器(默认 NORMAL 优先级)
VKEventSubscription sub = Vostok.Event.once(UserLoginEvent.class, event -> {
    System.out.println("first login: " + event.userId());
});

// 异步一次性监听器
// 注意:CAS 保证并发 publish 场景下仍只触发一次
Vostok.Event.once(UserLoginEvent.class, VKListenerMode.ASYNC, event -> {
    // 仅异步执行一次
});

多态监听(父类/接口监听器接收子类事件)

// 注册 BaseEvent 的监听器,OrderEvent 和 PaymentEvent 发布时均会触发
Vostok.Event.on(BaseEvent.class, event -> {
    Vostok.Log.info("received base event: {}", event.getClass().getSimpleName());
});

注解式注册(@VKEventHandler + scan())

在 Bean 类的方法上添加 @VKEventHandler 注解,然后调用 scan(bean) 批量注册。被注解的方法必须恰好有 1 个参数,参数类型即为监听的事件类型。扫描时会遍历本类及所有父类(不含 Object)。

public class OrderEventHandler {

    // 同步监听,HIGH 优先级
    @VKEventHandler(priority = VKEventPriority.HIGH)
    public void onOrderCreated(OrderCreatedEvent e) {
        Vostok.Log.info("order {} created, amount={}", e.orderId(), e.amount());
    }

    // 异步监听,默认 NORMAL 优先级
    @VKEventHandler(mode = VKListenerMode.ASYNC)
    public void onOrderCreatedAsync(OrderCreatedEvent e) {
        sendNotification(e.userId());
    }

    // 一次性监听,触发后自动注销
    @VKEventHandler(mode = VKListenerMode.ASYNC, once = true)
    public void onFirstPayment(PaymentEvent e) {
        sendWelcomeBonus(e.userId());
    }

    // 也支持 private 方法
    @VKEventHandler
    private void onUserLogin(UserLoginEvent e) {
        auditLog(e.userId(), e.ip());
    }
}

// 批量注册,返回所有订阅句柄(可统一取消)
List<VKEventSubscription> subs = Vostok.Event.scan(new OrderEventHandler());

@VKEventHandler 属性说明:

属性类型默认值说明
modeVKListenerModeSYNC执行模式:SYNC(在 publish 调用线程执行)或 ASYNC(提交到线程池)
priorityVKEventPriorityNORMAL监听器优先级,高优先级在同一次 publish 中先执行
oncebooleanfalse是否一次性监听,触发一次后自动注销;CAS 保证并发安全

发布事件

同步发布

// 同步发布:在当前线程依次执行所有同步监听器;异步监听器提交到线程池后立即返回(不等待)
// 返回本次发布的执行统计
VKEventPublishResult result = Vostok.Event.publish(new OrderCreatedEvent(42L, "user-1", amount));

System.out.printf("matched=%d, syncOk=%d, syncFail=%d, asyncSubmit=%d, costNs=%d%n",
    result.getMatchedListeners(),
    result.getSyncExecuted(),
    result.getSyncFailed(),
    result.getAsyncSubmitted(),
    result.getCostNs());

异步发布(等待所有异步监听器完成)

// publishAsync 等待所有 ASYNC 监听器 future 全部完成后,结果 CompletableFuture 才 complete
// 可通过 .join() / .get() 确认所有异步监听器执行完毕,asyncFailed 反映执行失败数
CompletableFuture<VKEventPublishResult> future =
    Vostok.Event.publishAsync(new OrderCreatedEvent(42L, "user-1", amount));

// 非阻塞回调
future.thenAccept(result -> {
    Vostok.Log.info("all async listeners done, failed={}", result.getAsyncFailed());
});

// 或阻塞等待(测试场景)
VKEventPublishResult result = future.join();
publish vs publishAsync 的区别
  • publish():在当前线程同步执行所有 SYNC 监听器;ASYNC 监听器提交到线程池后立即返回,不等待异步结果,asyncFailed 固定为 0。
  • publishAsync():同样立即提交异步任务,但返回一个 CompletableFuture,该 Future 在所有异步监听器完成后才 complete,asyncFailed 反映实际失败数。

VKEventPublishResult 发布结果

方法返回值说明
getMatchedListeners()int本次发布匹配到的监听器总数(含被过滤器拦截的)
getSyncExecuted()int同步监听器成功执行数
getSyncFailed()int同步监听器执行异常数
getAsyncSubmitted()int成功提交到线程池的异步任务数
getAsyncRejected()int被线程池拒绝的异步任务数(队列满)
getAsyncFailed()int异步监听器执行失败数(publishAsync() 专用;publish() 固定为 0)
getCostNs()long发布耗时(纳秒,System.nanoTime 精度)

死信处理器

当发布的事件没有任何匹配的监听器时,死信处理器被调用。全局唯一,后注册覆盖前注册,用于监控未被消费的事件。

Vostok.Event.onDeadLetter(event -> {
    Vostok.Log.warn("unhandled event: {}", event.getClass().getSimpleName());
    // 可投递到告警队列、监控系统等
    metrics.increment("event.dead_letter");
});
死信处理器注意事项
死信处理器在 publish() 的调用线程上执行(同步),应保持低延迟,避免阻塞 IO。如需网络操作,请投递到独立队列异步处理。

取消订阅

// 方式一:通过订阅句柄的 cancel() 方法(等价于 Vostok.Event.off(sub))
VKEventSubscription sub = Vostok.Event.on(OrderCreatedEvent.class, event -> { /* … */ });
sub.cancel();  // 线程安全,原子操作

// 方式二:通过门面方法
Vostok.Event.off(sub);

// 移除某事件类型的全部监听器
Vostok.Event.offAll(OrderCreatedEvent.class);

// 批量取消(scan() 返回的订阅列表)
List<VKEventSubscription> subs = Vostok.Event.scan(new MyHandler());
subs.forEach(VKEventSubscription::cancel);

生命周期

// 检查是否已初始化
boolean running = Vostok.Event.started();

// 获取当前配置快照(返回副本,不可修改运行时)
VKEventConfig cfg = Vostok.Event.config();

// 重新初始化:替换配置,重建线程池,清空所有已注册监听器
Vostok.Event.reinit(new VKEventConfig().asyncCoreThreads(8));

// 关闭事件总线:等待线程池任务完成(最长 shutdownWaitMs),清空所有监听器
Vostok.Event.close();
reinit() 说明
reinit() 会清空所有已注册的监听器并重建线程池,等待旧线程池任务在 shutdownWaitMs 内完成后关闭。 调用后需重新注册所有监听器。与 init() 不同,reinit() 在已初始化时仍会执行。

完整使用示例

Web 请求事件追踪

// 定义事件
public record RequestCompletedEvent(String path, int status, long elapsedMs) {}

// 注册监听器
Vostok.Event.on(RequestCompletedEvent.class,
    VKListenerMode.ASYNC,
    VKEventPriority.LOW,
    event -> metrics.record(event.path(), event.status(), event.elapsedMs()));

// 慢请求告警(过滤器:只处理超过 1 秒的请求)
Vostok.Event.on(RequestCompletedEvent.class,
    VKListenerMode.ASYNC,
    VKEventPriority.NORMAL,
    event -> event.elapsedMs() > 1000,
    event -> alertQueue.offer("slow: " + event.path()));

// 在 handler 中发布
long start = System.currentTimeMillis();
handle(ctx);
Vostok.Event.publish(new RequestCompletedEvent(
    ctx.path(), ctx.status(), System.currentTimeMillis() - start));

注解式注册(与 scan() 结合)

public class AuditHandler {

    @VKEventHandler(priority = VKEventPriority.HIGHEST)
    public void onUserLogin(UserLoginEvent e) {
        Vostok.Log.info("[AUDIT] login: user={} ip={}", e.userId(), e.ip());
    }

    @VKEventHandler(mode = VKListenerMode.ASYNC, priority = VKEventPriority.LOW)
    public void onOrderCreated(OrderCreatedEvent e) {
        writeAuditLog(e.orderId(), e.userId(), e.amount());
    }
}

// 应用启动时注册
List<VKEventSubscription> auditSubs = Vostok.Event.scan(new AuditHandler());

// 关闭时批量注销
auditSubs.forEach(VKEventSubscription::cancel);

配置参数

参数类型默认值说明
enabledbooleantrue是否启用事件总线;false 时 publish/publishAsync 立即返回空结果
asyncCoreThreadsintCPU/2(最小 1)异步线程池核心线程数;设置时自动保证不超过 asyncMaxThreads
asyncMaxThreadsintCPU*2(最小 1)异步线程池最大线程数;设置时自动保证不小于 asyncCoreThreads
asyncQueueCapacityint4096异步任务队列容量(ArrayBlockingQueue);队列满时触发 rejectionPolicy
asyncKeepAliveMslong60000空闲线程存活时间(ms);超时后多余线程被回收
rejectionPolicyVKEventRejectionPolicyCALLER_RUNS线程池队列满时的拒绝策略,见枚举说明
listenerErrorStrategyVKEventListenerErrorStrategyCONTINUE监听器抛出异常时的处理策略,见枚举说明
shutdownWaitMslong3000关闭(close/reinit)时等待线程池任务完成的最长时间(ms);超时后强制 shutdownNow

枚举说明

VKEventPriority

优先级数值越小,执行越靠前。同优先级的监听器按注册顺序(ID 升序)执行。

枚举值数值说明
HIGHEST1最高优先级,最先执行
HIGH2高优先级
NORMAL3默认优先级
LOW4低优先级
LOWEST5最低优先级,最后执行

VKListenerMode

枚举值行为
SYNC在 publish() 的调用线程上同步执行,阻塞直到该监听器完成
ASYNC提交到内部线程池异步执行,不阻塞发布者线程

VKEventRejectionPolicy

枚举值行为
CALLER_RUNS队列满时在调用者线程(发布者线程)同步执行该任务(默认)
ABORT队列满时抛出 RejectedExecutionException,发布方可捕获
DISCARD队列满时静默丢弃该异步任务,asyncRejected 计数加 1

VKEventListenerErrorStrategy

枚举值行为
CONTINUE某监听器抛出异常后,记录错误日志并继续执行后续监听器(默认)
FAIL_FAST同步监听器异常时立即重新抛出(RuntimeException),终止后续执行;异步监听器异常时将 future 标记为失败(publishAsync 的调用方可检测到 asyncFailed > 0)

API 速查

生命周期

方法返回值说明
init()void以默认配置显式初始化(已初始化时幂等)
init(VKEventConfig)void以自定义配置显式初始化(已初始化时幂等)
reinit(VKEventConfig)void重新初始化,替换配置并清空所有监听器(已初始化时强制执行)
started()boolean返回是否已初始化
config()VKEventConfig返回当前配置的副本快照
close()void关闭事件总线,等待线程池任务完成,清空所有监听器

订阅(on / once)

方法返回值说明
on(Class, listener)VKEventSubscription注册同步监听器,默认 NORMAL 优先级
on(Class, mode, listener)VKEventSubscription指定执行模式,默认 NORMAL 优先级
on(Class, priority, listener)VKEventSubscription同步监听器,指定优先级
on(Class, mode, priority, listener)VKEventSubscription指定执行模式和优先级
on(Class, mode, priority, filter, listener)VKEventSubscription全参数版本,含 Predicate 过滤器(null 表示不过滤)
once(Class, listener)VKEventSubscription一次性同步监听器,触发一次后自动注销
once(Class, mode, listener)VKEventSubscription一次性监听器,指定执行模式
scan(bean)List<VKEventSubscription>扫描 bean 中 @VKEventHandler 注解方法并批量注册

取消订阅

方法说明
subscription.cancel()取消该订阅(线程安全,通过 ConcurrentHashMap.compute 原子移除)
off(VKEventSubscription)取消指定订阅,等价于 subscription.cancel()
offAll(Class)移除指定事件类型的全部监听器

发布

方法返回值说明
publish(event)VKEventPublishResult同步发布;SYNC 监听器在当前线程执行;ASYNC 监听器提交后立即返回(不等待);asyncFailed 固定为 0
publishAsync(event)CompletableFuture<VKEventPublishResult>发布并等待所有 ASYNC 监听器完成;asyncFailed 反映实际失败数

死信与 VKEventSubscription

方法说明
onDeadLetter(VKEventDeadLetterHandler)注册死信处理器(全局唯一,后注册覆盖);事件无匹配监听器时调用
subscription.getId()返回订阅的唯一 ID(long,单调递增)
subscription.getEventType()返回订阅时指定的事件类型
subscription.getMode()返回监听器执行模式(SYNC/ASYNC)
subscription.getPriority()返回监听器优先级