当前位置:首页 > 行业动态 > 正文

activemq服务器如何使用

ActiveMQ服务器使用步骤:下载安装、启动服务,可通过管理控制台或JMX进行配置管理,客户端连接需指定连接URL等参数。

ActiveMQ 是一个强大的开源消息中间件,它支持多种语言编写客户端,对 Spring 的支持也非常友好,以下是 ActiveMQ 服务器的详细使用步骤:

1、环境准备

下载与安装

Windows系统:从 [ActiveMQ官网](http://activemq.apache.org/components/classic/download/) 下载 Windows 版本的压缩包,解压到指定目录,如C:activemq

Linux系统:使用wget 命令下载,例如wget https://mirrors.tuna.tsinghua.edu.cn/apache/activemq/5.15.3/apache-activemq-5.15.3-bin.tar.gz,然后使用tar -zxf 命令解压到指定目录,如/opt

配置环境变量(以 Linux 为例)

打开/etc/profile 文件,添加以下内容:

        export ACTIVEMQ_HOME=/opt/apache-activemq-5.15.3
        export PATH=$PATH:$ACTIVEMQ_HOME/bin

保存文件后,使环境变量生效:source /etc/profile

2、启动与停止

启动

进入bin 目录,执行./activemq start 命令启动 ActiveMQ 服务器,也可以使用./activemq console 命令作为前台进程启动,方便查看日志输出,按Ctrl+C 可停止。

停止

执行./activemq stop 命令停止服务器。

检查状态

可以通过ps -ef | grep activemq 命令查看 ActiveMQ 进程是否在运行,或者通过浏览器访问管理界面(默认端口 8161)来检查服务器状态。

3、浏览器访问管理界面

查看服务端口:在conf/jetty.xml 文件中可以查看管理控制台的服务端口,默认为 8161。

查看用户密码:在conf/users.properties 文件中,默认的用户名和密码均为admin

访问管理控制台:在浏览器中输入http://localhost:8161/admin(如果是本地部署)或http://<服务器 IP>:8161/admin(如果是远程访问),然后使用默认的用户名和密码登录,登录后可以进行队列、主题等的管理操作。

4、防火墙配置(如果需要远程访问)

找到管理控制台的服务端口(默认 8161)和消息传输的端口(默认 61616)。

编辑防火墙配置文件(如/etc/sysconfig/iptables),添加以下规则:

     -A INPUT -m state --state NEW -m tcp -p tcp --dport 8161 -j ACCEPT
     -A INPUT -m state --state NEW -m tcp -p tcp --dport 61616 -j ACCEPT

重启防火墙使配置生效:service iptables restart

5、创建 Java 项目并使用 ActiveMQ

创建项目并导入依赖

创建一个 Maven 项目,在pom.xml 文件中添加 ActiveMQ 的依赖:

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.3</version>
        </dependency>

创建消息生产者

创建一个类,如MyProducer,实现发送消息的方法:

        import javax.jms.Connection;
        import javax.jms.ConnectionFactory;
        import javax.jms.Destination;
        import javax.jms.JMSException;
        import javax.jms.Message;
        import javax.jms.MessageProducer;
        import javax.jms.Session;
        import org.apache.activemq.ActiveMQConnectionFactory;
        public class MyProducer {
            private ConnectionFactory connectionFactory = null;
            private Connection connection = null;
            private Session session = null;
            private Destination destination = null;
            private MessageProducer producer = null;
            private Message message = null;
            public void sendToMQ(String msg) throws JMSException {
                connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
                connection = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                destination = session.createQueue("test-queue");
                producer = session.createProducer(destination);
                message = session.createTextMessage(msg);
                producer.send(message);
                producer.close();
                session.close();
                connection.close();
            }
        }

创建消息消费者

创建一个类,如MyConsumer,实现接收消息的方法:

        import javax.jms.Connection;
        import javax.jms.ConnectionFactory;
        import javax.jms.Destination;
        import javax.jms.JMSException;
        import javax.jms.Message;
        import javax.jms.MessageConsumer;
        import javax.jms.Session;
        import javax.jms.TextMessage;
        import org.apache.activemq.ActiveMQConnectionFactory;
        public class MyConsumer {
            private ConnectionFactory connectionFactory = null;
            private Connection connection = null;
            private Session session = null;
            private Destination destination = null;
            private MessageConsumer consumer = null;
            public void receiveFromMQ() throws JMSException {
                connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
                connection = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                destination = session.createQueue("test-queue");
                consumer = session.createConsumer(destination);
                consumer.setMessageListener(new javax.jms.MessageListener() {
                    @Override
                    public void onMessage(Message message) {
                        if (message instanceof TextMessage) {
                            try {
                                TextMessage textMessage = (TextMessage) message;
                                System.out.println("Received: " + textMessage.getText());
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                });
                // 保持程序运行,等待接收消息
                System.in.read();
                consumer.close();
                session.close();
                connection.close();
            }
        }

运行程序

先运行消息生产者,发送消息到队列,然后运行消息消费者,接收并处理消息,可以在消费者中看到生产者发送的消息被正确接收和打印出来。

相关问答FAQs

1、问:ActiveMQ 支持哪些消息协议?

:ActiveMQ 支持多种消息协议,包括但不限于 OpenWire、Stomp、REST、WS Notification、XMPP 和 AMQP 等,这使得它可以与各种不同类型的客户端应用程序进行通信,满足不同场景下的消息传递需求,OpenWire 是其默认的高效二进制协议,适用于高性能的消息传输;Stomp 协议则常用于 Web 应用程序中实现实时通信功能。

2、问:如何在 ActiveMQ 中实现消息的持久化?

:在 ActiveMQ 中实现消息持久化有多种方式,一种常见的方法是将消息存储到磁盘上,这样即使服务器崩溃,消息也不会丢失,可以通过配置 ActiveMQ 的持久化存储机制来实现,例如使用 KahaDB 或 LevelDB 作为持久化存储引擎,还可以通过设置消息的持久化属性,确保消息在发送时被标记为持久化,这样即使出现网络故障等情况,消息也会在合适的时机重新发送,保证消息的可靠传输,对于消费者的消费进度等信息也可以进行持久化,以便在消费者恢复时能够继续从上次消费的位置开始处理消息。

0