Python script for data export via elasticsearch

1. Introduction

The purpose of this script is to extract data from the ibat database hosted by the elasticsearch server. The data are exported in csv format.

2. Setting up Python dependencies

Creating a virtual Python environment and installing dependencies

	mkdir ~/python-env
	cd ~/python-env
	virtualenv elasticsearch
	source ~/python-env/elasticsearch/bin/activate
	pip install elasticsearch

3. Configuration

Define environment variables SERVER, LOGIN and PASSWORD .

4. Run the script

Load the Python virtual environment if you haven’t already done so

	source ~/python-env/elasticsearch/bin/activate

Run the script

usage: export.py [-h] [-sd START_DATE] [-ed END_DATE] [-in INTERVAL]
                      [-f FIELD [{"temperature","humidity","light","pir","sound"}]]
                      [-c CHOICES] [-z ZONE]

optional arguments:
  -h, --help            show this help message and exit

  -sd, --start_date
                        the start date for the data export in the format "yyyy-mm-ddTHH:mn:ssZ" (default: 2019-03-01T00:00:00Z)

  -ed, --end_date
                        the end date for the data export in the format "yyyy-mm-ddTHH:mn:ssZ" (default: 2019-01-31T23:59:59Z)

  -in, --interval
                        set the sampling period in minutes (default: 60)

  -f, --field [{"temperature", "humidity", "light", "pir", "sound"}]
                        set the phisical field(s) (default: ["temperature","humidity", "light", "pir", "sound"])

  -c, --choices
                        set geometry choices in the format:"min-max": nodes interval(pieces), "value,value,...,value": list of nodes, "min-max,value,...": nodes intervall, with list of nodes

  -z, --zone
                        set the zones to request in the format:"min-max": zones interval, "value,value,...,value": list of zones, "min-max,value,...": zones intervall, with list of zones

5. Description of the script

5.1. Serveur connexion

First of all, we connect to the elasticsearch server via port 8443 in using the connection identifiers provided by the environment variables SERVER, LOGIN and PASSWORD .

try:
    SERVER = os.environ["SERVER"]
except KeyError:
    print("Please set the environment variable SERVER")
    sys.exit(1)

try:
    LOGIN = os.environ["LOGIN"]
except KeyError:
    print("Please set the environment variable LOGIN")
    sys.exit(1)

try:
    PASSWORD = os.environ["PASSWORD"]
except KeyError:
    print("Please set the environment variable PASSWORD")
    sys.exit(1)
try:
    es = Elasticsearch(
        [SERVER],
        http_auth=(LOGIN, PASSWORD),
        port=8443,
        use_ssl=True,
        ca_certs=False,
        verify_certs=False,
        connection_class=RequestsHttpConnection,
        timeout=100
    )
    print("Connected", es.info())
except Exception as ex:
    print("Error:", ex)

At the moment, the connection is insecure (it is in http). When a secure connection will be set up, we’ll need a python module additional certif to verify the certificate. The connection will then need to be made with the following options in addition to the login credentials:

CONNECTION OPTIONS
  • use_ssl = True,

  • verify_certs = True,

  • Ca_certs = certifi.where()

5.2. Data retrieval query

The following function

def build_query(node, start, end, interval, field):
    query_ibat = {
     "size": 0,
     "query": {
        "bool": {
            "filter": [
             {
                "range": {
                    "@timestamp": {
                        "gte": start,
                        "lte": end
                    }
                }
             },
             {
                "query_string": {
                    "analyze_wildcard": True,
                    "query": "node: \""+node+"\""
                }
             }]
        }
     },
     "aggs": {
        "2": {
            "date_histogram": {
                "interval": str(interval)+"m",
                "field": "@timestamp",
                "min_doc_count": 0,
                "format": "yyyy-MM-dd'T'HH:mm:ss"
            },
            "aggs": {
                 "1": {
                    "avg": {"field": field}
                 }
            }
        }
     }

    }
    return query_ibat

allows to retrieve data by means of a query written in query.
It takes 5 arguments:

  • node: designates the node of sensors of interest between zigduino-1 to zigduino-69 , zigduino-81 and zigduino-82.

names_nodes = []
if args.choices:
    names_nodes = string_elm_to_list(args.choices, 82, "zigduino-")
