Python script for data export via Elasticsearch
1. Introduction
The purpose of this script is to extract data from the ibat database hosted on the Elasticsearch server. The data are exported in CSV format.
2. Setting up Python dependencies
Create a Python virtual environment and install the dependencies:
mkdir ~/python-env
cd ~/python-env
virtualenv elasticsearch
source ~/python-env/elasticsearch/bin/activate
pip install elasticsearch
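Optionally, check that the client can be imported from the new environment (this one-liner is a suggestion, not part of the original instructions):
python -c "import elasticsearch; print('elasticsearch client OK')"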
3. Run the script
Load the Python virtual environment if you haven't already done so:
source ~/python-env/elasticsearch/bin/activate
Run the script:
usage: export.py [-h] [-sd START_DATE] [-ed END_DATE] [-in INTERVAL]
                 [-f {temperature,humidity,light,pir,sound} [...]]
                 [-c CHOICES] [-z ZONE]
optional arguments:
  -h, --help         show this help message and exit
  -sd, --start_date  the start date for the data export, in the format
                     "yyyy-MM-ddTHH:mm:ssZ" (default: 2019-03-01T00:00:00Z)
  -ed, --end_date    the end date for the data export, in the format
                     "yyyy-MM-ddTHH:mm:ssZ" (default: 2019-01-31T23:59:59Z)
  -in, --interval    set the sampling period in minutes (default: 60)
  -f, --field {"temperature","humidity","light","pir","sound"}
                     set the physical field(s)
                     (default: ["temperature","humidity","light","pir","sound"])
  -c, --choices      set the nodes to request, in the format "min-max" (interval
                     of nodes), "value,value,...,value" (list of nodes), or
                     "min-max,value,..." (interval combined with a list of nodes)
  -z, --zone         set the zones to request, in the format "min-max" (interval
                     of zones), "value,value,...,value" (list of zones), or
                     "min-max,value,..." (interval combined with a list of zones)
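The argument parser itself is not listed in this document. As an illustration, a minimal sketch of how these options could be declared with argparse (names and defaults mirror the usage output above; this is an assumption, not the script's actual code):

import argparse

parser = argparse.ArgumentParser(description="Export ibat data to CSV")
parser.add_argument("-sd", "--start_date", default="2019-03-01T00:00:00Z",
                    help='start date, format "yyyy-MM-ddTHH:mm:ssZ"')
parser.add_argument("-ed", "--end_date", default="2019-01-31T23:59:59Z",
                    help='end date, format "yyyy-MM-ddTHH:mm:ssZ"')
parser.add_argument("-in", "--interval", type=int, default=60,
                    help="sampling period in minutes")
parser.add_argument("-f", "--field", nargs="+",
                    choices=["temperature", "humidity", "light", "pir", "sound"],
                    default=["temperature", "humidity", "light", "pir", "sound"],
                    help="physical field(s) to export")
parser.add_argument("-c", "--choices",
                    help='nodes to request, e.g. "1-69,81,82"')
parser.add_argument("-z", "--zone",
                    help='zones to request, e.g. "1-13"')
args = parser.parse_args()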
4. Description of the script
4.1. Server connection
First of all, we connect to the Elasticsearch server on port 8443, using the credentials provided by the environment variables SERVER, LOGIN and PASSWORD.
import csv
import os
import sys
import time

from elasticsearch import Elasticsearch, RequestsHttpConnection

# Read the connection settings from the environment
try:
    SERVER = os.environ["SERVER"]
except KeyError:
    print("Please set the environment variable SERVER")
    sys.exit(1)
try:
    LOGIN = os.environ["LOGIN"]
except KeyError:
    print("Please set the environment variable LOGIN")
    sys.exit(1)
try:
    PASSWORD = os.environ["PASSWORD"]
except KeyError:
    print("Please set the environment variable PASSWORD")
    sys.exit(1)

# Open the connection (TLS, but without certificate verification for now)
try:
    es = Elasticsearch(
        [SERVER],
        http_auth=(LOGIN, PASSWORD),
        port=8443,
        use_ssl=True,
        ca_certs=False,
        verify_certs=False,
        connection_class=RequestsHttpConnection,
        timeout=100
    )
    print("Connected", es.info())
except Exception as ex:
    print("Error:", ex)
At the moment, the connection is insecure: certificate verification is disabled (verify_certs=False in the connection options).
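For a production setup, certificate verification should be enabled. A minimal sketch, reusing the variables above and assuming the server's CA bundle is available at a hypothetical path /etc/ssl/certs/ibat-ca.pem:

# Same client, but with TLS certificate verification enabled.
# The ca_certs path below is a hypothetical example.
es = Elasticsearch(
    [SERVER],
    http_auth=(LOGIN, PASSWORD),
    port=8443,
    use_ssl=True,
    verify_certs=True,
    ca_certs="/etc/ssl/certs/ibat-ca.pem",
    connection_class=RequestsHttpConnection,
    timeout=100
)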
4.2. Data retrieval query
The following function
def build_query(node, start, end, interval, field):
    # Filter the documents of the given node over [start, end], then
    # average `field` over `interval`-minute buckets (date_histogram).
    query_ibat = {
        "size": 0,
        "query": {
            "bool": {
                "filter": [
                    {
                        # Keep only documents within the requested time range
                        "range": {
                            "@timestamp": {
                                "gte": start,
                                "lte": end
                            }
                        }
                    },
                    {
                        # Keep only documents of the requested node
                        "query_string": {
                            "analyze_wildcard": True,
                            "query": "node: \"" + node + "\""
                        }
                    }]
            }
        },
        "aggs": {
            "2": {
                # One bucket per `interval` minutes
                "date_histogram": {
                    "interval": str(interval) + "m",
                    "field": "@timestamp",
                    "min_doc_count": 0,
                    "format": "yyyy-MM-dd'T'HH:mm:ss"
                },
                "aggs": {
                    "1": {
                        # Average of the physical field within each bucket
                        "avg": {"field": field}
                    }
                }
            }
        }
    }
    return query_ibat
allows the data to be retrieved by means of a query written in the Elasticsearch Query DSL.
It takes 5 arguments:
- node: designates the sensor node of interest, among zigduino-1 to zigduino-69, zigduino-81 and zigduino-82. The list of nodes to query is built from the -c and -z options (a sketch of the helper used here follows the code):
names_nodes = []
if args.choices:
    names_nodes = string_elm_to_list(args.choices, 82, "zigduino-")

zone_to_request = []
# If args.zone is set, we map the nodes to the appropriate zone.
# The results are stored in a dictionary called dic_zones,
# as {"zone_i": [(node_1, area_1), (node_2, area_2), ...]}
if args.zone:
    zone_to_request = string_elm_to_list(args.zone, 14, "zone")
    dic_zones = nodes_to_zones("multizone_modele_attributes.json",
                               zone_to_request)
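The helpers string_elm_to_list and nodes_to_zones are defined elsewhere in the script. As an illustration, a minimal sketch of string_elm_to_list that is consistent with the formats accepted by -c and -z (the clamping to max_elm is an assumption) could be:

def string_elm_to_list(spec, max_elm, prefix):
    # Expand a spec such as "1-3,7" into
    # [prefix+"1", prefix+"2", prefix+"3", prefix+"7"],
    # ignoring values above max_elm.
    names = []
    for part in spec.split(","):
        if "-" in part:
            low, high = (int(x) for x in part.split("-"))
            names.extend(prefix + str(i)
                         for i in range(low, min(high, max_elm) + 1))
        elif int(part) <= max_elm:
            names.append(prefix + part)
    return names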
- start: the start date, in the format "yyyy-MM-ddTHH:mm:ssZ".
- end: the end date, in the same format as the argument start.
- interval: the number of minutes over which we want to average the data.
- field: the physical field we're interested in. There are 5 fields in the base:
  - "temperature"
  - "humidity"
  - "light"
  - "pir"
  - "sound"
All arguments are strings, except interval which is an integer. For example:
all_field = ["temperature","humidity","light","pir","sound"]
interval = 60
date_deb = "2019-01-01T00:00:00Z"
date_fin = "2019-01-31T23:59:59Z"
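Using these example values, the query for the temperature of node zigduino-1 can be built and inspected as follows (a usage sketch, not part of the original script):

import json

query = build_query("zigduino-1", date_deb, date_fin, interval, "temperature")
print(json.dumps(query, indent=2))  # the Query DSL body sent to Elasticsearch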
To extract the data, we loop through all the nodes by calling the function submit_query:
def submit_query(node, args, field, es):
    # Query one node for one field and collect the per-bucket averages.
    temp_val = []
    body = build_query(node,
                       args.start_date,
                       args.end_date,
                       args.interval,
                       field)
    result = es.search(index="ibat-*", body=body, size=0)
    for hit in result['aggregations']['2']['buckets']:
        temp_val.append(hit['1']['value'])
    return result, temp_val
For each node, the averages of field are retrieved over interval-minute buckets, from start to end.
data = []
for field in set(args.field):
    if args.choices:
        for i, node in enumerate(names_nodes):
            tmp_result, temp_val = submit_query(node, args, field, es)
            if not temp_val:
                continue
            result = tmp_result
            data.append(temp_val)
            col_names_csv.append(node + ":" + field)
    elif args.zone:
        temp_val_field = []
        for key, value in dic_zones.items():
            for node, area in value:
                tmp_result, temp_val = submit_query(node, args, field, es)
                if not temp_val:
                    continue
                result = tmp_result
                data.append(temp_val)
                temp_val_field.append(temp_val)
                col_names_csv.append(key + node + ":" + field)
# start_time is set earlier in the script (not shown)
time_query = time.time() - start_time
4.3. Data export
The data extracted from the database are exported in CSV format. The layout of the file is the following: the first two columns contain the time, in the formats yyyy-MM-ddTHH:mm:ssZ and epoch, in that order; the number of remaining columns varies with the configuration options, and each column holds the values of a chosen field on a given node (or on the nodes of a given zone). The columns are named with the formalism node:field or zone_node:field.
val_date = []
val_epoch = []
try:
    # `result` is the last successful query; its buckets give the time axis
    if result['aggregations']['2']['buckets']:
        for hit in result['aggregations']['2']['buckets']:
            val_date.append(hit['key_as_string'])
            val_epoch.append(hit['key'])
except Exception:
    print('No nodes to request')
    sys.exit(1)

var_temp = [val_date, val_epoch]
var_temp.extend(data)
with open("export.csv", "w") as f_write:
    writer = csv.writer(f_write)
    writer.writerow(col_names_csv)
    writer.writerows(zip(*var_temp))
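As a quick sanity check (a sketch, not part of the script), the exported file can be read back to verify the header and the number of rows:

import csv

with open("export.csv") as f_read:
    rows = list(csv.reader(f_read))
print(rows[0])             # column names: the two time columns, then node:field
print(len(rows) - 1, "data rows")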
4.4. Examples
- Example command line for a query by nodes: one month, every 60 minutes, in all the rooms:
python export_bis.py -f 'temperature' 'humidity' 'light' 'pir' 'sound' -sd 2019-01-01T00:00:00Z -ed 2019-01-31T23:59:59Z -in 60 -c 1-69,81,82
With the parameters given in this example, we obtained the file nodes_export.csv.
In order to study the performance, we ran the script with different configurations, exporting all physical fields.
The results are given in the following table:
Interval (min) | Execution time | File size (MB)
---|---|---
1 | 28m3.47s | 182.2
5 | 10m12.16s | 47.6
30 | 6m32.816s | 8.4
60 | 6m18.4s | 4.3
120 | 5m50.73s | 2.2
It is noticeable that beyond an interval of 30 minutes, the execution time hardly varies any more.
- Example command line for a query by zones: one month, every 60 minutes, in all the zones:
python export_bis.py -f 'temperature' 'humidity' 'light' 'pir' 'sound' -sd 2019-01-01T00:00:00Z -ed 2019-01-31T23:59:59Z -in 60 -z 1-13
We then obtained the file zones_export.csv.
The same performance study was carried out for the zones; the results are shown in the table below:
Interval (min) | Execution time | File size (MB)
---|---|---
1 | 10m18.87s | 63.5
5 | 3m2.97s | 16.6
30 | 1m56.71s | 2.9
60 | 1m19.23s | 1.5
120 | 1m14.19s | 0.7627