Messaging Endpoints

Messaging Mapper

Camel supports the Messaging Mapper from the EIP patterns by using either Message Translator pattern or the Type Converter module.

See also

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Event Driven Consumer

Camel supports the Event Driven Consumer from the EIP patterns. The default consumer model is event based (i.e. asynchronous) as this means that the Camel container can then manage pooling, threading and concurrency for you in a declarative manner.

The Event Driven Consumer is implemented by consumers implementing the Processor interface which is invoked by the Message Endpoint when a Message is available for processing.

For more details see

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Polling Consumer

Camel supports implementing the Polling Consumer from the EIP patterns using the PollingConsumer interface which can be created via the Endpoint.createPollingConsumer() method.

So in your Java code you can do

Endpoint endpoint = context.getEndpoint("activemq:my.queue");
PollingConsumer consumer = endpoint.createPollingConsumer();
Exchange exchange = consumer.receive();

There are 3 main polling methods on PollingConsumer

Method nameDescription
receive() Waits until a message is available and then returns it; potentially blocking forever
receive(long) Attempts to receive a message exchange immediately without waiting and returning null if a message exchange is not available yet
receiveNoWait() Attempts to receive a message exchange, waiting up to the given timeout and returning null if no message exchange could be received within the time available

Scheduled Poll Components

Quite a few inbound Camel endpoints use a scheduled poll pattern to receive messages and push them through the Camel processing routes. That is to say externally from the client the endpoint appears to use an Event Driven Consumer but internally a scheduled poll is used to monitor some kind of state or resource and then fire message exchanges.

Since this a such a common pattern, polling components can extend the ScheduledPollConsumer base class which makes it simpler to implement this pattern.

There is also the Quartz Component which provides scheduled delivery of messages using the Quartz enterprise scheduler.

For more details see

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Competing Consumers

Camel supports the Competing Consumers from the EIP patterns using a few different components.

You can use the following components to implement competing consumers:-

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Message Dispatcher

Camel supports the Message Dispatcher from the EIP patterns using various approaches.

You can use a component like JMS with selectors to implement a Selective Consumer as the Message Dispatcher implementation. Or you can use an Endpoint as the Message Dispatcher itself and then use a Content Based Router as the Message Dispatcher.

See Also

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Selective Consumer

The Selective Consumer from the EIP patterns can be implemented in two ways

The first solution is to provide a Message Selector to the underlying URIs when creating your consumer. For example when using JMS you can specify a selector parameter so that the message broker will only deliver messages matching your criteria.

The other approach is to use a Message Filter which is applied; then if the filter matches the message your consumer is invoked as shown in the following example

Using the Fluent Builders

RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        from("seda:a").filter(header("foo").isEqualTo("bar")).process(myProcessor);
    }
};

Using the Spring XML Extensions

<camelContext id="buildCustomProcessorWithFilter" xmlns="http://activemq.apache.org/camel/schema/spring"><route><from uri="seda:a"/><filter><predicate><header name="foo"/><isEqualTo value="bar"/></predicate></filter><process ref="#myProcessor"/></route></camelContext>

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Durable Subscriber

Camel supports the Durable Subscriber from the EIP patterns using the JMS component which supports publish & subscribe using Topics with support for non-durable and durable subscribers.

Another alternative is to combine the Message Dispatcher or Content Based Router with File or JPA components for durable subscribers then something like Queue for non-durable.

See Also

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Idempotent Consumer

The Idempotent Consumer from the EIP patterns is used to filter out duplicate messages.

This pattern is implemented using the IdempotentConsumer class. This uses an Expression to calculate a unique message ID string for a given message exchange; this ID can then be looked up in the MessageIdRepository to see if it has been seen before; if it has the message is consumed; if its not then the message is processed and the ID is added to the repository.

The Idempotent Consumer essentially acts like a Message Filter to filter out duplicates.

Using the Fluent Builders

The following example will use the header myMessageId to filter out duplicates

RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        from("seda:a").idempotentConsumer(
                header("myMessageId"), memoryMessageIdRepository(200)
        ).to("seda:b");
    }
};

The above ??? will use an in-memory based MessageIdRepository which can easily run out of memory and doesn't work in a clustered environment. So you might prefer to use the JPA based implementation which uses a database to store the message IDs which have been processed

