23 August 2023

Big cartesian join in Big Query

Amadeusz Kosik, Big Data Engineer

When working on more advanced analytics (or reports), you may stumble upon the problem of a self-join. Whether it is all pairs of available products, account connections in social media, or anything else, it requires a heavy self-join on the data. Since this pattern is not as commonly documented as group-by aggregation or filtering, let's look into possible implementations of it. In this case, we use the BigQuery service on the Google Cloud Platform.

The challenge

We have a table of products available in shops: 

1. One row represents one product in one shop.

2. If a product is available in multiple stores, it will have multiple rows, each with an appropriate shop ID.

As a result, we need a table of pairs of products:

+ All-to-all product pairs are computed for each shop and month, and we do not want cross-month or cross-store pairs (if the first product is from shop 38 and month 2001-04, the second one must be from the same period and store),

+ The table will be queried with a month predicate – only a single month per query will be requested,

+ Each pair of products within one month & shop should appear only once (if the (A, B) pair is present, there should not be a row of (B, A) for a given month and shop).

All queries must run successfully in GCP BigQuery on-demand pricing mode.

Input data

In this case, we are going to look into the following table containing input data:

CREATE TABLE IF NOT EXISTS db.products AS
 SELECT
   product,
   shop,
   transactions,
   transaction_date
 FROM
   db.products_ingestion;

This table is the input for self-join. The available columns are narrowed to a minimum for the sake of clarity:

  • Product (containing product id – those will be cross-joined).
  • Shop (shop id).
  • Transactions (number of transactions involving a given product).
  • Transaction_date (effectively truncated to year-month). 
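
If you would like to reproduce the setup without the original dataset, a synthetic input can be generated directly in BigQuery. The sketch below is only an assumption: the number of shops, the number of products and the transaction values are made up purely for illustration.

-- Illustrative only: synthesise one month of input data
-- (10 000 products in each of 100 shops; sizes and values are assumptions).
CREATE TABLE IF NOT EXISTS db.products_ingestion AS
 SELECT
   CONCAT('product_', CAST(p AS STRING)) AS product,
   CONCAT('shop_', CAST(s AS STRING)) AS shop,
   CAST(1 + RAND() * 100 AS INT64) AS transactions,
   DATE(2022, 1, 1) AS transaction_date
 FROM UNNEST(GENERATE_ARRAY(1, 10000)) AS p
 CROSS JOIN UNNEST(GENERATE_ARRAY(1, 100)) AS s;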

Naive approach

The first thing that comes to mind when a self-join is needed is simply to write the self-join. Let’s test the simplest approach:

CREATE TABLE db.products_pairs AS
 SELECT
   lhs.product AS product_1,
   rhs.product AS product_2,
   lhs.transactions AS transactions_1,
   rhs.transactions AS transactions_2,
   lhs.shop AS shop,
   lhs.transaction_date AS transaction_date
 FROM (SELECT * FROM db.products) lhs
 INNER JOIN (SELECT * FROM db.products) rhs
 USING (shop, transaction_date)
 WHERE
   lhs.transaction_date = DATE(2022, 1, 1) 
      AND rhs.transaction_date = DATE(2022, 1, 1) 
      AND lhs.product < rhs.product;

The implicit assumption here is to filter the data at the input level (before the join) and select only a single month of data in each query. The last predicate filters out mirrored duplicates (and self-pairs): only (A, B) pairs with A < B are kept. Although this query looks good, we end up with one of these error messages:

Resources exceeded during query execution: Your project or organization exceeded the maximum disk and memory limit available for shuffle operations. Consider provisioning more slots, reducing query concurrency, or using more efficient logic in this job.

The query exceeded resource limits. This query used 1666502 CPU seconds but would charge only 2394M Analysis bytes. This exceeds the ratio supported by the on-demand pricing model. Please consider moving this workload to the flat-rate reservation pricing model, which does not have this limit. 1666502 CPU seconds were used, and this query must use less than 612800 CPU seconds.

The BigQuery engine seems to look at the input data size and estimate the cost of the join based on it. This, however, is a corner case: we use only one table, but twice. The doubled usage is clearly not reflected in the estimate, so reading from a single table fools the engine into underestimating the cost. Let’s take the trick a step further and make it estimate the cost correctly. 

Complicated approach

We are going to look into two approaches: 

  • tweaking the query to make the BigQuery engine calculate the cost correctly,
  • recreating the query to get rid of self-join.

Along with comparing those two ideas, we will look at the impact of the two most obvious techniques for fine-tuning queries: partitioning and clustering. They should improve both the execution time and the cost – let’s see how effective they are.
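
Both techniques are declared when a table is created. Below is a minimal sketch of how the input table could be rebuilt with monthly partitioning and clustering; the db.products_pc name is an assumption used only for illustration.

-- Assumed variant of the input table: partitioned by month, clustered by shop and product.
CREATE TABLE IF NOT EXISTS db.products_pc
 PARTITION BY DATE_TRUNC(transaction_date, MONTH)
 CLUSTER BY shop, product
 AS SELECT * FROM db.products;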

Fixed join approach

Fixing the cartesian join comes first. One trick must be applied: each side of the join must come from a different table. This can be accomplished by creating a 1:1 copy of the input table. A view will not do here – BigQuery is smart enough to look through it at the underlying table.

CREATE TABLE IF NOT EXISTS db.products_aux AS
 SELECT * FROM db.products;
CREATE TABLE db.products_pairs_join AS
 SELECT
   lhs.product AS product_1,
   rhs.product AS product_2,
   lhs.transactions AS transactions_1,
   rhs.transactions AS transactions_2,
   lhs.shop AS shop,
   lhs.transaction_date AS transaction_date
 FROM (SELECT * FROM db.products) lhs
 INNER JOIN (SELECT * FROM db.products_aux) rhs
 USING (shop, transaction_date)
 WHERE
   lhs.transaction_date = DATE(2022, 1, 1) 
      AND rhs.transaction_date = DATE(2022, 1, 1) 
      AND lhs.product < rhs.product;

One downside of this approach is that Google does not recommend cartesian joins. It also creates a duplicate of the input table, which must be managed and synchronized with the input. Nevertheless, it is possible to run such a query successfully.
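
One simple way of handling that synchronization is a full refresh of the copy before each run – a minimal sketch, assuming a complete rebuild of the auxiliary table is acceptable for the data volume:

-- Assumption: a periodic full refresh of the mirror table is acceptable.
CREATE OR REPLACE TABLE db.products_aux AS
 SELECT * FROM db.products;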

Windowing approach

Aggregation and windowing can be employed to compute the output table without doing a self-join. It is a bit similar to the computation of a rolling sum (or average), but instead of a sum, the rows are collected into an array: for each shop and transaction date, every product gets the list of all products that follow it (in product-id order), as shown in the table below. 

Shop   | Transaction date | Product   | Transactions | Product 2 (appended aggregate)
Shop 1 | 2020-01-01       | Product 1 | 7            | (Product 2, 6), (Product 3, 3), (Product 4, 2), (Product 5, 6)
Shop 1 | 2020-01-01       | Product 2 | 6            | (Product 3, 3), (Product 4, 2), (Product 5, 6)
Shop 1 | 2020-01-01       | Product 3 | 3            | (Product 4, 2), (Product 5, 6)
Shop 1 | 2020-01-01       | Product 4 | 2            | (Product 5, 6)
Shop 1 | 2020-01-01       | Product 5 | 6            | null
Shop 2 | 2020-01-01       | Product 1 | 7            | (Product 2, 3), (Product 3, 1)
Shop 2 | 2020-01-01       | Product 2 | 3            | (Product 3, 1)
Shop 2 | 2020-01-01       | Product 3 | 1            | null
Input product table with the appended aggregate column

The second step splits the array elements into separate rows via the UNNEST operator to produce the final output. It also drops the rows whose array is NULL (the last product within each shop and month), since unnesting a NULL array in a cross join produces no rows.

CREATE TABLE IF NOT EXISTS db.products_pair_aggregate AS
 WITH root AS (
   SELECT *,
       ARRAY_AGG(STRUCT (product, transactions)) OVER (product_window) AS product_2
       FROM db.products
       WHERE transaction_date = DATE(2022, 1, 1)
       WINDOW product_window
         AS (PARTITION BY shop, transaction_date 
             ORDER BY product ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING)
 )
 SELECT
   shop,
   transaction_date,
   root.product AS product_1,
   product_2.product AS product_2,
   root.transactions AS transactions_1,
   product_2.transactions AS transactions_2
   FROM root, UNNEST(product_2) product_2
 WHERE root.transaction_date = DATE(2022, 1, 1);

The final query is more complicated than the one in the join approach, but it does not require creating any ‘hacky’ mirror tables.
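
Either way, the resulting table is meant to be queried with a month predicate, as stated in the requirements. An illustrative downstream query follows; the shop value is made up and assumes the synthetic naming used earlier.

-- Example lookup of pairs for a single month and a single shop (values are illustrative).
SELECT product_1, product_2, transactions_1, transactions_2
 FROM db.products_pair_aggregate
 WHERE transaction_date = DATE(2022, 1, 1)
   AND shop = 'shop_38';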

Results

Both solutions can be implemented as unpartitioned, partitioned, or partitioned and clustered tables (and the queries creating them); therefore, there are six implementations to compare. Five of them work with BigQuery on-demand pricing; only the aggregation approach with partitioning alone fails.

The results are presented in the table below. Each approach has been measured in three configurations: vanilla (no optimisation), partitioning (P), and partitioning with clustering (P + C). For the join approach, each cell contains two values: the creation of the mirror table (before the slash) and the join itself (after the slash).

               | Join                | Join P            | Join P + C        | Aggregate | Aggregate P | Aggregate P + C
Processed size | 2 340 MB / 4 670 MB | 2 340 MB / 229 MB | 2 340 MB / 229 MB | 2 340 MB  | failed      | 115 MB
Billed size    | 2 340 MB / 4 670 MB | 2 340 MB / 229 MB | 2 340 MB / 229 MB | 2 340 MB  | failed      | 115 MB
Wall time      | 4s / 1m 54s         | 9s / 29m 48s      | 3s / 1m 54s       | 1m 31s    | failed      | 7m 34s
Slot time      | 4m 35s / 7h 18m     | 3m 40s / 9h 16m   | 6m 27s / 10h 19m  | 8h 20m    | failed      | 15h 07m
Shuffled       | 5 GB / 423 GB       | 6 GB / 507 GB     | 6 GB / 841 GB     | 737 GB    | failed      | 840 GB
Resulting product pairs
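
As a side note, figures like the ones above can be read from BigQuery's own job metadata. Below is a sketch of one way to list them; the region qualifier is an assumption and has to match the dataset location.

-- Read processed/billed bytes and slot time for recent jobs in the project
-- (the `region-eu` qualifier is an assumption; adjust it to your location).
SELECT job_id, total_bytes_processed, total_bytes_billed, total_slot_ms
 FROM `region-eu`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
 WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
 ORDER BY creation_time DESC;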

Main points

1. Using partitioning on filtered tables is necessary to bring the cost of a query into an acceptable range. 

2. Both join- and aggregate-based queries can produce the output in a reasonable amount of time and at a reasonable cost. The aggregate one further halves the cost, as it does not require creating a mirror table.

3. Apart from the pure benchmarking results, it is good to compare the query complexity when deciding on the approach. The join query may be trickier to maintain due to the extra table, but it is way easier to understand than the aggregate one.
