`

使用Jmstemplate向队列中发送数据

阅读更多
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cxf="http://cxf.apache.org/core"
	xmlns:p="http://cxf.apache.org/policy" xmlns:ss="http://www.springframework.org/schema/security"
	xmlns:jaxws="http://cxf.apache.org/jaxws" xmlns:jee="http://www.springframework.org/schema/jee"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
   	http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd
   	http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd
   	http://cxf.apache.org/policy http://cxf.apache.org/schemas/policy.xsd
   	http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd
   	http://cxf.apache.org/bindings/soap http://cxf.apache.org/schemas/configuration/soap.xsd
   	http://www.springframework.org/schema/context  http://www.springframework.org/schema/context/spring-context.xsd
   	http://www.springframework.org/schema/security http://www.springframework.org/schema/security/spring-security.xsd
   	http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
   	http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd"
	default-lazy-init="false">
	<context:annotation-config />

	
	<!-- MQ队列连接工厂配置 	-->
	<bean id="connectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory"> 
		<property name="queueManager" value="TB_QM" />
		<property name="hostName" value="192.168.10.30" /> 
		<property name="port" value="1430" /> 
		<property name="channel" value="CHANNEL_TB" />
		<property name="transportType" value="1" />
	</bean> 
	
	<!-- 队列连接工厂适配器 -->
	<bean id="connectionFactoryAdapter"
		class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
		<property name="targetConnectionFactory" ref="connectionFactory" />
	</bean>
	
	<!-- JmsTemplate模板 -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 
		<property name="connectionFactory" ref="connectionFactoryAdapter" />
	</bean>
	
	<!-- 请求队列(发送消息的队列) -->
	<bean id="requestQueue" class="com.ibm.mq.jms.MQQueue"> 
		<property name="baseQueueName" value="MBF_INPUTQ_MQINOUT" />
	</bean>
	 
	 <!-- JmsTemplateInvoke -->
	 <bean id="jmsTemplateInvoke" class="com.iteye.jms.send.JmsTemplateInvoke"> 
		<property name="jmsTemplate" ref="jmsTemplate" />
		<property name="requestQueue" ref="requestQueue" />
	</bean>
</beans>


import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;

import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

@SuppressWarnings({ "rawtypes", "unchecked" })
public class JmsTemplateInvoke {
    private static final Logger logger = LoggerFactory.getLogger(JmsTemplateInvoke.class);

    /**
     * JmsTemplate模板
     */
    private JmsTemplate jmsTemplate;

    /**
     * 请求队列
     */
    private Queue requestQueue;

    /**
     * 响应队列
     */
    private Queue responseQueue;

    /**
     * 响应消息接收超时时间
     */
    private long receiveTimeout;

    /**
     * 
     * MQ异步接口调用 <br>
     * 〈功能详细描述〉
     * 
     * @param requestMessage
     * @see [相关类/方法](可选)
     * @since [产品/模块版本](可选)
     */
    public void asynRequestProcessor(String requestMessage) {
        Map attributes = new HashMap();
        attributes.put(MQConstants.JMS_CORRELATION_ID, RequestIDGenerator.generateMessageRequestID());

        sendToProcessQueue(requestMessage, attributes);
    }

    /**
     * 
     * MQ同步接口调用 <br>
     * 〈功能详细描述〉
     * 
     * @param requestMessage
     * @return
     * @see [相关类/方法](可选)
     * @since [产品/模块版本](可选)
     */
    public String syncRequestProcessor(String requestMessage) {
        Map attributes = new HashMap();
        String jmsCorrelationID = RequestIDGenerator.generateMessageRequestID();
        attributes.put(MQConstants.JMS_CORRELATION_ID, jmsCorrelationID);

        // 向队列中发送消息
        sendToProcessQueue(requestMessage, attributes);

        // 根据jmsCorrelationID来接收同步返回的消息
        String messageText = reciveFromReceiptQueue(jmsCorrelationID);

        return messageText;
    }

    /**
     * 
     * 向队列中发送消息 <br>
     * 〈功能详细描述〉
     * 
     * @param msgText
     * @param attributes
     * @see [相关类/方法](可选)
     * @since [产品/模块版本](可选)
     */
    public void sendToProcessQueue(final String msgText, Map attributes) {

        final String correlationID = (String) attributes.get(MQConstants.JMS_CORRELATION_ID);
        final Destination replyTo = (Destination) attributes.get(MQConstants.JMS_REPLY_TO);
        final String type = (String) attributes.get(MQConstants.JMS_TYPE);
        jmsTemplate.send(requestQueue, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                BytesMessage bm = session.createBytesMessage();

                bm.setIntProperty(MQConstants.JMS_IBM_CHARACTER_SET, 1208);

                try {
                    bm.writeBytes(msgText.getBytes("utf-8"));
                } catch (UnsupportedEncodingException e) {
                    logger.error(e.getMessage(), e);
                    throw new JMSException(e.getMessage());
                }
                bm.setJMSCorrelationID(correlationID);
                bm.setJMSReplyTo(replyTo);
                bm.setJMSType(type);
                return bm;
            }
        });
        logger.debug(
                "[MessageProcessor:sendToProcessQueue()]: [jmsCorrelationID={}]: [message={}] send to queue is successful!",
                correlationID, msgText);

    }

    /**
     * 
     * 根据jmsCorrelationID来接收同步返回的消息 <br>
     * 〈功能详细描述〉
     * 
     * @param jmsCorrelationID
     * @return
     * @see [相关类/方法](可选)
     * @since [产品/模块版本](可选)
     */
    private String reciveFromReceiptQueue(String jmsCorrelationID) {
        // 设置接收等待的超时时间
        BytesMessage msg = null;
        String msgText = null;
        ByteArrayOutputStream byteStream = null;
        try {
            jmsTemplate.setReceiveTimeout(receiveTimeout);
            // 设置根据JMSCorrelationID过滤需要接收的消息
            String selector = "JMSCorrelationID = 'ID:" + byte2HexStr(jmsCorrelationID.getBytes()) + "'";
            // 接收消息
            msg = (BytesMessage) jmsTemplate.receiveSelected(responseQueue, selector);//
            if (msg == null) {
                throw new RuntimeException("socket timeout!" + "jmsCorrelationID=" + jmsCorrelationID);
            }
            byteStream = new ByteArrayOutputStream();
            byte[] buffer = new byte[(int) msg.getBodyLength()];
            msg.readBytes(buffer);
            byteStream.write(buffer, 0, (int) msg.getBodyLength());
            msgText = new String(byteStream.toByteArray());
            logger.debug(
                    "[MessageProcessor:reciveFromReceiptQueue()]: [jmsCorrelationID={}]: [response={}] receive from queue is successful!",
                    jmsCorrelationID, msgText);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            try {
                if (byteStream != null) {
                    byteStream.close();
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
        return msgText;
    }

    public static String byte2HexStr(byte[] b) {
        String hs = "";
        String stmp = "";
        for (int n = 0; n < b.length; n++) {
            stmp = (Integer.toHexString(b[n] & 0XFF));
            if (stmp.length() == 1)
                hs = hs + "0" + stmp;
            else
                hs = hs + stmp;
        }
        return hs.toUpperCase();
    }

    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public Queue getRequestQueue() {
        return requestQueue;
    }

    public void setRequestQueue(Queue requestQueue) {
        this.requestQueue = requestQueue;
    }

    public long getReceiveTimeout() {
        return receiveTimeout;
    }

    public void setReceiveTimeout(long receiveTimeout) {
        this.receiveTimeout = receiveTimeout;
    }

    public Queue getResponseQueue() {
        return responseQueue;
    }

    public void setResponseQueue(Queue responseQueue) {
        this.responseQueue = responseQueue;
    }
}





public class MQConstants {

    public static final String JMS_CORRELATION_ID = "jmsCorrelationID";

    public static final String JMS_REPLY_TO = "jmsReplyTo";

    public static final String JMS_TYPE = "jmsType";

    public static final String JMS_IBM_CHARACTER_SET = "JMS_IBM_Character_Set";
}



import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

public class RequestIDGenerator {

    public static String generateMessageRequestID() {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmssSSS");
        String id = simpleDateFormat.format(new Date());
        String numberChar = "0123456789";
        StringBuffer sb = new StringBuffer();
        Random random = new Random();
        for (int i = 0; i < 4; i++) {
            sb.append(numberChar.charAt(random.nextInt(numberChar.length())));
        }
        return id + sb.toString();
    }
}




import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
import org.testng.annotations.Test;

import com.ibm.mq.jms.MQQueue;

/**
 * 〈一句话功能简述〉<br>
 * 〈功能详细描述〉
 * 
 * @see [相关类/方法](可选)
 * @since [产品/模块版本] (可选)
 */
@ContextConfiguration(locations = { "file:src/main/java/com/iteye/jms/send/bean.xml" })
public class JmsTemplateInvokeTest extends AbstractTestNGSpringContextTests {

    @Autowired
    @Qualifier("jmsTemplate")
    private JmsTemplate jmsTemplate;

    @Autowired
    @Qualifier("requestQueue")
    private MQQueue requestQueue;

    @Autowired
    @Qualifier("jmsTemplateInvoke")
    private JmsTemplateInvoke jmsTemplateInvoke;

    /**
     * 
     * 发送MQ消息 <br>
     * 〈功能详细描述〉
     * 
     * @throws Exception
     * @see [相关类/方法](可选)
     * @since [产品/模块版本](可选)
     */
    // @Test
    public void testSendMqMsg1() throws Exception {
        String requestMessage = getEsbXml();
        JmsTemplateInvoke jmsTemplateInvoke = new JmsTemplateInvoke();
        jmsTemplateInvoke.setJmsTemplate(jmsTemplate);
        jmsTemplateInvoke.setRequestQueue(requestQueue);

        jmsTemplateInvoke.asynRequestProcessor(requestMessage);
    }

    /**
     * 
     * 发送MQ消息 <br>
     * 〈功能详细描述〉
     * 
     * @throws Exception
     * @see [相关类/方法](可选)
     * @since [产品/模块版本](可选)
     */
    @Test
    public void testSendMqMsg2() throws Exception {
        String requestMessage = getEsbXml();

        jmsTemplateInvoke.asynRequestProcessor(requestMessage);
    }

    /**
     * 
     * 获取请求报文 <br>
     * 〈功能详细描述〉
     * 
     * @return
     * @throws Exception
     * @see [相关类/方法](可选)
     * @since [产品/模块版本](可选)
     */
    private String getEsbXml() throws Exception {
        StringBuilder esbXml = new StringBuilder(10000);

        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(
                "D:/CReviewMgmt-syncOrderCommentInfoNew_in_zr.xml"), "UTF-8"));

        String line = null;
        while ((line = br.readLine()) != null) {
            esbXml.append(line);
        }

        br.close();

        return esbXml.toString();
    }
}
分享到:
评论

相关推荐

    SpringBoot集成JmsTemplate(队列模式和主题模式)及xml和JavaConfig配置详解

    主要介绍了SpringBoot集成JmsTemplate(队列模式和主题模式)及xml和JavaConfig配置详解,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

    JMS与Spring之一(用JmsTemplate同步收发消息)

    JMS与Spring之一(用JmsTemplate同步收发消息)

    Spring JMS异步发收消息 ActiveMQ

    JMS(使用消息中介:ActiveMQ) JMS为JAVA开发者提供了一个与消息中介进行交互,以及发送和接收消息的标准API,而且每一个消息中介的实现都会支持JMS。(即JMS为所有消息中介提供了统一接口);JmsTemplate是Spring...

    Spring JMSTemplate 与 JMS 原生API比较

    NULL 博文链接:https://holdbelief.iteye.com/blog/1491604

    spring-jms:Spring JmsTemplate演示

    简单使用 public class Main { public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("classpath*:applicationContext.xml"); JmsTemplate jmsTemplate =...

    使用SpringJMS轻松实现异步消息传递

    传统的使用JMSAPI进行消息传递的实现包括多个步骤,例如JNDI查询队列连接工厂和Queue资源,在实际发送和接收消息前创建一个JMS会话。Spring框架则简化了使用JEE组件(包括JMS)的任务。它提供的模板机制隐藏了典型的...

    Spring+JMS+消息处理

    Spring+JMS+消息处理

    SpringBoot 整合 JMSTemplate的示例代码

    主要介绍了SpringBoot 整合 JMSTemplate的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

    Spring中文帮助文档

    6.8.1. 在Spring中使用AspectJ进行domain object的依赖注入 6.8.2. Spring中其他的AspectJ切面 6.8.3. 使用Spring IoC来配置AspectJ的切面 6.8.4. 在Spring应用中使用AspectJ加载时织入(LTW) 6.9. 更多资源 7...

    Spring-Reference_zh_CN(Spring中文参考手册)

    6.8.1. 在Spring中使用AspectJ来为domain object进行依赖注入 6.8.1.1. @Configurable object的单元测试 6.8.1.2. 多application context情况下的处理 6.8.2. Spring中其他的AspectJ切面 6.8.3. 使用Spring IoC来...

    Spring 2.0 开发参考手册

    6.8.4. 在Spring应用中使用AspectJ Load-time weaving(LTW) 6.9. 其它资源 7. Spring AOP APIs 7.1. 简介 7.2. Spring中的切入点API 7.2.1. 概念 7.2.2. 切入点实施 7.2.3. AspectJ切入点表达式 7.2.4. ...

    Spring API

    6.8.1. 在Spring中使用AspectJ进行domain object的依赖注入 6.8.2. Spring中其他的AspectJ切面 6.8.3. 使用Spring IoC来配置AspectJ的切面 6.8.4. 在Spring应用中使用AspectJ加载时织入(LTW) 6.9. 更多资源 7...

    spring chm文档

    6.8.4. 在Spring应用中使用AspectJ Load-time weaving(LTW) 6.9. 其它资源 7. Spring AOP APIs 7.1. 简介 7.2. Spring中的切入点API 7.2.1. 概念 7.2.2. 切入点实施 7.2.3. AspectJ切入点表达式 7.2.4. ...

    Springboot ActiveMQ 集成.rar

    Springboot ActiveMQ 集成,该项目中包含手动创建连接,以及使用Spring提供的支持,JmsTemplate的使用方式。

    JMEAndroidTemplate:使用Gradle的Android JMonkey Engine项目模板

    如果您发现错误,请向其报告问题或仅对其进行修复并发送拉取请求(第二个请求会更好:))。正在启动这个样板是准备就绪,因此您只需使用此强大工具即可使用此模板创建任何android游戏项目。 无论如何,您甚至可以...

    Spring系列学习之SpringMessaging消息支持

    SpringFramework为与消息传递系统的集成提供了广泛的支持,从使用JmsTemplate简化JMSAPI的使用到异步接收消息的完整基础结构。SpringAMQP为高级消息队列协议提供了类似的功能集。SpringBoot还为RabbitTemplate和...

    Spring+JMS+ActiveMQ+Tomcat实现消息服务的demo

    基于Spring+JMS+ActiveMQ+Tomcat,我使用的版本情况如下所示:Spring 3.2.0,ActiveMQ 5.4.3,Tomcat 6.0.43。本例通过详细的说明和注释,实现消息服务的基本功能:发送与接收。Spring对JMS提供了很好的支持,可以...

    Spring+JMS+ActiveMQ+Tomcat实现消息服务_服务器应用

    基于Spring+JMS+ActiveMQ+Tomcat,我使用的版本情况如下所示: •Spring 2.5 •ActiveMQ 5.4.0 •Tomcat 6.0.30 下面通过学习与配置,实现消息服务的基本功能:发送与接收。Spring对JMS提供了很好的支持,可以...

Global site tag (gtag.js) - Google Analytics