Broker A-MQ embedded (Resource Adapter) no JBoss EAP

Network Connector VM

O conector de transporte do tipo vm:// é usado por aplicativos Java para executar um broker embedded e se conectar a ele. Nenhuma conexão de redes é utilizada neste caso, então a comunicação entre cliente e broker acontece através de invocações de método diretamente, aumentando a performance significamente. O broker é iniciado quando a primeira conexão usando o protocolo VM é criada e as próximas conexões vindas da mesma JVM irão se conectar ao mesmo broker. Quando todas as conexões VM ao broker forem fechadas, o broker embedded será desligado automaticamente.

webconsole

Configuração do JBoss EAP 6.1

1) Baixe o JBoss EAP 6.1 diretamente do site.

2) Descompacte-o num local de sua preferência. No meu caso utilizei o /opt.

3) Edite o arquivo /opt/jboss-eap-6.1/standalone/configuration/standalone-full.xml adicionando o seguinte trecho no perfil <subsystem xmlns=”urn:jboss:domain:resource-adapters:1.1″> :

<subsystem xmlns=”urn:jboss:domain:resource-adapters:1.1″>
<resource-adapters>
<resource-adapter id=”activemq-rar-5.8.0.rar”>
<archive>
activemq-rar-5.8.0.rar
</archive>
<transaction-support>XATransaction</transaction-support>
<config-property name=”UseInboundSession”>
false
</config-property>
<config-property name=”Password”>
defaultPassword
</config-property>
<config-property name=”UserName”>
defaultUser
</config-property>
<config-property name=”ServerUrl”>
vm://localhost
</config-property>
<connection-definitions>
<connection-definition class-name=”org.apache.activemq.ra.ActiveMQManagedConnectionFactory” jndi-name=”java:jboss/ConnectionFactory” enabled=”true” pool-name=”ConnectionFactory”>
<xa-pool>
<min-pool-size>1</min-pool-size>
<max-pool-size>20</max-pool-size>
</xa-pool>
</connection-definition>
</connection-definitions>
<admin-objects>
<admin-object class-name=”org.apache.activemq.command.ActiveMQQueue” jndi-name=”java:jboss/queue/MyActiveMQQueue” use-java-context=”true” pool-name=”MyActiveMQQueue”>
<config-property name=”PhysicalName”>
QueuePhysicalName
</config-property>
</admin-object>
</admin-objects>
</resource-adapter>
</resource-adapters>
</subsystem>

Configuração do ActiveMQ Resource Adapter

1) Baixe o JBoss A-MQ diretamente do site.

O resource adapter está escondido em: extras/apache-activemq-5.8.0.redhat-60024-bin.zip/lib/optional/activemq-rar-5.8.0.redhat-60024.rar

Obs: altere a extensão de activemq-rar-5.8.0.redhat-60024.rar para activemq-rar-5.8.0.redhat-60024.zip e descompacte-o.

2) Depois de extrair o resource adapter substitua o corpo do arquivo broker-config.xml com o seguinte conteúdo:

<?xml version=”1.0″ encoding=”UTF-8″?>

<beans
xmlns=”http://www.springframework.org/schema/beans&#8221;
xmlns:amq=”http://activemq.apache.org/schema/core&#8221;
xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance&#8221;
xsi:schemaLocation=”http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd”&gt;

<!– Embedded – load configuration file from classpath –>
<bean class=”org.springframework.beans.factory.config.PropertyPlaceholderConfigurer”/>

<!– shutdown hook is disabled as RAR classloader may be gone at shutdown –>
<!– Embedded – Name the broker –>
<broker xmlns=”http://activemq.apache.org/schema/core&#8221; useJmx=”true” brokerName=”JBoss.Embedded” useShutdownHook=”false”>

<managementContext>
<!– use appserver provided context instead of creating one,  for jboss use: -Djboss.platform.mbeanserver –>
<managementContext createConnector=”false”/>
</managementContext>

<persistenceAdapter>
<!– Embedded – Use the server data directory for the broker database –>
<kahaDB directory=”${jboss.server.data.dir}/kahadb”/>
</persistenceAdapter>

<transportConnectors>
<!– Embedded – The embeded broker uses the in-JVM transport, and a NIO transport is also provided.
The “nio” below may be changed – use TCP, bind to just one interface, etc..
–>
<transportConnector name=”JBoss.Embedded” uri=”vm://localhost”/>
<transportConnector name=”nio” uri=”nio://0.0.0.0:61616″/>
</transportConnectors>

</broker>
</beans>

3) No arquivo META-INF/ra.xml procure a linha e descomente:

<config-property-value>tcp://localhost:61616</config-property-value>

Em seguida, descomente a linha:

<!–config-property-value>vm://localhost</config-property-value–>

Resultado:

<!– config-property-value>tcp://localhost:61616</config-property-value –>
<config-property-value>vm://localhost</config-property-value>

4) Remova as seguintes libs do resource adapter para não haver conflito com as já existentes no EAP:

  • slf4j-api-1.6.6.jar
  • slf4j-log4j12-1.6.6.jar

5) Depois de tudo alterado, compacte o arquivo .zip e renomeie-o para activemq-rar-5.8.0.rar e o mova para /opt/jboss-eap-6.1/standalone/deployments/

Inicialização do JBoss EAP 6.1   

Neste ponto temos o JBoss EAP 6.1 com o resource adapter configurado e copiado para a pasta deployments. Para inicializá-lo com o perfil customizado execute o seguinte comando na pasta a /opt/jboss-eap-6.1/bin: 

