activemq特性是什么

這篇文章主要介紹“activemq特性是什么”,在日常操作中,相信很多人在activemq特性是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”activemq特性是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

目前創(chuàng)新互聯(lián)已為超過千家的企業(yè)提供了網(wǎng)站建設、域名、網(wǎng)頁空間、成都網(wǎng)站托管、企業(yè)網(wǎng)站設計、古塔網(wǎng)站維護等服務,公司將堅持客戶導向、應用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長,共同發(fā)展。

activemq特點:用通配符訂閱多個destination,用組合發(fā)布多重destionation
activemq支持destination的層次結構【topic和queen】便于歸類和管理。
通配符有三個:
.  用來分隔路徑
* 用來匹配路徑中的一節(jié)
> 用來匹配任意節(jié)的路徑
opics: <sport><League>.<team>  
。例如: football.division.leeds。 如果leeds 參加兩種運動--Scccer 和 Rugby,為了方便,我們希望通過一個消息消費者而看到Leeds兩種運動的最新戰(zhàn)績,這個時候,通配符就有用武之地了
.  : used to separate elements in the destination name
*  : used to match one element    
>  : match one or all trailing elements
所以,對于上面的例子, 你可以訂閱這樣的主題: *.*.Leeds
如果你想知道division1 這個賽區(qū)的所有分數(shù), 你可以訂閱這個: soccer.division1.*
如果你想知道Rugby的分數(shù): 你可以訂閱這個: rugby.>.
然而, 通配符中是為消費者服務的,如果你發(fā)送了這樣的一個主題: rugby.>., 這個消息僅會發(fā)送到命名了rugby.>.的主題,并不是所有的主題都是以rugby開頭的。
這里有一種  方法,使消息生產(chǎn)者能將一條
消息發(fā)送到多個目的地。通過使用   composite destination。
將同一條消息發(fā)送到不同的目的地是很有用的。 比如一個用來存儲信息的應用,會發(fā)送一條消息給隊列
同時也要將這條消息廣播給監(jiān)控的所有系統(tǒng)。通常,你會通過用兩個producer 發(fā)送兩次消息來達到這個目的。composite destination就是用來解決這種情況的
例如,如果你創(chuàng)建了名子為: store.order.backoffice,store.order.warehouse 的 Queue,這樣 就會發(fā)送同時兩個Queue。
訂閱信息     解釋
PRICE.>     Any price for any product on any exchange
PRICE.STOCK.>     Any price for a stock on any exchange
PRICE.STOCK.NASDAQ.*     Any stock price on NASDAQ
PRICE.STOCK.*.IBM     Any IBM stock price on any exchange
從5.5 版本以后,可以自定義路徑分隔符:

    <plugins>
       .....
       <destinationPathSeparatorPlugin/>
    </plugins>

