Big cartesian join in BigQuery
When working on more advanced analytics (or reports), you may stumble upon the problem of doing a self-join. Be it all pairs of available products, account connections in social media, or anything else – it requires a heavy self-join on the data. Since this pattern is not as commonly documented as aggregation by group or filtering, let’s look into its possible implementations. In this case, we use the BigQuery service on Google Cloud Platform.
The challenge
We have a table of products available in shops:
1. One row represents one product in one shop.
2. If a product is available in multiple stores, it will have multiple rows, each with an appropriate shop ID.
As a result, we need a table of pairs of products:
+ All-to-all product pairs are computed for each shop and month; we do not want cross-month or cross-shop pairs (if the first product is from shop 38 and month 2001-04, the second one must be from the same shop and month),
+ The table will be queried with a month predicate – only a single month will be requested per query,
+ Each pair of products within one month and shop should appear only once (if the pair (A, B) is present, there must not be a row (B, A) for the same month and shop).
All queries must run successfully under GCP BigQuery’s on-demand pricing mode.
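To make the uniqueness requirement concrete: a common way to keep each unordered pair exactly once is to require the first product ID to sort before the second. A toy query on made-up product IDs (not the real table) illustrates the idea:
-- Toy illustration on made-up product IDs:
-- requiring product_1 < product_2 yields each unordered pair exactly once.
WITH sample AS (
  SELECT product
  FROM UNNEST(['A', 'B', 'C']) AS product
)
SELECT
  lhs.product AS product_1,
  rhs.product AS product_2
FROM sample AS lhs
JOIN sample AS rhs
  ON lhs.product < rhs.product;
-- Returns (A, B), (A, C), (B, C) – and never (B, A), (C, A) or (C, B).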
Input data
In this case, we are going to look into the following table containing input data:
CREATE TABLE IF NOT EXISTS db.products AS
SELECT
  product,
  shop,
  transactions,
  transaction_date
FROM
  db.products_ingestion;
This table is the input for the self-join. The available columns are narrowed to a minimum for the sake of clarity:
- product – the product ID; these are the values that will be cross-joined.
- shop – the shop ID.
- transactions – the number of transactions involving a given product.
- transaction_date – effectively truncated to year-month.
Naive approach
The first thing that comes to mind when you need a self-join is simply to write the self-join. Let’s test the simplest approach:
CREATE TABLE db.products_pairs AS
SELECT
  lhs.product AS product_1,
  rhs.product AS product_2,
  lhs.transactions AS transactions_1,
  rhs.transactions AS transactions_2,
  lhs.shop AS shop,
  lhs.transaction_date AS transaction_date
FROM (SELECT * FROM db.products) lhs
INNER JOIN (SELECT * FROM db.products) rhs
  USING (shop, transaction_date)
WHERE
  lhs.transaction_date = DATE(2022, 1, 1)
  AND rhs.transaction_date = DATE(2022, 1, 1)
  AND lhs.product < rhs.product;
The quiet assumption here is that filtering happens at the input level (before the join) and that only a single month of data is selected per query. The last predicate filters out duplicate pairs. Although this query looks good, we end up with one of the following error messages:
Resources exceeded during query execution: Your project or organization exceeded the maximum disk and memory limit available for shuffle operations. Consider provisioning more slots, reducing query concurrency, or using more efficient logic in this job.
The query exceeded resource limits. This query used 1666502 CPU seconds but would charge only 2394M Analysis bytes. This exceeds the ratio supported by the on-demand pricing model. Please consider moving this workload to the flat-rate reservation pricing model, which does not have this limit. 1666502 CPU seconds were used, and this query must use less than 612800 CPU seconds.
The BigQuery engine seems to look at the input data size and estimate the join cost based on it. Our query appears to be a corner case, though – we use only one table, but twice. That is clearly not accounted for in the estimate, and reading from a single table fools the engine into underestimating the cost. Let’s fool it deeper and make it estimate the cost correctly.
Complicated approach
We are going to look into two approaches:
- tweaking the query to make the BigQuery engine calculate the cost correctly,
- rewriting the query to get rid of the self-join.
Along with comparing those two ideas, we will look at the impact of the two most obvious fine-tuning techniques: partitioning and clustering. They should improve both execution time and cost – let’s see how effective they are.
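As a sketch of what this could look like for the input table (the table name, partition granularity, and clustering keys below are assumptions based on the access pattern described above):
-- Hypothetical partitioned & clustered copy of the input table.
-- Monthly partitions match the one-month-per-query pattern;
-- clustering keys follow the join and grouping columns.
CREATE TABLE IF NOT EXISTS db.products_partitioned
PARTITION BY DATE_TRUNC(transaction_date, MONTH)
CLUSTER BY shop, product
AS
SELECT
  product,
  shop,
  transactions,
  transaction_date
FROM
  db.products_ingestion;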
Fixed join approach
Fixing the cartesian join comes first. One trick must be applied: each side of the join must come from a different table. This can be accomplished by creating a 1:1 copy of the input table. We cannot use a view here – BQ is smart enough to look through to the source of the view.
CREATE TABLE IF NOT EXISTS db.products_aux AS
SELECT * FROM db.products;

CREATE TABLE db.products_pairs_join AS
SELECT
  lhs.product AS product_1,
  rhs.product AS product_2,
  lhs.transactions AS transactions_1,
  rhs.transactions AS transactions_2,
  lhs.shop AS shop,
  lhs.transaction_date AS transaction_date
FROM (SELECT * FROM db.products) lhs
INNER JOIN (SELECT * FROM db.products_aux) rhs
  USING (shop, transaction_date)
WHERE
  lhs.transaction_date = DATE(2022, 1, 1)
  AND rhs.transaction_date = DATE(2022, 1, 1)
  AND lhs.product < rhs.product;
One downside of this approach is that Google does not recommend cartesian joins. It also requires a duplicate of the input table, which must be managed and kept in sync with the input. Nevertheless, such a query runs successfully.
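If you go this route, the copy has to be refreshed whenever the input table changes; one blunt but workable option is to rebuild it from the source, for example as a scheduled query – a minimal sketch:
-- Rebuild the auxiliary copy from the source table,
-- e.g. on a schedule or right before recomputing the pairs.
CREATE OR REPLACE TABLE db.products_aux AS
SELECT * FROM db.products;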
Windowing approach
To avoid the self-join altogether, aggregation and windowing can be employed to compute the output table. It is somewhat similar to computing a rolling sum (or average), but instead of a sum, rows are collected into an array: for each shop and transaction date, every product gets the list of all products that follow it (ordered by product ID).
| Shop | Transaction date | Product | Transactions | Product 2 (appended aggregate) |
| --- | --- | --- | --- | --- |
| Shop 1 | 2020-01-01 | Product 1 | 7 | (Product 2, 6) (Product 3, 3) (Product 4, 2) (Product 5, 6) |
| Shop 1 | 2020-01-01 | Product 2 | 6 | (Product 3, 3) (Product 4, 2) (Product 5, 6) |
| Shop 1 | 2020-01-01 | Product 3 | 3 | (Product 4, 2) (Product 5, 6) |
| Shop 1 | 2020-01-01 | Product 4 | 2 | (Product 5, 6) |
| Shop 1 | 2020-01-01 | Product 5 | 6 | null |
| Shop 2 | 2020-01-01 | Product 1 | 7 | (Product 2, 3) (Product 3, 1) |
| Shop 2 | 2020-01-01 | Product 2 | 3 | (Product 3, 1) |
| Shop 2 | 2020-01-01 | Product 3 | 1 | null |
The second step is splitting the array elements into separate rows via the UNNEST operator to produce the final output. Unnesting with a plain (non-LEFT) join also drops the rows whose array is null, so no separate clean-up is needed.
CREATE TABLE IF NOT EXISTS db.products_pair_aggregate AS
WITH root AS (
  SELECT
    *,
    ARRAY_AGG(STRUCT(product, transactions)) OVER (product_window) AS product_2
  FROM db.products
  WHERE transaction_date = DATE(2022, 1, 1)
  WINDOW product_window AS (
    PARTITION BY shop, transaction_date
    ORDER BY product ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING
  )
)
SELECT
  shop,
  transaction_date,
  root.product AS product_1,
  product_2.product AS product_2,
  root.transactions AS transactions_1,
  product_2.transactions AS transactions_2
FROM root, UNNEST(product_2) product_2
WHERE root.transaction_date = DATE(2022, 1, 1);
The final query is more complicated than the one in the join approach, but it does not require creating any ‘hacky’ mirror tables.
Results
Both solutions can be implemented on unpartitioned, partitioned, or partitioned and clustered tables (and the queries creating them); therefore, there are six implementations to compare. Five of them work under BQ on-demand pricing; the aggregation approach with partitioning alone fails.
The results are presented in the table below. Each approach has been measured in three configurations: vanilla (no optimisation), partitioning (P), and partitioning plus clustering (P + C). For the join approach, each cell shows two values separated by a slash: creating the mirror table first, then the join itself.
| Metric | Join | Join P | Join P + C | Aggregate | Aggregate P | Aggregate P + C |
| --- | --- | --- | --- | --- | --- | --- |
| Processed size | 2 340 MB / 4 670 MB | 2 340 MB / 229 MB | 2 340 MB / 229 MB | 2 340 MB | failed | 115 MB |
| Billed size | 2 340 MB / 4 670 MB | 2 340 MB / 229 MB | 2 340 MB / 229 MB | 2 340 MB | failed | 115 MB |
| Wall time | 4s / 1m 54s | 9s / 29m 48s | 3s / 1m 54s | 1m 31s | failed | 7m 34s |
| Slot time | 4m 35s / 7h 18m | 3m 40s / 9h 16m | 6m 27s / 10h 19m | 8h 20m | failed | 15h 07m |
| Shuffled | 5 GB / 423 GB | 6 GB / 507 GB | 6 GB / 841 GB | 737 GB | failed | 840 GB |
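For reference, figures like processed bytes, billed bytes and slot time can be read back from BigQuery’s job metadata. A query along these lines (the region qualifier and time window are assumptions you would adapt) lists them per job:
-- Inspect recent query jobs; adjust the region qualifier and filters as needed.
SELECT
  job_id,
  total_bytes_processed,
  total_bytes_billed,
  total_slot_ms,
  TIMESTAMP_DIFF(end_time, start_time, SECOND) AS wall_time_seconds
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
  AND job_type = 'QUERY'
ORDER BY creation_time DESC;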
Main points
1. Using partitioning on filtered tables is necessary to bring the cost of a query into an acceptable range.
2. Both join- and aggregate-based queries can produce the output at a reasonable cost in time and money. The aggregate one further halves the cost, as it does not require creating a mirror table.
3. Apart from the raw benchmark results, it is worth comparing query complexity when deciding on an approach. The join query may be trickier to maintain because of the extra table, but it is much easier to understand than the aggregate one.