./standalone.sh -c standalone-full.xml

Console 

19:07:35,582 INFO [org.jboss.as.messaging] (ServerService Thread Pool — 58) JBAS011601: Bound messaging object to jndi name java:/ConnectionFactory
19:07:35,583 INFO [org.jboss.as.messaging] (ServerService Thread Pool — 59) JBAS011601: Bound messaging object to jndi name java:jboss/exported/jms/RemoteConnectionFactory
19:07:35,632 INFO [org.jboss.as.connector.deployment] (MSC service thread 1-7) JBAS010406: Registered connection factory java:/JmsXA
19:07:35,666 INFO [org.hornetq.ra] (MSC service thread 1-7) HornetQ resource adaptor started
19:07:35,667 INFO [org.jboss.as.connector.services.resourceadapters.ResourceAdapterActivatorService$ResourceAdapterActivator] (MSC service thread 1-7) IJ020002: Deployed: file://RaActivatorhornetq-ra
19:07:35,670 INFO [org.jboss.as.connector.deployment] (MSC service thread 1-2) JBAS010401: Bound JCA ConnectionFactory [java:/JmsXA]
19:07:35,830 INFO [org.jboss.as] (Controller Boot Thread) JBAS015961: Http management interface listening on http://127.0.0.1:9990/management
19:07:35,831 INFO [org.jboss.as] (Controller Boot Thread) JBAS015951: Admin console listening on http://127.0.0.1:9990
19:07:35,831 INFO [org.jboss.as] (Controller Boot Thread) JBAS015874: JBoss EAP 6.1.0.GA (AS 7.2.0.Final-redhat-8) started in 3487ms – Started 154 of 216 services (61 services are passive or on-demand)
19:08:25,127 INFO [org.jboss.as.server.deployment] (MSC service thread 1-4) JBAS015876: Starting deployment of “activemq-rar-5.8.0.rar” (runtime-name: “activemq-rar-5.8.0.rar”)
19:08:25,892 INFO [org.jboss.as.connector.deployers.RADeployer] (MSC service thread 1-2) IJ020001: Required license terms for file:/opt/jboss-eap-6.1/standalone/tmp/vfs/temp188ec44f68d80f15/activemq-rar-5.8.0.rar-12de4b0407eb87cc/contents/
19:08:25,923 INFO [org.jboss.as.connector.deployers.RaXmlDeployer] (MSC service thread 1-5) IJ020001: Required license terms for file:/opt/jboss-eap-6.1/standalone/tmp/vfs/temp188ec44f68d80f15/activemq-rar-5.8.0.rar-12de4b0407eb87cc/contents/
19:08:25,931 INFO [org.jboss.as.connector.deployment] (MSC service thread 1-5) JBAS010406: Registered connection factory java:jboss/ConnectionFactory
19:08:25,937 WARN [org.jboss.as.connector.deployers.RaXmlDeployer] (MSC service thread 1-5) IJ020016: Missing <recovery> element. XA recovery disabled for: java:jboss/ConnectionFactory
19:08:25,940 INFO [org.jboss.as.connector.deployment] (MSC service thread 1-5) JBAS010405: Registered admin object at java:jboss/queue/MyActiveMQQueue
19:08:25,945 INFO [org.jboss.as.connector.deployers.RaXmlDeployer] (MSC service thread 1-5) IJ020002: Deployed: file:/opt/jboss-eap-6.1/standalone/tmp/vfs/temp188ec44f68d80f15/activemq-rar-5.8.0.rar-12de4b0407eb87cc/contents/
19:08:25,950 INFO [org.jboss.as.connector.deployment] (MSC service thread 1-1) JBAS010401: Bound JCA ConnectionFactory [java:jboss/ConnectionFactory]
19:08:25,951 INFO [org.jboss.as.connector.deployment] (MSC service thread 1-6) JBAS010401: Bound JCA AdminObject [java:jboss/queue/MyActiveMQQueue]
19:08:26,118 INFO [org.jboss.as.server] (DeploymentScanner-threads – 2) JBAS018559: Deployed “activemq-rar-5.8.0.rar” (runtime-name : “activemq-rar-5.8.0.rar”)

