Elasticsearch Connector

Example

To enrich data with Elasticsearch, you have to provide the enricher with a client and the other classes it needs: IndexHandler, ReindexHandler, PipelineHandler and PolicyHandler.

First, insert your data into Elasticsearch using IndexHandler:

The code below creates a Data object with a function that parses your document. parse_document is the default parser, but you can write your own parser function that adds useful features for your enrichment. You then instantiate an IndexHandler and call its load_index method, passing the parser and other keyword arguments. Since this example performs a geo_location enrichment, geo_location and code_h3 must be set to True. code_h3 takes the point and hashes it with the h3 library. If the latitude/longitude values are inside an array, pass the name of that field as array_point_field.

from moredata.enricher import Enricher, EnricherBuilder
from moredata.enricher.elasticsearch_connector import (
   ElasticsearchConnector,
   IndexHandler,
   ReindexHandler,
   Pipeline,
   PipelineHandler,
   PolicyHandler,
   Policy,
)
from moredata.models.data import Data
from moredata.parser import parse_document
from moredata.utils.util import read_json_from_file

from elasticsearch import Elasticsearch

# HOST, PORT and USER_DATA are placeholders for your cluster address and input file.
es = Elasticsearch(
   hosts=[{'host': HOST, 'port': PORT}],
   timeout=10000
)

user = Data(data_file=USER_DATA, parser_func=parse_document, data_type="json", unstructured_data=True)
# Use the client created above (es) and the Data object's own parser (user.parse).
index_handler = IndexHandler(es, "users", "user")
index_handler.load_index(parser=user.parse, array_point_field="points_of_interest", geo_location=True, code_h3=True)
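The parser passed to Data is where document-level preparation can happen. As a minimal sketch, here is a hypothetical custom parser that drops points of interest without coordinates before they reach Elasticsearch. It assumes the input is a JSON array of users whose points_of_interest entries carry latitude and longitude keys; neither that schema nor the signature moredata expects from parser_func is confirmed here, so adapt it before passing it to Data.

```python
import json
from typing import Any, Dict, Iterator


def parse_users(json_text: str) -> Iterator[Dict[str, Any]]:
    """Hypothetical custom parser: yield one user document at a time.

    Assumes a JSON array of users, each with a "points_of_interest" list
    whose entries have "latitude" and "longitude" keys.
    """
    for user in json.loads(json_text):
        points = user.get("points_of_interest", [])
        # Keep only the points that can actually be geo-enriched.
        user["points_of_interest"] = [
            p for p in points
            if p.get("latitude") is not None and p.get("longitude") is not None
        ]
        yield user
```

Yielding documents lazily keeps memory flat for large input files, which matters when the documents are then bulk-loaded.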

The enricher below performs a geo_location enrichment based on latitude and longitude using the CONTAINS shape relation: each point in points_of_interest is enriched with the city whose geo shape (the match field defined by the city-policy policy) contains that point.

elk_city_enricher = Enricher(connector=ElasticsearchConnector(
                              index_handler=IndexHandler(client=es, index="cities", doc_type="city"),
                              pipeline=Pipeline(client=es,
                                                name="user-city-enricher",
                                                pipeline_handler=PipelineHandler(
                                                      description="enriching user with cities",
                                                      match_field="geo_location",
                                                      target_field_name="city",
                                                      policy_name="city-policy",
                                                      field_array="points_of_interest",
                                                      shape_relation="CONTAINS")),
                              reindex_handler=ReindexHandler(index="users",
                                                               target_index="users-city-enriched",
                                                               pipeline_name="user-city-enricher")))
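To make the CONTAINS relation concrete: a point of interest is matched with a city when the city's geometry contains the point. The sketch below illustrates that test with a planar ray-casting check on a simple polygon. It is an illustration only; Elasticsearch evaluates geo_shape relations with its own indexed geometry, and the coordinates used here are made up.

```python
from typing import List, Tuple

Point = Tuple[float, float]  # (longitude, latitude), treated as planar x/y


def polygon_contains(polygon: List[Point], point: Point) -> bool:
    """Planar ray-casting test: True if point lies inside polygon.

    polygon is a list of vertices without repeating the first vertex.
    Points exactly on an edge may be classified either way.
    """
    x, y = point
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        # Does a horizontal ray going right from the point cross this edge?
        if (y1 > y) != (y2 > y):
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside
```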

The code below stores the enrichment result in the user_enriched variable, without the auxiliary fields geo_location and code_h3 that the enricher creates.

user_enriched = \
   EnricherBuilder(user) \
   .with_enrichment(elk_city_enricher) \
   .get_result(array_point_field="points_of_interest", geo_location=True, code_h3=True)

The code below writes the enrichment result as JSON and then converts it to CSV. The library supports three output file types: Parquet, JSON and CSV; see the conversion section for more details. It is up to the developer to choose which file type to write.

import moredata.utils.util as util
util.write_json_generator_to_json("../../data/output/json/user-enriched", user_enriched, 1000)
util.Converter.json_enriched_to_csv("../../data/output/json/*.json", "../../data/output/csv/")
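If you need CSV without the library's converter, a standard-library sketch is to flatten nested objects (such as the city object added by the enrichment) into dotted column names and write them with csv.DictWriter. This is an alternative approach, not how util.Converter works internally; lists are written as their Python representation, and all rows are held in memory to compute the header.

```python
import csv
import io
from typing import Any, Dict, Iterable, List


def flatten(doc: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dicts into dotted keys: {"city": {"name": x}} -> {"city.name": x}."""
    flat: Dict[str, Any] = {}
    for key, value in doc.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{name}."))
        else:
            flat[name] = value
    return flat


def docs_to_csv(docs: Iterable[Dict[str, Any]]) -> str:
    """Return CSV text; the header is the union of all keys, in first-seen order."""
    rows: List[Dict[str, Any]] = [flatten(d) for d in docs]
    fieldnames: List[str] = []
    for row in rows:
        for key in row:
            if key not in fieldnames:
                fieldnames.append(key)
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames)  # missing keys become ""
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()
```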

Elasticsearch Connector

class moredata.enricher.elasticsearch_connector.elasticsearch_connector.ElasticsearchConnector(index_handler, pipeline, reindex_handler)

ElasticsearchConnector implements the IEnricherConnector interface, so it can be used as a connector to enrich data. It brings together the index, pipeline and reindex handlers, all of which are required to enrich your data with Elasticsearch.

Parameters
  • index_handler (IndexHandler) – controls index actions in Elasticsearch

  • pipeline (Pipeline) – controls pipeline actions in Elasticsearch

  • reindex_handler (ReindexHandler) – controls reindexing in Elasticsearch

index_handler
Type

IndexHandler

pipeline
Type

Pipeline

reindex_handler
Type

ReindexHandler

enrich(data, **kwargs)

Overrides the interface method. It performs the enrichment using Elasticsearch as the enricher: it creates a pipeline, reindexes the data through that pipeline, and returns all enriched data as JSON.

Parameters

data (Data) – the data to enrich.
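In terms of the Elasticsearch REST API, the three steps enrich() is described as performing map onto the calls listed below. This is a sketch that only builds (method, path, body) tuples; it assumes the enrich policy has already been created and executed, and it does not claim to reproduce the connector's exact requests.

```python
from typing import Any, Dict, List, Tuple

# (HTTP method, path, JSON body)
Request = Tuple[str, str, Dict[str, Any]]


def enrichment_requests(pipeline_name: str, pipeline_body: Dict[str, Any],
                        source_index: str, target_index: str) -> List[Request]:
    """Describe, in order, the REST calls behind a pipeline-based enrichment.

    Assumes the enrich policy used by the pipeline already exists and has
    been executed (POST /_enrich/policy/<name>/_execute).
    """
    return [
        # 1. Register the ingest pipeline that holds the enrich processor.
        ("PUT", f"/_ingest/pipeline/{pipeline_name}", pipeline_body),
        # 2. Copy the source documents into the target index through the pipeline.
        ("POST", "/_reindex", {
            "source": {"index": source_index},
            "dest": {"index": target_index, "pipeline": pipeline_name},
        }),
        # 3. Read the enriched documents back. A single search returns only
        #    10 hits by default, so real code would scroll through the index.
        ("GET", f"/{target_index}/_search", {"query": {"match_all": {}}}),
    ]
```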

Index Handler

Example

With IndexHandler you can insert data into Elasticsearch. If you want to define your own mapping, create the index with it and load the data with the optional argument streaming=True. If you do not know the mapping of your data, you can still load it without that argument and Elasticsearch will infer the types of your fields.

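from moredata.enricher.elasticsearch_connector import IndexHandler
from moredata.models.data import Data
from moredata.parser import parse_document
from moredata.utils.util import read_json_from_file

from elasticsearch import Elasticsearch

es = Elasticsearch(
    hosts=[{'host': 'localhost', 'port': 9200}]
)

# APPS_DATA and MAPPING_APPS_FILE are placeholders for your data and mapping files.
app = Data(data_file=APPS_DATA, parser_func=parse_document, data_type="json")
index_handler = IndexHandler(es, "apps-json", "app")

# Create the index with an explicit mapping before loading documents.
mapping = read_json_from_file(MAPPING_APPS_FILE)
index_handler.create_index(mapping=mapping)

# streaming=True so that the documents follow the mapping defined above.
index_handler.load_index(parser=app.parse, streaming=True)

class moredata.enricher.elasticsearch_connector.index_handler.IndexHandler(client, index, doc_type)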

IndexHandler controls index actions in Elasticsearch.

Parameters
  • client (elasticsearch.Elasticsearch) – an Elasticsearch client.

  • index (str) – name of index

  • doc_type (str) – name of document type

client
Type

elasticsearch.Elasticsearch

index

name of index

Type

str

doc_type

name of document type

Type

str

create_index(mapping)

Creates the index with the provided mapping using the elasticsearch-py package (https://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.client.IndicesClient.create)

Parameters

mapping (Json) – the mapping, which defines the field types of the index (https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html)
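For the geo_match enrichment in the first example, the source index needs a geo_shape field to match against. Below is a sketch of a mapping for a hypothetical cities index, assuming create_index sends the mapping as the body of a typeless (Elasticsearch 7.x style) create-index request; build_mapping is a hypothetical helper, not part of moredata.

```python
from typing import Any, Dict, List


def build_mapping(geo_field: str, keyword_fields: List[str]) -> Dict[str, Any]:
    """Typeless mapping body: one geo_shape field plus keyword fields."""
    properties: Dict[str, Any] = {geo_field: {"type": "geo_shape"}}
    for field in keyword_fields:
        properties[field] = {"type": "keyword"}
    return {"mappings": {"properties": properties}}


# Hypothetical mapping for the "cities" index that city-policy matches against.
CITIES_MAPPING = build_mapping("geometry", ["name", "Nome da Mesorregião"])
```

Saved as JSON, such a dict could be read back with read_json_from_file and passed to create_index.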

get_all_data(index, **kwargs)

Gets all documents stored in an index.

Parameters
  • index (str) – name of index

  • query (dict) – the query document Elasticsearch uses to retrieve the data

Yields

the documents retrieved from Elasticsearch.
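Retrieving every document of an index is usually done with a scroll, because a plain search returns only the first 10 hits by default. Below is a minimal sketch that uses elasticsearch.helpers.scan and keeps only each hit's _source; whether get_all_data works this way internally is an assumption.

```python
from typing import Any, Dict, Iterable, Iterator

# Query that matches every document in the index.
MATCH_ALL: Dict[str, Any] = {"query": {"match_all": {}}}


def sources(hits: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield the original document (_source) of each search hit."""
    for hit in hits:
        yield hit["_source"]


# With a live client, scroll through the whole index (elasticsearch-py):
# from elasticsearch import helpers
# for doc in sources(helpers.scan(es, index="users-city-enriched", query=MATCH_ALL)):
#     print(doc)
```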

load_index(parser, streaming=None, **kwargs)

load_index loads documents into the index specified in the constructor. It can either stream the load or bulk-load all data at once from an iterable. Use streaming if you want your data to take the types you defined in the mapping; if you do not care about this, you can simply bulk-load the data and Elasticsearch will infer the types of your documents.

Parameters
  • parser (Callable) – a function that yields the documents to index into Elasticsearch.

  • streaming (bool) – if True, load_index uses a streaming bulk instead of bulk-loading all data at once.
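In elasticsearch-py, both helpers.bulk and helpers.streaming_bulk consume an iterable of actions. As a sketch, assuming your documents are plain dicts, this hypothetical helper wraps them in that action format; the optional id_field is an illustrative addition, not a load_index argument.

```python
from typing import Any, Dict, Iterable, Iterator, Optional


def bulk_actions(index: str, docs: Iterable[Dict[str, Any]],
                 id_field: Optional[str] = None) -> Iterator[Dict[str, Any]]:
    """Wrap plain documents as bulk index actions, lazily.

    id_field is a hypothetical option: when set and present in a document,
    its value becomes the Elasticsearch _id.
    """
    for doc in docs:
        action: Dict[str, Any] = {"_index": index, "_source": doc}
        if id_field is not None and id_field in doc:
            action["_id"] = doc[id_field]
        yield action


# With a live client (elasticsearch-py):
# from elasticsearch import helpers
# helpers.bulk(es, bulk_actions("users", docs))  # returns a summary of the load
# for ok, item in helpers.streaming_bulk(es, bulk_actions("users", docs)):
#     ...  # one (ok, item) result per document
```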

re_index(reindex_handler)

Reindexes the data through a pipeline to apply the enrichment.

Parameters

reindex_handler (ReindexHandler) – describes the source index, the target index and the pipeline to use.

Reindex Handler

To enrich data, this library provides a high-level client for the reindex API.

Example

Here is a JSON object that you can send to Elasticsearch to reindex the twitter index through a pipeline that enriches its data.

{
    "source": {
        "index": "twitter"
    },
    "dest": {
        "index": "new_twitter",
        "pipeline": "enricher"
    }
}

ReindexHandler simplifies this JSON object to:

ReindexHandler(index="twitter", target_index="new_twitter", pipeline_name="enricher")

Warning

The pipeline referenced by pipeline_name must already exist before reindexing, as must the indexes.

class moredata.enricher.elasticsearch_connector.index_handler.ReindexHandler(index, target_index, pipeline_name)

ReindexHandler creates the JSON that Elasticsearch needs to reindex the provided index through a pipeline, enriching it.

Parameters
  • index (str) – name of source index

  • target_index (str) – name of destination index

  • pipeline_name (str) – name of pipeline

json

the JSON body built from the parameters, to be sent to the reindex endpoint

Type

dict
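The json attribute corresponds to the request body shown in the example above. Below is a sketch of building it from the three constructor arguments; reindex_body is a hypothetical helper, not part of moredata.

```python
from typing import Any, Dict


def reindex_body(index: str, target_index: str, pipeline_name: str) -> Dict[str, Any]:
    """Body for POST /_reindex that sends every document through a pipeline."""
    return {
        "source": {"index": index},
        "dest": {"index": target_index, "pipeline": pipeline_name},
    }


# With a live client (elasticsearch-py 7.x):
# es.reindex(body=reindex_body("twitter", "new_twitter", "enricher"))
```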

Pipeline

Pipeline, together with PipelineHandler, controls which fields Elasticsearch enriches and how. See the Elasticsearch pipeline documentation.

Example

This pipeline uses applications_id_list as the field to join on and puts the result in an attribute called apps (target_field_name). Because max_matches is 128, the target field will be an array with at most 128 entries. The policy_name argument specifies which policy is used for the enrichment.

from elasticsearch import Elasticsearch
from moredata.enricher.elasticsearch_connector import Pipeline, PipelineHandler

# HOST and PORT are placeholders for your cluster address.
es = Elasticsearch(
    hosts=[{'host': HOST, 'port': PORT}],
    timeout=10000
)

pipeline = Pipeline(client=es,
                    name="user-app",
                    pipeline_handler=PipelineHandler(
                        description="enriching user with apps",
                        match_field="applications_id_list",
                        target_field_name="apps",
                        policy_name="apps-json",
                        max_matches=128))

class moredata.enricher.elasticsearch_connector.pipeline_handler.Pipeline(client, name, pipeline_handler)

A pipeline is a definition of a series of processors that are to be executed in the same order as they are declared. (https://www.elastic.co/guide/en/elasticsearch/reference/current/pipeline.html)

Parameters
  • client (elasticsearch.Elasticsearch) – an Elasticsearch client.

  • name (str) – name of the pipeline.

  • pipeline_handler (PipelineHandler) – object that holds the JSON definition of the pipeline.

client
Type

elasticsearch.Elasticsearch

name
Type

str

pipeline_handler
Type

PipelineHandler

create_pipeline(params=None)

create_pipeline creates the Elasticsearch pipeline with the processors specified in the pipeline_handler JSON.
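The body of an ingest-pipeline request is a description plus an ordered list of processors. As a sketch, assuming create_pipeline sends something equivalent for the user-app example, the request body could be built as follows; the processor keys follow the Elasticsearch enrich processor, and pipeline_body is a hypothetical helper.

```python
from typing import Any, Dict, List


def pipeline_body(description: str, processors: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Body for PUT /_ingest/pipeline/<name>; processors run in list order."""
    return {"description": description, "processors": list(processors)}


# Enrich processor mirroring the PipelineHandler arguments of the user-app example.
USER_APP_BODY = pipeline_body(
    "enriching user with apps",
    [{
        "enrich": {
            "policy_name": "apps-json",
            "field": "applications_id_list",
            "target_field": "apps",
            "max_matches": 128,
        }
    }],
)

# With a live client (elasticsearch-py 7.x):
# es.ingest.put_pipeline(id="user-app", body=USER_APP_BODY)
```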

class moredata.enricher.elasticsearch_connector.pipeline_handler.PipelineHandler(description, target_field_name, match_field, policy_name, **kwargs)

PipelineHandler creates the JSON object for an Elasticsearch pipeline.

Parameters
  • description (str) – describes what the pipeline does.

  • target_field_name (str) – name of the field that will hold the result of the enrichment.

  • match_field (str) – field of the incoming documents that is matched against the policy to look up the enrichment data.

  • policy_name (str) – name of the enrich policy.

  • field_array (str) – if the field you want to enrich is inside an array, the name of that array field.

  • max_matches (int) – maximum number of matching documents to add. If it is greater than 1, the target field will be an array holding up to max_matches enrichment results.

  • shape_relation (str) –

    for geo-based enrichments, the spatial relation used to match documents:

    • INTERSECTS

      Return all documents whose field intersects the query geometry.

    • DISJOINT

      Return all documents whose field has nothing in common with the query geometry.

    • WITHIN

      Return all documents whose field is within the query geometry.

    • CONTAINS

      Return all documents whose field contains the query geometry.

json

the JSON built from the parameters that defines the pipeline.

Type

dict
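The parameters above map onto the Elasticsearch enrich processor. Below is a sketch of how such a JSON could be assembled, assuming that field_array is handled by wrapping the enrich processor in a foreach processor (so that _ingest._value refers to each array element); enrich_processor is hypothetical and the library's actual output may differ.

```python
from typing import Any, Dict, Optional


def enrich_processor(policy_name: str, match_field: str, target_field_name: str,
                     field_array: Optional[str] = None,
                     max_matches: Optional[int] = None,
                     shape_relation: Optional[str] = None) -> Dict[str, Any]:
    """Build an ingest processor that enriches match_field using an enrich policy.

    When field_array is given, the enrich processor is wrapped in a foreach
    processor so that each element of the array is enriched on its own.
    """
    prefix = "_ingest._value." if field_array else ""
    enrich: Dict[str, Any] = {
        "policy_name": policy_name,
        "field": prefix + match_field,
        "target_field": prefix + target_field_name,
    }
    if max_matches is not None:
        enrich["max_matches"] = max_matches  # Elasticsearch caps this at 128
    if shape_relation is not None:
        enrich["shape_relation"] = shape_relation  # only meaningful for geo_match
    processor: Dict[str, Any] = {"enrich": enrich}
    if field_array:
        return {"foreach": {"field": field_array, "processor": processor}}
    return processor
```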

Policy

A Policy defines which fields are added by the enrichment and which match_field is used to find matches.

Example

Here we define a policy that enriches documents with city data. All fields in the enrich_fields list will be part of the enrichment result. match_field sets the key used for the join, and index is the source index, already loaded into Elasticsearch, that contains these fields.

from moredata.enricher.elasticsearch_connector import Policy, PolicyHandler

enrich_fields = ["name", "Nome da Grande Região", "Nome da Mesorregião", "Nome da Microrregião", "Nome da Região Rural"]

# es is the Elasticsearch client created in the previous examples.
policy = Policy(es, name="city-policy", policy_handler=PolicyHandler(type_match="geo_match",
                                                    index="cities",
                                                    match_field="geometry",
                                                    enrich_fields=enrich_fields))

class moredata.enricher.elasticsearch_connector.policy_handler.PolicyHandler(type_match, index, match_field, enrich_fields)

PolicyHandler creates the JSON object for an Elasticsearch enrich policy.

Parameters
  • type_match (str) – type of the policy:

    • geo_match

      matches enrich data to incoming documents based on a geographic location, using a geo_shape query.

    • match

      matches enrich data to incoming documents based on a precise value, such as an email address or ID, using a term query.

  • index (str) – name of the source index.

  • match_field (str) – field in the source indices used to match incoming documents.

  • enrich_fields (list) – fields to add to matching incoming documents. These fields must be present in the source indices.
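PolicyHandler's parameters correspond to the body of the Elasticsearch create-enrich-policy request, where the policy type is the top-level key. Below is a sketch with a hypothetical policy_body helper, limited to the two types documented here.

```python
from typing import Any, Dict, List


def policy_body(type_match: str, index: str, match_field: str,
                enrich_fields: List[str]) -> Dict[str, Any]:
    """Body for PUT /_enrich/policy/<name>; the policy type is the top-level key."""
    if type_match not in ("match", "geo_match"):
        raise ValueError(f"unsupported policy type: {type_match!r}")
    return {
        type_match: {
            "indices": index,
            "match_field": match_field,
            "enrich_fields": list(enrich_fields),
        }
    }


# With a live client (elasticsearch-py 7.x), create the policy and then execute
# it so that Elasticsearch builds the enrich index the pipeline will read from:
# es.enrich.put_policy(name="city-policy", body=policy_body("geo_match", "cities", "geometry", ["name"]))
# es.enrich.execute_policy(name="city-policy")
```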