Sunday, May 15, 2016

Relational Operations on Inverted Indexes using ES-Hadoop

I've been interested in how to apply relational algebra to flat collections like inverted indexes.  This coincided well with the need to do a research project for a Big Data Algorithms course (my last at CU) with @JoseVieitez.  These are the assumptions we started with:
  • We want to persist data in ElasticSearch because it's fast, scalable, and generally awesome.
  • Doing complex queries similar to a SQL JOIN are difficult on flat collections like this.  People often denormalize or squeeze their ontology into partial solutions like Parent-Child docs.
  • Many paths lead to unpleasantries like client side joins.  This headache is to be avoided for many reasons - we want to push this responsibility into the persistence layer.
  • ElasticSearch publishes an interesting connector called ES-Hadoop.  The beauty of this application is its a two way street - you can process data in Hadoop-based systems (like Spark) on the way into ElasticSearch or you can grab data out of ElasticSearch for Hadoop processing.


Store normalized data in ElasticSearch and impose relational operations in Hadoop with Spark.  Our main goal was to compare query processing times between PostgreSQL and this system.

Here are the technologies we used:
  • ElasticSearch (2.2.0)
  • ES-Hadoop (2.2.0)
  • Apache Spark (1.6.0 for Hadoop 2.6+)
  • PostgreSQL (9.4.5)

Test Data

For test data, we grabbed a bunch of IMDB data (approx 15M "acted in" tuples linking 3.7M movies to 2.3M actors) which is conceptually linked but it still required some munging.  We teased the data into the following relational schema.
  • Movies
    • id (BigInt, PK)
    • name (Text, GIN FTS)
  • Actors
    • id (BigInt, PK)
    • name (Text)
  • Acted_In
    • movie_id (BigInt, btree)
    • actor_id (BigInt, btree)
    • role (Text)
Our canonical SQL query is as follows:

SELECT AS Actor, AS Movie, j.role AS Role 
FROM actors a 
LEFT JOIN acted_in j ON = j.actor_id
INNER JOIN movies m 
ON j.movie_id = AND to_tsvector('english', @@ to_tsquery('english', 'bourne');

This query is saying "Show me the actor name, the name of the movie, and the role he played for every movie with the word 'Bourne' in the title". We liked it because it's a 3 table join and touches on full text search. It feels like a pretty common query pattern that relational databases often field so this is the query we sought to implement in our architecture.

Our desired output are rows like: ("Alfieri Victor", "The Bourne Supremacy", "Renzi").


Our goal wasn't to write a SQL-to-ES-Hadoop query processing engine - we're evaluating performance. So our program follows a static path which simulates a complex system which translates SQL (the 'bourne' query above) to interactions between ElasticSearch and Spark which ultimately produces structured results like the 'Renzi' row.

There are 3 ES collections - Movies, Actors, and Acted_In. To accomplish the 'bourne' query, there will be 3 ES interactions:
  1. Get all movies with 'Bourne' in the title. 
  2. Get all acted_in tuples for actors in these movies. 
  3. Get all actors for these tuples.
In our system, Spark has the responsibility of performing relational algebra which collates these separate datasets into a single response (the "Renzi" row mentioned above). We looked to Spark SQL to, somewhat ironically, abstract this away. Spark SQL is a way to interact with structured data which will ultimately allow us to run the same SQL query in Spark that we ran in PostgreSQL.

So our final workflow became:
  1. Get all movies with 'Bourne' in the title and feed them into a temporary Spark SQL table. 
  2. Get all acted_in tuples for actors in these movies and feed them into a temporary Spark SQL table. 
  3. Get all actors for these tuples and feed them into a temporary Spark SQL table.
  4. RUN THE SAME SQL QUERY THAT WAS RUN AGAINST POSTGRES (minus any thinning constraints like the 'Bourne' requirement which is now performed in ElasticSearch).
Our final source is not production quality - step 1 (getting 'bourne' movies from ES into Spark) is pretty simple and demonstrates the nature of our program:

PostgreSQL Performance

We expected PostgreSQL to do well with this dataset and we were not disappointed. Quantifying performance is mix of art and science - we wanted to stay away from cached results but didn't run against entirely cold indexes.

Here are query running times in ms for the 3 volume profiles (1M, 15M, 150M, 150M paged tuples) vs query result set sizes (5, 500, 5,000 rows).

L (1M)
M (15M)*
H (150M)
H/paged (150M)
L (5)
M (500)
H (5,000)

* We consider the Medium volume times to be anomalies since PostgreSQL did not properly utilize indexes - it did at the High volume stage. We didn't spend an extraordinary amount of time trying to resolve this since it's outside the focus of this project. Previous experience dictates these queries can probably be run in no more than a couple hundred ms.

The far right column (H/paged) explores a profile we feel PostgreSQL sometimes has difficultly with - paging. So using the large dataset (150M tuples), we attempted some queries which paged and did find it lagged somewhat as we expected.

ES-based System Performance

L (1M)
M (15M)
H (150M)
H/paged (150M)
L (5)
M (500)
H (5,000)


  • There was minimal correlation between corpus size and response time for the ES-Hadoop program - As expected, ElasticSearch was blazingly fast so the larger volumes didn't matter much. 
  • ES-Hadoop suffered greatly as result set sizes increased - Spark is billed as a faster MapReduce but we had difficultly getting good performance, at least compared against PostgreSQL. This could partially be due to our pedestrian knowledge of Spark. Spark SQL in particular struggled to keep up. We wrote a Spark script that used native Python data containers instead of Spark SQL and observed 20-30% improvements in time. Worthy of further investigation but it wouldn't make a huge difference in our evaluation. 
  • ES-Hadoop always incurred a non-trivial amount of startup costs - We always suffered an approximately 1,000 ms penalty firing up Spark's infrastructure. Switching from local mode to distributed mode did not help, perhaps due to the relative small amount of data we were playing with. 
  • PostgreSQL won every category but there is a caveat - It’s clear that PostgreSQL is much faster, however, recall our observation that corpus size does not impact ES-Hadoop execution times. PostgreSQL will eventually break down in ways ElasticSearch will not. So we feel there is a potential inflection point that was not exposed in our research particularly when you consider paging (PostgreSQL can struggle counting) which is a very common requirement for data repos.

Other Work

  • Solr Streaming Expressions push parallel relational operation work into the Solr nodes themselves (rather than extract the data like we do).  I feel like this is a very promising concept.  I've played with this feature for a day or two and was very impressed.
  • ZomboDB can be thought of ElasticSearch-backed PostgreSQL indexes.  A great option if you need the robustness of a solid RDMBS and want access to the power of ES searching/aggregation/etc.  There's also a concept of linking indexes similar to a join.  I've used this product and think it's a really cool merging of technologies.
  • I haven't tried the SIREn Join Plugin for Elasticsearch but this appears to offer a left join option between ES indexes.

1 comment:

  1. It is nice blog Thank you porovide important information and i am searching for same information to save my time Big Data Hadoop Online Course Hyderabad