Como comunicar Nodos Wildfly independientes con mensajeria asincrónica (Implementación de JMS)

Fundamentos de JMS e implementación

JMS o Java Message Service es una solución provista por Java para el uso de colas de mensajería, nos permite armar sistemas distribuidos, intercomunicando MDBs (tipo especial de EJB) o aplicaciones.

JMS nos permite crear, enviar, recibir y leer los mensajes de forma síncrona y asíncrona entre aplicaciones y entre nodos de servidor de aplicaciones.

Modelo de implementación de JMS

Esquema de conceptos JMS
  • ConnectionFactory
    • Objeto usado por el cliente (el cual puede ser un Producer o un Consumer) para generar conexiones con el proveedor.
  • Connection
    • Un objeto Connection encapsula una conexión con el proveedor de JMS por TCP/IP, al crear una se generan una serie de objetos para gestionar la mensajería.
  • Session
    • Contexto monohilo usado para generar Producers, Consumers, Messages y Queues.
  • Message Producer
    • Objeto creado por una Session que se utiliza para enviar mensajes a un destino.
  • Message Consumer
    • Objeto creado por una Session utilizado para recibir mensajes enviados desde un destino.
  • Message
    • El objeto Message encapsula información para intercambiar mensajes entre aplicaciones
    • Contiene 3 componentes: Campos de cabecera, propiedades específicas de la aplicación y el cuerpo del mensaje.

Mapa conceptual

Modelo de referencia de Wildfly 26.1.1

A continuación, se definen los dos modelos de implementación con el código que se necesita para implementarlos.

Modelo Punto a punto

En el modelo punto a punto puede haber varios Consumidores, pero solo uno puede recibir un mensaje, también puede haber varios Productores.

El productor produce el mensaje y lo envía a la Cola, para que luego el Consumidor reciba el mensaje y lo notifique a la misma.

Modelo Punto a punto de mensajería

Tanto emisores como consumidores pueden ser artefactos deployados en distintos contenedores Java EE (Wildfly, Glassfish, etc).

Un Consumidor es un MessageListener y está siempre a la escucha de nuevos mensajes que sean enviados por los emisores e introducidos en la cola.

El bus de JMS se encarga de que cada mensaje sea consumido por un único consumidor.

Si no hay ningún Consumidor deployado, el Producer deja el Message en el bróker, y el mismo queda en la cola.

Modelo Publicador/Suscriptor

En el modelo Publicador/Suscriptor por el contrario está pensado para hacer broadcast de mensajería enviando de uno a muchos.

El Productor desconoce cuántos Consumidores recibirán el mensaje, y todos los consumidores pueden recibir una copia de este.

En este modelo el destino no es una Cola sino un Tópico, por lo cual los mensajes no se encolan, y un nuevo mensaje en el tópico sobrescribe al existente.

Modelo Publicador/Suscriptor de mensajería asincrónica

Implementación de prototipo en código Java

Codificación base de un Consumer (MDB)

A continuación, se observa el código mínimo de un mdb:

@MessageDriven(name = "Nodo1toNodo2Consumer", activationConfig={
        @ActivationConfigProperty(propertyName="destinationType", propertyValue = "javax.jms.Queue"),
        @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue ="java:/Nodo1toNodo2"),
        @ActivationConfigProperty(propertyName="acknowledgeMode", propertyValue="Auto-acknowledge")
})
@ResourceAdapter(value = "remote-artemis")
public class COREtoEXT implements MessageListener {
    private static final Logger log = LogManager.getLogger(Nodo1toNodo2.class);

    @Resource
    private MessageDrivenContext context;

    @Override
    public void onMessage(Message message) {
        TextMessage msg = null;
        try{
            if(message instanceof TextMessage){
                msg = (TextMessage) message;
                /*Recibido MDB*/
                log.info("CONSUMER Nodo1toNodo2 RECIBE MENSAJE: " + msg.getText() );
            }else{
                System.out.println("El mensaje no es del tipo texto");
            }
        } catch (Throwable t){
            System.out.println("Exception: " + t.getLocalizedMessage());
        }
    }

}

En destinationLookup, colocamos el lookup a la cola, que luego se configurará en Wildfly en el Standalone(java:/Nodo1ToNodo2).

En el tag @ResourceAdapter indicamos el conector a usar, que será el remote-artemis .

La clase implementa la interfaz MessageListener, entonces en el método onMessage debemos implementar toda la lógica que debe suceder al consumir un mensaje. En este caso, solo tiramos un log en Wildfly.

