大悟建设局网站,怎么以公司名义注册邮箱,四川建设银行官网招聘网站,外贸 wordpress目录 概述使用准备工作引入依赖创建Topic代码应用启动消息接收再扩展一个 结束 概述
github 文档地址 rocket mq example
RocketMQ 版本为 5.1.4
使用
准备工作
阅读此文需要事先准备 RocketMQ #xff0c;如有疑问#xff0c;请移步 RocketMQ 服务搭建
引入依赖
此处… 目录 概述使用准备工作引入依赖创建Topic代码应用启动消息接收再扩展一个 结束 概述
github 文档地址 rocket mq example
RocketMQ 版本为 5.1.4
使用
准备工作
阅读此文需要事先准备 RocketMQ 如有疑问请移步 RocketMQ 服务搭建
引入依赖
此处多依赖了一些依赖。
dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-nacos-discovery/artifactId
/dependency
dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-nacos-config/artifactId
/dependency
dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-stream-rocketmq/artifactId
/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-bootstrap/artifactId
/dependency
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId
/dependency创建Topic
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t test-topic代码
配置 Input 和 Output 的 Binding 信息并配合 EnableBinding 注解使其生效
EnableBinding({ Source.class, Sink.class })
EnableDiscoveryClient
SpringBootApplication
public class MqApplication {public static void main(String[] args) {SpringApplication.run(MqApplication.class);}
}配置 Binding 信息
# 配置rocketmq的nameserver地址
spring.cloud.stream.rocketmq.binder.name-server10.xx.xx.143:9876
# 定义name为output的binding
spring.cloud.stream.bindings.output.destinationtest-topic
spring.cloud.stream.bindings.output.content-typeapplication/json
# 定义name为input的binding
spring.cloud.stream.bindings.input.destinationtest-topic
spring.cloud.stream.bindings.input.content-typeapplication/json
spring.cloud.stream.bindings.input.grouptest-group发送消息代码
public class RocketMQProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer new DefaultMQProducer(producer_group);producer.setNamesrvAddr(10.xx.xx.143:9876);producer.start();Message msg new Message(test-topic, tagStr, message from rocketmq producer.getBytes());producer.send(msg);}
}应用启动
1.增加配置在应用的 /src/main/resources/application.properties 中添加基本配置信息
spring:application:name: mq
server:port: 97002.启动应用支持 IDE 直接启动和编译打包后启动。
IDE 直接启动找到主类 RocketMQApplication执行 main 方法启动应用。打包编译后启动首先执行 mvn clean package 将工程编译打包然后执行 java -jar rocketmq-example.jar 启动应用。
消息接收
使用 name 为 output 对应的 binding 发送消息到 test-topic 这个 topic。
Service
public class ReceiveService {StreamListener(input1)public void receiveInput1(String receiveMsg) {System.out.println(input1 receive: receiveMsg);}
}再扩展一个 一个服务中要使用多个 Topic 这种情况如何解决下面给出解决方案。 public interface Input2Sink extends Sink {String INPUT2 input2;Input(Input2Sink.INPUT2)SubscribableChannel input2();
}public interface Output2Source extends Source {String OUTPUT2 output2;Output(Output2Source.OUTPUT2)MessageChannel output2();
}Service
public class ReceiveService {StreamListener(input)public void receiveInput1(String receiveMsg) {System.out.println(input receive: receiveMsg);}StreamListener(input2)public void receiveInput2(String receiveMsg) {System.out.println(input2 receive: receiveMsg);}
}EnableBinding(value {Source.class, Sink.class, Output2Source.class, Input2Sink.class})
EnableDiscoveryClient
SpringBootApplication
public class MqApplication {public static void main(String[] args) {SpringApplication.run(MqApplication.class);}
}结果如下
结束
spring cloud alibaba RocketMQ 最佳实践 至此就结束了如有疑问欢迎评论区留言。