Citus as a distributed database plugin hands-on

Preamble

Citus provides excellent columnar storage that fits very well in a Business Intelligence instance, as we have seen in a previous post (link). Citus also provides a distributed database feature inside PostgreSQL.

Citus is neither a new standalone software nor a fork of PostgreSQL: it plugs smoothly into a traditional PostgreSQL cluster as an extension. So you still have all the power and the full ecosystem of PostgreSQL.

Citus is a sharding plugin that lets you shard your big objects (the fact tables of a star schema implementation) over multiple separate PostgreSQL instances (called workers). Queries are then distributed to all the worker nodes directly from the coordinator node (which does not contain any data), where your clients connect.

Remark:
Dimension tables can be replicated entirely on all worker nodes (reference tables) and fact tables can be co-located by shard to speed up join operations.
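
To make the sharding idea more concrete, here is a minimal Python sketch of how rows could be routed to shards and shards to workers. It is only an illustration under stated assumptions: Citus relies on its own PostgreSQL hash functions and metadata, while this sketch uses zlib.crc32 as a stand-in hash, a simple round-robin shard placement, and hypothetical helper names (shard_for, worker_for). The 32 shards match the default shard count mentioned later in this post.

```python
import zlib

# Assumptions for illustration only: two workers and 32 shards.
WORKERS = ["server2.domain.com", "server3.domain.com"]
SHARD_COUNT = 32


def shard_for(distribution_value, shard_count=SHARD_COUNT):
    """Return the index of the shard owning a distribution column value.

    The 32-bit hash space is cut into shard_count equal ranges and the
    value lands in the range that contains its hash. crc32 is a stand-in,
    not the hash function Citus really uses.
    """
    hashed = zlib.crc32(str(distribution_value).encode("utf-8"))
    range_size = 2**32 // shard_count
    return min(hashed // range_size, shard_count - 1)


def worker_for(shard_index, workers=WORKERS):
    """Place shards on workers in round-robin order (hypothetical rule)."""
    return workers[shard_index % len(workers)]


# All rows sharing a device_id land in the same shard, hence on one worker.
for device_id in (1, 2, 3):
    shard = shard_for(device_id)
    print(device_id, shard, worker_for(shard))
```

The important property is that the routing is deterministic: a query filtering on one value of the distribution column only needs to visit a single shard.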

My testing has been done with three Virtual Machines under VirtualBox running Oracle Linux Server release 8.7 and PostgreSQL 15.2:

  • server1.domain.com (192.168.56.101): Citus coordinator node
  • server2.domain.com (192.168.56.102): First Citus worker node
  • server3.domain.com (192.168.56.103): Second Citus worker node

As we will see, the solution is great to improve performance and to scale out the big queries of a BI platform. But it is in no way a highly available solution like MariaDB Galera Cluster: with Citus you have to handle the loss of any node of your cluster yourself. Even the worker nodes are not redundant, as each of them holds unique shards of your sharded objects… The generic recommendation from Citus is to use PostgreSQL replication to remove the Single Point Of Failure (SPOF).

Citus distributed database installation

[root@server1 ~]# lvcreate --name lvol02 --size 5g vg00
  Logical volume "lvol02" created.
[root@server1 ~]# mkfs -t xfs /dev/vg00/lvol02
[root@server1 ~]# grep postgres /etc/fstab
/dev/mapper/vg00-lvol02   /postgres                   xfs     defaults        0 0
[root@server1 ~]# mount -a
[root@server1 ~]# useradd --gid dba -m --shell /bin/bash postgres

I have added the following lines to the profile of the postgres account:

export PATH=/usr/pgsql-15/bin/:$PATH:$HOME/.local/bin:$HOME/bin
export PGDATA=/postgres/15/data
alias psql='psql --port=5433 --host=server1.domain.com --username=postgres'
alias pg_stop='pg_ctl -l logfile stop'
alias pg_start='pg_ctl -l logfile start'
alias pg_restart='pg_ctl -l logfile restart'

Then I initialized the PostgreSQL cluster:

[postgres@server1 ~]$ initdb --pgdata=/postgres/15/data

In pg_hba.conf I have added the line below, as I want to be able to connect to all nodes of my local network with the postgres account without a password. There is most probably a more secure way to do it, but this is not the purpose of this blog post:

host    all             postgres             192.168.56.0/24            trust

The active parameters of my postgresql.conf file:

[postgres@server1 ~]$ grep -E -v '^(\t|\s)*#.*|^$' /postgres/15/data/postgresql.conf
listen_addresses = 'server1.domain.com'         # what IP address(es) to listen on;
port = 5433                             # (change requires restart)
max_connections = 100                   # (change requires restart)
shared_buffers = 128MB                  # min 128kB
dynamic_shared_memory_type = posix      # the default is usually the first option
max_wal_size = 1GB
min_wal_size = 80MB
log_destination = 'stderr'              # Valid values are combinations of
logging_collector = on                  # Enable capturing of stderr, jsonlog,
log_directory = 'log'                   # directory where log files are written,
log_filename = 'postgresql-%a.log'      # log file name pattern,
log_rotation_age = 1d                   # Automatic rotation of logfiles will
log_rotation_size = 0                   # Automatic rotation of logfiles will
log_truncate_on_rotation = on           # If on, an existing log file with the
log_line_prefix = '%m [%p] '            # special values:
log_timezone = 'CET'
datestyle = 'iso, mdy'
timezone = 'CET'
lc_messages = 'en_US.UTF-8'                     # locale for system error message
lc_monetary = 'en_US.UTF-8'                     # locale for monetary formatting
lc_numeric = 'en_US.UTF-8'                      # locale for number formatting
lc_time = 'en_US.UTF-8'                         # locale for time formatting
default_text_search_config = 'pg_catalog.english'

Then I installed the Citus repository and package (to be done on every node of the cluster):

[root@server2 ~]# curl https://install.citusdata.com/community/rpm.sh | sudo bash
[root@server1 ~]# dnf -y install citus112_15

I have added to my postgresql.conf file (postgresql15-contrib.x86_64 package required for pg_stat_statements):

shared_preload_libraries = 'citus,pg_stat_statements'   # (change requires restart)

I also created an account for myself to be able to use my HeidiSQL graphical tool:

postgres=# create role yjaquier with superuser login password 'secure_password';
CREATE ROLE

Do not forget to modify the access file:

host    all             yjaquier             0.0.0.0/0            scram-sha-256

I wanted to create all my Citus objects in a dedicated database, so I created one:

postgres=# create database citus encoding=UTF8 locale='en_US.UTF-8';
NOTICE:  Citus partially supports CREATE DATABASE for distributed databases
DETAIL:  Citus does not propagate CREATE DATABASE command to workers
HINT:  You can manually create a database and its extensions on workers.
CREATE DATABASE

Please also check the GitHub issue in the references section: the extension is created per database, so you must create it in each database you plan to distribute with Citus:

postgres=# \c citus
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, compression: off)
You are now connected to database "citus" as user "postgres".
citus=# create extension if not exists citus;
CREATE EXTENSION
citus=# select * from pg_extension;
  oid  |    extname     | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition
-------+----------------+----------+--------------+----------------+------------+-----------+--------------
 13529 | plpgsql        |       10 |           11 | f              | 1.0        |           |
 16387 | citus_columnar |       10 |           11 | f              | 11.2-1     |           |
 16443 | citus          |       10 |           11 | f              | 11.2-1     |           |
(3 rows)

Citus distributed database configuration

On the coordinator node (again, connect first to your chosen Citus database):

citus=# SELECT citus_set_coordinator_host('server1.domain.com', 5433);
 citus_set_coordinator_host
----------------------------
 
(1 row)

postgres=# SELECT * from citus_add_node('server2.domain.com', 5433);
ERROR: connection to the remote node server2.domain.com:5433 failed with the following error: server does not support SSL, but SSL was required

https://docs.citusdata.com/en/stable/reference/common_errors.html#ssl-error-certificate-verify-failed

As of Citus 8.1 SSL is required by default between the Citus cluster nodes:

citus=# show citus.node_conninfo;
 citus.node_conninfo
---------------------
 sslmode=require
(1 row)

By the way, I noticed that Citus had already created everything needed on my coordinator node (server1):

[postgres@server1 ~]$ ll /postgres/15/data/ser*
-rw------- 1 postgres dba  981 Feb 17 16:53 /postgres/15/data/server.crt
-rw------- 1 postgres dba 1704 Feb 17 16:53 /postgres/15/data/server.key
[postgres@server1 ~]$ cat /postgres/15/data/postgresql.auto.conf
# Do not edit this file manually!
# It will be overwritten by the ALTER SYSTEM command.
ssl = 'on'
ssl_ciphers = 'ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES128-SHA256:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES128-SHA256:ECDHE-RSA-AES256-SHA384'

So I had to set it up on the other nodes (I did it on all nodes to start with a clean situation):

[postgres@server1 ~]$ openssl req -new -x509 -days 365 -nodes -text -out server1.crt -keyout server1.key -subj "/CN=server1.domain.com"
Generating a RSA private key
.....................................................................................................+++++
................................................+++++
writing new private key to 'server1.key'
-----
[postgres@server1 ~]$ openssl req -new -nodes -text -out root.csr -keyout root.key -subj "/CN=server1.domain.com"
Generating a RSA private key
...........................................................................+++++
.........................................................................................................................................+++++
writing new private key to 'root.key'
-----
[postgres@server1 ~]$ openssl x509 -req -in root.csr -text -days 3650 -extfile /etc/pki/tls/openssl.cnf -extensions v3_ca -signkey root.key -out root.crt
Signature ok
subject=CN = server1.domain.com
Getting Private key
[postgres@server1 ~]$ openssl req -new -nodes -text -out server1.csr -keyout server1.key -subj "/CN=server1.domain.com"
Generating a RSA private key
....................................................................................................................................................+++++
.................+++++
writing new private key to 'server1.key'
-----
[postgres@server1 ~]$ openssl x509 -req -in server1.csr -text -days 365 -CA root.crt -CAkey root.key -CAcreateserial -out server1.crt
Signature ok
subject=CN = server1.domain.com
Getting CA Private Key
[postgres@server1 ~]$ chmod 600 *
[postgres@server1 ~]$ ll
total 28
-rw------- 1 postgres dba 2881 Feb 20 11:38 root.crt
-rw------- 1 postgres dba 3343 Feb 20 11:37 root.csr
-rw------- 1 postgres dba 1708 Feb 20 11:37 root.key
-rw------- 1 postgres dba   41 Feb 20 11:38 root.srl
-rw------- 1 postgres dba 2759 Feb 20 11:38 server1.crt
-rw------- 1 postgres dba 3343 Feb 20 11:38 server1.csr
-rw------- 1 postgres dba 1704 Feb 20 11:38 server1.key

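You can also use the cert script below (run it with ./cert server3):

#!/bin/bash
# Usage: ./cert <short host name>, for example ./cert server3
host="$1"
# Self-signed certificate (overwritten further down by the CA-signed one)
openssl req -new -x509 -days 365 -nodes -text -out "$host.crt" -keyout "$host.key" -subj "/CN=$host.domain.com"
# Root CA: request and key, then self-signed root certificate
openssl req -new -nodes -text -out root.csr -keyout root.key -subj "/CN=$host.domain.com"
openssl x509 -req -in root.csr -text -days 3650 -extfile /etc/pki/tls/openssl.cnf -extensions v3_ca -signkey root.key -out root.crt
# Server request and key, then certificate signed by the root CA
openssl req -new -nodes -text -out "$host.csr" -keyout "$host.key" -subj "/CN=$host.domain.com"
openssl x509 -req -in "$host.csr" -text -days 365 -CA root.crt -CAkey root.key -CAcreateserial -out "$host.crt"
chmod 600 "$host"* root*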

Once SSL is activated you can see it when connecting:

[postgres@server3 ~]$ psql
psql (15.2)
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, compression: off)
Type "help" for help.
 
postgres=#

Or:

citus=# SELECT run_command_on_workers('show ssl');
  run_command_on_workers
--------------------------------
(server2.domain.com,5433,t,on)
(server3.domain.com,5433,t,on)
(2 rows)

Nodes can now be added without issues:

citus=# SELECT * from citus_add_node('server2.domain.com', 5433);
 citus_add_node
----------------
              5
(1 row)
 
citus=# SELECT * from citus_add_node('server3.domain.com', 5433);
 citus_add_node
----------------
              6
(1 row)
 
citus=# SELECT * FROM citus_get_active_worker_nodes();
     node_name      | node_port
--------------------+-----------
 server3.domain.com |      5433
 server2.domain.com |      5433
(2 rows)

Citus distributed database testing

I’m using the same example table as the GitHub repository:

citus=# CREATE TABLE events (
citus(#   device_id bigint,
citus(#   event_id bigserial,
citus(#   event_time timestamptz default now(),
citus(#   data jsonb not null,
citus(#   PRIMARY KEY (device_id, event_id)
citus(# );
CREATE TABLE
citus=# SELECT create_distributed_table('events', 'device_id');
 create_distributed_table
--------------------------
 
(1 row)
 
citus=# select * from citus_tables;
 table_name | citus_table_type | distribution_column | colocation_id | table_size | shard_count | table_owner | access_method
------------+------------------+---------------------+---------------+------------+-------------+-------------+---------------
 events     | distributed      | device_id           |             1 | 512 kB     |          32 | postgres    | heap

Filled it with 10 million rows:

citus=# INSERT INTO events (device_id, data) SELECT s % 100, ('{"measurement":'||random()||'}')::jsonb FROM generate_series(1,10000000) s;
INSERT 0 1000000
citus=# SELECT table_name, table_size FROM citus_tables;
 table_name | table_size
------------+------------
 events     | 1213 MB
(1 row)
citus=# SELECT * FROM events WHERE device_id = 1 ORDER BY event_time DESC, event_id DESC LIMIT 3;
 device_id | event_id |          event_time           |                 data
-----------+----------+-------------------------------+--------------------------------------
         1 |   999901 | 2023-02-20 15:39:35.181012+01 | {"measurement": 0.8813668984787719}
         1 |   999801 | 2023-02-20 15:39:35.181012+01 | {"measurement": 0.03485957466660938}
         1 |   999701 | 2023-02-20 15:39:35.181012+01 | {"measurement": 0.7250541721835948}
(3 rows)

You can now display the explain plan of the sharded queries…

A selection on a single device_id accesses only one shard:

citus=# EXPLAIN (VERBOSE ON) SELECT * FROM events WHERE device_id = 1 ORDER BY event_time DESC, event_id DESC LIMIT 3;
                                                                                       QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
   Output: remote_scan.device_id, remote_scan.event_id, remote_scan.event_time, remote_scan.data
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Query: SELECT device_id, event_id, event_time, data FROM public.events_102009 events WHERE (device_id OPERATOR(pg_catalog.=) 1) ORDER BY event_time DESC, event_id DESC LIMIT 3
         Node: host=server3.domain.com port=5433 dbname=citus
         ->  Limit  (cost=1082.91..1082.92 rows=3 width=63)
               Output: device_id, event_id, event_time, data
               ->  Sort  (cost=1082.91..1108.28 rows=10147 width=63)
                     Output: device_id, event_id, event_time, data
                     Sort Key: events.event_time DESC, events.event_id DESC
                     ->  Bitmap Heap Scan on public.events_102009 events  (cost=242.93..951.77 rows=10147 width=63)
                           Output: device_id, event_id, event_time, data
                           Recheck Cond: (events.device_id = 1)
                           ->  Bitmap Index Scan on events_pkey_102009  (cost=0.00..240.39 rows=10147 width=0)
                                 Index Cond: (events.device_id = 1)
             Query Identifier: 1025693804818833478
 Query Identifier: 5031995992548174482
(19 rows)

A more generic query uses all shards (32 is the default number of shards, set by citus.shard_count). Only one task on one shard is displayed by default:

citus=# EXPLAIN (VERBOSE ON) SELECT device_id, count(*) from events group by device_id;
                                                    QUERY PLAN
-------------------------------------------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=16)
   Output: remote_scan.device_id, remote_scan.count
   Task Count: 32
   Tasks Shown: One of 32
   ->  Task
         Query: SELECT device_id, count(*) AS count FROM public.events_102008 events WHERE true GROUP BY device_id
         Node: host=server2.domain.com port=5433 dbname=citus
         ->  HashAggregate  (cost=800.00..800.03 rows=3 width=16)
               Output: device_id, count(*)
               Group Key: events.device_id
               ->  Seq Scan on public.events_102008 events  (cost=0.00..650.00 rows=30000 width=8)
                     Output: device_id, event_id, event_time, data
             Query Identifier: -3583515104173105020
 Query Identifier: 399593402794421440
(14 rows)
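
The "Task Count: 32" line of the plan above can be illustrated with a small Python sketch of the scatter and gather pattern: one local aggregation per shard, then a simple collection of the results on the coordinator. This is a simplified model and not the Citus executor: the modulo routing rule and the function names (shard_of, run_task, distributed_count) are assumptions made for the example.

```python
from collections import Counter

SHARD_COUNT = 32  # same value as "Task Count: 32" in the plan above


def shard_of(device_id):
    """Stand-in routing rule; Citus hashes the distribution column instead."""
    return device_id % SHARD_COUNT


def run_task(shard_rows):
    """What a worker computes for one shard: a local GROUP BY with count(*)."""
    return Counter(shard_rows)


def distributed_count(device_ids):
    """Model of SELECT device_id, count(*) ... GROUP BY device_id."""
    # Split the rows into shards on the distribution column.
    shards = [[] for _ in range(SHARD_COUNT)]
    for device_id in device_ids:
        shards[shard_of(device_id)].append(device_id)
    # One task per shard. The coordinator only collects the partial results,
    # because a given device_id lives in exactly one shard.
    merged = {}
    for shard_rows in shards:
        merged.update(run_task(shard_rows))
    return merged


# Same data shape as the test table, on a smaller scale.
rows = [s % 100 for s in range(1, 10001)]
result = distributed_count(rows)
print(len(result), result[1])
```

Grouping on the distribution column is the favorable case: no second aggregation step is needed on the coordinator, which only has to concatenate what the tasks return.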

To display all tasks:

citus=# SET citus.explain_all_tasks = 1;
SET
citus=# EXPLAIN SELECT device_id, count(*) from events group by device_id;
                                         QUERY PLAN
---------------------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=16)
   Task Count: 32
   Tasks Shown: All
   ->  Task
         Node: host=server2.domain.com port=5433 dbname=citus
         ->  HashAggregate  (cost=800.00..800.03 rows=3 width=16)
               Group Key: device_id
               ->  Seq Scan on events_102008 events  (cost=0.00..650.00 rows=30000 width=8)
   ->  Task
         Node: host=server3.domain.com port=5433 dbname=citus
         ->  HashAggregate  (cost=1332.00..1332.05 rows=5 width=16)
               Group Key: device_id
               ->  Seq Scan on events_102009 events  (cost=0.00..1082.00 rows=50000 width=8)
.
.

To compare with a non-sharded table I have created the exact same table without sharding it:

citus=# CREATE TABLE events2 (
citus(#   device_id bigint,
citus(#   event_id bigserial,
citus(#   event_time timestamptz default now(),
citus(#   data jsonb not null,
citus(#   PRIMARY KEY (device_id, event_id)
citus(# );
CREATE TABLE
citus=# INSERT INTO events2 (device_id, data) SELECT s % 100, ('{"measurement":'||random()||'}')::jsonb FROM generate_series(1,10000000) s;
INSERT 0 10000000

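Result of the query below over around 10 runs (use \timing in psql to get the execution time). This is not a benchmark!

 Query                                                          | Events2 (not sharded) | Events (sharded)
----------------------------------------------------------------+-----------------------+------------------
 SELECT device_id, COUNT(*) FROM table_name GROUP BY device_id; | 5 seconds             | 2.9 seconds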
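
As a quick piece of arithmetic on the two timings above, the Python snippet below computes the speedup and compares it with a 2x figure. The 2x reference is my own assumption (a perfectly linear gain with two workers), not something Citus promises, and the function name is only illustrative.

```python
def speedup(baseline_seconds, improved_seconds):
    """Ratio of the baseline timing to the improved timing."""
    return baseline_seconds / improved_seconds


local_seconds = 5.0        # events2, not sharded
distributed_seconds = 2.9  # events, sharded over two workers
worker_count = 2

ratio = speedup(local_seconds, distributed_seconds)
# Share of an assumed ideal linear speedup (one unit per worker).
efficiency = ratio / worker_count
print(f"{ratio:.2f}x faster, {efficiency:.0%} of a linear speedup")
```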

So yes even on my small test cluster made of VMs there is a difference…

Citus distributed database cluster management

Handle node failure

If you lose one worker the coordinator still accepts queries, but any query that needs shards located on the lost worker fails with the expected error below:

citus=# SELECT device_id, count(*) from events GROUP BY device_id;
ERROR:  connection to the remote node server2.domain.com:5433 failed with the following error: server closed the connection unexpectedly
        This probably means the server terminated abnormally
        before or while processing the request.

If you lose the coordinator it is even worse, as you cannot run any query.

Clearly there is no miracle here: High Availability must be handled by you with, for example, PostgreSQL replication, as suggested in the Citus documentation. We are really far from a highly available solution…

Removing nodes

I have started with:

citus=# select citus_remove_node('server3.domain.com', 5433);
ERROR:  cannot remove or disable the node server3.domain.com:5433 because because it contains the only shard placement for shard 102009
DETAIL:  One of the table(s) that prevents the operation complete successfully is public.events
HINT:  To proceed, either drop the tables or use undistribute_table() function to convert them to local tables

Completely undistributing tables is often not a solution, particularly for very big objects, and even less so if you already have plenty of workers. You can move the shards off this node one by one using citus_move_shard_placement or, better, drain the node:

citus=# select * from citus_drain_node('server3.domain.com', 5433);
ERROR:  connection to the remote node localhost:5433 failed with the following error: Connection refused
        Is the server running on that host and accepting TCP/IP connections?
connection to server at "localhost" (127.0.0.1), port 5433 failed: Connection refused
        Is the server running on that host and accepting TCP/IP connections?

As I got an error, I tried another way (but got the same error):

citus=# select * from citus_set_node_property('server3.domain.com', 5433, 'shouldhaveshards', false);
 citus_set_node_property
-------------------------
 
(1 row)
 
citus=# select nodename, shouldhaveshards from pg_dist_node;
      nodename      | shouldhaveshards
--------------------+------------------
 server1.domain.com | f
 server2.domain.com | t
 server3.domain.com | f
(3 rows)
 
citus=# select * from citus_rebalance_start(drain_only := true);
ERROR:  connection to the remote node localhost:5433 failed with the following error: Connection refused
        Is the server running on that host and accepting TCP/IP connections?
connection to server at "localhost" (127.0.0.1), port 5433 failed: Connection refused
        Is the server running on that host and accepting TCP/IP connections?

Then I luckily found that a Citus node sometimes needs to connect to itself, and the citus.local_hostname parameter is key in those situations, since my instances do not listen on localhost (example for server2):

postgres=# alter system set citus.local_hostname to 'server2.domain.com';
ALTER SYSTEM
postgres=# select pg_reload_conf();
pg_reload_conf
----------------
t
(1 row)

Then things went a step further:

citus=# select * from citus_drain_node('server3.domain.com', 5433);
NOTICE:  Moving shard 102009 from server3.domain.com:5433 to server2.domain.com:5433 ...
ERROR:  ERROR:  logical decoding requires wal_level >= logical
CONTEXT:  while executing command on server3.domain.com:5433
while executing command on server1.domain.com:5433
citus=# show wal_level
citus-# ;
 wal_level
-----------
 replica
(1 row)

Change wal_level to logical on all instances (this requires a restart)…

Finally:

citus=# select * from citus_drain_node('server3.domain.com', 5433);
NOTICE:  Moving shard 102009 from server3.domain.com:5433 to server2.domain.com:5433 ...
NOTICE:  Moving shard 102011 from server3.domain.com:5433 to server2.domain.com:5433 ...
NOTICE:  Moving shard 102013 from server3.domain.com:5433 to server2.domain.com:5433 ...
NOTICE:  Moving shard 102015 from server3.domain.com:5433 to server2.domain.com:5433 ...
NOTICE:  Moving shard 102017 from server3.domain.com:5433 to server2.domain.com:5433 ...
NOTICE:  Moving shard 102019 from server3.domain.com:5433 to server2.domain.com:5433 ...
NOTICE:  Moving shard 102021 from server3.domain.com:5433 to server2.domain.com:5433 ...
NOTICE:  Moving shard 102023 from server3.domain.com:5433 to server2.domain.com:5433 ...
NOTICE:  Moving shard 102025 from server3.domain.com:5433 to server2.domain.com:5433 ...
NOTICE:  Moving shard 102027 from server3.domain.com:5433 to server2.domain.com:5433 ...
NOTICE:  Moving shard 102029 from server3.domain.com:5433 to server2.domain.com:5433 ...
NOTICE:  Moving shard 102031 from server3.domain.com:5433 to server2.domain.com:5433 ...
NOTICE:  Moving shard 102033 from server3.domain.com:5433 to server2.domain.com:5433 ...
NOTICE:  Moving shard 102035 from server3.domain.com:5433 to server2.domain.com:5433 ...
NOTICE:  Moving shard 102037 from server3.domain.com:5433 to server2.domain.com:5433 ...
NOTICE:  Moving shard 102039 from server3.domain.com:5433 to server2.domain.com:5433 ...
 citus_drain_node
------------------
 
(1 row)
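
The shard moves listed in the NOTICE lines above can be mimicked with a few lines of Python. This is a toy model of what draining a node means, not the Citus implementation: the drain_node function below is hypothetical, it only tracks shard placements in a dictionary, and it assumes the layout suggested by the output (odd shard ids on server3, even ones on server2).

```python
def drain_node(placements, node, targets):
    """Move every shard off `node`, each time to the least loaded target.

    placements maps shard id -> node name and is updated in place.
    Returns the list of (shard_id, source, target) moves.
    """
    moves = []
    for shard_id in sorted(placements):
        if placements[shard_id] != node:
            continue
        # Count the shards currently held by each candidate target.
        load = {target: 0 for target in targets}
        for owner in placements.values():
            if owner in load:
                load[owner] += 1
        target = min(targets, key=lambda name: load[name])
        placements[shard_id] = target
        moves.append((shard_id, node, target))
    return moves


# Assumed layout: even shard ids on server2, odd shard ids on server3.
placements = {
    shard_id: "server2.domain.com" if shard_id % 2 == 0 else "server3.domain.com"
    for shard_id in range(102008, 102040)
}
moves = drain_node(placements, "server3.domain.com", ["server2.domain.com"])
print(len(moves), moves[0])
```

With a single remaining worker every shard obviously goes to server2; with more workers the same loop would spread the drained shards over the least loaded ones.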

The distributed table is now on one worker only:

citus=# select distinct nodename from citus_shards where table_name='events'::regclass;
      nodename
--------------------
 server2.domain.com
(1 row)
 
citus=# select citus_remove_node('server3.domain.com', 5433);
 citus_remove_node
-------------------
 
(1 row)
 
citus=# SELECT * FROM citus_get_active_worker_nodes();
     node_name      | node_port
--------------------+-----------
 server2.domain.com |      5433
(1 row)

At this point you can stop the instance running on server3.domain.com with no impact.

Adding nodes

Let’s add back the server we removed in the previous section:

citus=# SELECT * from citus_add_node('server3.domain.com', 5433);
 citus_add_node
----------------
              5
(1 row)
 
citus=# SELECT * FROM citus_get_active_worker_nodes();
     node_name      | node_port
--------------------+-----------
 server3.domain.com |      5433
 server2.domain.com |      5433
(2 rows)

We need to rebalance the events table:

citus=# select nodename,count(*) from citus_shards where table_name='events'::regclass group by nodename;
      nodename      | count
--------------------+-------
 server2.domain.com |    32
(1 row)
 
citus=# select * from get_rebalance_table_shards_plan();
 table_name | shardid | shard_size |     sourcename     | sourceport |     targetname     | targetport
------------+---------+------------+--------------------+------------+--------------------+------------
 events     |  102008 |          0 | server2.domain.com |       5433 | server3.domain.com |       5433
 events     |  102009 |          0 | server2.domain.com |       5433 | server3.domain.com |       5433
 events     |  102010 |          0 | server2.domain.com |       5433 | server3.domain.com |       5433
 events     |  102011 |          0 | server2.domain.com |       5433 | server3.domain.com |       5433
 events     |  102012 |          0 | server2.domain.com |       5433 | server3.domain.com |       5433
 events     |  102013 |          0 | server2.domain.com |       5433 | server3.domain.com |       5433
 events     |  102014 |          0 | server2.domain.com |       5433 | server3.domain.com |       5433
 events     |  102015 |          0 | server2.domain.com |       5433 | server3.domain.com |       5433
 events     |  102016 |          0 | server2.domain.com |       5433 | server3.domain.com |       5433
 events     |  102017 |          0 | server2.domain.com |       5433 | server3.domain.com |       5433
 events     |  102018 |          0 | server2.domain.com |       5433 | server3.domain.com |       5433
 events     |  102019 |          0 | server2.domain.com |       5433 | server3.domain.com |       5433
 events     |  102020 |          0 | server2.domain.com |       5433 | server3.domain.com |       5433
 events     |  102021 |          0 | server2.domain.com |       5433 | server3.domain.com |       5433
 events     |  102022 |          0 | server2.domain.com |       5433 | server3.domain.com |       5433
 events     |  102023 |          0 | server2.domain.com |       5433 | server3.domain.com |       5433
(16 rows)
 
citus=# SELECT citus_rebalance_start();
NOTICE:  Scheduled 16 moves as job 1
DETAIL:  Rebalance scheduled as background job
HINT:  To monitor progress, run: SELECT * FROM citus_rebalance_status();
 citus_rebalance_start
-----------------------
                     1
(1 row)
 
citus=# SELECT * FROM citus_rebalance_status();
 job_id |  state  | job_type  |           description           |          started_at           | finished_at | details
--------+---------+-----------+---------------------------------+-------------------------------+-------------+---------
      1 | running | rebalance | Rebalance all colocation groups | 2023-02-23 14:30:41.909994+01 |             | {"tasks": [{"LSN": {"lag": null, "source": "0/5B88E000", "target": null}, "size": {"source": "36 MB", "target": "8856 kB"}, "hosts": {"source": "server2.domain.com:5433", "target": "server3.domain.com:5433"}, "phase": "Copying Data", "state": "running", "command": "SELECT pg_catalog.citus_move_shard_placement(102008,2,5,'auto')", "message": "", "retried": 0, "task_id": 1}], "task_state_counts": {"blocked": 15, "running": 1}}
(1 row)
 
citus=# select nodename,count(*) from citus_shards where table_name='events'::regclass group by nodename;
      nodename      | count
--------------------+-------
 server2.domain.com |    22
 server3.domain.com |    10
(2 rows)
 
citus=# SELECT * FROM citus_rebalance_status();
 job_id |  state   | job_type  |           description           |          started_at           |          finished_at          |                     details
--------+----------+-----------+---------------------------------+-------------------------------+-------------------------------+--------------------------------------------------
      1 | finished | rebalance | Rebalance all colocation groups | 2023-02-23 14:30:41.909994+01 | 2023-02-23 14:32:47.406338+01 | {"tasks": [], "task_state_counts": {"done": 16}}
(1 row)

The table is finally equally balanced on all workers:

citus=# select nodename,count(*) from citus_shards where table_name='events'::regclass group by nodename;
      nodename      | count
--------------------+-------
 server2.domain.com |    16
 server3.domain.com |    16
(2 rows)
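
The plan returned earlier by get_rebalance_table_shards_plan() (16 moves from server2 to server3) can be reproduced with a very small greedy algorithm. The Python sketch below is a simplification I wrote for illustration: it only balances the number of shards per node, whereas the real Citus rebalancer supports several strategies and cost functions, and the rebalance_plan function is a hypothetical name.

```python
def rebalance_plan(placements, nodes):
    """Plan shard moves until shard counts per node differ by at most one.

    placements maps shard id -> node name and is not modified.
    Returns a list of (shard_id, source, target) moves.
    """
    current = dict(placements)
    moves = []
    while True:
        counts = {node: 0 for node in nodes}
        for owner in current.values():
            counts[owner] += 1
        source = max(nodes, key=lambda node: counts[node])
        target = min(nodes, key=lambda node: counts[node])
        if counts[source] - counts[target] <= 1:
            return moves
        # Move the lowest shard id still located on the busiest node.
        shard_id = min(s for s, owner in current.items() if owner == source)
        current[shard_id] = target
        moves.append((shard_id, source, target))


# Situation after adding server3 back: the 32 shards are all on server2.
placements = {shard_id: "server2.domain.com" for shard_id in range(102008, 102040)}
plan = rebalance_plan(placements, ["server2.domain.com", "server3.domain.com"])
print(len(plan), plan[0][0], plan[-1][0])
```

On this input the sketch plans 16 moves covering shards 102008 to 102023, which is the same set of shards as in the plan shown above.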

References