zone_to_request = []
# If args.zones, we map the nodes to the appropriate zone
# The results are stored in a dictionnary called dic_zones
# as {"zone_i": [(node_1, area_1), (node_2, area_2) etc...]}
if args.zone:
    zone_to_request = string_elm_to_list(args.zone, 14, "zone")
    dic_zones = nodes_to_zones("multizone_modele_attributes.json",
                               zone_to_request)
  • start: the start date in date format "yyyy-MM-ddTHH:mm:ssZ".

  • end: the end date in the same format as the argument start.

  • interval: the number of minutes over which we want to average the data.

  • field: the physical field we’re interested in. There are 5 fields in the base:

    FIELD LIST
    • "temperature"

    • "humidity"

    • "light"

    • "pir"

    • "sound"

All fields are strings except interval which is an integer.

Examples of parameter configuration
all_field = ["temperature","humidity","light","pir","sound"]
interval = 60
date_deb = "2019-01-01T00:00:00Z"
date_fin = "2019-01-31T23:59:59Z"

To extract the data, we loop through all the nodes by calling the function submit_query:

def submit_query(node, args, field, es):
    temp_val = []
    body = build_query(node,
                       args.start_date,
                       args.end_date,
                       args.interval,
                       field)
    result = es.search(index="ibat-*", body=body, size=0)
    for hit in result['aggregations']['2']['buckets']:
        temp_val.append(hit['1']['value'])
    return result, temp_val

For each node the averages are retrieved during interval minutes of the field from start to end.

data = []
for field in set(args.field):
    if args.choices:
        for i, node in enumerate(names_nodes):
            tmp_result, temp_val = submit_query(node, args, field, es)
            if not(temp_val):
                continue
            result = tmp_result
            data.append(temp_val)
            col_names_csv.append(node + ":" + field)
    elif args.zone:
        temp_val_field = []
        for key, value in dic_zones.items():
            for node, area in value:
                tmp_result, temp_val = submit_query(node, args, field, es)
                if not(temp_val):
                    continue
                result = tmp_result
                data.append(temp_val)
                temp_val_field.append(temp_val)
                col_names_csv.append(key + node + ":" + field)

time_query = time.time() - start_time

5.3. Data export

The data extracted from the database is exported in csv format. The format of the file is the following : the first two columns correspond to the time under two formats yyyyy-MMM-ddTHH:mm:ssZ and epoch in order; the number of remaining columns is variable depending on the configuration options, and each column designates the value of a chosen field on a given node (or nodes of a given zone). The columns are named with the formalism: node:field or zone_node:field

val_date = []
val_epoch = []
try:
    if result['aggregations']['2']['buckets']:
        for hit in result['aggregations']['2']['buckets']:
            val_date.append(hit['key_as_string'])
            val_epoch.append(hit['key'])
except Exception:
    print('No nodes to request')
    exit(1)

var_temp = [val_date, val_epoch]
var_temp.extend(data)
with open("export.csv", "w") as f_write:
    writer = csv.writer(f_write)
    writer.writerow(col_names_csv)
    writer.writerows(zip(*var_temp))

5.4. EXAMPLES

  • Database query by nodes command line example: one month every 60 minutes in all the rooms

  python export_bis.py -f 'temperature' 'humidity' 'light' 'pir' 'sound' -sd 2019-01-01T00:00:00Z -ed 2019-01-31T23:59:59Z -in 60 -c 1-69,81,82

With the parameters given in the configuration example, we obtained the file nodes_export.csv.

In order to study the performances, we ran the script with different configuration exporting all physical fields.

The results are in the following table:

Table 1. Calculation of performance over the period from 01-01-2019 to 31-01-2019 to all the nodes
Interval (mn) Execution time File size (MB)

1

28m3,47s

182.2

5

10m12,16s

47.6

30

6m32,816s

8,4

60

6m18,4s

4.3

120

5m50,73s

2.2

It is noticeable that from an interval of 30 minutes, the duration hardly varies any more.

  • Database query by nodes command line example: one month every 60 minutes in all the zones

  python export_bis.py -f 'temperature' 'humidity' 'light' 'pir' 'sound' -sd 2019-01-01T00:00:00Z -ed 2019-01-31T23:59:59Z -in 60 -z 1-13

We then obtained the file zones_export.csv.

The same performance study was carried out for the case of zones, and the results are shown in the table below :

Table 2. Calculation of performance over the period from 01-01-2019 to 31-01-2019 to all the zones
Interval (mn) Execution time File size (MB)

1

10m18,87s

63.5

5

3m2,97s

16.6

30

1m56,71s

2.9

60

1m19,23s

1.5

120

1m14,19s

0.7627