Producer exemplo

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
*
* @author sfantin
*
*/
public class Producer {

public void produce() throws JMSException, NamingException {
try {
// Obtain a JNDI connection
InitialContext jndi = new InitialContext();

// Look up a JMS connection factory
ConnectionFactory conFactory = (ConnectionFactory) jndi.lookup(“java:jboss/ConnectionFactory”);

// Getting JMS connection from the server and starting it
Connection connection = conFactory.createConnection();
connection.start();

// JMS messages are sent and received using a Session. We will
// create here a non-transactional session object. If you want
// to use transactions you should set the first parameter to ‘true’
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Destination destination = (Destination) jndi.lookup(“java:jboss/queue/MyActiveMQQueue”);

// MessageProducer is used for sending messages (as opposed
// to MessageConsumer which is used for receiving them)
MessageProducer producer = session.createProducer(destination);

for (int i = 0; i < 1000; i++) {
producer.send(session.createTextMessage(i + ” message”));
System.out.println(“Sent message ” + i);
}
producer.close(); session.close(); connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}

Obs: Assim que o primeiro produtor enviar uma mensagem para a fila MyActiveMQQueue, o message store jboss-eap-6.1/bin/activemq-data/localhost/KahaDB será criado.

MDB exemplo

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.apache.log4j.Logger;
import org.jboss.ejb3.annotation.ResourceAdapter;

/**
*
* @author sfantin
*
*/
@MessageDriven( activationConfig = {
@ActivationConfigProperty(propertyName=”destinationType”, propertyValue=”javax.jms.Queue”),
@ActivationConfigProperty(propertyName=”destination”, propertyValue=”MyActiveMQQueue”),
@ActivationConfigProperty(propertyName=”acknowledgeMode”, propertyValue=”Auto-acknowledge”)
})
@ResourceAdapter(value=”activemq-rar-5.8.0.rar”)
public class SampleMDB implements MessageListener {

private static final Logger LOG = Logger
.getLogger(SampleMDB.class);

/*
* (non-Javadoc)
* @see javax.jms.MessageListener#onMessage(javax.jms.Message)
*/
@Override
public void onMessage(final Message msg) {
LOG.log(null, msg);

System.out.println(“received message: %s” + msg);

}

}

Download do JBoss EAP 6.1 configurado

Disponibilizei o JBoss EAP com todas as configurações comentadas neste tutorial.

JBossEAP6.1.zip

Obs: o activemq-rar-5.8.0.rar pode ser encontrado dentro do diretório /jboss-eap-6.1/standalone/deployments/

Até o próximo post!


Livro : ActiveMQ in Action

ActiveMQ in Action

Em busca de reforçar os principais conceitos e estratégias que garantem Alta Disponibilidade (HA) em arquitetura de mensageria distribuída, lí o livro ActiveMQ in Action que cobre as principais características e opções de configuração que o ActiveMQ oferece. Apesar de não estar atualizado, o livro pode ser utilizado como ponto de partida para aqueles que prentendem utilizar todo o potencial que o ActiveMQ tem a oferecer, pois ele cobre desde conceitos básicos até configurações mais avançadas para ambientes corporativos em produção.
Conversei com Bruce Snyder (um dos escritores do livro) que comentou que ainda não há previsão de uma nova versão do livro cobrindo as versões mais novas do ActiveMQ. É uma pena, pois houve mudanças significativas no projeto como a depreciação da estratégia de cluster Master/Slave puro (sem message store compartilhado), entre outras.

Boa leitura!


JBoss A-MQ : Consumidor de mensagens utilizando protocolo fabric

Introdução

O Fuse Fabric discovery agent utiliza o protocolo fabric para a descoberta de brokers que estão em um grupo específico. Para que isto aconteça o discovery agent precisa que todos os brokers sejam implantados em somente uma fábrica. Quando um cliente tenta se conectar a um broker, o agent procura por todos os brokers disponíveis no registro do fabric e os retorna.

fabric_consumer

Características

  • O protocolo fabric garante failover automaticamente
  • Clientes não precisam saber onde os brokers estão localizados 
  • Funciona como o transporte failover
  • Oferece opções para reconexão:

discovery:(fabric:usa-group)?reconnectDelay=1000&useExponentialBackOff=false

  • Clientes precisam ter a URL do Zookeeper

Como configurar o broker

Para saber em detalhes como fazer o deploy de brokers utilizando o Fuse Fabric, dê uma olhada no post Cluster JBoss A-MQ : Master/Slave Network of Brokers utilizando Fuse Fabric.

 Como configurar o cliente

Para saber em detalhes como criar um client que envia mensagens para o Fuse Fabric, dê uma olhada no post JBoss A-MQ : Produtor de mensagens utilizando protocolo fabric.

Setando o zookeeper.url

Para que o cliente consiga encontrar corretamente o grupo de brokers, precisamos adicionar a propriedade zookeeper.url que indica a instância do Fuse Fabric’s Zookeeper, no meu caso rodando localmente adicionei no pom.xml do meu projeto:

<zookeeper.url>localhost:2181</zookeeper.url>
<zookeeper.password>admin</zookeeper.password>

Download do projeto completo

Consumer-teste.zip

Até o próximo post!


JBoss A-MQ : Produtor de mensagens utilizando protocolo fabric

Introdução

O Fuse Fabric discovery agent utiliza o protocolo fabric para a descoberta de brokers que estão em um grupo específico. Para que isto aconteça o discovery agent precisa que todos os brokers sejam implantados em somente uma fábrica. Quando um cliente tenta se conectar a um broker, o agent procura por todos os brokers disponíveis no registro do fabric e os retorna.

fabric_client

Características

  • O protocolo fabric garante failover automaticamente
  • Clientes não precisam saber onde os brokers estão localizados 
  • Funciona como o transporte failover
  • Oferece opções para reconexão:

discovery:(fabric:usa-group)?reconnectDelay=1000&useExponentialBackOff=false

  • Clientes precisam ter a URL do Zookeeper

Como configurar o broker

Para saber em detalhes como fazer o deploy de brokers utilizando o Fuse Fabric, dê uma olhada no post Cluster JBoss A-MQ : Master/Slave Network of Brokers utilizando Fuse Fabric.

 Como configurar o cliente

import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ProducerTeste {

public static void main(String[] args) throws Exception {

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(“discovery:(fabric:usa-group)“);

Connection connection = factory.createConnection(“admin”, “admin”);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
Queue queue = session.createQueue(“SampleQueue”);
MessageProducer producer = session.createProducer(queue);

for (int i = 0; i < 1000; i++) {

producer.send(session.createTextMessage(i + ” message”));
System.out.println(“Sent message ” + i);
Thread.sleep(1000);
}
producer.close(); session.close(); connection.close();
}
}

Setando o zookeeper.url

Para que o cliente consiga encontrar corretamente o grupo de brokers, precisamos adicionar a propriedade zookeeper.url que indica a instância do Fuse Fabric’s Zookeeper, no meu caso rodando localmente adicionei no pom.xml do meu projeto:

<zookeeper.url>localhost:2181</zookeeper.url>
<zookeeper.password>admin</zookeeper.password>

Download do projeto completo

Producer-teste.zip

Até o próximo post!


Cluster JBoss A-MQ : Master/Slave Network of Brokers utilizando Fuse Fabric

Introdução

Fuse Fabric é um sistema distribuído de configuração, gerenciamento e provisionamento para Apache Karaf, Apache ActiveMQ, ServiceMix, Apache Camel para nuvem pública ou privada, que trabalha com a noção de perfis, onde cada cada perfil pode ser entendido como uma lista de configurações.
Todos os perfis são gravados no Apache Zookeeper que é um confiável coordenador de serviços distribuídos responsável por proparagar automaticamente as configurações atualizadas entre todos os nós, além de gerenciar o lock (bloqueio) de cada nó master.

Alguns benefícios:

  • Configuração centralizada entre rotas/brokers distribuídos;
  • Crescimento dinâmico o qual é essencial para ambientes de Cloud;
  • Alta Disponibilidade (HA) para aplicações de missão crítica;
  • Utiliza OGSi e Apache Karaf criação de novos brokers;
  • Provê ferramenta centralizada para configuração e monitoração (Fuse Management Console);

fabric_master_slave

Master/slave

Este tutorial mostra em detalhes como configurar e executar uma  ActiveMQ network of brokers master/slave utilizando o Fuse Management Console (FMC) para simplificar a configuração e execução dos múltiplos brokers que serão criados.
Nesta estratégia somente um broker pode ter o status master por vez, assim os slaves aguardam a parada do master para que um deles se torne o novo master.
Criar uma topologia Master-Slave with Fabric é muito fácil, basta criar múltiplos brokers em um mesmo grupo e o resto fica por conta do Fabric.
No nosso exemplo vamos criar quatro instâncias: duas USA e outras duas Japan. O primeiro broker que for iniciado será o master do seu lado, enquanto o outro será o slave.
Por default os brokers são persistentes, pois utitilizam o perfil mq-base como base.
Não é o caso deste exemplo, mas se as instâncias estiverem em máquinas separadas e utilizando storage compartilhado, precisaríamos de configurações adicionais. Por ora vamos montar o ambiente localmente.
O benefício deste tipo de configuração é que não é necessário nenhuma estratégia de lock em storage compartilhado mesmo utilizando brokers não persistentes, pois o Zookeeper garante o bloqueio dos nós master.

Obs: ainda assim há a necessidade de um message store compartilhado para que o estados dos brokers sejam garantidos.

Networks

A conexão entre os brokers dos dois grupos é especificado através da opção networks.

Instalação

1) Download

O Red Hat JBoss Fuse pode ser baixo aqui: https://www.jboss.org/products/fuse.html

Obs: disponível somente para fins de Desenvolvimento.

2) Descompactar

