Sunday, May 15, 2016

Relational Operations on Inverted Indexes using ES-Hadoop

I've been interested in how to apply relational algebra to flat collections like inverted indexes.  This coincided well with the need to do a research project for a Big Data Algorithms course (my last at CU) with @JoseVieitez.  These are the assumptions we started with:
  • We want to persist data in ElasticSearch because it's fast, scalable, and generally awesome.
  • Complex queries similar to a SQL JOIN are difficult on flat collections like this.  People often denormalize or squeeze their ontology into partial solutions like Parent-Child documents.
  • Many paths lead to unpleasantness like client-side joins.  This headache is worth avoiding for many reasons - we want to push this responsibility into the persistence layer.
  • ElasticSearch publishes an interesting connector called ES-Hadoop.  The beauty of this connector is that it's a two-way street - you can process data in Hadoop-based systems (like Spark) on the way into ElasticSearch, or you can pull data out of ElasticSearch for Hadoop processing.


Our approach: store normalized data in ElasticSearch and impose relational operations in Hadoop with Spark.  Our main goal was to compare query processing times between PostgreSQL and this system.

Here are the technologies we used:
  • ElasticSearch (2.2.0)
  • ES-Hadoop (2.2.0)
  • Apache Spark (1.6.0 for Hadoop 2.6+)
  • PostgreSQL (9.4.5)

Test Data

For test data, we grabbed a bunch of IMDB data (approx 15M "acted in" tuples linking 3.7M movies to 2.3M actors).  The data is conceptually linked but still required some munging; we teased it into the following relational schema.
  • Movies
    • id (BigInt, PK)
    • name (Text, GIN FTS)
  • Actors
    • id (BigInt, PK)
    • name (Text)
  • Acted_In
    • movie_id (BigInt, btree)
    • actor_id (BigInt, btree)
    • role (Text)
Our canonical SQL query is as follows:

SELECT AS Actor, AS Movie, j.role AS Role 
FROM actors a 
LEFT JOIN acted_in j ON = j.actor_id
INNER JOIN movies m 
ON j.movie_id = AND to_tsvector('english', @@ to_tsquery('english', 'bourne');

This query is saying "Show me the actor's name, the name of the movie, and the role they played, for every movie with the word 'Bourne' in the title". We liked it because it's a 3-table join and touches on full text search. It feels like a pretty common query pattern that relational databases often field, so this is the query we sought to implement in our architecture.

Our desired output is rows like: ("Alfieri Victor", "The Bourne Supremacy", "Renzi").


Our goal wasn't to write a SQL-to-ES-Hadoop query processing engine - we're evaluating performance. So our program follows a static path that simulates a complex system translating SQL (the 'bourne' query above) into interactions between ElasticSearch and Spark, ultimately producing structured results like the 'Renzi' row.

There are 3 ES collections - Movies, Actors, and Acted_In. To accomplish the 'bourne' query, there will be 3 ES interactions:
  1. Get all movies with 'Bourne' in the title. 
  2. Get all acted_in tuples for actors in these movies. 
  3. Get all actors for these tuples.
In our system, Spark has the responsibility of performing relational algebra which collates these separate datasets into a single response (the "Renzi" row mentioned above). We looked to Spark SQL to, somewhat ironically, abstract this away. Spark SQL is a way to interact with structured data which will ultimately allow us to run the same SQL query in Spark that we ran in PostgreSQL.
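Conceptually, the collation Spark performs looks like this toy version in plain Python - all values below are illustrative stand-ins for the three ES result sets, not our actual data structures:

```python
# Toy stand-ins for the three ES result sets (illustrative values only).
movies   = {101: "The Bourne Supremacy"}                # step 1: id -> name
actors   = {201: "Alfieri Victor", 202: "Matt Damon"}   # step 3: id -> name
acted_in = [(101, 201, "Renzi"),                        # step 2: (movie_id,
            (101, 202, "Jason Bourne")]                 #  actor_id, role)

# The relational part: join acted_in against both lookup tables.
rows = [(actors[actor_id], movies[movie_id], role)
        for movie_id, actor_id, role in acted_in
        if movie_id in movies and actor_id in actors]
# rows[0] == ("Alfieri Victor", "The Bourne Supremacy", "Renzi")
```

Spark SQL lets us express this same join declaratively instead of hand-rolling the dictionary lookups.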

So our final workflow became:
  1. Get all movies with 'Bourne' in the title and feed them into a temporary Spark SQL table. 
  2. Get all acted_in tuples for actors in these movies and feed them into a temporary Spark SQL table. 
  3. Get all actors for these tuples and feed them into a temporary Spark SQL table.
  4. RUN THE SAME SQL QUERY THAT WAS RUN AGAINST POSTGRES (minus any thinning constraints like the 'Bourne' requirement which is now performed in ElasticSearch).
Our final source is not production quality - step 1 (getting 'bourne' movies from ES into Spark) is pretty simple and demonstrates the nature of our program:
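A sketch of step 1 in PySpark - the connector format string and option names follow the ES-Hadoop 2.2 / Spark 1.6 APIs, while the index/type path, field name, and helper function are illustrative assumptions:

```python
import json

# Elasticsearch query DSL for step 1 - the 'name' field matches our Movies
# schema; the 'imdb/movies' index/type path is an assumption.
BOURNE_QUERY = json.dumps({"query": {"match": {"name": "bourne"}}})

def register_bourne_movies(sqlContext, es_nodes="localhost:9200"):
    """Pull matching movies out of ES and expose them to Spark SQL."""
    movies = (sqlContext.read
              .format("org.elasticsearch.spark.sql")  # the ES-Hadoop connector
              .option("es.nodes", es_nodes)
              .option("es.query", BOURNE_QUERY)       # thinning happens ES-side
              .load("imdb/movies"))
    movies.registerTempTable("movies")                # step 1's temporary table
    return movies
```

Steps 2 and 3 follow the same pattern for acted_in and actors; step 4 is then a single sqlContext.sql(...) call running the PostgreSQL query minus the to_tsquery constraint.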

PostgreSQL Performance

We expected PostgreSQL to do well with this dataset, and we were not disappointed. Quantifying performance is a mix of art and science - we wanted to stay away from cached results but didn't run against entirely cold indexes.

Here are query running times in ms for the four volume profiles (1M, 15M, and 150M tuples, plus 150M paged) vs query result set sizes (5, 500, and 5,000 rows).

[Table: PostgreSQL query times in ms by corpus size - L (1M), M (15M)*, H (150M), H/paged (150M) - and result set size - L (5 rows), M (500 rows), H (5,000 rows)]

* We consider the Medium volume times to be anomalies since PostgreSQL did not properly utilize indexes - it did at the High volume stage. We didn't spend much time trying to resolve this since it's outside the focus of this project. Previous experience suggests these queries can probably run in no more than a couple hundred ms.

The far right column (H/paged) explores a profile we feel PostgreSQL sometimes has difficulty with - paging. So using the large dataset (150M tuples), we attempted some queries which paged and found that it lagged somewhat, as expected.

ES-based System Performance

[Table: ES-Hadoop query times in ms by corpus size - L (1M), M (15M), H (150M), H/paged (150M) - and result set size - L (5 rows), M (500 rows), H (5,000 rows)]


  • There was minimal correlation between corpus size and response time for the ES-Hadoop program - as expected, ElasticSearch was blazingly fast, so the larger volumes didn't matter much. 
  • ES-Hadoop suffered greatly as result set sizes increased - Spark is billed as a faster MapReduce, but we had difficulty getting good performance, at least compared against PostgreSQL. This could partially be due to our pedestrian knowledge of Spark. Spark SQL in particular struggled to keep up. We wrote a Spark script that used native Python data containers instead of Spark SQL and observed 20-30% improvements in time. Worthy of further investigation, but it wouldn't make a huge difference in our evaluation. 
  • ES-Hadoop always incurred a non-trivial startup cost - We always suffered an approximately 1,000 ms penalty firing up Spark's infrastructure. Switching from local mode to distributed mode did not help, perhaps due to the relatively small amount of data we were playing with. 
  • PostgreSQL won every category, but there is a caveat - It's clear that PostgreSQL is much faster; however, recall our observation that corpus size does not impact ES-Hadoop execution times. PostgreSQL will eventually break down in ways ElasticSearch will not. So we feel there is a potential inflection point that our research did not expose, particularly when you consider paging (PostgreSQL can struggle with counting), a very common requirement for data repositories.

Other Work

  • Solr Streaming Expressions push parallel relational operation work into the Solr nodes themselves (rather than extracting the data, as we do).  I feel like this is a very promising concept.  I've played with this feature for a day or two and was very impressed.
  • ZomboDB can be thought of as ElasticSearch-backed PostgreSQL indexes.  A great option if you need the robustness of a solid RDBMS and want access to the power of ES searching/aggregation/etc.  There's also a concept of linking indexes similar to a join.  I've used this product and think it's a really cool merging of technologies.
  • I haven't tried the SIREn Join Plugin for Elasticsearch but this appears to offer a left join option between ES indexes.

Saturday, January 2, 2016

Basic recursive Postgres updates

I recently came across a need to update a large amount of Postgres data (many millions of rows) laid out in a hierarchical format with potentially inherited attributes. To make the use case generic, consider the structure a bookstore might employ: a hierarchy of object types ('book' at the root, with children like 'history' and 'ww2'), where each type either sets its location directly or inherits it from its parent.
This is the structure being modeled: locations marked with asterisks are not inherited - i.e., they are direct. Note that 'ww2' books just happen to have the same location as their logical parent ('history' books); this is intentional, to show a potential condition.
Let's say we want to update the 'book' location from 'main floor' to 'new floor'. This should cascade to all object_types which inherit their location from 'book'. The query (pardon the bizarre cast to bigint) utilizes a WITH RECURSIVE CTE to process through an adjacency tree:
Perhaps it's more readable as a JPA named query. Note that the inner query traverses the hierarchy looking for candidates to hand to the outer UPDATE.
The result shows the 4 relevant rows ('books' and those which descend from 'books' AND inherit their location) are updated: