Supersonic pipelines

December 23, 2019
It all started during Flatiron's last Hackathon, in July 2019. Our regular sprint work was paused, and we had three days to work on any project that we thought could be interesting. My team and I wanted to work together this time, and the day before the hackathon, we had a meeting to decide what our project would be.

With a confident tone, our tech lead, Rohit Kapur, said, "I want to improve Blocks [our internal ETL tool] to make the slowest pipeline [which takes over ten hours on average] finish in fewer than ten minutes." I have to admit that a smile showed up on my face, and I quickly answered, "I have a better idea, let's make the slowest pipeline finish in one minute!"

I really liked this game; after all, dreaming is free and fun. But after some laughs and a few more jokes, the conversation turned serious, and we found that everyone on the team already had plenty of ideas for improving the performance of Blocks. Suddenly, what I thought was going to be just a game was becoming a very exciting possibility!

We spent the following three days trying to make this fantasy come true through the different phases I will describe below. But before we continue, here are a few concepts that will help you follow the rest of this article (with a small toy sketch after the list):

  • Blocks: An ETL tool that allows you to express data transformations as a series of dependent nodes

  • Pipeline: A series/collection of nodes and dependencies between them that define a transformation of data from source to target system

  • Node: A script file within a pipeline. It may or may not depend on the output of other nodes to run

  • Graph: The dependency description of the nodes within a pipeline

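To make these concepts a bit more concrete, here is a tiny, hypothetical sketch of a pipeline as a dependency graph of nodes. This is not the actual Blocks API, just a toy model of the ideas above:

```
# Toy model of a pipeline: each node maps to the list of nodes it depends on.
# Node names are made up; this is NOT the Blocks API.
graph = {
    "extract_patients": [],
    "extract_visits": [],
    "join_visits": ["extract_patients", "extract_visits"],
    "publish_results": ["join_visits"],
}

def run_order(graph):
    """Return the nodes in an order that respects their dependencies."""
    done, order = set(), []

    def visit(node):
        if node in done:
            return
        for dep in graph[node]:
            visit(dep)
        done.add(node)
        order.append(node)

    for node in graph:
        visit(node)
    return order

print(run_order(graph))
# ['extract_patients', 'extract_visits', 'join_visits', 'publish_results']
```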

Phase 1: Throw money at the problem


First of all, we needed to set up an environment where we could run slow data pipelines and do our proof of concept safely, without breaking anything in production. After talking to some teams, we found an important pipeline that we could run as many times as we wanted without any side effects. It consistently took around two hours to run, and while it wasn't the slowest pipeline, it was a good example, containing the most common slow operations performed by most pipelines.

Our first approach was to throw money at the problem. We wanted to see how much of an improvement we could make by just upsizing the virtual machines on which our pipelines were running. To test, we swapped out an m5.xlarge (4 vCPUs, 16GB RAM) AWS EC2 instance for a p3.8xlarge (32 vCPUs, 244GB RAM, 4 GPUs with 64GB memory) instance.

This resulted in excellent improvements, as we can see in the chart below.

It looked like the number of CPUs didn't affect the speed, as the pipeline graph is only seven levels wide, but the improved disk I/O performance and increased network bandwidth brought us down from two hours to about 35 minutes!

Phase 2: Be smarter


Throwing money at the problem was a great success, but we had to get smarter if we wanted to reach our goal.

In Blocks, a single node execution looks like this:


We publish every node's results for caching and reproducibility purposes, but when we disabled this feature, the pipeline ran much faster. We of course wanted to keep uploading node results, but we definitely didn't have to do it synchronously.

We decided to create a simple async server that managed a queue of node results to upload to S3 in a different process. That way, the pipeline wouldn't get blocked during execution and we would still retain the desired process of storing results permanently.
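The sketch below shows the general idea, not our actual implementation: a background process drains a queue of finished node results and uploads them to S3 with boto3, so node execution never blocks on the upload. Bucket and key names are hypothetical:

```
# Non-blocking result publishing: a separate process drains a queue and
# uploads node results to S3. Illustrative sketch only; error handling and
# retries are omitted, and the bucket name is made up.
import multiprocessing as mp

import boto3


def uploader(queue):
    s3 = boto3.client("s3")
    while True:
        item = queue.get()
        if item is None:  # sentinel: the pipeline has finished
            break
        key, local_path = item
        s3.upload_file(local_path, "blocks-node-results", key)


if __name__ == "__main__":
    queue = mp.Queue()
    worker = mp.Process(target=uploader, args=(queue,))
    worker.start()

    # ... the pipeline runs nodes and enqueues results instead of uploading inline ...
    queue.put(("pipeline-123/node_a.csv", "/tmp/node_a.csv"))

    queue.put(None)  # tell the uploader to drain and exit
    worker.join()    # wait for pending uploads before finishing the run
```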

As you can see in the image above, we had an impressive 25 percent speed improvement.

Another really interesting finding from our tests was that the data uploads usually finished at the same time as, or before, the actual compute, meaning that the entire pipeline execution, publishing included, finished much faster than before!

After this achievement, we still thought we could do better, and we had another idea that could potentially improve the performance.

Due to some architectural decisions, Blocks spins up multiple Python interpreters per node execution, which adds latency. If we could reuse the async Python server we had just created and execute node code without starting new interpreters, surely that would speed up the pipelines, right?

Unfortunately, after spending some hours refactoring really old code, we didn't get any consistent improvement. It turned out that the HTTP latency plus the work of isolating the environment took about the same amount of time as spinning up the interpreters.
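For a rough sense of the interpreter overhead involved, here is the kind of micro-benchmark you can run yourself; it is illustrative only and not the exact test we did:

```
# Rough micro-benchmark: how long does it take to spin up a fresh Python
# interpreter that does nothing? Illustrative only.
import subprocess
import sys
import time

N = 20
start = time.perf_counter()
for _ in range(N):
    subprocess.run([sys.executable, "-c", "pass"], check=True)
elapsed = time.perf_counter() - start
print(f"average interpreter start-up: {elapsed / N * 1000:.1f} ms")
```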

Phase 3: GPUs are not ready


It was time to test the limits of the huge machine and start using our GPUs!

Most of the nodes in Blocks use a local SQL database to do simple transformations, and we wanted to swap in more modern, efficient technology that could harness the power of our shiny new GPUs.

The goal was to be able to easily replace the execution engine of the nodes without invasive changes to the pipelines. That meant we needed GPU libraries that could run SQL queries very similar to the ones we already had.

First, we tried OmniSci, which seemed like a good choice, as it claims to do exactly what we described above. But after some testing, we had to discard this option for two main reasons:

  • It doesn't support quoting for selecting fields, which broke a lot of our existing nodes.

  • It doesn't support any aggregate operations for non-numeric types, for example, getting the max value of a string.


We weren't going to give up, so we tried another interesting new technology, BlazingSQL. This seemed to have all the features we needed, but in our testing it was very unstable, crashing randomly. Overall, it seemed too "alpha" for our goals.
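For reference, the kind of drop-in usage we were evaluating looks roughly like this with BlazingSQL; the table, path, and query here are made up, and the exact API may have changed since we tested it:

```
# Rough sketch of running an existing SQL query on the GPU with BlazingSQL.
# Table path, column names, and query are hypothetical.
from blazingsql import BlazingContext

bc = BlazingContext()
bc.create_table("my_big_table", "/data/my_big_table.parquet")

# The result comes back as a cuDF DataFrame living in GPU memory.
gdf = bc.sql("SELECT f_key_id, COUNT(*) AS n FROM my_big_table GROUP BY f_key_id")
print(gdf.head())
```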

Ok, I know I said we weren't going to give up, but we did this time. We felt that GPU databases were not ready to be used in real environments, and we couldn't spend more time on this phase, as we only had two days left to reach our goal of a ten-minute pipeline!

Phase 4: Do not scan PostgreSQL databases


As the GPUs didn't help us, we needed to get some metrics and check what was taking most of the time. Using Splunk, our logging tool, we wanted to see which nodes were taking the longest time so that we could do further investigation.

The table below shows how much time was taken by the five slowest out of 296 nodes after applying our previous optimizations. Most of the nodes run in parallel, so the sum of the durations does not represent the duration of the whole pipeline.

| Node | Duration |
| --- | --- |
| Slowest Node | 14.04 minutes |
| Second-Slowest Node | 5.82 minutes |
| Third-Slowest Node | 2.12 minutes |
| Fourth-Slowest Node | 2.1 minutes |
| Fifth-Slowest Node | 1.8 minutes |

As you would've expected from the title of this phase, the slowest node was reading a whole 30GB table from a postgres database. The query looked something like this:

```
COPY (
    SELECT *
    FROM my_big_table
    WHERE NOT is_deprecated
) TO STDOUT WITH CSV HEADER;
```

Sadly, this is something that happens too often across our pipelines, and there is nothing we can do in Blocks to solve this problem. We are limited by the database bandwidth.

After that, we took a look at the second-slowest node, and this one was more surprising. It simply took the output of the node above and filtered its rows based on a foreign key:

```
import sys

import pandas as pd

# data_src (the upstream node's output) and candidate_f_key_ids are provided
# by the pipeline before this snippet runs.
for chunk in pd.read_csv(data_src, chunksize=10000000):
    filtered = chunk[chunk['f_key_id'].isin(candidate_f_key_ids)]
    filtered.to_csv(sys.stdout)
```

We immediately thought that merging these two nodes into one would speed up the pipeline for two main reasons: first, it would reduce the amount of data that had to be downloaded from the database, and second, it would reduce the number of disk I/O round trips.
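Conceptually, the merged node pushes the foreign-key filter into the database query itself, so the filtered-out rows never leave Postgres. A sketch of what that could look like with psycopg2 (connection details and id handling are simplified and hypothetical):

```
# Sketch of the merged node: apply the foreign-key filter inside the COPY
# query so only the rows we actually need are downloaded. Illustrative only.
import sys

import psycopg2

candidate_f_key_ids = [1, 2, 3]  # hypothetical ids produced by an upstream node
id_list = ",".join(str(int(i)) for i in candidate_f_key_ids)

query = f"""
    COPY (
        SELECT *
        FROM my_big_table
        WHERE NOT is_deprecated
          AND f_key_id IN ({id_list})
    ) TO STDOUT WITH CSV HEADER
"""

conn = psycopg2.connect("dbname=mydb")  # hypothetical connection string
with conn, conn.cursor() as cur:
    cur.copy_expert(query, sys.stdout)
```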

Happily, this time we were right, and we were able to cut the time significantly for these two nodes!

| Nodes | Duration Before Merging | Combined Duration |
| --- | --- | --- |
| Slowest and second-slowest nodes | 20.86 minutes | 1.4 minutes |


Phase 5: Parquet and Athena made it possible!


The previous phase got us really close to the goal, only two minutes away! But we knew it wasn't an ideal solution, for the following reasons:

  • We had to modify pipelines, which is not scalable when you have over 1,000 of them.

  • Combining nodes might not be possible in a lot of cases where you want to reuse nodes.

  • PostgreSQL databases are not as well suited to data analysis as columnar engines like Athena.


Fortunately, our team had already built internal tools to easily export tables from databases to S3, writing them in parquet format and linking them to tables in Amazon Athena.

Don't worry if you don't know what Athena or parquet are; I didn't know about them a couple of months ago either. Here are some short definitions and links in case you want to learn more about them. It is totally worth it!

  • Athena: An interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. More info here.

  • Parquet: A columnar file format that provides optimizations to speed up queries and is a far more efficient file format than CSV or JSON. More info here.

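As an illustration of the kind of export our internal tools automate, a library like awswrangler can write a table to S3 as parquet and register it in Athena in a few lines. This is not necessarily what our tooling does under the hood, and all names below are made up:

```
# Sketch: write a DataFrame to S3 as parquet and register it as an Athena
# table via the Glue catalog. Names are hypothetical.
import awswrangler as wr
import pandas as pd

df = pd.DataFrame({"f_key_id": [1, 2], "value": ["a", "b"]})

wr.s3.to_parquet(
    df=df,
    path="s3://my-analytics-bucket/my_big_table/",
    dataset=True,              # write as a queryable dataset, not a single file
    database="analytics",      # Glue/Athena database
    table="my_big_table",      # Athena table to create or update
)

# Query the same data back through Athena.
result = wr.athena.read_sql_query(
    "SELECT COUNT(*) AS n FROM my_big_table",
    database="analytics",
)
print(result)
```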

We decided to improve the other slowest nodes (the ones over two minutes). They were also Postgres queries, but this time they were computing aggregated columns and the queries already seemed well optimized, so we decided to give Athena a try.

Once we had the parquet files and the Athena tables, it was straightforward to make the change. The queries didn't have to change at all; we just had to modify the connection string the nodes were using.
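For example, with SQLAlchemy the switch can be as small as building the engine from a different URL, assuming a driver such as PyAthena; the hosts, schema, and staging bucket below are made up:

```
# Sketch: same query, different connection string. All connection details
# here are hypothetical.
import pandas as pd
from sqlalchemy import create_engine

QUERY = "SELECT f_key_id, COUNT(*) AS n FROM my_big_table GROUP BY f_key_id"

# Before: PostgreSQL
pg_engine = create_engine("postgresql://user:password@pg-host:5432/analytics")

# After: Athena via the PyAthena SQLAlchemy dialect
athena_engine = create_engine(
    "awsathena+rest://:@athena.us-east-1.amazonaws.com:443/analytics"
    "?s3_staging_dir=s3://my-athena-results-bucket/"
)

df = pd.read_sql(QUERY, athena_engine)
print(df.head())
```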

This was a huge success! The nodes disappeared from the "Slowest Nodes" table, and we were able to run the whole pipeline in eight minutes! We made it!

However, this approach had its disadvantages too:

  • The need for infrastructure to keep the parquet files up to date

  • The need to change the connection string to point to the Athena database

  • The need to make some small changes to the queries, as the Athena SQL dialect is not exactly the same as PostgreSQL's


But most of these problems can be solved with scalable automated scripts. The only problem would be the query dialect disparity, but in most cases no changes would be necessary.

Although we didn't use it for our Hackathon project, we did find one other interesting thing when using parquet. To test the performance of the parquet files, we wanted to see how much faster scanning the 30GB table mentioned in the previous phase would be. Unsurprisingly, we saw big improvements again:

  • Disk space for the table using parquet was 6GB (one-fifth the original size)

  • Downloading and converting to CSV took 50 seconds (20 times faster than reading from PostgreSQL)

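The comparison itself is simple to reproduce; with pandas it is essentially this (paths hypothetical, and reading parquet from S3 needs pyarrow plus s3fs installed):

```
# Sketch of the parquet scan test: read the parquet copy of the table from S3
# and dump it to CSV. Paths are hypothetical.
import pandas as pd

df = pd.read_parquet("s3://my-analytics-bucket/my_big_table/")
df.to_csv("my_big_table.csv", index=False)
```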

Conclusion


We made it! In just three days we were able to improve the performance of one of the slowest pipelines from almost two hours to less than ten minutes. This is 17 times faster! I never thought we would get to that point.

But the numbers weren't the best thing we got from the Hackathon. All the knowledge we gained about good approaches for dealing with large amounts of data probably exceeds the short-term excitement of reaching the somewhat arbitrary ten-minute goal we had set.

These are my main learnings from this project:

  • Money always makes life much easier. Using bigger and more powerful machines can be a "simple" solution to many problems.

  • Do not block unless it is totally necessary. Delay things like logging or caching if they significantly slow down your main product.

  • GPU databases are not ready yet, but it is worth keeping an eye on them as cuDF gets better and more stable.

  • Do not use row-oriented databases for analysis. Databases like PostgreSQL are really good for writing data and transaction processing, but for analysis, use columnar databases and file formats.


The three days were not only productive but also super fun. It is incredible how fast time passes when you are working on something interesting, with new technologies, all together as a team, continuously challenging yourselves. It also had indirect benefits like boosting happiness on our team, building team spirit, and pushing us to think outside the box to solve problems we didn't even know we had. Here's to the next Hackathon!