Descompacte o Fuse em algum lugar de sua preferência. No meu caso descopactei em /opt do meu Fedora 18.

Configuração

1) users.properties

Altere o arquivo $fuse-diretorio/etc/users.properties descomentando a linha:

#admin=admin,admin

2) system.properties

Altere o arquivo $fuse-diretorio/etc/system.properties substituindo localhost pelo hostname da máquina virtual. No meu caso é o maquina:

# Activemq configuration
activemq.port = 61616
activemq.host = maquina
activemq.url = tcp://${activemq.host}:${activemq.port}

Inicialização

Execute o seguinte arquivo $fuse-diretorio/bin 

./fuse 

fuse

Criando o cluster

Execute os seguintes comandos para criar e incializar o cluster:

1) Instala o FMC:

fabric:create -p fmc

2) Cria os dois containers usa:

fabric:container-create-child root usa 2

Jmx Login for root: admin 

Jmx Password for root: admin 

3) Cria os dois containers japan:

fabric:container-create-child root japan 2

4) O seguinte comando cria o par master/slave de brokers no grupo usa-group e o liga com os brokers japan através do linkToJapan:

fabric:mq-create --group usa-group --networks linkToJapan --networks-username admin --networks-password admin --assign-container usa1,usa2 amq-usa-profile

5) O seguinte comando cria o par master/slave de brokers no grupo japan-group e o liga com os brokers usa através do linkToUsa:

fabric:mq-create --group japan-group --networks linkToUsa --networks-username admin --networks-password admin --assign-container japan1,japan2 amq-japan-profile

6) Lista todos os clusters

fabric:cluster-list

Management Console

Acesse a seguinte URL: http://localhost:8181

1

Obs: se não estiver conseguindo acessar o Managente Console, execute o seguinte comando para instalar o FMC:  fabric:create -p fmc

No próximo post mostrarei como criar um Produtor e um Consumidor utilizando o protocolo fabric

Até o próximo post!


Cluster JBoss A-MQ : Failover em Network of Brokers – Store and Forward

Introdução

ActiveMQ suporta o conceito de Network of Brokers, onde os brokers estão remotos e as aplicações precisam de comunicar com eles de uma forma confiável. 
Na abordagem store and forward no contexto de Network of Brokers as mensagens são passadas de um broker para outro até chegar em um consumidor, onde esta mensagem estará em posse de somente um broker ao mesmo tempo, ou seja, nesta abordagen podemos garantir failover por causa do protocolo de mesmo nome, porém não podemos garantir Alta Disponibilidade (HA).
Seguindo este paradigma, ter uma arquitetura centralizada de brokers onde todos os clientes se conectam não parece uma boa ideia. Vamos supor que um broker esteja guardando uma certa quantidade de mensagens em seu persistent store e por alguma motivo ele falha. Estas mensagens estarão em posse deste broker até que ele retorne.

