Elasticsearch Connector
Example
To enrich data with Elasticsearch, you have to provide an Elasticsearch client to the enricher, along with the other classes that the enricher needs:
index_handler
reindex_handler
pipeline_handler
policy_handler
First, insert your data into Elasticsearch using index_handler.
The code below creates a Data object with a function that parses your documents. It uses the default parser,
parse_document, but you can write your own parser function to implement features specific to your enrichment.
It then instantiates an IndexHandler and calls its load_index method, passing the parser and other keyword
arguments. This example performs a geo_location enrichment, so geo_location and code_h3 are both set to True.
The code_h3 option takes each point and hashes it using the h3 library. If the latitude/longitude values are
inside an array field, pass the name of that field as array_point_field.
from moredata.enricher import Enricher, EnricherBuilder
from moredata.enricher.elasticsearch_connector import (
    ElasticsearchConnector,
    IndexHandler,
    ReindexHandler,
    Pipeline,
    PipelineHandler,
    PolicyHandler,
    Policy,
)
from moredata.models.data import Data
from moredata.parser import parse_document
from moredata.utils.util import read_json_from_file
from elasticsearch import Elasticsearch
# HOST, PORT and USER_DATA are placeholders for your own settings.
es = Elasticsearch(
    hosts=[{'host': HOST, 'port': PORT}],
    timeout=10000
)
user = Data(data_file=USER_DATA, parser_func=parse_document, data_type="json", unstructured_data=True)
# Pass the client created above and the parse method of the Data object.
index_handler = IndexHandler(es, "users", "user")
index_handler.load_index(parser=user.parse, array_point_field="points_of_interest", geo_location=True, code_h3=True)
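As a rough illustration of what geo_location=True with array_point_field could mean for a document, the sketch below adds a geo_location entry to every point inside an array field. It does not use moredata and is not the library's implementation: the function name add_geo_location, the latitude and longitude key names and the sample document are all assumptions made for the example. The {"lat": ..., "lon": ...} object is one of the forms Elasticsearch accepts for a geo_point.

```python
from typing import Any, Dict


def add_geo_location(
    doc: Dict[str, Any],
    array_point_field: str,
    lat_key: str = "latitude",   # assumed key name
    lon_key: str = "longitude",  # assumed key name
) -> Dict[str, Any]:
    """Return a copy of doc where each point in the array field has a geo_location."""
    enriched = dict(doc)
    enriched[array_point_field] = [
        {
            **point,
            # Object form of an Elasticsearch geo_point.
            "geo_location": {
                "lat": float(point[lat_key]),
                "lon": float(point[lon_key]),
            },
        }
        for point in doc.get(array_point_field, [])
    ]
    return enriched


# Hypothetical user document with coordinates stored as strings.
user_doc = {
    "id": 1,
    "points_of_interest": [{"latitude": "-23.55", "longitude": "-46.63"}],
}
prepared = add_geo_location(user_doc, "points_of_interest")
```

The input document is left untouched, which keeps the sketch safe to use inside a generator-based parser.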
Here we have a geo_location enrichment based on latitude and longitude, using a query with the CONTAINS shape relation. Every point in points_of_interest is enriched if it is contained in a geo shape, which is a field defined by the city-policy policy.
elk_city_enricher = Enricher(connector=ElasticsearchConnector(
    index_handler=IndexHandler(client=es, index="cities", doc_type="city"),
    pipeline=Pipeline(client=es,
                      name="user-city-enricher",
                      pipeline_handler=PipelineHandler(
                          description="enriching user with cities",
                          match_field="geo_location",
                          target_field_name="city",
                          policy_name="city-policy",
                          field_array="points_of_interest",
                          shape_relation="CONTAINS")),
    reindex_handler=ReindexHandler(index="users",
                                   target_index="users-city-enriched",
                                   pipeline_name="user-city-enricher")))
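To make the containment idea concrete, here is a minimal pure-Python sketch of enriching each point with the city whose shape contains it. It uses a simple ray-casting test on a planar polygon and does not call Elasticsearch or moredata. The names polygon_contains and enrich_points, the sample city and the latitude/longitude keys are hypothetical, and real geo shapes (holes, multipolygons, spherical geometry) need more than this.

```python
from typing import Dict, List, Tuple

Point = Tuple[float, float]  # (longitude, latitude)


def polygon_contains(polygon: List[Point], point: Point) -> bool:
    """Ray casting: count how many edges a ray going right from the point crosses."""
    x, y = point
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        # Only edges that straddle the horizontal line through the point matter.
        if (y1 > y) != (y2 > y):
            # x coordinate where the edge crosses that horizontal line.
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside


def enrich_points(points: List[Dict], cities: List[Dict]) -> List[Dict]:
    """Attach the name of the first containing city (or None) to each point."""
    enriched = []
    for p in points:
        location = (p["longitude"], p["latitude"])
        match = next(
            (c["name"] for c in cities if polygon_contains(c["geometry"], location)),
            None,
        )
        enriched.append({**p, "city": match})
    return enriched


# Hypothetical data: one square "city" and two points of interest.
cities = [{"name": "Square City",
           "geometry": [(0.0, 0.0), (4.0, 0.0), (4.0, 4.0), (0.0, 4.0)]}]
points = [{"latitude": 2.0, "longitude": 2.0}, {"latitude": 2.0, "longitude": 5.0}]
result = enrich_points(points, cities)
```

In the Elasticsearch version, the same matching is done server-side by the enrich processor, using the geometry stored in the cities index.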
The enrichment result is returned to the user_enriched variable without the geo_location and code_h3 fields, which are created by the Enricher.
user_enriched = \
    EnricherBuilder(user) \
    .with_enrichment(elk_city_enricher) \
    .get_result(array_point_field="points_of_interest", geo_location=True, code_h3=True)
The code below writes the enrichment result in two formats, JSON and CSV. The library supports conversion to three file types: parquet, JSON and CSV. See the conversion documentation for details. It is up to the developer to choose which file type to write.
import moredata.utils.util as util
util.write_json_generator_to_json("../../data/output/json/user-enriched", user_enriched, 1000)
util.Converter.json_enriched_to_csv("../../data/output/json/*.json", "../data/output/csv/")
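The sketch below shows one way a generator of enriched documents could be written to several JSON files in fixed-size batches. It is only an illustration using the standard library: reading the third argument of write_json_generator_to_json (1000 above) as a batch size is an assumption, and the helper write_in_batches and its file naming scheme are hypothetical.

```python
import json
import tempfile
from itertools import islice
from pathlib import Path
from typing import Dict, Iterable, Iterator, List


def write_in_batches(docs: Iterable[Dict], prefix: str, batch_size: int) -> List[Path]:
    """Consume docs lazily and write each batch to '<prefix>-<n>.json'."""
    iterator: Iterator[Dict] = iter(docs)
    written: List[Path] = []
    while True:
        # Pull at most batch_size documents without loading the whole generator.
        batch = list(islice(iterator, batch_size))
        if not batch:
            break
        path = Path(f"{prefix}-{len(written)}.json")
        with path.open("w", encoding="utf-8") as handle:
            json.dump(batch, handle, ensure_ascii=False)
        written.append(path)
    return written


# Usage with a small hypothetical generator and a temporary output directory.
with tempfile.TemporaryDirectory() as tmp:
    docs = ({"id": i} for i in range(5))
    files = write_in_batches(docs, str(Path(tmp) / "user-enriched"), batch_size=2)
    batch_sizes = [len(json.loads(p.read_text(encoding="utf-8"))) for p in files]
```

Writing in batches keeps memory use bounded, which matters when the enrichment result is a generator over a large index.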
Elasticsearch Connector
Index Handler
Example
With the index handler we can insert data into Elasticsearch. If you want to define your own mapping, create the index with that mapping and load the data with the optional argument streaming=True. If you do not know the mapping of your data, you can still load it without that argument, and Elasticsearch will infer the types of your data.
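from moredata.enricher.elasticsearch_connector import IndexHandler
from moredata.models.data import Data
from moredata.parser import parse_document
from moredata.utils.util import read_json_from_file
from elasticsearch import Elasticsearch

es = Elasticsearch(
    hosts=[{'host': 'localhost', 'port': 9200}]
)
# The original example does not define app; it is assumed here to be a Data
# object built as in the first example. APPS_DATA and MAPPING_APPS_FILE are
# placeholders for your own file paths.
app = Data(data_file=APPS_DATA, parser_func=parse_document, data_type="json")
index_handler = IndexHandler(es, "apps-json", "app")
mapping = read_json_from_file(MAPPING_APPS_FILE)
index_handler.create_index(mapping=mapping)
index_handler.load_index(parser=app.parse, streaming=True)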
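For readers who have not written a mapping before, the sketch below builds a small explicit mapping and serializes it to JSON, as it might be stored in a mapping file. The field names (app_id, name, installs) are invented for illustration, and the body uses the typeless Elasticsearch 7+ form. Whether create_index expects exactly this shape is an assumption to check against your Elasticsearch version.

```python
import json

# Hypothetical mapping for an "apps" index; field names are illustrative only.
mapping = {
    "mappings": {
        "properties": {
            "app_id": {"type": "keyword"},   # exact-match identifier, suitable for joins
            "name": {"type": "text"},        # full-text searchable
            "installs": {"type": "long"},    # numeric value
        }
    }
}

# Serialized form, as it could be saved in a mapping file.
mapping_json = json.dumps(mapping, indent=2)

# Reading the JSON back yields the same structure.
loaded = json.loads(mapping_json)
field_types = {name: spec["type"] for name, spec in loaded["mappings"]["properties"].items()}
```

Declaring the join key as keyword avoids Elasticsearch inferring it as analyzed text, which is the usual motivation for supplying a mapping up front.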
Reindex Handler
To enrich data, this library provides a high-level client for the Elasticsearch reindex API.
Example
Here we have a JSON object that you can send to Elasticsearch to reindex twitter data through an enrichment pipeline.
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "pipeline": "enricher"
  }
}
With this library, the same request can be expressed more simply as:
ReindexHandler(index="twitter", target_index="new_twitter", pipeline_name="enricher")
Warning
The pipeline referenced by pipeline_name in the reindex object must already exist, and so must the indexes.
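The relationship between the three ReindexHandler arguments and the JSON body can be sketched with a plain function. build_reindex_body is a hypothetical helper written for this illustration, not part of moredata, and how the library builds the request internally is not shown in this document.

```python
from typing import Any, Dict


def build_reindex_body(index: str, target_index: str, pipeline_name: str) -> Dict[str, Any]:
    """Map the three arguments onto the body of an Elasticsearch reindex request."""
    return {
        "source": {"index": index},
        "dest": {"index": target_index, "pipeline": pipeline_name},
    }


# Same values as the twitter example above.
body = build_reindex_body("twitter", "new_twitter", "enricher")
```

Every document copied from the source index passes through the named ingest pipeline before it is written to the destination index.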
Pipeline
A Pipeline, together with its PipelineHandler, controls which fields Elasticsearch will enrich. See the Pipeline documentation.
Example
This pipeline specifies that the field used for the join is applications_id_list (match_field) and that the result
is stored in an attribute called apps (target_field_name). In addition, the target field will be an array with at
most 128 entries, as defined by max_matches. The policy_name argument specifies which policy is used for the enrichment.
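from elasticsearch import Elasticsearch
from moredata.enricher.elasticsearch_connector import Pipeline, PipelineHandler

# HOST and PORT are placeholders for your own settings.
es = Elasticsearch(
    hosts=[{'host': HOST, 'port': PORT}],
    timeout=10000
)
pipeline = Pipeline(client=es,
                    name="user-app",
                    pipeline_handler=PipelineHandler(
                        description="enriching user with apps",
                        match_field="applications_id_list",
                        target_field_name="apps",
                        policy_name="apps-json",
                        max_matches=128))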
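For orientation, the sketch below builds the kind of ingest pipeline body that Elasticsearch itself accepts for an enrich processor, using the same values as the example above. The helper build_enrich_pipeline is hypothetical. The mapping of match_field to the processor's field option and of target_field_name to target_field is an assumption about how the library translates its arguments, although the processor options themselves (policy_name, field, target_field, max_matches) come from the Elasticsearch enrich processor.

```python
from typing import Any, Dict


def build_enrich_pipeline(
    description: str,
    policy_name: str,
    match_field: str,
    target_field_name: str,
    max_matches: int = 1,
) -> Dict[str, Any]:
    """Build an ingest pipeline body containing a single enrich processor."""
    # Elasticsearch only accepts max_matches values from 1 to 128.
    if not 1 <= max_matches <= 128:
        raise ValueError("max_matches must be between 1 and 128")
    return {
        "description": description,
        "processors": [
            {
                "enrich": {
                    "policy_name": policy_name,
                    "field": match_field,
                    "target_field": target_field_name,
                    "max_matches": max_matches,
                }
            }
        ],
    }


pipeline_body = build_enrich_pipeline(
    description="enriching user with apps",
    policy_name="apps-json",
    match_field="applications_id_list",
    target_field_name="apps",
    max_matches=128,
)
```

When max_matches is greater than 1, the target field holds an array of matches, which is why apps is described as an array above.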
Policy
A Policy defines which fields are added to your documents during enrichment and which field is used as the match_field.
Example
Here we define a policy to enrich documents with city data. All fields in the enrich_fields list will be part of the enrichment result. The match_field specifies which key is used for the join, and index is the index loaded into Elasticsearch that holds these fields.
# es is the Elasticsearch client created in the examples above.
enrich_fields = ["name", "Nome da Grande Região", "Nome da Mesorregião", "Nome da Microrregião", "Nome da Região Rural"]
policy = Policy(es,
                name="city-policy",
                policy_handler=PolicyHandler(type_match="geo_match",
                                             index="cities",
                                             match_field="geometry",
                                             enrich_fields=enrich_fields))
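The sketch below shows the shape of the enrich policy body that Elasticsearch accepts, filled in with the values from the example above. build_enrich_policy is a hypothetical helper for illustration only. Mapping type_match to the top-level policy type key and index to the indices option is an assumption about how PolicyHandler translates its arguments.

```python
from typing import Any, Dict, List

# Policy types supported by the Elasticsearch enrich API.
POLICY_TYPES = {"match", "geo_match", "range"}


def build_enrich_policy(
    type_match: str,
    index: str,
    match_field: str,
    enrich_fields: List[str],
) -> Dict[str, Any]:
    """Build an enrich policy body keyed by its policy type."""
    if type_match not in POLICY_TYPES:
        raise ValueError(f"unsupported policy type: {type_match}")
    return {
        type_match: {
            "indices": index,
            "match_field": match_field,
            "enrich_fields": list(enrich_fields),
        }
    }


policy_body = build_enrich_policy(
    type_match="geo_match",
    index="cities",
    match_field="geometry",
    enrich_fields=["name", "Nome da Grande Região"],
)
```

In Elasticsearch, a policy also has to be executed after it is created so that its enrich index is built before a pipeline can use it. Whether moredata does this for you is not stated here.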