大鹏网络网站建设,室内效果图用什么软件做最好,企业网站免费认证,学校网站的建设目标

文章目录：代码1：消费者；代码2：生产者

之前在 Linux 下的 Qt 程序中使用 ActiveMQ 的时候，花了很多时间去研究。本来想好好记录一下，但是当时只顾着写代码，很多细节也不想再重新走一遍了，所以这里大概写一下怎么使用就行了。注意：一定要先开启服务器（broker）。

代码1：消费者
代码：这个代码其实是官网源码里面提供的例子，我来对这个代码做个解释，也作为自己的笔记。
#include decaf/lang/Thread.h
#include decaf/lang/Runnable.h
#include decaf/util/concurrent/CountDownLatch.h
#include activemq/core/ActiveMQConnectionFactory.h
#include activemq/core/ActiveMQConnection.h
#include activemq/transport/DefaultTransportListener.h
#include activemq/library/ActiveMQCPP.h
#include decaf/lang/Integer.h
#include activemq/util/Config.h
#include decaf/util/Date.h
#include cms/Connection.h
#include cms/Session.h
#include cms/TextMessage.h
#include cms/BytesMessage.h
#include cms/MapMessage.h
#include cms/ExceptionListener.h
#include cms/MessageListener.h
#include stdlib.h
#include stdio.h
#include iostreamusing namespace activemq;
using namespace activemq::core;
using namespace activemq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
class SimpleAsyncConsumer : public ExceptionListener,public MessageListener,public DefaultTransportListener {
private:Connection* connection;Session* session;Destination* destination;MessageConsumer* consumer;bool useTopic;std::string brokerURI;std::string destURI;bool clientAck;private:SimpleAsyncConsumer(const SimpleAsyncConsumer);SimpleAsyncConsumer operator(const SimpleAsyncConsumer);public:SimpleAsyncConsumer(const std::string brokerURI,const std::string destURI,bool useTopic false,bool clientAck false) :connection(NULL),session(NULL),destination(NULL),consumer(NULL),useTopic(useTopic),brokerURI(brokerURI),destURI(destURI),clientAck(clientAck) {}virtual ~SimpleAsyncConsumer() {this-cleanup();}void close() {this-cleanup();}void runConsumer() {try {// Create a ConnectionFactoryActiveMQConnectionFactory* connectionFactory new ActiveMQConnectionFactory(brokerURI);// Create a Connectionconnection connectionFactory-createConnection();delete connectionFactory;ActiveMQConnection* amqConnection dynamic_castActiveMQConnection*(connection);if (amqConnection ! NULL) {amqConnection-addTransportListener(this);}//如果不是使用的failover连接的话。如果服务器没有开启执行这一步的时候会抛出错误//如果是使用failover连接执行到这一步的时候就不会往下继续执行了就一直卡在这里直到和服务器建立连接//failover连接是一个什么模式我也不是很清楚我只知道在使用这个模式连接的时候它是会自动重连的。即使服务器中途断了服务器再 次开启的时候它是能自动连接上的connection-start();//监听异常。有异常抛出的时候会执行相关的函数connection-setExceptionListener(this);// Create a Sessionif (clientAck) {session connection-createSession(Session::CLIENT_ACKNOWLEDGE);} else {session connection-createSession(Session::AUTO_ACKNOWLEDGE);}// Create the destination (Topic or Queue)//使用queue模式还是topic模式//queue模式是一对一连接一个生成者一个消费者。//topic模式是多对多连接可以有多个生成者和多个消费者//如果你需要既发送有接收的消息的话这个情况大概率是应该使用topic模式的//在使用topic模式的时候你自己通过生成者发出去的消息你自己的消费者也是会收到这个消息的//这个时候你可以设置消息过滤或者使用消息属性识别。if (useTopic) {destination session-createTopic(destURI);} else {destination session-createQueue(destURI);}// Create a MessageConsumer from the Session to the Topic or Queueconsumer session-createConsumer(destination);consumer-setMessageListener(this);} catch (CMSException e) {e.printStackTrace();}}// Called from the consumer since this 
class is a registered MessageListener.virtual void onMessage(const Message* message) {static int count 0;try {count;const TextMessage* textMessage dynamic_castconst TextMessage*(message);string text ;if (textMessage ! NULL) {text textMessage-getText();} else {text NOT A TEXTMESSAGE!;}if (clientAck) {message-acknowledge();}printf(Message #%d Received: %s\n, count, text.c_str());} catch (CMSException e) {e.printStackTrace();}}// If something bad happens you see it here as this class is also been// registered as an ExceptionListener with the connection.virtual void onException(const CMSException ex AMQCPP_UNUSED) {printf(CMS Exception occurred. Shutting down client.\n);exit(1);}virtual void onException(const decaf::lang::Exception ex) {printf(Transport Exception occurred: %s \n, ex.getMessage().c_str());}virtual void transportInterrupted() {std::cout The Connections Transport has been Interrupted. std::endl;}virtual void transportResumed() {std::cout The Connections Transport has been Restored. std::endl;}private:void cleanup(){//*************************************************// Always close destination, consumers and producers before// you destroy their sessions and connection.//*************************************************// Destroy resources.try{if( destination ! NULL ) delete destination;}catch (CMSException e) {}destination NULL;try{if( consumer ! NULL ) delete consumer;}catch (CMSException e) {}consumer NULL;// Close open resources.try{if( session ! NULL ) session-close();if( connection ! NULL ) connection-close();}catch (CMSException e) {}// Now Destroy themtry{if( session ! NULL ) delete session;}catch (CMSException e) {}session NULL;try{if( connection ! NULL ) delete connection;}catch (CMSException e) {}connection NULL;}
};
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {activemq::library::ActiveMQCPP::initializeLibrary();std::cout \n;std::cout Starting the example: std::endl;std::cout -----------------------------------------------------\n;// Set the URI to point to the IPAddress of your broker.// add any optional params to the url to enable things like// tightMarshalling or tcp logging etc. See the CMS web site for// a full list of configuration options.//// http://activemq.apache.org/cms///// Wire Format Options:// // Use either stomp or openwire, the default ports are different for each//// Examples:// tcp://127.0.0.1:61616 default to openwire// tcp://127.0.0.1:61616?wireFormatopenwire same as above// tcp://127.0.0.1:61613?wireFormatstomp use stomp instead//std::string brokerURI failover:(tcp://127.0.0.1:61616
// ?wireFormatopenwire
// connection.useAsyncSendtrue
// transport.commandTracingEnabledtrue
// transport.tcpTracingEnabledtrue
// wireFormat.tightEncodingEnabledtrue);//// This is the Destination Name and URI options. Use this to// customize where the consumer listens, to have the consumer// use a topic or queue set the useTopics flag.//std::string destURI TEST.FOO; //?consumer.prefetchSize1;//// set to true to use topics instead of queues// Note in the code above that this causes createTopic or// createQueue to be used in the consumer.//bool useTopics false;//// set to true if you want the consumer to use client ack mode// instead of the default auto ack mode.//bool clientAck false;// Create the consumerSimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );// Start it up and it will listen forever.consumer.runConsumer();// Wait to exit.std::cout Press q to quit std::endl;while( std::cin.get() ! q) {}// All CMS resources should be closed before the library is shutdown.consumer.close();std::cout -----------------------------------------------------\n;std::cout Finished with the example. std::endl;std::cout \n;activemq::library::ActiveMQCPP::shutdownLibrary();
}代码2生成者
#include decaf/lang/Thread.h
#include decaf/lang/Runnable.h
#include decaf/util/concurrent/CountDownLatch.h
#include decaf/lang/Long.h
#include decaf/util/Date.h
#include activemq/core/ActiveMQConnectionFactory.h
#include activemq/util/Config.h
#include activemq/library/ActiveMQCPP.h
#include cms/Connection.h
#include cms/Session.h
#include cms/TextMessage.h
#include cms/BytesMessage.h
#include cms/MapMessage.h
#include cms/ExceptionListener.h
#include cms/MessageListener.h
#include stdlib.h
#include stdio.h
#include iostream
#include memoryusing namespace activemq;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
class SimpleProducer : public Runnable {
private:Connection* connection;Session* session;Destination* destination;MessageProducer* producer;bool useTopic;bool clientAck;unsigned int numMessages;std::string brokerURI;std::string destURI;private:SimpleProducer( const SimpleProducer );SimpleProducer operator ( const SimpleProducer );public:SimpleProducer( const std::string brokerURI, unsigned int numMessages,const std::string destURI, bool useTopic false, bool clientAck false ) :connection(NULL),session(NULL),destination(NULL),producer(NULL),useTopic(useTopic),clientAck(clientAck),numMessages(numMessages),brokerURI(brokerURI),destURI(destURI) {}virtual ~SimpleProducer(){cleanup();}void close() {this-cleanup();}virtual void run() {try {// Create a ConnectionFactoryauto_ptrActiveMQConnectionFactory connectionFactory(new ActiveMQConnectionFactory( brokerURI ) );// Create a Connectiontry{connection connectionFactory-createConnection();connection-start();} catch( CMSException e ) {e.printStackTrace();throw e;}// Create a Sessionif( clientAck ) {session connection-createSession( Session::CLIENT_ACKNOWLEDGE );} else {session connection-createSession( Session::AUTO_ACKNOWLEDGE );}// Create the destination (Topic or Queue)if( useTopic ) {destination session-createTopic( destURI );} else {destination session-createQueue( destURI );}// Create a MessageProducer from the Session to the Topic or Queueproducer session-createProducer( destination );producer-setDeliveryMode( DeliveryMode::NON_PERSISTENT );// Create the Thread Id Stringstring threadIdStr Long::toString( Thread::currentThread()-getId() );// Create a messagesstring text (string)Hello world! 
from thread threadIdStr;for( unsigned int ix0; ixnumMessages; ix ){TextMessage* message session-createTextMessage( text );//这里有一个setIntProperty应该也会有一个对应的getIntProperty我没有去看有没有。你可以在消费者获取消息的时候调用这个函数函数属性是否一致如果一致说明是自己发送的消息就可以不去处理它。//可以确认的是TextMessage是有一个setPropertyString和getPropertyString函数的这个函数可以用了区别是否是自己的消息。message-setIntProperty( Integer, ix );// Tell the producer to send the messageprintf( Sent message #%d from thread %s\n, ix1, threadIdStr.c_str() );producer-send( message );delete message;}}catch ( CMSException e ) {e.printStackTrace();}}private:void cleanup(){// Destroy resources.try{if( destination ! NULL ) delete destination;}catch ( CMSException e ) { e.printStackTrace(); }destination NULL;try{if( producer ! NULL ) delete producer;}catch ( CMSException e ) { e.printStackTrace(); }producer NULL;// Close open resources.try{if( session ! NULL ) session-close();if( connection ! NULL ) connection-close();}catch ( CMSException e ) { e.printStackTrace(); }try{if( session ! NULL ) delete session;}catch ( CMSException e ) { e.printStackTrace(); }session NULL;try{if( connection ! NULL ) delete connection;}catch ( CMSException e ) { e.printStackTrace(); }connection NULL;}
};
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {//这个函数只调用一次activemq::library::ActiveMQCPP::initializeLibrary();std::cout \n;std::cout Starting the example: std::endl;std::cout -----------------------------------------------------\n;// Set the URI to point to the IPAddress of your broker.// add any optional params to the url to enable things like// tightMarshalling or tcp logging etc. See the CMS web site for// a full list of configuration options.//// http://activemq.apache.org/cms///// Wire Format Options:// // Use either stomp or openwire, the default ports are different for each//// Examples:// tcp://127.0.0.1:61616 default to openwire// tcp://127.0.0.1:61616?wireFormatopenwire same as above// tcp://127.0.0.1:61613?wireFormatstomp use stomp instead//std::string brokerURI failover://(tcp://127.0.0.1:61616
// ?wireFormatopenwire
// connection.useAsyncSendtrue
// transport.commandTracingEnabledtrue
// transport.tcpTracingEnabledtrue
// wireFormat.tightEncodingEnabledtrue);//// Total number of messages for this producer to send.//unsigned int numMessages 2000;//// This is the Destination Name and URI options. Use this to// customize where the Producer produces, to have the producer// use a topic or queue set the useTopics flag.//std::string destURI TEST.FOO;//// set to true to use topics instead of queues// Note in the code above that this causes createTopic or// createQueue to be used in the producer.//bool useTopics false;// Create the producer and run it.SimpleProducer producer( brokerURI, numMessages, destURI, useTopics );// Publish the given number of Messagesproducer.run();// Before exiting we ensure that all CMS resources are closed.producer.close();std::cout -----------------------------------------------------\n;std::cout Finished with the example. std::endl;std::cout \n;activemq::library::ActiveMQCPP::shutdownLibrary();
}
文章转载自: http://www.morning.lwyqd.cn.gov.cn.lwyqd.cn http://www.morning.ykmkz.cn.gov.cn.ykmkz.cn http://www.morning.yfzld.cn.gov.cn.yfzld.cn http://www.morning.knlyl.cn.gov.cn.knlyl.cn http://www.morning.fkyqt.cn.gov.cn.fkyqt.cn http://www.morning.wcrcy.cn.gov.cn.wcrcy.cn http://www.morning.txmkx.cn.gov.cn.txmkx.cn http://www.morning.qhvah.cn.gov.cn.qhvah.cn http://www.morning.wjxtq.cn.gov.cn.wjxtq.cn http://www.morning.yfzld.cn.gov.cn.yfzld.cn http://www.morning.jcxgr.cn.gov.cn.jcxgr.cn http://www.morning.rqnzh.cn.gov.cn.rqnzh.cn http://www.morning.pylpd.cn.gov.cn.pylpd.cn http://www.morning.ttaes.cn.gov.cn.ttaes.cn http://www.morning.jjmrx.cn.gov.cn.jjmrx.cn http://www.morning.ppghc.cn.gov.cn.ppghc.cn http://www.morning.plchy.cn.gov.cn.plchy.cn http://www.morning.jfnlj.cn.gov.cn.jfnlj.cn http://www.morning.yrnrr.cn.gov.cn.yrnrr.cn http://www.morning.fmtfj.cn.gov.cn.fmtfj.cn http://www.morning.jydhl.cn.gov.cn.jydhl.cn http://www.morning.pnntx.cn.gov.cn.pnntx.cn http://www.morning.dxhnm.cn.gov.cn.dxhnm.cn http://www.morning.mhcys.cn.gov.cn.mhcys.cn http://www.morning.mkkcr.cn.gov.cn.mkkcr.cn http://www.morning.xdxpq.cn.gov.cn.xdxpq.cn http://www.morning.rwzc.cn.gov.cn.rwzc.cn http://www.morning.dhbyj.cn.gov.cn.dhbyj.cn http://www.morning.zqcdl.cn.gov.cn.zqcdl.cn http://www.morning.rccbt.cn.gov.cn.rccbt.cn http://www.morning.tkrdg.cn.gov.cn.tkrdg.cn http://www.morning.mxhcf.cn.gov.cn.mxhcf.cn http://www.morning.datadragon-auh.cn.gov.cn.datadragon-auh.cn http://www.morning.xgbq.cn.gov.cn.xgbq.cn http://www.morning.lcbt.cn.gov.cn.lcbt.cn http://www.morning.hdwjb.cn.gov.cn.hdwjb.cn http://www.morning.nkqrq.cn.gov.cn.nkqrq.cn http://www.morning.flxqm.cn.gov.cn.flxqm.cn http://www.morning.jfjqs.cn.gov.cn.jfjqs.cn http://www.morning.qmsbr.cn.gov.cn.qmsbr.cn http://www.morning.kwqqs.cn.gov.cn.kwqqs.cn http://www.morning.gjlxn.cn.gov.cn.gjlxn.cn http://www.morning.ctfh.cn.gov.cn.ctfh.cn http://www.morning.gthwz.cn.gov.cn.gthwz.cn http://www.morning.ktskc.cn.gov.cn.ktskc.cn 
http://www.morning.ktcrr.cn.gov.cn.ktcrr.cn http://www.morning.djgrg.cn.gov.cn.djgrg.cn http://www.morning.zdqsc.cn.gov.cn.zdqsc.cn http://www.morning.nrfqd.cn.gov.cn.nrfqd.cn http://www.morning.hmktd.cn.gov.cn.hmktd.cn http://www.morning.xrksf.cn.gov.cn.xrksf.cn http://www.morning.zdwjg.cn.gov.cn.zdwjg.cn http://www.morning.sqqdy.cn.gov.cn.sqqdy.cn http://www.morning.mhlsx.cn.gov.cn.mhlsx.cn http://www.morning.jlnlr.cn.gov.cn.jlnlr.cn http://www.morning.lflnb.cn.gov.cn.lflnb.cn http://www.morning.pwgzh.cn.gov.cn.pwgzh.cn http://www.morning.kstgt.cn.gov.cn.kstgt.cn http://www.morning.qbrs.cn.gov.cn.qbrs.cn http://www.morning.rwbx.cn.gov.cn.rwbx.cn http://www.morning.kldtf.cn.gov.cn.kldtf.cn http://www.morning.tyjnr.cn.gov.cn.tyjnr.cn http://www.morning.jrgxx.cn.gov.cn.jrgxx.cn http://www.morning.dfhkh.cn.gov.cn.dfhkh.cn http://www.morning.skbhl.cn.gov.cn.skbhl.cn http://www.morning.zfrs.cn.gov.cn.zfrs.cn http://www.morning.jqjnl.cn.gov.cn.jqjnl.cn http://www.morning.qstjr.cn.gov.cn.qstjr.cn http://www.morning.ljwyc.cn.gov.cn.ljwyc.cn http://www.morning.ljsxg.cn.gov.cn.ljsxg.cn http://www.morning.gfpyy.cn.gov.cn.gfpyy.cn http://www.morning.dbrnl.cn.gov.cn.dbrnl.cn http://www.morning.gfmpk.cn.gov.cn.gfmpk.cn http://www.morning.qmbtn.cn.gov.cn.qmbtn.cn http://www.morning.rzscb.cn.gov.cn.rzscb.cn http://www.morning.cnkrd.cn.gov.cn.cnkrd.cn http://www.morning.phtqr.cn.gov.cn.phtqr.cn http://www.morning.pbtdr.cn.gov.cn.pbtdr.cn http://www.morning.rxdsq.cn.gov.cn.rxdsq.cn http://www.morning.bfgbz.cn.gov.cn.bfgbz.cn