network_brokers_static

Solução

Utilização de Master/Slave.

Bidirecional

Por padrão esta abordagem opera somente de forma unidirecional a logicamente envia mensagens para a rede, porém uma simples configuração permite que a rede seja bidirecional onde o broker pode tanto enviar quanto receber mensagens.

Casos de Uso

  • Geralmente em grandes redes onde os produtores das mensagens estão em uma LAN e os consumidores estão em outra LAN. Neste caso pode-se usar um broker em cada LAN como se fosse um broker que irá concentrar a comunicação tentando minimizar o número de conexões entre WAN que liga as duas LANs.
  • Store and forward pode ser encontrado em ambientes de firewalls ou SSL entre redes.
  • Quando o Sistema Operacional (SO) não suporta um número grande de sockets, pode-se usar store and forward para conectar um número grande de clientes utilizando uma rede lógica.

Duas configurações

Red Hat JBoss A-MQ suporta dois diferentes tipos de configurações para formar uma Network of Brokers:

  • Static (proposta deste tutorial): onde é necessário saber o IP de todos os brokers que fazem parte da rede e inscrevê-los previamente no arquivo de configuração;
  • Multicast: utilização de Discovery para detecção automática dos brokers;

Como acessar o broker?

Clientes ActiveMQ Java e C++ provêem um protocolo de transferência chamado de failover,  que tenta se conectar automaticamente ao novo broker master sem perda de mensagens. Os clientes dos brokers utilizarão o seguinte protocolo de transporte:

failover://(tcp://host1:61616)

Dica: a URI acima irá tentar automaticamente estabelecer uma conexão com o mesmo broker no caso de queda. Portanto, mesmo que haja somente um broker no ambiente, é recomendado que o protocolo failover seja utilizado, pois no caso de queda, ele tentará se conectar novamente.

Instalação

1) Download

O Red Hat JBoss A-MQ 6.0 pode ser baixo aqui: https://www.jboss.org/products/amq.html

Obs: disponível somente para fins de Desenvolvimento.

2) Descompactar

Descompacte o A-MQ nas duas máquinas. No meu caso descopactei em /opt de cada RHEL virtual.

3) Conectividade

Tenha certeza que as máquinas conseguem se comunicar entre elas: localhost (física fazendo papel de cliente através o Fuse IDE), host1 (virtual) e host2 (virtual) e host3 (virtual).

Configurações host1

1) users.properties 

Altere o arquivo $jboss-a-mq-diretorio/etc/users.properties descomentando a linha:

#admin=admin,admin

2) system.properties

Altere o arquivo $jboss-a-mq-diretorio/etc/system.properties substituindo localhost pelo hostname da máquina virtual. No meu caso é o host1:

# Activemq configuration
activemq.port = 61616
activemq.host = host1 
activemq.url = tcp://${activemq.host}:${activemq.port}

3) activemq.xml

Altere o arquivo $jboss-a-mq-diretorio/etc/activemq.xml

<broker xmlns=”http://activemq.apache.org/schema/core&#8221;
brokerName=”amq1
useJmx=”true”
dataDirectory=”${data}”>

<managementContext>
<managementContext connectorPort=”1091” createConnector=”false”/>
</managementContext>

<networkConnectors>
<networkConnector name=“amq1:nc”
uri=”static:(failover:(tcp://host2:61616,tcp://host3:61616))”
dynamicOnly=”true”
networkTTL=”3″
duplex=”true”/>
</networkConnectors>

<persistenceAdapter>
<kahaDB directory=”${data}/kahadb”/>
</persistenceAdapter>

<plugins>
<loggingBrokerPlugin logAll=”false” logConnectionEvents=”true”/>
</plugins>

<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit=”20 mb”/>
</memoryUsage>
<storeUsage>
<storeUsage limit=”1 gb” name=”foo”/>
</storeUsage>
<tempUsage>
<tempUsage limit=”100 mb”/>
</tempUsage>
</systemUsage>
</systemUsage>

<transportConnectors>
<transportConnector name=”openwire” uri=”tcp://host1:61616″/>
</transportConnectors>

</broker>

Configurações host2

1) users.properties

Altere o arquivo $jboss-a-mq-diretorio/etc/users.properties descomentando a linha:

#admin=admin,admin

2) system.properties

Altere o arquivo $jboss-a-mq-diretorio/etc/system.properties substituindo localhost pelo hostname da máquina virtual. No meu caso é o host2:

# Activemq configuration
activemq.port = 61616
activemq.host = host2 
activemq.url = tcp://${activemq.host}:${activemq.port}

3) activemq.xml

<broker xmlns=”http://activemq.apache.org/schema/core&#8221;
brokerName=”amq2
useJmx=”true”
dataDirectory=”${data}”>

<managementContext>
<managementContext connectorPort=”1092” createConnector=”false”/>
</managementContext>

<persistenceAdapter>
<kahaDB directory=”${data}/kahadb”/>
</persistenceAdapter>

<plugins>
<loggingBrokerPlugin logAll=”false” logConnectionEvents=”true”/>
</plugins>

<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit=”20 mb”/>
</memoryUsage>
<storeUsage>
<storeUsage limit=”1 gb” name=”foo”/>
</storeUsage>
<tempUsage>
<tempUsage limit=”100 mb”/>
</tempUsage>
</systemUsage>
</systemUsage>

