
Server Sent Events with Camel Quarkus

Author: James Netherton

A topic that occasionally comes up among Apache Camel users is how to work with Server-Sent Events (SSE) in a Camel application.

At present, there is no dedicated SSE Camel component, so upvote CAMEL-16431 if you're interested. However, frameworks like Quarkus and Spring Boot provide SSE functionality that can be used in conjunction with Camel.

Here's an example of a Camel Quarkus application that will publish the exchange message body flowing through a Camel route as an SSE.

System Monitoring Dashboard Application

The theme for the example is a simple system monitoring dashboard. A web frontend will collect and chart metrics emitted as SSEs from a RESTEasy endpoint.

We'll be using the following Quarkus extensions for the backend:

For the frontend, we'll keep things simple with basic HTML, CSS and a little JavaScript to handle SSE messages. Charting is provided by Chart.js.

The full source code for the project can be found here.

Camel Route

Here's what our Camel route looks like:

@ApplicationScoped
public class Routes extends RouteBuilder {
    @Inject
    CamelReactiveStreamsService reactiveStreamsService;

    @Inject
    MetricsService metricsService;

    @Outgoing("runtime-metrics")
    public Publisher<RuntimeMetrics> getDataFromCamelRoute() {
        return reactiveStreamsService.fromStream("runtime-metrics", RuntimeMetrics.class);
    }

    @Override
    public void configure() throws Exception {
        // Stream subscription happens when the browser connects to the SSE
        // Therefore, we ignore ReactiveStreamsNoActiveSubscriptionsException
        onException(ReactiveStreamsNoActiveSubscriptionsException.class)
                .handled(true);

        from("timer:updateMemoryInfo?period=5s")
                .process(exchange -> exchange.getMessage().setBody(metricsService.getRuntimeMetrics()))
                .to("reactive-streams:runtime-metrics");
    }
}

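A timer fires every 5 seconds. A processor then calls MetricsService to fetch some interesting metrics and sets the resulting RuntimeMetrics POJO as the message body. Finally, the message is routed to a Reactive Streams channel named runtime-metrics. The @Outgoing method exposes that Camel stream as a reactive messaging channel of the same name, so it can be injected elsewhere in the application.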

Metrics Service

MetricsService obtains metrics from the JMX OperatingSystem MBean. For example, this is how it reads the system CPU usage:

@Singleton
public class MetricsService {
    private static final String OPERATING_SYSTEM_MBEAN = "java.lang:type=OperatingSystem";

    String getSystemCpuUsage() {
        double value = getMBeanAttribute(OPERATING_SYSTEM_MBEAN, "SystemCpuLoad", Double.class);
        return String.format("%.2f", value * 100);
    }
}
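
The excerpt above calls getMBeanAttribute, which is not shown. Here is a minimal sketch of what such a helper could look like, using only the JDK's platform MBean server. The class name MBeanReader and the helper body are assumptions for illustration, not the project's actual code. The example reads the standard AvailableProcessors attribute so that it runs on any JVM.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

class MBeanReader {
    private static final String OPERATING_SYSTEM_MBEAN = "java.lang:type=OperatingSystem";

    // Hypothetical helper: reads one attribute from the platform MBean server
    // and casts it to the type the caller expects.
    static <T> T getMBeanAttribute(String mbeanName, String attribute, Class<T> type) {
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            Object value = server.getAttribute(new ObjectName(mbeanName), attribute);
            return type.cast(value);
        } catch (Exception e) {
            // Covers malformed names, unknown MBeans and unknown attributes
            throw new IllegalStateException("Unable to read " + attribute + " from " + mbeanName, e);
        }
    }

    public static void main(String[] args) {
        // AvailableProcessors is a standard attribute of the OperatingSystem MXBean
        Integer processors = getMBeanAttribute(OPERATING_SYSTEM_MBEAN, "AvailableProcessors", Integer.class);
        System.out.println("Available processors: " + processors);
    }
}
```

SystemCpuLoad is a HotSpot-specific attribute and is documented to return a negative value when the load is not available, so a real implementation may want to guard against that before formatting the result.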

RESTEasy SSE endpoint

The endpoint that handles the SSE looks like this.

@Path("/metrics")
public class RuntimeMetricsResource {
    @Inject
    @Channel("runtime-metrics")
    Multi<RuntimeMetrics> runtimeMetrics;

    @GET
    @RestStreamElementType(MediaType.APPLICATION_JSON)
    public Multi<RuntimeMetrics> runtimeMetrics() {
        return runtimeMetrics;
    }
}

We inject the runtime-metrics channel and use it as the return value in the SSE endpoint method. Usually you would annotate an SSE endpoint with @Produces(MediaType.SERVER_SENT_EVENTS). But in this case, we can omit it and instead use @RestStreamElementType to specify that the SSE data elements should be serialized to JSON.

The basic structure of the RuntimeMetrics class looks like this. We'll be collecting some simple memory and CPU metrics.

public class RuntimeMetrics {
    private final double memoryUsed;
    private final long openFileDescriptors;
    private final String processCpuUsage;
    private final String systemCpuUsage;
    private long timestamp;

    // constructor and getters / setters omitted for brevity
}

Running the application

Start the application in 'dev' mode and test the /metrics endpoint.

mvn quarkus:dev

With the application running, use cURL to connect to the metrics endpoint.

curl localhost:8080/metrics

You should see a stream of messages as the Camel timer fires every 5 seconds.

data:{"memoryUsed":12605.34375,"openFileDescriptors":560,"processCpuUsage":"0.16","systemCpuUsage":"4.46","timestamp":1693218265190}

data:{"memoryUsed":12595.109375,"openFileDescriptors":560,"processCpuUsage":"0.25","systemCpuUsage":"1.94","timestamp":1693218270196}

data:{"memoryUsed":12586.984375,"openFileDescriptors":560,"processCpuUsage":"0.05","systemCpuUsage":"1.68","timestamp":1693218275198}

data:{"memoryUsed":12586.21875,"openFileDescriptors":560,"processCpuUsage":"0.05","systemCpuUsage":"1.70","timestamp":1693218280202}
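
The output above follows the SSE wire format: each event is written as one or more data: lines followed by a blank line. The sketch below illustrates that framing with plain Java. It is not how RESTEasy implements it, and the class name SseFrameWriter is made up for this example. The sample payload is an abbreviated version of the JSON shown above.

```java
class SseFrameWriter {
    // Formats a payload as one SSE event: a "data:" line for every line
    // of the payload, terminated by an empty line.
    static String toEvent(String payload) {
        StringBuilder frame = new StringBuilder();
        for (String line : payload.split("\\r?\\n", -1)) {
            frame.append("data:").append(line).append('\n');
        }
        // The blank line tells the client that the event is complete
        return frame.append('\n').toString();
    }

    public static void main(String[] args) {
        // Abbreviated JSON payload, as the endpoint would serialize it
        String json = "{\"openFileDescriptors\":560,\"systemCpuUsage\":\"4.46\"}";
        System.out.print(toEvent(json));
    }
}
```

Because the JSON produced for RuntimeMetrics fits on a single line, every event in the cURL output is exactly one data: line plus the separating blank line.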

We can view the data in a nicer format via the web frontend. Open a browser to http://localhost:8080. The charts should start to fill up with data as each event is published.

Describing all of the frontend code is outside the scope of this article. In basic terms, an EventSource is created for the /metrics path. The charts are updated on each onmessage event.

The JavaScript looks something like this.

const eventSource = new EventSource("/metrics");
eventSource.onmessage = (event) => {
    // Parse the JSON event data containing the metrics
    const message = JSON.parse(event.data);
    // Update charts
};

Leave the application running for a minute or two and you should have collected a decent set of data points for the charts to look something like this.

[Image: charts]

Conclusion

That was a simple demo of making Camel work with Server-Sent Events on Quarkus. The same approach extends to more complex applications, and the timer can be swapped for other Camel components if desired.

The SmallRye Reactive Messaging Camel connector is a useful project if you want to bridge the reactive world with Camel. The links below offer further reading on the subject.

This demo only covered emitting SSEs. It did not demonstrate a client connecting to the SSE endpoint and consuming the events. However, there is a test case which demonstrates using the quarkus-rest-client-reactive extension to consume the RuntimeMetrics events.
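
For readers who want to consume the events outside the browser without extra dependencies, here is a rough sketch using the JDK's java.net.http.HttpClient (Java 11+). This is a swapped-in alternative, not the quarkus-rest-client-reactive approach used by the project's test case. The class and method names are hypothetical, and the code assumes the application is running on localhost:8080 and that every event carries a single data: line, as in the output shown earlier.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SseClientSketch {
    // Extracts up to maxEvents payloads from the lines of an SSE response.
    // Only single-line "data:" events are handled in this sketch.
    static List<String> dataPayloads(Stream<String> lines, int maxEvents) {
        return lines.filter(line -> line.startsWith("data:"))
                .map(line -> line.substring(5).trim())
                .limit(maxEvents)
                .collect(Collectors.toList());
    }

    // Connects to the endpoint and returns once maxEvents events have arrived
    static List<String> readEvents(URI uri, int maxEvents) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(uri)
                .header("Accept", "text/event-stream")
                .GET()
                .build();
        HttpResponse<Stream<String>> response =
                client.send(request, HttpResponse.BodyHandlers.ofLines());
        // Closing the line stream releases the underlying connection
        try (Stream<String> lines = response.body()) {
            return dataPayloads(lines, maxEvents);
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the application from this article is running locally
        readEvents(URI.create("http://localhost:8080/metrics"), 3)
                .forEach(System.out::println);
    }
}
```

The limit keeps the sketch from blocking forever on the never-ending stream. A long-running consumer would process each line as it arrives instead of collecting a list.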

Happy coding!