`

MQ消息接收接口

阅读更多
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd   
       http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd   
       http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd   
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-2.0.xsd"
	default-lazy-init="false">
	
	<!-- MQ消息监听接口服务 -->
	<bean id="jmsReceiverListenerServer" class="com.iteye.jms.receive.JmsReceiverListenerServer"></bean>
	
	<!-- MQ队列连接工厂配置 -->
	<bean id="connectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory"> 
		<property name="queueManager" value="队列管理器名称" />
		<property name="hostName" value="IP" /> 
		<property name="port" value="端口" /> 
		<property name="channel" value="通道" />
		<property name="transportType" value="传输方式" />
	</bean> 
	
	<!-- MQ队列连接工厂配置适配器 -->
	<bean id="connectionFactoryAdapter"
		class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
		<property name="targetConnectionFactory" ref="connectionFactory" />
	</bean>
	
	<!-- 订单接收队列 -->
	<bean id="orderReceiveQueue" class="com.ibm.mq.jms.MQQueue"> 
		<property name="baseQueueName" value="OrderReceive_Queue" /> 
	</bean> 
	
	<!-- 商品接收队列 -->
	<bean id="productReceiveQueue" class="com.ibm.mq.jms.MQQueue"> 
		<property name="baseQueueName" value="ProductReceive_Queue" /> 
	</bean> 
	
	<!-- 订单消息监听器 -->	
	<bean  class="com.iteye.jms.receive.OrderQueueMessageListener"></bean>
	
	<!-- 商品消息监听器 -->	
	<bean  class="com.iteye.jms.receive.ProductQueueMessageListener"></bean>
</beans>




package com.iteye.jms.receive;

import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

@Target(TYPE)
@Retention(RUNTIME)
public @interface JmsReceiveMsgAnno {

    /**
     * 监听的链接工厂
     */
    String connectionFactory() default "";

    /**
     * 监听队列
     */
    String receiveQueue() default "";

    /**
     * 监听端消费者数量
     */
    int consumers() default 1;

    /**
     * 消息选择器
     */
    String messageSelector() default "";

    /**
     * 是否开启事务
     */
    boolean sessionTransacted() default false;
}






package com.iteye.jms.receive;

import java.util.HashSet;
import java.util.Set;

import javax.jms.ConnectionFactory;
import javax.jms.MessageListener;
import javax.jms.Queue;

import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

/**
 * 
 * 〈一句话功能简述〉<br>
 * MQ消息监听接口服务
 * 
 * @see [相关类/方法](可选)
 * @since [产品/模块版本] (可选)
 */
@SuppressWarnings("rawtypes")
public class JmsReceiverListenerServer implements BeanPostProcessor, ApplicationContextAware, ApplicationListener,
        DisposableBean {

    private ApplicationContext applicationContext;

    private boolean started = false;

    /**
     * 服务端监听器容器列表(Set集合不允许重复)
     */
    private Set<DefaultMessageListenerContainer> messageListenerContainers = new HashSet<DefaultMessageListenerContainer>();

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        // 扫描所有使用了@JmsReceiveMsgAnno的bean,将其注册为服务端监听器容器列表
        registerMQServerBean(bean);
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    /**
     * 在所有的Spring加载工作完成之后会执行onApplicationEvent方法,启动消息监听器进行消息监听
     */
    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (isCurrentApplicationContextRefresh(event)) {
            if (started) {
                return;
            }

            started = true;
            // 遍历所有消息监听器列表,启动消息监听
            for (DefaultMessageListenerContainer container : messageListenerContainers) {
                container.afterPropertiesSet();
                container.start();
            }
        }
    }

    /**
     * 
     * 是否是当前上下文,防止重复加载和过早加载 <br>
     * 〈功能详细描述〉
     * 
     * @param event
     * @return
     * @see [相关类/方法](可选)
     * @since [产品/模块版本](可选)
     */
    private boolean isCurrentApplicationContextRefresh(ApplicationEvent event) {
        return event instanceof ContextRefreshedEvent
                && ((ContextRefreshedEvent) event).getApplicationContext() == applicationContext;
    }

    /**
     * 容器销毁时停止消息监听器监听
     */
    @Override
    public void destroy() throws Exception {
        if (started) {
            for (DefaultMessageListenerContainer container : messageListenerContainers) {
                container.shutdown();
            }
        }
    }

    /**
     * 
     * 扫描所有使用了@JmsReceiveMsgAnno的bean,将其注册为服务端监听器容器列表 <br>
     * 〈功能详细描述〉
     * 
     * @param bean
     * @see [相关类/方法](可选)
     * @since [产品/模块版本](可选)
     */
    private void registerMQServerBean(Object bean) {
        Class<?> clazz = bean.getClass();
        // 该类是否使用了JmsReceiveMsgAnno注解
        if (clazz.isAnnotationPresent(JmsReceiveMsgAnno.class) == false) {
            return;
        } else {
            // 该类必须实现MessageListener接口
            if (!MessageListener.class.isAssignableFrom(clazz)) {
                throw new RuntimeException("消息监听器必须实现MessageListener接口");
            }
        }

        // 消息监听器
        MessageListener listener = (MessageListener) bean;
        // 注解信息类对象
        JmsReceiveMsgAnno anno = clazz.getAnnotation(JmsReceiveMsgAnno.class);

        // 默认的消息监听容器
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        // 队列连接工厂
        container.setConnectionFactory(applicationContext.getBean(anno.connectionFactory(), ConnectionFactory.class));
        // 监听队列
        container.setDestination(applicationContext.getBean(anno.receiveQueue(), Queue.class));
        // 服务端消费者数量
        container.setConcurrentConsumers(anno.consumers());
        // 消息监听器
        container.setMessageListener(listener);
        // 是否开启事务
        container.setSessionTransacted(anno.sessionTransacted());
        if (StringUtils.isNotEmpty(anno.messageSelector())) {
            // 消息选择器
            container.setMessageSelector(anno.messageSelector());
        }

        messageListenerContainers.add(container);
    }

}







package com.iteye.jms.receive;

import javax.jms.BytesMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@JmsReceiveMsgAnno(receiveQueue = "orderReceiveQueue", connectionFactory = "connectionFactoryAdapter")
public class OrderQueueMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        // 报文消息
        String msgstr = null;

        try {
            if (message instanceof TextMessage) {
                TextMessage tm = (TextMessage) message;
                msgstr = tm.getText();
            } else if (message instanceof BytesMessage) {
                BytesMessage bm = (BytesMessage) message;
                byte[] bys = null;
                bys = new byte[(int) bm.getBodyLength()];
                bm.readBytes(bys);
                msgstr = new String(bys);
            } else {
                // 日志
            }
        } catch (Exception e) {
            // 日志
        }

        System.out.println(msgstr);
    }

}







package com.iteye.jms.receive;

import javax.jms.BytesMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@JmsReceiveMsgAnno(receiveQueue = "productReceiveQueue", connectionFactory = "connectionFactoryAdapter")
public class ProductQueueMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        // 报文消息
        String msgstr = null;

        try {
            if (message instanceof TextMessage) {
                TextMessage tm = (TextMessage) message;
                msgstr = tm.getText();
            } else if (message instanceof BytesMessage) {
                BytesMessage bm = (BytesMessage) message;
                byte[] bys = null;
                bys = new byte[(int) bm.getBodyLength()];
                bm.readBytes(bys);
                msgstr = new String(bys);
            } else {
                // 日志
            }
        } catch (Exception e) {
            // 日志
        }

        System.out.println(msgstr);
    }

}





当向OrderReceive_Queue队列发送消息时会触发订单导入消息监听接口,执行onMessage方法。

当向ProductReceive_Queue队列发送消息时会触发商品导入消息监听接口,执行onMessage方法。
分享到:
评论

相关推荐

    用C#连接IBM MQ

    1、传输队列。...2、消息通道(注意填写链接名称是对方IP,传输队列都填上) 3、远程队列(注意填写远程队列和远程队列管理器名称) 然后往远程队列里面填写数据,就会发送到远程的队列了! (转贴)

    Websphere MQ入门教程

    2.2.1 WebSphere MQ和消息排队 31 2.2.2 队列管理器的进程 32 2.3客户机和服务器 33 客户机-服务器环境中的 WebSphere MQ 应用程序 33 2.4触发机制 33 2.4.1触发的概念 33 2.4.2触发类型 34 2.4.3触发的工作原理 35...

    SpringBoot+ActiveMq+MQTT实现消息的发送和接收

    SpringBoot+ActiveMq+MQTT实现消息的发送和接收 后台消费者、生产者、消息发送接口、发送消息业务类等相关配置

    Websphere MQ入门教程.doc

    2.2.1 WebSphere MQ和消息排队 31 2.2.2 队列管理器的进程 32 2.3客户机和服务器 33 客户机-服务器环境中的 WebSphere MQ 应用程序 33 2.4触发机制 33 2.4.1触发的概念 33 2.4.2触发类型 34 2.4.3触发的工作原理 35...

    Python中线程的MQ消息队列实现以及消息队列的优点解析

    如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。相信对任何架构或应用来说,消息队列都是一个至关重要的组件,下面是十个理由: Python的消息队列示例: 1.threading+Queue实现线程队列 ...

    海康iSC平台测试第三方对接接收事件工具

    实时事件对接是通过回调第三方的 http 接口推送事件的,在与第三方对接中 为了方便验证 esc 是否正常回调第三方接口推送事件,我这边开发了一个建议程序,用来监 听一个 http 接口,用来测试事件推送功能。

    vue 调用 RESTful风格接口操作

    主要介绍了vue 调用 RESTful风格接口操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

    CC2530 MQ-2 ADC读模拟量

    数据传输:如果需要将读取到的数据传输到PC机或其他设备,可以利用UART(通用异步接收/发送器)等通信接口进行数据传输。 实验参考:可以参考相关的实验教程和视频,这些资源通常会提供具体的代码示例和实验步骤,...

    mq-mqi-nodejs:从Node.js调用IBM MQ-一个JavaScript MQI包装器

    这将使Node.js开发人员可以更轻松地通过MQ发送和接收消息,并与组织中其他启用MQ的应用程序进行交互。 该程序包基于完整的MQI。 它使用与C或COBOL界面基本相同的动词和结构,但在此环境下使用更合适的样式。 它使...

    51单片机(STC15W4K56S4)智能小车驱动板原理图和PCB图.rar

    DHT11温湿度传感器接口、串口、MQ2传感器接口、PM2.5(gp2y1014au)监测模块接口,循迹避障电路接口、超声波模块接口、舵机接口、L293D电机驱动电路、电源电路、按键、LED、对外供电接口、红外遥控信号接收器、1602...

    STM32F103ZET6步进电机智能小车驱动板原理图和PCB图.rar

    DHT11温湿度传感器接口、串口、MQ2传感器接口、PM2.5(gp2y1014au)监测模块接口,循迹避障电路接口、超声波模块接口、舵机接口、ULN2003步进电机驱动电路、电源电路、按键、LED、MPU6050姿态传感器接口、对外供电接口...

    STM32智能小车驱动板原理图和PCB图.rar

    MQ2接口。 3、包括智能小车驱动板原理图和PCB图。 4、如果还需要配套的STM32智能小车核心板原理图和PCB图,可以搜索下“STM32智能小车核心板原理图和PCB图”。 6、依据“STM32智能小车驱动板原理图和PCB图”做成过...

    51单片机步进电机智能小车驱动板原理图和PCB图.rar

    DHT11温湿度传感器接口、串口、MQ2传感器接口、PM2.5(gp2y1014au)监测模块接口,循迹避障电路接口、超声波模块接口、舵机接口、ULN2003步进电机驱动电路、电源电路、按键、LED、MPU6050姿态传感器接口、对外供电接口...

    mmi-python:模型消息接口。 在进程之间发送带有元数据的数组

    在这里,我们描述了一个序列化协议,该协议可用作替代消息传递协议(例如ØMQ和WebSockets)之上的一层。 我们的主要重点是发送和接收简单定长类型的n维数组,例如整数和浮点值,以及元数据和其他属性。 我们基于...

    赞:高并发点赞的详情解决方案

    同时,MQ消息中间件接收到消息后,会按照“自己的方式”及时消费,还可以用MQ消息中间件来限制流量并进行同步处理等。 整个流程,如下图所示 详细设计 1.Redis数据结构设计 使用set来存储被点赞的类型id,键为被点赞...

    WinMM网络通讯中间件及其编程接口

    如果链接没有应用处理,则WinMM在一定时间后自动清除接收到的信息,如果有应用程序处理接收的数据,则WinMM通知应用,有信息通过网络接收到。应用可以采用多种方式获得和处理这些信息。 WinMM管理收发队列,自动...

    queue-interop:促进消息队列对象的互操作性

    队列互操作性关于queue-interop尝试识别和标准化PHP程序创建,发送,接收和读取MQ消息以实现互操作性的通用方法。 通过讨论和试验,我们尝试创建一个由通用接口和建议组成的标准。 如果提供队列实现PHP项目开始采用...

    基于 Canal 的 MySql RabbitMQ Redis/memcached/mongodb

    4.对MQ数据的消费(接收+数据解析,考虑消费速度,MQ队列的阻塞) 5.数据写入/修改到nosql (redis的主从/hash分片) 6.保证对应关系的简单性:一个mysql表对应一个 redis实例(redis单线程,多实例保证分流不阻塞...

Global site tag (gtag.js) - Google Analytics