Sharding tables in ClickHouse
Sharding provides a range of benefits for coping with a high query rate and large data volumes. It works by creating a distributed table that routes queries to the underlying tables. You can access data in sharded tables both directly and through the distributed table.
There are three approaches to sharding:
- Classic approach, where the distributed table uses all shards in the cluster.
- Regular group-based approach, where some shards are combined into a group.
- Advanced group-based approach, where shards are split into two groups: one group is created for the distributed table and another for the underlying tables.
Below are examples of sharding setup for each of the three approaches.
For more information, see Sharding in Managed Service for ClickHouse.
To set up sharding:

1. Set up clickhouse-client.
2. Create tables with data.
3. Test the tables.

If you no longer need the resources you created, delete them.
Getting started
Prepare the infrastructure
1. Create a Managed Service for ClickHouse cluster with the following settings:

   - Cluster name: `chcluster`
   - Disk type: select the required disk type. For more information, see Storage in Managed Service for ClickHouse.
   - DB name: `tutorial`

   Cluster hosts must be available online.
2. Create two additional shards named `shard2` and `shard3`.
3. Create shard groups. Their number depends on the sharding type:

   - Regular group-based sharding requires one shard group named `sgroup`, which includes the `shard1` and `shard2` shards.
   - Advanced group-based sharding requires two groups: `sgroup` includes `shard1` and `shard2`; `sgroup_data` includes `shard3`.

   No shard groups are needed for classic sharding.
Set up clickhouse-client
Install and configure clickhouse-client to connect to your database.
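Once clickhouse-client is installed, a connection to the cluster's `tutorial` database might look like the following (a sketch reusing the connection parameters that appear later in this tutorial; the host FQDN, username, and password are placeholders you must substitute):

```shell
# Hypothetical connection command; host, user, and password are placeholders
clickhouse-client \
    --host "<FQDN of any ClickHouse host>" \
    --secure \
    --port 9440 \
    --user "<username>" \
    --password "<user password>" \
    --database "tutorial"
```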
Create tables with data
For example, you need to enable sharding for the `hits_v1` table. The text of the table creation query depends on the sharding approach you selected.

For the table structure to use in place of `<table structure>`, see the ClickHouse documentation.

When you enable sharding by any of these methods, you can send `SELECT` and `INSERT` queries to the distributed table you created, and they will be processed according to the specified configuration.

The sharding key in the examples is a random number, `rand()`.
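A random sharding key spreads rows across shards uniformly but arbitrarily. As a sketch of an alternative (not part of this tutorial), you could use a deterministic sharding key so that all rows for the same user always land on the same shard:

```sql
-- Hypothetical variant: shard by a hash of UserID instead of rand(),
-- so rows with the same UserID are always routed to the same shard
CREATE TABLE tutorial.hits_v1_distributed ON CLUSTER '{cluster}'
AS tutorial.hits_v1
ENGINE = Distributed('{cluster}', tutorial, hits_v1, intHash64(UserID))
```

A deterministic key makes per-user queries cheaper (they can be served by one shard) at the cost of possible data skew if some users are much larger than others.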
Classic sharding
In this example, the distributed table created based on `hits_v1` uses all the shards (`shard1`, `shard2`, and `shard3`) of the `chcluster` cluster.
Before operating the distributed table:

1. Connect to the `tutorial` database.

2. Create a `MergeTree` table named `hits_v1`, which will reside on all hosts of the cluster:

   ```sql
   CREATE TABLE tutorial.hits_v1 ON CLUSTER '{cluster}' (
       <table structure>
   )
   ENGINE = MergeTree()
   PARTITION BY toYYYYMM(EventDate)
   ORDER BY (CounterID, EventDate, intHash32(UserID))
   SAMPLE BY intHash32(UserID)
   SETTINGS index_granularity = 8192
   ```
To create the `hits_v1_distributed` distributed table in the cluster:

1. Connect to the `tutorial` database.

2. Create a table on the `Distributed` engine:

   ```sql
   CREATE TABLE tutorial.hits_v1_distributed ON CLUSTER '{cluster}'
   AS tutorial.hits_v1
   ENGINE = Distributed('{cluster}', tutorial, hits_v1, rand())
   ```

   In this case, instead of explicitly specifying the table structure, you can use the `AS tutorial.hits_v1` expression, because the `hits_v1_distributed` and `hits_v1` tables run on the same hosts in the cluster.

   When creating a table on the `Distributed` engine, use `chcluster` as the cluster ID. You can retrieve it with the list of clusters in the folder.

   Tip: instead of the cluster ID, you can use the `{cluster}` macro: when the query is executed, the ID of the cluster where the `CREATE TABLE` operation is running is picked up automatically.
Sharding using shard groups
In this example:

- One shard group is used: `sgroup`.
- The distributed table and the `hits_v1` underlying table are in the same `sgroup` shard group in the cluster.
Before operating the distributed table:

1. Connect to the `tutorial` database.

2. Create a `MergeTree` table named `hits_v1`, which will use all hosts of the `sgroup` shard group in the cluster:

   ```sql
   CREATE TABLE tutorial.hits_v1 ON CLUSTER sgroup (
       <table structure>
   )
   ENGINE = MergeTree()
   PARTITION BY toYYYYMM(EventDate)
   ORDER BY (CounterID, EventDate, intHash32(UserID))
   SAMPLE BY intHash32(UserID)
   SETTINGS index_granularity = 8192
   ```
To create the `tutorial.hits_v1_distributed` distributed table in the cluster:

1. Connect to the `tutorial` database.

2. Create a table on the `Distributed` engine:

   ```sql
   CREATE TABLE tutorial.hits_v1_distributed ON CLUSTER sgroup
   AS tutorial.hits_v1
   ENGINE = Distributed(sgroup, tutorial, hits_v1, rand())
   ```

   In this case, instead of explicitly specifying the table structure, you can use the `AS tutorial.hits_v1` expression, because the `hits_v1_distributed` and `hits_v1` tables use the same shard and run on the same hosts in the cluster.
Advanced sharding using shard groups
In this example:

- Two shard groups are used: `sgroup` and `sgroup_data`.
- The distributed table is located in the `sgroup` shard group.
- The `hits_v1` underlying table is located in the `sgroup_data` shard group.
Before operating the distributed table:

1. Connect to the `tutorial` database.

2. Create a `ReplicatedMergeTree` table named `hits_v1`, which will use all hosts of the `sgroup_data` shard group in the cluster:

   ```sql
   CREATE TABLE tutorial.hits_v1 ON CLUSTER sgroup_data (
       <table structure>
   )
   ENGINE = ReplicatedMergeTree('/tables/{shard}/hits_v1', '{replica}')
   PARTITION BY toYYYYMM(EventDate)
   ORDER BY (CounterID, EventDate, intHash32(UserID))
   SAMPLE BY intHash32(UserID)
   SETTINGS index_granularity = 8192
   ```

   The `ReplicatedMergeTree` engine ensures fault tolerance.
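Because `ReplicatedMergeTree` keeps a copy of each shard's data on every replica host, you can inspect replication state from any host. As a sketch (the `system.replicas` table is standard ClickHouse, but the values returned depend on your cluster):

```sql
-- Check replication health of the hits_v1 replicas on the current host
SELECT database, table, is_leader, total_replicas, active_replicas
FROM system.replicas
WHERE table = 'hits_v1'
```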
To create the `tutorial.hits_v1_distributed` distributed table in the cluster:

1. Connect to the `tutorial` database.

2. Create a table on the `Distributed` engine:

   ```sql
   CREATE TABLE tutorial.hits_v1_distributed ON CLUSTER sgroup (
       <table structure>
   )
   ENGINE = Distributed(sgroup_data, tutorial, hits_v1, rand())
   ```

   In this case, you must explicitly specify the table structure, because the `hits_v1_distributed` and `hits_v1` tables use different shards and run on different hosts.
Test the tables
To check the health of the `tutorial.hits_v1_distributed` distributed table you created:

1. Load the `hits_v1` test dataset:

   ```shell
   curl https://datasets.clickhouse.com/hits/tsv/hits_v1.tsv.xz | unxz --threads=`nproc` > hits_v1.tsv
   ```

2. Populate the table with the test data:

   ```shell
   clickhouse-client \
       --host "<FQDN of any host with a distributed table>" \
       --secure \
       --port 9440 \
       --user "<username>" \
       --password "<user password>" \
       --database "tutorial" \
       --query "INSERT INTO tutorial.hits_v1_distributed FORMAT TSV" \
       --max_insert_block_size=100000 < hits_v1.tsv
   ```

   To find out the host names, request a list of ClickHouse hosts in the cluster.

3. Run one or more test queries against this table. For example, you can find out the number of rows in it:

   ```sql
   SELECT count() FROM tutorial.hits_v1_distributed
   ```

   Result:

   ```text
   8873898
   ```
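To see how the inserted rows were distributed across the shards, you can group a distributed query by the host that stores each row. This is a sketch: `hostName()` is evaluated on each remote shard during a distributed query, so the per-host counts should sum to the total above.

```sql
-- Count the rows each shard host returns for the distributed table
SELECT hostName() AS host, count() AS rows
FROM tutorial.hits_v1_distributed
GROUP BY host
ORDER BY host
```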
Delete the resources you created
Delete the resources you no longer need to avoid being charged for them.