Enabling JMX monitoring of SEDA Queue Depths in Apache Camel
Apache Camel’s SEDA (Staged Event-Driven Architecture) endpoints (http://camel.apache.org/seda.html) provide a useful and quick mechanism to implement asynchronous, event-driven processing within your applications. See http://www.eecs.harvard.edu/~mdw/proj/seda/ for the original description of the SEDA architecture. For those not very familiar with Camel, implementing a SEDA route can be as simple as adding the following to a route builder class:
from("seda:start").beanRef("someBean", "someMethod");
Exchanges (messages) put to the seda:start endpoint will be processed through the remainder of the Camel route. The SEDA endpoint encapsulates an in-memory queue used as the entry point to the route, and all messages are processed asynchronously. In the example above, the messages would be routed to the someMethod method on a Spring bean named someBean.
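To make the queue-plus-consumer idea concrete, here is a minimal plain-JDK sketch of a SEDA-style stage. It is an illustration only and not how Camel implements SedaEndpoint; the class name SedaStageSketch, the process method, and the upper-casing "processing" step are all made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a SEDA-style stage; not Camel's SedaEndpoint.
class SedaStageSketch {

    // Sends each message through an in-memory queue to a single consumer thread
    // and returns what the consumer processed, in arrival order.
    static List<String> process(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(messages.size());

        // Consumer: plays the role of the route behind the endpoint.
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String body = queue.take();          // blocks until a message arrives
                    processed.add(body.toUpperCase());   // stand-in for real processing
                    done.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();      // stop when interrupted
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        try {
            // Producer: hands each message to the queue and moves on.
            for (String message : messages) {
                queue.put(message);
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumer did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            consumer.interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList("a", "b", "c")));
    }
}
```

The producer never waits for processing to complete; it only waits here at the end so that the example can print a result.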
From a monitoring perspective, the throughput and other statistics for this route can be viewed via Camel’s JMX functionality (assuming that JMX is enabled). However, Camel’s JMX framework does not expose the depth of the SEDA queue as a JMX attribute, and this depth may be of interest for application monitoring. The following describes a method for exposing SEDA queue depths for monitoring. The same method also works around a limitation of the current SedaEndpoint implementation in Camel: its queue depth is bounded.
Step 1: Define the endpoint in the Camel context
In the Camel context, define the SEDA endpoint that will be the input to the route:
<camel:camelContext id="camel">
    <camel:package>com.objectpartners.camel</camel:package>
    <camel:jmxAgent id="agent" createConnector="true" connectorPort="1099" />
    <camel:endpoint id="startEndpoint" uri="seda:start"/>
</camel:camelContext>
When the context is loaded, this instantiates the SEDA endpoint and places a bean for it in the Spring registry.
Step 2: Create the route builder class
In the route builder class, wire in the endpoint and create the route as follows:
package com.objectpartners.camel;

import org.apache.camel.Endpoint;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

public class Routes extends RouteBuilder {

    // the SEDA endpoint defined in the Camel context
    @Autowired
    @Qualifier(value = "startEndpoint")
    private Endpoint startEndpoint;

    /**
     * Configure the routes
     */
    @Override
    public void configure() throws Exception {
        from(startEndpoint).beanRef("someBean", "someMethod");
    }
}
Step 3: Alter the properties of the SEDA endpoint
At this point, we change the properties of the SEDA endpoint to make the queue depth unbounded and to make the queue depth accessible via JMX. Making the queue unbounded is optional if a bounded depth is acceptable: the default maximum depth is 1000, and it can be configured explicitly on the endpoint. JMX exposure is accomplished by exposing the SEDA endpoint’s backing queue through a managed resource. Unbounded queue depth is accomplished by changing the blocking queue implementation used by the SEDA endpoint. The code below assumes that the following have been placed in your application context:
<context:mbean-export />
<context:annotation-config />
<context:component-scan base-package=<package> />
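As a side note on the bounded versus unbounded distinction, the following plain-JDK sketch shows how a LinkedBlockingQueue behaves with and without an explicit capacity. The class QueueBoundSketch and its accepted helper are hypothetical, and the capacity of 3 is an arbitrary small stand-in for the default of 1000. How Camel itself reacts to a full queue is not shown here.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-JDK illustration; names and the capacity of 3 are made up for this sketch.
class QueueBoundSketch {

    // Offers `attempts` items without blocking and returns how many were accepted.
    static int accepted(BlockingQueue<Integer> queue, int attempts) {
        int count = 0;
        for (int i = 0; i < attempts; i++) {
            if (queue.offer(i)) {   // offer() returns false instead of blocking when full
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // A capacity passed to the constructor makes the queue bounded.
        BlockingQueue<Integer> bounded = new LinkedBlockingQueue<>(3);
        // The no-argument constructor uses a capacity of Integer.MAX_VALUE.
        BlockingQueue<Integer> unbounded = new LinkedBlockingQueue<>();

        System.out.println(accepted(bounded, 5));   // prints 3
        System.out.println(accepted(unbounded, 5)); // prints 5
    }
}
```

An effectively unbounded queue trades rejected messages for memory growth when consumers fall behind, which is one more reason to watch the depth.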
The final code for the route builder class:
package com.objectpartners.camel;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.seda.SedaEndpoint;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.stereotype.Component;

@Component
@ManagedResource(objectName = "example:name=TestRoutes", description = "Camel Test Routes")
public class Routes extends RouteBuilder {

    @Autowired
    @Qualifier(value = "startEndpoint")
    private Endpoint startEndpoint;

    // the new (unbounded) blocking queue for the SEDA endpoint
    BlockingQueue<Exchange> startEndpointQueue = new LinkedBlockingQueue<Exchange>();

    /**
     * Configure the routes
     */
    @Override
    public void configure() throws Exception {
        // change the BlockingQueue implementation for the seda endpoint
        ((SedaEndpoint) startEndpoint).setQueue(startEndpointQueue);
        from(startEndpoint).beanRef("someBean", "someMethod");
    }

    // expose the current queue depth as a JMX attribute
    @ManagedAttribute
    public long getQueueDepth() {
        return startEndpointQueue.size();
    }
}
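For readers not using Spring’s JMX export, the same idea can be sketched with the JDK’s own JMX API: wrap the queue in a standard MBean whose getter becomes a read-only attribute. This is only a rough sketch under assumptions; QueueDepthJmxSketch, QueueDepthMonitor, depthViaJmx, and the object name used below are hypothetical and are not part of Camel or Spring.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Plain-JDK sketch; all names here are hypothetical.
class QueueDepthJmxSketch {

    // Standard MBean contract: the getter becomes a read-only "QueueDepth" attribute.
    public interface QueueDepthMonitorMBean {
        int getQueueDepth();
    }

    // Implementation name + "MBean" must match the interface name above.
    public static class QueueDepthMonitor implements QueueDepthMonitorMBean {
        private final BlockingQueue<?> queue;

        public QueueDepthMonitor(BlockingQueue<?> queue) {
            this.queue = queue;
        }

        @Override
        public int getQueueDepth() {
            return queue.size();
        }
    }

    // Registers a monitor for the queue, reads the attribute back through the
    // MBean server, and unregisters the monitor again.
    static int depthViaJmx(BlockingQueue<?> queue, String name) {
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName objectName = new ObjectName(name);
            server.registerMBean(new QueueDepthMonitor(queue), objectName);
            try {
                return (Integer) server.getAttribute(objectName, "QueueDepth");
            } finally {
                server.unregisterMBean(objectName);
            }
        } catch (Exception e) {
            throw new IllegalStateException("JMX lookup failed", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("first");
        queue.add("second");
        System.out.println(depthViaJmx(queue, "example:name=QueueDepthSketch"));
    }
}
```

In a real application the MBean would stay registered for the lifetime of the route, so that a JMX console can poll the attribute; the sketch unregisters it only to keep the example repeatable.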
The queue depth of the SEDA endpoint is available in JMX. However, it is available as a JMX operation rather than an attribute, because the endpoint is a BrowsableEndpoint instance. The reason it is not an attribute is that some other BrowsableEndpoints (such as JmsEndpoint) may take too many resources to compute the queue depth.
See the ManagedBrowsableEndpoint class.
I reckon we should allow the SEDA queue to easily be unbounded instead of fixed at 1000. I have created CAMEL-2471 to track that.