returnnew SpringRouteBuilder() {
    public void configure() {
        from("direct:start").idempotentConsumer(
                header("messageId"),
                jpaMessageIdRepository(bean(JpaTemplate.class), "myProcessorName")
        ).to("mock:result");
    }
};

In the above ??? we are using the header messageId to filter out duplicates and using the collection myProcessorName to indicate the Message ID Repository to use. This name is important as you could process the same message by many different processors; so each may require its own logical Message ID Repository.

Using the Spring XML Extensions

<camelContext id="buildCustomProcessorWithFilter" xmlns="http://activemq.apache.org/camel/schema/spring"><route><from uri="seda:a"/><filter><predicate><header name="foo"/><isEqualTo value="bar"/></predicate></filter><process ref="#myProcessor"/></route></camelContext>

For further examples of this pattern in use you could look at the junit test case

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Transactional Client

Camel recommends supporting the Transactional Client from the EIP patterns using spring transactions.

Transaction Oriented Endpoints (Camel Toes) like JMS support using a transaction for both inbound and outbound message exchanges. Endpoints that support transactions will participate in the current transaction context that they are called from.

You should use the SpringRouteBuilder to setup the routes since you will need to setup the spring context with the TransactionTemplates that will define the transaction manager configuration and policies.

For inbound endpoint to be transacted, they normally need to be configured to use a Spring PlatformTransactionManager. In the case of the JMS component, this can be done by looking it up in the spring context.

You first define needed object in the spring configuration.

<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager"><property name="connectionFactory" ref="jmsConnectionFactory" /></bean><bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://localhost:61616"/></bean>

Then you look them up and use them to create the JmsComponent.

PlatformTransactionManager transactionManager = (PlatformTransactionManager) spring.getBean("jmsTransactionManager");
  ConnectionFactory connectionFactory = (ConnectionFactory) spring.getBean("jmsConnectionFactory");
  JmsComponent component = JmsComponent.jmsComponentTransacted(connectionFactory, transactionManager);
  component.getConfiguration().setConcurrentConsumers(1);
  ctx.addComponent("activemq", component);

Transaction Policies

Outbound endpoints will automatically enlist in the current transaction context. But what if you do not want your outbound endpoint to enlist in the same transaction as your inbound endpoint? The solution is to add a Transaction Policy to the processing route. You first have to define transaction policies that you will be using. The policies use a spring TransactionTemplate to declare the transaction demarcation use. So you will need to add something like the following to your spring xml:

<bean id="PROPAGATION_REQUIRED" class="org.springframework.transaction.support.TransactionTemplate"><property name="transactionManager" ref="jmsTransactionManager"/></bean><bean id="PROPAGATION_NOT_SUPPORTED" class="org.springframework.transaction.support.TransactionTemplate"><property name="transactionManager" ref="jmsTransactionManager"/><property name="propagationBehaviorName" value="PROPAGATION_NOT_SUPPORTED"/></bean><bean id="PROPAGATION_REQUIRES_NEW" class="org.springframework.transaction.support.TransactionTemplate"><property name="transactionManager" ref="jmsTransactionManager"/><property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/></bean>

Then in your SpringRouteBuilder, you just need to create new SpringTransactionPolicy objects for each of the templates.

public void configure() {
   ...
   Policy requried = new SpringTransactionPolicy(bean(TransactionTemplate.class, "PROPAGATION_REQUIRED"));
   Policy notsupported = new SpringTransactionPolicy(bean(TransactionTemplate.class, "PROPAGATION_NOT_SUPPORTED"));
   Policy requirenew = new SpringTransactionPolicy(bean(TransactionTemplate.class, "PROPAGATION_REQUIRES_NEW"));
   ...
}

Once created, you can use the Policy objects in your processing routes:

// Send to bar in a new transaction
   from("activemq:queue:foo").policy(requirenew).to("activemq:queue:bar");

   // Send to bar without a transaction.
   from("activemq:queue:foo").policy(notsupported ).to("activemq:queue:bar");

See Also

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Messaging Gateway

Camel has several endpoint components that support the Messaging Gateway from the EIP patterns.

Components like Bean, CXF and Pojo provide a a way to bind a Java interface to the message exchange.

See Also

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.

Service Activator

Camel has several endpoint components that support the Service Activator from the EIP patterns.

Components like Bean, CXF and Pojo provide a a way to bind the message exchange to a Java interface/service.

See Also

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.