Using AMQ

Before starting

Ingesting your data through AMQ (ActiveMQ) is recommended for producers outside CERN.

The first step is to open a SNOW request with the following details:

  • which service data you wish to send and, if possible, an example
  • the expected daily data volume and data rate
  • how you plan to access your data: hdfs files, kafka stream, opensearch/grafana dashboard, etc.

Important

Please respect the agreed data volume and rate: quota is limited in all backends used by MONIT. Usage is monitored, so if you need a significant change, please contact us in advance.

Your data must be represented as a valid JSON object with the following fields (as strings):

  • (mandatory) producer: used to name your data set, only one value allowed
  • (mandatory) type: used to classify your data set, you can define multiple values
  • (optional) type_prefix: used to categorise your metrics, possible values are raw|agg|enr
  • (optional) timestamp: used to indicate the event submission time
  • (optional) host: used to add extra information about the node submitting your data
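As a minimal sketch of the field rules above, a document's monitoring fields could be checked before sending. The function name `check_monit_fields` and the use of `ValueError` are assumptions made for illustration and are not part of MONIT; the sketch also assumes each document carries a single `type` string.

```python
ALLOWED_TYPE_PREFIXES = {"raw", "agg", "enr"}
MANDATORY_FIELDS = ("producer", "type")


def check_monit_fields(fields):
    """Raise ValueError if the monitoring fields break the rules listed above."""
    for name in MANDATORY_FIELDS:
        value = fields.get(name)
        # Mandatory fields must be present and be non-empty strings.
        if not isinstance(value, str) or not value:
            raise ValueError("missing or empty mandatory field: %s" % name)
    prefix = fields.get("type_prefix")
    # type_prefix is optional, but when given it must be raw, agg or enr.
    if prefix is not None and prefix not in ALLOWED_TYPE_PREFIXES:
        raise ValueError("type_prefix must be one of raw|agg|enr, got: %r" % prefix)
    return True
```

For example, `check_monit_fields({"producer": "myservice", "type": "cpu"})` passes, while a document without `producer` raises an error. The values `myservice` and `cpu` are placeholders.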

To understand how to access your data, please refer to the Data Access section.

Sending data

Once you have received the destination details from us (user, password, topic, host and port), you can start sending messages to the ActiveMQ topic. The input data should look like the example below, where your actual metric is nested inside the body, alongside the metadata block that holds the expected monitoring fields:

{
  ...
  "body": {
    "metadata": {
      "producer": <producer>,
      "type": <type>,
      "timestamp": <timestamp>
    },
    # Your metrics here
  }
}

Please pay attention to the following:

  • all timestamps must be in UTC, as integer milliseconds or seconds, with no fractional part
  • use double quotes, not single quotes (single quotes are not valid in JSON)

Here is an example implementation using the stomp.py library. You will need at least the listener class to create the connection.
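The two points above can be sketched as follows. This is an illustration under stated assumptions: the helper name `build_message`, the producer `myservice`, the type `cpu` and the metric `load` are all hypothetical, and the timestamp is assumed to be Unix epoch time. Serialising with `json.dumps` rather than `str()` is what guarantees double quotes.

```python
import json
import time


def build_message(producer, type_, metrics, now=None):
    """Return a dict with the metrics nested in 'body' next to the metadata."""
    now = time.time() if now is None else now
    body = dict(metrics)  # copy, so the caller's dict is not modified
    body["metadata"] = {
        "producer": producer,
        "type": type_,
        # Epoch time is UTC-based; int() drops the fractional part.
        "timestamp": int(now * 1000),
    }
    return {"body": body}


message = build_message("myservice", "cpu", {"load": 0.42}, now=1700000000.5)
payload = json.dumps(message["body"])  # JSON text with double-quoted strings
```

Note that a metric key named `metadata` would be overwritten by this helper.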

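import json
import logging

import stomp

# Note: this example follows an older stomp.py API. Method and callback
# signatures changed in later releases, so it may need adapting to the
# version you have installed.


class StompyListener(object):
    """
    Auxiliary listener class that logs every state change of the Stomp
    connection.
    """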
    def __init__(self):
        self.logr = logging.getLogger(__name__)

    def on_connecting(self, host_and_port):
        self.logr.info('on_connecting %s', str(host_and_port))

    def on_error(self, headers, message):
        # Errors are logged at error level so they are not lost among info logs
        self.logr.error('received an error %s %s', str(headers), str(message))

    def on_message(self, headers, body):
        self.logr.info('on_message %s %s', str(headers), str(body))

    def on_heartbeat(self):
        self.logr.info('on_heartbeat')

    def on_send(self, frame):
        self.logr.info('on_send HEADERS: %s, BODY: %s ...', str(frame.headers), str(frame.body)[:160])

    def on_connected(self, headers, body):
        self.logr.info('on_connected %s %s', str(headers), str(body))

    def on_disconnected(self):
        self.logr.info('on_disconnected')

    def on_heartbeat_timeout(self):
        self.logr.info('on_heartbeat_timeout')

    def on_before_message(self, headers, body):
        self.logr.info('on_before_message %s %s', str(headers), str(body))

        return (headers, body)

class Stompy(object):
    """
    Class to send messages to a given Stomp broker
    on a given topic.
    :param username: The username to connect to the broker.
    :param password: The password to connect to the broker.
    :param host_and_ports: The hosts and ports list of the brokers.
        If empty or None: [('agileinf-mb.cern.ch', 61213)]
    :param topic: The topic the messages are sent to.
    """
    def __init__(self, username, password,
                 host_and_ports,
                 topic):
        self._host_and_ports = host_and_ports if host_and_ports else [('agileinf-mb.cern.ch', 61213)]
        self._username = username
        self._password = password
        self._topic = topic

    def produce(self, messages):
        """
        Dequeue all the messages on the list and send them to the
        Stomp broker.
        """
        conn = stomp.Connection(host_and_ports=self._host_and_ports,
                                user=self._username,
                                passcode=self._password,
                                )
        conn.set_listener('StompyListener', StompyListener())
        conn.start()
        conn.connect(wait=True)
        # Send the messages one by one over the same connection
        while messages:
            try:
                message = messages.pop(0)
                # 'body' becomes the JSON payload; remaining keys are sent as headers
                body = json.dumps(message.pop('body'))
                conn.send(body,
                          headers=message,
                          destination=self._topic,
                          ack='auto')
            except Exception as msg:
                logging.error('ERROR message: %s not sent, error: %s',
                              str(message), str(msg))
        if conn.is_connected():
            conn.stop()
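To illustrate the message shape that produce() expects, the sketch below reproduces how each message is split into headers and a JSON body, without contacting a broker. The helper name `split_message` and the sample values (`myservice`, `cpu`, `load`) are hypothetical and only serve the example.

```python
import copy
import json


def split_message(message):
    """Return (headers, body_json) as produce() derives them, leaving the input intact."""
    headers = copy.deepcopy(message)
    # The 'body' entry is serialised to JSON; whatever remains is sent as headers.
    body = json.dumps(headers.pop("body"))
    return headers, body


example = {
    "body": {
        "metadata": {"producer": "myservice", "type": "cpu", "timestamp": 1700000000500},
        "load": 0.42,
    },
}
headers, body = split_message(example)
```

With real credentials, the same dictionary would be sent with `Stompy(username, password, host_and_ports, topic).produce([example])`. Keep in mind that produce() empties the list it receives and removes the `body` key from each message.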