Una vez deployado el artefacto que contenga esta clase, quedará el mdb activo, listo para consumir mensajes. Por defecto crea 15 instancias, y si la cola no existe, la crea automáticamente.

Codificación de WebService con un Producer

Este webservice se encarga de producir n cantidad de mensajes sobre la cola especificada.

Si hay consumers disponibles los mensajes son procesados en el momento, de lo contrario quedan encolados para ser procesados posteriormente.

A continuación, el código:

@Stateless
@WebService(serviceName="EjbMessaging")
@SOAPBinding(style=SOAPBinding.Style.RPC)
@Remote(RemoteEjbMessaging.class)
public class EjbMessaging implements RemoteEjbMessaging {
    private static final Logger log = LogManager.getLogger(EjbMessaging.class.toString());
    private Level logInfo = Level.forName("INFO", 550);

    @Inject
    @JMSConnectionFactory("java:/RemoteJmsXA")
    private JMSContext context;

    @Resource(lookup = "java:/Nodo1toNodo2")
    private Queue q_Nodo1toNodo2;
    @Resource(lookup = "java:/Nodo2toNodo1")
    private Queue q_Nodo2toNodo1;

    @WebMethod(operationName="enviarMensajesCola")
    @Override
    public boolean enviarMensajeCola(@WebParam(name = "QueueJNDI") String QueueJNDI, @WebParam(name = "cantMensajes") int cantMensajes) {
        try{
            sendMessages(context, QueueJNDI, cantMensajes);
        } catch (Exception ex) {
            log.log(logInfo,ex.getLocalizedMessage());
        }
        return true;

    }
    private void sendMessages(JMSContext context, String QueueJNDI, int cantMensajes) throws JMSException {
        log.log(logInfo, "Cola seleccionada: " + QueueJNDI);
        Queue queue = null;
        switch(QueueJNDI){
            case "Nodo1toNodo2":
                queue = q_Nodo1ToNodo2;
                break;
            case "Nodo2ToNodo1":
                queue = q_Nodo2ToNodo1;
                break;
                 break;
              }
        for(int i = 0; i<cantMensajes;i++){
            String mensaje = new Date() + " // Codigo de mensaje = " + Util.generateRandomString() + " // Nro de iteración: " + i;
            log.log(logInfo, "Iteración nro:" + i + " // Mensaje a enviar::: " + mensaje);
            JMSProducer producer = context.createProducer();
            producer.send(queue, mensaje);
        }
    }

}

En las annotations se observa efectivamente que se conecta a la dirección java:/RemoteJmsXA , y como resources están agregadas las colas, como ser una de ellas java:/Nodo1ToNodo2.

Por otro lado, si vamos específicamente a la acción de enviar un mensaje, las dos líneas a usar son las siguientes:

JMSProducer producer = context.createProducer();
producer.send(queue, mensaje);

Donde queue es una de las 2 colas con prefijos “q_” y el mensaje es el string a enviar, que puede ser un base64 con un archivo.

Filters

La idea de los filters es poder clasificar los mensajes de una misma cola en grupos, para que cada grupo de mensajes cumpla una determinada condición.

Entonces de esta forma, nosotros tendremos un consumer implementado por cada filter.

El filter por default sería un consumer que no incluye el atributo messageSelector.

Consumer con Filter

Un consumer de un determinado filter quedaría así:

@MessageDriven(name = "Nodo1ToNodo2ConsumerType1", activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
        @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "java:/Nodo1toNodo2"),
        @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
        @ActivationConfigProperty(propertyName = "messageSelector", propertyValue = "TypeMessage='Type1'")
})
@ResourceAdapter(value = "remote-artemis")
public class Nodo1ToNodo2Type1 implements MessageListener {
    private static final Logger log = LogManager.getLogger(Nodo1ToNodo2Type1.class);

    @Resource
    private MessageDrivenContext context;

    @Override
    public void onMessage(Message message) {
        TextMessage msg = null;
        try {
            if (message instanceof TextMessage) {
                msg = (TextMessage) message;
                /*Recibido MDB*/
                log.info("CONSUMER Nodo1ToNodo2 RECIBE MENSAJE TIPO Type1: " + msg.getText());
            } else {
                System.out.println("El mensaje no es del tipo Type1");
            }
        } catch (Throwable t) {
            System.out.println("Exception: " + t.getLocalizedMessage());
        }
    }

}

Se agrega la ActivationConfigProperty “messageSelector”, en la cual indicamos con sintaxis SQL una condición que debe cumplir el ObjectMessage.

