安卓软件开发公司,宁波seo网络推广报价,wordpress启用主题404,培训网站建设RocketMQ消息发送基本示例(推送消费者)-CSDN博客
RocketMQ消费者主动拉取消息示例-CSDN博客
RocketMQ顺序消息-CSDN博客
RocketMQ广播消息-CSDN博客 延时消息:
延时消息实现的效果就是产者调用 producer.send 方法后#xff0c;消息会立即发送到 Broker#xff0c;并被存…RocketMQ消息发送基本示例(推送消费者)-CSDN博客
RocketMQ消费者主动拉取消息示例-CSDN博客
RocketMQ顺序消息-CSDN博客
RocketMQ广播消息-CSDN博客 延时消息:
延时消息实现的效果就是产者调用 producer.send 方法后消息会立即发送到 Broker并被存储在指定的队列中。RocketMQ 使用内部的延迟队列机制来实现延时消息。消息在到达 Broker 后会根据设定的延迟级别放入相应的延迟队列。每个延迟级别对应一个特定的延迟时间如 1 分钟、5 分钟等。消息在延迟队列中等待直到延迟时间过去。到达指定时间后RocketMQ 会将消息移动到实际的消费队列中这时消息才会对消费者可见。 预定日常定时发送:messageDelayLevel1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 18个级别 可以修改broker.conf文件 messageDelayLevel3s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h message.setDelayTimeLevel(3) 就是选定第3个等级
5.0版本以上支持 指定时间定时发送
message.setDelayTimeMs(10L) 指定时间定时发送.默认支持最大延迟时间3天.
在broker.conf种可以修改 timerMaxDelaySec2592000 默认最大3天 72 小时 package com.example.rocketmqdemo.scheldule;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalTime;/*** 预定日程生产者* author hrui* date 2024/8/1 11:48*/
public class schelduleProducer {public static void main(String[] args) {//创建一个DefaultMQProducer实例指定生产者组名为group1DefaultMQProducer producer new DefaultMQProducer(schelduleProducer);//生产者组和消费者组是不同概念 不需要相同//设置NameServer地址RocketMQ客户端通过NameServer获取Broker的路由信息producer.setNamesrvAddr(xxx.xxx.xxx:9876);try {//启动生产者实例producer.start();//发送10条消息for (int i 0; i 2; i) {//创建消息实例指定主题为Topic1标签为Tag1消息内容为Hello World加上编号Message message new Message(scheldule, Tag1, (schelduleProducer i).getBytes(StandardCharsets.UTF_8));//在发送之前设置定时发送等级//messageDelayLevel1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h//从1开始18个等级//message.setDelayTimeLevel(5);//交给Broker 1分钟后延迟发送给消费者//自定义时间发送 最大72小时message.setDelayTimeMs(30000L);//30秒producer.send(message);System.out.println(消息定时发送成功,已交给Broker: LocalTime.now());}} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();} finally {//关闭生产者实例释放资源producer.shutdown();}}
}package com.example.rocketmqdemo.scheldule;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.time.LocalDate;
import java.time.LocalTime;
import java.util.List;/*** author hrui* date 2024/8/1 11:55*/
public class ScheduleConsumer {public static void main(String[] args) {//创建一个DefaultMQPushConsumer实例指定消费者组名为group1//采用长轮询机制模拟推送效果但本质上是主动拉取。适合低延迟、高实时性的场景。DefaultMQPushConsumer consumer new DefaultMQPushConsumer(group1);//设置NameServer地址RocketMQ客户端通过NameServer获取Broker的路由信息consumer.setNamesrvAddr(xxx.xxx.xxx:9876);try {//订阅主题Topic1过滤标签为*表示接收所有消息consumer.subscribe(scheldule, *);//设置消息监听器处理接收到的消息//可以传入两种类型的监听器//1. MessageListenerOrderly顺序消费保证消息按顺序处理//2. MessageListenerConcurrently并发消费消息并发处理不保证顺序consumer.setMessageListener(new MessageListenerConcurrently() {//consumeMessage方法用于处理接收到的消息列表Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println(Thread.currentThread().getName());for (int i0;ilist.size();i){System.out.println(i_消息消费成功_new String(list.get(i).getBody()));broker是将两条消息分别发送的System.out.println(LocalTime.now());}//返回消费状态CONSUME_SUCCESS表示消息消费成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者实例开始接收消息consumer.start();} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();}}
}