The Advanced Message Queuing Protocol (AMQP) is a protocol used for message transmission and routing.
Stambia supports AMQP 0.9.1 through a Metadata Model, used to design the different objects, and five Process Actions, all of which are described in this article.
A sample project showing most of the functionalities can be found here, but we suggest that you first read the article that explains the basics.
Prerequisites
- Stambia DI Designer S18.1.1 or higher
- Stambia DI Runtime S17.3.0 or higher
- An AMQP broker. The examples in this article use RabbitMQ, but other brokers implementing AMQP 0.9.1, such as Apache Qpid, can be used.
- Knowledge of AMQP and its concepts. This article does not go into detail about them and assumes that you already know what AMQP is and how it works. The RabbitMQ guide on AMQP is a good place to start learning the basics.
The AMQP Metadata will hold the information about the AMQP server (called a broker), the Queues and the Exchanges.
This metadata information can be used in processes through the AMQP Actions, with the usual Stambia Metadata features (Referencing, Configuration, etc.).
Create a new AMQP Server:
and set the properties with your own settings:
Property | Description |
Name | Alias for the broker. |
Addresses | A comma-separated list of AMQP broker addresses, using the following pattern: <host>:<port> |
User | Login/Username of the AMQP Broker to use. |
Password | Encrypted password of the user. |
VirtualHost | Name of the AMQP Virtual Host used. |
Connection Timeout | TCP connection timeout. |
Hand Shake Timeout | AMQP protocol handshake timeout. |
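As an illustration of the Addresses format, the following Python sketch splits a comma-separated list of <host>:<port> entries into host and port pairs. The function name `parse_addresses` and the sample host names are hypothetical, and this is not necessarily how Stambia reads the property internally.

```python
def parse_addresses(addresses: str) -> list[tuple[str, int]]:
    """Split 'host1:5672,host2:5673' into [('host1', 5672), ('host2', 5673)]."""
    result = []
    for entry in addresses.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing commas or blank entries
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected <host>:<port>, got {entry!r}")
        result.append((host, int(port)))
    return result


# Hypothetical two-node broker list, both on the usual AMQP port.
brokers = parse_addresses("rabbit1.example.com:5672, rabbit2.example.com:5672")
```

An entry without a port, such as `localhost`, is rejected with a `ValueError` rather than silently given a default.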
Right click on the server node, choose New > Exchange
and set the properties with your own settings:
Property | Description |
Name | Logical name for the Exchange. |
Physical Name | Physical name of the Exchange. If not set, the Name property is used. |
Type | Exchange type. Available types: direct, fanout, topic, headers. |
Automatic Delete | The Exchange is configured as auto-delete. The implementation of this functionality depends on the broker used. |
Durable | The Exchange is durable (kept upon restart). |
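As a reminder of how the four Exchange types select bindings, here is a small in-memory Python sketch of the AMQP 0.9.1 routing rules. It does not talk to a broker. The `Binding` class, the `routes` function and the sample queue names are hypothetical names introduced for illustration, and the headers case assumes the usual x-match binding argument (all by default, or any), with other x- prefixed arguments ignored.

```python
from dataclasses import dataclass, field


@dataclass
class Binding:
    queue: str
    routing_key: str = ""
    headers: dict = field(default_factory=dict)  # binding arguments, may hold x-match


def topic_matches(pattern: str, key: str) -> bool:
    """'*' matches exactly one word, '#' matches zero or more words."""
    def match(p, k):
        if not p:
            return not k
        if p[0] == "#":
            # '#' either consumes nothing, or consumes one word and stays active
            return match(p[1:], k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        return p[0] in ("*", k[0]) and match(p[1:], k[1:])
    return match(pattern.split("."), key.split("."))


def headers_match(binding_args: dict, message_headers: dict) -> bool:
    """x-match=all (default) needs every pair to match; x-match=any needs one."""
    wanted = {k: v for k, v in binding_args.items() if not k.startswith("x-")}
    hits = [message_headers.get(k) == v for k, v in wanted.items()]
    return any(hits) if binding_args.get("x-match") == "any" else all(hits)


def routes(exchange_type, bindings, routing_key="", headers=None):
    """Return the queues a message would reach through an exchange of this type."""
    headers = headers or {}
    if exchange_type == "fanout":
        selected = bindings  # routing key and headers are ignored
    elif exchange_type == "direct":
        selected = [b for b in bindings if b.routing_key == routing_key]
    elif exchange_type == "topic":
        selected = [b for b in bindings if topic_matches(b.routing_key, routing_key)]
    elif exchange_type == "headers":
        selected = [b for b in bindings if headers_match(b.headers, headers)]
    else:
        raise ValueError(f"unknown exchange type: {exchange_type}")
    return sorted({b.queue for b in selected})


bindings = [
    Binding("orders", routing_key="shop.orders", headers={"type": "order"}),
    Binding("audit", routing_key="shop.#",
            headers={"type": "order", "env": "prod", "x-match": "any"}),
]
```

Calling `routes` with the same bindings and a different `exchange_type` shows which of the routing key or the headers is actually taken into account for each type.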
Right click on the server node, choose New > Queue
and set the properties with your own settings:
Property | Description |
Name | Logical name for the Queue. |
Physical Name | Physical name of the Queue. If not set, the Name property is used. |
Durable | The Queue is durable (kept upon restart). |
Exclusive | The Queue is exclusive (used by only one connection and deleted when that connection closes). |
Automatic Delete | The Queue is configured as auto-delete. The implementation of this functionality depends on the broker used. |
The following actions are available in the "Internet & Network" category of the Designer's palette of Actions.
Action | Description |
AMQP Send Message | Send a message to a broker. The Message is defined in the body of the Action. |
AMQP Send File | Send a message retrieved from a file to a broker. |
AMQP Receive Message | Receive a message. |
AMQP Receive File | Receive a message and save it in a file. |
AMQP Operation | Perform operations on the broker. |
Please refer to the Designer's Help Content for the description of all the parameters available on these actions.
To declare Queues, Exchanges or Binds on the broker, the AMQP Operation Action must be used.
Declaring Exchanges and Queues
To declare an Exchange or a Queue, simply drag and drop the related metadata on the AMQP Operation Action. The needed properties will be set automatically:
Note:
- If an Exchange or Queue with the same properties already exists on the broker, the broker simply ignores the operation.
- If the Exchange or Queue already exists but with different properties, an error is thrown.
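The two cases in this note can be pictured with a small in-memory model of a broker's declare logic. This is only a sketch of the behavior described above, not Stambia or broker code: `FakeBroker`, `DeclareError` and the property names passed to `declare` are hypothetical.

```python
class DeclareError(Exception):
    """Raised when an object is redeclared with different properties."""


class FakeBroker:
    def __init__(self):
        self._objects = {}  # (kind, name) -> declared properties

    def declare(self, kind: str, name: str, **properties) -> bool:
        """Return True if the object was created, False if the call was a no-op."""
        key = (kind, name)
        existing = self._objects.get(key)
        if existing is None:
            self._objects[key] = properties
            return True
        if existing == properties:
            return False  # same properties: the declaration is silently ignored
        raise DeclareError(
            f"{kind} {name!r} already exists with {existing}, got {properties}"
        )


broker = FakeBroker()
created = broker.declare("queue", "orders", durable=True, exclusive=False, auto_delete=False)
repeated = broker.declare("queue", "orders", durable=True, exclusive=False, auto_delete=False)
```

Declaring `orders` a third time with `durable=False` would raise `DeclareError`, which mirrors the second case of the note.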
Declaring binds
To bind a Queue to an Exchange:
- Drag and drop the Exchange and Queue metadata on the AMQP Operation Action.
- Set the Amqp Operation parameter to bindqueue
- Optionally set a Routing Key.
- Optionally set Amqp Headers (useful for headers Exchanges) by adding AMQP_HEADER_<headerName> parameters.
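The AMQP_HEADER_<headerName> naming convention can be read as "every parameter with this prefix becomes one header". The sketch below shows that mapping on a plain Python dictionary. The function `extract_headers`, the `AMQP_OPERATION` key and the sample header names are hypothetical, and Stambia's actual handling of these parameters may differ.

```python
HEADER_PREFIX = "AMQP_HEADER_"


def extract_headers(parameters: dict) -> dict:
    """Collect AMQP_HEADER_<headerName> parameters into a {headerName: value} dict."""
    return {
        name[len(HEADER_PREFIX):]: value
        for name, value in parameters.items()
        if name.startswith(HEADER_PREFIX) and len(name) > len(HEADER_PREFIX)
    }


action_parameters = {
    "AMQP_OPERATION": "bindqueue",   # hypothetical key standing in for the operation parameter
    "AMQP_HEADER_format": "pdf",     # becomes header "format"
    "AMQP_HEADER_type": "report",    # becomes header "type"
}
headers = extract_headers(action_parameters)
```

Parameters without the prefix are left out of the resulting header set.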
Two actions can be used to send a message:
- AMQP Send Message
- AMQP Send File
The only difference between these two Actions is that the message is set in the body of the action in the first one whereas it is retrieved from a file in the second.
To send a message with these actions:
- Drag and drop the Exchange to which the message must be sent.
- Optionally set the Routing Key used when sending.
- Optionally set the headers used when sending by adding AMQP_HEADER_<headerName> parameters.
In this example, we are sending a message to a headers Exchange, so we only specified the headers, as the Routing Key is ignored for this kind of Exchange.
Two actions can be used to retrieve messages:
- AMQP Receive Message
- AMQP Receive File
The difference between these two actions is that AMQP Receive File stores the messages in files, whereas AMQP Receive Message must be bound to another action that will receive the messages' information.
Please refer to the Designer's Help Content for the list of variables available through the bind link.
To receive messages from a Queue, simply drag and drop the corresponding metadata on the action:
Here, the messages received by AMQP Receive Message are used in a Scripting Action that exports them to process variables.
If you look at the AMQP Process Actions, you will notice that you can define the session used for each action with the Amqp Session Name parameter.
When working with processes using multiple AMQP Actions, you'll have to take care of session management.
Sessions have several particularities:
1. Each session represents an AMQP Channel on the broker.
For example, if you are using three different sessions in your process, this will result in three Channels on the broker.
2. A session can be used by only one Action at a time.
If multiple Actions are executed in parallel on the same session (e.g. all on T1), only one will be executed and the others will wait for the session to be released.
In these cases, you should consider using a different session for each action executed in parallel.
3. The acknowledge mode of a session is unique and cannot be changed. If you started T1 on SESSION_TRANSACTED, you cannot, for example, switch it to AUTO_ACKNOWLEDGE on another action.
4. If a session is closed with the closesession operation of the AMQP Operation Action, it cannot be re-opened.
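The four rules above can be modelled in a few lines of Python. This is an illustrative sketch only, not Stambia's implementation: the `SessionRegistry` class and its method names are hypothetical, while the session name T1 and the acknowledge modes come from the text.

```python
import threading
from contextlib import contextmanager


class SessionRegistry:
    """Toy model: one lock (standing in for one Channel) per session name."""

    def __init__(self):
        self._guard = threading.Lock()
        self._sessions = {}   # name -> (ack_mode, lock)
        self._closed = set()

    @contextmanager
    def use(self, name: str, ack_mode: str):
        with self._guard:
            if name in self._closed:
                raise RuntimeError(f"session {name} was closed and cannot be re-opened")
            mode, lock = self._sessions.setdefault(name, (ack_mode, threading.Lock()))
            if mode != ack_mode:
                raise ValueError(f"session {name} already uses {mode}")
        with lock:  # a second action on the same session waits here
            yield name

    def close(self, name: str):
        with self._guard:
            self._sessions.pop(name, None)
            self._closed.add(name)

    def channel_count(self) -> int:
        return len(self._sessions)


registry = SessionRegistry()
with registry.use("T1", "SESSION_TRANSACTED"):
    pass  # first action on T1
with registry.use("T2", "AUTO_ACKNOWLEDGE"):
    pass  # a parallel action would use its own session
```

In this model, two sessions mean two entries in the registry, just as two sessions mean two Channels on the broker. Asking for T1 with a different acknowledge mode, or using a session after `close`, raises an exception.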
Two statistic variables are available on the Actions:
- MQ_MESSAGE_SENT which represents the number of messages sent for AMQP Send Actions.
- MQ_MESSAGE_RECEIVED which represents the number of messages received for AMQP Receive Actions.
As you know, fanout and headers Exchanges ignore Routing Keys, so it is not necessary to define one when binding a queue to a fanout or headers Exchange.
However, we noticed that with the Apache Qpid broker, a bind that has no Routing Key cannot be removed with the unbind AMQP Operation Action.
So if you are using Apache Qpid, we suggest always setting a Routing Key when you bind a Queue, even on a fanout or headers Exchange.
This does not change the final behavior of these Exchanges, as they do not take the Routing Key into account when routing messages through the bind. It simply provides a key to use when you want to unbind the queue.