En este caso una property llamada “TypeMessage” debe tener el valor “Type1”, para que el mensaje entre en este consumer.

Esta implementación nos permite tener un comportamiento distinto para cada filter, o para el consumer por defecto sin messageSelector, si deseamos incluirlo.

Producer con Filter

En el caso del producer además de usar los siguientes recursos:

@Inject
@JMSConnectionFactory("java:/RemoteJmsXA")
private JMSContext context;

@Resource(lookup = "java:/Nodo1ToNodo2")
private Queue q_Nodo1toNodo2;
@Resource(lookup = "java:/Nodo2ToNodo1")
private Queue q_Nodo2toNodo1;

Ahora debemos observar el siguiente código:

private void sendMessagesFilter(JMSContext context, String QueueJNDI, int cantMensajes, String TypeMessage) throws JMSException {
    log.log(logInfo, "Cola seleccionada: " + QueueJNDI);
    Queue queue = selectorQueue(QueueJNDI);
    for(int i = 0; i<cantMensajes;i++){
        ObjectMessage objectMessage = context.createObjectMessage();
        objectMessage.setStringProperty("TypeMessage",TypeMessage);
        String mensaje = new Date() + " // Codigo de mensaje = " + Util.generateRandomString() + " // Nro de iteración: " + i;
        log.log(logInfo, "Iteración nro:" + i + " // Mensaje a enviar::: " + mensaje);
        objectMessage.setStringProperty("Mensaje", mensaje);
        JMSProducer producer = context.createProducer();
        producer.send(queue, objectMessage);
    }
}

Usamos un denominado ObjectMessage que es un objeto del paquete jms de java, en el encapsulamos properties, y si enviamos un objeto, el mismo debe ser un Serializable.

En este caso observemos que la propiedad “TypeMessage” adquiere un valor de entrada dado por el webservice, si el mismo es “Type1” es consumido por Artemis, de lo contrario si no cumple con las condiciones de ningún otro consumer queda en espera.

Configuración de nodos

Todos los nodos Wildfly se comunicarán entre ellos a través de un nodo ActiveMQ Artemis, que será el broker de mensajería, de la siguiente manera:

Diagrama de componentes

Ahora bien, se detallará la configuración que se debe hacer en el Nodo1 (standalone-full-ha.xml), Nodo2(standalone-full-ha.xml), y en el Broker (ficheros broker.xml, jolokia-access.xml, bootstrap.xml)

Configuración en Wildfly Nodo1 y Nodo2

En Wildfly debemos realizar una serie de configuraciones en el standalone-full-ha.xml para poder manejar los lookup por JNDI a las colas externas de Artemis.

En el subsistema messaging-activemq:13.1 debemos agregar la siguiente configuración:

<subsystem xmlns="urn:jboss:domain:messaging-activemq:13.1">
<server name="default">
    <security elytron-domain="ApplicationDomain"/>
    <security-setting name="#">
        <role name="guest" send="true" consume="true" create-non-durable-queue="true" delete-non-durable-queue="true"/>
    </security-setting>
    <address-setting name="#" dead-letter-address="jms.queue.DLQ" expiry-address="jms.queue.ExpiryQueue" redistribution-delay="1000"/>
    <http-connector name="http-connector" socket-binding="http" endpoint="http-acceptor"/>
    <http-connector name="http-connector-throughput" socket-binding="http" endpoint="http-acceptor-throughput">
        <param name="batch-delay" value="50"/>
    </http-connector>
    <remote-connector name="remote-artemis" socket-binding="remote-artemis"/>
    <in-vm-connector name="in-vm" server-id="0">
        <param name="buffer-pooling" value="false"/>
    </in-vm-connector>
    <http-acceptor name="http-acceptor" http-listener="default"/>
    <http-acceptor name="http-acceptor-throughput" http-listener="default">
        <param name="batch-delay" value="50"/>
        <param name="direct-deliver" value="false"/>
    </http-acceptor>
    <in-vm-acceptor name="in-vm" server-id="0">
        <param name="buffer-pooling" value="false"/>
    </in-vm-acceptor>
    <jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue"/>
    <jms-queue name="DLQ" entries="java:/jms/queue/DLQ"/>
    <connection-factory name="InVmConnectionFactory" entries="java:/ConnectionFactory" connectors="in-vm"/>
    <connection-factory name="RemoteConnectionFactory" entries="java:jboss/exported/jms/RemoteConnectionFactory" connectors="http-connector" ha="true" block-on-acknowledge="true" reconnect-attempts="-1"/>
    <pooled-connection-factory name="activemq-ra" entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" connectors="in-vm" transaction="xa"/>
    <pooled-connection-factory name="remote-artemis" entries="java:/RemoteJmsXA java:jboss/RemoteJmsXA" connectors="remote-artemis" ha="false" user="user" password="user1234" min-pool-size="15" max-pool-size="30" statistics-enabled="true">
        <inbound-config rebalance-connections="true" setup-attempts="-1" setup-interval="5000"/>
    </pooled-connection-factory>
