Out-of-core Polars

A performance benchmark comparing Polars and DataFusion using TPC-H data

benchmarks
polars
query-engine
Author

Hussain Sultan

Published

September 17, 2023

Modified

September 17, 2023

tl;dr

Polars is a fast DataFrame library written in Rust, offering performance optimizations such as copy-on-write and lazy evaluation. It has a new streaming/out-of-core engine and integrates with Apache Arrow and other data science tools. This blog post evaluates Polars’ capabilities using a modified TPC-H benchmark, comparing its performance with DataFusion at various scale factors.

(a) Out-of-core/Streaming Benchmark

Figure 1: Polars’ streaming vs in-memory engine comparison at scale-factor 10

Introduction

Polars, a high-performance DataFrame library, is gaining traction as a formidable alternative to the well-known pandas library, primarily due to its remarkable performance optimizations. Written in Rust, Polars has quickly garnered popularity among pandas users. It boasts a comprehensive feature set, including operations in both eager and lazy modes, support for various data types, and an extensive API for data manipulation and analysis.

Furthermore, Polars is compatible with Apache Arrow, facilitating seamless integration with other prominent data science tools such as NumPy, pandas, and PySpark.

Although pandas was designed for simplicity and ease of use, it often underperforms compared to Polars. Patrick Hoefler demonstrated that pandas can perform on par with Polars through join order optimization and projection pushdowns1; for the projection pushdowns, Patrick’s version manually pushes down all filters in PyArrow. A pull request detailing this comparison can be found here. As with all benchmarks, interpretations may vary, and if you care about efficiency it is worth diving in yourself. While pandas workflows are easy to use with small datasets thanks to in-memory processing, memory limitations become a bottleneck for larger datasets.

Types of DataFrame Compute

DataFrame processing is a crucial step in the pipeline leading to Machine Learning training. Large-scale ML pipelines often begin with ETL/OLAP SQL operations, transitioning to the DataFrame API for batch processing to handle feature engineering (e.g., one-hot encoding) and ML preprocessing.
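As a toy illustration of the DataFrame side of such a pipeline, here is a minimal one-hot encoding step in Polars (the table and column names are hypothetical):

import polars as pl

# Hypothetical feature table; to_dummies() one-hot encodes the listed columns.
features = pl.DataFrame({"color": ["red", "green", "red"], "size": [1, 2, 3]})
encoded = features.to_dummies(columns=["color"])
print(encoded)  # one indicator column per distinct color, plus the untouched "size"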


Figure 2: In-memory vs Out-of-core DataFrame engines

The continuing evolution of decoupled storage and the increasing demand for swift data iteration are blurring the boundaries between DataFrames and SQL capabilities. Polars stands out as an exceptional DataFrame library that bridges this gap, offering both in-memory and out-of-core processing capabilities. By employing copy-on-write semantics and eliminating the need for indexes, Polars has risen to provide out-of-core performance and query optimization for handling data beyond memory limits, as demonstrated in the TPC-H benchmark.

The purpose of this blog post is to assess the capabilities of Polars using a modified version of the TPC-H benchmark. We will begin with a single parquet file and use handwritten queries via Polars’ API. We will also explore Polars’ performance on small scale factors and compare it to DataFusion at 100GB and 500GB scale factors.

Pushing Query Engine Design

Polars, while written in Rust, has further innovated on state-of-the-art DataFrame design. First and foremost, it employs Copy On Write (COW)2 semantics, a resource-management technique that boosts performance by copying data structures only when modifications occur. Unnecessary copying is avoided, which yields significant speed improvements, especially on large datasets.
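A toy sketch of the observable behavior (not the internals): cloning a frame is cheap because the underlying Arrow buffers are shared, and new data is only materialized when a modification actually occurs.

import polars as pl

df = pl.DataFrame({"a": range(10_000_000)})

# Cheap: clone() shares the underlying Arrow buffers (reference-counted).
df2 = df.clone()

# Data is only materialized anew here, where the column actually changes.
df3 = df2.with_columns(pl.col("a") + 1)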

Secondly, Polars adopts a unique approach by forgoing the use of indexes3. While indexes can accelerate specific data lookups, they can also incur considerable overhead in terms of memory usage and update time. By avoiding indexes, Polars offers a leaner and more streamlined data handling experience, particularly for large and complex datasets.

The third key innovation is the embrace of lazy evaluation. In the Polars context, lazy evaluation means that operations are not immediately executed when declared. Instead, they are executed only when the results are needed. This strategy can markedly enhance performance for large data manipulations, as it allows Polars to optimize operations before execution and enables more efficient memory usage by loading and processing only the necessary data. For example, on a machine with a 128 GB memory budget, a large number of TPC-H queries at the 500 GB scale-factor completed, even with swap turned off completely.
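As a minimal sketch of what this looks like in practice (the file name is hypothetical): nothing below touches the data until collect() is called, which hands the optimizer the whole plan; passing streaming=True requests the out-of-core engine.

import polars as pl

# Building a lazy query: scan_parquet only reads metadata at this point.
lazy = (
    pl.scan_parquet("lineitem.parquet")          # hypothetical file
    .filter(pl.col("l_quantity") > 20)           # predicate pushed down to the scan
    .select(["l_orderkey", "l_extendedprice"])   # projection pushdown
)

# Only now does any work happen; streaming=True asks for the
# out-of-core engine so the data never has to fit in memory at once.
result = lazy.collect(streaming=True)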

Finally, Polars supports Single Instruction, Multiple Data (SIMD) operations, allowing high-speed analytics and data manipulation as multiple operations can be performed simultaneously, thereby accelerating computation.

Moreover, Polars has recently incorporated SQL support, further aligning it with tools like Apache DataFusion.
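For example, a sketch assuming the 0.18-era SQLContext API, where execute() returns a LazyFrame (the table name and file path are hypothetical):

import polars as pl

lf = pl.scan_parquet("lineitem.parquet")  # hypothetical file

ctx = pl.SQLContext()
ctx.register("lineitem", lf)

# execute() yields a LazyFrame, so the same query optimizer applies.
out = ctx.execute(
    "SELECT l_returnflag, SUM(l_quantity) AS qty FROM lineitem GROUP BY l_returnflag"
).collect()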

Benchmarks

Benchmarking Methodology

This is a modified TPC-H benchmark in that:

  1. It uses a single parquet data file for each of the source tables.
  2. It does not use standard TPC-H SQL. In the case of Polars, it uses the streaming mode via the collect(streaming=True) argument, with hand-tuned queries that benefit from optimizations that TPC-H’s provided SQL does not. The resulting datasets are equivalent. Please note that the Polars streaming engine is nascent and does not support the full TPC-H query set.
  3. It includes benchmarks at SF 100GB and 500GB with equivalent results.
  4. Polars queries are sourced from the official TPC-H repository here. Similarly, the DataFusion queries are also modified SQL4.

These modifications make some comparisons challenging. For instance, a query that is difficult to express in Polars the way TPC-H SQL states it may include join-ordering or caching hints that are not present in the official TPC-H SQL5. With these caveats in mind, let’s dive in.
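To make the shape of these hand-tuned queries concrete, here is a sketch loosely modeled on TPC-H Q1 — not the exact benchmark code; the column names follow the TPC-H schema and the file path is hypothetical:

import polars as pl
from datetime import date

lineitem = pl.scan_parquet("lineitem.parquet")  # hypothetical path

q1 = (
    lineitem
    .filter(pl.col("l_shipdate") <= date(1998, 9, 2))
    .groupby(["l_returnflag", "l_linestatus"])
    .agg(
        [
            pl.sum("l_quantity").alias("sum_qty"),
            pl.sum("l_extendedprice").alias("sum_base_price"),
            pl.count().alias("count_order"),
        ]
    )
    .sort(["l_returnflag", "l_linestatus"])
)

result = q1.collect(streaming=True)  # run on the out-of-core engine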

TPC-H (Single Parquet)

I take an average of 4 runs for the execution time. Here are the details of the benchmark set-up:

Benchmark Configurations

  GitHub             https://github.com/hussainsultan/tpch-duckdb-bench
  Database Version   polars 0.18.7, datafusion 23.0.0
  Hardware           AMD Threadripper 3960X 3.8 GHz 24 Core, 128 GB Memory, 2TB NVME (complete spec)
  Number of runs     4

Overall TPC-H Performance

Code
import urllib.request
import pandas as pd
import scipy.stats  # needed so scipy.stats.gmean is available
import matplotlib.pyplot as plt

from IPython.display import Markdown
from tabulate import tabulate

plt.style.use("../pitaasmoothie-dark.mplstyle")


def read_and_filter_csv(url):
    data = pd.read_csv(url)
    group_columns = ["name", "db"]
    processed_data = (
        data.query("success == True")
        .groupby(group_columns)
        .total_time_process.mean()
        .reset_index()
    )
    return processed_data


def parse_log(url):
    # Parse per-query timings out of a plain-text benchmark log; matching
    # lines look like: "... 'Overall ... <query number> ... <seconds>".
    log_text = urllib.request.urlopen(url).read().decode("utf-8")
    log_lines = log_text.split("\n")[4:]  # skip the header lines
    queries, times = [], []
    for line in log_lines:
        words = line.split()
        if len(words) < 10 or words[2] != "'Overall":
            continue
        queries.append("h" + words[7].strip("'").zfill(2))  # e.g. "h01"
        times.append(float(words[9]))
    return pd.DataFrame({"name": queries, "total_time_process": times})


def geomean_and_plot(url1, url2, url3, x_label, y_label, title):
    df1 = read_and_filter_csv(url1)[["name", "total_time_process"]]
    df2 = read_and_filter_csv(url2)[["name", "total_time_process"]]
    df3 = read_and_filter_csv(url3)[["name", "total_time_process"]]
    gmean1 = scipy.stats.gmean(df1.total_time_process) / 60
    gmean2 = scipy.stats.gmean(df2.total_time_process) / 60
    gmean3 = scipy.stats.gmean(df3.total_time_process) / 60
    combined = pd.DataFrame(
        [gmean1, gmean2, gmean3], index=["sf100", "sf500", "sf1000"], columns=["Polars"]
    )
    combined.plot(kind="bar", color=["#c56cf0", "#facd49"])
    plt.title(
        title,
        fontdict={
            "fontsize": "large",
            "fontweight": plt.rcParams["axes.titleweight"],
            "verticalalignment": "baseline",
        },
    )
    plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.show()


geomean_and_plot(
    "https://raw.githubusercontent.com/hussainsultan/tpch-duckdb-bench/polars/results/polars/sf100_streaming.csv",
    "https://raw.githubusercontent.com/hussainsultan/tpch-duckdb-bench/polars/results/polars/sf500_streaming.csv",
    "https://raw.githubusercontent.com/hussainsultan/tpch-duckdb-bench/polars/results/polars/sf1000_streaming.csv",
    "Scale-factor",
    "Geomean Time (Minutes)",
    "Streaming (incl. reading data from disk)",
)


In-memory vs Out-of-core

Expand to see the code to process the results
def process_and_plot(df1, df2, mode1, mode2, title, x_label, y_label):
    df1["mode"], df2["mode"] = mode1, mode2
    combined = pd.concat([df1, df2])
    grouped = (
        combined.groupby(["name", "mode"])["total_time_process"]
        .mean()
        .unstack()
        .dropna()
    )
    grouped.plot(kind="bar", color=["#c56cf0", "#facd49"])
    plt.title(
        title,
        fontdict={
            "fontsize": "large",
            "fontweight": plt.rcParams["axes.titleweight"],
            "verticalalignment": "baseline",
        },
    )
    plt.ylabel(y_label)
    plt.xlabel(x_label)
    plt.show()


t1 = read_and_filter_csv(
    "https://raw.githubusercontent.com/hussainsultan/tpch-duckdb-bench/polars/results/polars/sf10_streaming.csv"
)[["name", "total_time_process"]]
t2 = read_and_filter_csv(
    "https://raw.githubusercontent.com/hussainsultan/tpch-duckdb-bench/polars/results/polars/sf10_memory.csv"
)[["name", "total_time_process"]]
process_and_plot(
    t1,
    t2,
    "Out-of-core",
    "In-Memory",
    "Out-of-core vs In-Memory Mode (SF10)",
    "Query",
    "Mean Total Time (Process)",
)

Failed Queries by Scale-factor


  sf100   sf500   sf1000
  h03     h03     h03
  h07     h06     h06
  h18     h07     h07
          h08     h08
          h09     h09
          h12     h12
          h13     h13
          h17     h17
          h18     h18
          h21     h21

Comparison with DataFusion: Out-of-core Performance at higher scale-factors

To fully appreciate Polars, it’s insightful to compare it with another powerful data processing library, Apache DataFusion. The rest of the analysis is based on version 28.0.0 of DataFusion, available here.
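For reference, here is a minimal sketch of running a query through the DataFusion Python bindings (the file path is hypothetical):

from datafusion import SessionContext

ctx = SessionContext()
ctx.register_parquet("lineitem", "lineitem.parquet")  # hypothetical path

df = ctx.sql(
    "SELECT l_returnflag, COUNT(*) AS cnt FROM lineitem GROUP BY l_returnflag"
)
df.show()  # executes the plan and prints the result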

While Polars outperforms many query engines at lower scale-factors, the performance gains disappear in streaming mode as data sizes grow. Although Polars’ streaming support is nascent and bound to improve over time, DataFusion is ~3x faster on larger-than-memory data starting at SF500. This is not particularly surprising, since DataFusion was designed for a distributed SQL use-case from the start, while Polars comes from the DataFrame world targeting pandas users and adopted out-of-core query support reluctantly. It is worth mentioning that DuckDB is faster than both at the same scale-factors.

DataFusion relies on the Tokio async runtime, an event-driven, non-blocking I/O platform for writing asynchronous applications in Rust. This model is robust and highly scalable, ideal for managing a multitude of connections and data streams. Although the Tokio runtime isn’t specifically tailored for the compute-heavy workloads common in data analytics, DataFusion has been pushing the boundaries in showing that Tokio can be used for CPU-bound tasks6. Anecdotally, compared to DuckDB, which uses multithreading, I notice that DuckDB is frequently able to saturate all the cores, while DataFusion and Polars show varied core utilization across computation stages7. This suggests there are further optimizations to be had; for example, at the time of writing, a new DataFusion version came out with 2-3x faster parallel aggregation.

Conversely, Polars uses Rayon, a data-parallelism library for Rust, as its backend. Unlike DataFusion’s asynchronous model, Rayon is designed with multithreading in mind: it divides tasks into smaller pieces and processes them in parallel across multiple threads, with tighter control. For example, in Polars a user can specify the number of threads the runtime is allowed to use; in DataFusion that is not the case.
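For instance, a sketch of capping the Rayon pool via the POLARS_MAX_THREADS environment variable, which must be set before polars is imported:

import os

# Must be set before polars is imported; it sizes the global Rayon thread pool.
os.environ["POLARS_MAX_THREADS"] = "8"

import polars as pl

print(pl.threadpool_size())  # expected: 8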

Expand to see the code to process the results
def process_data(url, group_columns):
    # Read data from a CSV file and calculate the mean total time process
    data = pd.read_csv(url)
    processed_data = (
        data.query("success == True")
        .groupby(group_columns)
        .total_time_process.mean()
        .reset_index()
    )
    return processed_data


def calculate_gmean_and_create_df(data, index, column_name):
    gmean = scipy.stats.gmean(data.total_time_process) / 60
    return pd.DataFrame([gmean], index=index, columns=[column_name])


sf100_url = "https://raw.githubusercontent.com/hussainsultan/tpch-duckdb-bench/datafusion/results/datafusion/sf100_streaming_28.csv"
sf500_url = "https://raw.githubusercontent.com/hussainsultan/tpch-duckdb-bench/datafusion/results/datafusion/sf500_streaming_28.csv"
sf1000_url = "https://raw.githubusercontent.com/hussainsultan/tpch-duckdb-bench/datafusion/results/datafusion/sf1000_streaming_28.csv"
polars_sf100_url = "https://raw.githubusercontent.com/hussainsultan/tpch-duckdb-bench/polars/results/polars/sf100_streaming.csv"
polars_sf500_url = "https://raw.githubusercontent.com/hussainsultan/tpch-duckdb-bench/polars/results/polars/sf500_streaming.csv"
polars_sf1000_url = "https://raw.githubusercontent.com/hussainsultan/tpch-duckdb-bench/polars/results/polars/sf1000_streaming.csv"

# Read and process the csv files
datafusion_sf100 = process_data(sf100_url, ["name", "db"])
datafusion_sf500 = process_data(sf500_url, ["name", "db"])
datafusion_sf1000 = process_data(sf1000_url, ["name", "db"])
polars_sf100 = process_data(polars_sf100_url, ["name", "db"])
polars_sf500 = process_data(polars_sf500_url, ["name", "db"])
polars_sf1000 = process_data(polars_sf1000_url, ["name", "db"])

# Calculate geometric mean and create dataframes
datafusion_geomean_sf100 = calculate_gmean_and_create_df(
    datafusion_sf100, ["sf100"], "DataFusion"
)
datafusion_geomean_sf500 = calculate_gmean_and_create_df(
    datafusion_sf500, ["sf500"], "DataFusion"
)
datafusion_geomean_sf1000 = calculate_gmean_and_create_df(
    datafusion_sf1000, ["sf1000"], "DataFusion"
)
polars_geomean_sf100 = calculate_gmean_and_create_df(polars_sf100, ["sf100"], "Polars")
polars_geomean_sf500 = calculate_gmean_and_create_df(polars_sf500, ["sf500"], "Polars")
polars_geomean_sf1000 = calculate_gmean_and_create_df(
    polars_sf1000, ["sf1000"], "Polars"
)

# Combine DataFusion and Polars geometric mean dataframes
combined_geomean_sf100 = datafusion_geomean_sf100.join(polars_geomean_sf100)
combined_geomean_sf500 = datafusion_geomean_sf500.join(polars_geomean_sf500)
combined_geomean_sf1000 = datafusion_geomean_sf1000.join(polars_geomean_sf1000)
# Combine SF100 and SF500 geometric mean dataframes and plot
combined_geomean = pd.concat(
    [combined_geomean_sf100, combined_geomean_sf500, combined_geomean_sf1000]
)


def compare_and_plot(url1, url2, title, x_label, y_label):
    df1 = read_and_filter_csv(url1)[["name", "db", "total_time_process"]]
    df2 = read_and_filter_csv(url2)[["name", "db", "total_time_process"]]
    # Compare only the queries both engines completed; h09 and h21 are excluded explicitly.
    common_queries = set(df1["name"]).intersection(set(df2["name"])) - {"h09", "h21"}
    df1 = df1[df1.name.isin(common_queries)]
    combined = pd.concat([df1, df2]).dropna()
    grouped = (
        combined.groupby(["name", "db"])["total_time_process"].mean().unstack().dropna()
    )
    grouped.plot(kind="bar")
    plt.title(
        title,
        fontdict={
            "fontsize": "large",
            "fontweight": plt.rcParams["axes.titleweight"],
            "verticalalignment": "baseline",
        },
    )
    plt.ylabel(y_label)
    plt.xlabel(x_label)
    plt.show()


combined_geomean.plot(kind="bar")
plt.xlabel("Scale-factor")
plt.ylabel("Geomean Time (minutes)")
plt.show()

Expand to see the code to process the results
compare_and_plot(
    "https://raw.githubusercontent.com/hussainsultan/tpch-duckdb-bench/datafusion/results/datafusion/sf100_native.csv",
    "https://raw.githubusercontent.com/hussainsultan/tpch-duckdb-bench/polars/results/polars/sf100_streaming.csv",
    "Comparison of Total Process Time (SF100)",
    "Query",
    "Mean Total Time (Process)",
)
compare_and_plot(
    "https://raw.githubusercontent.com/hussainsultan/tpch-duckdb-bench/datafusion/results/datafusion/sf500_native.csv",
    "https://raw.githubusercontent.com/hussainsultan/tpch-duckdb-bench/polars/results/polars/sf500_streaming.csv",
    "Comparison of Total Process Time (SF500)",
    "Query",
    "Mean Total Time (Process)",
)

Resources

  1. A guide for user defined function in DataFusion
  2. Polars Benchmark published by the project at scale-factor 10
  3. TPC-H queries for polars from the developers
  4. Polars’s benchmarks from pandas lens
  5. Modified TPC-H queries for DataFusion
  6. Modified Polars TPC-H queries

Acknowledgments

I’d like to thank Ritchie Vink for his gracious review of the blog post. An earlier version of the post mistakenly used the in-memory engine. On 128 GB of RAM, it is astonishing that a large percentage of TPC-H queries passed at SF500 while all queries passed at SF100. This speaks to the power of Polars’ lazy evaluation mechanism, which optimizes expressions to use less memory, and of Apache Arrow as the in-memory data format.

Secondly, I’d like to thank Andrew Lamb for their review and suggestions.

Footnotes

  1. Ritchie Vink noted that the hand-tuned scripts go even further with join order optimization, utilizing cardinality information and further pruning the join key before applying the join filter.↩︎

  2. It is worth mentioning that recent versions of pandas support COW, but it is not turned on by default. Here is a good read that explains what COW means in the context of pandas.↩︎

  3. It makes a strong case for not needing an index. More info in this SO thread.↩︎

  4. Q15 failed because the context can only execute a single statement (this is trivially solvable and perhaps an issue with the upstream SQL queries). Q16 gave me a “Schema at index 0 was different” error. Please note that these errors were based on 23.0.0 when using the official TPC-H queries (via ibis-tpch). I will re-run the full official suite and report back with my findings.↩︎

  5. Here is a specific case where a comparison with DuckDB does not make sense due to the API incompatibility.↩︎

  6. “can we make a tokio based scheduler build a system that take into account NUMA and cache locality” asks one of the core DataFusion devs in an issue↩︎

  7. In correspondence with Andrew Lamb, they suggested using the target_partitions configuration to control the number of threads. I tried playing around with target_partitions settings; it seemed to restrict the number of cores for the IO/parquet-reading part but not for the compute operations. Again, my understanding may be out of date, and I will try this again. The best way I have found is to run DataFusion under Docker and use resource constraints to restrict the exact number of CPUs.↩︎
