Message Queue Protocols (MQP)

Protocols

The list of message queue protocols (MQP) that are official international standards is short: AMQP 0.9.1, AMQP 1.0, MQTT 3.1.1, and MQTT 5.0.

With the exception of MQTT 5.0, these protocols are supported by widely available open-source solutions such as RabbitMQ.

The list of supported protocols is yet to be defined. The set of recommended protocols will be chosen based on proven adoption and availability of open-source client libraries. 

We are likely to recommend that one or more of the official MQP standards be used when publishing messages in WIS2, whether from a GISC, DCPC, or NC. This may come down to a political discussion based on what GISCs are prepared to support.

Applications may support additional protocols. For example, a WIS Centre may choose to use the Amazon Simple Queue Service (SQS) to distribute messages within the AWS infrastructure. Applications operating within AWS could consume SQS natively. However, a proprietary protocol such as this is highly unlikely to be included in the official list of protocols supported in WIS2, so a façade (e.g. applying the Adapter pattern) would be used to republish messages using an approved protocol for consumption by other client applications.
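The façade idea above can be sketched as follows. This is a minimal illustration only: `ApprovedPublisher`, `InMemoryPublisher`, `ProprietaryMessage`, `RepublishingAdapter` and the `example/...` topic mapping are hypothetical names invented for this sketch, and the in-memory publisher stands in for a real MQTT or AMQP client. No actual SQS or broker library is used.

```python
from dataclasses import dataclass, field
from typing import Callable, Iterable, Protocol


class ApprovedPublisher(Protocol):
    """Hypothetical interface for a client speaking an approved protocol."""

    def publish(self, topic: str, payload: bytes) -> None: ...


@dataclass
class InMemoryPublisher:
    """Stand-in for a real MQTT or AMQP client; records what was published."""

    sent: list = field(default_factory=list)

    def publish(self, topic: str, payload: bytes) -> None:
        self.sent.append((topic, payload))


@dataclass
class ProprietaryMessage:
    """Hypothetical shape of a message read from a proprietary queue."""

    queue_name: str
    body: str


class RepublishingAdapter:
    """Republishes proprietary messages through an ApprovedPublisher."""

    def __init__(self, publisher: ApprovedPublisher, topic_for: Callable[[str], str]):
        self._publisher = publisher
        self._topic_for = topic_for  # maps a proprietary queue name to a topic

    def forward(self, messages: Iterable[ProprietaryMessage]) -> int:
        count = 0
        for msg in messages:
            topic = self._topic_for(msg.queue_name)
            self._publisher.publish(topic, msg.body.encode("utf-8"))
            count += 1
        return count


publisher = InMemoryPublisher()
adapter = RepublishingAdapter(
    publisher, topic_for=lambda queue: "example/" + queue.replace("-", "/")
)
forwarded = adapter.forward([ProprietaryMessage("surface-obs", '{"id": 1}')])
```

Client applications outside the proprietary environment only ever see the approved protocol; the mapping from queue names to topics is the one piece of centre-specific logic.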

GISCs may establish agreements within their Area of Responsibility to support additional message queue protocols, e.g. when republishing messages from affiliated centres. Whatever the protocol used by the affiliated centre, GISCs shall publish using one or more [?] of the approved protocols. 

Note: OpenAPI has callbacks (using Webhooks). These are being considered for OGC API – Records subscription workflows. See "Subscribe to record changes", Issue #111 in opengeospatial/ogcapi-records.

For discussion: use of AsyncAPI (a protocol-agnostic abstraction layer for asynchronous APIs)

AsyncAPI is a protocol-agnostic industry standard for defining asynchronous API end-points. It defines and documents the components of an Event-Driven Architecture and enables people to publish a description of their asynchronous API, including which specific protocols are supported. As OpenAPI does for RESTful APIs, the service description supports both discovery and automated code generation for client applications. The current version is AsyncAPI specification 2.0.

AsyncAPI uses the concepts familiar to most Message Queue Protocols (MQP). The key concepts are: 

  • Producer: A producer is a type of application, connected to a server, that is creating messages and addressing them to channels.

  • Consumer: A consumer is a type of application, connected to a server via a supported protocol, that is consuming messages from channels.  

  • Message: A message is the mechanism by which information is exchanged via a channel between servers and applications. A message MUST contain a payload and MAY also contain headers. The headers MAY be subdivided into protocol-defined headers and header properties defined by the application which can act as supporting metadata. The payload contains the data, defined by the application, which MUST be serialized into a format (JSON, XML, Avro, binary, etc.). Since a message is a generic mechanism, it can support multiple interaction patterns such as event, command, request, or response. 

  • Channel: A channel is an addressable component, made available by the server, for the organization of messages. Producer applications send messages to channels and consumer applications consume messages from channels. Servers MAY support many channel instances, allowing messages with different content to be addressed to different channels. A channel MAY be bound to multiple protocols.

  • Protocol: A protocol is the mechanism (wireline protocol OR API) by which messages are exchanged between the application and the channel. Example protocols include, but are not limited to, AMQP, HTTP, JMS, Kafka, MQTT, STOMP, WebSocket.
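The concepts listed above can be modelled in a few lines. The sketch below is a toy in-memory model, not an AsyncAPI implementation: the `Message` and `Server` classes, the `observations` channel name and the header values are all assumptions made for illustration, and no wire protocol is involved.

```python
import json
from collections import defaultdict, deque
from dataclasses import dataclass, field


@dataclass
class Message:
    payload: bytes                               # MUST be present, already serialized
    headers: dict = field(default_factory=dict)  # MAY be present


class Server:
    """Toy server that keeps one first-in, first-out queue per channel."""

    def __init__(self):
        self._channels = defaultdict(deque)

    def send(self, channel: str, message: Message) -> None:
        """Producer side: address a message to a channel."""
        self._channels[channel].append(message)

    def receive(self, channel: str):
        """Consumer side: take the oldest message, or None if the channel is empty."""
        queue = self._channels[channel]
        return queue.popleft() if queue else None


server = Server()

# Producer: serialize the application data (JSON here) and address it to a channel.
server.send(
    "observations",
    Message(json.dumps({"temp_c": 21.5}).encode("utf-8"),
            {"content-type": "application/json"}),
)

# Consumer: read from the same channel and deserialize the payload.
received = server.receive("observations")
decoded = json.loads(received.payload)
```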

Using AsyncAPI to describe pub/sub end-points will make WIS2 more resilient to technology change. For example, as new versions of protocols are adopted WIS2 can easily include them. 

AsyncAPI provides an interoperable way to describe a pub/sub end-point. But we also need interoperability at the protocol level. Applications producing or consuming messages must support one or more of the protocols listed in the WIS2 Technical Regulations.  

Issues to consider:

  1. It is not clear what benefit an abstraction layer such as AsyncAPI provides. Message queues are already unfamiliar to many Members; the AsyncAPI abstraction layer adds further complexity that seems unwarranted.

    1. Possible benefits: it is protocol agnostic, and client libraries can be auto-generated from the AsyncAPI endpoint description.

  2. Should we investigate innovative solutions for event streaming? It's just packet-based streaming; perhaps it's possible to repurpose technology from video streaming? The key challenge here is making sure that client applications are available. In such circumstances, it would be prudent to deliver the client application (or a library from which a client application can be built) alongside the stream.

Discussion:

Peter Silva:

  • I don't understand AsyncAPI enough to recommend it.  I have played with it a bit, just enough to really understand that I don't understand. I do not think topics are channels, and that is probably the main blocker. It might work eventually, but I think AsyncAPI will slow things down if introduced right now. As far as I can tell, they are conceptually incompatible and conflicting.

  • I have made some minor trials with AsyncAPI, and so far, I have nothing even conceptually functional. Firstly, while multi-protocol, it does not support AMQP in any published way that I could see. I had to get an MQTT driver working to have something to work with ... so I had to circle around after doing that. I tried again once I had a working MQTT driver, but the conceptual model for AsyncAPI is quite different and I don't see how to mesh them at all yet, but I'm very much a neophyte with it.

  • As far as I can tell (though I am by no means an expert) the topic hierarchy used in AsyncAPI is used to identify entry points,  rather than a tree to be interpreted by the routines.  

  • The use of the word "channel" above is a hint to a conceptual mismatch. The various initiatives and protocols use similar words with conflicting meanings. For example, a "topic" in Kafka roughly matches an "exchange" in AMQP, is entirely absent in MQTT, and matches "channel" in AsyncAPI... and does not match the purpose of a topic hierarchy in either MQTT or AMQP (identification of subsets of information made available on a single channel.)

  • With AsyncAPI, as far as I can tell so far, you would need to identify a channel per dataset, ending up with tens of thousands of channels (one per topic), and you can only get that data from that channel.  So every subscriber has to conceptually subscribe to large sets of channels (end points.) 

  • In contrast, the methods used so far have meant connecting to a single end point, and providing it a list of topics of interest. This is more aligned with how MQPs naturally work.

  • I might be mistaken about AsyncAPI.  I have not spent enough time with it, but what I have proposed before has always been the result of years of experience and prior operational deployment.  I only propose what I know works, and I don't know that AsyncAPI can keep its promises yet.

  • Someone who knew AsyncAPI well might be able to have a single end point that takes care of a whole topic sub-tree, which would better match what I think is needed, but so far I don't know how to do that (one seems to have to associate an endpoint with every single topic in a hierarchy.)
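As an illustration of the "single end point plus a list of topics of interest" model described in this discussion, the sketch below implements MQTT-style topic filter matching, where `+` matches exactly one level and `#` matches any remaining levels. The function name `topic_matches` and the `example/...` topics are hypothetical and are not a proposed WIS2 topic hierarchy. The special handling MQTT gives to topics beginning with `$` is left out.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT-style topic filter matches a concrete topic."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last level of the filter;
            # it matches everything that remains, including nothing at all.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False  # the filter is more specific than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level differs ("+" accepts any single level)
    return len(filter_levels) == len(topic_levels)


# One connection, several topics of interest.
subscriptions = ["example/obs/surface/#", "example/+/upper-air/CA"]


def wanted(topic: str) -> bool:
    return any(topic_matches(f, topic) for f in subscriptions)
```

With this model a subscriber expresses interest in a whole sub-tree with one filter, rather than enumerating one end point per dataset.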


Jeremy Tandy:

  • AsyncAPI is intended to be an abstraction layer for documenting Event APIs. And any abstraction layer intended to work with a wide variety of systems will usually have some issues in mapping capabilities of individual systems.  

  • The benefit is that it would force you to define your pub/sub requirement in a technology-agnostic way. In the long term this could help to reduce technical debt, because you are not building a system definition around the technical capabilities of a particular version of a piece of software.

  • AsyncAPI gives you a way of cleanly separating the channels from the operations (pub/sub) and from the actual servers, independent of the implementation. As this specification can be used to generate code, this opens up the opportunity to generate compliance tools, mock testing clients, etc. without needing the infrastructure or test servers (higher productivity for developers and consumers). Using AsyncAPI also means protocols like WebSocket and Kafka (which are commonly used) are on the table, as well as any future protocols that might become available/useful.

  • Not sure where the confusion about channel vs. topic subscription comes from? An AsyncAPI channel is manifest as a server endpoint ( wss://api.xxx.com/market_data ) but you then subscribe using parameterisation ( wss://api.xxx.com/market_data/AAPL for Apple’s stock price). The AsyncAPI specification describes the endpoint and the subscription – generators create the stubs/clients for you.
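The parameterised subscription described in the last point can be sketched as follows, assuming a channel is declared as a template such as `market_data/{symbol}` and a concrete address is resolved against it. The helper names `compile_channel` and `resolve` are hypothetical, and only simple `{name}` parameters that match a single path segment are handled; this is a simplification and not a full URI template implementation.

```python
import re


def compile_channel(template: str) -> "re.Pattern":
    """Turn a channel template like 'market_data/{symbol}' into a regex."""
    # re.split with a capturing group alternates literal text (even indexes)
    # and parameter names (odd indexes).
    parts = re.split(r"\{(\w+)\}", template)
    pattern = "".join(
        re.escape(part) if i % 2 == 0 else f"(?P<{part}>[^/]+)"
        for i, part in enumerate(parts)
    )
    return re.compile(pattern)


def resolve(template: str, address: str):
    """Return the parameter values if the address fits the template, else None."""
    match = compile_channel(template).fullmatch(address)
    return match.groupdict() if match else None


params = resolve("market_data/{symbol}", "market_data/AAPL")
```

Here one declared channel covers every symbol. Whether the same approach can cover an entire multi-level topic sub-tree is the open question raised in the discussion above.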

 

 


Replay / catchup

WIS2 has two requirements for distribution of messages: 

  1. Real-time delivery or "event streaming", as discussed above.  

  2. Replay or "catch up" - where a consumer application may have been offline for a while and need to retrieve the messages that it missed during that time. 

Requirement 2 is the equivalent of the 24-hour cache for messages.

For discussion:

Message queue protocols have expiry settings (e.g. a TTL). This means that the queue [of messages] will remain in place through any transitory failure, so that client applications can just pick up where they left off.

But not all protocols support persisted messages or the ability for a subscriber to catch up.
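The effect of an expiry setting can be sketched with a toy queue that drops messages older than a TTL. The `ExpiringQueue` class is hypothetical and does not mirror any broker's API; the clock is passed in explicitly so the behaviour is easy to follow.

```python
from collections import deque


class ExpiringQueue:
    """Toy per-subscriber queue that discards messages older than a TTL."""

    def __init__(self, ttl_seconds: float):
        self._ttl = ttl_seconds
        self._items = deque()  # (enqueued_at, message), oldest first

    def put(self, message, now: float) -> None:
        self._items.append((now, message))

    def drain(self, now: float) -> list:
        """Drop expired messages, then return and remove the rest in order."""
        while self._items and now - self._items[0][0] >= self._ttl:
            self._items.popleft()
        remaining = [message for _, message in self._items]
        self._items.clear()
        return remaining


HOUR = 3600
queue = ExpiringQueue(ttl_seconds=24 * HOUR)
queue.put("a", now=0)          # published while the subscriber is offline
queue.put("b", now=20 * HOUR)  # published 20 hours later
caught_up = queue.drain(now=25 * HOUR)  # subscriber reconnects after 25 hours
```

In this run the subscriber receives only "b": message "a" is 25 hours old at reconnection and has expired. A shorter TTL reduces what the broker must hold but also shortens the outage a subscriber can recover from.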

One option is to consider these requirements as two separate concerns, solved in different ways:

  • Real-time delivery (push) via asynchronous API - a well-solved problem.

  • Replay / catch-up (pull) via a Web service 

Splitting out the replay / catch-up service because of "separation of concerns" is good practice - but we need a pragmatic approach in WIS2 that is simple enough for all Members to use.

That said, the replay / catch-up service would be simple to implement as a RESTful Web service, described via OpenAPI. Such an API may offer retrieval of messages according to (I) channel identifier, (II) time range, and (III) geographic bounding box. 
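The three retrieval criteria can be sketched as a filter over stored messages. Everything here is an assumption made for illustration: the `StoredMessage` fields, the `replay` function, point locations for messages, a half-open time range, and a bounding box given as (min_lon, min_lat, max_lon, max_lat) that does not cross the antimeridian.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class StoredMessage:
    channel: str
    published: datetime
    lon: float
    lat: float
    payload: str


def replay(messages, channel=None, start=None, end=None, bbox=None):
    """Return stored messages matching all supplied criteria, oldest first."""
    selected = []
    for m in messages:
        if channel is not None and m.channel != channel:
            continue
        if start is not None and m.published < start:
            continue
        if end is not None and m.published >= end:  # end is exclusive
            continue
        if bbox is not None:
            min_lon, min_lat, max_lon, max_lat = bbox
            if not (min_lon <= m.lon <= max_lon and min_lat <= m.lat <= max_lat):
                continue
        selected.append(m)
    return sorted(selected, key=lambda m: m.published)


def utc(hour: int) -> datetime:
    return datetime(2024, 1, 1, hour, tzinfo=timezone.utc)


store = [
    StoredMessage("example/obs", utc(1), -75.7, 45.4, "m1"),
    StoredMessage("example/obs", utc(5), 2.3, 48.9, "m2"),
    StoredMessage("example/warnings", utc(6), 2.3, 48.9, "m3"),
]
missed = replay(store, channel="example/obs", start=utc(0), end=utc(12),
                bbox=(-10.0, 35.0, 30.0, 60.0))
```

In a Web service, the same criteria would typically arrive as query parameters and the filtering would be pushed down to the data store rather than done in memory.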

GISCs will be required to offer both an asynchronous API for real-time message distribution AND a replay / catch-up Web service to retrieve past messages. GISCs MUST retain at least 24 hours of messages.

There are many ways to implement a replay / catch-up service such as this. Messages received by the GISC in real-time could be stored [locally] in a database such as MongoDB, or in object storage queried through a service such as Amazon Athena. For high performance, GISCs may consider persisting messages within their CDN and using edge-compute (e.g. Lambda@Edge) to process requests.

When recovering from an outage, a subscriber will likely reconnect to the message broker first. This way, the application can determine the duration for which missed messages need to be retrieved. 
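The recovery step above can be sketched as a small calculation: once the first live message arrives after reconnecting, the gap to request from the replay service runs from the last message seen before the outage up to that first live message, limited by how far back the service retains messages. The function name `replay_window` and the 24-hour `RETENTION` value are assumptions taken from the discussion in this section, not from a specification.

```python
from datetime import datetime, timedelta, timezone

RETENTION = timedelta(hours=24)  # assumed retention of the replay service


def replay_window(last_seen: datetime, first_after_reconnect: datetime, now: datetime):
    """Return (start, end) of the gap to fetch, or None if there is no gap."""
    earliest_available = now - RETENTION
    # Nothing older than the retention period can be retrieved.
    start = max(last_seen, earliest_available)
    end = first_after_reconnect
    return (start, end) if start < end else None


last_seen = datetime(2024, 1, 2, 0, 0, tzinfo=timezone.utc)
first_live = datetime(2024, 1, 2, 6, 0, tzinfo=timezone.utc)
window = replay_window(last_seen, first_live, now=first_live)
```

Messages at the edges of the window may be delivered both by the broker and by the replay service, so a client would also need to discard duplicates, for example by message identifier.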

We can defer a decision on this topic until after INFCOM2.

Issues to consider:

  1. Need to determine how users will discover these services.

  2. Do we need 24-hour message persistence? With a TTL of 24 hours, messages that have not yet been consumed will still be available. But do we need a depth of 24 hours? A shorter recommended TTL, for example 3 or 6 hours, might be sufficient.

Discussion:

Peter Silva:

  • I do not see a requirement for any additional replay service as I think the existing proposal covers it (though perhaps a bit too implicitly.)