Worked LVQ example

The following example shows what happens when a message with the same key is published before the previous one has been consumed: the old message gets replaced. This example sends messages with data and key values set to key1, key2, key3 and key1 again, followed by a final message, last, which tells the listener to stop.

If the messages are enqueued before the listener consumes them, then you get the following output:

Sending Data:key1
Sending Data:key2
Sending Data:key3
Sending Data:key1
Sending Data:last
Receiving Data:key1
Receiving Data:key2
Receiving Data:key3
Receiving Data:last

Source for example


#include <qpid/client/Connection.h>
#include <qpid/client/SubscriptionManager.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/QueueOptions.h>

#include <iostream>

using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;

// Command-line options for the LVQ example.
//
// Derives from qpid::Options for option parsing and usage printing, and from
// qpid::client::ConnectionSettings so that the same object can be passed to
// Connection::open(). The option values below are bound to fields that are
// not declared here (host, port, username, password, mechanism, tcpNoDelay);
// presumably they are the inherited ConnectionSettings members.
struct  Args : public qpid::Options,
               public qpid::client::ConnectionSettings
{
    // Set by --help; main() prints the usage text and exits when it is true.
    bool help;

    // Registers every supported option. The title passed to qpid::Options
    // names this example (it previously read "Simple latency test optins",
    // a misspelled title that described a different program).
    Args() : qpid::Options("Simple LVQ example options"), help(false)
    {
        using namespace qpid;
        addOptions()
            ("help", optValue(help), "Print this usage statement")
            ("broker,b", optValue(host, "HOST"), "Broker host to connect to")
            ("port,p", optValue(port, "PORT"), "Broker port to connect to")
            ("username", optValue(username, "USER"), "user name for broker log in.")
            ("password", optValue(password, "PASSWORD"), "password for broker log in.")
            ("mechanism", optValue(mechanism, "MECH"), "SASL mechanism to use when authenticating.")
            ("tcp-nodelay", optValue(tcpNoDelay), "Turn on tcp-nodelay");
    }
};

// Sends messages to, and receives them from, a single queue declared with
// LVQ ordering. Acts as the MessageListener for its own subscription.
class Listener : public MessageListener
{
  public:
    // Stores the session and derives the queue name from the session id.
    Listener(Session& session);

    // Declares the queue with LVQ ordering and sets the routing key.
    void setup();

    // Publishes one message whose LVQ key header and data are both kv.
    void send(std::string kv);

    // Callback for each delivered message; cancels the subscription on "last".
    void received(Message& message);

    // Subscribes to the queue and runs the subscription manager.
    void start();

  private:
    // Declaration order matters: the constructor initialises queue from
    // session, so session must come first.
    Session session;
    SubscriptionManager subscriptions;
    std::string queue;      // queue name, also used as the routing key
    Message request;        // message object reused for every send()
    QueueOptions args;      // queue declaration arguments (LVQ ordering)
};

// Keeps a copy of the session handle, attaches the subscription manager to
// it, and names the queue after the session id.
Listener::Listener(Session& sess)
    : session(sess),
      subscriptions(sess),
      queue(session.getId().getName())
{
}

// Prepares the queue and the outgoing message template.
void Listener::setup()
{
    // Select LVQ ordering for the queue about to be declared.
    args.setOrdering(LVQ);

    // Exclusive, auto-deleted queue carrying the LVQ arguments.
    session.queueDeclare(
        arg::queue = queue,
        arg::exclusive = true,
        arg::autoDelete = true,
        arg::arguments = args);

    // Every message sent through `request` is routed using the queue name.
    request.getDeliveryProperties().setRoutingKey(queue);
}

void Listener::start()
{
    subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE));    
    subscriptions.run();
}

// Publishes one message: kv is stored both under the LVQ key header and as
// the message data, then the message is transferred asynchronously.
void Listener::send(std::string kv)
{
    // Ask the queue options for the header name that holds the LVQ key.
    std::string lvqKey;
    args.getLVQKey(lvqKey);

    request.getHeaders().setString(lvqKey, kv);
    request.setData(kv);

    cout << "Sending Data:" << kv << std::endl;
    async(session).messageTransfer(arg::content = request);
}

// Prints the data of each delivered message; the message whose data is
// "last" cancels the subscription on the queue.
void Listener::received(Message& response)
{
    const std::string& payload = response.getData();

    cout << "Receiving Data:" << payload << std::endl;

    if (payload != "last") {
        return;
    }
    subscriptions.cancel(queue);
}

// Entry point of the LVQ example.
//
// Parses the command line, opens a connection with the parsed settings,
// sends key1, key2, key3, key1 and last before the listener is started, then
// starts the listener and closes the connection once start() returns.
//
// Returns 0 on success or after printing the usage text, 1 if any step
// throws a std::exception (the message is printed to standard output).
int main(int argc, char** argv)
{
    Args opts;
    Connection connection;
    try {
        // Parsing happens inside the try block so that an invalid option is
        // reported through the handler below instead of escaping main() as
        // an uncaught exception.
        opts.parse(argc, argv);

        if (opts.help) {
            std::cout << opts << std::endl;
            return 0;
        }

        connection.open(opts);
        Session session = connection.newSession();
        Listener listener(session);
        listener.setup();

        // All messages are sent before the listener subscribes; key1 is sent
        // twice, and "last" is the message the listener stops on.
        listener.send("key1");
        listener.send("key2");
        listener.send("key3");
        listener.send("key1");
        listener.send("last");
        listener.start();

        connection.close();
        return 0;
    } catch(const std::exception& error) {
        std::cout << error.what() << std::endl;
    }
    return 1;
}

  • No labels