More Data
Its goal is to provide a highly extensible framework for data enrichment.
Enricher
An enricher works through a connector, such as
elasticsearch_connector or
api_connector. Choose the enricher that is appropriate
for your application.
Elasticsearch
To enrich with Elasticsearch, you have to provide the enricher with a client
and the other classes it needs:
index_handler,
reindex_handler,
pipeline_handler,
policy_handler.
First, insert your data into Elasticsearch using index_handler.
The code below creates a Data object with a function that parses your documents. parse_document is the default, but you
can write your own parser function that implements features specific to your enrichment. The code then
instantiates an IndexHandler and calls its load_index method, passing the parser and other keyword arguments. Because this
example performs a geo_location enrichment, geo_location and code_h3 are both set to True. code_h3 takes each point
and hashes it using the h3 library. If the latitude/longitude pairs are stored in an array field, pass the name of that field as array_point_field.
from moredata.enricher import Enricher, EnricherBuilder
from moredata.enricher.elasticsearch_connector import (
    ElasticsearchConnector,
    IndexHandler,
    ReindexHandler,
    Pipeline,
    PipelineHandler,
    PolicyHandler,
    Policy,
)
from moredata.models.data import Data
from moredata.parser import parse_document
from moredata.utils.util import read_json_from_file
from elasticsearch import Elasticsearch
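# HOST, PORT and USER_DATA are placeholders for your own settings.
es = Elasticsearch(
    hosts=[{'host': HOST, 'port': PORT}],
    timeout=10000
)
# Wrap the input file in a Data object that uses the default parser.
user = Data(data_file=USER_DATA, parser_func=parse_document, data_type="json", unstructured_data=True)
# Load the parsed users into the "users" index.
index_handler = IndexHandler(es, "users", "user")
index_handler.load_index(parser=user.parse, array_point_field="points_of_interest", geo_location=True, code_h3=True)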
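To make the effect of geo_location and code_h3 more concrete, here is a minimal sketch of what a custom parser function might do to each document. It is not the library's parse_document. The latitude and longitude field names, the add_geo_fields function, and the fake_hasher stand-in (used instead of a real h3 call so the sketch runs without extra dependencies) are all assumptions made for illustration.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, Optional

# Hypothetical hasher type: (lat, lon) -> cell id string. A real parser
# could wrap the h3 library here; this sketch uses a stand-in instead.
Hasher = Callable[[float, float], str]


def fake_hasher(lat: float, lon: float) -> str:
    """Stand-in for an H3 cell lookup: rounds coordinates to a coarse grid."""
    return f"{round(lat, 1)}:{round(lon, 1)}"


def add_geo_fields(
    docs: Iterable[Dict[str, Any]],
    array_point_field: Optional[str] = None,
    geo_location: bool = False,
    code_h3: bool = False,
    hasher: Hasher = fake_hasher,
) -> Iterator[Dict[str, Any]]:
    """Yield each document with geo_location / code_h3 added to its points."""
    for doc in docs:
        # Points live either in an array field or on the document itself.
        points = doc[array_point_field] if array_point_field else [doc]
        for point in points:
            # "latitude" / "longitude" are assumed field names.
            lat, lon = point["latitude"], point["longitude"]
            if geo_location:
                # An object with lat/lon keys is one of the formats
                # Elasticsearch accepts for a geo_point field.
                point["geo_location"] = {"lat": lat, "lon": lon}
            if code_h3:
                point["code_h3"] = hasher(lat, lon)
        yield doc
```

Passing array_point_field="points_of_interest" makes the function walk the array of points; leaving it unset treats the document itself as the single point.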
This is a geo_location enrichment based on latitude and longitude that uses a CONTAINS query: every point in points_of_interest is enriched if it is contained by a geo shape, which is a field defined by the city-policy policy.
elk_city_enricher = Enricher(connector=ElasticsearchConnector(
    index_handler=IndexHandler(client=es, index="cities", doc_type="city"),
    pipeline=Pipeline(client=es,
                      name="user-city-enricher",
                      pipeline_handler=PipelineHandler(
                          description="enriching user with cities",
                          match_field="geo_location",
                          target_field_name="city",
                          policy_name="city-policy",
                          field_array="points_of_interest",
                          shape_relation="CONTAINS")),
    reindex_handler=ReindexHandler(index="users",
                                   target_index="users-city-enriched",
                                   pipeline_name="user-city-enricher")))
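For orientation, the handlers above resemble three Elasticsearch features: a geo_match enrich policy, an ingest pipeline with an enrich processor, and a reindex request. The sketch below writes out what the corresponding request bodies could look like. That the library generates exactly these bodies is an assumption, and the "geometry" and "name" fields of the cities index are hypothetical.

```python
# Body for PUT /_enrich/policy/city-policy (a geo_match enrich policy).
# "geometry" and "name" are hypothetical fields of the cities index.
# The policy must be executed (POST /_enrich/policy/city-policy/_execute)
# before a pipeline can use it.
city_policy = {
    "geo_match": {
        "indices": "cities",
        "match_field": "geometry",
        "enrich_fields": ["name"],
    }
}

# Body for PUT /_ingest/pipeline/user-city-enricher.
# The foreach processor applies the enrich processor to every element of
# points_of_interest; _ingest._value refers to the current element.
user_city_pipeline = {
    "description": "enriching user with cities",
    "processors": [
        {
            "foreach": {
                "field": "points_of_interest",
                "processor": {
                    "enrich": {
                        "policy_name": "city-policy",
                        "field": "_ingest._value.geo_location",
                        "target_field": "_ingest._value.city",
                        "shape_relation": "CONTAINS",
                    }
                },
            }
        }
    ],
}

# Body for POST /_reindex: copy users into a new index through the pipeline.
reindex_body = {
    "source": {"index": "users"},
    "dest": {"index": "users-city-enriched", "pipeline": "user-city-enricher"},
}
```

Seen this way, match_field, target_field_name, policy_name, and shape_relation line up with the field, target_field, policy_name, and shape_relation options of the enrich processor, and field_array with the field iterated by foreach.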
Here, the result of the enrichment is returned to the user_enriched variable without the fields that the Enricher created, geo_location and code_h3.
user_enriched = \
    EnricherBuilder(user) \
    .with_enrichment(elk_city_enricher) \
    .get_result(array_point_field="points_of_interest", geo_location=True, code_h3=True)
The code below writes the enrichment result in two formats, JSON and CSV. The library supports conversion between three file types: parquet, JSON, and CSV. You can read more about this under Conversion. It is up to the developer to choose which file type is written.
import moredata.utils.util as util
util.write_json_generator_to_json("../../data/output/json/user-enriched", user_enriched, 1000)
util.Converter.json_enriched_to_csv("../../data/output/json/*.json", "../data/output/csv/")
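The third argument to write_json_generator_to_json, 1000, looks like a batch size, although the source does not say so. Under that assumption, the sketch below shows one way a generator of documents could be written out as numbered JSON files using only the standard library. write_in_chunks and its file naming scheme are hypothetical and not part of moredata.

```python
import json
from itertools import islice
from pathlib import Path
from typing import Any, Dict, Iterable, List


def write_in_chunks(prefix: str, docs: Iterable[Dict[str, Any]], chunk_size: int) -> List[Path]:
    """Write docs to prefix-0.json, prefix-1.json, ... with up to chunk_size docs each."""
    iterator = iter(docs)
    written: List[Path] = []
    index = 0
    while True:
        # Pull at most chunk_size documents without materializing the rest.
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            break
        path = Path(f"{prefix}-{index}.json")
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(chunk), encoding="utf-8")
        written.append(path)
        index += 1
    return written
```

Writing in chunks keeps memory use bounded by chunk_size, which matters when the enriched result is too large to hold in a single list.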