Integration of pmacct with ElasticSearch and Kibana

In this post I want to show a solution based on a script (pmacct-to-elasticsearch) that I made to gather data from pmacct and visualize them using Kibana/ElasticSearch. It’s far from being the state of the art of IP accounting solutions, but it may be used as a starting point for further customizations and developments.

I plan to write another post with some ideas to integrate pmacct with the canonical ELK stack (ElasticSearch/Logstash/Kibana).

The big picture

This is the big picture of the proposed solution:

pmacct-to-elasticsearch - The big picture

There are four main actors: the pmacct daemons (we already saw how to install and configure them), which collect accounting data; pmacct-to-elasticsearch, which reads pmacct’s output, processes it and sends it to ElasticSearch; ElasticSearch itself, where data are stored and organized into indices; and finally Kibana, which is used to chart them on a web frontend.


The starting point of this tutorial is the scenario described in the Installing pmacct on a fresh Ubuntu setup post.

UPDATE: I made some changes to the original text (which was about Kibana 4 Beta 2) now that Kibana 4 has been officially released.

In the first part of this post I’ll cover a simple setup of both ElasticSearch 1.4.4 and Kibana 4.

In the second part I’ll show how to integrate pmacct-to-elasticsearch with the other components.

Setup of ElasticSearch and Kibana

This is a quick guide to set up the aforementioned programs in order to have a working scenario for my goals: please carefully consider security and scalability issues before using it in a real production environment. You can find everything you need on the ElasticSearch web site.

Dependencies

Install Java (Java 8 update 20 or later, or Java 7 update 55 or later are recommended at time of writing for ElasticSearch 1.4.4):

# apt-get install openjdk-7-jre

ElasticSearch

Install ElasticSearch from its APT repository:

# wget -qO - https://packages.elasticsearch.org/GPG-KEY-elasticsearch | sudo apt-key add -
# add-apt-repository "deb http://packages.elasticsearch.org/elasticsearch/1.4/debian stable main"
# apt-get update && apt-get install elasticsearch

… and (optionally) configure it to automatically start on boot:

# update-rc.d elasticsearch defaults 95 10

Since this post covers only a simple setup, tuning and advanced configuration are out of its scope, but it is advisable to consider the official configuration guide for any production-ready setup.
Just change a network parameter to be sure that ES does not listen on any public socket; edit the /etc/elasticsearch/elasticsearch.yml file and set

network.host: 127.0.0.1

Finally, start it:

# service elasticsearch start

Wait a few seconds; then, if everything is OK, you can check its status with an HTTP query:

# curl http://localhost:9200/?pretty
{
  "status" : 200,
  "name" : "Wild Thing",
  "cluster_name" : "elasticsearch",
  "version" : {
    "number" : "1.4.4",
    "build_hash" : "c88f77ffc81301dfa9dfd81ca2232f09588bd512",
    "build_timestamp" : "2015-02-19T13:05:36Z",
    "build_snapshot" : false,
    "lucene_version" : "4.10.3"
  },
  "tagline" : "You Know, for Search"
}
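If you would rather script this check than read the curl output by eye, a minimal Python sketch could look like the one below. It only assumes the response layout shown above; the function names are mine (hypothetical), and fetch_es_version needs a running ElasticSearch node, so the example call uses a trimmed copy of the response instead.

```python
import json
from urllib.request import urlopen


def es_version(body):
    """Return the version number found in a root-endpoint response body."""
    info = json.loads(body)
    return info["version"]["number"]


def fetch_es_version(base_url="http://localhost:9200", timeout=5):
    """Query a live ElasticSearch node (requires the service to be running)."""
    with urlopen(base_url + "/", timeout=timeout) as response:
        return es_version(response.read().decode("utf-8"))


# Offline example: a trimmed copy of the response shown above.
sample = '{"status": 200, "version": {"number": "1.4.4"}, "tagline": "You Know, for Search"}'
print(es_version(sample))
```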

Kibana 4

Download and install the right version of Kibana 4, depending on your architecture (here I used the x64):

# cd /opt
# curl -O https://download.elasticsearch.org/kibana/kibana/kibana-4.0.0-linux-x64.tar.gz
# tar -zxvf kibana-4.0.0-linux-x64.tar.gz

By default, Kibana listens on 0.0.0.0:5601 for the web front-end: again, for this simple setup it’s OK, but be sure to protect your server using a firewall and/or a reverse proxy like Nginx.

Run it (here I put it in background and redirect its output to /var/log/kibana4.log):

# /opt/kibana-4.0.0-linux-x64/bin/kibana > /var/log/kibana4.log &

Wait a few seconds until it starts, then point your browser at http://YOUR_IP_ADDRESS:5601 to check that everything is fine.

pmacct-to-elasticsearch configuration

Now that all the programs we need are up and running we can focus on pmacct-to-elasticsearch setup.

pmacct-to-elasticsearch is designed to read JSON output from pmacct daemons, process it and store it into ElasticSearch. It works with both memory and print plugins and, optionally, it can manipulate data (for example, adding fields on the basis of other values).

pmacct-to-elasticsearch Data flow
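To make the data flow more concrete, here is a rough Python sketch of the idea, not the script's actual code: each JSON line coming from pmacct is decoded, a @timestamp field is added, and the result is packed into a newline-delimited body suitable for the ElasticSearch bulk API. The function name and the sample record are hypothetical; the empty "index" action line assumes the target index and type are given in the request URL.

```python
import json


def build_bulk_body(lines, timestamp):
    """Turn pmacct JSON lines into a newline-delimited bulk request body."""
    actions = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # ignore blank lines in the input
        record = json.loads(line)
        record["@timestamp"] = timestamp
        # One action line followed by one document line per record.
        actions.append(json.dumps({"index": {}}))
        actions.append(json.dumps(record))
    if not actions:
        return ""
    # The bulk API expects the body to end with a newline.
    return "\n".join(actions) + "\n"


pmacct_output = [
    '{"ip_src": "8.8.8.8", "port_src": 53, "ip_proto": "udp", "packets": 1, "bytes": 256}',
    "",
]
body = build_bulk_body(pmacct_output, "2014-12-15T19:32:02Z")
print(body, end="")
```

Sending several documents in one bulk request keeps the number of HTTP round trips low, which matters when a plugin emits thousands of records at every refresh.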

Install git, download the repository from GitHub and install it:

# apt-get install git
# cd /usr/local/src/
# git clone https://github.com/pierky/pmacct-to-elasticsearch.git
# cd pmacct-to-elasticsearch/
# ./install

Now it’s time to configure pmacct-to-elasticsearch to send some records to ElasticSearch. Configuration details can be found in the CONFIGURATION.md file.

In the last post an instance of pmacctd was configured, with a memory plugin named plugin1 that was performing aggregation on a socket basis (src host:port / dst host:port / protocol):

plugins: memory[plugin1]

imt_path[plugin1]: /var/spool/pmacct/plugin1.pipe
aggregate[plugin1]: etype, proto, src_host, src_port, dst_host, dst_port
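As an illustration of what aggregating on a socket basis means, the Python sketch below groups packets by protocol, source host:port and destination host:port, and sums the counters for each group. It is only a toy model of the concept, not how pmacct works internally; the field names follow pmacct's JSON output, etype is left out for brevity, and the sample packets are made up.

```python
from collections import defaultdict

# Fields that identify a "socket" in this toy model.
KEY_FIELDS = ("ip_proto", "ip_src", "port_src", "ip_dst", "port_dst")


def aggregate_by_socket(packets):
    """Sum packet and byte counters for every distinct socket."""
    totals = defaultdict(lambda: {"packets": 0, "bytes": 0})
    for pkt in packets:
        key = tuple(pkt[field] for field in KEY_FIELDS)
        totals[key]["packets"] += 1
        totals[key]["bytes"] += pkt["bytes"]
    return dict(totals)


packets = [
    {"ip_proto": "udp", "ip_src": "8.8.8.8", "port_src": 53,
     "ip_dst": "172.16.1.15", "port_dst": 56529, "bytes": 256},
    {"ip_proto": "udp", "ip_src": "8.8.8.8", "port_src": 53,
     "ip_dst": "172.16.1.15", "port_dst": 56529, "bytes": 128},
    {"ip_proto": "tcp", "ip_src": "172.16.1.15", "port_src": 40000,
     "ip_dst": "192.0.2.10", "port_dst": 80, "bytes": 60},
]
for key, counters in aggregate_by_socket(packets).items():
    print(key, counters)
```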

To have pmacct-to-elasticsearch process plugin1 output, we need to create a pmacct-to-elasticsearch configuration file with the same name as the plugin, /etc/p2es/plugin1.conf; default values already point pmacct-to-elasticsearch to the local instance of ElasticSearch (URL = http://localhost:9200), so we just need to set the destination index name and type:

{
    "ES_IndexName": "example-%Y-%m-%d",
    "ES_Type": "socket"
}
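The ES_IndexName value uses strftime-style placeholders, so every day's records end up in their own index. Assuming the pattern is expanded the way Python's strftime does it (whether the script uses local time or UTC is not covered here), the effect can be sketched as follows; expand_index_name is a hypothetical helper, not a function of pmacct-to-elasticsearch.

```python
from datetime import datetime


def expand_index_name(pattern, when):
    """Expand strftime-style placeholders in an index name pattern."""
    return when.strftime(pattern)


print(expand_index_name("example-%Y-%m-%d", datetime(2014, 12, 15, 19, 32, 2)))
# prints: example-2014-12-15
```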

Since this is a memory plugin, we also need to schedule a crontab task to consume data from the in-memory-table and pass them to pmacct-to-elasticsearch, so edit the /etc/cron.d/pmacct-to-elasticsearch file and add the line:

*/5 *  * * *     root  pmacct -l -p /var/spool/pmacct/plugin1.pipe -s -O json -e | pmacct-to-elasticsearch plugin1

Everything is now ready to have the first records inserted into ElasticSearch: if you don’t want to wait for the crontab task to run, run the above command from the command line, then query ElasticSearch to show the records:

# curl http://localhost:9200/example-`date +%F`/socket/_search?pretty
{
  ...
  "hits" : {
    "total" : 6171,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "example-2014-12-15",
      "_type" : "socket",
      "_id" : "AUo910oSOUAYMzMu9bxU",
      "_score" : 1.0,
      "_source": { "packets": 1, "ip_dst": "172.16.1.15",
                   "@timestamp": "2014-12-15T19:32:02Z", "bytes": 256,
                   "port_dst": 56529, "etype": "800", "port_src": 53, 
                   "ip_proto": "udp", "ip_src": "8.8.8.8" }
      }
      ...
    ]
  }
}

(the `date +%F` is used here to obtain the current date in the format used by the index name, that is YYYY-MM-DD)
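Once records are in ElasticSearch, a search response like the one above is easy to post-process. The Python sketch below sums the bytes per source address over the returned hits; it assumes the response has already been decoded into a dictionary, the function name is hypothetical and the sample values are made up. Keep in mind that a plain _search returns only the first 10 hits unless a size is requested, so this is a quick sanity check rather than a real report.

```python
from collections import Counter


def bytes_per_source(search_response):
    """Sum the bytes field per ip_src over the hits of a search response."""
    totals = Counter()
    for hit in search_response["hits"]["hits"]:
        doc = hit["_source"]
        totals[doc["ip_src"]] += doc["bytes"]
    # Sorted from the busiest source to the quietest one.
    return totals.most_common()


sample_response = {
    "hits": {
        "total": 3,
        "hits": [
            {"_source": {"ip_src": "8.8.8.8", "bytes": 256}},
            {"_source": {"ip_src": "172.16.1.15", "bytes": 64}},
            {"_source": {"ip_src": "8.8.8.8", "bytes": 512}},
        ],
    }
}
print(bytes_per_source(sample_response))
```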

Just to try the configuration for a print plugin, edit the /etc/pmacct/pmacctd.conf configuration file, change the plugins line and add the rest:

plugins: memory[plugin1], print[plugin2]

print_output_file[plugin2]: /var/lib/pmacct/plugin2.json
print_output[plugin2]: json
print_trigger_exec[plugin2]: /etc/p2es/triggers/plugin2
print_refresh_time[plugin2]: 60
aggregate[plugin2]: proto, src_port
aggregate_filter[plugin2]: src portrange 0-1023

Then, create the pmacct-to-elasticsearch (p2es) configuration file for this plugin (/etc/p2es/plugin2.conf):

{
    "ES_IndexName": "example-%Y-%m-%d",
    "ES_Type": "source_port",
    "InputFile": "/var/lib/pmacct/plugin2.json"
}

Here, pmacct-to-elasticsearch is instructed to read from /var/lib/pmacct/plugin2.json, the file that the pmacctd daemon writes to.
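These configuration files must be strict JSON, and a stray trailing comma after the last entry is an easy mistake to make. A small Python 3 sketch like the following can be used to check a file before the daemon's trigger runs; check_config is a hypothetical helper of mine, and the exact error text depends on the Python version.

```python
import json


def check_config(text):
    """Return None if text is valid JSON, else a short description of the error."""
    try:
        json.loads(text)
    except json.JSONDecodeError as exc:
        return "line %d, column %d: %s" % (exc.lineno, exc.colno, exc.msg)
    return None


good = '{\n    "ES_IndexName": "example-%Y-%m-%d",\n    "ES_Type": "source_port"\n}'
bad = '{\n    "ES_IndexName": "example-%Y-%m-%d",\n    "ES_Type": "source_port",\n}'

print(check_config(good))  # None: the file is fine
print(check_config(bad))   # a message pointing near the trailing comma
```

To check a real file, pass its content to the helper, for example check_config(open("/etc/p2es/plugin2.conf").read()).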

As you can see from the pmacctd plugin2 configuration above, a trigger is needed in order to run pmacct-to-elasticsearch: /etc/p2es/triggers/plugin2. Just add a link to the default_trigger script and it’s done:

# cd /etc/p2es/triggers/
# ln -s default_trigger plugin2

Now you can restart pmacct daemons in order to load the new configuration for plugin2:

# service pmacct restart

or, if you preferred not to install my pmacct System V initscript:

# killall -INT pmacctd -w ; pmacctd -f /etc/pmacct/pmacctd.conf -D

After the daemon has finished writing the output file (/var/lib/pmacct/plugin2.json), it runs the trigger which, in turn, executes pmacct-to-elasticsearch with the right argument (plugin2) and detaches it.
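A symbolic link is enough because the trigger can work out the plugin name from the name it is invoked with. The Python sketch below only illustrates that idea (the real trigger is a small shell script); the function name is hypothetical and the path of the pmacct-to-elasticsearch executable is an assumption about where the installer puts it.

```python
import os


def p2es_command(trigger_path, p2es="/usr/local/bin/pmacct-to-elasticsearch"):
    """Build the command line a trigger would run, based on its own file name."""
    plugin_name = os.path.basename(trigger_path)
    return [p2es, plugin_name]


print(p2es_command("/etc/p2es/triggers/plugin2"))
```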

Wait a minute, then query ElasticSearch from command line:

# curl http://localhost:9200/example-`date +%F`/source_port/_search?pretty

From now on it’s just a matter of customizations and visualization in Kibana. The official Kibana 4 Quick Start guide can help you to create visualizations and graphs. Remember, the name of the index used in these examples follows the [example-]YYYY-MM-DD daily pattern.

Housekeeping

Time series indices tend to grow and to fill up disk space and storage, so a rotation policy may be useful to delete data older than a specific date.

The Curator tool and its Delete command can help you with this:

# apt-get install python-pip
# pip install elasticsearch-curator

Once installed, test it using the right arguments…

# curator --dry-run delete indices --prefix example- --timestring %Y-%m-%d --older-than 1 --time-unit days
2014-12-15 19:04:13,026 INFO      Job starting...
2014-12-15 19:04:13,027 INFO      DRY RUN MODE.  No changes will be made.
2014-12-15 19:04:13,031 INFO      DRY RUN: Deleting indices...
2014-12-15 19:04:13,035 INFO      example-2014-12-15 is within the threshold period (1 days).
2014-12-15 19:04:13,035 INFO      DRY RUN: Speficied indices deleted.
2014-12-15 19:04:13,036 INFO      Done in 0:00:00.020131.

… and, finally, schedule it in the pmacct-to-elasticsearch crontab file (/etc/cron.d/pmacct-to-elasticsearch), setting the desired retention period:

# m h dom mon dow user  command
...
0 1  * * *     root  curator delete indices --prefix example- --timestring \%Y-\%m-\%d --older-than 30 --time-unit days
#EOF

Of course, you can use Curator for many other management and optimization tasks too, but they are out of the scope of this post.
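To show what the prefix, timestring and older-than arguments select, here is a simplified Python model of the retention rule. It is an illustration only, not Curator's implementation, and Curator's handling of boundary cases may differ; the function name and the index names are made up.

```python
from datetime import datetime, timedelta


def indices_to_delete(names, prefix, timestring, older_than_days, now):
    """Return the index names whose embedded date is older than the cutoff."""
    cutoff = now - timedelta(days=older_than_days)
    expired = []
    for name in names:
        if not name.startswith(prefix):
            continue  # not one of our time series indices
        try:
            day = datetime.strptime(name[len(prefix):], timestring)
        except ValueError:
            continue  # the suffix does not match the date pattern
        if day < cutoff:
            expired.append(name)
    return expired


names = ["example-2014-11-01", "example-2014-12-15", "other-2014-01-01", "example-notadate"]
now = datetime(2014, 12, 15, 19, 4, 13)
print(indices_to_delete(names, "example-", "%Y-%m-%d", 30, now))
```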

About the author: Pier Carlo Chiodi

Italian, born in 1980, I started working in the IT/telecommunications industry in the late '90s; I'm now a system and network engineer with a deep knowledge of the global Internet and its core architectures, and a strong focus on network automation.

43 Comments

  1. Davide says:

    Hi,
    have you ever thought about integrating pmacct with Riemann in front of ES (that way you could apply application logic and various kinds of notifications to the detected events)?

    • pierky says:

      Hi, I have never used Riemann and I would need to get a good grasp of how to make the two approaches work together; thanks for the suggestion anyway, maybe one day I will get the chance to look into it and post something!

  2. Xentoo says:

    Hello,
    do you see a way to display the info from pmacct on a map using Kibana 4 ? pmacct can add fields with ISO country codes, but unfortunately Kibana 4 requires coordinates.

  3. NetFlow says:

    Nice work, using Kibana with the flow data rivals commercial products.

  4. Matt Elliott says:

    Fantastic post, I have done some testing with this in my lab. Can you add a little more detail about why you’re using both the memory and print plugins? Wouldn’t the print plugin be sufficient on its own?

  5. Bassem says:

    Hello Pier, it’s a very good, well-put tutorial. I followed it and the HOW-TO on the project’s page on GitHub, but I’m facing a problem in the integration with pmacct:

    # pmacct-to-elasticsearch -t plugin1

    2016-05-24 12:22:08,316 ERROR Error loading configuration from /etc/p2es/plugin1.conf
    Traceback (most recent call last):
    File "/usr/local/bin/pmacct-to-elasticsearch", line 515, in Main
    NewConfig = json.load( NewConfig_File )
    File "/usr/lib/python2.7/json/__init__.py", line 290, in load
    **kw)
    File "/usr/lib/python2.7/json/__init__.py", line 338, in loads
    return _default_decoder.decode(s)
    File "/usr/lib/python2.7/json/decoder.py", line 366, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
    File "/usr/lib/python2.7/json/decoder.py", line 382, in raw_decode
    obj, end = self.scan_once(s, idx)
    ValueError: Expecting property name: line 5 column 2 (char 130)

    • Bassem says:

      P.S. I’m using print plugin.

    • Bassem says:

      Config file:

      {
      "ES_IndexName": "netflow-%Y-%m-%d",
      "ES_Type": "Clients",
      "InputFile": "/home/ubuntu/plugin1.json",
      }

    • Bassem says:

      Thanks Pier, I managed to fix it. it was just a comma 🙂

      • Bassem says:

        Now I have this error:

        2016-05-24 15:21:04,579 ERROR Error while creating index netflow-2016-05-24 from template /etc/p2es/new-index-template.json
        Traceback (most recent call last):
        File "/usr/local/bin/pmacct-to-elasticsearch", line 432, in CreateIndex
        HTTPResponse = urllib2.urlopen( URL, Template )
        File "/usr/lib/python2.7/urllib2.py", line 127, in urlopen
        return _opener.open(url, data, timeout)
        File "/usr/lib/python2.7/urllib2.py", line 410, in open
        response = meth(req, response)
        File "/usr/lib/python2.7/urllib2.py", line 523, in http_response
        'http', request, response, code, msg, hdrs)
        File "/usr/lib/python2.7/urllib2.py", line 448, in error
        return self._call_chain(*args)
        File "/usr/lib/python2.7/urllib2.py", line 382, in _call_chain
        result = func(*args)
        File "/usr/lib/python2.7/urllib2.py", line 531, in http_error_default
        raise HTTPError(req.get_full_url(), code, msg, hdrs, fp)
        HTTPError: HTTP Error 400: Bad Request

      • Mike says:

        and what was the problem ? thanks

  6. Vishnu B says:

    Hi, Thanks for the Nice tutorial…
    When i am running the cron tab command manually i get the below error.. Any idea about this error?

    # pmacct -l -p /data/flows/plugin1.pipe -s -O json -e | pmacct-to-elasticsearch plugin1

    pmacct: symbol lookup error: /pmacct-1.5.3/bin/pmacct: undefined symbol: json_object_update_missing

    • Hi, thank you.
      It seems to be a pmacct issue, please have a look at the official pmacct website (http://www.pmacct.net/) and the mailing-list on it.

      • Vishnu B says:

        Thanks Pier, I got that fixed after upgrading the Jansson package to 2.6 version.

        Now i am getting a different error while running the pmacct-to-elasticsearch script.

        it gives the below error
        ERROR Error while decoding pmacct’s JSON output: {“as_src”: 0, “iface_in”: 8, “label”: “”, “as_dst”: 0, “packets”: 1, “peer_ip_src”: “10.0.9.251”, “ip_proto”: .160.16.31″, “port_dst”: 15353, “as_src”: 0, “iface_in”: 17, “as_dst”: 0, “peer_ip_src”: “10.0.5.252”, “bytes”: 55, “iface_out”: 50, “ip_src”: “10.160.8.30”, “port_src”: 48688, “ip_proto”: “udp”, “packets”: 1}

        I have the pmacct saves the json outputs via print plugin and triggers the below file:

        PLUGIN_NAME=`basename $0`
        /usr/local/bin/pmacct-to-elasticsearch $PLUGIN_NAME &

        Below is the command it runs

        ps -ef | grep python
        root 6057 1 43 15:02 ? 00:00:19 python /usr/local/bin/pmacct-to-elasticsearch plugin1

        cat /etc/p2es/plugin1.conf
        {
        "ES_IndexName": "example-%Y-%m-%d",
        "ES_Type": "source_port",
        "InputFile": "/data/flows/plugin1.json"
        }

  7. Vishnu B says:

    Thanks Pier! Sorry to bother you again.

    pmacct was not writing the json correctly so i modified the plugin to write it as CSV and i used a CSV to json converter( python script). But still i am getting the “2016-05-26 15:18:50,177 ERROR Error while decoding pmacct’s JSON output: {“SRC_PORT”: “60138”, “PEER_SRC_IP”: “10.0.5.252”, “PACKETS”: “9”, “SRC_AS”: “0”, “PROTOCOL”: “tcp”, “IN_IFACE”: “33”, “LABEL”: “”, “SRC_IP”: “103.29.32.4”, “DST_AS”: “4755”, “DST_PORT”:
    ” error. i searched for that line in my json output i see its in correct format.

    {“SRC_PORT”: “53”, “PEER_SRC_IP”: “10.0.9.251”, “PACKETS”: “1”, “SRC_AS”: “0”, “PROTOCOL”: “udp”, “IN_IFACE”: “38”, “LABEL”: “”, “SRC_IP”: “192.158.243.3”, “DST_AS”: “0”, “DST_PORT”: “60138”, “DST_IP”: “217.163.52.9”, “OUT_IFACE”: “27”, “BYTES”: “109”}

    i tried loading this in json.loads it loaded without any issue..

    Am i missing anything?? i am having this installed in a VM. is it because of the performance of the server?

  8. Vishnu B says:

    Sorry these are the correct outputs:
    Error message:

    2016-05-26 15:24:52,159 ERROR Error while decoding pmacct’s JSON output: {“SRC_PORT”: “45803”, “PEER_SRC_IP”: “10.0.9.251”, “PACKETS”: “2”, “SRC_AS”: “0”, “PROTOCOL”: “tcp”, “IN_IFACE”: “8”, “LABEL”: “”, “SRC_IP”: “10.160.3.40”, “DST_AS”:

    Json file :

    {“SRC_PORT”: “45803”, “PEER_SRC_IP”: “10.0.9.251”, “PACKETS”: “2”, “SRC_AS”: “0”, “PROTOCOL”: “tcp”, “IN_IFACE”: “8”, “LABEL”: “”, “SRC_IP”: “10.160.3.40”, “DST_AS”: “0”, “DST_PORT”: “80”, “DST_IP”: “10.160.17.41”, “OUT_IFACE”: “29”, “BYTES”: “194”}

    • What follows the “Error while decoding pmacct’s JSON output” message is the exact string that p2es tries to convert to JSON. This is done by splitting the whole input file into multiple lines: could you check that no newline occurs after the DST_AS? Also, could you privately send me a copy of that file? My contacts are at https://pierky.com/
      Thanks

  9. Vishnu B says:

    Pier, I found the issue. I set print_refresh_time: 300 in pmacct, but pmacct was purging the data to disk every 60 seconds instead of every 300 seconds, because print_cache_entries was at its default value.
    I increased print_cache_entries to 4 times the default and it no longer gives the error.
    Thanks for your help..

  10. Bassem says:

    Hello pier,

    Everything was working fine, then suddenly it stopped working, and I got the WARNING below. According to the source code it should be an ERROR in the JSON response, but I can’t find the root cause.

    2016-05-30 15:13:06,151 WARNING Bulk insert on test-2016-05-30 failed to process one or more documents
    2016-05-30 15:23:21,003 WARNING Bulk insert on test-2016-05-30 failed to process one or more documents
    2016-05-30 15:24:22,228 WARNING Bulk insert on test-2016-05-30 failed to process one or more documents

    • Hello,

      it means that ElasticSearch could not process one or more records sent by p2es; this may be due to some problems on the ElasticSearch side or to issues in the data/schema mapping (for example, a string in place of a numeric field, or a date in a bad format).

      The quick & dirty approach to understand which document failed is to log the entire output of p2es: you can add the following line of code after the one containing the ‘failed to process one or more documents’ error message (line 309 of the source code):

        Log( logging.WARNING, Output )

      Be aware that, depending on your configuration, this may be a huge amount of data; you can temporarily set the ES_FlushSize configuration parameter to 1 so that p2es sends one line at a time to ElasticSearch, which gives a better view of which document is failing and what it contains.

      • Bassem says:

        Pier,

        Here is the error log. I think the problem is in timestamp field ??

        2016-05-31 10:34:33,317 WARNING Bulk insert on flows-2016-05-31 failed to process one or more documents
        2016-05-31 10:34:33,317 WARNING {“index”:{}}
        {“ip_dst”: “54.255.154.226”, “@timestamp”: “2016-05-31T08:34:17Z”, “packets”: 2, “port_dst”: 80, “bytes”: 104, “port_src”: 37959, “ip_src”: “192.168.7.138”}

        • I tried to create a new index using the default template that I distribute with p2es (distrib/new-index-template.json file) and then to add the document you filed above using curl (here the official doc) and I had no problems. I tried it on a test machine running ES 2.1.0.

          curl -XPUT 'http://localhost:9200/test/test_type/1' -d '{"ip_dst": "54.255.154.226", "@timestamp": "2016-05-31T08:34:17Z", "packets": 2, "port_dst": 80, "bytes": 104, "port_src": 37959, "ip_src": "192.168.7.138"}'

          Could you try with the same command? (replace test and test_type with your real names) Do you obtain an error message?

  11. Bassem says:

    Pier,

    I just noticed that in the housekeeping section you need to add command “indices” after “delete”:

    # curator --dry-run delete indices --prefix example- --timestring %Y-%m-%d --older-than 1 --time-unit days

  12. Lorenzo Mainardi says:

    What is the advantage of using pmacct instead of Logstash?
    Logstash has native integration with Elasticsearch and it seems very easy to configure and setup.

    • I’ve never analyzed LogStash deeply but AFAIK it only supports NetFlow, while pmacct supports other formats too; pmacct can also perform aggregations, transformations and filtering (also based on BGP feeds and routing info) before data are sent to its backend. Maybe some of this can be done by LogStash too, I don’t know, but I was more comfortable with pmacct and it fully suited my needs.

  13. Aimee says:

    Hi
    The error when trying to create an index (see earlier) may happen if you use a later ElasticSearch, which only supports PUT, not POST. urllib2 does not appear to work correctly here, as it only supports POST.

  14. Aimee says:

    Hi
    Downloaded and reinstalled, created index OK.
    Reason for using, Logstash had a unique indexing issue, and its only possibly been fixed, so I decided
    to find an alternative. Looking at the geoip transform, and possibly a port-name transformer.

    • Hello, nice to hear that.

      With regards to geoip transform, does this post work for you?

      For what concerns port names, you can configure a transformation like this one, which is based on the peer IP address and interface ID:

      "Transformations": [
      {
      "Conditions": [ { "peer_ip_src": "", "__op__": "!=" }, { "iface_in": "", "__op__": "!=" } ],
      "Actions": [
      { "Type": "AddField", "Name": "temporary1", "Value": "$peer_ip_src-$iface_in" },
      { "Type": "AddFieldLookup", "Name": "entry_point", "LookupFieldName": "temporary1", "LookupTableFile": "/etc/p2es/entry_point.map" },
      { "Type": "DelField", "Name": "temporary1" }
      ]
      }
      ]

      Then in /etc/p2es/entry_point.map you can put your port names:

      {
      "192.168.0.10-1": "Router10 port 1",
      "192.168.0.20-666": "Router20 port 666",
      "*": "$peer_ip_src-$iface_in"
      }

      • Aimee says:

        Hi
        will have a look. Re the COW database, their account is suspended. I have used the free maxmind db’s in the past, will have a play.

  15. krutik khatri says:

    Hii
    You can also use https://onlinejsontools.org/ for json validator,beautify,minify,xml,yaml,CSV,bson,plain text,base64,tsv.
    Do checkout this site!

  16. garky says:

    Can you make the fields sent to ElasticSearch to be compatible with Elastiflow dashboards?

  17. ghostx says:

    Hii
    What about nfacctd,what can i do to send netflow from memory to elasticsearch please ?
