`
岳乡成
  • 浏览: 120520 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

JMS(Java Massage Service)与MDB(Massage Driver Bean)

阅读更多
JMS(Java Massage Service)与MDB(Massage Driver Bean)
作者:岳乡成
一、 JMS简介
1、 什么是JMS
JMS(Java Massage Service,Java消息服务)是一组Java应用程序接口(Java API),它提供创建、发送、接受、读取消息的服务。由Sun公司和它的合作伙伴设计的JMS API定义了一组公共的应用程序接口和相应的语法,使得Java程序能够和其他消息组件进行通信。
JMS是一种与厂商无关的API,提供了独立于特定厂商的企业消息系统访问方式。
JMS由两部分组成:JMS客户端和JMS消息发送服务(有时也称为消息中介程序或路由器)(JMS Provide)。使用JMS消息的被称为JMS客户端;处理消息路由与传递的消息系统被称为JMS Provide。JMS应用是由多个JMS客户端和一个JMS Provide构成的业务系统。发送消息的JMS客户端被称为生产者(producer),而接受消息的JMS客户端则被称为消费者(consumer)。同一个JMS客户端即可以是生产者,也可以是消费者。
JMS的编程过程很简单,概括为:应用程序A发送一条消息到消息服务器(JMS Provide),然后消息服务器把消息转发给应用程序B。如下图所示:

消息传递系统的中心是消息。一条Massage由3个部分组成:
(1)头:每条JMS消息都必须具有消息头。头字段包含用于路由和识别消息的值。
(2)属性:消息可以包含称作属性的可选头字段。
(3)主体:包含要发送给接收应用程序的内容。
根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本 (TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
消息收发系统是异步的,也就是说,JMS客户机可以发送消息而不必等待回应。在JMS中,客户机将消息发送给一个虚拟通道(主题或队列),而其它JMS客户机则预定或监听这个虚拟通道。当JMS客户机发送消息时,它并不等待回应。它执行发送操作,然后继续执行下一条指令。消息可能最终转发到一个或许多个客户机。这些客户机都不需要立即做出回应。
JMS支持两种消息模型:Point-to-Point消息(PTP)和发布订阅消息(Publish Subscribe messaging,简称Pub/Sub)。JMS规范并不要求供应商同时支持这两种消息模型,但开发者应该熟悉这两种消息模型的优势与缺点。
PTP消息模型是在点对点之间传递消息时使用。如果应用程序开发者希望每一条消息都能够被处理,那么应该使用PTP消息模型。与Pub/Sub消息模型不同,P2P消息总是能够被传送到指定的位置。
通过点对点(PTP)的消息传递模式,一个应用程序可以向另一个应用程序发送消息。在此传递模式中,目标类型是队列。消息首先被传递至队列目标,然后从该队列将消息传送至对此队列进行监听的某个消费者。如下图所示:

Pub/Sub模型在一到多的消息广播时使用。如果一定程度的消息传递的不可靠性可以被接受的话,那么应用程序开发者也可以使用Pub/Sub消息模型。换句话说,它适用于所有的消息消费程序并不要求能够收到所有的信息或者消息消费程序并不想接收到任何消息的情况。
通过发布/订阅(pub/sub)消息传递模式,应用程序能够将一条消息发送到多个接受方。在此传递模式中,目标类型是主题。消息首先被传递至主题目标,然后传送至所有已订阅此主题的活动消费者。

PTP消息传递模式是传统意义上的拉模式,在此模式中,消息不是自动推送给客户端的,而是要由客户端从队列中请求获得。Pub/Sub消息传递模型基本上是一个推模式,在该模式中,消息会自动广播,消费者无需通过主动请求或轮询主题的方式来获得新的消息。
上面两种消息传递模式里,我们都需要定义消息生产者和消费者,生产者把消息发送到JMS Provider的某个目标地址(Destination),消息从该目标地址传送至消费者。消费者可以同步或异步接收消息,一般而言,异步消息消费者的执行和伸缩性都优于同步消息的接收者。体现在如下几个方面:
(1)异步消息的接收者创建的网络流量比较小。
(2)异步消息接收者使用的线程比较小。
(3)使用异步消息接受者可以防止应用程序代码在服务器上执行阻塞操作。
消息驱动Bean是异步消息消费者,它由EJB容器进行管理,具有一般的JMS消费者所不具有的优点。例如:容器可以创建多个消息驱动Bean实例来处理大量的并发消息,而一般的JMS消费者开发时则必须对此进行处理才能获得类似的功能。同时消息驱动Bean可取得EJB所能提供的标准服务,如容器管理事务等服务。
2、什么是消息驱动Bean
   消息驱动Bean是设计用来专门处理基于消息请求的组件。它能够收发异步的JMS消息,并能够轻易地与其他EJB交互,特别适用于当一个业务执行的时间很长,而执行结果无需实时向用户反馈的场合。
消息驱动Bean像一个没有local和remote接口的无状态Session Bean,它和无状态的Session Bean一样也使用了实例池机制,容器可以为它创建大量的实例,用来并发处理成千上万个JMS消息。
消息驱动Bean通常要实现MassageListerner接口,该接口定义了onMassage()方法,消息驱动Bean通过它来处理收到的JMS消息。
Package javax.jms;
Public interface MassageListener{
        Public void onMassage(Massage message);
}
消息驱动Bean通过注释来指定监听哪一个消息发送者发生的JMS消息,当监听到有消息到达时,容器调用onMassage()方法,将消息作为参数传入消息驱动Bean中,消息驱动Bean在onMassage()中决定如何处理该消息。
当一个业务执行的时间很长,而执行结果无需实时向用户反馈时,非常适合MDB,如订单成功后给用户发送一份电子邮件或发送一条短信等。

二、 PTP消息传递模型
1、配置消息到达的目的地址(Destination)
       在开始JMS编程前,需要先配置消息到达的目的地址(Destination),正如发送电子邮件一样,需要先知道对方的E-mail地址。以Jboss为例。
       Jboss使用一个XML文件配置队列地址,其文件的命名格式应遵守*-service.xml。
       <?xml version="1.0" encoding="UTF-8"?>
<server> 
  <mbean code="org.jboss.mq.server.jmx.Queue"
         name="jboss.mq.destination:service=Queue,name=foshanshop">
    <attribute name="JNDIName">queue/foshanshop</attribute>  
    <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
  </mbean>
</server>
配置文件编写完成后,将它复制到“jboss的安装目录server/default/deploy”目录下,这样会引发队列的热部署,可以通过http://localhost:8080/jmx-console进入jboss管理台,查看刚才部署的队列。
            其实在Jboss中使用MDB,可以不配置上述文件,Jboss会根据MDB里的信息自动创建队列地址。
     2、当队列部署成功后,就可以针对该队列编写消息生产者。它发送了5种类型的消息。
              package com.foshanshop.ejb3.app;
import java.util.Properties;

import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

import com.foshanshop.ejb3.bean.Man;
/**
* 发送Queue消息
* @author yuexiangcheng
*
*/
public class QueueSender {
    public static void main(String[] args) {
        QueueConnection conn = null;
        QueueSession session = null;
        try {
            Properties props = new Properties();
            props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
            props.setProperty(Context.PROVIDER_URL, "localhost:1099");
            props.setProperty(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
            InitialContext ctx = new InitialContext(props);
           
            QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
            conn = factory.createQueueConnection();
            session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            Destination destination = (Queue) ctx.lookup("queue/foshanshop");
            MessageProducer producer = session.createProducer(destination);
           
            //发送文本
            TextMessage msg = session.createTextMessage("山西人您好,这是我的第一个消息驱动Bean");
            producer.send(msg);
           
            //发送Ojbect(对象必须实现序列化,否则等着出错吧)
            producer.send(session.createObjectMessage(new Man("大美女", "北京朝阳区和平里一号")));
           
            //发送MapMessage
            MapMessage mapmsg = session.createMapMessage();
            mapmsg.setObject("no1", "北京和平里一号");
            producer.send(mapmsg);
           
            //发送BytesMessage
            BytesMessage bmsg = session.createBytesMessage();
            bmsg.writeBytes("我是一个兵,来自老百姓".getBytes());
            producer.send(bmsg);           
           
            //发送StreamMessage
            StreamMessage smsg = session.createStreamMessage();
            smsg.writeString("巴巴运动网,http://www.babasport.com");
            producer.send(smsg);
           
        } catch (Exception e) {
        e.printStackTrace();
        }finally{
            try {
                session.close ();
                conn.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
3、下面是Queue消息的接受方,它是一个MDB。

package com.foshanshop.ejb3.impl;

import java.io.ByteArrayOutputStream;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.BytesMessage;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import com.foshanshop.ejb3.bean.Man;

@MessageDriven(activationConfig =
{
  @ActivationConfigProperty(propertyName="destinationType",
    propertyValue="javax.jms.Queue"),
  @ActivationConfigProperty(propertyName="destination",
    propertyValue="queue/foshanshop"),
  @ActivationConfigProperty(propertyName="acknowledgeMode",
    propertyValue="Auto-acknowledge")
})
public class PrintBean implements MessageListener {

    public void onMessage(Message msg) {
        try {           
            if (msg instanceof TextMessage) {
                TextMessage tmsg = (TextMessage) msg;
                String content = tmsg.getText();
                System.out.println(content);
            }else if(msg instanceof ObjectMessage){
                ObjectMessage omsg = (ObjectMessage) msg;
                Man man = (Man) omsg.getObject();
                String content = man.getName()+ " 家住"+ man.getAddress();
                System.out.println(content);
            }else if(msg instanceof MapMessage){
                MapMessage map = (MapMessage) msg;
                String content = map.getString("no1");
                System.out.println(content);
            }else if(msg instanceof BytesMessage){
                BytesMessage bmsg = (BytesMessage) msg;
                ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
                byte[] buffer = new byte[256];
                int length = 0;
                while ((length = bmsg.readBytes(buffer)) != -1) {
                      byteStream.write(buffer, 0, length);
                }
                String content = new String(byteStream.toByteArray());
                byteStream.close();
                System.out.println(content);
            }else if(msg instanceof StreamMessage){
                StreamMessage smsg = (StreamMessage) msg;
                String content = smsg.readString();
                System.out.println(content);
            }
           
        } catch (Exception e){
            e.printStackTrace();
        }
    }
}
三、 Pub/Sub消息传递模式
Topic消息允许很多个主题订阅者接受同一个消息,所以下面的例子定义了两个消息接收者,当一条消息到达时,这两个接收者都可以收到。
1、配置消息到达的目标地址
   <?xml version="1.0" encoding="UTF-8"?>
<server> 
  <mbean code="org.jboss.mq.server.jmx.Topic"
name="jboss.mq.destination:service=Topic,name=chatTopic">
<attribute name="JNDIName">topic/chatTopic</attribute>
    <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
  </mbean>
</server>
     2、针对该目标地址编写消息的生产者如下:
        package com.foshanshop.ejb3.app;
import java.util.Properties;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
/**
* 发送Topic消息
* @author lihuoming
*
*/
public class TopicSender {

    public static void main(String[] args) {
        TopicConnection conn = null;
        TopicSession session = null;
        try {
            Properties props = new Properties();
            props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
            props.setProperty(Context.PROVIDER_URL, "localhost:1099");
            props.setProperty(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
            InitialContext ctx = new InitialContext(props);
           
            TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup("TopicConnectionFactory");
            conn = factory.createTopicConnection();
            session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
            Destination destination = (Topic) ctx.lookup("topic/chatTopic");
            MessageProducer producer = session.createProducer(destination);
            //发送文本
            TextMessage msg = session.createTextMessage("您好,这是我的第一个消息驱动Bean");
            producer.send(msg);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }finally{
            try {
                session.close ();
                conn.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
   3、消息的消费者MDB
第一个MDB:TopicPrintBeanOne.java
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@MessageDriven(activationConfig =
{
  @ActivationConfigProperty(propertyName="destinationType",
    propertyValue="javax.jms.Topic"),
  @ActivationConfigProperty(propertyName="destination",
    propertyValue="topic/chatTopic")
})
public class TopicPrintBeanOne implements MessageListener{
   
    public void onMessage(Message msg) {
        try {           
            TextMessage tmsg = (TextMessage) msg;
            String content = tmsg.getText();
            System.out.println(this.getClass().getName()+"=="+ content);             
        } catch (Exception e){
            e.printStackTrace();
        }
    }
}
第二个MDB:TopicPrintBeanTwo.java
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@MessageDriven(activationConfig =
{
  @ActivationConfigProperty(propertyName="destinationType",
    propertyValue="javax.jms.Topic"),
  @ActivationConfigProperty(propertyName="destination",
    propertyValue="topic/chatTopic")
})
public class TopicPrintBeanTwo implements MessageListener{

    public void onMessage(Message msg) {
        try {
            TextMessage tmsg = (TextMessage) msg;
            String content = tmsg.getText();
            System.out.println(this.getClass().getName()+"=="+ content);           
        } catch (Exception e){
            e.printStackTrace();
        }
    }
}
四、 在Session bean中发送消息
1、在Session bean中发送Queue消息实例如下:
   package com.foshanshop.ejb3.impl;
import javax.annotation.Resource;
import javax.ejb.Remote;
import javax.ejb.Stateless;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import com.foshanshop.ejb3.QSender;
import com.foshanshop.ejb3.bean.Man;

/**
* 发送Queue消息
* @author yuexiangcheng
*
*/
@Stateless
@Remote (QSender.class)
public class QSenderBean implements QSender {
    @Resource(mappedName="QueueConnectionFactory") private QueueConnectionFactory factory;
    @Resource(mappedName="queue/foshanshop") private Queue destination;
   
public void send() {
        QueueConnection conn = null;
        QueueSession session = null;
        try {           
            conn = factory.createQueueConnection();
            session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(destination);
           
            //发送文本
            TextMessage msg = session.createTextMessage("佛山人您好,这是我的第一个消息驱动Bean");
            producer.send(msg);
           
            //发送Ojbect(对象必须实现序列化,否则等着出错吧)
            producer.send(session.createObjectMessage(new Man("大美女", "北京朝阳区和平里一号")));
           
            //发送MapMessage
            MapMessage mapmsg = session.createMapMessage();
            mapmsg.setObject("no1", "北京和平里一号");
            producer.send(mapmsg);
           
            //发送BytesMessage
            BytesMessage bmsg = session.createBytesMessage();
            bmsg.writeBytes("我是一个兵,来自老百姓".getBytes());
            producer.send(bmsg);           
           
            //发送StreamMessage
            StreamMessage smsg = session.createStreamMessage();
            smsg.writeString("巴巴运动网,http://www.babasport.com");
            producer.send(smsg);
        }catch (Exception e){
            e.printStackTrace();
        }finally{
            try {
                session.close ();
                conn.close();
            } catch (JMSException e) {
            e.printStackTrace();
            }
        }
}

}
2、在Session bean中发送Topic消息实例如下:
package com.foshanshop.ejb3.impl;

import javax.annotation.Resource;
import javax.ejb.Remote;
import javax.ejb.Stateless;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;

import com.foshanshop.ejb3.TSender;
/**
* 发送Topic消息
* @author yuexiangcheng
*
*/
@Stateless
@Remote (TSender.class)
public class TSenderBean implements TSender {
    @Resource(mappedName="TopicConnectionFactory") private TopicConnectionFactory factory;
    @Resource(mappedName="topic/chatTopic") private Topic destination;
   
public void send(String msg) {
TopicConnection conn = null;
TopicSession session = null;
        try {           
            conn = factory.createTopicConnection();
            session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(destination);
            TextMessage text = session.createTextMessage(msg);
            producer.send(text);
        }catch (Exception e){
            e.printStackTrace();
        }finally{
            try {
                session.close ();
                conn.close();
            } catch (JMSException e) {
            e.printStackTrace();
            }
        }
}
}
五、 消息驱动bean消费远程的JMS消息
以上的例子都是处理本地JMS消息,那么如何实现Jboss下的消息驱动bean消费远程的JMS消息,可以通过以下两步来实现。
1、创建目标地址文件,该文件命名规则为*-service.xml(*为自定义的一个或多个字符),该文件编写好后放在jboss的deploy目录下,这样就可以在本地得到一个从远程获取的JMS provider。代码如下:
<mbean code="org.jboss.jms.jndi.JMSProviderLoader" name="jboss.mq:service=JMSProviderLoader,name=RemoteJMSProvider,server=remotehost">
    <attribute name="ProviderName">RemoteJMSProvider</attribute>
    <attribute name="ProviderAdapterClass">org.jboss.jms.jndi.JNDIProviderAdapter</attribute>
    <!-- The connection factory -->
    <attribute name="FactoryRef">UIL2XAConnectionFactory</attribute>
    <!-- The queue connection factory -->
    <attribute name="QueueFactoryRef">UIL2XAConnectionFactory</attribute>
    <!-- The topic factory -->
    <attribute name="TopicFactoryRef">UIL2XAConnectionFactory</attribute>
    <!-- Connect to JNDI on the host "the-remote-host-name" port 1099-->
    <attribute name="Properties">
        java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
        java.naming.factory.url.pkgs=org.jnp.interfaces
        java.naming.provider.url=192.168.1.101:1099
    </attribute>
  </mbean>
把这行java.naming.provider.url=192.168.1.101:1099代码中的IP地址(192.168.1.101)该成你要获取消息的远程服务器IP地址。
2、编写MDB类,代码如下:
   @MessageDriven(activateConfig = {
        @ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
        @ActivationConfigProperty(propertyName="destination", propertyValue="queue/testQueue"),
        @ActivationConfigProperty(propertyName="providerAdapterJNDI", propertyValue="java:/RemoteJMSProvider")
    })
    public class MDB implements MessageListener {
        ...
    }
3
0
分享到:
评论
3 楼 lvwenwen 2012-05-23  
不错,楼主要是能上传能跑的工程就更好了,楼主加油!
2 楼 hancslg 2010-05-20  
necessary of my,thx
1 楼 lhhuai 2010-01-24  
好文章,研究中

相关推荐

Global site tag (gtag.js) - Google Analytics