Elastic Search
Lors de l'indexation des données pour accélérer la vitesse d'ingestion il n'est pas optimal d'avoir des réplicas lors de cette étape. La vitesse d'indexation est fonction du nombre de réplicas, deux réplicas divisent par deux la vitesse d'indexation.
Si l'on veut plus de sécurité, on peut définir le nombre de réplicas une fois que l'indexation des données est terminée.
Je configurai Elasticstash en mode serveur un seul noeud, car l'outil de monitoring Marvel n'est accessible que dans ce mode. Il nous fournira la vitesse d'indexation.
A partir de Kibana http://0.0.0.0:5601, on accède à l'outil de monitoring Marvel, à noter que la licence gratuite est limitée à 30 jours. Cet outil de monitoring permet d'avoir les informations suivantes : le taux de recherche par seconde, la latence de la recherche en millisecondes, le taux d'indexation par seconde, la latence d'indexation en millisecondes.
Exemple d'un graphique relatif au taux d'indexation par seconde
Configuration de Elasticsearch
On peut changer le nombre de shards qui est de cinq par défaut, ainsi que le nombre de
réplicas qui est de un par défaut en modifiant le fichier de configuration
config/elasticsearch.yml.
Par exemple pour un nombre de réplicas de 0 et un nombre des shards de 1 :
index.number_of_shards: 1
index.number_of_replicas: 0
2.1 Elasticsearch : Importation à partir d'un fichier csv
Un fichier de *.conf permet de configurer Logstash sur la manière d’intégrer les données dans Elasticsearch (sensors2.conf).
Dans la partie input, on définit le répertoire où les fichiers issus du flux seront déposés, ainsi que le format de fichier. Le format de type csv n'est pas reconnu nativement par Elasticsearch, j'utilise logstash pour injecter ce type de fichier.
Le fichier est lu à partir du début, l'opération de lecture s'opère dès les premières écritures du fichier dans le répertoire, il n'attend pas la copie complète du fichier pour commencer l'injection des données dans elasticsearch.
file{
path => "/home/alfard/Documents/NFA204/Log/*.csv"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
On choisit les variables que l'on conserve et l'on supprime les variables qui ne sont pas nécessaires.
On convertit les données x, y, z en données numériques.
La date est de format, UNIX_MS, MS pour millisecondes. La variable temporelle est "Arrival_Time".
filter {
csv {
separator => ","
columns => ["Index","Arrival_Time","Creation_Time","x","y","z",
"User" ,"Model","Device","gt"]
remove_field => ["Index","Creation_Time"]
}
##############################################
mutate {
convert => {"x" => "float"
"y" => "float"
"z" => "float"}
}
###############################################
date {
match => ["Arrival_Time", "UNIX_MS"]
target => "@timestamp"
}
###############################################
}
output {
elasticsearch {
hosts => [ "192.168.0.14:9200" ]
}
stdout {}
}
2.1.1 Vitesse d'indexation : les résultats pour un csv
Tableau 1 : Vitesse d'indexation d'Elasticsearch
Nombre de réplicas |
Nombre de shards |
Vitesse d'indexation la plus élevée lors de l'indexation |
Latence d'indexation |
---|---|---|---|
0 | 1 | 1 931 documents par seconde | 0,10 milliseconde |
0 | 2 | 1 220 documents par seconde | 0,17 milliseconde |
Avant chaque test, l'ensemble des index est effacé.
Le fichier d'origine est de type csv nécessite lors de son passage par logstash des modifications de format et donc des étapes de lectures et d'écritures supplémentaires qui peuvent ralentir la vitesse d'indexation.
Le format d'origine d'Elasticsearch est de type json, dans la partie suivante je vais modifier le fichier csv en amont en json et procéder à son intégration pour tester si le fait d'utiliser un json comme source de donnée augmente la vitesse d'indexation.
2.2 Elasticsearch : Importation à partir d'un fichier json
Script en scala pour changer csv dans le format json
import scala.io.Source
import java.io._
val startTimeMillis = System.currentTimeMillis()
for (lines <-
Source.fromFile("/home/alfard/Documents/NFA204/Phones_gyroscope.csv").getLines()) {
val iter = lines.mkString.split(",")
val rt = "{"+""""gt""""+":" + '"' +iter(9) + '"' + ","+""""user""""+":" + '"'+ iter(6) + '"' +
","+""""model""""+":" + '"' + iter(7) + '"' + ","+""""device""""+":" + '"' + iter(8) + '"'+ ",x:" +
iter(3) + ",y:" + iter(4) + ",z:" + iter(5) + ","+""""Arrival_Time""""+":"+ '"' +iter(1) + '"' + "}"
val pw = new PrintWriter(new FileOutputStream(new
File("/home/alfard/Documents/NFA204/acceljson.json"),true))
pw.write(rt+System.lineSeparator)
pw.close
}
val endTimeMillis = System.currentTimeMillis()
val durationSeconds = (endTimeMillis - startTimeMillis) / 1000
Scala en plus de sa "simplicité" est vraiment un langage très performant du fait de l'utilisation de la java virtual machine, avec un seul coeur utilisé il traite près de 14 millions d'enregistrement en 315 secondes, un peu plus de cinq minutes.
Suppression de la première ligne qui contient le libellé du csv : sed '1d' acceljson.json > acceljson2.json
Exemple fichier de sortie pour deux documents
{"gt":"stand","user":"a","model":"gear","device":"gear_1","x":-0.5650316,"y":-9.572019,"z":-
0.61411273,"Arrival_Time":"1424696638740"}
{"gt":"stand","user":"a","model":"gear","device":"gear_1","x":-0.83258367,"y":-9.713276,"z":-
0.60693014,"Arrival_Time":"1424696638740"}
__Configuration logstash pour l'importation du json (sensors4.conf)__
input {
file{
path => "/home/alfard/Documents/NFA204/Log/*"
codec => json
}
}
filter {
date {
match => ["Arrival_Time", "UNIX_MS"]
target => "@timestamp"
}
}
output {
elasticsearch {
hosts => [ "192.168.0.14:9200" ]
}
stdout {}
}