This page applies to Java Broker versions before 0.18. For the master copy of this documentation see the Other Queue Types section in the Java documentation

General Information

The Qpid 0.7 release introduces Last Value Queues (LVQs) into the Java Messaging Broker. The LVQ implementation in the Java Broker provides a superset of the behaviour of the LVQ in the C++ Broker. It is hoped that later revisions of the C++ Broker will adopt the same enhanced semantics.

LVQ Semantics

In an LVQ messages in the queue are be "replaced" by newer messages having the same value for some specified header. An example of an LVQ might be where a queue representing prices on a stock exchange: when you first consume from the queue you get the latest quote for each stock, and then as new prices come in you are sent only these updates.

Like other queues, LVQs can either be browsed or consumed from. When browsing an individual subscriber does not remove the message from the queue when receiving it. This allows for many subscriptions to browse the same LVQ (i.e. you do not need to create and bind a separate LVQ for each subscriber who wishes to receive the contents of the LVQ).

Defining LVQs

You must define an LVQ specifically before you start to use it. You cannot subsequently change a queue to/from an LVQ (without deleting it and re-creating). Also note that a queue cannot be both an LVQ and a Priority Queue. If a queue is defined with both LVQ and Priority Queue attributes, it will only take on an LVQ nature.

You define a queue as an LVQ in the virtualhost configuration file, which the broker loads at startup. When defining the queue, add a <lvq>true</lvq> element. Without any further configuration this will define an LVQ which uses the header "qpid.LVQ_key" as the key for replacement. If you wish to define your own key then you can do so using the <lvqKey> element (e.g. <lvqKey>myKey</lvqKey> will use the header "myKey" as the replacement key).

<queue>
    <name>test</name>
    <test>
        <exchange>amq.direct</exchange>
        <lvq>true</lvq>
        <lvqKey>ISIN</lvqKey>
    </test>
</queue>

LVQs can also be declared by setting arguments in the Queue.Declare AMQP command.

    Map<String,Object> arguments = new HashMap<String, Object>();
    arguments.put("qpid.last_value_queue",true);
    arguments.put("qpid.last_value_queue_key","key");
    ((AMQSession) session).createQueue(queueName, autoDelete, durable, exclusive, arguments);

Receiving messages from an LVQ

When receiving messages from an LVQ you may wish to change the default pre-fetch setting for your client. With a large pre-fetch value message will be sent from the queue as soon as they arrive, and then be buffered on the client. This may lead to cases where newer versions of messages are buffered behind older versions within the client. With a lower pre-fetch value the broker has an opportunity to replace older versions of the message with newer versions before sending to the client.

Set low pre-fetch

Qpid clients receive buffered messages in batches, sized according to the pre-fetch value. The current default is 5000.

In order to set pre-fetch set the java system property max_prefetch on the client environment (using -D) before creating your consumer.

Setting the Qpid pre-fetch to 1 for your client means that message conflation will be honoured by the Qpid broker as it dispatches messages to your client. A default for all client connections can be set via a system property:

-Dmax_prefetch=1

The prefetch can be also be adjusted on a per connection basis by adding a 'maxprefetch' value to the connection url

amqp://guest:guest@client1/development?maxprefetch='1'&brokerlist='tcp://localhost:5672'

There is a slight performance cost here if using the receive() method and you could test with a slightly higher pre-fetch (up to 10) if the trade-off between throughput and conflation is weighted towards the former for your application. (If you're using OnMessage() then this is not a concern.)

Implication of low pre-fetch

If you are using the receive() method to consume messages then you should also only use one consumer per session. If you're using onMessage() then this is not a concern.

Browsing an LVQ from Java

One way to use an LVQ is to have a single queue which receives all updates, and many subscribers which only "browse" the queue without actually consuming messages. Unfortunately the standard JMS MessageBrowser is not suitable for this use case as it closes at the point where there are no more updates to be received. Instead we wish to use a browser more like a standard QueueReceiver, but with the caveat that the receiver is not actually consuming messages from the queue.

To do this we can specify in the URL for a destination that any created consumer should be browse only, e.g.:

direct://amq.direct//myqueue?browse='true'

Note that browsing will only work on consumers in NO_ACKNOWLEDGE mode.

Browsing an LVQ from .net

To allow the creation of a "consumer" in browse-only mode from the AMQP 0-8 .net client the following method has been added to the API for IChannel:

        IMessageConsumer CreateConsumer(string queueName,
                                        int prefetchLow,
                                        int prefetchHigh,
                                        bool noLocal,
                                        bool exclusive,
                                        bool browse);

where passing browse as true will get the desired behaviour.

Similarly the following method has been added to MessageConsumerBuilder:

 public MessageConsumerBuilder WithBrowse(bool browse)

Note that browsing will only work on consumers in NO_ACKNOWLEDGE mode.

  • No labels