跨服务通知系统
RuoYi Office 扩展功能:支持本地事件、MQ(Redis Stream)、Feign 三种方式,将 BPM 流程状态变化通知到业务服务。
1. 概述
在微服务架构中,BPM 模块与业务模块(如 OA、HRM)通常是独立部署的服务。当流程状态发生变化(审批通过/拒绝/取消)时,需要通知业务服务同步更新业务数据。
RuoYi Office 提供了三种跨服务通知方式:
| 方式 | 适用场景 | 特点 |
|---|---|---|
| 本地事件(ApplicationEvent) | 单体架构、开发测试 | 零依赖、响应快 |
| 消息队列(Redis Stream MQ) | 微服务、异步、高吞吐 | 解耦、持久化、可重试 |
| Feign 远程调用 | 微服务、实时同步 | 强一致性、可获取返回值 |
2. 快速配置
2.1 全局配置
在 application.yaml(或与主工程合并的 profile 配置)中,将 BPM 通知配置写在与租户、字典、多数据源等相同的 Spring 业务框架根节点之下。若不确定根键名称,可在后端源码中搜索 bpm.notification.default-type,与 BpmNotificationManager 的 @Value 绑定路径对齐。
yaml
bpm:
notification:
default-type: local # 本地事件;可选值:local / mq / feign(与通知类型枚举 code 一致)
async: true
mq:
enabled: true
feign:
enabled: true2.2 按流程类型配置
可以为不同流程指定不同的通知方式:
yaml
bpm:
notification:
process-config:
oa_car_apply_bill: mq # 用车申请使用 MQ
oa_leave: feign # 请假使用 Feign
oa_expense: local # 报销使用本地事件3. 本地事件通知
适用场景:单体架构、开发测试环境
使用 Spring 的 ApplicationEvent 机制,零依赖、响应最快:
java
@Component
public class MyProcessStatusListener
extends BpmProcessInstanceStatusEventListener {
@Override
protected String getProcessDefinitionKey() {
return "my_process_key";
}
@Override
protected void onEvent(BpmProcessInstanceStatusEvent event) {
// 处理流程状态变化
log.info("流程 {} 状态变为 {}",
event.getProcessInstanceId(), event.getStatus());
}
}4. MQ 通知(Redis Stream)
适用场景:微服务架构、异步处理、高并发
4.1 消息类定义
发送方和接收方都需要定义消息类,继承 AbstractRedisStreamMessage:
java
@Data
@EqualsAndHashCode(callSuper = true)
public class BpmProcessInstanceStatusRedisMessage
extends AbstractRedisStreamMessage {
private String processInstanceId;
private String processDefinitionKey;
private Integer status;
private String businessKey;
private LocalDateTime eventTime;
private String tenantId;
private String startUserId;
@Override
public String getStreamKey() {
return "bpm.workflow.instance.status.changed";
}
}4.2 消费者实现
java
@Component
public class OaProcessNotificationConsumer
extends AbstractRedisStreamMessageListener<OaBpmProcessInstanceStatusMessage> {
@Override
public void onMessage(OaBpmProcessInstanceStatusMessage message) {
if (message.getProcessDefinitionKey().startsWith("oa_")) {
handleOaProcessNotification(message);
}
}
}4.3 关键要求
- 消息类必须继承
AbstractRedisStreamMessage - 发送方和接收方的
getStreamKey()返回值必须一致 - 消费者继承
AbstractRedisStreamMessageListener
4.4 调试命令
bash
# 查看 Stream 信息
redis-cli XINFO STREAM bpm.workflow.instance.status.changed
# 查看消费者组
redis-cli XINFO GROUPS bpm.workflow.instance.status.changed
# 查看未处理消息
redis-cli XPENDING bpm.workflow.instance.status.changed oa-server5. Feign 远程调用
适用场景:实时同步、强一致性
java
@RestController
@RequestMapping("/your-service/workflow-callback")
public class ProcessCallbackController {
@PostMapping("/status-change")
public CommonResult<Boolean> processStatusChange(
@RequestBody Map<String, Object> message) {
// 处理回调
return CommonResult.success(true);
}
}6. 消息格式
所有通知方式传递的消息格式统一:
json
{
"processInstanceId": "流程实例 ID",
"processDefinitionKey": "流程定义 Key",
"status": "流程状态(1=运行中, 2=通过, 3=不通过, 4=取消)",
"businessKey": "业务标识",
"startUserId": "发起人 ID",
"tenantId": "租户 ID",
"eventTime": "事件时间"
}7. 最佳实践
| 环境 | 推荐方式 | 原因 |
|---|---|---|
| 开发/测试 | local(本地事件) | 零依赖,调试方便 |
| 生产(实时性高) | feign | 同步返回,强一致 |
| 生产(高并发) | mq | 异步解耦,高吞吐 |
| 生产(混合场景) | 按流程配置 | 灵活选择 |
错误处理建议:
- MQ:配置重试和死信队列
- Feign:配置熔断和降级(Sentinel/Resilience4j)
- 本地事件:注意异常传播和事务边界
8. 故障排查
| 问题 | 排查方向 |
|---|---|
| MQ 消息未消费 | 检查 StreamKey、消费者组、Redis 连接 |
| Feign 调用失败 | 检查服务注册发现、接口路径、网络 |
| 本地事件未触发 | 检查监听器注册、processDefinitionKey 匹配、事务提交 |