<transportConnectors>
<transportConnector name=”openwire” uri=”tcp://host2:61616″/>
</transportConnectors>
</broker>

Configurações host3

1) users.properties

Altere o arquivo $jboss-a-mq-diretorio/etc/users.properties descomentando a linha:

#admin=admin,admin

2) system.properties

Altere o arquivo $jboss-a-mq-diretorio/etc/system.properties substituindo localhost pelo hostname da máquina virtual. No meu caso é o host3:

# Activemq configuration
activemq.port = 61616
activemq.host = host3 
activemq.url = tcp://${activemq.host}:${activemq.port}

3) activemq.xml

<broker xmlns=”http://activemq.apache.org/schema/core&#8221;
brokerName=”amq3
useJmx=”true”
dataDirectory=”${data}”>

<managementContext>
<managementContext connectorPort=”1093” createConnector=”false”/>
</managementContext>

<persistenceAdapter>
<kahaDB directory=”${data}/kahadb”/>
</persistenceAdapter>

<plugins>
<loggingBrokerPlugin logAll=”false” logConnectionEvents=”true”/>
</plugins>

<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit=”20 mb”/>
</memoryUsage>
<storeUsage>
<storeUsage limit=”1 gb” name=”foo”/>
</storeUsage>
<tempUsage>
<tempUsage limit=”100 mb”/>
</tempUsage>
</systemUsage>
</systemUsage>

<transportConnectors>
<transportConnector name=”openwire” uri=”tcp://host3:61616″/>
</transportConnectors>
</broker>

Start do host1

Execute o seguinte arquivo $jboss-a-mq-diretorio/bin 

./amq 

Start do host2

Execute o seguinte arquivo $jboss-a-mq-diretorio/bin 

./amq 

Start do host3

Execute o seguinte arquivo $jboss-a-mq-diretorio/bin 

./amq 

Na maquina física

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class SampleClient {

public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory =

new ActiveMQConnectionFactory(“failover:(tcp://host1:61616)?timeout=1000“);

Connection connection = factory.createConnection(“admin”, “admin”);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
Queue queue = session.createQueue(“SampleQueue“);
MessageProducer producer = session.createProducer(queue);

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

// send messages
for (int i = 0; i < 100; i++) {
producer.send(session.createTextMessage(i + ” message”));
System.out.println(“Sent message ” + i);
Thread.sleep(1000);
}
producer.close(); session.close(); connection.close();
}
}

O programa acima envia 100 mensagens para o broker.

static_networkbrokers

Consumindo as mensagens

Disponibilizei logo abaixo as classes que produzem e consomem as mensagens dos brokers, além de toda configuração dos brokers.

Download de todos os recursos

  • activemq1.xml
  • activemq1.xml
  • activemq1.xml
  • Producer
  • Consumer

>>> Mensageria.zip 

Até o próximo post!


Cluster JBoss A-MQ : Master/Slave com File System compartilhado

Introdução

O ponto fundamental desta abordagem é prover Alta Disponibilidade (HA) em cenários de produção utilizando ActiveMQ. Na configuração master/slave existe somente um broker Master e vários slaves aguardando a falha do master para serem promovidos, na verdade somente um será. A falha do master é detectada pela falta de conectividade entre o master e os slaves.
masterslave

Duas configurações

A versão 6.0 do Red Hat JBoss A-MQ suporta dois diferentes tipos de configurações master/slave:

  • Shared nothing, onde cada broker ActiveMQ tem seu próprio message storage;
  • Shared storage (proposta deste tutorial), onde múltiplos brokers ActiveMQ brokers podem se contectar ao message store (banco relacional ou sistema de arquivos) compartilhado mas somente um broker estará ativo ao mesmo tempo. Nenhum intervenção manual é requerida para manter a integridade de aplicação. Nenhum limitação de número de brokers slave. O master terá o lock do DB.

Três tipos de Message Stores

  1. File System Master Slave compartilhado: por exemplo SAN, NFS, etc.
  2. JDBC Master Slave:  Um banco de dados compartilhado. Não é a melhor opção tem termos de performance porque não utiliza journal de alta performance, mas a configuração é bem simples.

Por que usar File System ao invés de Banco de Dados?

Utilizar file system compartilhado é a melhor solução para prover Alta Disponibilidade, pois combina o alto throughput do KahaDB e a simplicidade de se utilizar um recurso compartilhado. KahaDB é extremamente rápido e é limitado somente pela performance do file system.
Não é necessário configuração adicional dos brokers e não existe limite de slaves acessando o file system.

Neste tutorial utilizarei o Network File System (NFS) como file system compartilhado, porém poderia ter escolhido outros como Global File System (GFS) 2 ou Storage Area Network (SAN).

Como acessar o broker?

Clientes ActiveMQ Java e C++ provêem um protocolo de transferência chamado de failover,  que tenta se conectar automaticamente ao novo broker master sem perda de mensagens. Os clientes dos brokers utilizarão o seguinte protocolo de transporte:

failover://(tcp://host1:61616,tcp://host2:61616)?randomize=false

Dica: a URI acima irá tentar automaticamente estabelecer uma conexão com o mesmo broker no caso de queda. Portanto, mesmo que haja somente um broker no ambiente, é recomendado que o protocolo failover seja utilizado, pois no caso de queda, ele tentará se conectar novamente.

Instalação

1) Download

O Red Hat JBoss A-MQ 6.0 pode ser baixo aqui: https://www.jboss.org/products/amq.html

