Setting Up a Network IO Monitoring Dashboard in Grafana

The page provides a prototype setting to display Xrootd Network IO monitory info on a Grafana dashboard. It assume that a functioning Telegraf/InfluxDB/Grafana system is available.

Setting in Xrootd

Monitoring upload/write and download/read type IO traffic

Data from Xrootd summary monitoring's Link Summary Data will be used to monitor this type of traffic. To do so, add the following line to your Xrootd configuration file:

xrd.report 127.0.0.1:9300 every 60s link

The directive says to send link related info (bytes in and out of Xrootd to WAN) every 60s to host 127.0.0.1 at UDP port 9300

The monitoring info it will send out is in XML format, like this:

<statistics tod="1725485991" ver="v5.6.9" src="sdfdtn005.sdf.slac.stanford.edu:2094" tos="1725485980" pgm="xrootd" ins="atlas" pid="482140" site="SLAC">
    <stats id="link">
        <num>1</num>
        <maxn>4</maxn>
        <tot>14</tot>
        <in>158400</in>
        <out>85782</out>
        <ctime>14</ctime>
        <tmo>0</tmo>
        <stall>0</stall>
        <sfps>0</sfps>
    </stats>
</statistics>

Monitoring TPC traffic

Data to be used to monitor TPC traffic are detail monitoring's TPC g-Stream. To collect these info, add the following line to the Xrootd configuration file:

xrootd.mongstream tpc use flush 60s maxlen 65534 send json insthdr 127.0.0.1:9300

The data format is in JSON. It may include multiple JSON block separated by \n. The following json may occure at the start of the Xrootd service and occassionally at other times:

{
    "code": "=",
    "pseq": 1,
    "stod": 1728672599,
    "sid": 188905941001298,
    "src": {
        "site": "SLAC",
        "host": "sdfdtn005.sdf.slac.stanford.edu",
        "port": 2094,
        "inst": "atlas-slac",
        "pgm": "xrootd",
        "ver": "v5.6.9"
    }
}

The following json will be sent regularly whenever there are one or more completed TPC transfers

{
    "code": "g",
    "pseq": 1,
    "stod": 1728672599,
    "sid": 188905941001298,
    "src": {
        "site": "SLAC",
        "host": "sdfdtn005.sdf.slac.stanford.edu",
        "port": 2094,
        "inst": "atlas-slac"
    },
    "gs": {
        "type": "P",
        "tbeg": 1728672619,
        "tend": 1728672692
    }
}
{
    "TPC": "xroot",
    "Client": "yangw.3206895:83@[2620:114:d000:55a3:42a6:b7ff:fe97:1cd0].sdf.slac.stanford.edu",
    "Xeq": {
        "Beg": "2024-10-11T18:50:20.728522Z",
        "End": "2024-10-11T18:50:57.635623Z",
        "RC": 0,
        "Strm": 1,
        "Type": "pull",
        "IPv": 4
    },
    "Src": "xroot://se0.oscer.ou.edu:9090//ourdisk/hpc/xrd_test/a",
    "Dst": "xroot://sdfdtn005.sdf.slac.stanford.edu:2094//xrootd/dteam/a",
    "Size": 10485760
}
{
    "TPC": "http",
    "Client": "yangw.3206895:83@[2620:114:d000:55a3:42a6:b7ff:fe97:1cd0].sdf.slac.stanford.edu",
    "Xeq": {
        "Beg": "2024-10-11T18:52:21.518724Z",
        "End": "2024-10-11T18:52:58.834753Z",
        "RC": 0,
        "Strm": 1,
        "Type": "pull",
        "IPv": 4
    },
    "Src": "https://se0.oscer.ou.edu:9090//ourdisk/hpc/xrd_test/b",
    "Dst": "https://sdfdtn005.sdf.slac.stanford.edu:2094//xrootd/dteam/b",
    "Size": 104757600 
}

Note Xrootd server will only send these data if it drives the TPC event, e.g. it is the transfer destination of a TPC pull request, or the transfer source of a TPC push request.

Run a collector that saves info for Telegraf

For both types of data collected, we will send the totoal bytes to Telegraf, that is, total bytes of upload/write and download/read, and total bytes of TPC pull and push (in both xrootd and http protocols).

The following python script running on 127.0.0.1 will receive both types of Xrootd monitoring info and and save the data in the InfluxDB line protocol format to a file (/tmp/xrdmon.telegraf.log).

#!/usr/bin/python3

import socket
import datetime, random
import threading
import xml.etree.ElementTree as ET
import json
from concurrent.futures import ThreadPoolExecutor

serverSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
serverSocket.bind(('', 9300))

