Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
laizuan committed Dec 16, 2023
1 parent f89d20e commit a32b741
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 39 deletions.
4 changes: 4 additions & 0 deletions docs/.vitepress/configs/menus/java.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ export const createJavaMenu = () => {
{
text: 'MessageQueue 监控',
link: '/java/dependencys/message-queue-monit'
},
{
text: '间隔重试',
link: '/java/dependencys/retry'
}
]
},
Expand Down
94 changes: 55 additions & 39 deletions docs/java/dependencys/mq.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,23 @@
</dependency>
```

### MessageQueue 消息队列
### 消息队列

每个系统统一使用一个`Topic`来发送,如果需要区分消息类型使用`Tags`来区分。默认的系统`Topic``系统代码-out-0`

### 属性配置
#### 属性配置

| 字段名称 | 说明 | 默认值 |
| ----------------------------------------------- | ------------------------------------------------------------- | --------------------------------------------- |
| 字段名称 | 说明 | 默认值 |
| ----------------------------------------------- | ------------------------------------------------------------ | --------------------------------------------- |
| `leaderrun.mq.auto-registry` | 是否开启扫描注解自动注册。关闭之后`@MessageQueueListener`无效 | true |
| `leaderrun.mq.auto-create-producer` | 是否自动创建默认的发送者。 | true |
| `leaderrun.mq.open-send-completion-interceptor` | 是否开启发送消息失败拦截 | true |
| `leaderrun.mq.producer.name` | 默认发送的主题名称 | `${spring.application.name}` |
| `leaderrun.mq.producer.group` | 默认发送主题的生产者组别名称 | `${spring.application.name} + "-" + "group"}` |
| `leaderrun.mq.producer.messageQueueSelector` | 发送到那个队列算法 bean 名称 | `orderlyMessageQueueSelector` |
| `leaderrun.mq.auto-create-producer` | 是否自动创建默认的发送者。 | true |
| `leaderrun.mq.open-send-completion-interceptor` | 是否开启发送消息失败拦截 | true |
| `leaderrun.mq.producer.name` | 默认发送的主题名称 | `${spring.application.name}` |
| `leaderrun.mq.producer.group` | 默认发送主题的生产者组别名称 | `${spring.application.name} + "-" + "group"}` |
| `leaderrun.mq.producer.messageQueueSelector` | 发送到哪个队列算法 `bean` 名称 | `orderlyMessageQueueSelector` |
| `leaderrun.mq.binders` | 监听多个`RocketMQ`消息配置 | - |

### 注解配置 <Badge type="tip" text="^2.2.3" />
#### 注解配置 <Badge type="tip" text="^2.2.3" />

框架版本从`2.2.3`开始,可以使用注解的方式类配置消息队列。

Expand All @@ -38,16 +39,17 @@

`@MessageQueueListener`可以用在类上或者是方法上面

| 字段名称 | 说明 | 默认值 |
| -------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------ |
| consumerGroup | 相同角色的消费者需要具有完全相同的订阅和 consumerGroup 才能正确实现负载平衡。并且需要是唯一的。如果不配置默认:`${spring.application.name} + "-" + beanName + "-group"` | |
| topic | 订阅的主题 | |
| subscription | 订阅的消息多个 TAG 可以使用`\|\|`隔开。支持 Tag 和 SQL 混搭。例如:` sql:(clientId = 'leaderrun' and (TAGS is not null and TAGS = 'recpt'))` | |
| messageModel | 消费模式 | `CLUSTERING` |
| errorHandlerBeanName | 当该消息消费时出现异常的回调方法 bean 的名称,如果不配置使用全局的拦截器 | |
| `push.orderly` | 控制消费模式,您可以选择并发或有序接收消息。<br/>如果你的消息需要控制消费顺序,请设置成 true,否则设置成 false 提高消费速度 | false |
| `push.maxReconsumeTimes` | 一个消息如果消费失败的话,最多重新消费多少次才投递到死信队列. 默认 1 次 | 1 |
| `push.delayLevelWhenNextConsume` | 消息消费重试策略。<br />-1,不重试,直接放入 DLQ<br/>0 ,由 broker 控制频率<br/>>0,客户端控制重试频率 | -1 |
| 字段名称 | 说明 | 默认值 |
| ------------------------------------ | ------------------------------------------------------------ | ------------ |
| consumerGroup | 相同角色的消费者需要具有完全相同的订阅和 consumerGroup 才能正确实现负载平衡。并且需要是唯一的。如果不配置默认:`${spring.application.name} + "-" + beanName + "-group"` | |
| topic | 订阅的主题 | |
| subscription | 订阅的消息多个 TAG 可以使用`\|\|`隔开。支持 Tag 和 SQL 混搭。例如:` sql:(clientId = 'leaderrun' and (TAGS is not null and TAGS = 'recpt'))` | |
| messageModel | 消费模式 | `CLUSTERING` |
| errorHandlerBeanName | 当该消息消费时出现异常的回调方法 bean 的名称,如果不配置使用全局的拦截器 | |
| `push.orderly` | 控制消费模式,您可以选择并发或有序接收消息。<br/>如果你的消息需要控制消费顺序,请设置成 true,否则设置成 false 提高消费速度 | true |
| `push.maxReconsumeTimes` | 一个消息如果消费失败的话,最多重新消费多少次才投递到死信队列. 默认 1 次 | 1 |
| `push.delayLevelWhenNextConsume` | 消息消费重试策略。<br />-1,不重试,直接放入 DLQ<br/>0 ,由 broker 控制频率<br/>>0,客户端控制重试频率 | -1 |
| `push.suspendCurrentQueueTimeMillis` | 下一次重试的时间。如果消费失败下一次重试的时间,如果maxReconsumeTimes设置成1不重试 | 1000 |

如果是用在类上需要继承`MessageEventListener`并实现`onMessage`接口。**注意:如果使用在类上整个类只能有一个方法,除了类构造器**

Expand All @@ -60,12 +62,13 @@
return idempotentConsumer(
message -> {
...
},
"");
});
}
```

### 生产者
### 配置文件方式

#### 生产者

- 配置

Expand Down Expand Up @@ -104,7 +107,7 @@
```java
@Autowired
private MessageQueueTemplate messageQueueTemplate;
messageQueueTemplate.sendDefaultTopic("rule", "om"); // 发送tags为rule的消息,消息内容为om
```

Expand All @@ -126,7 +129,7 @@
*/
```

### 消费者
#### 消费者

- 配置

Expand Down Expand Up @@ -191,25 +194,38 @@
@Component
public class CustomsConsumer extends BaseConsumer {
private final LogWrapper log = LogWrapper.getLogger(this.getClass());
public CustomsConsumer(RedisService redisService) {
super(redisService);
}
@Bean
public Consumer<Message<CustomsCmd>> factorySubmitOrder() {
return idempotentConsumer(
message -> {
.....
}
);
}
}
## 监听多RocketMQ消息
public CustomsConsumer(RedisService redisService) {
super(redisService);
}
:::warning
@Bean
public Consumer<Message<CustomsCmd>> factorySubmitOrder() {
return idempotentConsumer(
message -> {
.....
}
);
}
}
```
目前只支持多RocketMQ监听,其它MQ暂不支持
:::
对于需要监听多个RocketMQ的场景可以通过配置`binders`属性然后继承``AbstractMessageListenerConcurrently`或者`AbstractMessageListenerOrderly`来实现

- `AbstractMessageListenerConcurrently`并发消费。同一个队列有多个线程消费,具有比较高的消费能力
- `AbstractMessageListenerOrderly`顺序消费。同一个队列的消息只有一个线程消费。

如果消费返回`false`都会进入重试。**需要注意的是并发消费和顺序消费的机制不同,在顺序消费的时候重试会阻塞后面的所有消息,知道重试成功或者到达重试阈值**

## Bus 消息总线

```
一般用于服务集群所有节点通知,比如通知集群中所有节点清空缓存在

以用户中心字典数据更新为例:在页面更新字典数据之后通知所有节点清空缓存
Expand Down Expand Up @@ -262,7 +278,7 @@ public class DictEvent extends RemoteApplicationEvent {
this.dataValue = dataValue;
}
}
```
```

- 定义消息生产者

Expand Down
147 changes: 147 additions & 0 deletions docs/java/dependencys/retry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
## 间隔重试组件

重试组件主要运用在在最大程度保证业务执行成功。多用于推送数据给第三方等场景

它主要原理是使用`RocketMQ`的延时消息来实现,配合`Redis`存储需要处理的数据。并且它能保证同一个业务数据能顺序执行

默认情况下如果第一次尝试处理业务失败,则会最多重试6次,剩下六次重试的频率为:30s、1m、5m、10m、30m、1h

### Maven 依赖

```xml
<dependency>
<groupId>com.leaderrun</groupId>
<artifactId>leaderrun-retry-starter</artifactId>
</dependency>
```

### 属性

| **字段** | **说明** | **默认值** | **可选值** |
| -------- | -------- | ---------- | ---------- |
| - | - | - | - |

### 使用

#### 新增一条任务

新增的任务会马上执行,如果执行返回`false`会进入重试阶段

:::warning 注意

如果自定义消息头的时候Key需要是`CMQ_`开头,否则重试的时候将拿不到自定义头的数据。推荐通过`intervalRetryTemplate.getHeaderKey('xxxx')`来获取符合条件的`Key`

:::

```java
private final IntervalRetryTemplate intervalRetryTemplate;


Map<String, Object> headers = Maps.newHashMapWithExpectedSize(1);
headers.put("CMQ_CUSTOMS_CODE", customerCode);

intervalRetryTemplate.addPushTask(GlobalConstant.MQ_PUSH_BOOKING_TAG, asnNo, "我是一个数据", headers);
```

#### 消费

:::warning 警告

不管使用哪种方式实现需要注意几点:

- 执行业务逻辑的时候自行处理并记录好异常,建议只返回`true``false`。如果直接抛出异常可能导致同一订单不会进行顺序重试

- 如果重试达到阈值就不会在执行,框架会打印如下日志。并且数据只会保留7天,可以通过`dataId``Redis`中找到

```txt
log.error(
"&&&& 重试超过 {} 次没有成功。不再进行重试。MessageId: {}, DataId: {}, data: {}",
retryCount,
msgId,
dataId,
data);
```

:::

第一种通过`@MessageQueueListener`注解来实现 **(推荐使用这种方式来处理)**

```java
@Bean
@MessageQueueListener(topic = "edi", subscription = GlobalConstant.MQ_PUSH_BOOKING_TAG,
push = @MessageQueueListener.Push(orderly = false)// 这里必须设置成false)
public Consumer<Message<String>> onNotifyFlexport() {
return message -> {
intervalRetryTemplate.onStreamMessagePush(data -> {
try {
// 执行你的业务逻辑
// 执行成功返回true则不会进行重试
// 如果返回false框架会自动判断是否需要进入重试。如果没有达到重试阈值会进入重试
} catch (Throwable e) {
// 建议自行处理异常,尽可能的返回false而不是异常。否则可能导致同一订单不会进行顺序重试
log.error("MessageId: %s, 推送 %s Booking 回执异常:%s".formatted("msgId", "customerCode", e.getMessage()), e);
return false;
}
}).apply(message);
};
```


第二种通过继承`AbstractMessageListenerConcurrently`来实现

:::warning 注意

`AbstractMessageListenerConcurrently`或者`Message`的泛型类型只能是`String`,他代表的是创建任务时候传入值的关联ID

:::

```java
@Component
public class PushBookingReceiptTaskConsumer extends AbstractMessageListenerConcurrently<String> {
private final Map<String, PublishReceiptService> publishReceiptServiceMap;
private final IntervalRetryTemplate intervalRetryTemplate;


@Override
public boolean onMessage(MessageExt msgs, ConsumeConcurrentlyContext context, String dataId) {
String customerCode = msgs.getProperty("CMQ_CUSTOMS_CODE");
PublishReceiptService publishReceiptService = publishReceiptServiceMap.get(customerCode);
if (publishReceiptService == null) {
return true;
}
String msgId = msgs.getMsgId();

return intervalRetryTemplate
.onNativeMessagePush(data -> { // 这里的data就是创建任务时的传入的值
try {
if (publishReceiptService.publishBookingReceipt(data)) {
return true;
}
} catch (Throwable e) {
log.error(
"MessageId: %s, 推送 %s Booking 回执异常:%s".formatted(msgId, customerCode, e.getMessage()),
e);
}
return true;
})
.apply(msgs, id);
}
}
```


### 依赖

```xml
<dependency>
<groupId>com.leaderrun</groupId>
<artifactId>leaderrun-mq-starter</artifactId>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<scope>compile</scope>
</dependency>
```

0 comments on commit a32b741

Please sign in to comment.