Stambia versions 2.x, 3.x, S17, S18, S19 and S20 reach End of Support on January 15, 2024. Please consider upgrading to a supported Semarchy xDI version. See Global Policy Support and the Semarchy Documentation.

Stambia DI for AMQP

    Stambia Data Integration lets you work with Advanced Message Queuing Protocol (AMQP) brokers.

    An AMQP Metadata Model is available to define all the broker's credentials and communication properties, and to design the different AMQP objects.

    It can then be used with the five Process Actions, which allow you to:

    • Declare AMQP exchanges and queues
    • Send messages to exchanges or queues
    • Receive messages from queues

    Getting started with AMQP

      The Advanced Message Queuing Protocol (AMQP) is a protocol used for message transmission and routing.

      Stambia supports AMQP 0.9.1 through a Metadata Model, used to design the different objects, and five Process Actions, which are described in this article.

      A sample project showing most of the functionalities can be found here, but we suggest first reading this article, which explains the basics.

       

      Prerequisites

      • Stambia DI Designer S18.1.1 or higher
      • Stambia DI Runtime S17.3.0 or higher
      • An AMQP broker. The examples in this article use RabbitMQ, but other brokers implementing AMQP 0.9.1, such as Apache Qpid, can be used.
      • Working knowledge of AMQP and its concepts. This article does not cover them in detail and assumes that you already know what AMQP is and how it works. The RabbitMQ guide on AMQP is a good place to learn the basics.

       

      Creating the AMQP metadata

      Defining Metadata

      The AMQP Metadata will hold the information about the AMQP server (called a broker), the Queues and the Exchanges.

      This metadata information can be used in processes through the AMQP Actions, with the usual Stambia Metadata features (Referencing, Configuration, etc.).

      Create a new AMQP Server:

      newMetadata

      and set the properties with your own settings:

      metadataProperties

       

       Property  Description
       Name  Alias for the broker.
       Addresses  A comma-separated list of AMQP broker addresses, using the following pattern: <host>:<port>
       User  Login/Username of the AMQP Broker to use.
       Password  Encrypted password of the user.
       VirtualHost  Name of the AMQP Virtual Host used.
       Connection Timeout  TCP connection timeout.
       Hand Shake Timeout  AMQP protocol handshake timeout.
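
      As an illustration of the Addresses pattern, the sketch below splits a comma-separated value into host and port pairs. It is only a model of the `<host>:<port>` format described above, not the parser Stambia uses; the function name `parse_addresses` and the example host names are hypothetical.

```python
from typing import List, Tuple


def parse_addresses(addresses: str) -> List[Tuple[str, int]]:
    """Split a value such as 'host1:5672,host2:5673' into (host, port) pairs."""
    pairs = []
    for entry in addresses.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate stray commas and surrounding spaces
        # rpartition splits on the LAST colon, so the port is always the tail
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected <host>:<port>, got {entry!r}")
        pairs.append((host, int(port)))
    return pairs


# Hypothetical broker addresses, for illustration only
print(parse_addresses("broker1.example.com:5672, broker2.example.com:5673"))
```

      Rejecting an entry without a port is a choice made for this sketch; whether the Metadata accepts a bare host name is not stated in this article.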

       

      Defining Exchanges

      Right-click the server node, choose New > Exchange and set the properties with your own settings:

      declareExchange


       Property  Description
       Name  Logical name for the Exchange.
       Physical Name  Physical name of the Exchange. If not set, the Name property is used.
       Type  Exchange type. Available types: direct, fanout, topic, headers.
       Automatic Delete  The Exchange is configured as auto-delete. The implementation of this functionality depends on the broker used.
       Durable  The Exchange is durable (kept upon restart).
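
      For readers who want a reminder of how the four Exchange types differ, the sketch below models the AMQP 0.9.1 routing rules in plain Python. It runs without a broker and is not part of Stambia; the function names are hypothetical, and the `x-match` binding argument is the standard headers-exchange convention (`all` by default, or `any`).

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Topic match: '*' stands for exactly one word, '#' for zero or more words."""
    def match(p, k):
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb any number of leading words, including none
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            return match(rest, k[1:])
        return False
    return match(pattern.split("."), routing_key.split("."))


def headers_match(binding_args: dict, message_headers: dict) -> bool:
    """Headers match: compare binding arguments with the message headers."""
    mode = binding_args.get("x-match", "all")
    wanted = {k: v for k, v in binding_args.items() if k != "x-match"}
    hits = [message_headers.get(k) == v for k, v in wanted.items()]
    return any(hits) if mode == "any" else all(hits)


def is_routed(exchange_type, binding_key="", routing_key="",
              binding_args=None, headers=None) -> bool:
    """Decide whether one binding receives a message, per Exchange type."""
    if exchange_type == "fanout":
        return True  # every bound queue receives the message
    if exchange_type == "direct":
        return binding_key == routing_key
    if exchange_type == "topic":
        return topic_matches(binding_key, routing_key)
    if exchange_type == "headers":
        return headers_match(binding_args or {}, headers or {})
    raise ValueError(f"unknown exchange type: {exchange_type}")


print(is_routed("topic", binding_key="orders.*.created",
                routing_key="orders.eu.created"))
print(is_routed("headers", binding_args={"x-match": "any", "format": "pdf"},
                headers={"format": "pdf", "type": "report"}))
```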

       

      Defining Queues

      Right-click the server node, choose New > Queue and set the properties with your own settings:

       declareQueue

       

       Property  Description
       Name  Logical name for the Queue.
       Physical Name  Physical name of the Queue. If not set, the Name property is used.
       Durable  The Queue is durable (kept upon restart).
       Exclusive  The Queue is exclusive (used by only one connection and deleted when that connection closes).
       Automatic Delete  The Queue is configured as auto-delete. The implementation of this functionality depends on the broker used.

       

      Using AMQP in Processes

      Overview of the actions

      The following actions are available in the "Internet & Network" category of the Designer's palette of Actions.

      actionsOverview

       

       Action  Description
       AMQP Send Message  Send a message to a broker. The Message is defined in the body of the Action.
       AMQP Send File  Send a message retrieved from a file to a broker.
       AMQP Receive Message  Receive a message.
       AMQP Receive File  Receive a message and save it in a file.
       AMQP Operation  Perform operations on the broker.

       Please refer to the Designer's Help Content for the description of all the parameters available on these actions.

       

      Declaring Queues, Exchanges, or Binds

      To declare Queues, Exchanges or Binds on the broker, the AMQP Operation Action must be used.

       

      Declaring Exchanges and Queues

      To declare an Exchange or a Queue, simply drag and drop the related metadata on the AMQP Operation Action. The needed properties will be set automatically:

      declareExchangeAndQueues

      Note:

      • If an Exchange or Queue with the same properties already exists on the broker, the broker simply ignores the operation.
      • If the Exchange or Queue already exists but with different properties, an error is thrown.
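
      The two rules in this note can be modelled with a small in-memory stand-in for a broker. This is a sketch only: `FakeBroker` and `PreconditionFailed` are hypothetical names, and the error a real broker returns (and how Stambia reports it) may differ.

```python
class PreconditionFailed(Exception):
    """Raised when an object is re-declared with different properties."""


class FakeBroker:
    """Minimal in-memory model of declare semantics; not a real AMQP client."""

    def __init__(self):
        self._objects = {}  # (kind, name) -> properties

    def declare(self, kind, name, **properties):
        key = (kind, name)
        existing = self._objects.get(key)
        if existing is None:
            self._objects[key] = dict(properties)
            return "created"
        if existing == properties:
            return "unchanged"  # same properties: the declaration is a no-op
        raise PreconditionFailed(
            f"{kind} {name!r} already exists with {existing}, got {properties}"
        )


broker = FakeBroker()
print(broker.declare("queue", "orders", durable=True, exclusive=False))
print(broker.declare("queue", "orders", durable=True, exclusive=False))
try:
    broker.declare("queue", "orders", durable=False, exclusive=False)
except PreconditionFailed as error:
    print("error:", error)
```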

       

      Declaring binds

      To bind a Queue to an Exchange:

      1. Drag and drop the Exchange and Queue metadata on the AMQP Operation Action.
      2. Set the Amqp Operation parameter to bindqueue.
      3. Optionally set a Routing Key.
      4. Optionally set Amqp Headers (useful for headers Exchanges) by adding AMQP_HEADER_<headerName> parameters.

      declareBind
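
      To make the AMQP_HEADER_<headerName> naming convention concrete, the sketch below shows one way such parameters could be turned into a headers table by stripping the prefix. It illustrates the convention only and is an assumption about the mapping, not a description of how the Action is implemented; the function name and the header names `format` and `type` are hypothetical.

```python
HEADER_PREFIX = "AMQP_HEADER_"


def collect_amqp_headers(parameters: dict) -> dict:
    """Keep only AMQP_HEADER_<headerName> entries and strip the prefix."""
    return {
        name[len(HEADER_PREFIX):]: value
        for name, value in parameters.items()
        if name.startswith(HEADER_PREFIX) and len(name) > len(HEADER_PREFIX)
    }


# Hypothetical parameter values for a bind on a headers Exchange
action_parameters = {
    "Amqp Operation": "bindqueue",
    "Routing Key": "",
    "AMQP_HEADER_format": "pdf",
    "AMQP_HEADER_type": "report",
}
print(collect_amqp_headers(action_parameters))
```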

       

      Sending messages

      Two actions can be used to send a message:

      • AMQP Send Message
      • AMQP Send File

      The only difference between these two Actions is that the message is set in the body of the action in the first one whereas it is retrieved from a file in the second.

      To send a message with these actions:

      1. Drag and drop the Exchange on which the message must be sent.
      2. Optionally set the Routing Key used for the sending.
      3. Optionally set the headers used for the sending by adding AMQP_HEADER_<headerName> parameters.

      sendMessage

      In this example, we are sending a message to a headers Exchange, so we only specified the headers, as the Routing Key is ignored for this kind of Exchange.

       

      Receiving messages

      Two actions can be used to retrieve messages:

      • AMQP Receive Message
      • AMQP Receive File

      The difference between these two actions is that AMQP Receive File stores the messages in files, whereas AMQP Receive Message must be bound to another action that receives the messages' information.

      Please refer to the Designer's Help Content for the list of variables available through the bind link.

      To receive messages from a Queue, simply drag and drop the corresponding metadata on the action:

      receiveMessage

      Here, the messages received by AMQP Receive Message are used in a Scripting Action that exports them to process variables.

       

      About sessions and parallelism

      Each AMQP Process Action has an Amqp Session Name parameter, which defines the session used by that action.

      sessionName

      When working with processes that use multiple AMQP Actions, you have to take care of session management.

      Sessions have several particularities:

      1. Each session represents an AMQP Channel on the broker.

      For example, if you use three different sessions in your process, this results in three Channels on the broker.

      2. A session can be used by only one Action at a time.

      If multiple Actions are executed in parallel on the same session (e.g. all on T1), only one is executed at a time and the others wait for the session to be released.

      In these cases, you should consider using a different session for each action executed in parallel.

      3. The acknowledge mode of a session is set when the session starts and cannot be changed. For example, if you started T1 with SESSION_TRANSACTED, you cannot switch to AUTO_ACKNOWLEDGE in another action.

      4. If a session is closed with the closesession operation of the AMQP Operation Action, it cannot be re-opened.
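
      The session rules above can be summarised with a small thread-based model. This is a sketch under stated assumptions, not Stambia's implementation: the `Session` class, `SessionError` and the helper `max_parallel_actions` are hypothetical, and a lock stands in for the single Channel behind each session.

```python
import threading
import time


class SessionError(Exception):
    """Raised when a session is used in a way the rules above forbid."""


class Session:
    def __init__(self, name, ack_mode):
        self.name = name
        self.ack_mode = ack_mode       # rule 3: fixed for the session's lifetime
        self.closed = False
        self._lock = threading.Lock()  # rule 2: one action at a time

    def run(self, action, ack_mode=None):
        if self.closed:
            raise SessionError(f"{self.name} is closed and cannot be re-opened")
        if ack_mode is not None and ack_mode != self.ack_mode:
            raise SessionError(f"{self.name} already uses {self.ack_mode}")
        with self._lock:               # parallel callers wait here
            return action()

    def close(self):
        self.closed = True             # rule 4: no way back


def max_parallel_actions(sessions, hold=0.02):
    """Run one action per listed session in parallel; return the peak overlap."""
    state = {"active": 0, "peak": 0}
    guard = threading.Lock()

    def action():
        with guard:
            state["active"] += 1
            state["peak"] = max(state["peak"], state["active"])
        time.sleep(hold)
        with guard:
            state["active"] -= 1

    threads = [threading.Thread(target=s.run, args=(action,)) for s in sessions]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return state["peak"]


t1 = Session("T1", "SESSION_TRANSACTED")
# Three actions sharing T1 are serialized, so they never overlap
print("peak overlap on a shared session:", max_parallel_actions([t1, t1, t1]))
```

      Passing three distinct `Session` objects to `max_parallel_actions` lets the actions overlap, which mirrors the advice to use one session per parallel action.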

       

      Miscellaneous

      Statistics

      Two statistic variables are available on the Actions:

      • MQ_MESSAGE_SENT which represents the number of messages sent for AMQP Send Actions.
      • MQ_MESSAGE_RECEIVED which represents the number of messages received for AMQP Receive Actions.

       

      Apache Qpid and Routing Keys

      Fanout and headers Exchanges ignore Routing Keys, so it is not necessary to define one when binding a queue to a fanout or headers Exchange.

      bindQueues

      However, we noticed that with the Apache Qpid broker, a bind that has no Routing Key cannot be unbound with the AMQP Operation Action.

      So if you are using Apache Qpid, we suggest always setting a Routing Key when you bind a Queue, even on a fanout or headers Exchange.

      This does not change the final behavior of the Exchanges, as they do not take the key into account when routing messages through the bind. It simply provides a key to use when you want to unbind the queue.
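
      The sketch below models why a Routing Key on a fanout bind is harmless: routing ignores it, while unbinding uses it to identify the bind. `FanoutExchange` is a hypothetical in-memory model, and the check that rejects an empty key encodes the recommendation above rather than any broker's behavior.

```python
class FanoutExchange:
    """In-memory model of a fanout Exchange; not a real AMQP client."""

    def __init__(self):
        self._bindings = set()  # (queue, routing_key) pairs

    def bind(self, queue, routing_key):
        if not routing_key:
            # Guard reflecting the suggestion for Apache Qpid, not an AMQP rule
            raise ValueError("set a Routing Key so the bind can be removed later")
        self._bindings.add((queue, routing_key))

    def unbind(self, queue, routing_key):
        self._bindings.discard((queue, routing_key))

    def route(self, message_routing_key=""):
        # Fanout: the message's Routing Key and the binding keys are ignored
        return sorted({queue for queue, _ in self._bindings})


exchange = FanoutExchange()
exchange.bind("invoices", "invoices.bind")
print(exchange.route("anything"))   # the queue still receives every message
exchange.unbind("invoices", "invoices.bind")
print(exchange.route("anything"))   # no queue left after the unbind
```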

       
