Big data on GCP: dataflow, bigquery and spark cost comparison
I work in a typical big tech social network organization. Our task is to produce ML for our tiktok-like feed. We store a lot of data generated by users: clicks, likes, video plays, server events with specific info. We aggregate it, join with each other, transform into datasets to then train our models.
Our data comes from two sources: our servers and our users (==android and ios apps). In both cases, it is pushed to gcloud pub/sub, and then transferred to Bigquery.
Most of our data processing is done in Bigquery as well. We try to use other engines (like dataflow) only in the end of the pipeline to minimize costs.
The main reason why we use Bigquery is because it is considered to be cheaper than the alternatives. On the other hand, it has its problems: scalability and code sustainability:
Scalability: with a fixed number of reserved slots per organization, large queries - in our case - 100tb of data for our 1000 slot quota - either a) time out: bigquery seems to have intrinsic 6 hour timeout; bq client has an option to override that, but it doesn’t work; or b) bump into out of memory problems - with no way to control amount of memory allocated or query concurrency. Because of this, our engineers have a set of “hacks” to make it work: split big queries into smaller one, launching them manually, laboriously sitting at night and monitoring the results.
Code sustainability: SQL query is always a self-contained peace of code with no dependencies on user-defined libs. Because of that, if there is a common set of operations that is being reused by different pipelines, all the “code reuse” is done by just copy-pasting, with hundreds of slightly different copies of the same code living and developing independently in a big organization.
In addition, not every operation is even possible in Bigquery. Some advanced use cases like offline counters simulation requires imperative code, nothing can be done about it.
Because of all of these limitations, I decided to do a proper benchmark of various solutions available on the open market, and try and debug them a little bit to try to understand why are they so different.
The data
Proper benchmark requires proper, reproducible data. I was inspired by wordcount dataflow example, so this is what I am going with.
This is the code. It generates a set of csv files with roughly the following content:
0,tSi RB VE DrV …
1,hW rn vu I w …
The first column is used as a row index, to make it possible to load this data to BQ as a set of rows instead of just one giant corpus of text. Second column is a random set of “words” made up of random letters. I’ve made the words quite short, so that there will be a lot of duplicates - this should make wordcount a little bit more interesting.
This is how it can be uploaded to GCS:
gsutil cp /simulated_data/part0 gs://mgaiduk/simulated_data/part0
And to bigquery:
bq load mgaiduk.simulated_data simulated_data/part0 idx:integer,text:string
I ended up generating 100 files of about 150mb each.
Local processing
As a baseline, I wrote simple code that iterates over input file in a simple for-loop, aggregates counts into a map, and writes them into an output file.
This is how I run it:
go build main.go
time ./main --input simulated_data/part --output output.txt --mode simple --count 1
I ran it on GCE c3-highcpu-4 virtual machine, and it took 15 seconds on a single file. Such a VM is worth 125$/month on GCP, which turns out to be 0.00072$ per file, or 0.7$ per 1000 files. I report cost per 1000 files here because it is more convenient, not because I’ve actually ran it on 1000 files. Memory consumption should increase with input data size as well, which poses a serious problem for scalability of such solution.
Another curious fact about this solution is that it consumes only about 0.3 cores out of 4 requested, or 1/13 of available compute power. It is probably possible to write a more optimized solution that will be 13x faster and cheaper (though still not scalable!).
This theoretical local price would be 0.053$ per 1000 files, or 0.07x of our baseline cost.
I tried my hand on multithreaded programming, writing a version that launches a goroutine for each of 100 input files, creates a separate counts map in each goroutine, and finally merges them all together in one thread. It took 3 minutes 38 seconds on the same c3-highcpu-4 instance; average cpu consumption went up to 365%. This brings the cost to 0.1$ / 1000 files.
This solution, of course, is not scalable. But it provides an important reference point for a theoretical highly-optimized solution.
BigQuery
Here is the code for BQ:
create or replace table `mgaiduk.simulated_data_part0_results` as
with words as
(select split(text, " ") as txt from `maximal-furnace-783.mgaiduk.simulated_data`),
words2 as
(select word from words, unnest(txt) as word)
select count(*) as cnt, word from words2 group by word;
In such a simple case, it turns out to be rather pretty :)
It took 19 seconds to run on one file. Bigquery reported 29 seconds of consumed slot time, and 144mb of data scan.
Bigquery pricing is explained here. There are different pricing models. The simplest one, available to everyone “on-demand”, is slightly more expensive, and mostly takes “data scan” into account - with 5$ paid for 1 tb of data scan. This yields 0.00072$ per op, or 0.7$ per 1000 files - about the same cost as we’ve seen in our simple “for loop” local solution.
“Commitment” pricing model with monthly commitment costs 2000$/100 slots for a month (or even cheaper with longer commitments), which yields 0.000223$ per op, or 0.22$ per kOp, which is now only x0.28 from our baseline! Though still about 2.2x more expensive than our current best, local, unscalable multithreaded solution. Bigquery also reports “29 seconds of slot time” per file processed, so about 2x cores, compared to our simple for-loop with average of 0.3 cores. This means that BigQuery is not magic - it does waste some extra cores on all the parallelization and framework overhead. This gives me hope that some hyper-optimized framework might actually beat this.
Running the same query over 100 files took 44 seconds, 45 minutes of slot time, and 14gb of data scan. The cost for this is almost exactly the same as for one file: 0.7$ / kOp for on-demand pricing, and 0.22$ per kOp for monthly commitment. So in this respect, BigQuery scales almost perfectly: costs do not change, time changes only slightly (19 seconds to 44 seconds with 100x input size).
Dataflow
Here is the code for my dataflow wordcount implementation. It is not so different to google’s dataflow quickstart wordcount example. I also tried other sdks from the same page - Java and Python, all with roughly the same results.
This is how I run a local version:
time go run wordcount.go --output output.txt --input "gs://mgaiduk/simulated_data/part0"
This runs entirely on the local machine. Running this on my c3-highcpu-4 actually failed with OOMs, so I had to upgrade to c3-highcpu-8. Which means that the framework is insanely bloated, wasting more than 8 gb of memory for 150mb input file. It took 3 minutes and 38 seconds for one file, almost 15x the time it took for bigquery/local single threaded for loop. This brings the price up to 25.6$ per 1000 files, 36x times worse then bigquery.
This is how I run the same job on Dataflow:
time ./main --output "gs://mgaiduk/simulated_data_output/part0" --input gs://mgaiduk/simulated_data/part0 --runner dataflow --project my-project --staging_location gs://mgaiduk/binaries/ --labels='{"id": "mgaiduk_dataflow", "pod": "moj-feed-recall-dev", "service": "mgaiduk_dataflow"}'
This job on 1 file took 18 (!!) minutes. Pricing is explained here. Google bills for vcpu, memory*hour and shuffled gigabytes. All of these can be viewed in UI for the dataflow job. The cost sums up to be 24$ / 1000 files (34 times worse then bigquery). Scaling up to 100 files, it took 1 hour and 27 minutes, 86$ / 1000 files, and some OOM problems again, so I had to procure heavier workers for my dataflow:
time ./main --output "gs://mgaiduk/simulated_data_output/part0" --input "gs://mgaiduk/simulated_data/part*" --runner dataflow --project maximal-furnace-783 --staging_location gs://mgaiduk/binaries/ --labels='{"id": "mgaiduk_dataflow", "pod": "moj-feed-recall-dev", "service": "mgaiduk_dataflow"}' --experiments=shuffle_mode=appliance --worker_machine_type n1-standard-8 --disk_size_gb=100
This is so bad so I actually tried to dig a little bit deeper into what is going on, collecting a flamegraph on local runner. The runner spends less than 1% of time in my code, the rest of the time the cores just spin somewhere in the guts of beam implementation.
It looks like the main overhead (x34 cost) is just caused by bloated framework, and 3x more is being caused by parallelization and scalability overhead.
Apache spark
Spark is another popular framework for big data processing. It lacks beam’s “streaming” ability and unification of realtime and batch pipeline. On the other hand, Spark provides interactive shell for some quick distributed in-memory analytics; in addition, it provides “Dataset” abstraction over raw file storage systems like Gcs.
I read this tutorial for spark installation and some interactive script runs; then google’s dataproc documentation for running large-scale jobs on gcp clusters.
Here is the code for spark wordcount. This is how I run it locally:
spark-submit --class mgaiduk.WordCount --master "local[8]" target/scala-2.12/word-count_2.12-1.0.jar "/home/maksimgaiduk/repos/mgaiduk/dataflow/cost_benchmark/gen_random_data/simulated_data/part0" home/maksimgaiduk/tmp/output4
It takes 11 seconds on our favourite c3-highcpu-4, this time - with no memory issues. Average cpu load is 322%, and cost turns out to be 0.56$ / kOp, which is already better then on-demand bigquery and single threaded naive solution. It is, however, about 5x times worse then multithreaded local solution, with about the same cpu consumption - so some framework bloat and parallelization overhead is still present.
There are 2 ways to run the same code on an actual Spark cluster.
The first way is to create a Dataproc cluster - which creates a master with some workers that are ready to process incoming jobs. Second way is to use “dataproc batches” - serverless solution that allows you to submit jobs to google-managed cluster.
This is how you can create a dataproc cluster:
export PROJECT=maximal-furnace-783
export BUCKET_NAME=mgaiduk-us-central1
export CLUSTER=mg-dataproc
export REGION=us-central1
# create a cluster with just 1 node for master + worker
time gcloud dataproc clusters create ${CLUSTER} \
--project=${PROJECT} \
--region=${REGION} \
--single-node
# create a default cluster with 2 workers and one master
time gcloud dataproc clusters create ${CLUSTER} \
--project=${PROJECT} \
--region=${REGION}
# export cluster config to yaml
gcloud dataproc clusters export $CLUSTER --region $REGION --destination cluster.yaml
# delete our 2-worker cluster
gcloud dataproc clusters delete $CLUSTER --region=$REGION
# after modifying the yaml to have 8 instances instead, create a cluster again
gcloud dataproc clusters export $CLUSTER --region $REGION --destination cluster.yaml
Cluster creation takes about 2 minutes, so it isn’t a problem to just spin up a new cluster as soon as you need to process some big data, and delete if afterwards. However, having one big cluster used by different teams in the company will probably help ramp up the load and make it cheaper overall.
Here is how to submit a job to the cluster:
# copy jar to gcs
gsutil cp target/scala-2.12/word-count_2.12-1.0.jar gs://${BUCKET_NAME}/scala/word-count_2.12-1.0.jar
# submit a job
time gcloud dataproc jobs submit spark \
--cluster=${CLUSTER} \
--class=mgaiduk.WordCount \
--jars=gs://${BUCKET_NAME}/scala/word-count_2.12-1.0.jar \
--region=${REGION} \
-- "gs://mgaiduk/simulated_data/part*" gs://${BUCKET_NAME}/dataproc_output6/
Processing 1 file on a single-node spark cluster takes 44 seconds. Pricing is explained here. Basically, you pay all the cost of GKE nodes running (n1-standard-4, in our case), plus 0.01$/vcpu*hour for using Dataproc technology. In our case it ends up being 2$ per 1000 files for GKE costs, and extra 0.48$ for Dataproc tech usage, 20% overhead. Total is 2.48$ which is 4x worse then local Spark, which can be explained by overhead of starting jobs across different workers and communication.
Running 100 files on 8-worker cluster took me 2 minutes 29 seconds, and about 0.7$/1000 files cost. This makes it comparable to Bigquery on-demand in terms of cost. Perhaps, with some tweaks it can be optimized even further.
Second alternative was to run on dataproc batches, serverless alternative hosted by Google. This is how to submit a job:
time gcloud dataproc batches submit spark --subnet dataproc-dev --version=1.0 \
--region=$REGION \
--jars=gs://${BUCKET_NAME}/scala/word-count_2.12-1.0.jar \
--class=mgaiduk.WordCount \
-- "gs://mgaiduk/simulated_data/part*" gs://${BUCKET_NAME}/dataproc_output4/
I had 2 problems when trying to run this. First one sounded like “Subnetwork 'default' does not support Private Google Access which is required for Dataproc clusters when 'internal_ip_only' is set to 'true'. Enable Private Google Access on subnetwork 'default' or set 'internal_ip_only' to 'false'.”. Luckily, my org already had a network for some other dataproc installation with Private Google Access enabled, so adding —subnet dataproc-dev
fixed the issue.
Second problem sounded like “NoClassDefFoundError: scala/Serializable” with a huge stacktrace. From googling it I understood that it is somehow related to versons mismatch between local and remote environment, and passing --version=1.0
somehow fixed the issue.
It took 5 min 13 secs to run. Pricing is explained here. It is rather simple - you pay for “DCU” time (dataproc compute units) and shuffle gb*hours. In our case that ended up being 1.7$ / 1000 files, 2.5x worse then Bigquery on-demand.
Conclusion
Considering that my company already uses Bigquery for pretty much everything, I made 3 conclusions from the little research outlined above: 1) Apache Beam is simply too bloated, making it slow and expensive to use in pretty much any use case 2) Apache Spark might be a possible alternative to Bigquery for some imperative data processing, so I will definitely try it for some real use-cases 3) even Bigquery has significant overhead for data processing, so it is possible that here exist an amazing, super-optimized solution that will beat it in cost-effectiveness. Yandex Tables are about to get open sources, as I’ve heard, so perhaps that is the solution?