Sunday, May 15, 2016

Relational Operations on Inverted Indexes using ES-Hadoop

I've been interested in how to apply relational algebra to flat collections like inverted indexes.  This coincided well with the need to do a research project for a Big Data Algorithms course (my last at CU) with @JoseVieitez.  These are the assumptions we started with:
  • We want to persist data in ElasticSearch because it's fast, scalable, and generally awesome.
  • Complex queries similar to a SQL JOIN are difficult to run on flat collections like this.  People often denormalize or squeeze their ontology into partial solutions like Parent-Child docs.
  • Many paths lead to unpleasantries like client-side joins.  This headache is to be avoided for many reasons - we want to push this responsibility into the persistence layer.
  • ElasticSearch publishes an interesting connector called ES-Hadoop.  The beauty of this application is it's a two-way street - you can process data in Hadoop-based systems (like Spark) on the way into ElasticSearch or you can grab data out of ElasticSearch for Hadoop processing.


Our idea: store normalized data in ElasticSearch and impose relational operations in Hadoop with Spark.  Our main goal was to compare query processing times between PostgreSQL and this system.

Here are the technologies we used:
  • ElasticSearch (2.2.0)
  • ES-Hadoop (2.2.0)
  • Apache Spark (1.6.0 for Hadoop 2.6+)
  • PostgreSQL (9.4.5)

Test Data

For test data, we grabbed a bunch of IMDB data (approx 15M "acted in" tuples linking 3.7M movies to 2.3M actors) which is conceptually linked but it still required some munging.  We teased the data into the following relational schema.
  • Movies
    • id (BigInt, PK)
    • name (Text, GIN FTS)
  • Actors
    • id (BigInt, PK)
    • name (Text)
  • Acted_In
    • movie_id (BigInt, btree)
    • actor_id (BigInt, btree)
    • role (Text)
Our canonical SQL query is as follows:

SELECT a.name AS Actor, m.name AS Movie, j.role AS Role 
FROM actors a 
LEFT JOIN acted_in j ON a.id = j.actor_id
INNER JOIN movies m 
ON j.movie_id = m.id AND to_tsvector('english', m.name) @@ to_tsquery('english', 'bourne');

This query is saying "Show me the actor name, the name of the movie, and the role he played for every movie with the word 'Bourne' in the title". We liked it because it's a 3 table join and touches on full text search. It feels like a pretty common query pattern that relational databases often field so this is the query we sought to implement in our architecture.

Our desired output consists of rows like: ("Alfieri Victor", "The Bourne Supremacy", "Renzi").


Our goal wasn't to write a SQL-to-ES-Hadoop query processing engine - we're evaluating performance. So our program follows a static path that simulates what a complete system would do: translate SQL (the 'bourne' query above) into interactions between ElasticSearch and Spark that ultimately produce structured results like the 'Renzi' row.

There are 3 ES collections - Movies, Actors, and Acted_In. To accomplish the 'bourne' query, there will be 3 ES interactions:
  1. Get all movies with 'Bourne' in the title. 
  2. Get all acted_in tuples for actors in these movies. 
  3. Get all actors for these tuples.
In our system, Spark has the responsibility of performing relational algebra which collates these separate datasets into a single response (the "Renzi" row mentioned above). We looked to Spark SQL to, somewhat ironically, abstract this away. Spark SQL is a way to interact with structured data which will ultimately allow us to run the same SQL query in Spark that we ran in PostgreSQL.

So our final workflow became:
  1. Get all movies with 'Bourne' in the title and feed them into a temporary Spark SQL table. 
  2. Get all acted_in tuples for actors in these movies and feed them into a temporary Spark SQL table. 
  3. Get all actors for these tuples and feed them into a temporary Spark SQL table.
  4. RUN THE SAME SQL QUERY THAT WAS RUN AGAINST POSTGRES (minus any thinning constraints like the 'Bourne' requirement which is now performed in ElasticSearch).
Our final source is not production quality - step 1 (getting 'bourne' movies from ES into Spark) is pretty simple and demonstrates the nature of our program:
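Since the Spark/ES-Hadoop source itself isn't reproduced here, the following is only a rough, runnable stand-in for the four-step workflow. It assumes plain Python lists in place of the three ES collections, a hypothetical `es_title_match` helper in place of the ES full-text query, and an in-memory SQLite database in place of Spark SQL temporary tables; none of these are ES-Hadoop or Spark APIs. Because SQLite lacks `to_tsvector`/`to_tsquery`, the single-database baseline swaps in a `LIKE` filter. The point it illustrates is that once ES has done the 'bourne' thinning, the same join without the title constraint yields the same rows as the full query.

```python
import re
import sqlite3

# Hypothetical toy rows standing in for the three ES collections:
# movies/actors are (id, name); acted_in is (movie_id, actor_id, role).
# Only the 'Renzi' row comes from the post itself.
MOVIES = [(1, "The Bourne Supremacy"), (2, "The Bourne Identity"), (3, "Heat")]
ACTORS = [(10, "Alfieri Victor"), (11, "Damon Matt"), (12, "Pacino Al")]
ACTED_IN = [(1, 10, "Renzi"), (1, 11, "Jason Bourne"), (2, 11, "Jason Bourne"), (3, 12, "Hanna")]

# The canonical join; {extra} is where an optional title filter gets appended.
JOIN_SQL = """
SELECT a.name AS Actor, m.name AS Movie, j.role AS Role
FROM actors a
LEFT JOIN acted_in j ON a.id = j.actor_id
INNER JOIN movies m ON j.movie_id = m.id {extra}
ORDER BY Movie, Actor
"""


def load(conn, movies, actors, acted_in):
    """Create temporary tables (analogous to Spark SQL temp tables) and fill them."""
    conn.execute("CREATE TEMP TABLE movies (id INTEGER PRIMARY KEY, name TEXT)")
    conn.execute("CREATE TEMP TABLE actors (id INTEGER PRIMARY KEY, name TEXT)")
    conn.execute("CREATE TEMP TABLE acted_in (movie_id INTEGER, actor_id INTEGER, role TEXT)")
    conn.executemany("INSERT INTO movies VALUES (?, ?)", movies)
    conn.executemany("INSERT INTO actors VALUES (?, ?)", actors)
    conn.executemany("INSERT INTO acted_in VALUES (?, ?, ?)", acted_in)


def es_title_match(term):
    """Hypothetical stand-in for the ES query: match a lowercase token in the title."""
    return [m for m in MOVIES if term in re.findall(r"[a-z0-9]+", m[1].lower())]


def workflow(term):
    movies = es_title_match(term)                              # step 1
    movie_ids = {m[0] for m in movies}
    acted_in = [j for j in ACTED_IN if j[0] in movie_ids]      # step 2
    actor_ids = {j[1] for j in acted_in}
    actors = [a for a in ACTORS if a[0] in actor_ids]          # step 3
    conn = sqlite3.connect(":memory:")
    load(conn, movies, actors, acted_in)
    return conn.execute(JOIN_SQL.format(extra="")).fetchall()  # step 4: no title filter


def single_database(term):
    """Baseline: all rows in one database, title filter inside the query."""
    conn = sqlite3.connect(":memory:")
    load(conn, MOVIES, ACTORS, ACTED_IN)
    sql = JOIN_SQL.format(extra="AND lower(m.name) LIKE '%' || ? || '%'")
    return conn.execute(sql, (term,)).fetchall()
```

Calling `workflow("bourne")` returns the 'Renzi' row alongside the two Jason Bourne rows, and it matches `single_database("bourne")` row for row.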

PostgreSQL Performance

We expected PostgreSQL to do well with this dataset and we were not disappointed. Quantifying performance is a mix of art and science - we wanted to stay away from cached results but didn't run against entirely cold indexes.

Here are query running times in ms for the 3 corpus volumes (1M, 15M, and 150M tuples, plus a paged run against 150M) vs query result set sizes (5, 500, and 5,000 rows).

[Table: PostgreSQL query running times (ms). Columns: corpus volume L (1M), M (15M)*, H (150M), H/paged (150M). Rows: result set size L (5), M (500), H (5,000).]

* We consider the Medium volume times to be anomalies since PostgreSQL did not properly utilize indexes - it did at the High volume stage. We didn't spend an extraordinary amount of time trying to resolve this since it's outside the focus of this project. Previous experience suggests these queries can probably be run in no more than a couple hundred ms.

The far right column (H/paged) explores a profile we feel PostgreSQL sometimes has difficulty with: paging. Using the large dataset (150M tuples), we ran some paged queries and, as we expected, found that PostgreSQL lagged somewhat.

ES-based System Performance

[Table: ES-Hadoop query running times (ms). Columns: corpus volume L (1M), M (15M), H (150M), H/paged (150M). Rows: result set size L (5), M (500), H (5,000).]


  • There was minimal correlation between corpus size and response time for the ES-Hadoop program - As expected, ElasticSearch was blazingly fast so the larger volumes didn't matter much. 
  • ES-Hadoop suffered greatly as result set sizes increased - Spark is billed as a faster MapReduce but we had difficulty getting good performance, at least compared against PostgreSQL. This could partially be due to our pedestrian knowledge of Spark. Spark SQL in particular struggled to keep up. We wrote a Spark script that used native Python data containers instead of Spark SQL and observed 20-30% improvements in time. That's worthy of further investigation, but it wouldn't make a huge difference in our evaluation. 
  • ES-Hadoop always incurred a non-trivial amount of startup costs - We always suffered an approximately 1,000 ms penalty firing up Spark's infrastructure. Switching from local mode to distributed mode did not help, perhaps due to the relatively small amount of data we were playing with. 
  • PostgreSQL won every category but there is a caveat - It's clear that PostgreSQL is much faster; however, recall our observation that corpus size does not impact ES-Hadoop execution times. PostgreSQL will eventually break down in ways ElasticSearch will not. So we feel there is a potential inflection point that was not exposed in our research, particularly when you consider paging (PostgreSQL can struggle with counting), which is a very common requirement for data repos.
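The native-Python variant of the Spark script isn't shown in the post. One plausible shape for it, assuming each ES result set arrives as a list of dicts with hypothetical keys `id`, `name`, `movie_id`, `actor_id`, and `role`, is a plain hash join: build dictionaries keyed on id for movies and actors, then stream the acted_in tuples through them. Because the canonical query's INNER JOIN on movies discards any unmatched rows from the LEFT JOIN, inner-join semantics are enough here.

```python
def hash_join(movies, acted_in, actors):
    """Join acted_in tuples to movie and actor names using dict lookups."""
    movie_name = {m["id"]: m["name"] for m in movies}   # build side: movies
    actor_name = {a["id"]: a["name"] for a in actors}   # build side: actors
    rows = []
    for j in acted_in:                                   # probe side: acted_in
        movie = movie_name.get(j["movie_id"])
        actor = actor_name.get(j["actor_id"])
        if movie is not None and actor is not None:      # inner-join semantics
            rows.append((actor, movie, j["role"]))
    return rows
```

Each input is scanned once, so the join is linear in the total number of rows; this avoids query planning altogether, which is one plausible reason a hand-written join can beat Spark SQL on small result sets.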

Other Work

  • Solr Streaming Expressions push parallel relational operation work into the Solr nodes themselves (rather than extract the data like we do).  I feel like this is a very promising concept.  I've played with this feature for a day or two and was very impressed.
  • ZomboDB can be thought of as ElasticSearch-backed PostgreSQL indexes.  A great option if you need the robustness of a solid RDBMS and want access to the power of ES searching/aggregation/etc.  There's also a concept of linking indexes similar to a join.  I've used this product and think it's a really cool merging of technologies.
  • I haven't tried the SIREn Join Plugin for Elasticsearch but this appears to offer a left join option between ES indexes.

Saturday, January 2, 2016

Basic recursive Postgres updates

I recently came across a need to update a large amount of Postgres data (many millions of rows) laid out in a hierarchical format with potentially inherited attributes. To make the use case generic, consider the following structure that a bookstore might employ:
If it's not clear, this is the structure being modeled. Locations with asterisks indicate those locations are not being inherited, i.e., they are direct. Note that 'ww2' books just happen to have the same location as their logical parent ('history' books).  This is intentional, just to show a potential condition.
Let's say we want to update the 'book' location from 'main floor' to 'new floor'. This should cascade to all object_types which inherit their location from 'book'. The query (pardon the bizarre cast to bigint) utilizes a WITH RECURSIVE CTE to process through an adjacency tree:
Perhaps more readable as a JPA named query. Note the inner query is traversing the hierarchy looking for candidates to give to the outer UPDATE.
The result shows the 4 relevant rows ('books' and those which descend from 'books' AND inherit their location) are updated:
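The bookstore structure, the Postgres query, and the JPA version aren't reproduced here, so this is a minimal runnable sketch of the same WITH RECURSIVE pattern. It uses SQLite, which also accepts a WITH RECURSIVE prefix on UPDATE, and a hypothetical `object_type` table; the columns (`parent_id`, `location`, `inherited`) and rows are assumptions chosen to mirror the description. In particular, 'ww2' has a direct location that happens to equal its parent's, so a naive `UPDATE ... WHERE location = 'main floor'` would wrongly touch it. The recursive part only descends into children that inherit, so the traversal stops at any row with a direct location.

```python
import sqlite3

# Hypothetical hierarchy: (id, parent_id, name, location, inherited).
# inherited = 1 means the location comes from the parent; 0 means it is direct.
ROWS = [
    (1, None, "book",    "main floor", 0),
    (2, 1,    "fiction", "main floor", 1),
    (3, 2,    "mystery", "main floor", 1),
    (4, 1,    "history", "main floor", 1),
    (5, 4,    "ww2",     "main floor", 0),  # direct, merely matches its parent
    (6, 5,    "pacific", "main floor", 1),  # inherits from ww2, not from book
]

CASCADE_SQL = """
WITH RECURSIVE subtree(id) AS (
    SELECT id FROM object_type WHERE name = ?   -- anchor: the row being changed
    UNION ALL
    SELECT c.id FROM object_type c
    JOIN subtree s ON c.parent_id = s.id
    WHERE c.inherited = 1                       -- stop at rows with a direct location
)
UPDATE object_type SET location = ? WHERE id IN (SELECT id FROM subtree)
"""


def cascade_location(conn, name, new_location):
    """Set a new location on `name` and every descendant that inherits it."""
    conn.execute(CASCADE_SQL, (name, new_location))
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE object_type (id INTEGER PRIMARY KEY, parent_id INTEGER,"
    " name TEXT, location TEXT, inherited INTEGER)"
)
conn.executemany("INSERT INTO object_type VALUES (?, ?, ?, ?, ?)", ROWS)
cascade_location(conn, "book", "new floor")
```

With this data, 'book', 'fiction', 'mystery', and 'history' move to 'new floor', while 'ww2' and its inheriting child 'pacific' stay on 'main floor'.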

Sunday, May 31, 2015

Inverted Index Text Mining of Twitter Data

A year ago, I got tired of wading through my timeline to find tweets of interest.  It was beginning to feel like work or another chore I had to complete every day.  OnlyWorthy was born over a weekend and it has served very admirably.  A little Python script wakes up every night, grabs all the day's tweets from a handful of accounts, and retweets those with the highest scores (a combination of favorites and retweets).  I follow @OnlyWorthy instead of that handful of accounts so I only get these 'good' tweets in my timeline.

This is obviously a crude way to solve my problem.  So I saw an opportunity to iterate OnlyWorthy in conjunction with my Data Mining class project with a more sophisticated mechanism.

The idea is as follows: put tweets into the Vector Space Model.  If a collection of tweets which a user enjoys is known, they could be indexed as individual documents.  New tweets could also be indexed.  At this point, similarity scores can be generated and relevant tweets can be identified.

This is the first time I've worked with PyLucene and I was very impressed.  Common Lucene analyzers and the Similarity API were utilized.

Here's an example workflow:

- A user identifies 100 tweets which they have enjoyed.  This can be done a variety of ways (examination of Twitter activity, an app with swipe left/right feedback, etc) but for this project their text was simply fed in or randomly selected.  All these tweets are placed in an index.
- A user identifies 50 Twitter usernames they like.  This limit keeps the project's scope manageable; with bulk-level API access, more of the Twitter firehose could be ingested.
- At some interval, the application grabs recent tweets from those 50 accounts.  All these tweets are placed in an index and a score is calculated for each new tweet against each liked tweet.
- A handful of the top scoring new tweets are considered worthy and displayed to the user.  This could be done by retweet or presentation in a special app.  In my project, they were simply printed to stdout.
- I did not implement a continual feedback loop here but that's what should be done to keep refining the performance.

I went with an ensemble system which calculated TF-IDF cosine similarity and BM25 values.
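The PyLucene code isn't shown, so here is a pure-Python sketch of the two scores over a tiny in-memory index of liked tweets. `LikedIndex` and its methods are hypothetical names, not PyLucene's API. The formulas are standard ones: BM25 with Lucene-style IDF and the common defaults k1 = 1.2 and b = 0.75, plus cosine similarity over smoothed TF-IDF vectors. How the original ensemble weighted the two scores isn't stated. Averaging cosine with BM25 squashed into [0, 1), and keeping a new tweet's best score against any liked tweet, are assumptions.

```python
import math
import re
from collections import Counter


def tokenize(text):
    """Lowercase and split on non-alphanumerics (a crude stand-in for a Lucene analyzer)."""
    return re.findall(r"[a-z0-9]+", text.lower())


class LikedIndex:
    """Hypothetical in-memory index over the tweets a user liked."""

    def __init__(self, liked_tweets, k1=1.2, b=0.75):
        self.docs = [Counter(tokenize(t)) for t in liked_tweets]
        self.n = len(self.docs)
        self.df = Counter(term for doc in self.docs for term in doc)  # document frequency
        self.avgdl = sum(sum(d.values()) for d in self.docs) / self.n
        self.k1, self.b = k1, b

    def _tfidf(self, counts):
        # Smoothed IDF, so terms unseen in the liked set still get a finite weight.
        return {t: c * (math.log((self.n + 1) / (self.df[t] + 1)) + 1) for t, c in counts.items()}

    def cosine(self, query, doc):
        q, d = self._tfidf(query), self._tfidf(doc)
        dot = sum(w * d.get(t, 0.0) for t, w in q.items())
        norm = math.sqrt(sum(w * w for w in q.values())) * math.sqrt(sum(w * w for w in d.values()))
        return dot / norm if norm else 0.0

    def bm25(self, query, doc):
        dl = sum(doc.values())
        score = 0.0
        for term in query:  # each distinct query term
            tf = doc.get(term, 0)
            if tf == 0:
                continue
            idf = math.log(1 + (self.n - self.df[term] + 0.5) / (self.df[term] + 0.5))
            score += idf * tf * (self.k1 + 1) / (tf + self.k1 * (1 - self.b + self.b * dl / self.avgdl))
        return score

    def score(self, new_tweet):
        """Best ensemble score of a new tweet against any liked tweet, in [0, 1]."""
        query = Counter(tokenize(new_tweet))
        best = 0.0
        for doc in self.docs:
            bm = self.bm25(query, doc)
            best = max(best, (self.cosine(query, doc) + bm / (1 + bm)) / 2)
        return best
```

Ranking a batch of new tweets is then `sorted(candidates, key=index.score, reverse=True)`, keeping the top handful as "worthy".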

Results were pretty good.  For example, given these tweets:
the program identified these tweets which seem like pretty good matches (one about monetary policy and one about new gadgets):


- It sometimes told the user what they already knew.  If the user indicated they liked a tweet announcing a new version of Elasticsearch, for example, other tweets with similar content would be flagged.  I added a time-based penalty, thinking these tweets would be clustered in time, but it didn't work well enough.
- Tuning is necessary to get the performance needed to evaluate millions of tweets.
- Lots of scaffolding was mocked and needs to be implemented.  While it's not extremely difficult to build a system to enable people to flag tweets that they enjoy, it also can't be written quickly.

If interested, the source code is available here.  I haven't deployed this to OpenShift because there is still a lot of work required to make it run continuously.  Feel free to borrow whatever ideas are expressed here if you're interested in this concept!

Sunday, August 24, 2014

CU-Boulder's CAETE program - my experience

When I left the military in 2011, I intended to make use of my GI Bill benefits.  They are generous enough to pay for a graduate degree and I felt like I really needed that to make up for lost time spent outside the software development industry.

I wasn't keen on having to physically attend a classroom, having unsatisfactorily tried that before.  So even though there are a few options in San Diego, an Internet-based program was my preference.  With a job and a family, time efficiency is critical.

University of Colorado Boulder has a solid engineering program with a department called Center for Advanced Engineering and Technology Education (CAETE).  I believe it's being re-branded as Engineering Anywhere - can't tell what's going on with the name.

In any event, I wanted to pursue the Master of Engineering, Computer Science degree.  You're given 6 years to complete 30 credits (basically 10 courses).  I've gone with the coursework-only plan which just requires some breadth in my classes.

I've taken 1 class per Fall/Spring semester for the last 3 years.  I'm about to start my 4th year with a plan to take 1 class this Fall, 2 in the Spring, and 1 in the Summer of 2015 to complete the program.

Overall, I've been extremely satisfied.  The instructors have been great and the course content has been interesting and relevant to me.

Lectures conducted during the day in Boulder are almost always posted that night in time for me to watch them.  My strategy is to watch the lectures and do any required reading during the weekdays and leave the weekends open to homework, project work, and studying for tests.  I would guess I've been spending approximately 12 hours a week doing schoolwork.

The infrastructure needed to stream the lectures has steadily improved since I started the program.  When I had DSL with 1 Mbps download speeds, things didn't always work so well but it's better now with 10 Mbps.  It's usually a split screen with the instructor on one side and slides on the other.  Sometimes it gets difficult to see what the instructor is writing on a classroom whiteboard but I think I've had only 1 course with the problem more than a couple times.  They usually stick with writing on the slide deck which comes through fine.

Tests are performed with a proctor and you're given a time window to get it done.  Never had a problem with this.  I recall having to travel for work a couple years ago and needing to move my test around by a day or so and it wasn't an issue.

I get the most out of hands-on experience so the projects have been the most interesting to me.  Sometimes the project gets demo'd to the instructor or TA via Skype or Hangouts which gives us a chance to talk about it.  Everything has been in Java or Python with the exception of the networking class which was all in C.

So I'd recommend this program to anyone thinking about getting their CS graduate degree from an Internet-based program.  The coursework has been appropriate and engaging.  I definitely think it's helped me improve my software development knowledge.  Since the GI Bill is paying my way I don't feel like I know enough about the financial costs to comment on the tuition level.

Here's a list of the classes I've already taken with a couple notes:

Saturday, June 7, 2014

A SolrCloud backup scheme

SolrCloud needs a real backup mechanism - there's an open ticket for this but until then users will have to make do with ReplicationHandler.  The HTTP API is really simple:
It takes a snapshot of the whole index at that path.  This is obviously targeted for legacy Solr, not SolrCloud.  So let's shoehorn this into SolrCloud by practicing with the following configuration in which a single collection has 4 shards spread over 4 hosts and each shard has a replica (so 2 total copies of each shard):

  • host1: mycollection_shard1_replica1, mycollection_shard4_replica2
  • host2: mycollection_shard2_replica1, mycollection_shard3_replica2
  • host3: mycollection_shard3_replica1, mycollection_shard2_replica2
  • host4: mycollection_shard4_replica1, mycollection_shard1_replica2
If each host is running a single Solr instance at port 8983, both replicas are being served from the same Solr process.  Each replication command therefore needs to declare which core (replica) it should back up.  More on that below.

4 calls to ReplicationHandler's HTTP API are required to snapshot all of mycollection.  It doesn't matter which replica gets hit but perhaps it's a good idea to distribute the work and to stagger the commands so the cloud isn't too bogged down.  That is, instead of pummeling host1 and host2, perhaps each host should have to backup 1 shard.  So 4 GETs:
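The original GETs aren't reproduced here. As a sketch, the following builds one backup URL per shard so that host N backs up its own shardN_replica1 core, following the layout above. It uses only the `command=backup` and `location` parameters, plus the optional `numberToKeep` mentioned below. The `/solr/<core>/replication` path and the `/backups/MMDDYY/` date format are assumptions based on this post's example paths, and `backup_urls` is a hypothetical helper.

```python
from datetime import date
from urllib.parse import urlencode


def backup_urls(collection, num_shards, day, port=8983, number_to_keep=None):
    """Build one ReplicationHandler backup URL per shard (host N backs up shard N)."""
    location = "/backups/" + day.strftime("%m%d%y") + "/"  # date-stamped target dir
    params = {"command": "backup", "location": location}
    if number_to_keep is not None:
        params["numberToKeep"] = number_to_keep  # only if your config supports it
    query = urlencode(params, safe="/")  # keep slashes in the path readable
    urls = []
    for shard in range(1, num_shards + 1):
        core = f"{collection}_shard{shard}_replica1"  # the replica living on host N
        urls.append(f"http://host{shard}:{port}/solr/{core}/replication?{query}")
    return urls


for url in backup_urls("mycollection", 4, date(2014, 6, 7)):
    print(url)
```

Each URL could then be fetched with `urllib.request.urlopen`, with a pause between calls to stagger the load across the cloud.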
Note the location path includes a date attribute.  The HTTP call to /replication will return immediately but it might take several seconds or many minutes for the replication action to complete.  You can write some fancy daemon to figure out when it actually completes but why not write to some shared slab of disk and forget about it?

The data files will be uncompressed so you'll probably want to compress them at some point.  Rotation will be important because this isn't rsync - ReplicationHandler will write the whole thing each time.  In case you have a compatible configuration, note there is a "numberToKeep" param available.

Ultimately, your /backups/060714/ dir is left with 4 directories, each with the index files for a particular shard.  Recovery is really easy - just drop those files into the appropriate data/index/ directory.

Saturday, May 17, 2014

Metering tweets (a weekend project)

This project will probably take me more than a weekend but Twitter's new mute feature got me thinking - I don't need to mute people I follow but I would like to eliminate a lot of their fluff.  I can recall at least 10 people I had to unfollow because even though I thought they sent out good stuff, for every good tweet there were 5 worthless ones.

The idea is this: I'd like only a user's best x tweets every day to show up in my feed.  Let's define quality by quantifying what happens with a tweet (replies, retweets, favorites, etc).  Lots of hand waving here obviously and this part isn't fully baked.  Let's ignore this problem for now :-)

So every day an app combs through a user's tweets and picks out the best few tweets.  How do I get those tweets into my feed?  I'm sure as heck not going to build a new client, though that might be the smoothest implementation.  I created a "shadow account" which I can send tweets from and if I follow it, content will show up in my timeline without having to build a separate client.
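Quality scoring is explicitly left open above, so this is only a sketch of the daily selection step under a placeholder definition: a hypothetical `quality` function that simply sums retweets, favorites, and replies. It picks a user's best x tweets with `heapq.nlargest`. Posting the winners from the shadow account (e.g. through tweepy) is left out.

```python
import heapq
from dataclasses import dataclass


@dataclass
class Tweet:
    text: str
    retweets: int
    favorites: int
    replies: int = 0


def quality(tweet):
    """Placeholder engagement score; the real definition is still undecided."""
    return tweet.retweets + tweet.favorites + tweet.replies


def best_tweets(tweets, x):
    """Return the x highest-quality tweets of the day, best first."""
    return heapq.nlargest(x, tweets, key=quality)
```

Swapping in a better quality measure later only means changing `quality`; the selection logic stays the same.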

Everything I've described is fairly easy.  Adding new users for the app to monitor, adjusting when the app evaluates tweets, how many to select, etc all require a web or mobile app.  I'm not too keen on building that part.  I'm hoping I'll know if this is a good idea before I get to this point - the app can be hard coded for now.

I'm hosting this on OpenShift and writing it in Python.  I've loaded a cron cartridge and envision adding a script to the daily bin.  Twitter's API is pretty serious, especially auth, so I'm taking time to learn that.  I found a nice little Python library that abstracts the application-only auth mechanism so I'll roll with that for now (Edit: switched to tweepy).  My repo is public so feel free to take a look!

Monday, March 17, 2014

"Illegal instant due to time zone offset transition" with Joda-Time

Thanks to Daylight Saving Time, you might run into this error while using the excellent Joda-Time library:
java.lang.IllegalArgumentException: Cannot parse "2014-03-09T02:01:47Z": Illegal instant due to time zone offset transition (America/Los_Angeles)

I'm getting this pulling UTC timestamps out of Solr.  My app's input and output will always be UTC.  The offending code:

So Joda is trying to turn this into a time that never existed in Los Angeles (we sprang forward from 01:59:59 to 03:00:00).

Most answers on the web related to this error correctly indicate you should probably use LocalDateTime.  But what if you want to remain in UTC?  Just tell the formatter not to revert to the local time zone (for example, with Joda's DateTimeFormatter.withZoneUTC()).  This makes the rest of the app safer as well because it's not handling time values that were translated from UTC to local.