</server>
</subsystem>

Se debe configurar un remote-connector que apuntará al servidor remoto de Artemis, el mismo debe tener un pool de connection factory donde se configura la entry java:/RemoteJmsXA , que es la que usaremos para hacer lookup en nuestro producer.

El conector in-vm, su connection factory y su respectivo pooled connection factory, son los usados para trabajar con el activemq bundled de Wildfly, el cual en este caso funcionará como cliente.

El remote-connector denominado remote-artemis, tendrá una configuración que lo redirecciona a nuestro servidor Artemis en su puerto 61616, dentro del tag socket-binding-group :

<outbound-socket-binding name="remote-artemis">
    <remote-destination host="192.168.10.8" port="61616"/>
</outbound-socket-binding>

También debemos adicionar la configuración en el subsistema naming:2.0 en el cual seteamos los lookups y el acceso al servidor Artemis:

<bindings>
    <external-context name="java:global/remoteContext" module="org.apache.activemq.artemis" class="javax.naming.InitialContext">
        <environment>
            <property name="java.naming.factory.initial" value="org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory"/>
            <property name="java.naming.provider.url" value="tcp://192.168.10.8:61616"/>
            <property name="connectionFactory.ConnectionFactory" value="tcp://192.168.10.8:61616"/>
            <property name="queue.Nodo1toNodo2" value="Nodo1toNodo2"/>
            <property name="queue.Nodo2toNodo1" value="Nodo2ToNodo1"/>
        </environment>
    </external-context>
    <lookup name="java:/Nodo1toNodo2" lookup="java:global/remoteContext/Nodo1toNodo2"/>
    <lookup name="java:/Nodo2toNodo1" lookup="java:global/remoteContext/Nodo2toNodo1"/>
</bindings>
<remote-naming/>

Si observamos, se configura un context remoto, donde java:/Nodo1toNodo2 es el lookup a una de las colas que realizaremos tanto en el producer como en el consumer. Entonces, para configurar otras colas debemos seguir la lógica de la sintaxis expuesta.

Configuración en ActiveMQ Artemis Broker

broker.xml

Dentro del tag <acceptors> debemos agregar lo siguiente:

<acceptor name="artemis">tcp://192.168.10.8:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>

Es la url de acceso a nuestro Artemis, en adición de algunas propiedades.

jolokia-access.xml

Me sucedió al implementar Artemis con un nodo docker, que al visualizar el backoffice aparecía todo en blanco sin mostrar ningún dato o interfaz gráfica al respecto.

blank web console
Interfaz de ActiveMQ en blanco

Al principio intenté implementar esta solución de stackoverflow, con un proxy nginx, pero no es la mejor manera de arreglar el problema.

La solución que encontré en este caso fue configurar el jolokia-access de esta manera:

<restrict>
<cors>
    <!-- Allow cross origin access from localhost ... -->
    <allow-origin>*://*</allow-origin>
    <!-- Options from this point on are auto-generated by Create.java from the Artemis CLI -->
    <!-- Check for the proper origin on the server side, too -->
    <strict-checking/>
</cors>
</restrict>

bootstrap.xml

En este fichero se establece la url y puerto de acceso al backoffice:

<broker xmlns="http://activemq.apache.org/schema">
<jaas-security domain="activemq"/>
<!-- artemis.URI.instance is parsed from artemis.instance by the CLI startup.
     This is to avoid situations where you could have spaces or special characters on this URI -->
<server configuration="file:/opt/artemis/apache-artemis-2.26.0/bin/broker/etc/broker.xml"/>

<!-- The web server is only bound to localhost by default -->
<web path="web" rootRedirectLocation="console">
    <binding uri="http://192.168.10.8:8161">
        <app url="activemq-branding" war="activemq-branding.war"/>
        <app url="artemis-plugin" war="artemis-plugin.war"/>
        <app url="console" war="console.war"/>
    </binding>
</web>
</broker>

Configurando los ambientes de esta manera obtenemos una infraestructura de servidores de aplicaciones comunicados con mensajería asincrónica.

Leave a Reply

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *