Towards Efficient Distributed Group Aggregation

Freejww, Apr 12, 2024

Group aggregation is widely used in large-scale data analysis (OLAP). Unlike a regular SELECT query, it changes the cardinality of the result. In the example below, the columns that appear after GROUP BY are the "grouping columns", and every column that appears after SELECT must be either a grouping column or an aggregation.

SELECT
  MobilePhone,
  MobilePhoneModel,
  COUNT(DISTINCT UserID) AS u
FROM hits
GROUP BY
  MobilePhone,
  MobilePhoneModel
LIMIT 10;

Method

One intuitive approach is to sort the input and then process it sequentially, which costs O(n log n) at best. A more common approach is to use a hash table, which has an expected complexity of O(n). Database vendors handle group aggregation in different ways. ClickHouse employs a general-purpose hash table. DuckDB implements a hash table specialized for group aggregation; it exploits cache locality and vectorization to optimize performance, especially in high-cardinality scenarios.

Our implementation draws inspiration from DuckDB's design, while introducing new optimizations for distributed environments.

New design

In the previous implementation, we first built keys for the grouping columns, then computed hash values from those keys, and finally iterated through all rows to insert them into the hash table. This method is straightforward but brings several problems:

  1. Before probing, all grouping columns must be built into keys, which is a significant overhead when dealing with large amounts of data.
  2. Probing is performed row by row without utilizing vectorization.
  3. When the hash table needs to be resized, all data in the hash table needs to be moved out and then moved back in.

The new design addresses the above issues. Figure 1 is the architecture diagram.

Figure 1: The Architecture Of The New Aggregate Hash Table

The new aggregate hash table includes two parts: Point Array and Payload.

  1. Point Array is a Vec<u64> structure, and each u64 value represents an entry. Salt (u16) holds the high 16 bits of the group hash value, and the pointer (u48) holds the address of the row in the payload page data.
  2. Payload holds the actual data of the hash table. Data is stored row by row in a fixed-length format within pages. Each page is a Vec<u8> structure with a size of 256KB. Each row contains four members: Validity, Groups, Hash and State_Addrs. Validity holds the validity bits of the grouping columns. Groups holds the group data, which can contain multiple fixed-size values, while strings are stored elsewhere. Hash is the hash value of the groups. State_Addrs holds the addresses of the aggregate function states (8 bytes each). All payloads together constitute a partitioned payload.
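The entry layout described above can be sketched as follows. This is only an illustration of packing a 16-bit salt and a 48-bit pointer into one u64; the constant and function names are hypothetical, and how the real implementation marks an empty entry is not shown.

```rust
const POINTER_BITS: u32 = 48;
const POINTER_MASK: u64 = (1u64 << POINTER_BITS) - 1;

/// The salt is the high 16 bits of the group hash.
fn hash_salt(hash: u64) -> u16 {
    (hash >> POINTER_BITS) as u16
}

/// Pack the salt of `hash` and a 48-bit row address into one entry.
fn pack_entry(hash: u64, row_addr: u64) -> u64 {
    debug_assert!(row_addr <= POINTER_MASK, "address must fit in 48 bits");
    ((hash_salt(hash) as u64) << POINTER_BITS) | (row_addr & POINTER_MASK)
}

/// Extract the salt stored in an entry.
fn entry_salt(entry: u64) -> u16 {
    (entry >> POINTER_BITS) as u16
}

/// Extract the 48-bit row address stored in an entry.
fn entry_pointer(entry: u64) -> u64 {
    entry & POINTER_MASK
}
```

Because the salt sits in the top bits of the entry, a probe can reject most non-matching entries with one shift and one integer comparison, without following the pointer into the payload.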

This two-level indexing structure has many advantages:

  1. Point Array is an array of plain-old-data u64 values, so each entry is only 8 bytes, which makes access friendly in terms of size and alignment.
  2. Before probing, there is no need to build keys; only the group hash needs to be computed. Keys are built only when groups need to be appended to the payload.
  3. Salt is the high 16 bits of the hash, and comparing the salt first can significantly reduce the number of full key comparisons.
  4. When the hash table needs to be resized, we simply scan all payloads to reconstruct the Point Array. There is no need to move the payload data, so the reconstruction cost is relatively low. Furthermore, in distributed environments, rebuilding the hash table only requires the payload. This allows the payload to be serialized to other nodes and then deserialized to reconstruct the hash table.
  5. In the Aggregate Partial stage, we do not have to rebuild the Point Array when resizing or repartitioning; we can simply clear the hash table, as long as the data is merged in the Aggregate Final stage.
  6. The new aggregate hash table facilitates vectorized probe and vectorized combining of states.

Workflow

Above, we introduced the design of the new aggregate hash table. Now, let's discuss how it works. Figure 2 shows the workflow, which has three primary operators: Aggregate Partial, Transform Partition Bucket, and Aggregate Final. Since we operate in distributed environments, we also introduce additional operators such as Aggregate Exchange Injector to ensure that data is distributed evenly across all nodes.

Figure 2: Workflow Diagram

Aggregate Partial

The upstream sends data to an idle downstream Aggregate Partial operator. During the Aggregate Partial stage, each thread independently constructs a thread-local hash table in two steps.

Step 1: Compute the group hash (each column is hashed separately, and then the hashes are combined using XOR).
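Step 1 can be sketched column by column, as below. The sketch uses the standard library's `DefaultHasher` purely as a stand-in; the hash function actually used is not specified here, and the names `hash_one` and `combine_column` are hypothetical.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hash a single value (stand-in hash function).
fn hash_one<T: Hash>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}

/// Hash one grouping column and XOR it into the per-row accumulators.
/// Calling this once per grouping column yields the group hash of each row.
fn combine_column<T: Hash>(hashes: &mut [u64], column: &[T]) {
    assert_eq!(hashes.len(), column.len());
    for (h, v) in hashes.iter_mut().zip(column) {
        *h ^= hash_one(v);
    }
}
```

Processing one column at a time keeps each loop tight and free of per-row key construction, which is what makes this step easy to vectorize.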

Step 2: Perform a vectorized probe. First, we check whether the hash table can accommodate the current batch of data; if not, we resize the hash table. Then we calculate the entry offset for each row from its hash. If the entry is occupied, we compare the salt: if the salt is equal, we add the row index to the group compare vector; otherwise, we increment the offset and continue probing. If the entry is empty, we occupy it and add the row index to the empty vector. For rows in the empty vector, we perform a vectorized append, adding all of them to the corresponding payload. For rows in the group compare vector, we perform a vectorized compare of the group keys. Matched rows then undergo a vectorized combination of aggregation states. The remaining unmatched rows increment their offset and repeat step 2 until all data is processed.
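The probe loop in step 2 can be sketched as follows. This is a deliberately simplified model, not the actual implementation: it assumes a single u64 group key, a COUNT-like state stored inline in the row, a row index (plus one, so that 0 can mean "empty") instead of a row address, and plain loops where a real engine would use vectorized kernels. All type and function names are hypothetical.

```rust
const SALT_SHIFT: u32 = 48;
const PTR_MASK: u64 = (1u64 << SALT_SHIFT) - 1;

struct Row {
    key: u64,
    hash: u64,
    count: u64, // stand-in for the aggregate state
}

struct AggTable {
    entries: Vec<u64>, // point array: salt (16 bits) | row index + 1 (48 bits); 0 = empty
    rows: Vec<Row>,    // payload
}

impl AggTable {
    fn new(capacity: usize) -> Self {
        assert!(capacity.is_power_of_two());
        AggTable { entries: vec![0; capacity], rows: Vec::new() }
    }

    fn count_of(&self, key: u64) -> Option<u64> {
        self.rows.iter().find(|r| r.key == key).map(|r| r.count)
    }

    /// Probe one batch and add 1 to the state of each input row's group.
    fn probe_batch(&mut self, keys: &[u64], hashes: &[u64]) {
        assert_eq!(keys.len(), hashes.len());
        // The caller is expected to resize first; a free slot guarantees termination.
        assert!(self.rows.len() + keys.len() < self.entries.len());
        let mask = self.entries.len() - 1;
        let mut slots: Vec<usize> = hashes.iter().map(|h| (*h as usize) & mask).collect();
        let mut row_of = vec![usize::MAX; keys.len()];
        let mut remaining: Vec<usize> = (0..keys.len()).collect();

        while !remaining.is_empty() {
            let mut empty: Vec<usize> = Vec::new();
            let mut compare: Vec<usize> = Vec::new();
            let mut no_match: Vec<usize> = Vec::new();

            // Find an empty entry or an entry with an equal salt for each row.
            for &i in &remaining {
                let salt = hashes[i] >> SALT_SHIFT;
                loop {
                    let entry = self.entries[slots[i]];
                    if entry == 0 {
                        // Occupy the entry; the row is appended right after this loop.
                        let row = self.rows.len() + empty.len();
                        self.entries[slots[i]] = (salt << SALT_SHIFT) | (row as u64 + 1);
                        row_of[i] = row;
                        empty.push(i);
                        break;
                    }
                    if entry >> SALT_SHIFT == salt {
                        compare.push(i);
                        break;
                    }
                    slots[i] = (slots[i] + 1) & mask;
                }
            }
            // Append all new groups to the payload.
            for &i in &empty {
                self.rows.push(Row { key: keys[i], hash: hashes[i], count: 0 });
            }
            // Compare group keys for the salt matches.
            for &i in &compare {
                let row = (self.entries[slots[i]] & PTR_MASK) as usize - 1;
                let r = &self.rows[row];
                if r.hash == hashes[i] && r.key == keys[i] {
                    row_of[i] = row;
                } else {
                    slots[i] = (slots[i] + 1) & mask;
                    no_match.push(i);
                }
            }
            remaining = no_match;
        }
        // Combine the aggregate states for the whole batch.
        for &row in &row_of {
            self.rows[row].count += 1;
        }
    }
}
```

Note that a row whose key first appears earlier in the same batch lands in the group compare vector, which is why the append phase has to run before the compare phase.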

In the Aggregate Partial stage, we also handle resizing and repartitioning. When the data in the hash table exceeds a certain threshold, we resize the hash table: we double the capacity, clear the Point Array, and then scan all payloads to reconstruct it. Additionally, because the pages of each payload cannot grow indefinitely, we repartition the partitioned payload by radix bits once its size exceeds a certain factor.
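The resize path can be sketched as a pure rebuild of the Point Array from the hashes already stored in the payload. As an assumption for illustration, the sketch below stores a row index plus one in the pointer bits (so that 0 means "empty") rather than a real row address; `rebuild_point_array` is a hypothetical name.

```rust
const SALT_SHIFT: u32 = 48;

/// Rebuild a point array of `capacity` entries from the hashes stored in
/// the payload rows. Payload rows are not moved; only entries are rewritten.
fn rebuild_point_array(row_hashes: &[u64], capacity: usize) -> Vec<u64> {
    assert!(capacity.is_power_of_two() && capacity > row_hashes.len());
    let mask = capacity - 1;
    let mut entries = vec![0u64; capacity];
    for (row, &hash) in row_hashes.iter().enumerate() {
        // Linear probing to the next free entry.
        let mut slot = (hash as usize) & mask;
        while entries[slot] != 0 {
            slot = (slot + 1) & mask;
        }
        entries[slot] = ((hash >> SALT_SHIFT) << SALT_SHIFT) | (row as u64 + 1);
    }
    entries
}
```

No key comparison is needed during the rebuild, because every payload row is already a distinct group; the stored hash alone determines where its entry goes.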

Aggregate Exchange Injector

This operator primarily scatters the data received from the upstream to ensure uniform distribution across all nodes. The Aggregate Partial operator can either transmit the entire partitioned payload or send individual payloads separately. We opt to send payloads individually to guarantee even distribution. Following the scatter process, the downstream operator is responsible for serializing the payload and forwarding it to the respective nodes.

Because the radix bits vary across thread-local hash tables, the Transform Partition Bucket operator can only perform streaming merges correctly in distributed scenarios if the data arriving from upstream is ordered. In the Aggregate Partial stage, we ensure that buckets are sent downstream monotonically. Then, in the Exchange Sorting operator, we implement a multi-way merge sort that relies on the monotonicity of its inputs. Specifically, we calculate a unique block number for each bucket with the formula block number = 1000 * radix_bits + bucket, and send the blocks downstream in monotonically increasing order.
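The block-number formula and the merge over monotonic inputs can be sketched as below. The merge uses a standard min-heap and only illustrates the idea; the function names are hypothetical and the real Exchange Sorting operator works on data blocks rather than bare numbers.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Block number as given by the formula: 1000 * radix_bits + bucket.
fn block_number(radix_bits: u64, bucket: u64) -> u64 {
    1000 * radix_bits + bucket
}

/// Multi-way merge of inputs that are each non-decreasing in block number.
fn merge_monotonic(inputs: &[Vec<u64>]) -> Vec<u64> {
    // Min-heap of (block number, input index, position within that input).
    let mut heap = BinaryHeap::new();
    for (src, input) in inputs.iter().enumerate() {
        if let Some(&first) = input.first() {
            heap.push(Reverse((first, src, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((block, src, pos))) = heap.pop() {
        out.push(block);
        if let Some(&next) = inputs[src].get(pos + 1) {
            heap.push(Reverse((next, src, pos + 1)));
        }
    }
    out
}
```

One observation that follows from the formula itself: with r radix bits there are 2^r buckets, so block numbers stay unique across different radix bits only while the bucket index is below 1000, that is, for up to 9 radix bits.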

Transform Partition Bucket

In this operator, we receive both payloads and serialized data from the upstream. For the serialized data, we first deserialize it into payloads. Then, we normalize all payloads to the maximum radix bits and classify them into buckets for further processing by the downstream Aggregate Final operator. As this operator needs to handle all data from the upstream, to alleviate potential blocking states, we use working and pushing buckets to ensure that upstream data is processed in a streaming fashion.
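Normalizing to the maximum radix bits amounts to recomputing each row's bucket with more hash bits. The sketch below assumes, in the style of DuckDB, that the radix bits are taken from the hash bits just below the 16-bit salt; the exact bit positions used here are an assumption, and `bucket_of` and `normalize` are hypothetical names.

```rust
/// Bucket index of a hash for a given number of radix bits (0..=48),
/// taken from the bits directly below the 16-bit salt.
fn bucket_of(hash: u64, radix_bits: u32) -> usize {
    assert!(radix_bits <= 48);
    ((hash >> (48 - radix_bits)) & ((1u64 << radix_bits) - 1)) as usize
}

/// Redistribute stored row hashes into 2^max_radix_bits buckets.
fn normalize(row_hashes: &[u64], max_radix_bits: u32) -> Vec<Vec<u64>> {
    let mut buckets: Vec<Vec<u64>> = vec![Vec::new(); 1usize << max_radix_bits];
    for &h in row_hashes {
        buckets[bucket_of(h, max_radix_bits)].push(h);
    }
    buckets
}
```

With this choice of bits, going from r to r + 1 radix bits splits bucket p into buckets 2p and 2p + 1, so a payload built with fewer radix bits never has to be merged with a neighbour, only split.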

Aggregate Final

This operator needs to merge the data from the buckets sent by the upstream. We construct a hash table (radix_bits = 0) to ensure that all data is probed and placed into the correct position.

Optimization

We have implemented several optimization strategies to improve the performance of the hash table:

  1. In the Aggregate Partial stage:
    1. We maintain an Arc<atomic> max_radix_hint value, which is updated to the largest radix bits reached by any thread-local hash table when a repartition occurs. This ensures that subsequent repartitions of thread-local hash tables can expand directly to the maximum radix bits, reducing downstream normalization pressure.
    2. We adopt dynamic resizing with a small initial capacity, and calculate a maximum capacity based on the number of threads and the sizes of the L1 to L3 caches. Until a thread-local hash table reaches the maximum capacity, we keep resizing it. Once the maximum capacity is reached, we no longer expand, but instead clear the point array directly. This ensures balanced performance in both low-cardinality and high-cardinality scenarios.
  2. When sending data downstream during the Aggregate Partial stage, we do not send an entire partitioned payload. Instead, we treat each payload as an individual bucket and send them one by one, recording the bucket value and radix bits of each payload to indicate whether normalization is needed. This approach has two benefits:
    1. In clusters, it ensures that data is evenly distributed to each node during scatter operations and also reduces serialization pressure.
    2. In standalone mode, downstream operators can determine the maximum radix bits for each input as soon as the first data is pulled, which facilitates streaming processing.
  3. When deserializing payloads during the Transform Partition Bucket stage, no probing is required. Instead, data can be appended directly to the payload. Probes are executed in the Aggregate Final stage.
  4. Additionally, we support payload spills in both standalone and distributed environments, following a similar process as described in the workflow.
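The shared max_radix_hint from the first optimization can be sketched with an atomic maximum. The concrete atomic type (`AtomicU64`), the memory ordering, and the names `on_repartition` and `simulate` are assumptions made for this illustration.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Called by a thread-local hash table when it repartitions. Publishes its
/// radix bits and returns the radix bits it should expand to, which is the
/// maximum seen so far across all threads.
fn on_repartition(hint: &AtomicU64, local_radix_bits: u64) -> u64 {
    let previous = hint.fetch_max(local_radix_bits, Ordering::Relaxed);
    previous.max(local_radix_bits)
}

/// Simulate several threads repartitioning and return the final hint.
fn simulate(local_bits: Vec<u64>) -> u64 {
    let hint = Arc::new(AtomicU64::new(0));
    let handles: Vec<_> = local_bits
        .into_iter()
        .map(|bits| {
            let hint = Arc::clone(&hint);
            thread::spawn(move || on_repartition(&hint, bits))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    hint.load(Ordering::Relaxed)
}
```

Since the hint only ever grows, a thread that reads a slightly stale value just performs one more repartition later, so a relaxed ordering is sufficient in this sketch.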

Experiments

We use ClickBench/hits as the benchmark because it contains many GROUP BY queries. Table 1 shows some of the performance improvements. For a comprehensive view, you can refer to the complete set of performance tests. The local machine has 32GB of memory and a 12-core CPU.

Table 1: Performance Improvement Summary

| Query | Local Singleton (s) | Improvement | Local Cluster - 2 Nodes (s) | Improvement | Local Cluster - 3 Nodes (s) | Improvement | Cloud Small (s) | Improvement | Cloud Medium (s) | Improvement |
|---|---|---|---|---|---|---|---|---|---|---|
| Q13 | 1.081 → 0.881 | 19% | 1.652 → 1.416 | 14% | 1.995 → 1.417 | 29% | 0.722 → 0.533 | 26% | 0.838 → 0.686 | 18% |
| Q14 | 1.837 → 1.424 | 22% | 2.514 → 2.041 | 19% | 2.909 → 2.122 | 27% | 1.158 → 0.864 | 25% | 1.022 → 0.934 | 9% |
| Q15 | 1.279 → 0.983 | 23% | 1.876 → 1.536 | 18% | 2.270 → 1.563 | 31% | 0.817 → 0.603 | 26% | 0.885 → 0.711 | 20% |
| Q16 | 0.927 → 0.696 | 25% | 1.501 → 1.162 | 23% | 1.737 → 1.393 | 20% | 0.563 → 0.461 | 18% | 0.752 → 0.699 | 7% |
| Q17 | 3.030 → 1.620 | 47% | 4.154 → 2.631 | 37% | 4.361 → 2.934 | 33% | 1.714 → 1.049 | 39% | 1.435 → 1.047 | 27% |
| Q18 | 1.663 → 0.969 | 42% | 1.757 → 1.200 | 32% | 1.600 → 1.313 | 18% | 1.084 → 0.777 | 28% | 0.542 → 0.519 | 4% |
| Q19 | 6.223 → 2.737 | 56% | 8.269 → 4.699 | 43% | 9.316 → 5.060 | 46% | 3.326 → 1.872 | 44% | 2.515 → 1.642 | 35% |
| Q32 | 1.720 → 1.296 | 25% | 2.259 → 1.946 | 14% | 2.586 → 1.897 | 27% | 1.331 → 1.099 | 17% | 1.349 → 1.181 | 12% |
| Q33 | 9.633 → 3.772 | 61% | 13.700 → 7.863 | 43% | 19.029 → 8.305 | 56% | 4.258 → 2.322 | 45% | 3.620 → 2.352 | 35% |
Note: Times are shown as "main → pr" where "main" refers to the original time and "pr" the improved time after updates.

Table 1: Performance Improvement Ratio

Summary

We were inspired by DuckDB and implemented an aggregate hash table to handle group aggregation. Furthermore, we added several optimizations tailored for distributed scenarios. Currently, it supports both standalone and distributed environments, and spill functionality has been implemented. Compared to the previously designed general hash table, the new aggregate hash table performs better when processing group aggregations.

References

  1. https://clickhouse.com/blog/hash-tables-in-clickhouse-and-zero-cost-abstractions
  2. https://bohutang.me/2021/01/21/clickhouse-and-friends-groupby/
  3. https://hannes.muehleisen.org/publications/icde2024-out-of-core-kuiper-boncz-muehleisen.pdf
  4. https://duckdb.org/2022/03/07/aggregate-hashtable.html
  5. https://duckdb.org/2024/03/29/external-aggregation.html
  6. https://www.youtube.com/watch?v=INCPhRwC0EE
  7. https://github.com/datafuselabs/databend/pull/15155