Obs: disponível somente para fins de Desenvolvimento.

2) Descompactar

Descompacte o A-MQ nas duas máquinas. No meu caso descopactei em /opt de cada RHEL virtual.

3) Conectividade

Tenha certeza que as máquinas conseguem se comunicar entre elas: localhost (física), host1 (virtual) e host2 (virtual).

4) Diretório compartilhado

Criei um /shared na minha máquina física utilizando NFS e o exportei para as máquinas virtuais: host1 e host2

Configurações host1

1) users.properties

Altere o arquivo $jboss-a-mq-diretorio/etc/users.properties descomentando a linha:

#admin=admin,admin

2) system.properties

Altere o arquivo $jboss-a-mq-diretorio/etc/system.properties substituindo localhost pelo hostname da máquina virtual. No meu caso é o host1:

# Activemq configuration
activemq.port = 61616
activemq.host = host1 
activemq.url = tcp://${activemq.host}:${activemq.port}

3) activemq.xml

Altere o arquivo $jboss-a-mq-diretorio/etc/activemq.xml

<broker xmlns=”http://activemq.apache.org/schema/core&#8221;
brokerName=”broker_host1
dataDirectory=”${data}”
useJmx=”true”
start=”false”>

<transportConnectors>
<!– Na perspectiva do cliente a uri é usada para criar uma conexão com o broker para enviar/receber mensages –>
<transportConnector name=”openwire” uri=”tcp://host1:61616″/>
</transportConnectors>

<persistenceAdapter>
<kahaDB directory=“/shared“>
</persistenceAdapter>

<managementContext>
<managementContext createConnector=”trueconnectorPort=”1091″/>
</managementContext>

Obs: coloquei somente os trechos alterados.

Configurações host2

1) users.properties

Altere o arquivo $jboss-a-mq-diretorio/etc/users.properties descomentando a linha:

#admin=admin,admin

2) system.properties

Altere o arquivo $jboss-a-mq-diretorio/etc/system.properties substituindo localhost pelo hostname da máquina virtual. No meu caso é o host2:

# Activemq configuration
activemq.port = 61616
activemq.host = host2 
activemq.url = tcp://${activemq.host}:${activemq.port}

3) activemq.xml

Altere o arquivo $jboss-a-mq-diretorio/etc/activemq.xml

<broker xmlns=”http://activemq.apache.org/schema/core&#8221;
brokerName=”broker_host2
dataDirectory=”${data}”
useJmx=”true”
start=”false”>

<transportConnectors>
<!– Na perspectiva do cliente a uri é usada para criar uma conexão com o broker para enviar/receber mensages –>
<transportConnector name=”openwire” uri=”tcp://host2:61616″/>
</transportConnectors>

<persistenceAdapter>
<kahaDB directory=“/shared“>
</persistenceAdapter>

<managementContext>
<managementContext createConnector=”true” connectorPort=”1092″/>
</managementContext>

Obs: coloquei somente os trechos alterados.

Start do host1

Execute o seguinte arquivo $jboss-a-mq-diretorio/bin 

./amq 

Console host1

2013-11-05 12:26:30,843 | INFO | veMQ Broker: amq | PListStoreImpl | tore.kahadb.plist.PListStoreImpl 331 | 104 – org.apache.activemq.activemq-osgi – 5.8.0.redhat-60024 | PListStore:[/opt/jboss-a-mq-6.0.0.redhat-024/data/amq/broker_host1/tmp_storage] started
2013-11-05 12:26:31,004 | INFO | JMX connector | ManagementContext | q.broker.jmx.ManagementContext$1 138 | 104 – org.apache.activemq.activemq-osgi – 5.8.0.redhat-60024 | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1091/jmxrmi
2013-11-05 12:26:31,016 | INFO | veMQ Broker: amq | BrokerService | he.activemq.broker.BrokerService 595 | 104 – org.apache.activemq.activemq-osgi – 5.8.0.redhat-60024 | Using Persistence Adapter: KahaDBPersistenceAdapter[/shared]
2013-11-05 12:26:32,252 | INFO | veMQ Broker: amq | MessageDatabase | .kahadb.MessageDatabase$Metadata 147 | 104 – org.apache.activemq.activemq-osgi – 5.8.0.redhat-60024 | KahaDB is version 4
2013-11-05 12:26:32,264 | INFO | veMQ Broker: amq | MessageDatabase | emq.store.kahadb.MessageDatabase 552 | 104 – org.apache.activemq.activemq-osgi – 5.8.0.redhat-60024 | Recovering from the journal …
2013-11-05 12:26:32,265 | INFO | veMQ Broker: amq | MessageDatabase | emq.store.kahadb.MessageDatabase 565 | 104 – org.apache.activemq.activemq-osgi – 5.8.0.redhat-60024 | Recovery replayed 1 operations from the journal in 0.009 seconds.
2013-11-05 12:26:32,403 | INFO | veMQ Broker: amq | BrokerService | he.activemq.broker.BrokerService 634 | 104 – org.apache.activemq.activemq-osgi – 5.8.0.redhat-60024 | Apache ActiveMQ 5.8.0.redhat-60024 (broker_host1, ID:host1-60970-1383672392281-0:1) is starting
2013-11-05 12:26:32,434 | INFO | veMQ Broker: amq | TransportServerThreadSupport | ort.TransportServerThreadSupport 72 | 104 – org.apache.activemq.activemq-osgi – 5.8.0.redhat-60024 | Listening for connections at: tcp://host1:61616
2013-11-05 12:26:32,435 | INFO | veMQ Broker: amq | TransportConnector | tivemq.broker.TransportConnector 254 | 104 – org.apache.activemq.activemq-osgi – 5.8.0.redhat-60024 | Connector openwire Started
2013-11-05 12:26:32,437 | INFO | veMQ Broker: amq | BrokerService | he.activemq.broker.BrokerService 658 | 104 – org.apache.activemq.activemq-osgi – 5.8.0.redhat-60024 | Apache ActiveMQ 5.8.0.redhat-60024 (broker_host1, ID:host1-60970-1383672392281-0:1) started
2013-11-05 12:26:32,476 | INFO | veMQ Broker: amq | ActiveMQServiceFactory | q.fabric.ActiveMQServiceFactory$ 52 | 120 – org.jboss.amq.mq-fabric – 6.0.0.redhat-024 | Broker amq has started.
2013-11-05 12:35:59,091 | INFO | qtp252820188-194 | RemoteJMXBrokerFacade | tivemq.web.RemoteJMXBrokerFacade 153 | 133 – org.jboss.amq.mq-web-console – 5.8.0.redhat-60024 | Connected via JMX to the broker at service:jmx:rmi:///jndi/rmi://host1:1091/karaf-root

