
update: doc for final demo #74

Merged · 1 commit · May 2, 2024
34 changes: 23 additions & 11 deletions docs/proposal.md
@@ -8,9 +8,9 @@

> What is the goal of this project? What will this component achieve?

The objective of this project is to develop an Input/Output (I/O) service for an Online Analytical Processing (OLAP) database system. This service will facilitate communication between the execution engine and remote storage solutions such as Amazon S3. Additionally, a local cache will be incorporated to store recently accessed data on the local disk, thereby accelerating future data retrievals.

The I/O service is designed to manage requests from the execution engine, fetching pertinent data (e.g., Parquet files) from either the local cache or remote storage. It will process the data and return a stream of decoded information as a record batch to the execution engine.
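
As a rough illustration, the interface exposed to the execution engine might look like the following Rust sketch; all type and method names here are illustrative assumptions rather than the project's actual API.

```rust
use std::error::Error;

/// A request from the execution engine, identified by logical table/column names.
pub struct GetRequest {
    pub table: String,
    pub columns: Vec<String>,
}

/// Stand-in for an Arrow-style record batch of decoded Parquet data.
pub struct RecordBatch {
    pub num_rows: usize,
}

/// The Storage Client hides the cache/remote-storage distinction behind one call.
pub trait StorageClient {
    /// Returns decoded record batches; an iterator stands in for an async stream here.
    fn request_data(
        &self,
        req: GetRequest,
    ) -> Result<Box<dyn Iterator<Item = RecordBatch>>, Box<dyn Error>>;
}
```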

The initial phase aims to construct a fully functional I/O service following the specifications outlined above. Further enhancements, such as kernel bypass and integration of io_uring, may be considered based on project timeline and requirements.

@@ -29,12 +29,20 @@ Our design comprises several key components:
- Storage Client
- Storage Node
- Storage Manager
- DataStore Cache
  - Replacer (LRU, LRU-K)
  - DataStore
    - MemDiskStore --> File system
    - SqliteStore --> SQLite
- Storage Reader
  - S3 Reader (Read from S3)
  - Mock S3 Reader (Read from file system)

The Storage Client resides in the compute node, where it establishes connections with the executors from the execution engine. The Storage Manager orchestrates requests from the compute node and directs them to either the cache or the Storage Reader. The cache records access timestamps and evicts cached elements accordingly, and we plan to use embedded databases such as RocksDB or Redis as our cache. For the cache policy, we plan to incorporate common policies such as LRU-K. The Storage Reader includes several APIs for reading from different storage systems such as Amazon S3 and the local file system.
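
To make the relationships among these components concrete, the following Rust sketch shows one way they might compose; every struct, field, and name is an illustrative assumption, not the actual crate layout.

```rust
use std::path::PathBuf;

/// The storage node owns one manager, which routes requests.
struct StorageNode {
    manager: StorageManager,
}

/// Routes each request to the cache first, falling back to the storage reader.
struct StorageManager {
    cache: DataStoreCache,
    reader: Box<dyn StorageReader>,
}

/// Caching layer: an eviction policy (Replacer) plus a place to keep the bytes (DataStore).
struct DataStoreCache {
    replacer: LruReplacer,
    store: DataStore,
}

/// The two DataStore flavors listed above.
enum DataStore {
    MemDisk { root: PathBuf },   // file-system backed
    Sqlite { db_path: PathBuf }, // SQLite backed
}

// Minimal stubs so this snippet stands alone; both are sketched in more detail below.
struct LruReplacer;
trait StorageReader {}
```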

The workflow of the I/O service is as follows. Initially, the execution engine invokes the API exposed by the I/O service. The Storage Client then contacts the catalog to retrieve the corresponding physical location based on the logical columns (update: after discussing with the other I/O service team, we used a hashmap to mimic the behavior of the catalog for the sake of time). Next, the Storage Client transmits the request via HTTP to the Storage Node. The Storage Manager then verifies whether the data is already present on the local disk by consulting the cache.
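
A sketch of the hashmap-based catalog stand-in mentioned above could look like this; the key format and S3 path are made-up examples.

```rust
use std::collections::HashMap;

/// Maps a logical column identifier to the physical S3 location of its Parquet file.
fn mock_catalog() -> HashMap<String, String> {
    let mut catalog = HashMap::new();
    catalog.insert(
        "lineitem.l_orderkey".to_string(),
        "s3://demo-bucket/lineitem.parquet".to_string(),
    );
    catalog
}

/// Resolve a logical column to its physical location, if the catalog knows it.
fn resolve(catalog: &HashMap<String, String>, logical_column: &str) -> Option<String> {
    catalog.get(logical_column).cloned()
}
```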

We design two levels of cache. One sits in memory for fast retrieval of small files, and the other uses the disk to cache large files. The latter includes a mapping where the key represents the file's physical location in S3 and the value denotes the physical location on the local disk. If the data is found, it is directly returned to the Storage Client. Otherwise, the Storage Reader reads the data from the underlying storage and updates the cache. Finally, the Parquet file is decoded in the Storage Client, and the resulting record batch stream is returned to the execution engine.
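
The read-through behavior of the disk-level cache can be sketched as follows, assuming a simple in-memory map from S3 location to local path; names such as `DiskCache` and `get_or_fetch` are illustrative, not the actual implementation.

```rust
use std::collections::HashMap;
use std::path::PathBuf;

/// Disk-level cache: maps the file's physical location in S3 to its location on local disk.
struct DiskCache {
    entries: HashMap<String, PathBuf>,
}

impl DiskCache {
    /// Return the local path if cached; otherwise read through the storage reader,
    /// record the new mapping, and return the freshly cached path.
    fn get_or_fetch(&mut self, s3_key: &str, fetch: impl Fn(&str) -> PathBuf) -> PathBuf {
        if let Some(local) = self.entries.get(s3_key) {
            return local.clone();
        }
        let local = fetch(s3_key);
        self.entries.insert(s3_key.to_string(), local.clone());
        local
    }
}
```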


## Design Rationale
@@ -47,7 +55,7 @@ We use HTTP rather than TCP for the interaction between the storage client and t

The storage node, on the other hand, is designed to be high-performance and at the same time easily extensible. We adopt the LRU cache algorithm because it is one of the most widely used cache strategies in the industry: it maintains a good hit rate in real-world scenarios while requiring moderate computation. Besides LRU, we also plan to adopt more cache algorithms to get a sense of the performance differences between various cache algorithms.
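
A compact sketch of an LRU replacer in this spirit is shown below, assuming cache keys are the files' S3 locations; it is illustrative rather than the project's implementation.

```rust
use std::collections::VecDeque;

struct LruReplacer {
    // Most recently used at the back, least recently used at the front.
    order: VecDeque<String>,
    capacity: usize,
}

impl LruReplacer {
    fn new(capacity: usize) -> Self {
        Self { order: VecDeque::new(), capacity }
    }

    /// Record an access, moving the key to the most-recently-used position.
    fn touch(&mut self, key: &str) {
        if let Some(pos) = self.order.iter().position(|k| k == key) {
            self.order.remove(pos);
        }
        self.order.push_back(key.to_string());
    }

    /// If over capacity, evict and return the least recently used key.
    fn evict(&mut self) -> Option<String> {
        if self.order.len() > self.capacity {
            self.order.pop_front()
        } else {
            None
        }
    }
}
```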

In addition to utilizing disk-based caching, we intend to incorporate SQLite as our primary cache storage solution. We choose SQLite because it works out of the box and is stable.
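
A minimal sketch of what a SQLite-backed store could look like with the `rusqlite` crate follows; the table schema and method names are assumptions for illustration.

```rust
use rusqlite::{params, Connection, OptionalExtension, Result};

struct SqliteStore {
    conn: Connection,
}

impl SqliteStore {
    /// Open (or create) the cache database with a single key/value table.
    fn open(path: &str) -> Result<Self> {
        let conn = Connection::open(path)?;
        conn.execute(
            "CREATE TABLE IF NOT EXISTS cache (s3_key TEXT PRIMARY KEY, data BLOB)",
            [],
        )?;
        Ok(Self { conn })
    }

    /// Insert or overwrite the cached bytes for one S3 object.
    fn put(&self, s3_key: &str, data: &[u8]) -> Result<()> {
        self.conn.execute(
            "INSERT OR REPLACE INTO cache (s3_key, data) VALUES (?1, ?2)",
            params![s3_key, data],
        )?;
        Ok(())
    }

    /// Fetch the cached bytes, if present.
    fn get(&self, s3_key: &str) -> Result<Option<Vec<u8>>> {
        self.conn
            .query_row(
                "SELECT data FROM cache WHERE s3_key = ?1",
                params![s3_key],
                |row| row.get(0),
            )
            .optional()
    }
}
```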

The storage reader is designed to retrieve data from different storage services. Currently, we plan to support the local file system and Amazon S3, but we can easily add more storage services in the future via the abstraction of the storage reader.
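
The abstraction could be a single trait with one implementation per backing store, roughly as sketched below; only the mock reader that serves files from the local file system is filled in, and all names are illustrative.

```rust
use std::fs;
use std::io;
use std::path::PathBuf;

trait StorageReader {
    /// Fetch the raw bytes of one object (e.g., a Parquet file).
    fn read(&self, key: &str) -> io::Result<Vec<u8>>;
}

/// Serves "S3 objects" out of a local directory, so tests and benchmarks
/// can run without touching real S3.
struct MockS3Reader {
    root: PathBuf,
}

impl StorageReader for MockS3Reader {
    fn read(&self, key: &str) -> io::Result<Vec<u8>> {
        fs::read(self.root.join(key))
    }
}
```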

@@ -68,21 +76,25 @@ The storage reader is designed to retrieve data from different storage services.

2. Integration Tests

The integration test will use the public API of the I/O service. We will call the API of the storage client in the way the execution engine does. We will test on different request types (table, column, etc.), different storage types (file system, S3), and different request data amounts. We will focus on the availability of the data and the correctness of the contents. Also, the I/O service should report the error appropriately when there is an exception.

3. Performance Tests (Benchmark)

We write a Python script to generate random Parquet files of certain sizes for benchmarking. Since the data type of the Parquet files does not affect performance, we generate floating-point numbers for each Parquet file.

The dataset we create consists of two sets of 10 Parquet files: one set contains 1 MB files and the other contains 100 MB files. We adopt a Zipfian distribution for the access pattern.
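
For illustration, a Zipfian access sequence over the 10 files can be generated with a small helper like the one below; the skew parameter is an assumed value, not necessarily the one used in our benchmark.

```rust
/// Return `num_requests` file indices drawn from a Zipf distribution:
/// file k (1-based) is chosen with probability proportional to 1 / k^skew.
fn zipf_access_pattern(num_files: usize, skew: f64, num_requests: usize) -> Vec<usize> {
    let weights: Vec<f64> = (1..=num_files)
        .map(|k| 1.0 / (k as f64).powf(skew))
        .collect();
    let total: f64 = weights.iter().sum();

    (0..num_requests)
        .map(|_| {
            // Categorical sampling over the unnormalized weights.
            let mut u = rand::random::<f64>() * total;
            for (idx, w) in weights.iter().enumerate() {
                if u < *w {
                    return idx;
                }
                u -= w;
            }
            num_files - 1
        })
        .collect()
}

fn main() {
    // e.g., 1,000 requests over the 10 benchmark files with an assumed skew of 1.0
    let pattern = zipf_access_pattern(10, 1.0, 1_000);
    println!("first accesses: {:?}", &pattern[..10]);
}
```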

For benchmarking, we measure the detailed elapsed time during each phase, from the Storage Client receiving the request to the Storage Client returning the data. The machine we use is an AWS EC2 `c5.xlarge` instance, with 4 vCPUs, 8 GB of memory, and 32 GB of disk. We set up one instance for the Storage Client and one for the Storage Node.
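
A small helper along these lines is enough to capture per-phase timings on the Storage Client; the phase names and functions in the usage comment are illustrative.

```rust
use std::time::Instant;

/// Run one phase of request handling and print its elapsed time.
fn timed<T>(phase: &str, f: impl FnOnce() -> T) -> T {
    let start = Instant::now();
    let result = f();
    println!("{phase}: {:?}", start.elapsed());
    result
}

// Example usage, wrapping each phase on the Storage Client:
// let location = timed("catalog lookup", || resolve_location(&request));
// let bytes    = timed("fetch from node", || fetch_over_http(&location));
// let batches  = timed("decode parquet", || decode(bytes));
```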

We trigger the benchmarks through GitHub Actions, which enables benchmarking to run automatically on certain PRs or pushes without manual operations.

## Trade-offs and Potential Problems

> Write down any conscious trade-off you made that can be problematic in the future, or any problems discovered during the design process that remain unaddressed (technical debts).

The whole design is based on the assumption that the database is static (i.e., no data manipulation) and that we only have read requests on the storage. This assumption makes everything easier, since there will be few concurrency issues for a read-only database. However, if we are going to enable updates, then we should correctly handle read-write and write-write conflicts, which requires a more complicated design than the current one.

Moreover, even if all data is ETLed into the database system (this is our assumption), there can still be updates if the user replaces some of the underlying Parquet files. In this case, we might need another service to perform data discovery on the storage to deal with these situations. Also, we have to ensure the consistency of caches in different compute nodes (if we are going to build the cache) and ensure that the data we read is not stale.


<!-- ## Glossary (Optional)