initialReconnectDelay 10 maxReconnectDelay 30000 How long to wait before the first reconnect attempt The maximum amount of time we ever wait between reconnect attempts Should an exponential backoff be used btween reconnect attempts The exponent used in the exponential backoff attempts If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client useExponentialBackOff true backOffMultiplier 2 maxReconnectAttempts 0 例如:
discovery:(multicast://default)?initialReconnectDelay=100 为了使用Discovery来发现broker,需要为broker启用discovery agent。 以下是XML配置文件中的一个例子: Xml代码
...
在使用Failover Transport或Discovery transport等能够自动重连的transport的时候,需要注意的是:设想有两个broker,它们都启用AMQ Message Store作为持久化存储,有一个producer和一个consumer连接到某个queue。当因其中一个broker失效时而切换到另一个 broker的时候,如果失效的broker的queue中还有未被consumer消费的消息,那么这个queue里的消息仍然滞留在失效broker 的中,直到失效的broker被修复并重新切换回这个被修复的broker后,之前被保留的消息才会被consumer消费掉。如果被处理的消息有时序限制,那么应用程序就需要处理这个问题。另外也可以通过ActiveMQ集群来解决这个问题。
在transport重连的时候,可以在connection上注册TransportListener来获得回调,例如:
Java代码
(ActiveMQConnection)connection).addTransportListener(new TransportListener() {
public void onCommand(Object cmd) { }
public void onException(IOException exp) {
第13页
}
public void transportInterupted() {
// The transport has suffered an interruption from which it hopes to recover. }
public void transportResumed() {
// The transport has resumed after an interruption. } });
2.5 ActiveMQ Broker的持久方式
当ActiveMQ的队列比较多时,可能内存将会不够用,或突然关机,可能导致内存数据丢失,所以我们需要通过其他方式来保证数据的完整性。目前Broker常用的持久方式有:AMQMessage Store,Kaha Persistence,JDBC Persistence
2.5.1 AMQMessage Store
AMQ Message Store是ActiveMQ5.0 缺省的持久化存储。Message commands被保存到transactional journal(由rolling data logs组成)。Messages被保存到data logs中,同时被reference store进行索引以提高存取速度。Date logs由一些单独的data log文件组成,缺省的文件大小是32M,如果某个消息的大小超过了data log文件的大小,那么可以修改配置以增加data log文件的大小。如果某个data log文件中所有的消息都被成功消费了,那么这个data log文件将会被标记,以便在下一轮的清理中被删除或者归档。以下是其配置的一个例子: Xml代码
第14页
Property name directory Default value Comments the path to the directory to activemq-data use to store the message store data and log files true false 32mb use NIO to write messages to the data logs sync every write to disk a hint to set the maximum size of the message data logs use a persistent index for the message logs. If this is false, an in-memory structure is maintained the maximum number of messages to keep in a transaction before automatically committing time (ms) before checking for a discarding/moving message data logs that are no longer used default number of bins used by the index. The bigger the bin size - the better the relative performance of the index the size of the index key - the key is the message id the size of the index page - the bigger the page - the better the write performance of the index the path to the directory to use to store discarded data logs if true data logs are moved to the archive directory instead of being deleted useNIO syncOnWrite maxFileLength persistentIndex true maxCheckpointMessageAddSize 4kb cleanupInterval 30000 indexBinSize 1024 indexKeySize 96 indexPageSize 16kb directoryArchive archive archiveDataLogs false 第15页
2.5.2 Kaha Message Store
Kaha Persistence 是一个专门针对消息持久化的解决方案。它对典型的消息使用模式进行了优化。在Kaha中,数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。以下是其配置的一个例子: Xml代码
2.5.3 JDBC Message Store
目前支持的数据库有Apache Derby, Axion, DB2, HSQL, Informix, MaxDB, MySQL, Oracle, Postgresql, SQLServer, Sybase。
如果你使用的数据库不被支持,那么可以调整StatementProvider 来保证使用正确的SQL方言(flavour of SQL)。通常绝大多数数据库支持以下adaptor:
org.activemq.store.jdbc.adapter.BlobJDBCAdapter ? org.activemq.store.jdbc.adapter.BytesJDBCAdapter ? org.activemq.store.jdbc.adapter.DefaultJDBCAdapter ? org.activemq.store.jdbc.adapter.ImageJDBCAdapter
?
也可以在配置文件中直接指定JDBC adaptor,例如: Xml代码
1. 第16页 Xml代码 2.6 ActimveMQ的其他特性 2.6.1 异步发送消息 ActiveMQ支持生产者以同步或异步模式发送消息。使用不同的模式对send方法的反应时间有巨大的影响,反映时间是衡量ActiveMQ吞吐量的重要因素,使用异步发送可以提高系统的性能。 在默认大多数情况 下,AcitveMQ是以异步模式发送消息。例外的情况:在没有使用事务的情况下,生产者以PERSISTENT传送模式发送消息。在这种情况 下,send方法都是同步的,并且一直阻塞直到ActiveMQ发回确认消息:消息已经存储在持久性数据存储中。这种确认机制保证消息不会丢失,但会造成 生产者阻塞从而影响反应时间。 高性能的程序一般都能容忍在故障情况下丢失少量数据。如果编写这样的程序,可以通过使用异步发送来提高吞吐量(甚至在使用PERSISTENT传送模式的情况下)。 使用Connection URI配置异步发送: cf = new ActiveMQConnectionFactory(\在ConnectionFactory层面配置异步发送: ((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true); 在Connection层面配置异步发送,此层面的设置将覆盖ConnectionFactory层面的设置: 第17页 百度搜索“77cn”或“免费范文网”即可找到本站免费阅读全部范文。收藏本站方便下次阅读,免费范文网,提供经典小说综合文库ActiveMQ使用手册(4)在线全文阅读。
相关推荐: