Getting started with AMQP

    The Advanced Message Queuing Protocol (AMQP) is a protocol used for message transmission and routing.

    Stambia supports AMQP 0.9.1 through a Metadata Model, used to design the different objects, and five Process Actions, all of which are described in this article.

    A sample project showing most of the features can be found here, but we suggest first reading the article that explains the basics.

     

    Prerequisites

    • Stambia DI Designer S18.1.1 or higher
    • Stambia DI Runtime S17.3.0 or higher
    • An AMQP broker. The examples in this article use RabbitMQ, but other brokers implementing AMQP 0.9.1, such as Apache Qpid, can be used.
    • Knowledge of AMQP and its concepts. This article does not go into detail about them and assumes that you already know what AMQP is and how it works. The RabbitMQ guide on AMQP is a good place to start learning the basics.

     

    Creating the AMQP metadata

    Defining Metadata

    The AMQP Metadata will hold the information about the AMQP server (called a broker), the Queues and the Exchanges.

    This metadata information can be used in processes through the AMQP Actions, with the usual Stambia Metadata features (Referencing, Configuration, etc.).

    Create a new AMQP Server:

    newMetadata

    and set the properties with your own settings:

    metadataProperties

     

     Property  Description
     Name  Alias for the broker.
     Addresses  A comma-separated list of AMQP broker addresses, each using the following pattern: <host>:<port>
     User  Login/Username of the AMQP Broker to use.
     Password  Encrypted password of the user.
     VirtualHost  Name of the AMQP Virtual Host used.
     Connection Timeout  TCP connection timeout.
     Hand Shake Timeout  AMQP protocol handshake timeout.
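As an illustration of the Addresses pattern described in the table above, here is a minimal Python sketch that splits such a value into host/port pairs. The function name `parse_addresses` and the host names are hypothetical; this is not how Stambia itself parses the property, only a way to check a value before entering it.

```python
from typing import List, Tuple


def parse_addresses(addresses: str) -> List[Tuple[str, int]]:
    """Split a comma-separated '<host>:<port>' list into (host, port) pairs."""
    pairs = []
    for raw in addresses.split(","):
        entry = raw.strip()
        if not entry:
            continue  # tolerate blank entries such as a trailing comma
        # Split on the last ':' so the port is always the final component.
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected <host>:<port>, got {entry!r}")
        pairs.append((host, int(port)))
    return pairs


# Hypothetical broker addresses, for illustration only.
brokers = parse_addresses("broker1:5672, broker2:5673")
```

An entry without a port raises `ValueError`, which mirrors the fact that the documented pattern always includes both parts.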

     

    Defining Exchanges

    Right click on the server node, choose New > Exchange and set the properties with your own settings:

    declareExchange


     Property  Description
     Name  Logical name for the Exchange.
     Physical Name  Physical name of the Exchange. If not set, the Name property is used.
     Type  Exchange type. Available types: direct, fanout, topic, headers.
     Automatic Delete  The Exchange is configured as auto-delete. The implementation of this functionality depends on the broker used.
     Durable  The Exchange is durable (kept upon restart).
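To make the difference between the Exchange types concrete, the following Python sketch models how a binding key is compared with a message Routing Key for direct, fanout and topic Exchanges, following the AMQP 0.9.1 conventions ("*" matches exactly one word, "#" matches zero or more words). It is a simplified in-memory model with hypothetical function names, not broker code; headers Exchanges match on message headers rather than on keys and are deliberately left out here.

```python
def _topic_matches(pattern, words):
    """Match a list of pattern words against a list of routing-key words."""
    if not pattern:
        return not words
    head, rest = pattern[0], pattern[1:]
    if head == "#":
        # '#' may absorb zero or more words: try every possible split point.
        return any(_topic_matches(rest, words[n:]) for n in range(len(words) + 1))
    if words and head in ("*", words[0]):
        # '*' matches any single word; otherwise the words must be equal.
        return _topic_matches(rest, words[1:])
    return False


def binding_matches(exchange_type, binding_key, routing_key):
    """Return True if a message with routing_key would follow this binding."""
    if exchange_type == "fanout":
        return True  # fanout ignores keys entirely
    if exchange_type == "direct":
        return binding_key == routing_key
    if exchange_type == "topic":
        return _topic_matches(binding_key.split("."), routing_key.split("."))
    raise ValueError(f"{exchange_type!r} Exchanges do not route on keys in this sketch")
```

For example, `binding_matches("topic", "orders.*.created", "orders.eu.created")` is true, while the same binding key on a direct Exchange only matches a Routing Key that is exactly `orders.*.created`.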

     

    Defining Queues

     Right click on the server node, choose New > Queue and set the properties with your own settings:

     declareQueue

     

     Property  Description
     Name  Logical name for the Queue.
     Physical Name  Physical name of the Queue. If not set, the Name property is used.
     Durable  The Queue is durable (kept upon restart).
     Exclusive  The Queue is exclusive (used by only one connection and deleted when that connection closes).
     Automatic Delete  The Queue is configured as auto-delete. The implementation of this functionality depends on the broker used.

     

    Using AMQP in Processes

    Overview of the actions

    The following actions are available in the "Internet & Network" category of the Designer's palette of Actions.

    actionsOverview

     

     Action  Description
     AMQP Send Message  Send a message to a broker. The Message is defined in the body of the Action.
     AMQP Send File  Send a message retrieved from a file to a broker.
     AMQP Receive Message  Receive a message.
     AMQP Receive File  Receive a message and save it in a file.
     AMQP Operation  Perform operations on the broker.

     Please refer to the Designer's Help Content for the description of all the parameters available on these actions.

     

    Declaring Queues, Exchanges, or Binds

    To declare Queues, Exchanges or Binds on the broker, the AMQP Operation Action must be used.

     

    Declaring Exchanges and Queues

    To declare an Exchange or a Queue, simply drag and drop the related metadata onto the AMQP Operation Action. The required properties are set automatically:

    declareExchangeAndQueues

    Note:

    • If an Exchange or Queue with the same properties already exists on the broker, the broker simply ignores the operation.
    • If the Exchange or Queue already exists but with different properties, an error is thrown.
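The two rules in the note above can be sketched with a small in-memory model. The `FakeBroker` class and the `PreconditionFailed` exception are hypothetical names chosen for this illustration; a real broker reports the mismatch through its own error mechanism.

```python
class PreconditionFailed(Exception):
    """Raised when an object is redeclared with different properties."""


class FakeBroker:
    """In-memory stand-in for a broker, used only to illustrate declare rules."""

    def __init__(self):
        self._objects = {}  # (kind, name) -> properties

    def declare(self, kind, name, **properties):
        key = (kind, name)
        existing = self._objects.get(key)
        if existing is None:
            self._objects[key] = dict(properties)
            return "created"
        if existing == properties:
            return "unchanged"  # same definition: the declaration is a no-op
        raise PreconditionFailed(
            f"{kind} {name!r} already exists with properties {existing}"
        )


broker = FakeBroker()
first = broker.declare("queue", "orders", durable=True, exclusive=False)
second = broker.declare("queue", "orders", durable=True, exclusive=False)
```

Declaring `orders` a third time with `durable=False` would raise `PreconditionFailed`, which is why a declaration Action can safely be run at the start of every process as long as the metadata does not change.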

     

    Declaring binds

    To bind a Queue to an Exchange:

    1. Drag and drop the Exchange and Queue metadata on the AMQP Operation Action.
    2. Set the Amqp Operation parameter to bindqueue.
    3. Optionally set a Routing Key.
    4. Optionally set Amqp Headers (useful for headers Exchanges) by adding AMQP_HEADER_<headerName> parameters.

    declareBind
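The AMQP_HEADER_<headerName> naming convention used in step 4 can be illustrated with a short Python sketch that collects such parameters into a headers dictionary. The helper `extract_headers` and the header names `format` and `type` are hypothetical; the sketch only shows how the prefix maps to header names, not how the Action reads its parameters internally.

```python
HEADER_PREFIX = "AMQP_HEADER_"


def extract_headers(parameters):
    """Collect AMQP_HEADER_<headerName> parameters into a {headerName: value} dict."""
    return {
        name[len(HEADER_PREFIX):]: value
        for name, value in parameters.items()
        # Skip other parameters and a bare prefix with no header name.
        if name.startswith(HEADER_PREFIX) and len(name) > len(HEADER_PREFIX)
    }


# Hypothetical Action parameters for a bind on a headers Exchange.
action_parameters = {
    "Amqp Operation": "bindqueue",
    "AMQP_HEADER_format": "pdf",
    "AMQP_HEADER_type": "report",
}
headers = extract_headers(action_parameters)
```

Here `headers` contains only the two header entries, with the prefix removed from their names.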

     

    Sending messages

    Two actions can be used to send a message:

    • AMQP Send Message
    • AMQP Send File

    The only difference between these two Actions is that the first takes the message from the body of the Action, whereas the second reads it from a file.

    To send a message with these actions:

    1. Drag and drop the Exchange to which the message must be sent.
    2. Optionally set the Routing Key to use when sending.
    3. Optionally set the headers to use when sending by adding AMQP_HEADER_<headerName> parameters.

    sendMessage

    In this example, we are sending a message to a headers Exchange, so we specified only the headers, because the Routing Key is ignored for this type of Exchange.
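To show why only the headers matter for a headers Exchange, here is a simplified Python model of headers-based routing. It relies on the standard `x-match` binding argument of AMQP 0.9.1 headers Exchanges (`all`, the default, or `any`). The queue names and header values are hypothetical, and the model ignores broker-specific refinements such as special handling of other `x-` prefixed headers.

```python
def headers_match(binding_args, message_headers):
    """Return True if the message headers satisfy the binding arguments."""
    mode = binding_args.get("x-match", "all")
    expected = {k: v for k, v in binding_args.items() if k != "x-match"}
    hits = [
        key in message_headers and message_headers[key] == value
        for key, value in expected.items()
    ]
    return any(hits) if mode == "any" else all(hits)


def route_headers(bindings, message_headers, routing_key=""):
    """Return the queues that receive the message; routing_key is never used."""
    return [queue for queue, args in bindings if headers_match(args, message_headers)]


# Hypothetical bindings on a headers Exchange.
bindings = [
    ("pdf_reports", {"x-match": "all", "format": "pdf", "type": "report"}),
    ("archives_or_logs", {"x-match": "any", "format": "zip", "type": "log"}),
]
delivered = route_headers(bindings, {"format": "pdf", "type": "report"}, "ignored.key")
```

With these bindings, the message above reaches only `pdf_reports`, whatever Routing Key is supplied.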

     

    Receiving messages

     Two actions can be used to retrieve messages:

    • AMQP Receive Message
    • AMQP Receive File

    The difference between these two actions is that AMQP Receive File stores the messages in files, whereas AMQP Receive Message must be bound to another action that will receive the messages' information.

    Please refer to the Designer's Help Content for the list of variables available through the bind link.

    To receive messages from a Queue, simply drag and drop the corresponding metadata on the action:

    receiveMessage

    Here, the messages received by the AMQP Receive Message action are used in a Scripting Action that exports them to process variables.

     

    About sessions and parallelism

    If you look at the AMQP Process Actions, you will notice that the session used by each action can be defined with the Amqp Session Name parameter.

    sessionName

    When working with processes using multiple AMQP Actions, you'll have to take care of session management.

    Sessions have several particularities:

    1. Each session represents an AMQP Channel on the broker.

    For example, if you are using three different sessions in your process, this results in three Channels on the broker.

    2. A session can be used by only one Action at a time.

    If multiple Actions are executed in parallel on the same session (e.g. all on T1), only one is executed at a time and the others wait for the session to be released.

    In such cases, consider using a different session for each action executed in parallel.

    3. The acknowledge mode of a session is set once and cannot be changed. For example, if you started T1 in SESSION_TRANSACTED mode, you cannot switch it to AUTO_ACKNOWLEDGE in another action.

    4. If a session is closed with the closesession operation of the AMQP Operation Action, it cannot be re-opened.
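The session rules listed above can be summarised in a small Python model. The `SessionRegistry` class is hypothetical and only mimics the documented behaviour (one action at a time per session, a fixed acknowledge mode, no re-opening after close); it does not reflect how the Runtime implements sessions.

```python
import threading
from contextlib import contextmanager


class Session:
    def __init__(self, name, ack_mode):
        self.name = name
        self.ack_mode = ack_mode
        self.lock = threading.Lock()  # held while one action uses the session
        self.closed = False


class SessionRegistry:
    def __init__(self):
        self._sessions = {}
        self._guard = threading.Lock()

    def _lookup(self, name, ack_mode):
        with self._guard:
            session = self._sessions.get(name)
            if session is None:
                session = self._sessions[name] = Session(name, ack_mode)
            elif session.closed:
                raise RuntimeError(f"session {name!r} was closed and cannot be re-opened")
            elif session.ack_mode != ack_mode:
                raise ValueError(f"session {name!r} already uses {session.ack_mode}")
            return session

    @contextmanager
    def use(self, name, ack_mode):
        session = self._lookup(name, ack_mode)
        with session.lock:  # other actions on this session wait here
            yield session

    def close(self, name):
        with self._guard:
            self._sessions[name].closed = True


registry = SessionRegistry()
with registry.use("T1", "SESSION_TRANSACTED") as session:
    busy = session.lock.locked()  # True while the action is running
```

In this model, two parallel actions that both ask for `T1` are serialized by the lock, whereas actions using `T1` and `T2` proceed independently, which is the reason for giving each parallel action its own session.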

     

    Miscellaneous

    Statistics

    Two statistic variables are available on the Actions:

    • MQ_MESSAGE_SENT which represents the number of messages sent for AMQP Send Actions.
    • MQ_MESSAGE_RECEIVED which represents the number of messages received for AMQP Receive Actions.

     

    Apache Qpid and Routing Keys

    As you know, fanout and headers Exchanges ignore Routing Keys, so it is not necessary to define one when binding a Queue to a fanout or headers Exchange.

    bindQueues

    However, we noticed that with the Apache Qpid broker, a bind that does not have a Routing Key cannot be removed with the unbind operation of the AMQP Operation Action.

    So if you are using Apache Qpid, we suggest always setting a Routing Key when you bind a Queue, even on a fanout or headers Exchange.

    This does not change the final behavior of these Exchanges, as they do not take the key into account when routing messages through the bind. It simply provides a key to use when you want to unbind the Queue.
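The following Python sketch illustrates why a placeholder Routing Key is harmless on a fanout Exchange: the key identifies the bind for a later unbind, but plays no part in routing. The `FanoutExchange` class, queue names and key are hypothetical, and the model does not reproduce the Apache Qpid behavior itself.

```python
class FanoutExchange:
    """In-memory model of a fanout Exchange and its binds."""

    def __init__(self):
        self.bindings = set()  # (queue, routing_key) pairs

    def bind(self, queue, routing_key=""):
        self.bindings.add((queue, routing_key))

    def unbind(self, queue, routing_key=""):
        # The bind is identified by the same queue and key used to create it.
        self.bindings.discard((queue, routing_key))

    def route(self, routing_key=""):
        # Fanout: every bound queue receives the message, whatever the key.
        return sorted({queue for queue, _ in self.bindings})


exchange = FanoutExchange()
exchange.bind("audit", "placeholder")
exchange.bind("billing", "placeholder")
before = exchange.route("any.key.at.all")
exchange.unbind("audit", "placeholder")
after = exchange.route()
```

Both queues receive the message before the unbind, and only `billing` remains afterwards.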

     
