In this blog, we will describe a new storage format that we adopted for our search index, one of the indexes in Rockset’s Converged Index. This new format reduced latencies for common queries by as much as 70% and the size of the search index by about 20%.
As described in our Converged Index blog, we store every column of every document in a row-based store, a column-based store, and a search index. We initially designed our search index to store individual key-value pairs mapping a column value to a document id. We found that a lot of queries needed to retrieve a large number of document ids from the search index for a particular column value, and these queries were spending as much as 70% of their time making rocksdb::DBIter::Next() and rocksdb::DBIter::Seek() calls.
As part of our Star Schema Benchmark (SSB) work, we examined ways to reduce the number of RocksDB calls that the search index was making. The goals of the initiative were to:
- Reduce query latency
- Increase compute efficiency
- Reduce the storage footprint of indexes
With the help of our new storage format, the clustered search index, we were able to achieve <1 second query latency for all 13 queries that make up the SSB. Read the rest of this blog to see how we solved this technical challenge.
Rockset’s Converged Index
With Rockset’s Converged Index, we store every column of every document in three different indexes so that our optimizer can select the index that returns the best possible latency for the query. For example, if the query contains narrow selectivity predicates, the optimizer would decide to use the search index and only retrieve those documents from the collection that satisfy the predicates. If the query contains broad selectivity predicates, the optimizer would decide to use the column store to retrieve all values for specified columns from all the documents in the collection.
In the row store, field values in a document are stored together. In the column store, all values for a column across all documents are stored together. And in the search index, every <field, value> pair maps to the list of document ids where the field contains that value. See the image below of Rockset’s Converged Index:
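In addition to the diagram, the sketch below shows roughly how entries in each of the three indexes might be keyed. The struct names and encodings are illustrative assumptions for this post, not Rockset's actual on-disk format.

```cpp
#include <cstdint>
#include <string>

// Illustrative key shapes for a document {_id: 7, name: "Dhruba", city: "SFO"}.
// These structs are hypothetical; the real encodings are internal to Rockset.

struct RowStoreKey {      // row store: the fields of one document live together
  uint64_t doc_id;        // 7
  std::string field;      // "name" -> "Dhruba", "city" -> "SFO"
};

struct ColumnStoreKey {   // column store: all values of one column live together
  std::string field;      // "name"
  uint64_t doc_id;        // 7 -> "Dhruba"
};

struct SearchIndexKey {   // search index: <field, value> -> list of document ids
  std::string field;      // "name"
  std::string value;      // "Dhruba"
  uint64_t doc_id;        // 7; the value part of this entry is empty
};
```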
Block-Based Storage Formats
The columnar store in Rockset’s Converged Index is designed like any other columnar database: we write to and read from the column store in blocks. Each block contains a set of values that are encoded and then compressed.
Reading column values as blocks is much more efficient than reading individual values. This is not only because each block is efficiently encoded for faster reads, but also because we make far fewer rocksdb::DBIter::Next() calls. For every rocksdb::DBIter::Next() call, RocksDB needs to look at its index of the LSM tree and use a min-heap to perform a series of key comparisons in order to find the next key. These key comparisons are expensive. When the data is stored in blocks, we only need to make this expensive rocksdb::DBIter::Next() call once per block instead of once per individual value.
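As a rough illustration of the difference, the sketch below reads values one RocksDB entry at a time versus one block at a time, using the standard rocksdb::Iterator API. The block encoding (newline-delimited strings) and the DecodeBlock helper are assumptions made only for this example, and key-range checks are omitted for brevity.

```cpp
#include <rocksdb/db.h>
#include <memory>
#include <string>
#include <vector>

// Toy decoder for this example: assumes a block is a newline-delimited list of
// values. Real blocks would use a compact encoding plus compression.
std::vector<std::string> DecodeBlock(const rocksdb::Slice& block) {
  std::vector<std::string> values;
  std::string current;
  for (size_t i = 0; i < block.size(); ++i) {
    if (block[i] == '\n') {
      values.push_back(current);
      current.clear();
    } else {
      current.push_back(block[i]);
    }
  }
  if (!current.empty()) values.push_back(current);
  return values;
}

// One RocksDB entry per value: N values cost roughly N Next() calls.
void ReadPerValue(rocksdb::DB* db, const rocksdb::Slice& first_key) {
  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(rocksdb::ReadOptions()));
  for (it->Seek(first_key); it->Valid(); it->Next()) {
    // it->value() holds exactly one value; process it here.
  }
}

// One RocksDB entry per block of B values: N values cost roughly N / B Next() calls.
void ReadPerBlock(rocksdb::DB* db, const rocksdb::Slice& first_block_key) {
  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(rocksdb::ReadOptions()));
  for (it->Seek(first_block_key); it->Valid(); it->Next()) {
    for (const std::string& value : DecodeBlock(it->value())) {
      (void)value;  // process one decoded value here
    }
  }
}
```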
While the benefits of using a block-based storage format for the column store were very clear, it was initially unclear whether, and how, we should use a block-based storage format for the search index.
We initially designed our search index to store individual key-value pairs mapping a column value to a document id. We saw that the repetitive rocksdb::DBIter::Next() and rocksdb::DBIter::Seek() calls caused the throughput of the search index to be much worse than the throughput of the column store. Queries that needed to read a large number of document ids from the search index were very slow.
So, we designed a block-based storage format for our search index. To the best of our knowledge, this is the first time a block-based storage format has been used for a search index in a storage system that supports real-time updates. What made this problem interesting was that the new format needed to satisfy the following requirements:
- Support real-time updates; we cannot apply updates in batches.
- Updates should not make queries slow.
- Each block would still be stored in RocksDB as a key-value pair, so a block should not be more than 10s of MBs in size.
How Does It Work?
The basic idea here is that for every <field, value> pair (e.g. <name, Dhruba>) in the search index, we want to store the corresponding list of document ids in blocks instead of as individual entries. We call each of these blocks a “cluster”. The minimum and maximum document id numbers that could be stored in a cluster determine the boundaries of the cluster.
To help you understand the trade-offs we made in the final design, let me first describe a simple design that we considered in the beginning and the problems with that design.
Initial Design
In the initial design, for every <field, value> pair we accumulate document ids in a cluster until we reach a certain threshold K, and store these document ids in one key-value pair in RocksDB. Note that we do not need to hold a cluster in memory until it is full.
Instead, we continuously write incremental updates to RocksDB as updates come in and then merge all the partial RocksDB values during query processing and compaction using RocksDB’s merge operator. After the current cluster for a <field, value> pair fills up to K entries, we create the next cluster for this <field, value> pair to hold the next K entries, and so on. We track cluster boundaries for every <field, value> pair and use those boundaries to correctly apply updates. In other words, the boundaries determine which cluster a document id needs to be removed from, and which cluster it needs to be added to, when a field is updated.
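As a sketch of the mechanism (not Rockset's actual code), partial updates to a cluster can be accumulated with RocksDB's merge operator, so that each write only appends its own document ids and RocksDB combines the pieces during reads and compaction. The fixed-width doc-id encoding and the class and helper names here are assumptions.

```cpp
#include <rocksdb/db.h>
#include <rocksdb/merge_operator.h>
#include <string>

// Illustrative merge operator: each operand is a list of fixed-width document
// ids, and merging simply concatenates the lists. A real operator would also
// handle deletions and keep the ids ordered.
class DocIdListAppendOperator : public rocksdb::AssociativeMergeOperator {
 public:
  bool Merge(const rocksdb::Slice& /*key*/,
             const rocksdb::Slice* existing_value,
             const rocksdb::Slice& value,
             std::string* new_value,
             rocksdb::Logger* /*logger*/) const override {
    new_value->clear();
    if (existing_value != nullptr) {
      new_value->assign(existing_value->data(), existing_value->size());
    }
    new_value->append(value.data(), value.size());
    return true;
  }

  const char* Name() const override { return "DocIdListAppendOperator"; }
};

// Usage sketch: register the operator and write partial updates with Merge().
// SearchKey() and EncodeDocId() are hypothetical helpers.
//
//   options.merge_operator = std::make_shared<DocIdListAppendOperator>();
//   db->Merge(rocksdb::WriteOptions(),
//             SearchKey(field, value, cluster),
//             EncodeDocId(doc_id));
```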
We discovered the following problems with this approach:
- We needed to track cluster boundaries separately for each <field, value> pair, which complicated the read/write paths.
- Document updates could cause older clusters to get bigger later. These clusters would need to be split to limit their size to K entries. Splitting clusters requires us to acquire a global lock to ensure that all writer threads use the same cluster boundaries. Global locks negatively impact the latency and throughput of writes when cluster splits happen. This also makes our write path very complicated.
- All the <field, value> cluster boundaries need to be held in memory to be able to apply incoming writes. This metadata could become very large and consume a significant amount of memory in our data servers.
Final Design
Eventually, we came up with a simple design that helped us achieve significant performance improvements.
In this design, the cluster boundaries are predetermined. A parameter K specifies the maximum size of a cluster (the cluster size), and the function document_id / K determines the cluster id. Thus, the first K documents with document ids [0, K) fall in the first cluster, the next K documents with document ids [K, 2K) fall in the second cluster, and so on. For every <field, value> pair in an incoming document, we add the document id to the cluster determined by the above function. This means that, depending on how many times a particular <field, value> pair repeats in a consecutive set of K documents, clusters could contain far fewer entries than the cluster size K.
We were fine with clusters containing fewer entries than the cluster size K. As we described earlier, our queries were slow when we needed to read a lot of document ids from the search index. In these cases, there would still be thousands to tens of thousands of entries per cluster, and these clusters would help us avoid thousands of rocksdb::DBIter::Next() calls.
We experimented with different values for parameter K, and picked 2^16 as it gives a good trade-off between performance and the worst-case RocksDB value size.
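Here is a minimal sketch of how a write might pick the cluster under this scheme. The key layout and helper names are assumptions for illustration only; the real encoding is internal to Rockset.

```cpp
#include <cstdint>
#include <string>

// Cluster size K = 2^16: document ids [0, K) map to cluster 0,
// ids [K, 2K) map to cluster 1, and so on.
constexpr uint64_t kClusterSize = 1ULL << 16;

uint64_t ClusterId(uint64_t doc_id) { return doc_id / kClusterSize; }

// Hypothetical key for one cluster of the search index:
// <field, value, cluster_id> -> encoded list of document ids in that cluster.
// (A real key would encode the cluster id in fixed-width big-endian form so
// that clusters for the same <field, value> pair sort in order.)
std::string SearchIndexClusterKey(const std::string& field,
                                  const std::string& value,
                                  uint64_t doc_id) {
  std::string key;
  key.append(field);
  key.push_back('\0');
  key.append(value);
  key.push_back('\0');
  key.append(std::to_string(ClusterId(doc_id)));
  return key;
}

// Example: indexing document id 150,000 with <name, Dhruba>.
// ClusterId(150000) == 2, so the id is merged into the cluster keyed by
// <"name", "Dhruba", 2>, e.g. via db->Merge(write_options, key, doc_id_bytes).
```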
The following figure shows what the clustered search index looks like using a very small cluster size of 4.
Improvement in the Number of RocksDB Next Calls
We discovered the following benefits of the clustered search index approach:
- Queries that read a large number of document ids now make one rocksdb::DBIter::Next() call per cluster instead of one call per document id.
- Because cluster boundaries are predetermined, we no longer need to track them per <field, value> pair or hold any boundary metadata in memory.
- Clusters never need to be split, so the write path needs no global locks and stays simple.
In addition to the benefits listed above, the final design also met our initial set of requirements including allowing for mutability and low data and query latency. Here’s how we met those requirements:
- We can apply updates in real-time as they come in. We do not need to wait until a cluster fills up before we can persist it and make it available for queries.
- The multiple partial updates to the same cluster get compacted into the full value during background compaction. This helps avoid performing these merges during query execution. We also use a technique that we call lazy merging during query execution to perform live merges efficiently: when the merges happen from a read-only path like query execution, we avoid the serialization/deserialization step of the merged value that is otherwise enforced by the rocksdb::MergeOperator interface (see the sketch after this list).
- The cluster size configuration parameter lets us keep the worst-case RocksDB value size under 10s of MBs.
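The structure below is a simplified illustration of the lazy-merging idea, under the assumption that each partial RocksDB value is a flat list of 8-byte document ids; it is not Rockset's internal interface. Instead of first concatenating every partial value into one serialized buffer and then decoding it, the read path can visit document ids directly out of each partial operand.

```cpp
#include <cstdint>
#include <cstring>
#include <functional>
#include <string>
#include <vector>

// Each partial RocksDB value for a cluster is assumed to hold fixed-width
// 8-byte document ids back to back.
using PartialClusterValue = std::string;

// Eager merge: build the full serialized cluster value first (extra copies
// and allocations), then decode it.
std::string EagerMerge(const std::vector<PartialClusterValue>& operands) {
  std::string merged;
  for (const auto& op : operands) merged += op;
  return merged;
}

// Lazy merge: visit document ids straight out of the partial values and skip
// the intermediate serialized value. A real implementation would also handle
// deletions and emit the ids in sorted order.
void LazyMergeVisit(const std::vector<PartialClusterValue>& operands,
                    const std::function<void(uint64_t)>& visit_doc_id) {
  for (const auto& op : operands) {
    for (size_t off = 0; off + sizeof(uint64_t) <= op.size();
         off += sizeof(uint64_t)) {
      uint64_t doc_id;
      std::memcpy(&doc_id, op.data() + off, sizeof(doc_id));
      visit_doc_id(doc_id);
    }
  }
}
```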
The following charts show the improvement in the number of rocksdb::DBIter::Next() calls made and also in the processing time to retrieve a set of document ids from the search index in the new format. For this experiment, we used a small collection with 5 million documents in it. A cluster size of 2^16 was used for the new format, which means there can be at most 77 clusters (5,000,000 / 2^16, rounded up) for a <field, value> pair.
The clustered search index storage format reduced latency for queries that read a lot of document ids from the search index by as much as 70%. It has also helped reduce the size of the search index for some of our production customers by about 20%.