一、下载与安装
直接去官网(
http://activemq.apache.org/)下载最新版本即可,由于这是免安装的,只需要解压就行了。安装完之后进入bin目录,双击 activemq.bat文件(linux下在bin目录下执行 activemq start)
二、访问控制台
在浏览器输入:http://ip:8161/admin/,出现如下界面表示启动成功,默认的用户名密码都是admin
三、修改端口号
61616为对外服务端口号
8161为控制器端口号
当端口号冲突时,可以修改这两个端口号。cd conf ,修改activemq.xml 修改里面的61616端口。修改jetty.xml,修改里面的8161端口。
queue队列模式:
和rabbitmq简单队列模式一样,若是有多个消费者消费同一个队列中的消息的话,默认也是轮询机制的消费
示例代码:
public class productor {
public static final string borker_url = "tcp://127.0.0.1:61616";
public static final string queue_name = "queue1";
public static void main(string[] args) throws jmsexception {
//创建工厂
activemqconnectionfactory factory = new activemqconnectionfactory(borker_url);
//创建tcp连接
connection connection = factory.createconnection();
//建立连接
connection.start();
/**
* 创建会话,1.是否开启事务,2.签收模式
*/
session session = connection.createsession(false, session.auto_acknowledge);
//创建队列(消息的目的地)
queue queue = session.createqueue(queue_name);
//创建生产者
messageproducer producer = session.createproducer(queue);
//消息非持久化
producer.setdeliverymode(deliverymode.non_persistent);
//消息持久化 默认是持久化的
// producer.setdeliverymode(deliverymode.persistent);
//创建消息
textmessage message = session.createtextmessage("你好吗");
//发送消息
producer.send(message);
producer.close();
session.close();
connection.close();
system.out.println("发送成功!");
}
}
public class consumer {
public static final string borker_url = "tcp://127.0.0.1:61616";
public static final string queue_name = "queue1";
public static void main(string[] args) throws jmsexception {
//创建工厂
activemqconnectionfactory factory = new activemqconnectionfactory(borker_url);
//创建tcp连接
connection connection = factory.createconnection();
//建立连接
connection.start();
/**
* 创建会话,1.是否开启事务,2.签收模式
*/
session session = connection.createsession(false, session.auto_acknowledge);
//创建/声明队列(消息的目的地)
queue queue = session.createqueue(queue_name);
//创建消费者
messageconsumer consumer = session.createconsumer(queue);
/*while (true) {
//receive会阻塞线程
textmessage message = (textmessage)consumer.receive();
system.out.println("接收到消息:" + message.gettext());
}*/
//监听的方式消费
consumer.setmessagelistener(message -> {
textmessage textmessage = (textmessage)message;
try {
system.out.println("1号接收到消息:" + textmessage.gettext());
} catch (jmsexception e) {
e.printstacktrace();
}
});
}
}
topic队列模式:
称为发布订阅模式,生产者把消息发送给订阅给某个topic主题的消费者,是分发的模式,这种模式默认需要先启动消费者,不然就算生产者发布了某个topic主题的消息,消费者也消费不了;除非消费者提前订阅,并且做了消息持久化的处理,这样后启动消费者才能消费提前推送的消息。
代码:
public class productor {
public static final string borker_url = "tcp://127.0.0.1:61616";
public static final string topic_name = "topic1";
public static void main(string[] args) throws jmsexception {
//创建工厂
activemqconnectionfactory factory = new activemqconnectionfactory(borker_url);
//异步投递
factory.setuseasyncsend(true);
//创建tcp连接
connection connection = factory.createconnection();
/**
* 创建会话,1.是否开启事务,2.签收模式
*/
session session = connection.createsession(false, session.auto_acknowledge);
//创建/声明topic(消息的目的地)
topic topic = session.createtopic(topic_name);
//创建生产者
activemqmessageproducer producer = (activemqmessageproducer)session.createproducer(topic);
//持久化
producer.setdeliverymode(deliverymode.persistent);
//建立连接
connection.start();
//创建消息
textmessage message = session.createtextmessage("你好吗");
//发送消息,异步发送回调函数
producer.send(message, new asynccallback() {
@override
public void onsuccess() {
system.out.println("success");
}
@override
public void onexception(jmsexception e) {
system.out.println("fail");
}
});
producer.close();
session.close();
connection.close();
system.out.println("发送成功!");
}
}
public class consumer1 {
public static final string borker_url = "tcp://127.0.0.1:61616";
public static final string topic_name = "topic1";
public static void main(string[] args) throws jmsexception {
//创建工厂
activemqconnectionfactory factory = new activemqconnectionfactory(borker_url);
//创建tcp连接
connection connection = factory.createconnection();
//制定clientid
connection.setclientid("my");
/**
* 创建会话,1.是否开启事务,2.签收模式
*/
session session = connection.createsession(false, session.auto_acknowledge);
//创建/声明topic(消息的目的地)
topic topic = session.createtopic(topic_name);
//订阅主题
topicsubscriber subscriber = session.createdurablesubscriber(topic, "remark");
//建立连接
connection.start();
while (true) {
//receive会阻塞线程
//接收订阅的消息
textmessage message = (textmessage) subscriber.receive();
system.out.println("接收到消息:" + message.gettext());
}
/*//创建消费者
messageconsumer consumer = session.createconsumer(topic);
//建立连接
connection.start();
*//*while (true) {
//receive会阻塞线程
textmessage message = (textmessage)consumer.receive();
system.out.println("接收到消息:" + message.gettext());
}*//*
//监听的方式消费
consumer.setmessagelistener(message -> {
textmessage textmessage = (textmessage)message;
try {
system.out.println("1号接收到消息:" + textmessage.gettext());
} catch (jmsexception e) {
e.printstacktrace();
}
});*/
}
}
如何保证消息的可靠性
回答这个问题主要从持久化,事务,签收这几个方面入手
消息持久化的核心代码:
//queue模式的消息持久化 默认是持久化的
producer.setdeliverymode(deliverymode.persistent);
/**
* topic模式的持久化
*/
topic topic = session.createtopic(topic_name);
activemqmessageproducer producer = (activemqmessageproducer)session.createproducer(topic);
producer.setdeliverymode(deliverymode.persistent);
connection.start();
事务的核心代码(偏生产者):
//参数设置成true
connection.createsession(false, session.auto_acknowledge);
//事务提交
session.commit();
签收的核心代码(偏消费者):
//参数设置成手动提交
connection.createsession(false, session.client_acknowledge);
//消息签收
message.acknowledge();
注意:若是既开启事务,又开启手动签收,以事务为准,只要事务被提交了也默认消息被签收了
性能提升:
1.利用nio的协议比tcp的性能高,
- 配置方式:在conf目录下activemq.xml照着下面配置
<broker>
...
<transportconnectors>
<transportconnector name="nio" uri="nio://0.0.0.0:61616"/>
</<transportconnectors>
...
</broker>
- 第二步是代码访问方式由tcp改为nio
//创建工厂
activemqconnectionfactory factory = new activemqconnectionfactory("nio://127.0.0.1:61616");
2.jdbc+journaling提高只有jdbc持久化的性能,它在做持久化入数据库之前,会先将数据保存到journaling文件中,之后才慢慢同步到数据库中,等于中间加了一层缓冲层。
- 把数据库mysql的驱动包放到lib目录下
- 配置方式:在conf目录下activemq.xml照着下面配置,其中有个createtablesonstartup属性,默认值是true,表示每次启动后去数据库自动建表
<persistenceadapter>
<kahadb directory="${activemq.data}/kahadb"/>
</persistenceadapter>
//上面是默认配置找到改成下面的配置
<persistenceadapter>
<journalpersistenceadapterfactory journallogfiles="5" datadirectory="${basedir}/activemq-data" datasource="#mysql-ds"/>
</persistenceadapter>
//下面的代码写在<beans>节点中
<bean id="mysql-ds" class="org.apache.commons.dbcp.basicdatasource" destroy-method="close">
<property name="driverclassname" value="com.mysql.jdbc.driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxautocommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="poolpreparedstatements" value="true"/>
</bean>
声明:如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。