Apache Qpid and Spring Boot

How to run a local instance of the Apache Qpid server on the JVM. AMQP broker configuration


Among the many queue implementations available, I’ve chosen Apache Qpid because it implements the AMQP 1.0 protocol, which is used by Azure Service Bus.

It lets me debug JMS/queue issues locally just as I would on Azure (getting the same messages etc.). RabbitMQ also speaks AMQP, but its server is written in Erlang rather than Java, so I decided to show you Apache Qpid instead.

The entire code is available on my GitHub profile.

This is a simple Spring Boot app built with Gradle, so you need to create a project first. You can read more about that in my other articles (Spring Boot, Gradle).

Here I’m going to describe the classes and configuration files that make up the application.

Spring Boot and project config

This is a standard Spring Boot project, so let’s begin with the main class, the app’s entry point.

QpidBrokerBootApp.kt

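package com.rocketzki.broker

import org.springframework.boot.Banner
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

// 'open' is needed: Spring proxies configuration classes and the kotlin-spring plugin is not applied
@SpringBootApplication
open class QpidBrokerBootApp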

fun main(args: Array<String>) {
    runApplication<QpidBrokerBootApp>(*args) {
        setBannerMode(Banner.Mode.OFF)
    }
}

and application.yml for injecting properties:

qpid-broker-boot:
  http-port: 8080
  amqp-port: 5672

and last but not least, the Gradle config files:

build.gradle containing the project config (e.g. dependencies).

plugins {
    id 'org.jetbrains.kotlin.jvm' version '1.4.21'
    id 'org.springframework.boot' version '2.4.0'
    id 'io.spring.dependency-management' version '1.0.10.RELEASE'
}

group = 'com.rocketzki'
version = '1.0.0-SNAPSHOT'

//LIB VERSIONS
def qpidVersion = '8.0.0'

repositories {
    mavenCentral()
}

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

springBoot {
    mainClass.set('com.rocketzki.broker.QpidBrokerBootAppKt')
}



compileKotlin {
    kotlinOptions {
        jvmTarget = "11"
    }
}

dependencies {
    // 'implementation' replaces the deprecated 'compile' configuration (removed in Gradle 7)
    implementation "org.springframework:spring-jms:5.3.1"
    implementation "org.apache.qpid:qpid-jms-client:0.54.0"
    implementation "org.apache.qpid:qpid-broker-core:${qpidVersion}"
    implementation "org.apache.qpid:qpid-broker-plugins-management-http:${qpidVersion}"
    implementation "org.apache.qpid:qpid-broker-plugins-derby-store:${qpidVersion}"
    implementation "org.apache.qpid:qpid-broker-plugins-memory-store:${qpidVersion}"
    implementation "org.apache.qpid:qpid-broker-plugins-management-amqp:${qpidVersion}"
    implementation "org.apache.qpid:qpid-broker-plugins-amqp-1-0-protocol:${qpidVersion}"
    implementation "org.apache.qpid:qpid-broker-plugins-logging-logback:${qpidVersion}"
    compileOnly "org.projectlombok:lombok"

    implementation "org.springframework.boot:spring-boot-starter-integration"
    implementation "org.jetbrains.kotlin:kotlin-script-runtime:1.4.21"
    implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8"

    annotationProcessor "org.springframework.boot:spring-boot-configuration-processor"
    annotationProcessor "org.projectlombok:lombok"
}

project.properties.put('spring.boot.mainclass', 'com.rocketzki.broker.QpidBrokerBootAppKt')


and a small one: settings.gradle

rootProject.name = 'qpid-broker-boot'

Apache Qpid Broker config

The more interesting stuff is located in this part of the app. I’ve created two classes. The first one, BrokerRunner, holds the broker configuration:

import java.util.Objects
import org.apache.qpid.server.SystemLauncher
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component

@Component
class BrokerRunner : QpidConfig {

    // Port numbers injected from application.yml
    @Value("\${qpid-broker-boot.http-port}")
    private val httpPort: String? = null

    @Value("\${qpid-broker-boot.amqp-port}")
    private val amqpPort: String? = null

    private val systemLauncher = SystemLauncher()

    override fun createSystemConfig(): Map<String, Any?> {
        // config.json must be on the classpath (e.g. in src/main/resources)
        val initialConfig = BrokerRunner::class.java.classLoader.getResource("config.json")
        return mapOf(
            "type" to "Memory",
            "initialConfigurationLocation" to Objects.requireNonNull(initialConfig).toExternalForm(),
            "startupLoggedToSystemOut" to true,
            "qpid.http_port" to httpPort,
            "qpid.amqp_port" to amqpPort
        )
    }

    // Kotlin has no checked exceptions, so startup() can be called directly
    override fun run() {
        systemLauncher.startup(createSystemConfig())
    }

    override fun destroy() {
        systemLauncher.shutdown()
    }
}

BrokerRunner is a Spring component that provides the configuration required by the Apache Qpid server. It locates config.json on the classpath and passes it to the broker as the initial configuration, along with the port numbers read from application.yml. The broker is shut down when the Spring container destroys the bean.
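The QpidConfig interface that BrokerRunner implements is not listed in this article. Judging by the overridden methods, it could look roughly like the sketch below. This is only an assumption: the real interface presumably extends Spring's DisposableBean, while here destroy() is declared directly so the sketch compiles without Spring. FakeBrokerRunner is a hypothetical stand-in that records lifecycle calls instead of starting Qpid.

```kotlin
// Hypothetical reconstruction of the contract implemented by BrokerRunner.
// Assumption: the real interface likely extends Spring's DisposableBean;
// destroy() is declared here directly to keep the sketch dependency-free.
interface QpidConfig : Runnable {
    fun createSystemConfig(): Map<String, Any?>
    fun destroy()
}

// Stand-in implementation that records lifecycle calls instead of starting Qpid.
class FakeBrokerRunner(
    private val httpPort: String,
    private val amqpPort: String
) : QpidConfig {
    val events = mutableListOf<String>()

    override fun createSystemConfig(): Map<String, Any?> = mapOf(
        "type" to "Memory",
        "startupLoggedToSystemOut" to true,
        "qpid.http_port" to httpPort,
        "qpid.amqp_port" to amqpPort
    )

    override fun run() {
        // BrokerRunner calls systemLauncher.startup(createSystemConfig()) at this point.
        events += "started on AMQP port ${createSystemConfig()["qpid.amqp_port"]}"
    }

    override fun destroy() {
        // BrokerRunner calls systemLauncher.shutdown() at this point.
        events += "stopped"
    }
}

fun main() {
    val runner = FakeBrokerRunner(httpPort = "8080", amqpPort = "5672")
    runner.run()
    runner.destroy()
    println(runner.events) // [started on AMQP port 5672, stopped]
}
```

Keeping the contract in an interface means the launcher only depends on QpidConfig, so a fake like this one can replace the real broker in tests.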

The other class, BrokerLauncher, launches the config component and manages its lifecycle:

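import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import javax.annotation.PostConstruct
import javax.annotation.PreDestroy
import org.springframework.stereotype.Component

@Component
class BrokerLauncher(private val brokerRunner: QpidConfig) {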
    private val executorService: ExecutorService = Executors.newSingleThreadExecutor()

    @PostConstruct
    private fun runBroker() {
        executorService.execute(brokerRunner)
    }

    @PreDestroy
    fun shutdown() {
        executorService.shutdownNow()
    }
}

This class runs the config’s Runnable implementation on a dedicated thread and shuts the executor down when the container destroys the bean, so we can be sure no thread is left behind.

We can run the application with the gradle bootRun task.

That’s it for the server, so let’s test it!
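To see the executor lifecycle in isolation, here is a minimal sketch that uses only the JDK. BlockingTask is a hypothetical stand-in for the broker runner. The two marked calls correspond to what the @PostConstruct and @PreDestroy methods of BrokerLauncher do.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the broker runner: blocks until its thread is interrupted.
class BlockingTask : Runnable {
    val started = CountDownLatch(1)

    @Volatile
    var interrupted = false

    override fun run() {
        started.countDown()
        try {
            Thread.sleep(60_000) // pretend to be busy
        } catch (e: InterruptedException) {
            interrupted = true   // shutdownNow() interrupts the worker thread
        }
    }
}

fun main() {
    val executor = Executors.newSingleThreadExecutor()
    val task = BlockingTask()

    executor.execute(task)                   // same call as in runBroker()
    task.started.await(5, TimeUnit.SECONDS)  // wait until the task is really running

    executor.shutdownNow()                   // same call as in shutdown()
    val terminated = executor.awaitTermination(5, TimeUnit.SECONDS)

    println("interrupted=${task.interrupted}, terminated=$terminated")
    // interrupted=true, terminated=true
}
```

Note that shutdownNow() only interrupts the worker thread. A task that ignores interrupts keeps running, which is presumably why the broker itself is stopped separately in BrokerRunner's destroy() method.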

Testing using JMS client

The client package provides a preconfigured JMS client for sending and receiving messages.

First, run the broker by calling gradle bootRun. After a few seconds the server is up and we can talk to it.

2021-01-05 18:39:17.005 INFO 10848 --- [ Broker-Config] qpid.message.broker.ready : [Broker] BRK-1004 : Qpid Broker Ready

Take a look at the ExampleSendingScript.kts file in the client package. Hit CTRL+SHIFT+F10 in IntelliJ IDEA to run the script. If we get:

### Creating connection  
### Sending message '{}' to 'someQueue'
### Sending message '{"num":[1,2,3]}' to 'someQueue'
### Sending message '{"key":"someval"}' to 'someQueue'

Process finished with exit code 0

…it’s working brilliantly. Congrats!
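The script itself lives in the repository and is not listed here. As a rough idea of what such a sender might look like, below is a minimal sketch based on the qpid-jms-client dependency from build.gradle. The broker URL, the credentials and the sampleMessages() helper are assumptions made for illustration, and the queue has to be defined in the broker's config.json. It needs a running broker, so treat it as a starting point rather than the author's actual code.

```kotlin
import org.apache.qpid.jms.JmsConnectionFactory
import javax.jms.Session

// Assumptions: URL, credentials and payloads are placeholders, adjust them to your config.json.
const val BROKER_URL = "amqp://localhost:5672"
const val QUEUE_NAME = "someQueue"
const val USER = "guest"      // hypothetical credentials
const val PASSWORD = "guest"  // hypothetical credentials

// Hypothetical helper returning a few JSON payloads to send.
fun sampleMessages(): List<String> =
    listOf("{}", """{"num":[1,2,3]}""", """{"key":"someval"}""")

fun main() {
    println("### Creating connection")
    val connection = JmsConnectionFactory(BROKER_URL).createConnection(USER, PASSWORD)
    try {
        connection.start()
        // Non-transacted session with automatic acknowledgement
        val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
        val producer = session.createProducer(session.createQueue(QUEUE_NAME))
        for (body in sampleMessages()) {
            println("### Sending message '$body' to '$QUEUE_NAME'")
            producer.send(session.createTextMessage(body))
        }
    } finally {
        // Closing the connection also closes its sessions and producers
        connection.close()
    }
}
```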

Now check out the server logs (console or derby.log file). It looks like the broker has received some data. Cool!

Today you learned how to configure a simple Spring Boot app containing an Apache Qpid component. You were able to configure it and send/receive messages via the JMS interface. Everything is coded in Kotlin.

Hope you like it!

In case of any questions or remarks, don’t hesitate to comment or reach out to me 🙂
