Commit 0.0.8
marsupialtail committed Oct 4, 2022
1 parent d6ba121 commit 57da0d4
Showing 17 changed files with 106 additions and 77 deletions.
2 changes: 1 addition & 1 deletion apps/tpc-h/tpch.py
@@ -1,7 +1,7 @@
from pyquokka.df import *
from pyquokka.utils import LocalCluster, QuokkaClusterManager
from schema import *
mode = "DISK"
mode = "S3"
format = "parquet"
disk_path = "/home/ziheng/tpc-h/"
#disk_path = "s3://yugan/tpc-h-out/"
38 changes: 27 additions & 11 deletions docs/docs/simple.md
@@ -2,7 +2,7 @@

This section is for learning how to use Quokka's DataStream API. **Quokka's DataStream API is basically a dataframe API.** It takes heavy inspiration from SparkSQL and Polars, and adopts a lazy execution model. This means that in contrast to Pandas, your operations are not executed immediately after you define them. Instead, Quokka builds a logical plan under the hood and executes it only when the user wants to "collect" the result, just like Spark.
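
To make the lazy model concrete, here is a minimal sketch (it assumes `lineitem` is a DataStream like the ones created later in this tutorial; the methods used are the same ones the lessons below walk through):

~~~python
# each call below only extends Quokka's logical plan -- nothing runs yet
d = lineitem.filter("l_shipdate <= date '1998-12-01'")
d = d.with_column("disc_price", lambda x: x["l_extendedprice"] * (1 - x["l_discount"]),
                  required_columns={"l_extendedprice", "l_discount"})

# only now is the plan optimized and actually executed, just like in Spark
result = d.collect()
~~~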

For the first part of our tutorial, we are going to go through implementing a few SQL queries in the TPC-H benchmark suite. You can download the data [here](https://drive.google.com/file/d/19hgYxZ4u28Cxe0s616Q3yAfkuRdQlmvO/view?usp=sharing). It is about 1GB unzipped. Please download the data (should take 2 minutes) and extract it to some directory locally. The SQL queries themselves can be found on this awesome [interface](https://umbra-db.com/interface/).
For the first part of our tutorial, we are going to go through implementing a few SQL queries in the TPC-H benchmark suite. You can download the data [here](https://drive.google.com/file/d/19hgYxZ4u28Cxe0s616Q3yAfkuRdQlmvO/view?usp=sharing). It is about 1GB unzipped. Please download the data (it should take about 2 minutes) and extract it to some directory locally. If you are testing this on a VM where clicking the link won't work, try this command after installing gdown with pip: `~/.local/bin/gdown https://drive.google.com/uc?id=19hgYxZ4u28Cxe0s616Q3yAfkuRdQlmvO`. The SQL queries themselves can be found on this awesome [interface](https://umbra-db.com/interface/).

These tutorials will use your local machine. They shouldn't take too long to run. It would be great if you could follow along, not just for fun -- **if you find a bug in this tutorial I will buy you a cup of coffee!**

@@ -89,13 +89,10 @@ This is how you would write it in Quokka. This is very similar to how you'd write…

~~~python
def do_1():

d = lineitem.filter("l_shipdate <= date '1998-12-01' - interval '90' day")
d = d.with_column("disc_price", lambda x: x["l_extendedprice"] * (1 - x["l_discount"]), required_columns ={"l_extendedprice", "l_discount"})
d = d.with_column("charge", lambda x: x["l_extendedprice"] * (1 - x["l_discount"]) * (1 + x["l_tax"]), required_columns={"l_extendedprice", "l_discount", "l_tax"})

f = d.groupby(["l_returnflag", "l_linestatus"], orderby=["l_returnflag","l_linestatus"]).agg({"l_quantity":["sum","avg"], "l_extendedprice":["sum","avg"], "disc_price":"sum", "charge":"sum", "l_discount":"avg","*":"count"})

return f.collect()
~~~

@@ -122,15 +119,11 @@ When you call `.collect()`, the logical plan you have built is actually optimized…
Joins work very intuitively. For example, this is how to do [TPC-H query 12](https://github.com/dragansah/tpch-dbgen/blob/master/tpch-queries/12.sql).
~~~python
def do_12():

d = lineitem.join(orders,left_on="l_orderkey", right_on="o_orderkey")

d = d.filter("l_shipmode IN ('MAIL','SHIP') and l_commitdate < l_receiptdate and l_shipdate < l_commitdate and \
l_receiptdate >= date '1994-01-01' and l_receiptdate < date '1995-01-01'")

d = d.with_column("high", lambda x: (x["o_orderpriority"] == "1-URGENT") | (x["o_orderpriority"] == "2-HIGH"), required_columns={"o_orderpriority"})
d = d.with_column("low", lambda x: (x["o_orderpriority"] != "1-URGENT") & (x["o_orderpriority"] != "2-HIGH"), required_columns={"o_orderpriority"})

f = d.groupby("l_shipmode").aggregate(aggregations={'high':['sum'], 'low':['sum']})
return f.collect()
~~~
@@ -143,7 +136,6 @@ def do_3():
d = customer.join(d,left_on="c_custkey", right_on="o_custkey")
d = d.filter("c_mktsegment = 'BUILDING' and o_orderdate < date '1995-03-15' and l_shipdate > date '1995-03-15'")
d = d.with_column("revenue", lambda x: x["l_extendedprice"] * ( 1 - x["l_discount"]) , required_columns={"l_extendedprice", "l_discount"})

f = d.groupby(["l_orderkey","o_orderdate","o_shippriority"]).agg({"revenue":["sum"]})
return f.collect()
~~~
@@ -155,10 +155,34 @@ An important thing to note is that Quokka currently only supports inner joins. Ot…
Feel free to look at some other queries in the Quokka [github](https://github.com/marsupialtail/quokka/tree/master/apps), or browse the [API reference](datastream.md). While you are there, please give Quokka a star!

##Lesson 2: Writing Things
So far, we have just learned about
So far, we have just learned about how to read things into DataStreams and do things to DataStreams. You can also write out DataStreams to persistent storage like disk or S3 to record all the amazing things we did with them.

Quokka currently operates like Spark: by default it writes out a directory of files, with a format-dependent maximum file size. This makes it easy to perform parallel writing.

To write out a DataStream as CSV or Parquet files to a local directory (you must specify a valid absolute path), simply do:

~~~python
d.write_csv("/home/ubuntu/test-path/")
d.write_parquet("/home/ubuntu/test-path/")
~~~

To write out a DataStream to S3, you should specify an S3 bucket and prefix like this:

~~~python
d.write_csv("s3://bucket/prefix/")
d.write_parquet("s3://bucket/prefix/")
~~~

Writing out a DataStream is a blocking API and will automatically call `collect()` for you. The Polars DataFrame you get back at the end is just a single column listing the filenames produced.
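
For example (a sketch; the bucket and prefix are placeholders, and the comments describe the behavior stated above):

~~~python
# blocks until the write finishes, calling collect() under the hood
filenames = d.write_parquet("s3://bucket/prefix/")

# filenames is a small Polars DataFrame with a single column
# listing the files that were written
print(filenames)
~~~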

##Lesson 3: Things you can't do

Here is a brief discussion of what Quokka is not great for. Quokka's main advantage stems from the fact that it can pipeline the execution of DataStreams. Once a partition (typically a Polars DataFrame) in a DataStream has been generated, it can be immediately consumed by a downstream user. This means downstream processing of this partition and upstream generation of the next partition can be overlapped.

Now, if an operator processing a DataStream cannot emit any partitions downstream until it has seen all of the partitions in its input DataStreams, the pipeline breaks. An example of this is an aggregation.
Now, if an operator processing a DataStream cannot emit any partitions downstream until it has seen all of the partitions in its input DataStreams, the pipeline breaks. An example of this is an aggregation. You cannot safely emit the result of a sum of a column of a table until you have seen every row! The main examples of this in data processing are groupby-aggregations and distributed sorts.

Currently, calling `groupby().agg()` or just `agg()` on a DataStream will produce another DataStream. However, that DataStream will consist of exactly one batch holding the final result, emitted once it has been computed. It is recommended to just call `collect()` or `compute()` on that result.
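
Concretely, reusing the aggregation from the TPC-H query 12 example above (a sketch):

~~~python
# the groupby-aggregation breaks the pipeline: its output DataStream holds
# exactly one batch with the final result, emitted only after every input
# partition has been seen
f = d.groupby("l_shipmode").aggregate(aggregations={'high': ['sum'], 'low': ['sum']})
result = f.collect()  # just materialize the single-batch result
~~~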

Quokka currently does not support distributed sort -- indeed, a sort-heavy workload is a great fit for Spark. Distributed sorting is not strictly needed for many analytical SQL workloads, since you typically do the aggregation before the order by, which greatly reduces the number of rows you have to sort. You can then sort after you have done `collect()`. However, for many other workloads distributed sorting is critical, and Quokka aims to support it as soon as possible.
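
Since `collect()` returns a Polars DataFrame, the aggregate-then-sort pattern looks like this (a sketch continuing from `f` above; `sort` here is the standard Polars DataFrame method, not a Quokka API):

~~~python
result = f.collect()                 # aggregate on the cluster first
result = result.sort("l_shipmode")   # then sort the small result locally with Polars
~~~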

Things that Quokka can do but doesn't do yet: fine-grained placement of UDFs or UDAFs on GPUs or CPUs, core-count control, Docker support, reading JSON, etc. Most of these can be easily implemented (and some already are) in the graph-level API; however, it takes effort to figure out the best abstractions to expose in the DataStream API. If you want to make this list shorter, I welcome contributions: [email protected].
17 changes: 11 additions & 6 deletions docs/docs/started.md
@@ -33,17 +33,22 @@ Quokka can be installed as a pip package:
~~~bash
pip3 install pyquokka
~~~
However it needs the latest version of Redis (at least 6.0), which you can get by running the following in sudo:

**However, it needs a recent version of Redis (at least 7.0)**, which you can get by running the following:
~~~bash
curl https://packages.redis.io/gpg | apt-key add -
echo "deb https://packages.redis.io/deb $(lsb_release -cs) main" | tee /etc/apt/sources.list.d/redis.list
apt-get update
apt-get install redis
curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg

echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/redis.list

sudo apt-get update
sudo apt-get install redis
~~~

If you only plan on running Quokka locally, you are done. Here is a [10 min lesson](simple.md) on how it works.
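
As a quick smoke test of a local install, the setup looks roughly like this (a sketch: `QuokkaContext` is assumed to be the entry point behind the tutorial's `from pyquokka.df import *`, and `read_csv` is illustrative -- see the [lesson](simple.md) for the exact API):

~~~python
from pyquokka.df import QuokkaContext   # assumed entry point exposed by pyquokka.df
from pyquokka.utils import LocalCluster

cluster = LocalCluster()
qc = QuokkaContext(cluster)
# from here you build DataStreams, e.g. lineitem = qc.read_csv(...), then .collect()
~~~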

If you plan on using Quokka for cloud, there's a bit more setup that needs to be done. Currently Quokka only provides support for AWS. Quokka provides a utility library under `pyquokka.utils` which allows you to manager clusters and connect to them. It assumes that awscli is configured locally and you have a keypair and a security group with the proper configurations. To set these things up, you can follow the [AWS guide](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-quickstart.html).
If you are planning on reading files from S3, you need to install the awscli and have your credentials set up.

If you plan on using Quokka for the cloud by launching EC2 clusters, there's a bit more setup that needs to be done. Currently Quokka only supports AWS. Quokka provides a utility library under `pyquokka.utils` which allows you to manage clusters and connect to them. It assumes that awscli is configured locally and that you have a keypair and a security group with the proper configurations. To set these things up, you can follow the [AWS guide](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-quickstart.html).

More detailed instructions can be found in [Setting Up Cloud Cluster](cloud.md).

2 changes: 1 addition & 1 deletion docs/site/index.html
@@ -172,5 +172,5 @@ <h2 id="contact">Contact</h2>

<!--
MkDocs version : 1.4.0
Build Date UTC : 2022-10-03 01:58:01.117473+00:00
Build Date UTC : 2022-10-04 05:55:22.070729+00:00
-->
2 changes: 1 addition & 1 deletion docs/site/search/search_index.json

Large diffs are not rendered by default.

