上一篇
RocketMQ 5.x的生产者,应该怎么配?
- 行业动态
- 2024-05-09
- 4780
在RocketMQ 5.x中,生产者的配置主要包括以下几个方面:
1、创建生产者实例
2、设置生产者属性
3、发送消息
4、关闭生产者
下面详细介绍每个方面的配置方法。
创建生产者实例
在RocketMQ中,生产者实例是通过DefaultMQProducer类创建的,首先需要引入RocketMQ的依赖,然后创建一个DefaultMQProducer实例。
import org.apache.rocketmq.client.producer.DefaultMQProducer; public class RocketMQProducer { public static void main(String[] args) throws Exception { // 创建一个名为"producerGroup"的生产者组 DefaultMQProducer producer = new DefaultMQProducer("producerGroup"); // 设置NameServer地址 producer.setNamesrvAddr("127.0.0.1:9876"); // 启动生产者实例 producer.start(); } }
设置生产者属性
在创建完生产者实例后,可以通过setProperty方法设置生产者的属性,以下是一些常用的属性设置:
属性名 | 默认值 | 描述 |
sendMessageTimeout | 3000 | 发送消息的超时时间,单位为毫秒 |
retryTimesWhenSendFailed | 2 | 发送失败时的重试次数 |
maxReconsumeTimes | 18 | 消息最大重试次数 |
topicPublishInfoBatchMaxSize | 1000 | 批量发送主题消息的最大数量 |
compressMsgBodyOverHowmuch | 1024 * 4 | 压缩消息体的大小阈值,超过该值则进行压缩 |
batchSize | 16 | 批量发送消息的大小阈值,超过该值则进行批量发送 |
sendMessageThreadPoolNums | 128 | 发送消息的线程池数量 |
pullMessageThreadPoolNums | 128 | 拉取消息的线程池数量 |
checkFrequency | 60000 | 检查Broker是否可用的频率,单位为毫秒 |
storePathRootDir | null | 消息存储路径的根目录 |
storePathCommitLog | null | commitLog存储路径 |
mappedFileSizeCommitLog | 1GB | commitLog文件的大小阈值,超过该值则新建一个文件 |
flushIntervalCommitLog | 500ms | commitLog刷新到磁盘的时间间隔,单位为毫秒 |
cleanResourceInterval | 1000 * 60 * 60 | 清理资源的时间间隔,单位为毫秒 |
deleteWhen | 04 | commitLog文件删除的时间点,取值为小时(024)和分钟(059)的组合,例如04表示凌晨4点删除文件 |
fileReservedTime | 72 | commitLog文件保留的时间,单位为小时,超过该时间则删除文件 |
maxTransferBytesOnMessageInMemory | 1 | 消息内存中传输的最大字节数,超过该值则将消息写入磁盘队列中等待传输 |
maxTransferCountOnMessageInMemory | 1 | 消息内存中传输的最大次数,超过该值则将消息写入磁盘队列中等待传输 |
enableMsgTrace | false | 是否开启消息轨迹追踪功能,开启后会记录消息的发送和接收情况,用于排查问题 |
msgTraceTopic | null | 消息轨迹追踪的主题名称,如果为null则使用生产者组名作为主题名称 |
msgTraceConsumerGroup | null | 消息轨迹追踪的消费者组名称,如果为null则使用生产者组名作为消费者组名称 |
flushConsumerQueueLaterally | false | 是否启用侧边队列刷盘机制,当消费者消费速度过快时,可以将部分消息暂时存储在侧边队列中,待消费者慢下来后再进行消费,提高系统吞吐量 |
maxReconsumeTimeDifference | 1800000000L | 两次重试之间的最大时间差,超过该时间差则重新投递未被消费的消息,单位为毫秒,默认值为1表示禁用该功能 |
maxDelayTime | 1L | 如果设置了延迟消息,则该参数表示延迟的最大时间,单位为毫秒,默认值为1表示禁用该功能 |
maxOffsetDelta | 1L | 如果设置了延迟消息或者定时消息,则该参数表示允许的最大offset变化量,单位为毫秒,默认值为1表示禁用该功能 |
traceDispatcherListenerStackSize | 1L | traceDispatcherListener的堆栈大小,默认值为1表示使用JVM默认值,建议设置为32K或更大以减少OOM异常的发生概率,注意:此参数仅在开启traceDispatcherEnable时生效。 |
| traceDispatcherEnable
本站发布或转载的文章及图片均来自网络,其原创性以及文中表达的观点和判断不代表本站,有问题联系侵删!
本文链接:http://www.xixizhuji.com/fuzhu/179891.html