ActiveMQ Miscellaneous Notes

July 3, 2023

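Increasing ActiveMQ consumption capacity

On the broker, add the policy <policyEntry queue=">" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="60000" ></policyEntry>. It sets the minimum number of consumers and the wait time: after the first consumer connects, dispatch is held back until 2 consumers have subscribed or 60000 ms have passed, whichever comes first.
message.setGroupID("xyz"); sets a group ID on the message so that ordering is preserved: all messages with the same group ID go to the same consumer.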
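
Why the two settings above belong together can be shown with a toy model. This is only a sketch: it assumes the first message of each group picks a consumer round-robin and every later message of that group sticks to the same consumer. The class name, consumer names, and group IDs are made up for illustration, and the broker's real assignment logic is more involved.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupDispatchSketch {
    /**
     * Toy model of message groups: the first message of a group is assigned
     * to the next consumer in round-robin order; later messages of the same
     * group keep that owner. Returns the group -> consumer mapping.
     */
    static Map<String, String> assign(List<String> groupIds, List<String> consumers) {
        Map<String, String> owner = new LinkedHashMap<>();
        int next = 0;
        for (String groupId : groupIds) {
            if (!owner.containsKey(groupId)) {
                owner.put(groupId, consumers.get(next % consumers.size()));
                next++;
            }
        }
        return owner;
    }

    public static void main(String[] args) {
        // Hypothetical group IDs and consumer names.
        List<String> groups = Arrays.asList("a", "b", "a", "c", "b");
        // Dispatch starts with a single consumer attached: it owns every group.
        System.out.println(assign(groups, Arrays.asList("consumer-1")));
        // Dispatch waits until two consumers are attached: groups are spread out.
        System.out.println(assign(groups, Arrays.asList("consumer-1", "consumer-2")));
    }
}
```

In this model, starting dispatch with only one consumer attached pins every group to it, which is the situation the wait-for-consumers policy is meant to avoid.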

Custom BrokerURL parameters

jms.prefetchPolicy.queuePrefetch=500 sets the prefetch size for queues.
jms.redeliveryPolicy.backOffMultiplier=2 sets the redelivery back-off multiplier.
jms.redeliveryPolicy.useExponentialBackOff=true makes the redelivery delay grow exponentially.
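
To get a feel for what a multiplier of 2 with exponential back-off means, the delay schedule can be worked out by hand. The sketch below is a simplified model, not the client's actual code: the 1000 ms initial delay and the 5 attempts are assumed values for illustration, and it ignores other redelivery options such as a maximum delay cap.

```java
import java.util.ArrayList;
import java.util.List;

class RedeliveryBackoffSketch {
    /**
     * Delay before each redelivery attempt: the first attempt waits
     * initialDelayMs, each later attempt waits the previous delay
     * multiplied by the back-off multiplier.
     */
    static List<Long> delays(long initialDelayMs, double multiplier, int attempts) {
        List<Long> result = new ArrayList<>();
        long delay = initialDelayMs;
        for (int i = 0; i < attempts; i++) {
            result.add(delay);
            delay = (long) (delay * multiplier);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed values: 1000 ms initial delay, 5 redelivery attempts.
        System.out.println(delays(1000L, 2.0, 5)); // [1000, 2000, 4000, 8000, 16000]
    }
}
```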

Client acknowledgement and multi-threaded consumption

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
		DefaultJmsListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
	DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
	configurer.configure(factory, connectionFactory);
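	// Client acknowledgement requires session transactions to be disabled
	factory.setSessionTransacted(false);
	// false = point-to-point (queues) rather than publish/subscribe (topics)
	factory.setPubSubDomain(false);
	factory.setSessionAcknowledgeMode(JmsProperties.AcknowledgeMode.CLIENT.getMode());
	// For multi-threaded consumption, factory.setConcurrency(...) accepts a range such as "2-5"
	// (an illustrative value, not part of the original configuration)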
	return factory;
}

High availability

failover:(tcp://192.168.140.148:61616,tcp://192.168.140.128:61616)?jms.prefetchPolicy.queuePrefetch=500
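
When the broker list comes from configuration, the URL can be assembled in code rather than written by hand. A minimal sketch follows; the class and method names are hypothetical helpers, not part of any ActiveMQ API.

```java
import java.util.Arrays;
import java.util.List;

class FailoverUrlSketch {
    /**
     * Builds a failover URL of the form failover:(uri1,uri2,...)?options.
     * The options string is appended as-is; null or empty means no query part.
     */
    static String failoverUrl(List<String> brokerUris, String options) {
        String url = "failover:(" + String.join(",", brokerUris) + ")";
        if (options == null || options.isEmpty()) {
            return url;
        }
        return url + "?" + options;
    }

    public static void main(String[] args) {
        // Broker addresses and option taken from the URL above.
        List<String> brokers = Arrays.asList(
                "tcp://192.168.140.148:61616",
                "tcp://192.168.140.128:61616");
        System.out.println(failoverUrl(brokers, "jms.prefetchPolicy.queuePrefetch=500"));
    }
}
```

The resulting string is what would be passed as the broker URL, for example to the ActiveMQConnectionFactory constructor.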

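Add the following JARs to ActiveMQ's lib directory: commons-dbcp2-2.9.0.jar, commons-pool2-2.11.1.jar, mysql-connector-j-8.0.33.jar
Then add the following configuration to activemq.xml on each broker, so that all brokers share the same database as their message store: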

<!-- Inside the <broker> element -->
<persistenceAdapter>
	<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>

<!-- Outside <broker>, as a top-level bean in the <beans> root -->
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
	<property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/>
	<property name="url" value="jdbc:mysql://192.168.140.128/activemq?relaxAutoCommit=true"/>
	<property name="username" value="root"/>
	<property name="password" value="Root@123"/>
	<property name="poolPreparedStatements" value="true"/>
</bean>

A separate DLQ for each queue

Add the following to activemq.xml:

<policyEntry queue=">" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="60000">
	<deadLetterStrategy>
		<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
	</deadLetterStrategy>
</policyEntry>
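
With this strategy, each queue's dead letter queue is named by putting the configured prefix in front of the queue name. The sketch below only illustrates that naming rule: the class is a hypothetical helper, and "orders" is a made-up queue name.

```java
class DeadLetterNameSketch {
    /** Name of the broker's default shared dead letter queue. */
    static final String SHARED_DLQ = "ActiveMQ.DLQ";

    /** Per-queue dead letter queue name: the configured prefix followed by the queue name. */
    static String individualDlq(String queuePrefix, String queueName) {
        return queuePrefix + queueName;
    }

    public static void main(String[] args) {
        // "orders" is a hypothetical queue name; "DLQ." is the prefix from the config above.
        System.out.println(individualDlq("DLQ.", "orders")); // DLQ.orders
    }
}
```

A consumer that inspects failed messages for one queue would therefore listen on that queue's own DLQ instead of the shared ActiveMQ.DLQ.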