此時FOO.BAR.* 可以表示為 FOO/BAR/*
也可以通過pathSeparator 屬性定義其他符號位路徑分隔符。
   public void subscribeToLeeds() throws JMSException {
        String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic allLeeds = session.createTopic("*.*.Leeds");
        MessageConsumer consumer = session.createConsumer(allLeeds);
        Message result = consumer.receive();
    }
    11.1.2發(fā)送一個message到多重destinations
    發(fā)送相同的message到不同的destination上:案列發(fā)送一個[queen,opic]組合模式,默認的組合destination用,分隔
    列如store.order.backoffice,store.order.warehouse
     String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue ordersDestination = session.createQueue("store.orders, topic://store.orders");
        MessageProducer producer = session.createProducer(ordersDestination);
        Message order = session.createObjectMessage();
        producer.send(order);
11.2通知消息
單的說就是實現(xiàn)了ActiveMQ的broker上各種操作的記錄跟蹤和通知。

使用這個功能,你可以實時的知道broker上

    創(chuàng)建或銷毀了連接,
    添加或刪除了生存者或消費者,
    添加或刪除了主題或隊列,
    有消息發(fā)送和接收,
    什么時候有慢消費者,
    什么時候有快生產(chǎn)者
    什么時候什么消息被丟棄
    什么時候broker被添加到集群(主從或是網(wǎng)絡連接)

這個機制是ActiveMQ對JMS協(xié)議的重要補充,也是基于JMS實現(xiàn)的ActiveMQ的可管理性的一部分。多個ActiveMQ的相互協(xié)調(diào)和互操作的基礎設置。
 String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic connectionAdvisory = org.apache.activemq.advisory.AdvisorySupport.CONNECTION_ADVISORY_TOPIC;
        MessageConsumer consumer = session.createConsumer(connectionAdvisory);
        ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
        DataStructure data = (DataStructure) message.getDataStructure();
        if (data.getDataStructureType() == ConnectionInfo.DATA_STRUCTURE_TYPE) {
            ConnectionInfo connectionInfo = (ConnectionInfo) data;
            System.out.println("Connection started: " + connectionInfo);
        } else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
            RemoveInfo removeInfo = (RemoveInfo) data;
            System.out.println("Connection stopped: " + removeInfo.getObjectId());
        } else {
            System.err.println("Unknown message " + data);
        }
        大多數(shù)advisor消息都是完整的對于destiation,但是呢advisorysupport類有一些方法來決定監(jiān)聽哪個advisorytopic,你也能使用通配符-
             String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // Lets first create a Consumer to listen too
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Lets first create a Consumer to listen too
        Queue queue = session.createQueue("test.Queue");
        MessageConsumer testConsumer = session.createConsumer(queue);
        // so lets listen for the Consumer starting/stoping
        Topic advisoryTopic = org.apache.activemq.advisory.AdvisorySupport.getConsumerAdvisoryTopic(queue);
        MessageConsumer consumer = session.createConsumer(advisoryTopic);
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message m) {
                ActiveMQMessage message = (ActiveMQMessage) m;
                try {
                    System.out.println("Consumer Count = " + m.getStringProperty("consumerCount"));
                    DataStructure data = (DataStructure) message.getDataStructure();
                    if (data.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE) {
                        ConsumerInfo consumerInfo = (ConsumerInfo) data;
                        System.out.println("Consumer started: " + consumerInfo);
                    } else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
                        RemoveInfo removeInfo = (RemoveInfo) data;
                        System.out.println("Consumer stopped: " + removeInfo.getObjectId());
                    } else {
                        System.err.println("Unknown message " + data);
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        testConsumer.close();
         <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" advisoryForSlowConsumers="true">
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

ActiveMQ中,topic只有在持久訂閱(durablesubscription)下是持久化的。存在持久訂閱時,每個持久訂閱者,都相當于一個持久化的queue的客戶端,它會收取所有消息。這種情況下存在兩個問題:

1.        同一應用內(nèi)consumer端負載均衡的問題:同一個應用上的一個持久訂閱不能使用多個consumer來共同承擔消息處理功能。因為每個都會獲取所有消息。queue模式可以解決這個問題,broker
端又不能將消息發(fā)送到多個應用端。所以,既要發(fā)布訂閱,又要讓消費者分組,這個功能jms規(guī)范本身是沒有的。
2.        同一應用內(nèi)consumer端failover的問題:由于只能使用單個的持久訂閱者,如果這個訂閱者出錯,則應用就無法處理消息了,系統(tǒng)的健壯性不高,為了解決這兩個問題,ActiveMQ中實現(xiàn)了虛擬
Topic的功能。使用起來非常簡單。對于消息發(fā)布者來說,就是一個正常的Topic,名稱以VirtualTopic.開頭。例如VirtualTopic.TEST。對于消息接收端來說,是個隊列,不同應用里使用不同的前綴作為
隊列的名稱,即可表明自己的身份即可實現(xiàn)消費端應用分組。例如Consumer.A.VirtualTopic.TEST,說明它是名稱為A的消費端,同理Consumer.B.VirtualTopic.TEST說明是一個名稱為B的客戶端。
可以在同一個應用里使用多個consumer消費此queue,則可以實現(xiàn)上面兩個功能。又因為不同應用使用的queue名稱不同(前綴不同),所以不同的應用中都可以接收到全部的消息。每個客戶端相當于一個持久訂
閱者,而且這個客戶端可以使用多個消費者共同來承擔消費任務。
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        Connection consumerConnection = connectionFactory.createConnection();
        consumerConnection.start();
        
        Session consumerSessionA = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        Queue consumerAQueue = consumerSessionA.createQueue("Consumer.A.VirtualTopic.orders");
        MessageConsumer consumerA = consumerSessionA.createConsumer(consumerAQueue);
        
        Session consumerSessionB = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        Queue consumerBQueue = consumerSessionB.createQueue("Consumer.B.VirtualTopic.orders");
        MessageConsumer consumerB = consumerSessionB.createConsumer(consumerAQueue);
        
        //setup the sender
        Connection senderConnection = connectionFactory.createConnection();
        senderConnection.start();
        Session senerSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic ordersDestination = senerSession.createTopic("VirtualTopic.orders");
        MessageProducer producer = senerSession.createProducer(ordersDestination);
同樣queue名稱的消費者會平分所有消息。
從queue接收到的消息,message.getJMSDestination().toString()為topic://VirtualTopic.TEST,即原始的destination。消息的persistent屬性為true,即每個相當于一個持久訂閱。
Virtual Topic這個功能特性在broker上有個總開關,useVirtualTopics屬性,默認為true,設置為false即可關閉此功能。
當此功能開啟,并且使用了持久化的存儲時,broker啟動的時候會從持久化存儲里拿到所有的destinations的名稱,如果名稱模式與Virtual Topics匹配,則把它們添加到系統(tǒng)的Virtual Topics列表中去。
當然,沒有顯式定義的Virtual Topics,也可以直接使用的,系統(tǒng)會自動創(chuàng)建對應的實際topic。
當有consumer訪問此VirtualTopics時,系統(tǒng)會自動創(chuàng)建持久化的queue,并在每次Topic收到消息時,分發(fā)到具體的queue。



可追溯”消費者,只對Topic有效,如果consumer是可追溯的,那么它可以獲取實例創(chuàng)建之前的消息。通常而言,訂閱者不可能獲取實例創(chuàng)建之前的消息,因為broker根本不知道它的存在。對于broker而言,如果
一個Topic通道創(chuàng)建,且有發(fā)布者發(fā)布消息(Publisher),那么broker將會在內(nèi)存中(非持久化)或者磁盤中(持久化)保存已經(jīng)發(fā)布的消息,直到所有的訂閱者都消費者,才會清除原始消息內(nèi)容。那么retroactive
類型的訂閱者,就可以獲取這些原本不屬于自己但broker上還保存的舊消息,就像我們訂閱一種Feed,可以立即獲取舊的內(nèi)容列表一樣。如果此訂閱者不是durable(耐久的),它可以獲取最近發(fā)布的一些消息;如果是durable,它可以獲取存儲器中尚未刪除的所有的舊消息。[下文會詳細介紹Topic的數(shù)據(jù)轉發(fā)模型]
//在destinationUrl中設置,默認為false
feedTopic?consumer.retroactive=true
在broker端,可以配置當前Topic默認為“可追溯的”,不過Topic并不會在此種情況下額外的保存消息,只不過表示訂閱者默認都是可追溯的而已。
<!-- 只對topic有效,默認為false -->
<policyEntry topic="feedTopic" alwaysRetroactive="true" />
    String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("soccer.division1.leeds?consumer.retroactive=true");
        MessageConsumer consumer = session.createConsumer(topic);
        Message result = consumer.receive();

 redeliveryPolicy
    consumer使用的重發(fā)策略,當消息在client端處理失敗(比如onMessage方法拋出異常,事務回滾等),將會觸發(fā)消息重發(fā)。對于Broker端,需要重發(fā)的消息將會被立即發(fā)送(如果broker端使用異步發(fā)送,
且發(fā)送隊列中還有其他消息,那么重發(fā)的消息可能不會被立即到達Consumer)。我們通過此Policy配置最大重發(fā)次數(shù)、重發(fā)頻率等,如果你的Consumer客戶端處于不良網(wǎng)絡環(huán)境中,可以適當調(diào)整相關參數(shù)。參數(shù)列表,
請參見(RedeliveryPolicy)
//在brokerUrl中設置
tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=6
 . redeliveryPolicy
RedelieveryPolicy policy=connection.getRedelieveryPolicy();
policy.setInitialRedelieveryDelay(500);
policy.setBackOffMultiplier(2)
policy.setUseExponentialBackOff(true)
policy.setMaximumRedelieveries(2)

DLQ-死信隊列(Dead Letter Queue)用來保存處理失敗或者過期的消息。
出現(xiàn)以下情況時,消息會被redelivered
A transacted session is used and rollback() is called.
A transacted session is closed before commit is called.
A session is using CLIENT_ACKNOWLEDGE and Session.recover() is called.

當一個消息被redelivered超過maximumRedeliveries(缺省為6次,具體設置請參考后面的鏈接)次數(shù)時,會給broker發(fā)送一個"Poison ack",這個消息被認為是a poison pill,這時broker會將這
消息發(fā)送到DLQ,以便后續(xù)處理。缺省的死信隊列是ActiveMQ.DLQ,如果沒有特別指定,死信都會被發(fā)送到這個隊列。缺省持久消息過期,會被送到DLQ,非持久消息不會送到DLQ可以通過配置文件(activemq.xml)
來調(diào)整死信發(fā)送策略
<destinationPolicy>

    <policyMap>

      <policyEntries>

        <!— 設置所有隊列,使用 '>' ,否則用隊列名稱 -->

        <policyEntry queue=">">

          <deadLetterStrategy>

            <!--

                    queuePrefix:設置死信隊列前綴

                    useQueueForQueueMessages: 設置使用隊列保存死信,還可以設置useQueueForTopicMessages,使用Topic來保存死信

            -->

            <individualDeadLetterStrategy   queuePrefix="DLQ." useQueueForQueueMessages="true"  processExpired="false" processNonPersistent="false"/>

          </deadLetterStrategy>

        </policyEntry>

      </policyEntries>

    </policyMap>

  </destinationPolicy>

  ...

</broker>

        在一個電子系統(tǒng)中可能接受來自不同供應商的各種訂單信息,不同類型的訂單走的流程不盡相同,為了快速處理各種不同的訂單完成不同的業(yè)務。特定義不同的路由 信息。根據(jù)路由信息的不同,將消息進行不同的處理。如果采用ActiveMQ那么最好采用apache-camel整合,使不同的消息根據(jù)不同的流程自動 處理到不同的隊列中去。

<beans>

<broker brokerName="testBroker">

<transportConnectors>

<transportConnector uri="tcp://localhos:61616">

</transportConnectors>

<import resource="camel.xml">

</beans>

到此,關于“activemq特性是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

新聞標題:activemq特性是什么
當前URL:http://bm7419.com/article8/pcsjop.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供外貿(mào)網(wǎng)站建設、響應式網(wǎng)站、商城網(wǎng)站品牌網(wǎng)站制作、虛擬主機、搜索引擎優(yōu)化

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)

h5響應式網(wǎng)站建設