Problem statement

The Qpid JMS clients for AMQP 1.0 and AMQP 0-x support the XOAUTH2 SASL mechanism for authenticating messaging connections against OAUTH2-based services. The XOAUTH2 SASL mechanism is described on the Google Developers page "OAuth 2.0 Mechanism" and involves sending a bearer token as part of the initial client response, as illustrated below (^A denotes the Control+A character, 0x01):

base64("user=" {User} "^Aauth=Bearer " {Access Token} "^A^A")
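To make the format above concrete, here is a minimal Java sketch that assembles the same string. The class name XOAuth2InitialResponse and its methods are hypothetical and exist only for illustration. The Qpid JMS clients build this response internally, so an application would not normally need such code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, for illustration only: not part of any Qpid API.
final class XOAuth2InitialResponse
{
    // "^A" in the format description is the Control+A character (0x01).
    private static final char CTRL_A = (char) 1;

    private XOAuth2InitialResponse()
    {
    }

    // Raw response bytes: "user=" {User} "^Aauth=Bearer " {Access Token} "^A^A"
    static byte[] raw(final String user, final String accessToken)
    {
        final String response = "user=" + user + CTRL_A + "auth=Bearer " + accessToken + CTRL_A + CTRL_A;
        return response.getBytes(StandardCharsets.UTF_8);
    }

    // The base64-encoded form, as written in the format description.
    static String base64(final String user, final String accessToken)
    {
        return Base64.getEncoder().encodeToString(raw(user, accessToken));
    }
}
```

Decoding the result of base64(...) with a standard base64 decoder yields the bytes returned by raw(...), which is a quick way to inspect a response captured during troubleshooting.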

The broker is expected to validate the token and to accept connections only from clients that provide a valid token. Depending on the OAUTH2 authentication provider, the broker can perform additional client validation by invoking a provider-specific REST API.

The XOAUTH2 SASL mechanism works well for initial connection establishment. However, if the token expires and connectivity is lost, the failover functionality attempts to reconnect with the expired token. The token needs to be updated in order to make subsequent reconnects successful.

This wiki page describes an approach that can be used to refresh or regenerate the token.

Using a Qpid JMS connection extension to supply the OAUTH2 access token

The Qpid JMS client for AMQP 1.0 allows an implementation of BiFunction<Connection, URI, Object> to be registered as a password connection extension on JmsConnectionFactory. If the extension is set, the registered function is invoked to provide a password when a connection is opened. Such a function can check the access token expiration time and regenerate or refresh the token by invoking the OAUTH2 provider API. The following sample code illustrates this approach for token regeneration:

Token re-generation
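import java.net.URI;
import java.util.function.BiFunction;
import javax.jms.Connection;
import org.apache.qpid.jms.JmsConnectionExtensions;
import org.apache.qpid.jms.JmsConnectionFactory;

final String brokerURI = "...";                                                                     // 1 broker connection URI
final TokenGenerator tokenGenerator = new TokenGenerator(...);                                      // 2 helper object to generate access token for a specific OAUTH2 implementation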

final BiFunction<Connection, URI, Object> tokenExtension = new BiFunction<Connection, URI, Object>()// 3 password extension for token renewal
{
    private volatile String token;
    private long currentTokenExpirationTime;

    @Override
    public Object apply(final Connection connection, final URI uri)
    {
        long currentTime = System.currentTimeMillis();
        if (currentTime > currentTokenExpirationTime)                                               // 4 check token expiration
        {
            this.token = tokenGenerator.generateAccessToken();                                      // 5 get new token
            this.currentTokenExpirationTime = tokenGenerator.getTokenExpirationTime(token);         // 6 preserve token expiration time
        }
        return this.token;
    }
};
final JmsConnectionFactory factory = new JmsConnectionFactory(brokerURI);                           // 7 create connection factory
factory.setExtension(JmsConnectionExtensions.PASSWORD_OVERRIDE.toString(), tokenExtension);         // 8 register password extension for token regeneration

final Connection connection = factory.createConnection();                                           // 9 open connection

In the snippet above, an implementation of BiFunction<Connection, URI, Object> is created at (3) to provide the access token. The function checks the token expiration at (4) and regenerates the token at (5) using a helper object (2) that implements the calls to the OAUTH2-specific API. The token expiration time is preserved at (6) for subsequent reconnect attempts. An instance of JmsConnectionFactory is created at (7) for the given brokerURI (1). The password extension is registered at (8). The JMS connection is opened at (9). The example uses a hypothetical class TokenGenerator, which invokes the underlying OAUTH2 API to generate or renew the access token and to get the token expiration time.
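One possible refinement of the function above is sketched here as a standalone class, so that the expiry check can be exercised without a broker. It is an illustration under stated assumptions, not part of the Qpid API: TokenSource is a hypothetical stand-in for the TokenGenerator helper, the clock is injected for testability, and refreshMarginMillis is an assumed safety margin that renews a token shortly before it expires. Expiration times are assumed to be epoch milliseconds, as in the snippet above.

```java
import java.net.URI;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the TokenGenerator helper; not a Qpid class.
interface TokenSource
{
    String generateAccessToken();

    long getTokenExpirationTime(String token); // epoch milliseconds (assumption)
}

// Caches the token and regenerates it once the expiry (minus a margin) is reached.
// C is the connection type; with C = javax.jms.Connection the class has the
// BiFunction<Connection, URI, Object> shape used by the password extension.
final class CachingTokenExtension<C> implements BiFunction<C, URI, Object>
{
    private final TokenSource source;
    private final LongSupplier clock;
    private final long refreshMarginMillis;
    private String token;
    private long expirationTime;

    CachingTokenExtension(final TokenSource source, final LongSupplier clock, final long refreshMarginMillis)
    {
        this.source = source;
        this.clock = clock;
        this.refreshMarginMillis = refreshMarginMillis;
    }

    // synchronized so that concurrent reconnect attempts see a consistent token/expiry pair
    @Override
    public synchronized Object apply(final C connection, final URI uri)
    {
        final long now = clock.getAsLong();
        if (token == null || now >= expirationTime - refreshMarginMillis)
        {
            token = source.generateAccessToken();
            expirationTime = source.getTokenExpirationTime(token);
        }
        return token;
    }
}
```

With the production clock System::currentTimeMillis and C set to javax.jms.Connection, an instance could be passed to factory.setExtension(...) in place of the anonymous class at (3).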

Access token regeneration with the legacy JMS client for AMQP 0-x

The legacy JMS client does not provide an extension similar to the one in the new JMS client for AMQP 1.0. The client application needs to track the token expiration time, close the connection after the token expires, and open a new connection with a new token.
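The track, close and re-open cycle described above could be organised along the following lines. This is a sketch under stated assumptions rather than client API: ReconnectingConnectionHolder is a hypothetical class, the connection type is left generic, and the opener and closer callbacks are assumed to wrap whatever calls the application uses to open a connection with a given token and to close it. Expiration times are assumed to be epoch milliseconds.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;

// Hypothetical helper, for illustration only: not part of the legacy client.
final class ReconnectingConnectionHolder<C>
{
    private final Supplier<String> tokenSupplier;         // obtains a fresh access token
    private final ToLongFunction<String> expirationTime;  // token -> expiry, epoch milliseconds
    private final Function<String, C> opener;             // opens a connection using the token as password
    private final Consumer<C> closer;                     // closes a connection
    private final LongSupplier clock;                     // e.g. System::currentTimeMillis
    private C connection;
    private long expiresAt;

    ReconnectingConnectionHolder(final Supplier<String> tokenSupplier,
                                 final ToLongFunction<String> expirationTime,
                                 final Function<String, C> opener,
                                 final Consumer<C> closer,
                                 final LongSupplier clock)
    {
        this.tokenSupplier = tokenSupplier;
        this.expirationTime = expirationTime;
        this.opener = opener;
        this.closer = closer;
        this.clock = clock;
    }

    // Returns the current connection, replacing it first if its token has expired.
    synchronized C get()
    {
        if (connection != null && clock.getAsLong() < expiresAt)
        {
            return connection;
        }
        if (connection != null)
        {
            closer.accept(connection);   // close the connection bound to the expired token
            connection = null;
        }
        final String token = tokenSupplier.get();
        expiresAt = expirationTime.applyAsLong(token);
        connection = opener.apply(token);
        return connection;
    }
}
```

Closing a JMS connection also closes the sessions, producers and consumers created from it, so the application has to recreate them on the connection returned after a replacement.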