logfile = "/tmp/xrdmon.telegraf.log"
nlines2keep = 720

loglock = threading.Lock()
tpclock = threading.Lock()

ibytes = {}
obytes = {}

def gstreamTPCMon(message, logfile, loglock, tpclock):
    #print("Calling gstreamTPCMon")
    messages = message.split('\n')

    jdata = []
    for i in range(0, len(messages)):
        try:  # ignore non-json lines (e.g. the last "empty" line)
            x = json.loads(messages[i])
            jdata.append(x)
        except:
            pass

    try:
        # in case it receive something like this, ignore it
        #  ['{"code":"=","pseq":3,"stod":1728535536,"sid":188905941001298,"src":{"site":"SLAC","host":"sdfdtn005.sdf.slac.stanford.edu","port":2094,"inst":"atlas-slac","pgm":"xrootd","ver":"v5.6.9"}}\x00']

        if jdata[0]['code'] != 'g':
            return
    except:
        pass

    hostport = jdata[0]['src']['host'] + ":" + str(jdata[0]['src']['port'])
    instance = jdata[0]['src']['inst']
    sitename = jdata[0]['src']['site']

    if not hostport in ibytes.keys():
        ibytes[hostport] = {}
        ibytes[hostport][instance] = 0
    elif not instance in ibytes[hostport].keys():
        ibytes[hostport][instance] = 0

    if not hostport in obytes.keys():
        obytes[hostport] = {}
        obytes[hostport][instance] = 0
    elif not instance in obytes[hostport].keys():
        obytes[hostport][instance] = 0

    for i in range(1, len(jdata)):
        if 'TPC' in jdata[i].keys():
            if jdata[i]['TPC'] == 'xroot':
                print(json.dumps(jdata[i]))
            with tpclock:
                if jdata[i]['Xeq']['Type'] == 'pull':
                    ibytes[hostport][instance] += jdata[i]['Size']
                else:
                    obytes[hostport][instance] += jdata[i]['Size']

    msg = "xrootd,host=%s,instance=%s,site=%s,type=tpc ibytes=%d,obytes=%d" % \
          (hostport, instance, sitename, ibytes[hostport][instance], obytes[hostport][instance])
    write2influx(msg, logfile, loglock)

def summaryNetMon(msgroot, logfile, loglock):
    #print("Calling summaryNetMon")
    if msgroot.tag == 'statistics':
        hostport = msgroot.attrib['src']
        instance = msgroot.attrib['ins']
        sitename = msgroot.attrib['site']
    for child in msgroot:
        if child.attrib['id'] == 'link':
            for item in child:
                if item.tag == "in":
                    ibytes = int(item.text)
                elif item.tag == "out":
                    obytes = int(item.text)
    msg = "xrootd,host=%s,instance=%s,site=%s,type=summary ibytes=%d,obytes=%d" % \
          (hostport, instance, sitename, ibytes, obytes)
    write2influx(msg, logfile, loglock)

def write2influx(msg, logfile, loglock):
    t = datetime.datetime.now().timestamp()
    with loglock:
        with open(logfile, "a") as out_file:
            # see https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/
            out_file.write("%s %d\n" % (msg, t*1000000000))
        if ( random.random() > 0.95 ):
            with open(logfile, "r") as in_file:
                lines = in_file.readlines()
            with open(logfile, "w") as out_file:
                for i in range(len(lines)-nlines2keep, len(lines)):
                    if i >= 0:
                        out_file.write(lines[i])

executor = ThreadPoolExecutor(max_workers=2)
while True:
    message, address = serverSocket.recvfrom(65536)
    message = message.decode()

    try:
        # Summary monitoring data is in XML format, while TPC
        # g-Stream detail monitoring data is in JSON format
        msgroot = ET.fromstring(message)
        executor.submit(summaryNetMon, msgroot, logfile, loglock)
    except:
        executor.submit(gstreamTPCMon, message, logfile, loglock, tpclock)

Config Telegraf

Add the following file to /etc/telegraf/telegraf.d/30-xrootd.conf

[[inputs.exec]]
    commands = ["cat /tmp/xrdmon.telegraf.log"]
    timeout = "30s"
    data_format = "influx"

Now you are ready to go to your Grafana to create a monitoring dashboard. An example Grafana query of InfluxDB can be like this:

SELECT non_negative_derivative(mean("ibytes"), 1s) FROM "xrootd" WHERE ("instance"::tag = 'atlas' AND "type"::tag = 'summary') AND $timeFilter GROUP BY time($__interval)

Multiply Xrootd instances can send the summary monitoring info to the same receiver process.