Skip to content

Latest commit

 

History

History
131 lines (101 loc) · 3.51 KB

README-zh.md

File metadata and controls

131 lines (101 loc) · 3.51 KB

spring-cloud-stream-redis

Maven central License CodeQL

阅读其他语言版本: English

介绍

基于Spring Cloud Stream 规范实现 Redis 消息 发送、接收, 正式版本 与 Spring Cloud Stream 保持一致

文档

https://guoshiqiufeng.github.io/spring-cloud-stream-redis/

开发框架

  • Spring Cloud Stream 4
  • Spring Boot 3

功能

  • PUBLISH SUBSCRIBE 消息
  • QUEUE 消息(BLPOP BRPOP LPUSH RPUSH)

注1: 两个功能模式不能混合使用,即 使用 PUBLISH SUBSCRIBE 模式 发送消息 时,不能使用 QUEUE 模式接收消息,反之亦然

注2: PUBLISH SUBSCRIBE 模式消息接收不到会丢失,QUEUE 模式不会

使用

引入统一版本依赖,不用再使用时指定版本号

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.github.guoshiqiufeng.cloud</groupId>
            <artifactId>spring-cloud-stream-dependencies</artifactId>
            <version>0.5.0</version>
            <type>import</type>
        </dependency>
    </dependencies>
</dependencyManagement>

引入starter依赖

<dependency>
    <groupId>io.github.guoshiqiufeng.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-redis</artifactId>
</dependency>

yml 配置

spring:
  cloud:
    stream:
      default-binder: redis
      binders:
        redis:
          type: redis
      redis:
        binder:
          configuration:
            host: 127.0.0.1
            port: 6379
            password: 123456
            database: 7
          support-type: queue_channel
      #        bindings:
      #          send-in-0:
      #            consumer:
      #              destination-is-pattern: true
      bindings:
        out-0:
          destination: test-topic
          content-type: text/plain
          group: push-producer-group
        send-in-0:
          destination: test-topic
          content-type: text/plain
          group: test-send-group

消息发送

@Autowired
private StreamBridge streamBridge;

@GetMapping("/send")
public String send() {
    MessageVO messageVO = new MessageVO();
    messageVO.setKey(UUID.randomUUID().toString());
    messageVO.setMsg("hello ");
    messageVO.setIds(Set.of("1", "2"));
    messageVO.setCreateTime(LocalDateTime.now());
    streamBridge.send("out-0", JSON.toJSONString(messageVO, JSONWriter.Feature.WriteClassName));
    return "success";
}

消息接收

@Slf4j
@Component("send")
public class MessageHandler implements Consumer<Message<String>> {

    /**
     * Performs this operation on the given argument.
     *
     * @param messageVOMessage the input argument
     */
    @Override
    public void accept(Message<String> messageVOMessage) {
        log.info("send Receive New Messages: {}", messageVOMessage.getPayload());
    }
}

更多使用参考查看文档