Start do host2

Execute o seguinte arquivo $jboss-a-mq-diretorio/bin 

./amq 

Console host2

2013-11-05 12:26:37,533 | INFO | veMQ Broker: amq | PListStoreImpl | tore.kahadb.plist.PListStoreImpl 331 | 104 – org.apache.activemq.activemq-osgi – 5.8.0.redhat-60024 | PListStore:[/opt/jboss-a-mq-6.0.0.redhat-024/data/amq/broker_host2/tmp_storage] started
2013-11-05 12:26:37,588 | INFO | Executor: 1 | DispatcherServlet | ork.web.servlet.FrameworkServlet 463 | 133 – org.jboss.amq.mq-web-console – 5.8.0.redhat-60024 | FrameworkServlet ‘dispatcher’: initialization completed in 338 ms
2013-11-05 12:26:37,657 | INFO | JMX connector | ManagementContext | q.broker.jmx.ManagementContext$1 138 | 104 – org.apache.activemq.activemq-osgi – 5.8.0.redhat-60024 | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1092/jmxrmi
2013-11-05 12:26:37,667 | INFO | veMQ Broker: amq | BrokerService | he.activemq.broker.BrokerService 595 | 104 – org.apache.activemq.activemq-osgi – 5.8.0.redhat-60024 | Using Persistence Adapter: KahaDBPersistenceAdapter[/shared]
2013-11-05 12:26:37,778 | INFO | veMQ Broker: amq | SharedFileLocker | .activemq.store.SharedFileLocker 58 | 104 – org.apache.activemq.activemq-osgi – 5.8.0.redhat-60024 | Database /shared/lock is locked… waiting 10 seconds for the database to be unlocked. Reason: java.io.IOException: File ‘/shared/lock’ could not be locked.
2013-11-05 12:26:47,899 | INFO | veMQ Broker: amq | SharedFileLocker | .activemq.store.SharedFileLocker 58 | 104 – org.apache.activemq.activemq-osgi – 5.8.0.redhat-60024 | Database /shared/lock is locked… waiting 10 seconds for the database to be unlocked. Reason: java.io.IOException: File ‘/shared/lock’ could not be locked.
2013-11-05 12:26:58,012 | INFO | veMQ Broker: amq | SharedFileLocker | .activemq.store.SharedFileLocker 58 | 104 – org.apache.activemq.activemq-osgi – 5.8.0.redhat-60024 | Database /shared/lock is locked… waiting 10 seconds for the database to be unlocked. Reason: java.io.IOException: File ‘/shared/lock’ could not be locked.

Ou seja: aguardando a liberação do lock, tentando checar a cada 10 segundos

Na maquina física

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class SampleClient {

public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory =

new ActiveMQConnectionFactory(“failover:(tcp://host1:61616,tcp://maquina2:61616)?timeout=1000“);

Connection connection = factory.createConnection(“admin”, “admin”);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
Queue queue = session.createQueue(“SampleQueue“);
MessageProducer producer = session.createProducer(queue);

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

// send messages
for (int i = 0; i < 100; i++) {
producer.send(session.createTextMessage(i + ” message”));
System.out.println(“Sent message ” + i);
Thread.sleep(1000);
}
producer.close(); session.close(); connection.close();
}
}

O programa acima envia 100 mensagens para o broker.

fuse-amq

Vou deixar enviando até completar 10 mensagens e em seguida derrubarei o broker que está recebendo as mensagens:

[ActiveMQ Task-1] FailoverTransport INFO Successfully connected to tcp://host1:61616
Sent message 0
Sent message 1
Sent message 2
Sent message 3
Sent message 4
Sent message 5
Sent message 6
Sent message 7
Sent message 8
Sent message 9
Sent message 10
[na/192.168.122.153:61616@52382] FailoverTransport WARN Transport (tcp://192.168.122.153:61616) failed, reason: java.io.EOFException, attempting to automatically reconnect
[ActiveMQ Task-1] FailoverTransport INFO Successfully reconnected to tcp://host2:61616
Sent message 11
Sent message 12
Sent message 13
Sent message 14
Sent message 15
Sent message 16
Sent message 17
Sent message 18
Sent message 19
Sent message 20

Acessando o console

http://host2:8181/activemqweb/queueGraph.jsp

queue-view