30 January 2023

Storage organisation vs query performance – examples

Author: Amadeusz Kosik, Big Data Engineer

The article How does storage organisation affect query performance described a number of principles for modelling data in Amazon Redshift, along with the internal mechanisms behind them. This time we will put those rules into action with real data. Using the free and open GH Archive, we will compare two data models – one with no key-based optimisation and one aligned with the recommendations of the previous article.

Preparation – data loading

If you want to follow these examples, you will need an S3 bucket and a Redshift cluster, both located in the same region. This article was written based on results from a cluster of 4 dc2.large nodes. Also, please make sure that the Redshift cluster has appropriate IAM permissions to read from the S3 bucket.
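What exactly "appropriate permissions" means depends on your setup; a minimal sketch of an IAM policy granting read access to the bucket could look like the following (the bucket name is a placeholder to substitute with your own):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:ListBucket"],
      "Resource": [
        "arn:aws:s3:::your-bucket",
        "arn:aws:s3:::your-bucket/*"
      ]
    }
  ]
}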

Data loading

As a best practice, data should never be loaded via INSERT queries, but rather from a distributed data source such as S3, using the COPY command. This example uses a JSONPaths file to map the raw input JSON files (an unprocessed download from the GH Archive – no preprocessing is done outside Redshift) onto table columns.

JSONPaths file

{
  "jsonpaths": [
    "$['id']",
    "$['type']",
    "$['actor']['id']",
    "$['actor']['login']",
    "$['actor']['url']",
    "$['repo']['id']",
    "$['repo']['name']",
    "$['repo']['url']",
    "$['org']['id']",
    "$['org']['login']",
    "$['org']['url']",
    "$['created_at']"
  ]
}

DDL for input table

CREATE TABLE github_archive_input
(
    id          VARCHAR(64),
    type        VARCHAR(64)  DEFAULT 'NULL',
    actor_id    VARCHAR(64)  DEFAULT 'NULL',
    actor_login VARCHAR(256) DEFAULT 'NULL',
    actor_url   VARCHAR(256) DEFAULT 'NULL',
    repo_id     VARCHAR(64)  DEFAULT 'NULL',
    repo_name   VARCHAR(256) DEFAULT 'NULL',
    repo_url    VARCHAR(256) DEFAULT 'NULL',
    org_id      VARCHAR(64)  DEFAULT 'NULL',
    org_login   VARCHAR(256) DEFAULT 'NULL',
    org_url     VARCHAR(256) DEFAULT 'NULL',
    created_at  TIMESTAMP
) DISTSTYLE EVEN SORTKEY(created_at);

Loading the data

Please be sure to update the S3 bucket location, the JSONPaths file path and the IAM role ARN.

COPY github_archive_input
    FROM 's3://…/github-archive/2020/01'
    IAM_ROLE 'arn:aws:iam::...:role/service-role/AmazonRedshift-CommandsAccessRole'
    JSON 's3://…/github-archive/jsonpaths.json'
    DATEFORMAT AS 'YYYY-MM-DDTHH:MI:SSZ' TIMEFORMAT AS 'YYYY-MM-DDTHH:MI:SSZ';
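If the COPY fails or rejects rows, the STL_LOAD_ERRORS system table is the first place to look. A quick diagnostic sketch (not part of the benchmark itself):

SELECT starttime, filename, line_number, colname, err_reason
FROM stl_load_errors
ORDER BY starttime DESC
LIMIT 10;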

Benchmarking distribution style

As mentioned in the previous article, the distribution style defines the rules for assigning rows to slices. This should help with the performance of:

1. aggregation queries (by key): distributing rows by the column used in GROUP BY makes all aggregations local to a slice – the cluster does not have to shuffle rows between nodes;

2. joins: when two (or more) tables are distributed by the same key, the join can be performed locally (again, without a network shuffle).

The goal of the benchmark is to check whether the differences above have a noticeable impact on execution time and cluster load.
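As a quick sanity check (again, not part of the benchmark itself), the distribution style actually applied to a table – and any resulting row skew – can be read from the SVV_TABLE_INFO system view once the tables below are loaded:

SELECT "table", diststyle, sortkey1, skew_rows
FROM svv_table_info
WHERE "table" LIKE 'github_archive%';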

Tables for the benchmarks

The following DDLs create two sets of tables: one with the default (EVEN) distribution and one with KEY distribution. This allows us to compare execution time (and the resources used) for the same queries in two scenarios: when the tables are not optimized at all and when the distribution is fine-tuned for the query.

As for the domain, the tables contain events from the GH Archive, along with actors (users) and repositories extracted from those events. Two benchmarks will be run: one using an aggregation query and one using an inner join.

Schema – distribution EVEN

CREATE TABLE github_archive_actors_even
(
    actor_id    VARCHAR(64)  DEFAULT 'NULL',
    actor_login VARCHAR(256) DEFAULT 'NULL',
    actor_url   VARCHAR(256) DEFAULT 'NULL'
) DISTSTYLE EVEN SORTKEY(actor_id);

CREATE TABLE github_archive_repos_even
(
    repo_id   VARCHAR(64)  DEFAULT 'NULL',
    repo_name VARCHAR(256) DEFAULT 'NULL',
    repo_url  VARCHAR(256) DEFAULT 'NULL'
) DISTSTYLE EVEN SORTKEY(repo_id);

CREATE TABLE github_archive_events_even
(
    id         VARCHAR(64),
    type       VARCHAR(64) DEFAULT 'NULL',
    actor_id   VARCHAR(64) DEFAULT 'NULL',
    repo_id    VARCHAR(64) DEFAULT 'NULL',
    org_id     VARCHAR(64) DEFAULT 'NULL',
    created_at TIMESTAMP
) DISTSTYLE EVEN SORTKEY(created_at);

INSERT INTO github_archive_actors_even (actor_id, actor_login, actor_url)
SELECT actor_id, actor_login, actor_url FROM github_archive_input;

INSERT INTO github_archive_repos_even (repo_id, repo_name, repo_url)
SELECT repo_id, repo_name, repo_url FROM github_archive_input WHERE repo_id IS NOT NULL;

INSERT INTO github_archive_events_even (id, type, actor_id, repo_id, org_id, created_at)
SELECT id, type, actor_id, repo_id, org_id, created_at FROM github_archive_input WHERE id IS NOT NULL;

Schema – distribution KEY

CREATE TABLE github_archive_actors_by_actor_id
(
    actor_id    VARCHAR(64)  DEFAULT 'NULL',
    actor_login VARCHAR(256) DEFAULT 'NULL',
    actor_url   VARCHAR(256) DEFAULT 'NULL'
) DISTSTYLE KEY DISTKEY(actor_id) SORTKEY(actor_id);

CREATE TABLE github_archive_repos_by_repo_id
(
    repo_id   VARCHAR(64)  DEFAULT 'NULL',
    repo_name VARCHAR(256) DEFAULT 'NULL',
    repo_url  VARCHAR(256) DEFAULT 'NULL'
) DISTSTYLE KEY DISTKEY(repo_id) SORTKEY(repo_id);

CREATE TABLE github_archive_events_by_actor_id
(
    id         VARCHAR(64),
    type       VARCHAR(64) DEFAULT 'NULL',
    actor_id   VARCHAR(64) DEFAULT 'NULL',
    repo_id    VARCHAR(64) DEFAULT 'NULL',
    org_id     VARCHAR(64) DEFAULT 'NULL',
    created_at TIMESTAMP
) DISTSTYLE KEY DISTKEY(actor_id) SORTKEY(created_at);

CREATE TABLE github_archive_events_by_repo_id
(
    id         VARCHAR(64),
    type       VARCHAR(64) DEFAULT 'NULL',
    actor_id   VARCHAR(64) DEFAULT 'NULL',
    repo_id    VARCHAR(64) DEFAULT 'NULL',
    org_id     VARCHAR(64) DEFAULT 'NULL',
    created_at TIMESTAMP
) DISTSTYLE KEY DISTKEY(repo_id) SORTKEY(created_at);

INSERT INTO github_archive_actors_by_actor_id (actor_id, actor_login, actor_url)
SELECT actor_id, actor_login, actor_url FROM github_archive_input;

INSERT INTO github_archive_repos_by_repo_id (repo_id, repo_name, repo_url)
SELECT repo_id, repo_name, repo_url FROM github_archive_input WHERE repo_id IS NOT NULL;

INSERT INTO github_archive_events_by_actor_id (id, type, actor_id, repo_id, org_id, created_at)
SELECT id, type, actor_id, repo_id, org_id, created_at FROM github_archive_input WHERE id IS NOT NULL;

INSERT INTO github_archive_events_by_repo_id (id, type, actor_id, repo_id, org_id, created_at)
SELECT id, type, actor_id, repo_id, org_id, created_at FROM github_archive_input WHERE id IS NOT NULL;

Benchmark: aggregates

In the first example the user runs two aggregate queries (not in parallel): counting the actors with more than 10 events and counting the repositories with more than 10 events. Both variants (EVEN and KEY distribution) scanned the same amount of data (measured both in rows and in bytes) and returned the same results:

  • actors: 177 228 289 rows, 2.20 GB
  • repos: 153 341 144 rows, 1.26 GB

In the KEY-distributed variant the cluster was able to optimize the computation by aggregating and filtering data in place on each slice, as all events of a given actor (or repository) are guaranteed to be stored on exactly one slice.

Note that there is an implicit limit of 100 rows on the returned result set.

Queries

SELECT actor_id, COUNT(*) AS events
FROM github_archive_events_even
WHERE actor_id <> 'NULL'
GROUP BY actor_id
HAVING events > 10
ORDER BY events DESC;

SELECT actor_id, COUNT(*) AS events
FROM github_archive_events_by_actor_id
WHERE actor_id <> 'NULL'
GROUP BY actor_id
HAVING events > 10
ORDER BY events DESC;

SELECT repo_id, COUNT(*) AS events
FROM github_archive_events_even
WHERE repo_id <> 'NULL'
GROUP BY repo_id
HAVING events > 10
ORDER BY events DESC;

SELECT repo_id, COUNT(*) AS events
FROM github_archive_events_by_repo_id
WHERE repo_id <> 'NULL'
GROUP BY repo_id
HAVING events > 10
ORDER BY events DESC;

Aggregates: results

                            Actors, EVEN   Actors, KEY   Repos, EVEN   Repos, KEY
HashAggregate output (MB)   765.67         < 1           935.52        < 1
HashAggregate time (s)      8              1             6             1
Total time (s)              10             7             9             1

Benchmark aggregates – Results
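The difference can also be previewed without running the full benchmark: in the query plan of the KEY-distributed variant the aggregation happens before any data movement. A sketch (plan details will vary with cluster size and table statistics):

EXPLAIN
SELECT actor_id, COUNT(*) AS events
FROM github_archive_events_by_actor_id
WHERE actor_id <> 'NULL'
GROUP BY actor_id
HAVING events > 10;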

Benchmark: joins

The second example covers joining data: events with either repositories or actors. With KEY distribution, matching rows are co-located on the same slices, so Redshift can join them in place – highlighted in the AWS console as DS_DIST_NONE. With EVEN distribution, Redshift had to redistribute both tables among the slices before joining, which shows up as DS_DIST_BOTH, the most expensive type of join. All types of joins are described in the AWS documentation.

Queries are limited to 250 rows to keep the execution time bearable. A join query without aggregation is tantamount to unloading the whole database onto the leader node, which is not encouraged in Amazon Redshift.
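The DS_DIST_* label can also be read straight from the query plan, without the AWS console. A sketch for the KEY-distributed pair, where the join step should show DS_DIST_NONE:

EXPLAIN
SELECT *
FROM github_archive_events_by_actor_id events
INNER JOIN github_archive_actors_by_actor_id actors
    ON actors.actor_id = events.actor_id;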

Queries

SELECT * FROM github_archive_events_even events
INNER JOIN github_archive_actors_even actors
    ON actors.actor_id = events.actor_id
LIMIT 250;

SELECT * FROM github_archive_events_by_actor_id events
INNER JOIN github_archive_actors_by_actor_id actors
    ON actors.actor_id = events.actor_id
LIMIT 250;

SELECT * FROM github_archive_events_even events
INNER JOIN github_archive_repos_even repos
    ON repos.repo_id = events.repo_id
LIMIT 250;

SELECT * FROM github_archive_events_by_repo_id events
INNER JOIN github_archive_repos_by_repo_id repos
    ON repos.repo_id = events.repo_id
LIMIT 250;

Joins: results

                       Actors, EVEN   Actors, KEY    Repos, EVEN    Repos, KEY
Join step              DS_DIST_BOTH   DS_DIST_NONE   DS_DIST_BOTH   DS_DIST_NONE
Join output (MB)       13 690         < 1            16 250         < 1
Join time (s)          21             3              25             5
Total scanned (rows)   442 997 562    196 894 844    442 999 328    196 900 175
Total scanned (GB)     26.27          9.64           31.17          9.64
Total time (s)         21             16             27             17

Benchmark joins – Results

Benchmarking sorting

When defining sort keys for this benchmark we focus on optimizing block filtering. As mentioned in the previous article, Redshift keeps min-max metadata for each block of a column and checks it before opening the block for reading. If the query asks only for rows outside that min-max range, the block is skipped. The block size is not configurable, but the DBA can control the order of the rows stored in consecutive blocks using SORTKEY to achieve good block filtering.
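That per-block min-max metadata can be inspected directly via the STV_BLOCKLIST system table. A sketch joining it with STV_TBL_PERM to resolve the table name (one of the tables created below; note that the min/max values are stored as 8-byte integers, so character columns expose only their leading bytes):

SELECT b.col, b.blocknum, b.minvalue, b.maxvalue
FROM stv_blocklist b
JOIN stv_tbl_perm p ON b.tbl = p.id AND b.slice = p.slice
WHERE TRIM(p.name) = 'github_archive_events_type_id'
ORDER BY b.col, b.blocknum
LIMIT 20;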

In this benchmark three kinds of sorting are checked:

1. AUTO, leaving Redshift to figure out the sorting itself,

2. SORTKEY(id), which puts rows in the order of event creation (good for vacuuming, but not for filtering by type),

3. SORTKEY(type, id), a compound sort key on type and id: type is a finite enumeration and id increments with new rows, which makes the pair a promising candidate for a sort key.

Again, the goal of the benchmark is to check whether the differences above have a noticeable impact on execution time and cluster load.

Benchmark: using a predicate

The simplest example of block rejection is a SELECT query with a predicate. For this demonstration the events data will be loaded into three tables: sorted automatically, by event id, and by the (type, id) tuple. The query filters rows on the type column. A limit is introduced again so that a large dataset is not unloaded onto the leader node.

Schema
CREATE TABLE github_archive_events_auto
(
    id         VARCHAR(64),
    type       VARCHAR(64) DEFAULT 'NULL',
    actor_id   VARCHAR(64) DEFAULT 'NULL',
    repo_id    VARCHAR(64) DEFAULT 'NULL',
    org_id     VARCHAR(64) DEFAULT 'NULL',
    created_at TIMESTAMP
) DISTSTYLE EVEN SORTKEY AUTO;

CREATE TABLE github_archive_events_id
(
    id         VARCHAR(64),
    type       VARCHAR(64) DEFAULT 'NULL',
    actor_id   VARCHAR(64) DEFAULT 'NULL',
    repo_id    VARCHAR(64) DEFAULT 'NULL',
    org_id     VARCHAR(64) DEFAULT 'NULL',
    created_at TIMESTAMP
) DISTSTYLE EVEN SORTKEY(id);

CREATE TABLE github_archive_events_type_id
(
    id         VARCHAR(64),
    type       VARCHAR(64) DEFAULT 'NULL',
    actor_id   VARCHAR(64) DEFAULT 'NULL',
    repo_id    VARCHAR(64) DEFAULT 'NULL',
    org_id     VARCHAR(64) DEFAULT 'NULL',
    created_at TIMESTAMP
) DISTSTYLE EVEN SORTKEY(type, id);

INSERT INTO github_archive_events_auto (id, type, actor_id, repo_id, org_id, created_at)
SELECT id, type, actor_id, repo_id, org_id, created_at FROM github_archive_input WHERE id IS NOT NULL;

INSERT INTO github_archive_events_id (id, type, actor_id, repo_id, org_id, created_at)
SELECT id, type, actor_id, repo_id, org_id, created_at FROM github_archive_input WHERE id IS NOT NULL;

INSERT INTO github_archive_events_type_id (id, type, actor_id, repo_id, org_id, created_at)
SELECT id, type, actor_id, repo_id, org_id, created_at FROM github_archive_input WHERE id IS NOT NULL;

Queries

SELECT * FROM github_archive_events_auto
WHERE type = 'PullRequestEvent'
ORDER BY org_id ASC
LIMIT 10000;

SELECT * FROM github_archive_events_id
WHERE type = 'PullRequestEvent'
ORDER BY org_id ASC
LIMIT 10000;

SELECT * FROM github_archive_events_type_id
WHERE type = 'PullRequestEvent'
ORDER BY org_id ASC
LIMIT 10000;

Results

A query against the STL_SCAN system table shows the number of rows scanned by each query. For the (type, id) sort key, the number of rows scanned before filtering is roughly an order of magnitude lower than for the other two tables.

SELECT query, segment, perm_table_name,
       SUM(rows_pre_filter) AS total_rows_pre_filter,
       SUM(rows) AS total_rows
FROM stl_scan
WHERE query IN (<auto>, <id>, <type + id>)
GROUP BY 1, 2, 3
ORDER BY 1, 2, 3;

                 total_rows_pre_filter   total_rows
events_auto      49 220 744 (all)        4 158 104
events_id        49 220 744 (all)        4 158 104
events_type_id   5 495 498               4 158 104

Benchmark predicate – Results