
The Power of Adaptive Query Execution: Advanced Techniques and Strategies

In the demanding world of big data analytics, query optimization plays a vital role in delivering up-to-date, timely insights from ever-growing datasets, as organizations must process ever-increasing volumes of data efficiently. In this blog, we take a deep dive into advanced Spark techniques and see how they optimize Delta Lake performance. Apache Spark offers optimization features that improve performance and reliability, unlike traditional query optimization, which relies on static plans derived from statistics captured at a single point in time, an approach that is not feasible when the data keeps changing rapidly.

Overview

Here comes the ground-breaking Spark feature, Adaptive Query Execution (AQE), which aims to improve the efficiency and performance of query processing in Delta Lake by dynamically adjusting query plans at runtime. A traditional optimizer generates its plan from data statistics captured at one point in time, but the data distribution can change before the query runs: values shift, patterns change, and data skew appears, degrading the plan, inflating resource consumption, and causing performance to fall back. AQE addresses this challenge by continuously analyzing the data and adapting the query plan in real time, which delivers better resource usage, better query performance, and enhanced system robustness.


How AQE works

When AQE is enabled, Spark searches for a better plan during query execution based on the data it actually observes, since the data distribution can change between planning and execution. As the query runs, Delta Lake continuously collects and analyzes runtime statistics, including data distribution, partition sizes, and execution times. Based on these, AQE adjusts aspects of the plan such as join strategies, partitioning schemes, and shuffle operations.
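As a quick illustration, here is a minimal PySpark sketch that enables AQE explicitly. Note that AQE is on by default since Spark 3.2, so the setting is shown only for clarity; the application name is a placeholder.

```python
from pyspark.sql import SparkSession

# Minimal sketch: build a session with Adaptive Query Execution enabled.
# AQE is enabled by default since Spark 3.2; setting it explicitly is harmless.
spark = (
    SparkSession.builder
    .appName("aqe-demo")  # placeholder application name
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)
```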

What are the statistical metrics AQE leverages for query performance?

Adaptive Query Execution (AQE) collects runtime statistics on metrics such as data distribution, partition sizes, execution times, memory usage, and data skewness. These statistics allow AQE to adapt query strategies and resource allocation while the query runs. AQE adjusts join strategies and data shuffling based on the observed data distribution, and it balances the workload among executors by repartitioning or coalescing. It also improves memory usage, optimizes resource consumption, and mitigates data skew through techniques such as skew join handling and partition pruning.

Now let us dive deep into the AQE features and understand how they work.

1) Dynamically coalescing shuffle partitions

The default number of shuffle partitions in Spark is 200. Does this partition count work well in all scenarios?

With a fixed partition count, there may be too few partitions, so each partition is large and can spill to memory or disk, or too many partitions, so each partition is small and the job pays excessive network I/O to fetch data. Either case slows down performance.

Enabling AQE lets Spark dynamically adjust the partition sizes based on the data distribution and data sizes observed in the runtime statistics.

[Figure: shuffle partitions without AQE vs. with AQE]

In the illustration above, partitions d1 and d3 produced by the two map tasks are large, while d2, d4, and d5 are small. During execution, d1 and d3 take longer to process than d2, d4, and d5, so the executors that finish d2, d4, and d5 sit idle until d1 and d3 complete. With AQE enabled, Spark merges the small partitions into a single partition by coalescing.

AQE collects runtime statistics, monitors partition sizes for inefficiencies, and dynamically adjusts coalescing to optimize resource utilization. By coalescing shuffle partitions based on runtime statistics and optimization goals, AQE improves resource utilization, reduces overhead, and enhances query performance in distributed computing environments. This adaptive approach ensures efficient execution of data processing tasks while maximizing the use of cluster resources.
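The relevant configuration keys are shown in the sketch below; the advisory partition size shown is Spark's documented default and is illustrative, not a tuning recommendation.

```python
# Sketch of the partition-coalescing settings (values are illustrative).
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# Target size AQE aims for when merging small shuffle partitions.
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
```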

2) Dynamically optimizing skew joins

Data skew occurs when one partition holds a much larger volume of data than the others, producing long-running tasks. AQE identifies data skew using analysis techniques such as sampling and histograms, continuously monitoring runtime statistics to detect skew in key columns, and applies optimizations such as dynamic pruning, skew join handling, or adaptive join strategies. When skew is detected, AQE may redistribute data across partitions so that each executor processes a similar volume of data for efficient query execution; without this, the other executors sit idle and a static plan cannot be changed.

Adjusting the degree of parallelism

When AQE detects skew, it dynamically adjusts the degree of parallelism: it can scale up by increasing the number of parallel tasks to spread the workload evenly among executors, or scale down under resource constraints to optimize resource utilization. By adjusting parallelism dynamically, AQE ensures queries are processed efficiently and resources are fully used, leading to better performance and overall cluster efficiency.

[Figure: skew join execution without AQE vs. with AQE]

In the illustration above, we can see that with AQE enabled, the skew shuffle reader detects the skewed partition, splits it, and redistributes the data so the join scales across more tasks.
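Spark exposes skew-join handling through a few configuration keys; the sketch below shows them with their documented defaults as illustrative values.

```python
# Sketch of skew-join settings; the thresholds shown are Spark's defaults.
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# A partition is treated as skewed when it is both `skewedPartitionFactor`
# times larger than the median partition size and above the byte threshold.
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
```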

3) Dynamically switching the join strategy

With AQE enabled, Spark automatically selects the join strategy based on the runtime statistics of the data, taking into account the size of the data, the available memory, and any skew in the data.

Now let us look at the different join strategies available in Spark.

  • Shuffle Hash Join

Shuffle Hash Join is particularly effective when the data distribution is skewed, that is, when certain join keys carry significantly more data than others, and when parallelism matters: for large datasets, it can run the join in parallel across multiple nodes and make good use of cluster resources.

In a shuffle hash join, data from the partitions of both datasets is shuffled and redistributed across different nodes so that rows with the same join key land together. This data movement among nodes is necessary to produce the join result, but it makes the operation costly: network communication and moving data between nodes are resource-intensive.

During the shuffle, Spark uses hashing to distribute the data efficiently across nodes. Each join key is hashed, rows are assigned to partitions based on the hash value, and the network shuffle places related data on the same node, minimizing further data movement and letting the query process efficiently.
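If you want to request this strategy explicitly, Spark's join hints can be used. The sketch below assumes two hypothetical DataFrames, `orders` and `customers`, sharing a `customer_id` column.

```python
# Sketch: ask Spark to prefer a shuffle hash join via a join hint.
# `orders` and `customers` are hypothetical DataFrames with a customer_id column.
joined = orders.join(customers.hint("shuffle_hash"), "customer_id")
```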

  • Broadcast join

Broadcast join efficiently combines two datasets by broadcasting the smaller one to all worker nodes, avoiding data movement across the many nodes of the network.

Broadcast joins use memory efficiently: the small dataset resides on each node, avoiding the memory pressure that can arise during a shuffle join, and the strategy scales well when one dataset is much smaller than the other. Broadcast join is suitable when one dataset is significantly smaller and fits in memory on each worker node, so it also works well with skewed data.
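In PySpark, a broadcast join can be requested with the built-in `broadcast()` function. The DataFrame names below are hypothetical: `sales` is the large fact table and `regions` the small dimension table.

```python
from pyspark.sql.functions import broadcast

# Sketch: send the small dimension table to every executor.
result = sales.join(broadcast(regions), "region_id")
```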

  • Sort-Merge Join

Sort-Merge Join is a fundamental join strategy in Apache Spark. It sorts both datasets on the join keys so that equal keys sit adjacent to one another, then merges the sorted datasets in a merge-like pass: iterating over both datasets in parallel and comparing their join-key values to combine matching rows.

Sort-Merge Join shines with large datasets that can be sorted efficiently before merging. Unlike nested-loop joins, its performance is predictable and scales well with data size, and it parallelizes effectively, since each node can sort and merge its portion of the data independently. This makes it a powerful strategy for joining large-scale datasets in distributed computing environments; by understanding its mechanics, optimizations, and best practices, data engineers and analysts can apply it to massive datasets.

[Figure: the Employee and Technology datasets after sorting on the join key]

Merging the sorted datasets: once both sides are sorted on the join key, the merge phase begins.

Compare the first rows from each dataset, as below, then continue comparing and merging the remaining rows (a PySpark sketch follows the list):

  • Employee table: 1ADD11, John Chen, T1
  • Technology table: T1, Java
  • Since T1 matches T1, combine the rows.
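The same walk-through can be sketched in PySpark. The table contents mirror the example above; the `merge` hint is added only to make Spark favor a sort-merge join for this tiny illustration.

```python
# Sketch: the Employee/Technology example as a sort-merge join.
employees = spark.createDataFrame(
    [("1ADD11", "John Chen", "T1")], ["emp_id", "name", "tech_id"]
)
technologies = spark.createDataFrame([("T1", "Java")], ["tech_id", "tech_name"])

# The "merge" hint asks Spark to use a sort-merge join.
merged = employees.join(technologies.hint("merge"), "tech_id")
merged.show()
```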

AQE performs a cost-based analysis, estimating the execution cost of the candidate join strategies under the observed conditions, and selects the one with the lowest estimated cost for the best query performance. Based on runtime statistics and any data skew detected during execution, AQE can replace the planned strategy, for example switching a shuffle hash or sort-merge join to a broadcast join.
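You can watch AQE make this decision in the query plan. With AQE enabled, `explain()` shows an `AdaptiveSparkPlan` node marked `isFinalPlan=false` before execution; after an action runs, calling `explain()` again shows the final, possibly re-optimized plan. The DataFrames below are the hypothetical ones from the earlier sketches.

```python
# Sketch: inspect the adaptive plan before and after execution.
joined = orders.join(customers, "customer_id")
joined.explain()   # AdaptiveSparkPlan with isFinalPlan=false
joined.collect()   # trigger execution so AQE can re-optimize
joined.explain()   # final plan shows the join strategy AQE selected
```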

Conclusion:

In conclusion, Adaptive Query Execution (AQE) is a pioneering feature in Apache Spark for query processing and optimization in Delta Lake by dynamically adjusting query plans based on real-time data distribution and statistics. AQE tackles common challenges such as inefficient resource usage, slow query performance, and data skew through techniques such as dynamically coalescing shuffle partitions, dynamically switching join strategies, and dynamically optimizing skew joins leading to efficient query execution and resource utilization.

For more details, Diggibyte Technologies Pvt Ltd has all the experts you need. Contact us today to embed intelligence into your organization.

Author: Nandini Srinivas
