﻿{"id":1014,"date":"2015-04-20T21:42:50","date_gmt":"2015-04-20T13:42:50","guid":{"rendered":"http:\/\/zerobox.org\/notes\/?p=1014"},"modified":"2016-06-02T19:36:43","modified_gmt":"2016-06-02T11:36:43","slug":"c-activemq-cms-%e5%ad%a6%e4%b9%a0%e7%ac%94%e8%ae%b0","status":"publish","type":"post","link":"http:\/\/zerobox.org\/notes\/1014.html","title":{"rendered":"C++ activemq CMS study notes."},"content":{"rendered":"<p>I first ran into activemq a long time ago, in a hurry and with no time to spare. Later I found that there is a great deal to learn about it.<\/p>\n<p>I have long wanted to write an article about activemq, but I never felt I had much insight of my own to add. Perhaps there are already too many such articles online, or perhaps I simply do not know enough yet.<\/p>\n<p>&nbsp;<\/p>\n<p>&nbsp;<\/p>\n<h1>0, What problems activemq-cpp solves.<\/h1>\n<p>In practice it frees developers from the plumbing of multithreading and multi-message communication, so they can concentrate on application logic.<\/p>\n<p>CMS (stands for C++ Messaging Service) is a JMS-like API for C++ for interfacing with Message Brokers such as\u00a0Apache ActiveMQ. CMS helps to make your C++ client code much neater and easier to follow. To get a better feel for CMS try the\u00a0API\u00a0Reference. ActiveMQ-CPP is a client only library, a message broker such as\u00a0Apache ActiveMQ\u00a0is still needed for your clients to communicate.<\/p>\n<p>Our implementation of CMS is called ActiveMQ-CPP, which has an architecture that allows for pluggable transports and wire formats. 
Currently we support the\u00a0OpenWire\u00a0and\u00a0Stomp\u00a0protocols, both over TCP and SSL, we also now support a Failover Transport for more reliable client operation. In addition to CMS, ActiveMQ-CPP also provides a robust set of classes that support platform independent constructs such as threading, I\/O, sockets, etc. You may find many of these utilities very useful, such as a Java like Thread class or the &#8220;synchronized&#8221; macro that let&#8217;s you use a Java-like synchronization on any object that implements the activemq::concurrent::Synchronizable interface. ActiveMQ-CPP is released under the\u00a0Apache\u00a02.0 License<\/p>\n<p>In short:<\/p>\n<p>CMS (C++ Messaging Service) is a C++ interface to message middleware such as Apache ActiveMQ.<\/p>\n<p>The implementation of CMS is called activemq-cpp. It currently supports only the openwire and stomp wire formats, over TCP and SSL. It now also supports failover between brokers (this is the key point: I did not understand it at the time and took a detour as a result !_!).<\/p>\n<p>-_- , which means the amqp, mqtt and ws connectors in activemq\\conf\\activemq.xml cannot be used from activemq-cpp.<\/p>\n<div class=\"cnblogs_code\">\n<pre>&lt;transportConnectors&gt;\r\n&lt;!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --&gt;\r\n&lt;transportConnector name=\"openwire\" uri=\"tcp:\/\/0.0.0.0:61616?maximumConnections=1000&amp;amp;wireFormat.maxFrameSize=104857600\"\/&gt;\r\n&lt;transportConnector name=\"amqp\" uri=\"amqp:\/\/0.0.0.0:5672?maximumConnections=1000&amp;amp;wireFormat.maxFrameSize=104857600\"\/&gt;\r\n&lt;transportConnector name=\"stomp\" 
uri=\"stomp:\/\/0.0.0.0:61613?maximumConnections=1000&amp;amp;wireFormat.maxFrameSize=104857600\"\/&gt;\r\n&lt;transportConnector name=\"mqtt\" uri=\"mqtt:\/\/0.0.0.0:1883?maximumConnections=1000&amp;amp;wireFormat.maxFrameSize=104857600\"\/&gt;\r\n&lt;transportConnector name=\"ws\" uri=\"ws:\/\/0.0.0.0:61614?maximumConnections=1000&amp;amp;wireFormat.maxFrameSize=104857600\"\/&gt;\r\n&lt;\/transportConnectors&gt;<\/pre>\n<\/div>\n<p>&nbsp;<\/p>\n<p>&nbsp;<\/p>\n<p>&nbsp;<\/p>\n<h1>1, Building and configuring activemq-cpp.<\/h1>\n<p>Reference: Implementing communication with Active MQ in C++,\u00a0http:\/\/blog.csdn.net\/lee353086\/article\/details\/6777261<\/p>\n<p>&nbsp;<\/p>\n<p>activemq-cpp download page:<\/p>\n<p>http:\/\/activemq.apache.org\/cms\/download.html<\/p>\n<p>Dependencies<\/p>\n<p>These are described at\u00a0http:\/\/activemq.apache.org\/cms\/building.html\u00a0, though only in English.<\/p>\n<p>I will go over them again here anyway, even though my own English is quite poor.<\/p>\n<p>&#8220;With versions of ActiveMQ-CPP 2.2 and later, we have a dependency on the\u00a0Apache Portable Runtime\u00a0project. You&#8217;ll need to install APR on your system before you&#8217;ll be able to build ActiveMQ-CPP.&#8221;<\/p>\n<p>&#8220;The package contains a complete set of CppUnit tests. In order for you to build and run the tests, you will need to download and install the CppUnit library. 
See\u00a0http:\/\/cppunit.sourceforge.net\/cppunit-wiki&#8221;<\/p>\n<p>So the dependencies are: apr, apr-iconv, apr-util and cppunit.<\/p>\n<p>&nbsp;<\/p>\n<p>http:\/\/mirrors.hust.edu.cn\/apache\/apr\/<\/p>\n<p>The mirror above provides\u00a0apr, apr-iconv and apr-util (take the latest version of each; do not pair a new release of one with an old release of another, or the build will run into problems).<\/p>\n<p>apr-1.5.1-win32-src.zip,<\/p>\n<p>apr-iconv-1.2.1-win32-src-r2.zip,<\/p>\n<p>apr-util-1.5.4-win32-src.zip.<\/p>\n<p>&nbsp;<\/p>\n<p>After unpacking, remember to rename the folders to drop the version numbers, as in the screenshots below; otherwise the default [Additional Include Directories] of the projects will not resolve at build time.<\/p>\n<p>Put all the folders under one common root directory.<\/p>\n<p><img decoding=\"async\" src=\"http:\/\/zerobox.org\/notes\/wp-content\/uploads\/2015\/04\/9b5c094069bcf28a8cc7fd4d60fb88a1.png\" alt=\"\" \/><\/p>\n<p><img decoding=\"async\" src=\"http:\/\/zerobox.org\/notes\/wp-content\/uploads\/2015\/04\/1c1203a87b8b3b6c2f473592661c7943.png\" alt=\"\" \/><\/p>\n<p>&nbsp;<\/p>\n<p>Open\u00a0activemq-cpp-library\\vs2008-build\\activemq-cpp.sln<\/p>\n<p>Add the following one by one via [Add Existing Project]: libapr.vcproj, libapriconv.vcproj, libaprutil.vcproj. 
Only the lib projects are needed.<\/p>\n<p><img decoding=\"async\" src=\"http:\/\/zerobox.org\/notes\/wp-content\/uploads\/2015\/04\/62bb8219b9a22b26ca5933c3886671de.png\" alt=\"\" \/><\/p>\n<p>&nbsp;<\/p>\n<p>The resulting solution:<\/p>\n<p><img decoding=\"async\" src=\"http:\/\/zerobox.org\/notes\/wp-content\/uploads\/2015\/04\/452dfefe3f5f458aea97601b92f8e26a.png\" alt=\"\" \/><\/p>\n<p>&nbsp;<\/p>\n<p>In [Project Dependencies], both libapriconv.vcproj and libaprutil.vcproj must depend on libapr.<\/p>\n<p><img decoding=\"async\" src=\"http:\/\/zerobox.org\/notes\/wp-content\/uploads\/2015\/04\/099610b575db85b06addb62b85bfe159.png\" alt=\"\" \/><\/p>\n<p>&nbsp;<\/p>\n<p>In [Project Dependencies], activemq-cpp\u00a0must depend on libapriconv, libaprutil and libapr.<\/p>\n<p>The [Additional Include Directories] of activemq-cpp must contain the include directories of all three.<\/p>\n<p><img decoding=\"async\" src=\"http:\/\/zerobox.org\/notes\/wp-content\/uploads\/2015\/04\/eb4d919eb7e4724257aa7733e2c56462.png\" alt=\"\" \/><\/p>\n<p>&nbsp;<\/p>\n<p>After a long build,<\/p>\n<p><img decoding=\"async\" src=\"http:\/\/zerobox.org\/notes\/wp-content\/uploads\/2015\/04\/1bbc3c3f494151c387a37ebf3d989860.png\" alt=\"\" \/><\/p>\n<p>&nbsp;<\/p>\n<p>this large lib file is produced.<\/p>\n<p>The activemq-cpp-example project contains the hello world code.<\/p>\n<p>&nbsp;<\/p>\n<h1>2, Walking through the activemq-cpp-example code.<\/h1>\n<p>This project gives a better picture of how activemq-cpp is structured.<\/p>\n<div class=\"cnblogs_code\">\n<div class=\"cnblogs_code_toolbar\"><span class=\"cnblogs_code_copy\"><img decoding=\"async\" src=\"http:\/\/zerobox.org\/notes\/wp-content\/uploads\/2015\/04\/9148256b89e79e5d03bf03a93cf12ac4.gif\" alt=\"Copy code\" 
\/><\/span><\/div>\n<div class=\"cnblogs_code\">\n<div class=\"cnblogs_code_toolbar\"><span class=\"cnblogs_code_copy\"><img decoding=\"async\" src=\"http:\/\/zerobox.org\/notes\/wp-content\/uploads\/2015\/04\/9148256b89e79e5d03bf03a93cf12ac4.gif\" alt=\"\u590d\u5236\u4ee3\u7801\" \/><\/span><\/div>\n<pre>\/*\r\n * Licensed to the Apache Software Foundation (ASF) under one or more\r\n * contributor license agreements.  See the NOTICE file distributed with\r\n * this work for additional information regarding copyright ownership.\r\n * The ASF licenses this file to You under the Apache License, Version 2.0\r\n * (the \"License\"); you may not use this file except in compliance with\r\n * the License.  You may obtain a copy of the License at\r\n *\r\n *     http:\/\/www.apache.org\/licenses\/LICENSE-2.0\r\n *\r\n * Unless required by applicable law or agreed to in writing, software\r\n * distributed under the License is distributed on an \"AS IS\" BASIS,\r\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r\n * See the License for the specific language governing permissions and\r\n * limitations under the License.\r\n *\/\r\n\r\n\/\/ START SNIPPET: demo\r\n\r\n#include &lt;activemq\/library\/ActiveMQCPP.h&gt;\r\n#include &lt;decaf\/lang\/Thread.h&gt;\r\n#include &lt;decaf\/lang\/Runnable.h&gt;\r\n#include &lt;decaf\/util\/concurrent\/CountDownLatch.h&gt;\r\n#include &lt;decaf\/lang\/Integer.h&gt;\r\n#include &lt;decaf\/lang\/Long.h&gt;\r\n#include &lt;decaf\/lang\/System.h&gt;\r\n#include &lt;activemq\/core\/ActiveMQConnectionFactory.h&gt;\r\n#include &lt;activemq\/util\/Config.h&gt;\r\n#include &lt;cms\/Connection.h&gt;\r\n#include &lt;cms\/Session.h&gt;\r\n#include &lt;cms\/TextMessage.h&gt;\r\n#include &lt;cms\/BytesMessage.h&gt;\r\n#include &lt;cms\/MapMessage.h&gt;\r\n#include &lt;cms\/ExceptionListener.h&gt;\r\n#include &lt;cms\/MessageListener.h&gt;\r\n#include &lt;stdlib.h&gt;\r\n#include &lt;stdio.h&gt;\r\n#include 
&lt;iostream&gt;\r\n#include &lt;memory&gt;\r\n\r\nusing namespace activemq::core;\r\nusing namespace decaf::util::concurrent;\r\nusing namespace decaf::util;\r\nusing namespace decaf::lang;\r\nusing namespace cms;\r\nusing namespace std;\r\n\r\nclass HelloWorldProducer : public Runnable {\r\nprivate:\r\n\r\n    Connection* connection;\r\n    Session* session;\r\n    Destination* destination;\r\n    MessageProducer* producer;\r\n    int numMessages;\r\n    bool useTopic;\r\n    bool sessionTransacted;\r\n    std::string brokerURI;\r\n\r\nprivate:\r\n\r\n    HelloWorldProducer(const HelloWorldProducer&amp;);\r\n    HelloWorldProducer&amp; operator=(const HelloWorldProducer&amp;);\r\n\r\npublic:\r\n\r\n    HelloWorldProducer(const std::string&amp; brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted = false) :\r\n        connection(NULL),\r\n        session(NULL),\r\n        destination(NULL),\r\n        producer(NULL),\r\n        numMessages(numMessages),\r\n        useTopic(useTopic),\r\n        sessionTransacted(sessionTransacted),\r\n        brokerURI(brokerURI) {\r\n    }\r\n\r\n    virtual ~HelloWorldProducer(){\r\n        cleanup();\r\n    }\r\n\r\n    void close() {\r\n        this-&gt;cleanup();\r\n    }\r\n\r\n    virtual void run() {\r\n\r\n        try {\r\n\r\n            \/\/ Create a ConnectionFactory\r\n            auto_ptr&lt;ConnectionFactory&gt; connectionFactory(\r\n                ConnectionFactory::createCMSConnectionFactory(brokerURI));\r\n\r\n            \/\/ Create a Connection\r\n            connection = connectionFactory-&gt;createConnection();\r\n            connection-&gt;start();\r\n\r\n            \/\/ Create a Session\r\n            if (this-&gt;sessionTransacted) {\r\n                session = connection-&gt;createSession(Session::SESSION_TRANSACTED);\r\n            } else {\r\n                session = connection-&gt;createSession(Session::AUTO_ACKNOWLEDGE);\r\n            }\r\n\r\n            \/\/ Create the 
destination (Topic or Queue)\r\n            if (useTopic) {\r\n                destination = session-&gt;createTopic(\"TEST.FOO\");\r\n            } else {\r\n                destination = session-&gt;createQueue(\"TEST.FOO\");\r\n            }\r\n\r\n            \/\/ Create a MessageProducer from the Session to the Topic or Queue\r\n            producer = session-&gt;createProducer(destination);\r\n            producer-&gt;setDeliveryMode(DeliveryMode::NON_PERSISTENT);\r\n\r\n            \/\/ Create the Thread Id String\r\n            string threadIdStr = Long::toString(Thread::currentThread()-&gt;getId());\r\n\r\n            \/\/ Create a messages\r\n            string text = (string) \"Hello world! from thread \" + threadIdStr;\r\n\r\n            for (int ix = 0; ix &lt; numMessages; ++ix) {\r\n                std::auto_ptr&lt;TextMessage&gt; message(session-&gt;createTextMessage(text));\r\n                message-&gt;setIntProperty(\"Integer\", ix);\r\n                printf(\"Sent message #%d from thread %s\\n\", ix + 1, threadIdStr.c_str());\r\n                producer-&gt;send(message.get());\r\n            }\r\n\r\n        } catch (CMSException&amp; e) {\r\n            e.printStackTrace();\r\n        }\r\n    }\r\n\r\nprivate:\r\n\r\n    void cleanup() {\r\n\r\n        if (connection != NULL) {\r\n            try {\r\n                connection-&gt;close();\r\n            } catch (cms::CMSException&amp; ex) {\r\n                ex.printStackTrace();\r\n            }\r\n        }\r\n\r\n        \/\/ Destroy resources.\r\n        try {\r\n            delete destination;\r\n            destination = NULL;\r\n            delete producer;\r\n            producer = NULL;\r\n            delete session;\r\n            session = NULL;\r\n            delete connection;\r\n            connection = NULL;\r\n        } catch (CMSException&amp; e) {\r\n            e.printStackTrace();\r\n        }\r\n    }\r\n};\r\n\r\nclass HelloWorldConsumer : public 
ExceptionListener,\r\n                           public MessageListener,\r\n                           public Runnable {\r\n\r\nprivate:\r\n\r\n    CountDownLatch latch;\r\n    CountDownLatch doneLatch;\r\n    Connection* connection;\r\n    Session* session;\r\n    Destination* destination;\r\n    MessageConsumer* consumer;\r\n    long waitMillis;\r\n    bool useTopic;\r\n    bool sessionTransacted;\r\n    std::string brokerURI;\r\n\r\nprivate:\r\n\r\n    HelloWorldConsumer(const HelloWorldConsumer&amp;);\r\n    HelloWorldConsumer&amp; operator=(const HelloWorldConsumer&amp;);\r\n\r\npublic:\r\n\r\n    HelloWorldConsumer(const std::string&amp; brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted = false, int waitMillis = 30000) :\r\n        latch(1),\r\n        doneLatch(numMessages),\r\n        connection(NULL),\r\n        session(NULL),\r\n        destination(NULL),\r\n        consumer(NULL),\r\n        waitMillis(waitMillis),\r\n        useTopic(useTopic),\r\n        sessionTransacted(sessionTransacted),\r\n        brokerURI(brokerURI) {\r\n    }\r\n\r\n    virtual ~HelloWorldConsumer() {\r\n        cleanup();\r\n    }\r\n\r\n    void close() {\r\n        this-&gt;cleanup();\r\n    }\r\n\r\n    void waitUntilReady() {\r\n        latch.await();\r\n    }\r\n\r\n    virtual void run() {\r\n\r\n        try {\r\n\r\n            \/\/ Create a ConnectionFactory\r\n            auto_ptr&lt;ConnectionFactory&gt; connectionFactory(\r\n                ConnectionFactory::createCMSConnectionFactory(brokerURI));\r\n\r\n            \/\/ Create a Connection\r\n            connection = connectionFactory-&gt;createConnection();\r\n            connection-&gt;start();\r\n            connection-&gt;setExceptionListener(this);\r\n\r\n            \/\/ Create a Session\r\n            if (this-&gt;sessionTransacted == true) {\r\n                session = connection-&gt;createSession(Session::SESSION_TRANSACTED);\r\n            } else {\r\n                session = 
connection-&gt;createSession(Session::AUTO_ACKNOWLEDGE);\r\n            }\r\n\r\n            \/\/ Create the destination (Topic or Queue)\r\n            if (useTopic) {\r\n                destination = session-&gt;createTopic(\"TEST.FOO\");\r\n            } else {\r\n                destination = session-&gt;createQueue(\"TEST.FOO\");\r\n            }\r\n\r\n            \/\/ Create a MessageConsumer from the Session to the Topic or Queue\r\n            consumer = session-&gt;createConsumer(destination);\r\n\r\n            consumer-&gt;setMessageListener(this);\r\n\r\n            std::cout.flush();\r\n            std::cerr.flush();\r\n\r\n            \/\/ Indicate we are ready for messages.\r\n            latch.countDown();\r\n\r\n            \/\/ Wait while asynchronous messages come in.\r\n            doneLatch.await(waitMillis);\r\n\r\n        } catch (CMSException&amp; e) {\r\n            \/\/ Indicate we are ready for messages.\r\n            latch.countDown();\r\n            e.printStackTrace();\r\n        }\r\n    }\r\n\r\n    \/\/ Called from the consumer since this class is a registered MessageListener.\r\n    virtual void onMessage(const Message* message) {\r\n\r\n        static int count = 0;\r\n\r\n        try {\r\n            count++;\r\n            const TextMessage* textMessage = dynamic_cast&lt;const TextMessage*&gt; (message);\r\n            string text = \"\";\r\n\r\n            if (textMessage != NULL) {\r\n                text = textMessage-&gt;getText();\r\n            } else {\r\n                text = \"NOT A TEXTMESSAGE!\";\r\n            }\r\n\r\n            printf(\"Message #%d Received: %s\\n\", count, text.c_str());\r\n\r\n        } catch (CMSException&amp; e) {\r\n            e.printStackTrace();\r\n        }\r\n\r\n        \/\/ Commit all messages.\r\n        if (this-&gt;sessionTransacted) {\r\n            session-&gt;commit();\r\n        }\r\n\r\n        \/\/ No matter what, tag the count down latch until done.\r\n        
doneLatch.countDown();\r\n    }\r\n\r\n    \/\/ If something bad happens you see it here as this class is also been\r\n    \/\/ registered as an ExceptionListener with the connection.\r\n    virtual void onException(const CMSException&amp; ex AMQCPP_UNUSED) {\r\n        printf(\"CMS Exception occurred.  Shutting down client.\\n\");\r\n        ex.printStackTrace();\r\n        exit(1);\r\n    }\r\n\r\nprivate:\r\n\r\n    void cleanup() {\r\n        if (connection != NULL) {\r\n            try {\r\n                connection-&gt;close();\r\n            } catch (cms::CMSException&amp; ex) {\r\n                ex.printStackTrace();\r\n            }\r\n        }\r\n\r\n        \/\/ Destroy resources.\r\n        try {\r\n            delete destination;\r\n            destination = NULL;\r\n            delete consumer;\r\n            consumer = NULL;\r\n            delete session;\r\n            session = NULL;\r\n            delete connection;\r\n            connection = NULL;\r\n        } catch (CMSException&amp; e) {\r\n            e.printStackTrace();\r\n        }\r\n    }\r\n};\r\n\r\nint main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {\r\n\r\n    activemq::library::ActiveMQCPP::initializeLibrary();\r\n    {\r\n    std::cout &lt;&lt; \"=====================================================\\n\";\r\n    std::cout &lt;&lt; \"Starting the example:\" &lt;&lt; std::endl;\r\n    std::cout &lt;&lt; \"-----------------------------------------------------\\n\";\r\n\r\n\r\n    \/\/ Set the URI to point to the IP Address of your broker.\r\n    \/\/ add any optional params to the url to enable things like\r\n    \/\/ tightMarshalling or tcp logging etc.  
See the CMS web site for\r\n    \/\/ a full list of configuration options.\r\n    \/\/\r\n    \/\/  http:\/\/activemq.apache.org\/cms\/\r\n    \/\/\r\n    \/\/ Wire Format Options:\r\n    \/\/ =========================\r\n    \/\/ Use either stomp or openwire, the default ports are different for each\r\n    \/\/\r\n    \/\/ Examples:\r\n    \/\/    tcp:\/\/127.0.0.1:61616                      default to openwire\r\n    \/\/    tcp:\/\/127.0.0.1:61616?wireFormat=openwire  same as above\r\n    \/\/    tcp:\/\/127.0.0.1:61613?wireFormat=stomp     use stomp instead\r\n    \/\/\r\n    \/\/ SSL:\r\n    \/\/ =========================\r\n    \/\/ To use SSL you need to specify the location of the trusted Root CA or the\r\n    \/\/ certificate for the broker you want to connect to.  Using the Root CA allows\r\n    \/\/ you to use failover with multiple servers all using certificates signed by\r\n    \/\/ the trusted root.  If using client authentication you also need to specify\r\n    \/\/ the location of the client Certificate.\r\n    \/\/\r\n    \/\/     System::setProperty( \"decaf.net.ssl.keyStore\", \"&lt;path&gt;\/client.pem\" );\r\n    \/\/     System::setProperty( \"decaf.net.ssl.keyStorePassword\", \"password\" );\r\n    \/\/     System::setProperty( \"decaf.net.ssl.trustStore\", \"&lt;path&gt;\/rootCA.pem\" );\r\n    \/\/\r\n    \/\/ The you just specify the ssl transport in the URI, for example:\r\n    \/\/\r\n    \/\/     ssl:\/\/localhost:61617\r\n    \/\/\r\n    std::string brokerURI =\r\n        \"failover:(tcp:\/\/localhost:61616\"\r\n\/\/        \"?wireFormat=openwire\"\r\n\/\/        \"&amp;transport.useInactivityMonitor=false\"\r\n\/\/        \"&amp;connection.alwaysSyncSend=true\"\r\n\/\/        \"&amp;connection.useAsyncSend=true\"\r\n\/\/        \"?transport.commandTracingEnabled=true\"\r\n\/\/        \"&amp;transport.tcpTracingEnabled=true\"\r\n\/\/        \"&amp;wireFormat.tightEncodingEnabled=true\"\r\n        \")\";\r\n\r\n    
\/\/============================================================\r\n    \/\/ set to true to use topics instead of queues\r\n    \/\/ Note in the code above that this causes createTopic or\r\n    \/\/ createQueue to be used in both consumer an producer.\r\n    \/\/============================================================\r\n    bool useTopics = true;\r\n    bool sessionTransacted = false;\r\n    int numMessages = 2000;\r\n\r\n    long long startTime = System::currentTimeMillis();\r\n\r\n    HelloWorldProducer producer(brokerURI, numMessages, useTopics);\r\n        HelloWorldConsumer consumer(brokerURI, numMessages, useTopics, sessionTransacted);\r\n\r\n    \/\/ Start the consumer thread.\r\n    Thread consumerThread(&amp;consumer);\r\n    consumerThread.start();\r\n\r\n    \/\/ Wait for the consumer to indicate that its ready to go.\r\n    consumer.waitUntilReady();\r\n\r\n    \/\/ Start the producer thread.\r\n    Thread producerThread(&amp;producer);\r\n    producerThread.start();\r\n\r\n    \/\/ Wait for the threads to complete.\r\n    producerThread.join();\r\n    consumerThread.join();\r\n\r\n    long long endTime = System::currentTimeMillis();\r\n    double totalTime = (double)(endTime - startTime) \/ 1000.0;\r\n\r\n    consumer.close();\r\n    producer.close();\r\n\r\n    std::cout &lt;&lt; \"Time to completion = \" &lt;&lt; totalTime &lt;&lt; \" seconds.\" &lt;&lt; std::endl;\r\n    std::cout &lt;&lt; \"-----------------------------------------------------\\n\";\r\n    std::cout &lt;&lt; \"Finished with the example.\" &lt;&lt; std::endl;\r\n    std::cout &lt;&lt; \"=====================================================\\n\";\r\n\r\n    }\r\n    activemq::library::ActiveMQCPP::shutdownLibrary();\r\n}\r\n\r\n\/\/ END SNIPPET: demo<\/pre>\n<div class=\"cnblogs_code_toolbar\"><span class=\"cnblogs_code_copy\"><img decoding=\"async\" src=\"http:\/\/zerobox.org\/notes\/wp-content\/uploads\/2015\/04\/9148256b89e79e5d03bf03a93cf12ac4.gif\" 
alt=\"Copy code\" \/><\/span><\/div>\n<\/div>\n<p>&nbsp;<\/p>\n<\/div>\n<p>Let's start from main().<\/p>\n<p><img decoding=\"async\" src=\"http:\/\/zerobox.org\/notes\/wp-content\/uploads\/2015\/04\/dd9fe99128997a1e3c9067efb0f1a0e2.png\" alt=\"\" \/><\/p>\n<p>&nbsp;<\/p>\n<p><img decoding=\"async\" src=\"http:\/\/zerobox.org\/notes\/wp-content\/uploads\/2015\/04\/82e17b1d78bf76b985fee92796a5c1f1.png\" alt=\"\" \/><\/p>\n<p>&nbsp;<\/p>\n<p>&nbsp;<\/p>\n<p>This way the application logic is implemented simply and quickly.<\/p>\n<p>&nbsp;<\/p>\n<p>&nbsp;<\/p>\n<h1>3, activemq messaging patterns.<\/h1>\n<p>See:<\/p>\n<p>http:\/\/shmilyaw-hotmail-com.iteye.com\/blog\/1897635<\/p>\n<p>&nbsp;<\/p>\n<p>What I currently need is the\u00a0request-response pattern with activemq-cpp.<\/p>\n<p>&nbsp;<\/p>\n<p>&nbsp;<\/p>\n<p>&nbsp;<\/p>\n<h1>4, Using the\u00a0request-response pattern with activemq-cpp.<\/h1>\n<p>Communication between server and client: exchanging data and acknowledging it.<\/p>\n<p><img decoding=\"async\" src=\"http:\/\/zerobox.org\/notes\/wp-content\/uploads\/2015\/04\/a230a2f9a35e4ddce68dcb9749b54401.png\" alt=\"\" \/><\/p>\n<p>&nbsp;<\/p>\n<p>Below is my modified, simple code. It may contain bugs; please point them out.<\/p>\n<p>&nbsp;<\/p>\n<p>Make two copies, define\u00a0USE_COMSUMER in one and\u00a0USE_PRODUCER in the other, and build each.<\/p>\n<div class=\"cnblogs_code\">\n<div class=\"cnblogs_code_toolbar\"><span class=\"cnblogs_code_copy\"><img decoding=\"async\" 
src=\"http:\/\/zerobox.org\/notes\/wp-content\/uploads\/2015\/04\/9148256b89e79e5d03bf03a93cf12ac4.gif\" alt=\"\u590d\u5236\u4ee3\u7801\" \/><\/span><\/div>\n<pre>\/*\r\n * Licensed to the Apache Software Foundation (ASF) under one or more\r\n * contributor license agreements.  See the NOTICE file distributed with\r\n * this work for additional information regarding copyright ownership.\r\n * The ASF licenses this file to You under the Apache License, Version 2.0\r\n * (the \"License\"); you may not use this file except in compliance with\r\n * the License.  You may obtain a copy of the License at\r\n *\r\n *     http:\/\/www.apache.org\/licenses\/LICENSE-2.0\r\n *\r\n * Unless required by applicable law or agreed to in writing, software\r\n * distributed under the License is distributed on an \"AS IS\" BASIS,\r\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r\n * See the License for the specific language governing permissions and\r\n * limitations under the License.\r\n *\/\r\n\r\n\/\/ START SNIPPET: demo\r\n\r\n#include &lt;activemq\/library\/ActiveMQCPP.h&gt;\r\n#include &lt;decaf\/lang\/Thread.h&gt;\r\n#include &lt;decaf\/lang\/Runnable.h&gt;\r\n#include &lt;decaf\/util\/concurrent\/CountDownLatch.h&gt;\r\n#include &lt;decaf\/lang\/Integer.h&gt;\r\n#include &lt;decaf\/lang\/Long.h&gt;\r\n#include &lt;decaf\/lang\/System.h&gt;\r\n#include &lt;activemq\/core\/ActiveMQConnectionFactory.h&gt;\r\n#include &lt;activemq\/util\/Config.h&gt;\r\n#include &lt;cms\/Connection.h&gt;\r\n#include &lt;cms\/Session.h&gt;\r\n#include &lt;cms\/TextMessage.h&gt;\r\n#include &lt;cms\/BytesMessage.h&gt;\r\n#include &lt;cms\/MapMessage.h&gt;\r\n#include &lt;cms\/ExceptionListener.h&gt;\r\n#include &lt;cms\/MessageListener.h&gt;\r\n#include &lt;stdlib.h&gt;\r\n#include &lt;stdio.h&gt;\r\n#include &lt;iostream&gt;\r\n#include &lt;memory&gt;\r\n\r\n#include &lt;decaf\/util\/Random.h&gt;\r\n\r\nusing namespace activemq::core;\r\nusing namespace 
decaf::util::concurrent;\r\nusing namespace decaf::util;\r\nusing namespace decaf::lang;\r\nusing namespace cms;\r\nusing namespace std;\r\n\r\n#define  QUEUE_NAME    \"eventQueue\"\r\n#define NAME_BYTE_LEN        16\r\n\r\nclass HelloWorldProducer : public ExceptionListener,\r\n                            public MessageListener,\r\n                            public Runnable {\r\nprivate:\r\n    CountDownLatch latch;\r\n    CountDownLatch doneLatch;\r\n    Connection* connection;\r\n    Session* session;\r\n    Destination* destination;\r\n    MessageProducer* producer;\r\n    int numMessages;\r\n    bool useTopic;\r\n    bool sessionTransacted;\r\n    std::string brokerURI;\r\n    bool bReciveMessage;\r\n    long waitMillis;\r\n\r\nprivate:\r\n\r\n    HelloWorldProducer(const HelloWorldProducer&amp;);\r\n    HelloWorldProducer&amp; operator=(const HelloWorldProducer&amp;);\r\n\r\npublic:\r\n\r\n    HelloWorldProducer(const std::string&amp; brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted = false,\r\n        long waitMillis=3000) :\r\n        latch(1),\r\n        doneLatch(numMessages),  \r\n        connection(NULL),\r\n        session(NULL),\r\n        destination(NULL),\r\n        producer(NULL),\r\n        numMessages(numMessages),\r\n        useTopic(useTopic),\r\n        sessionTransacted(sessionTransacted),\r\n        brokerURI(brokerURI) ,\r\n        bReciveMessage(false),\r\n        waitMillis(waitMillis)\r\n        { }\r\n\r\n    virtual ~HelloWorldProducer(){\r\n        cleanup();\r\n    }\r\n\r\n    void close() {\r\n        this-&gt;cleanup();\r\n    }\r\n    \r\n    void waitUntilReady() {\r\n        latch.await();\r\n    }\r\n\r\n    virtual void run() {\r\n\r\n        try {\r\n\r\n            \/\/ Create a ConnectionFactory\r\n            auto_ptr&lt;ConnectionFactory&gt; connectionFactory(\r\n                ConnectionFactory::createCMSConnectionFactory(brokerURI));\r\n\r\n            \/\/ Create a Connection\r\n            
connection = connectionFactory-&gt;createConnection();\r\n            connection-&gt;start();\r\n\r\n            \/\/ Create a Session (exactly one; a second createSession() call here\r\n            \/\/ would leak the first session and discard the transacted setting)\r\n            if (this-&gt;sessionTransacted) {\r\n                session = connection-&gt;createSession(Session::SESSION_TRANSACTED);\r\n            } else {\r\n                session = connection-&gt;createSession(Session::AUTO_ACKNOWLEDGE);\r\n            }\r\n\r\n            \/\/ Create the destination (Topic or Queue)\r\n            if (useTopic) {\r\n                destination = session-&gt;createTopic(QUEUE_NAME);\r\n            } else {\r\n                destination = session-&gt;createQueue(QUEUE_NAME);\r\n            }\r\n\r\n            \/\/ Create a MessageProducer from the Session to the Topic or Queue\r\n            producer = session-&gt;createProducer(destination);\r\n            producer-&gt;setDeliveryMode(DeliveryMode::NON_PERSISTENT);\r\n\r\n            \/\/ Create the Thread Id String\r\n            string threadIdStr = Long::toString(Thread::currentThread()-&gt;getId());\r\n\r\n            \/\/ Create the message text\r\n            string text = (string) \"Hello world! 
from thread \" + threadIdStr;\r\n\r\n            \/\/ Reply channel: one temporary queue and one listening consumer,\r\n            \/\/ created once and shared by every request. Both are released when\r\n            \/\/ this try block ends, before cleanup() deletes the session.\r\n            std::auto_ptr&lt;Destination&gt; tempDest(session-&gt;createTemporaryQueue());\r\n            std::auto_ptr&lt;MessageConsumer&gt; responseConsumer(session-&gt;createConsumer(tempDest.get()));\r\n            responseConsumer-&gt;setMessageListener(this);\r\n\r\n            \/\/ Source of random bytes for the correlation IDs.\r\n            Random random;\r\n\r\n            for (int ix = 0; ix &lt; numMessages; ++ix) {\r\n                std::auto_ptr&lt;TextMessage&gt; message(session-&gt;createTextMessage(text));\r\n\r\n                \/\/ Key step: tell the receiver where to send its reply.\r\n                message-&gt;setCMSReplyTo(tempDest.get());\r\n\r\n                \/\/ Hex-encode NAME_BYTE_LEN random bytes as the correlation ID.\r\n                unsigned char buffer[NAME_BYTE_LEN] = {0};\r\n                random.nextBytes(buffer, NAME_BYTE_LEN);\r\n                string correlationId;\r\n                for (int i = 0; i &lt; NAME_BYTE_LEN; ++i) {\r\n                    char ch[3] = {0};\r\n                    sprintf(ch, \"%02X\", buffer[i]);\r\n                    correlationId += ch;\r\n                }\r\n                message-&gt;setCMSCorrelationID(correlationId);\r\n\r\n                message-&gt;setIntProperty(\"Integer\", ix);\r\n                printf(\"Producer Sent message #%d from thread %s\\n\", ix + 1, threadIdStr.c_str());\r\n                producer-&gt;send(message.get());\r\n            }\r\n\r\n            \/\/ Indicate we are ready for messages.\r\n            latch.countDown();\r\n\r\n            \/\/ Wait for the replies: returns once all have arrived or waitMillis elapses.\r\n            doneLatch.await(waitMillis);\r\n        }\r\n        catch (CMSException&amp; e) {\r\n            printf(\"Producer run() CMSException \\n\" );\r\n                \/\/ Indicate we are ready for messages.\r\n                latch.countDown();\r\n                
e.printStackTrace();\r\n            }\r\n\r\n    \r\n        }\r\n        \r\n\r\n    \/\/ Called from the Producer since this class is a registered MessageListener.\r\n    virtual void onMessage(const Message* message) {\r\n\r\n        static int count = 0;\r\n\r\n        try {\r\n            count++;\r\n            const TextMessage* textMessage = dynamic_cast&lt;const TextMessage*&gt; (message);\r\n            \/\/ActiveMQMessageTransformation\r\n            \/\/std::auto_ptr&lt;TextMessage&gt; responsemessage(session-&gt;createTextMessage());\r\n            \/\/responsemessage-&gt;setCMSCorrelationID(textMessage-&gt;getCMSCorrelationID());\r\n            \/\/responsemessage-&gt;getCMSReplyTo()\r\n\r\n            string text = \"\";\r\n\r\n            if (textMessage != NULL) {\r\n                text = textMessage-&gt;getText();\r\n            } else {\r\n                text = \"NOT A TEXTMESSAGE!\";\r\n            }\r\n\r\n            printf(\"Producer Message #%d Received: %s\\n\", count, text.c_str());\r\n\r\n\r\n            \/\/producer.send\r\n\r\n        } catch (CMSException&amp; e) {\r\n            printf(\"Producer onMessage() CMSException \\n\" );\r\n            e.printStackTrace();\r\n        }\r\n\r\n        \/\/ Commit all messages.\r\n        if (this-&gt;sessionTransacted) {\r\n            session-&gt;commit();\r\n        }\r\n\r\n        \/\/ No matter what, tag the count down latch until done.\r\n        doneLatch.countDown();\r\n    }\r\n\r\n    \/\/ If something bad happens you see it here as this class is also been\r\n    \/\/ registered as an ExceptionListener with the connection.\r\n    virtual void onException(const CMSException&amp; ex AMQCPP_UNUSED) {\r\n        printf(\"Producer onException() CMS Exception occurred.  Shutting down client. 
\\n\" );\r\n        ex.printStackTrace();\r\n        exit(1);\r\n    }\r\n\r\n\r\nprivate:\r\n\r\n    void cleanup() {\r\n\r\n        if (connection != NULL) {\r\n            try {\r\n                connection-&gt;close();\r\n            } catch (cms::CMSException&amp; ex) {\r\n                ex.printStackTrace();\r\n            }\r\n        }\r\n\r\n        \/\/ Destroy resources.\r\n        try {\r\n            delete destination;\r\n            destination = NULL;\r\n            delete producer;\r\n            producer = NULL;\r\n            delete session;\r\n            session = NULL;\r\n            delete connection;\r\n            connection = NULL;\r\n        } catch (CMSException&amp; e) {\r\n            e.printStackTrace();\r\n        }\r\n    }\r\n};\r\n\r\nclass HelloWorldConsumer : public ExceptionListener,\r\n                           public MessageListener,\r\n                           public Runnable {\r\n\r\nprivate:\r\n\r\n    CountDownLatch latch;\r\n    CountDownLatch doneLatch;\r\n    Connection* connection;\r\n    Session* session;\r\n    Destination* destination;\r\n    MessageConsumer* consumer;\r\n    MessageProducer *producer;\r\n    long waitMillis;\r\n    bool useTopic;\r\n    bool sessionTransacted;\r\n    std::string brokerURI;\r\n\r\nprivate:\r\n\r\n    HelloWorldConsumer(const HelloWorldConsumer&amp;);\r\n    HelloWorldConsumer&amp; operator=(const HelloWorldConsumer&amp;);\r\n\r\npublic:\r\n\r\n    HelloWorldConsumer(const std::string&amp; brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted = false, int waitMillis = 30000) :\r\n        latch(1),\r\n        doneLatch(numMessages),\r\n        connection(NULL),\r\n        session(NULL),\r\n        destination(NULL),\r\n        consumer(NULL),\r\n        producer(NULL),\r\n        waitMillis(waitMillis),\r\n        useTopic(useTopic),\r\n        sessionTransacted(sessionTransacted),\r\n        brokerURI(brokerURI) {\r\n    }\r\n\r\n    virtual 
~HelloWorldConsumer() {\r\n        cleanup();\r\n    }\r\n\r\n    void close() {\r\n        this-&gt;cleanup();\r\n    }\r\n\r\n    void waitUntilReady() {\r\n        latch.await();\r\n    }\r\n\r\n    virtual void run() {\r\n\r\n        try {\r\n\r\n            \/\/ Create a ConnectionFactory\r\n            auto_ptr&lt;ConnectionFactory&gt; connectionFactory(\r\n                ConnectionFactory::createCMSConnectionFactory(brokerURI));\r\n\r\n            \/\/ Create a Connection\r\n            connection = connectionFactory-&gt;createConnection();\r\n            connection-&gt;start();\r\n            connection-&gt;setExceptionListener(this);\r\n\r\n            \/\/ Create a Session\r\n            if (this-&gt;sessionTransacted == true) {\r\n                session = connection-&gt;createSession(Session::SESSION_TRANSACTED);\r\n            } else {\r\n                session = connection-&gt;createSession(Session::AUTO_ACKNOWLEDGE);\r\n            }\r\n\r\n            \/\/ Create the destination (Topic or Queue)\r\n            if (useTopic) {\r\n                destination = session-&gt;createTopic(QUEUE_NAME);\r\n            } else {\r\n                destination = session-&gt;createQueue(QUEUE_NAME);\r\n            }\r\n\r\n            producer = session-&gt;createProducer();\r\n            producer-&gt;setDeliveryMode(DeliveryMode::NON_PERSISTENT);\r\n\r\n            \/\/ Create a MessageConsumer from the Session to the Topic or Queue\r\n            consumer = session-&gt;createConsumer(destination);\r\n\r\n            consumer-&gt;setMessageListener(this);\r\n\r\n            std::cout.flush();\r\n            std::cerr.flush();\r\n\r\n            \/\/ Indicate we are ready for messages.\r\n            latch.countDown();\r\n\r\n            \/\/ Wait while asynchronous messages come in.\r\n            doneLatch.await();\r\n\r\n        } catch (CMSException&amp; e) {\r\n            printf(\"Consumer onException() CMS Exception occurred.  Shutting down client. 
\\n\" );\r\n            \/\/ Indicate we are ready for messages.\r\n            latch.countDown();\r\n            e.printStackTrace();\r\n        }\r\n    }\r\n\r\n    \/\/ Called from the consumer since this class is a registered MessageListener.\r\n    virtual void onMessage(const Message* message) {\r\n\r\n        static int count = 0;\r\n\r\n        try {\r\n            count++;\r\n            \r\n            \r\n            \/\/ Create the Thread Id String\r\n            string threadIdStr = Long::toString(Thread::currentThread()-&gt;getId());\r\n\r\n            static bool bPrintf=true;\r\n            if(bPrintf)\r\n            {\r\n                bPrintf=false;\r\n                printf(\"consumer Message threadid: %s\\n\",  threadIdStr.c_str());\r\n            }\r\n\r\n            string strReply=\"consumer return  xxx,ThreadID=\"+threadIdStr;\r\n            const TextMessage* textMessage = dynamic_cast&lt;const TextMessage*&gt; (message);\r\n            \r\n            if(NULL==textMessage)\r\n            {\r\n                printf(\"NULL==textMessage, CMSType=%s\\n\", message-&gt;getCMSType().c_str());\r\n\r\n\r\n                \/\/const cms::MapMessage* mapMsg = dynamic_cast&lt;const cms::MapMessage*&gt;(message);\r\n                \/\/if(mapMsg)\r\n                \/\/{\r\n                \/\/    \r\n                \/\/    std::vector&lt;std::string&gt; elements = mapMsg-&gt;getMapNames();\r\n                \/\/    std::vector&lt;std::string&gt;::iterator iter = elements.begin();\r\n                \/\/    for(; iter != elements.end() ; ++iter) \r\n                \/\/    {\r\n                \/\/        std::string key = *iter;\r\n                \/\/        cms::Message::ValueType elementType = mapMsg-&gt;getValueType(key);\r\n                \/\/        string strxxx;\r\n                \/\/        int cc=0;\r\n                \/\/        switch(elementType) {\r\n                \/\/    case cms::Message::BOOLEAN_TYPE:\r\n                \/\/        
\/\/msg-&gt;setBoolean(key, mapMsg-&gt;getBoolean(key));\r\n                \/\/        break;\r\n                \/\/    case cms::Message::BYTE_TYPE:\r\n                \/\/        \/\/msg-&gt;setByte(key, mapMsg-&gt;getByte(key));\r\n                \/\/        break;\r\n                \/\/    case cms::Message::BYTE_ARRAY_TYPE:\r\n                \/\/        \/\/msg-&gt;setBytes(key, mapMsg-&gt;getBytes(key));\r\n                \/\/        break;\r\n                \/\/    case cms::Message::CHAR_TYPE:\r\n                \/\/        \/\/msg-&gt;setChar(key, mapMsg-&gt;getChar(key));\r\n                \/\/        break;\r\n                \/\/    case cms::Message::SHORT_TYPE:\r\n                \/\/        \/\/msg-&gt;setShort(key, mapMsg-&gt;getShort(key));\r\n                \/\/        break;\r\n                \/\/    case cms::Message::INTEGER_TYPE:\r\n                \/\/        \/\/msg-&gt;setInt(key, mapMsg-&gt;getInt(key));\r\n                \/\/        break;\r\n                \/\/    case cms::Message::LONG_TYPE:\r\n                \/\/        \/\/msg-&gt;setLong(key, mapMsg-&gt;getLong(key));\r\n                \/\/        break;\r\n                \/\/    case cms::Message::FLOAT_TYPE:\r\n                \/\/        \/\/msg-&gt;setFloat(key, mapMsg-&gt;getFloat(key));\r\n                \/\/        break;\r\n                \/\/    case cms::Message::DOUBLE_TYPE:\r\n                \/\/        \/\/msg-&gt;setDouble(key, mapMsg-&gt;getDouble(key));\r\n                \/\/        break;\r\n                \/\/    case cms::Message::STRING_TYPE:\r\n                \/\/        \/\/msg-&gt;setString(key, mapMsg-&gt;getString(key));\r\n                \/\/        strxxx=mapMsg-&gt;getString(key);\r\n                \/\/        cc=1;\r\n                \/\/        break;\r\n                \/\/    default:\r\n                \/\/        break;\r\n                \/\/        }\r\n                \/\/    }\r\n\r\n                \/\/}\r\n\r\n           
     return;\r\n            }\r\n\r\n            std::auto_ptr&lt;TextMessage&gt; responsemessage(session-&gt;createTextMessage(strReply));\r\n            responsemessage-&gt;setCMSCorrelationID(textMessage-&gt;getCMSCorrelationID());\r\n            \r\n            \r\n            string text = \"\";\r\n\r\n            if (textMessage != NULL) {\r\n                text = textMessage-&gt;getText();\r\n            } else {\r\n                text = \"NOT A TEXTMESSAGE!\";\r\n            }\r\n            \r\n            int nProPerty=textMessage-&gt;getIntProperty(\"Integer\");\r\n            printf(\"consumer Message #%d Received: %s,nProPerty[%d]\\n\", count, text.c_str(),nProPerty);\r\n            \r\n            \r\n            const cms::Destination* destSend=textMessage-&gt;getCMSReplyTo();\r\n            if(destSend)\r\n            {\r\n                this-&gt;producer-&gt;send(destSend,responsemessage.get());\r\n\r\n                printf(\"consumer Message #%d send: %s\\n\", count, strReply.c_str());\r\n            }\r\n            \r\n\r\n        } catch (CMSException&amp; e) {\r\n            printf(\"Consumer onMessage() CMS Exception occurred.  Shutting down client. \\n\" );\r\n            e.printStackTrace();\r\n        }\r\n\r\n        \/\/ Commit all messages.\r\n        if (this-&gt;sessionTransacted) {\r\n            session-&gt;commit();\r\n        }\r\n\r\n        \/\/ No matter what, tag the count down latch until done.\r\n        \/\/doneLatch.countDown();\r\n    }\r\n\r\n    \/\/ If something bad happens you see it here as this class is also been\r\n    \/\/ registered as an ExceptionListener with the connection.\r\n    virtual void onException(const CMSException&amp; ex AMQCPP_UNUSED) {\r\n        printf(\"Consumer onException() CMS Exception occurred.  Shutting down client. \\n\" );\r\n        \/\/printf(\"CMS Exception occurred.  
Shutting down client.\\n\");\r\n        ex.printStackTrace();\r\n        exit(1);\r\n    }\r\n\r\nprivate:\r\n\r\n    void cleanup() {\r\n        if (connection != NULL) {\r\n            try {\r\n                connection-&gt;close();\r\n            } catch (cms::CMSException&amp; ex) {\r\n                ex.printStackTrace();\r\n            }\r\n        }\r\n\r\n        \/\/ Destroy resources.\r\n        try {\r\n            delete destination;\r\n            destination = NULL;\r\n            delete consumer;\r\n            consumer = NULL;\r\n            delete session;\r\n            session = NULL;\r\n            delete connection;\r\n            connection = NULL;\r\n        } catch (CMSException&amp; e) {\r\n            e.printStackTrace();\r\n        }\r\n    }\r\n};\r\n\r\nint main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {\r\n\r\n    \/\/if(argc&lt;2)\r\n    \/\/{\r\n    \/\/    printf(\"argc&lt;2\\r\\n\");\r\n    \/\/    return 0;\r\n    \/\/}\r\n\r\n    activemq::library::ActiveMQCPP::initializeLibrary();\r\n    {\r\n    std::cout &lt;&lt; \"=====================================================\\n\";\r\n    std::cout &lt;&lt; \"Starting the example:\" &lt;&lt; std::endl;\r\n    std::cout &lt;&lt; \"-----------------------------------------------------\\n\";\r\n\r\n\r\n    \/\/ Set the URI to point to the IP Address of your broker.\r\n    \/\/ add any optional params to the url to enable things like\r\n    \/\/ tightMarshalling or tcp logging etc.  
See the CMS web site for\r\n    \/\/ a full list of configuration options.\r\n    \/\/\r\n    \/\/  http:\/\/activemq.apache.org\/cms\/\r\n    \/\/\r\n    \/\/ Wire Format Options:\r\n    \/\/ =========================\r\n    \/\/ Use either stomp or openwire, the default ports are different for each\r\n    \/\/\r\n    \/\/ Examples:\r\n    \/\/    tcp:\/\/127.0.0.1:61616                      default to openwire\r\n    \/\/    tcp:\/\/127.0.0.1:61616?wireFormat=openwire  same as above\r\n    \/\/    tcp:\/\/127.0.0.1:61613?wireFormat=stomp     use stomp instead\r\n    \/\/\r\n    \/\/ SSL:\r\n    \/\/ =========================\r\n    \/\/ To use SSL you need to specify the location of the trusted Root CA or the\r\n    \/\/ certificate for the broker you want to connect to.  Using the Root CA allows\r\n    \/\/ you to use failover with multiple servers all using certificates signed by\r\n    \/\/ the trusted root.  If using client authentication you also need to specify\r\n    \/\/ the location of the client Certificate.\r\n    \/\/\r\n    \/\/     System::setProperty( \"decaf.net.ssl.keyStore\", \"&lt;path&gt;\/client.pem\" );\r\n    \/\/     System::setProperty( \"decaf.net.ssl.keyStorePassword\", \"password\" );\r\n    \/\/     System::setProperty( \"decaf.net.ssl.trustStore\", \"&lt;path&gt;\/rootCA.pem\" );\r\n    \/\/\r\n    \/\/ Then you just specify the ssl transport in the URI, for example:\r\n    \/\/\r\n    \/\/     ssl:\/\/localhost:61617\r\n    \/\/\r\n    std::string brokerURI =\r\n        \"failover:(tcp:\/\/192.168.10.143:61616\"\r\n\/\/        \"?wireFormat=openwire\"\r\n\/\/        \"&amp;transport.useInactivityMonitor=false\"\r\n\/\/        \"&amp;connection.alwaysSyncSend=true\"\r\n\/\/        \"&amp;connection.useAsyncSend=true\"\r\n\/\/        \"?transport.commandTracingEnabled=true\"\r\n\/\/        \"&amp;transport.tcpTracingEnabled=true\"\r\n\/\/        \"&amp;wireFormat.tightEncodingEnabled=true\"\r\n        \")\";\r\n\r\n    
\/\/============================================================\r\n    \/\/ set to true to use topics instead of queues\r\n    \/\/ Note in the code above that this causes createTopic or\r\n    \/\/ createQueue to be used in both consumer and producer.\r\n    \/\/============================================================\r\n    bool useTopics = false;\r\n    bool sessionTransacted = true;\r\n    int numMessages = 1;\r\n    bool useConsumer=true;\r\n    bool useProducer=true;\r\n\r\n    \/\/int nSet=atoi(argv[1]);\r\n    \/\/if(1==nSet)\r\n    \/\/{\r\n        \/\/#define USE_COMSUMER\r\n\r\n        \r\n    \/\/}\r\n    \/\/else\r\n    \/\/{\r\n        \/\/#define USE_PRODUCER\r\n\r\n        \/\/\r\n    \/\/}\r\n\r\n\r\n\r\n    long long startTime = System::currentTimeMillis();\r\n    \r\n#ifdef USE_PRODUCER\r\n    printf(\"Current mode: USE_PRODUCER \\r\\n\");\r\n\r\n    int numProducerMessages = 30;\r\n    int nThreadNumber=10;\r\n    vector&lt;HelloWorldProducer *&gt; vHelloWorldProducer;\r\n    for(int i=0;i&lt;nThreadNumber;++i)\r\n    {\r\n        HelloWorldProducer * producerTemp=new HelloWorldProducer(brokerURI, numProducerMessages, useTopics);\r\n        vHelloWorldProducer.push_back(producerTemp);\r\n    }\r\n\r\n#endif\r\n    \r\n#ifdef USE_COMSUMER\r\n    printf(\"Current mode: USE_COMSUMER \\r\\n\");\r\n    HelloWorldConsumer consumer(brokerURI, numMessages, useTopics, sessionTransacted);\r\n    \/\/ Start the consumer thread.\r\n    Thread consumerThread(&amp;consumer);\r\n    consumerThread.start();\r\n\r\n    \/\/ Wait for the consumer to indicate that it's ready to go.\r\n    consumer.waitUntilReady();\r\n\r\n#endif\r\n\r\n    \r\n\r\n\r\n#ifdef USE_PRODUCER\r\n    \/\/ Start the producer threads.\r\n\r\n    vector&lt;Thread *&gt; vThread;\r\n    for(int i=0;i&lt;nThreadNumber;++i)\r\n    {\r\n        HelloWorldProducer &amp; ProducerTemp=*vHelloWorldProducer[i];\r\n        Thread * threadTemp=new Thread(&amp;ProducerTemp);\r\n        
vThread.push_back(threadTemp);\r\n        threadTemp-&gt;start();\r\n        ProducerTemp.waitUntilReady();\r\n\r\n    }\r\n\r\n    for(int i=0;i&lt;vThread.size();++i)\r\n    {\r\n        Thread * threadTemp=vThread[i];\r\n        \/\/threadTemp-&gt;join();\r\n    }\r\n    while(1)\r\n    {\r\n        Thread::sleep(10);\r\n    }\r\n\r\n    \/\/Thread producerThread1(&amp;producer1);\r\n    \/\/producerThread1.start();\r\n    \/\/producer1.waitUntilReady();\r\n\r\n    \/\/Thread producerThread2(&amp;producer2);\r\n    \/\/producerThread2.start();\r\n    \/\/producer2.waitUntilReady();\r\n\r\n    \/\/Thread producerThread3(&amp;producer3);\r\n    \/\/producerThread3.start();\r\n    \/\/producer3.waitUntilReady();\r\n#endif\r\n\r\n\r\n\r\n    \r\n#ifdef USE_PRODUCER\r\n    \/\/ Wait for the threads to complete.\r\n    \/\/producerThread1.join();\r\n    \/\/producerThread2.join();\r\n    \/\/producerThread3.join();\r\n#endif\r\n\r\n#ifdef USE_COMSUMER\r\n    consumerThread.join();\r\n#endif\r\n\r\n    long long endTime = System::currentTimeMillis();\r\n    double totalTime = (double)(endTime - startTime) \/ 1000.0;\r\n\r\n#ifdef USE_PRODUCER\r\n    \/\/producer1.close();\r\n    \/\/producer2.close();\r\n    \/\/producer3.close();\r\n\r\n    for(int i=0;i&lt;vHelloWorldProducer.size();++i)\r\n    {\r\n        HelloWorldProducer * ProducerTemp=vHelloWorldProducer[i];\r\n        ProducerTemp-&gt;close();\r\n\r\n        if(ProducerTemp)\r\n        {\r\n            delete ProducerTemp;\r\n            ProducerTemp=NULL;\r\n        }\r\n    }\r\n\r\n#endif\r\n#ifdef USE_COMSUMER\r\n    consumer.close();\r\n#endif\r\n\r\n    \r\n   \r\n\r\n    std::cout &lt;&lt; \"Time to completion = \" &lt;&lt; totalTime &lt;&lt; \" seconds.\" &lt;&lt; std::endl;\r\n    std::cout &lt;&lt; \"-----------------------------------------------------\\n\";\r\n    std::cout &lt;&lt; \"Finished with the example.\" &lt;&lt; std::endl;\r\n    std::cout &lt;&lt; 
\"=====================================================\\n\";\r\n\r\n    }\r\n    activemq::library::ActiveMQCPP::shutdownLibrary();\r\n\r\n\r\n    return 0;\r\n}\r\n\r\n\/\/ END SNIPPET: demo<\/pre>\n<\/div>\n<p>&nbsp;<\/p>\n<p>Program output:<\/p>\n<p><img decoding=\"async\" src=\"http:\/\/zerobox.org\/notes\/wp-content\/uploads\/2015\/04\/c0b6a94b478110f332981c7c725271ea.png\" alt=\"\" \/><\/p>\n<p>&nbsp;<\/p>\n<p><strong>About Message conversion in activemq-cpp.<\/strong><\/p>\n<p>In activemq-cpp, the conversion is implemented in ActiveMQMessageTransformation::transformMessage.<\/p>\n<div class=\"cnblogs_code\">\n<pre>\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\r\nbool ActiveMQMessageTransformation::transformMessage(cms::Message* message, ActiveMQConnection* connection, Message** amqMessage) {\r\n\r\n    if (message == NULL) {\r\n        throw NullPointerException(__FILE__, __LINE__, \"Provided source cms::Message pointer was NULL\");\r\n    }\r\n\r\n    if (amqMessage == NULL) {\r\n        throw NullPointerException(__FILE__, __LINE__, \"Provided target commands::Message pointer was NULL\");\r\n    }\r\n\r\n    *amqMessage = dynamic_cast&lt;Message*&gt;(message);\r\n\r\n    if (*amqMessage != NULL) {\r\n        return false;\r\n    } else {\r\n\r\n        if 
(dynamic_cast&lt;cms::BytesMessage*&gt;(message) != NULL) {\r\n            cms::BytesMessage* bytesMsg = dynamic_cast&lt;cms::BytesMessage*&gt;(message);\r\n            bytesMsg-&gt;reset();\r\n            ActiveMQBytesMessage* msg = new ActiveMQBytesMessage();\r\n            msg-&gt;setConnection(connection);\r\n            try {\r\n                for (;;) {\r\n                    \/\/ Reads a byte from the message stream until the stream is empty\r\n                    msg-&gt;writeByte(bytesMsg-&gt;readByte());\r\n                }\r\n            } catch (cms::MessageEOFException&amp; e) {\r\n                \/\/ if an end of message stream as expected\r\n            } catch (cms::CMSException&amp; e) {\r\n            }\r\n\r\n            *amqMessage = msg;\r\n        } else if (dynamic_cast&lt;cms::MapMessage*&gt;(message) != NULL) {\r\n            cms::MapMessage* mapMsg = dynamic_cast&lt;cms::MapMessage*&gt;(message);\r\n            ActiveMQMapMessage* msg = new ActiveMQMapMessage();\r\n            msg-&gt;setConnection(connection);\r\n\r\n            std::vector&lt;std::string&gt; elements = mapMsg-&gt;getMapNames();\r\n            std::vector&lt;std::string&gt;::iterator iter = elements.begin();\r\n            for(; iter != elements.end() ; ++iter) {\r\n                std::string key = *iter;\r\n                cms::Message::ValueType elementType = mapMsg-&gt;getValueType(key);\r\n\r\n                switch(elementType) {\r\n                    case cms::Message::BOOLEAN_TYPE:\r\n                        msg-&gt;setBoolean(key, mapMsg-&gt;getBoolean(key));\r\n                        break;\r\n                    case cms::Message::BYTE_TYPE:\r\n                        msg-&gt;setByte(key, mapMsg-&gt;getByte(key));\r\n                        break;\r\n                    case cms::Message::BYTE_ARRAY_TYPE:\r\n                        msg-&gt;setBytes(key, mapMsg-&gt;getBytes(key));\r\n                        break;\r\n                    case 
cms::Message::CHAR_TYPE:\r\n                        msg-&gt;setChar(key, mapMsg-&gt;getChar(key));\r\n                        break;\r\n                    case cms::Message::SHORT_TYPE:\r\n                        msg-&gt;setShort(key, mapMsg-&gt;getShort(key));\r\n                        break;\r\n                    case cms::Message::INTEGER_TYPE:\r\n                        msg-&gt;setInt(key, mapMsg-&gt;getInt(key));\r\n                        break;\r\n                    case cms::Message::LONG_TYPE:\r\n                        msg-&gt;setLong(key, mapMsg-&gt;getLong(key));\r\n                        break;\r\n                    case cms::Message::FLOAT_TYPE:\r\n                        msg-&gt;setFloat(key, mapMsg-&gt;getFloat(key));\r\n                        break;\r\n                    case cms::Message::DOUBLE_TYPE:\r\n                        msg-&gt;setDouble(key, mapMsg-&gt;getDouble(key));\r\n                        break;\r\n                    case cms::Message::STRING_TYPE:\r\n                        msg-&gt;setString(key, mapMsg-&gt;getString(key));\r\n                        break;\r\n                    default:\r\n                        break;\r\n                }\r\n            }\r\n\r\n            *amqMessage = msg;\r\n        } else if (dynamic_cast&lt;cms::ObjectMessage*&gt;(message) != NULL) {\r\n            cms::ObjectMessage* objMsg = dynamic_cast&lt;cms::ObjectMessage*&gt;(message);\r\n            ActiveMQObjectMessage* msg = new ActiveMQObjectMessage();\r\n            msg-&gt;setConnection(connection);\r\n            msg-&gt;setObjectBytes(objMsg-&gt;getObjectBytes());\r\n            *amqMessage = msg;\r\n        } else if (dynamic_cast&lt;cms::StreamMessage*&gt;(message) != NULL) {\r\n            cms::StreamMessage* streamMessage = dynamic_cast&lt;cms::StreamMessage*&gt;(message);\r\n            streamMessage-&gt;reset();\r\n            ActiveMQStreamMessage* msg = new ActiveMQStreamMessage();\r\n            
msg-&gt;setConnection(connection);\r\n\r\n            try {\r\n                while(true) {\r\n                    cms::Message::ValueType elementType = streamMessage-&gt;getNextValueType();\r\n                    int result = -1;\r\n                    std::vector&lt;unsigned char&gt; buffer(255);\r\n\r\n                    switch(elementType) {\r\n                        case cms::Message::BOOLEAN_TYPE:\r\n                            msg-&gt;writeBoolean(streamMessage-&gt;readBoolean());\r\n                            break;\r\n                        case cms::Message::BYTE_TYPE:\r\n                            msg-&gt;writeBoolean(streamMessage-&gt;readBoolean());\r\n                            break;\r\n                        case cms::Message::BYTE_ARRAY_TYPE:\r\n                            while ((result = streamMessage-&gt;readBytes(buffer)) != -1) {\r\n                                msg-&gt;writeBytes(&amp;buffer[0], 0, result);\r\n                                buffer.clear();\r\n                            }\r\n                            break;\r\n                        case cms::Message::CHAR_TYPE:\r\n                            msg-&gt;writeChar(streamMessage-&gt;readChar());\r\n                            break;\r\n                        case cms::Message::SHORT_TYPE:\r\n                            msg-&gt;writeShort(streamMessage-&gt;readShort());\r\n                            break;\r\n                        case cms::Message::INTEGER_TYPE:\r\n                            msg-&gt;writeInt(streamMessage-&gt;readInt());\r\n                            break;\r\n                        case cms::Message::LONG_TYPE:\r\n                            msg-&gt;writeLong(streamMessage-&gt;readLong());\r\n                            break;\r\n                        case cms::Message::FLOAT_TYPE:\r\n                            msg-&gt;writeFloat(streamMessage-&gt;readFloat());\r\n                            break;\r\n                        case 
cms::Message::DOUBLE_TYPE:\r\n                            msg-&gt;writeDouble(streamMessage-&gt;readDouble());\r\n                            break;\r\n                        case cms::Message::STRING_TYPE:\r\n                            msg-&gt;writeString(streamMessage-&gt;readString());\r\n                            break;\r\n                        default:\r\n                            break;\r\n                    }\r\n                }\r\n            } catch (cms::MessageEOFException&amp; e) {\r\n                \/\/ if an end of message stream as expected\r\n            } catch (cms::CMSException&amp; e) {\r\n            }\r\n\r\n            *amqMessage = msg;\r\n        } else if (dynamic_cast&lt;cms::TextMessage*&gt;(message) != NULL) {\r\n            cms::TextMessage* textMsg = dynamic_cast&lt;cms::TextMessage*&gt;(message);\r\n            ActiveMQTextMessage* msg = new ActiveMQTextMessage();\r\n            msg-&gt;setConnection(connection);\r\n            msg-&gt;setText(textMsg-&gt;getText());\r\n            *amqMessage = msg;\r\n        } else {\r\n            *amqMessage = new ActiveMQMessage();\r\n            (*amqMessage)-&gt;setConnection(connection);\r\n        }\r\n\r\n        ActiveMQMessageTransformation::copyProperties(message, dynamic_cast&lt;cms::Message*&gt;(*amqMessage));\r\n    }\r\n\r\n    return true;\r\n}<\/pre>\n<\/div>\n<p>&nbsp;<\/p>\n<p>&nbsp;<\/p>\n<p>&nbsp;<\/p>\n<h1>5,activemq broker cluster (activemq clustering).<\/h1>\n<p>&nbsp;<\/p>\n<p>For reference, see:<\/p>\n<p>http:\/\/bh-keven.iteye.com\/blog\/1617788<\/p>\n<p>http:\/\/blog.csdn.net\/jason5186\/article\/details\/18702523<\/p>\n<p>&nbsp;<\/p>\n<h1>6,activemq.xml 
configuration and activemq Connection URI configuration<\/h1>\n<p>&nbsp;<\/p>\n<p>http:\/\/activemq.apache.org\/nms\/activemq-uri-configuration.html<\/p>\n<p>http:\/\/activemq.apache.org\/tcp-transport-reference.html<\/p>\n<p>&nbsp;<\/p>\n<p>These pages describe the available options, but they take some time to read.<\/p>\n<p>&nbsp;<\/p>\n<p>&nbsp;<\/p>\n<p>&nbsp;<\/p>\n<p>\/\/7,The available wireFormat options (such as wireFormat=openwire) and their pros and cons.<\/p>\n<p>\/\/openwire,amqp,stomp,mqtt,ws<\/p>\n","protected":false},"excerpt":{"rendered":"<p>I first came across activemq in a hurry a long time ago, but I was too pressed for time then. Later I found that activemq  &hellip;<\/p>\n<p class=\"read-more\"><a href=\"http:\/\/zerobox.org\/notes\/1014.html\">Continue reading &raquo;<\/a><\/p>\n","protected":false},"author":1,"featured_media":1015,"comment_status":"closed","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[],"tags":[63],"class_list":["post-1014","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","tag-c"],"views":444,"_links":{"self":[{"href":"http:\/\/zerobox.org\/notes\/wp-json\/wp\/v2\/posts\/1014","targetHints":{"allow":["GET"]}}],"collection":[{"href":"http:\/\/zerobox.org\/notes\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/zerobox.org\/notes\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/zerobox.org\/notes\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/zerobox.org\/notes\/wp-json\/wp\/v2\/comments?post=1014"}],"version-history":[{"count":0,"href":"http:\/\/zerobox.org\/notes\/wp-json\/wp\/v2\/posts\/1014\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"http:\/\/zerobox.org\/notes\/wp-json\/wp\/v2
\/media\/1015"}],"wp:attachment":[{"href":"http:\/\/zerobox.org\/notes\/wp-json\/wp\/v2\/media?parent=1014"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/zerobox.org\/notes\/wp-json\/wp\/v2\/categories?post=1014"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/zerobox.org\/notes\/wp-json\/wp\/v2\/tags?post=1014"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}