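Add the following policy entry to the broker:
<policyEntry queue=">" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="60000" ></policyEntry>
It sets the minimum number of consumers and the wait time (in milliseconds) before the broker starts dispatching messages.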
message.setGroupID("xyz");
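Set a group ID on the message to preserve ordering: messages that share a group ID are delivered to the same consumer, in order.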
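To illustrate why a shared group ID preserves ordering, here is a toy model in plain Java of sticky group-to-consumer assignment. It is not ActiveMQ code: the class name, the consumer names, and the round-robin choice of the first owner are assumptions made for illustration, and the broker's real selection logic may differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of message-group routing; NOT the ActiveMQ implementation.
final class MessageGroupSketch {
    private final List<String> consumers;
    // groupId -> consumer that owns the group
    private final Map<String, String> owners = new HashMap<>();
    private int next = 0;

    MessageGroupSketch(List<String> consumers) {
        this.consumers = new ArrayList<>(consumers);
    }

    // The first message of a group picks a consumer (round-robin here, as an
    // assumption); every later message with the same group ID reuses it.
    String route(String groupId) {
        return owners.computeIfAbsent(
                groupId, g -> consumers.get(next++ % consumers.size()));
    }

    public static void main(String[] args) {
        MessageGroupSketch broker =
                new MessageGroupSketch(List.of("consumer-A", "consumer-B"));
        for (String groupId : List.of("xyz", "abc", "xyz", "xyz")) {
            System.out.println(groupId + " -> " + broker.route(groupId));
        }
    }
}
```

Because every "xyz" message lands on a single consumer, that consumer sees them in the order they were sent, even when several consumers share the queue.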
jms.prefetchPolicy.queuePrefetch=500
Sets the queue prefetch size, i.e. how many unacknowledged messages the broker may push to a consumer at once.
jms.redeliveryPolicy.backOffMultiplier=2
Sets the redelivery back-off multiplier.
jms.redeliveryPolicy.useExponentialBackOff=true
Enables exponential back-off between redelivery attempts.
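The arithmetic behind these two redelivery options can be sketched in plain Java. This is a simplified model, not the ActiveMQ implementation: it assumes a base delay of 1000 ms (a value this post does not set), no maximum delay cap, and no collision avoidance. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of exponential redelivery back-off; NOT ActiveMQ code.
final class RedeliveryBackoffSketch {

    // Returns the delay before each redelivery attempt: the first attempt
    // waits initialDelayMs, and each later attempt multiplies the previous
    // delay by the back-off multiplier.
    static List<Long> delays(long initialDelayMs, double multiplier, int attempts) {
        List<Long> result = new ArrayList<>();
        long delay = initialDelayMs;
        for (int i = 0; i < attempts; i++) {
            result.add(delay);
            delay = (long) (delay * multiplier);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed base delay of 1000 ms with backOffMultiplier=2.
        System.out.println(delays(1000L, 2.0, 4)); // prints [1000, 2000, 4000, 8000]
    }
}
```

With a multiplier of 2, each failed delivery doubles the wait before the next attempt, which eases the load on a consumer that keeps failing.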
spring:
  jms:
    listener:
      # Client acknowledgement
      acknowledge-mode: client
      # By default the listener uses a single thread; these settings allow multi-threaded processing
      concurrency: 2
      max-concurrency: 2
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
        DefaultJmsListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    // Apply the Spring Boot defaults (including the spring.jms.listener settings) first
    configurer.configure(factory, connectionFactory);
    // Client acknowledgement requires session transactions to be disabled
    factory.setSessionTransacted(false);
    // false = queues (point-to-point), not topics
    factory.setPubSubDomain(false);
    factory.setSessionAcknowledgeMode(JmsProperties.AcknowledgeMode.CLIENT.getMode());
    return factory;
}
failover:(tcp://192.168.140.148:61616,tcp://192.168.140.128:61616)?jms.prefetchPolicy.queuePrefetch=500
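The broker URL above uses the failover transport: the client tries the brokers listed in the parentheses and reconnects to another one if the connection drops, and the options after the `?` are applied to the connection. As a small sketch, such a URL can be assembled from a broker list in plain Java. The class and method names are hypothetical helpers, not part of any ActiveMQ API.

```java
import java.util.List;

// Hypothetical helper that assembles a failover broker URL from its parts.
final class FailoverUrlSketch {

    // brokers: transport URIs such as tcp://host:61616
    // options: query string without the leading '?', may be null or empty
    static String failoverUrl(List<String> brokers, String options) {
        String url = "failover:(" + String.join(",", brokers) + ")";
        if (options == null || options.isEmpty()) {
            return url;
        }
        return url + "?" + options;
    }

    public static void main(String[] args) {
        System.out.println(failoverUrl(
                List.of("tcp://192.168.140.148:61616", "tcp://192.168.140.128:61616"),
                "jms.prefetchPolicy.queuePrefetch=500"));
    }
}
```

Building the string from a list keeps the broker addresses in one place when the same URL is needed by several clients.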
Add the following JARs to ActiveMQ's lib directory: commons-dbcp2-2.9.0.jar, commons-pool2-2.11.1.jar, mysql-connector-j-8.0.33.jar
Add the following configuration to activemq.xml (the <persistenceAdapter> element goes inside <broker>; the data source bean is declared outside it, directly under <beans>):
<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://192.168.140.128/activemq?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="password" value="Root@123"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>
Add the following to activemq.xml so that each queue gets its own dead-letter queue with the DLQ. prefix:
<policyEntry queue=">" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="60000" >
    <deadLetterStrategy>
        <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
    </deadLetterStrategy>
</policyEntry>
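With an individual dead-letter strategy, the dead-letter queue name is the configured prefix followed by the original queue name. The naming rule can be sketched as below. The class name and the queue name "orders" are hypothetical, and this is an illustration of the rule rather than broker code.

```java
// Illustrates the dead-letter queue naming rule; NOT broker code.
final class DeadLetterNameSketch {

    // queuePrefix corresponds to the queuePrefix attribute ("DLQ." above).
    static String deadLetterQueueFor(String queuePrefix, String queueName) {
        return queuePrefix + queueName;
    }

    public static void main(String[] args) {
        // "orders" is a made-up queue name used only for this example.
        System.out.println(deadLetterQueueFor("DLQ.", "orders")); // prints DLQ.orders
    }
}
```

Separate dead-letter queues make it easier to see which queue a failed message came from and to reprocess failures per queue.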