Qpid Broker-J can be embedded into another Java application. This is an especially attractive option for functional tests, where depending on a separately started standalone broker is undesirable. This page provides instructions on how to do so using Qpid Broker-J 7.0 components.
The goal is to keep the broker entirely in memory, without the need for a writable disk. This means that features which rely on disk storage (message persistence, flow-to-disk, ...) will not be available. If those features are needed, the instructions can be adapted by selecting a different store type.
Configuring Dependencies
The configuration in this HowTo requires the following dependencies (in Maven's pom.xml format):
<dependencies>
    <!-- Note that the below is qpid-broker-core and *not* qpid-broker. -->
    <dependency>
        <groupId>org.apache.qpid</groupId>
        <artifactId>qpid-broker-core</artifactId>
        <version>7.0.0</version>
    </dependency>
    <!-- AMQP protocol support is modular. Here we show support for 0-8/0-9 and 1.0. Support for 0-10 is excluded. -->
    <dependency>
        <groupId>org.apache.qpid</groupId>
        <artifactId>qpid-broker-plugins-amqp-0-8-protocol</artifactId>
        <version>7.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.qpid</groupId>
        <artifactId>qpid-broker-plugins-amqp-1-0-protocol</artifactId>
        <version>7.0.0</version>
    </dependency>
    <!-- If a different store type is required it needs to be included here -->
    <dependency>
        <groupId>org.apache.qpid</groupId>
        <artifactId>qpid-broker-plugins-memory-store</artifactId>
        <version>7.0.0</version>
    </dependency>
</dependencies>
Support for AMQP versions in Qpid Broker-J is modular. The following Maven coordinates (groupId:artifactId:version) are required for the different protocol versions:
- For AMQP 0-8, 0-9, and 0-9-1: org.apache.qpid:qpid-broker-plugins-amqp-0-8-protocol:7.0.0
- For AMQP 0-10: org.apache.qpid:qpid-broker-plugins-amqp-0-10-protocol:7.0.0
- For AMQP 1.0: org.apache.qpid:qpid-broker-plugins-amqp-1-0-protocol:7.0.0
Providing a minimal config file
The broker contains a default initial configuration that we want to replace for the embedded case. The initial configuration below declares only the broker itself, a SCRAM-SHA-256 authentication provider with a single user entry (user name "admin", password "admin"), an AMQP port that uses this authentication provider, and an in-memory virtual host node. Place this initial configuration in the resources folder of the Java module so that it can be loaded from the classpath.
{
  "name": "Embedded Broker",
  "modelVersion": "7.0",
  "authenticationproviders" : [ {
    "name" : "Scram",
    "type" : "SCRAM-SHA-256",
    "users" : [ {
      "name" : "admin",
      "type" : "managed",
      "password" : "RlZheCx8NY5fmWpshDA2P1gULzLPA8GtoCe5uJgr4ms=,,mnHeeEsCki4ql4slKwooAncju7cL+5QLSxhUMzmvMJw=,z95S+Lmz5v5mESAmov1mJZN17310oMuAPyJIWtg4gBo=,4096"
    } ]
  } ],
  "ports" : [ {
    "name" : "AMQP",
    "port" : "${qpid.amqp_port}",
    "protocols": [ "AMQP_0_9", "AMQP_0_9_1", "AMQP_1_0" ],
    "authenticationProvider" : "Scram",
    "virtualhostaliases" : [ {
      "name" : "nameAlias",
      "type" : "nameAlias"
    }, {
      "name" : "defaultAlias",
      "type" : "defaultAlias"
    }, {
      "name" : "hostnameAlias",
      "type" : "hostnameAlias"
    } ]
  } ],
  "virtualhostnodes" : [ {
    "name" : "default",
    "type" : "Memory",
    "defaultVirtualHostNode" : "true",
    "virtualHostInitialConfiguration" : "{\"type\": \"Memory\", \"nodeAutoCreationPolicies\": [{\"patterns\":\".*\",\"createdOnPublish\":\"true\",\"createdOnConsume\":\"true\",\"nodeType\":\"queue\",\"attributes\":{}}] }"
  } ]
}
This shows the port supporting AMQP versions 0-9, 0-9-1 and 1.0. If support for other protocol versions is required, remember to include the corresponding dependency in the pom.xml.
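The port in the configuration above is given as the placeholder ${qpid.amqp_port} rather than a fixed number. The following is a minimal sketch of one way a test could supply that value: it asks the operating system for an unused port and publishes it as a Java system property before the broker is started. It assumes that the broker resolves the placeholder from Java system properties; the class and method names (AmqpPortSetup, findFreePort, configureAmqpPort) are hypothetical and not part of Qpid.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

// Hypothetical helper, not part of Qpid Broker-J.
public class AmqpPortSetup {

    /** Asks the OS for a currently unused TCP port by binding to port 0. */
    static int findFreePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Stores a free port under the name used by the "${qpid.amqp_port}" placeholder.
     * Assumption: the broker reads this placeholder from Java system properties.
     */
    static int configureAmqpPort() {
        int port = findFreePort();
        System.setProperty("qpid.amqp_port", Integer.toString(port));
        return port;
    }

    public static void main(String[] args) {
        int port = configureAmqpPort();
        System.out.println("qpid.amqp_port=" + port);
    }
}
```

Because the probing socket is closed before the broker binds the port, another process could in principle claim the port in between. For functional tests this small race is usually acceptable.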
Starting the Broker
The following code shows how to start the broker by creating and opening a SystemConfig:
// tempDir is a java.io.File that the caller is expected to define;
// its path is passed on to createSystemConfig as the work directory.
TaskExecutor taskExecutor = new TaskExecutorImpl();
try
{
    taskExecutor.start();
    SystemConfig systemConfig = createSystemConfig(taskExecutor, tempDir.getAbsolutePath());
    // open the systemConfig to start the broker
    systemConfig.open();
    try
    {
        // code that uses the broker
    }
    finally
    {
        // closing the systemConfig shuts the broker down
        systemConfig.close();
    }
}
finally
{
    // stop the executor last, after the broker has been closed
    taskExecutor.stop();
}
The main task is creating the SystemConfig, which is shown in the following snippet:
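The nested try/finally blocks above guarantee that the broker is closed before the task executor is stopped, even if the test code throws. When several tests share this pattern, the same ordering can be expressed with try-with-resources. The following is only a sketch: ReverseOrderShutdown is a hypothetical helper, not a Qpid class, and the main method uses log entries as stand-ins for the real start and stop calls.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper, not part of Qpid Broker-J.
/** Runs registered shutdown steps in reverse order of registration. */
public class ReverseOrderShutdown implements AutoCloseable {

    private final Deque<Runnable> steps = new ArrayDeque<>();

    /** Registers a shutdown step; call this right after the matching start step succeeds. */
    public ReverseOrderShutdown onClose(Runnable step) {
        steps.push(step);
        return this;
    }

    /** Runs every step, last registered first, even if an earlier step fails. */
    @Override
    public void close() {
        RuntimeException first = null;
        while (!steps.isEmpty()) {
            try {
                steps.pop().run();
            } catch (RuntimeException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        try (ReverseOrderShutdown shutdown = new ReverseOrderShutdown()) {
            log.append("executor.start;");                          // stands in for taskExecutor.start()
            shutdown.onClose(() -> log.append("executor.stop;"));   // stands in for taskExecutor.stop()
            log.append("config.open;");                             // stands in for systemConfig.open()
            shutdown.onClose(() -> log.append("config.close;"));    // stands in for systemConfig.close()
            log.append("use;");                                     // code that uses the broker
        }
        System.out.println(log);
    }
}
```

With the real objects, the registrations would be method references to the calls used above, for example shutdown.onClose(taskExecutor::stop) after taskExecutor.start() and shutdown.onClose(systemConfig::close) after systemConfig.open().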
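import java.io.IOException;
import java.net.URL;
import java.security.Principal;
import java.util.HashMap;
import java.util.Map;
// The Qpid classes used below (TaskExecutor, EventLogger, LoggingMessageLogger, SystemConfig,
// SystemConfigFactory, PluggableFactoryLoader, MemorySystemConfigImpl) are provided by the
// qpid-broker-core and qpid-broker-plugins-memory-store dependencies and need to be imported too.

private static final String INITIAL_CONFIGURATION = "test-initial-config.json";
private static final Principal SYSTEM_PRINCIPAL = () -> "system";

// Note: workDirectory is accepted for symmetry with the calling code but is not used
// by this in-memory configuration.
private static SystemConfig createSystemConfig(final TaskExecutor taskExecutor, String workDirectory) throws IOException
{
    // create event logger
    EventLogger eventLogger = new EventLogger(new LoggingMessageLogger());

    // get memory system config factory
    PluggableFactoryLoader<SystemConfigFactory> configFactoryLoader = new PluggableFactoryLoader<>(SystemConfigFactory.class);
    SystemConfigFactory configFactory = configFactoryLoader.get(MemorySystemConfigImpl.SYSTEM_CONFIG_TYPE);

    // system config attributes defining initial configuration location
    // (Test is the class containing this code; any class loaded from the same module works)
    Map<String, Object> attributes = new HashMap<>();
    URL initialConfig = Test.class.getClassLoader().getResource(INITIAL_CONFIGURATION);
    if (initialConfig == null)
    {
        // getResource returns null when the file is missing from the classpath
        throw new IllegalStateException("Resource not found on classpath: " + INITIAL_CONFIGURATION);
    }
    attributes.put("initialConfigurationLocation", initialConfig.toExternalForm());
    attributes.put("startupLoggedToSystemOut", false);

    // create system config
    return configFactory.newInstance(taskExecutor, eventLogger, SYSTEM_PRINCIPAL, attributes);
}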
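In a functional test it can be useful to confirm that the AMQP port is actually accepting connections before any client code runs. The sketch below performs a plain TCP connection attempt using only the JDK. PortProbe and isListening are hypothetical names, and the check says nothing about whether the AMQP handshake or authentication will succeed. The main method assumes the port was supplied through the qpid.amqp_port system property and otherwise falls back to 5672, the standard AMQP port.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Qpid Broker-J.
public class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // connection refused, timed out, or host unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the port was supplied via the qpid.amqp_port system property.
        int port = Integer.getInteger("qpid.amqp_port", 5672);
        System.out.println("port " + port + " listening: " + isListening("localhost", port, 500));
    }
}
```

A test could call isListening right after systemConfig.open() and fail fast with a clear message if the port is not reachable.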
Final Remarks
Note
Please note that the provided code samples do not start the broker with HTTP management. If HTTP management is required, an HTTP port and the HTTP management plugin need to be added to the initial configuration. A Maven dependency on qpid-broker-plugins-management-http needs to be added to the project as well.