# Copyright (c) 2022, PostgreSQL Global Development Group

# Test partial-column publication of tables
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# create publisher node (initialized with allows_streaming => 'logical',
# since everything below relies on logical replication)
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->start;

# create subscriber node, with max_logical_replication_workers set to 6
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->append_conf('postgresql.conf',
	qq(max_logical_replication_workers = 6));
$node_subscriber->start;

# connection string interpolated into the CREATE SUBSCRIPTION commands
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
# NOTE(review): $offset is not referenced in the part of the file visible
# here - confirm it is used further down before keeping or removing it.
my $offset            = 0;

# setup tables on both nodes
#
# Except for tab1, the subscriber-side tables deliberately lack one of the
# publisher's columns, so only a matching column list can be applied there.

# tab1: simple 1:1 replication (same definition on both nodes; note the
# quoted, case-sensitive column name "B")
$node_publisher->safe_psql(
	'postgres', qq(
	CREATE TABLE tab1 (a int PRIMARY KEY, "B" int, c int)
));

$node_subscriber->safe_psql(
	'postgres', qq(
	CREATE TABLE tab1 (a int PRIMARY KEY, "B" int, c int)
));

# tab2: replication from regular to table with fewer columns (the
# subscriber has no column "c")
$node_publisher->safe_psql(
	'postgres', qq(
	CREATE TABLE tab2 (a int PRIMARY KEY, b varchar, c int);
));

$node_subscriber->safe_psql(
	'postgres', qq(
	CREATE TABLE tab2 (a int PRIMARY KEY, b varchar)
));

# tab3: replication with weird column names (containing a single quote);
# the subscriber has no column "B"
$node_publisher->safe_psql(
	'postgres', qq(
	CREATE TABLE tab3 ("a'" int PRIMARY KEY, "B" varchar, "c'" int)
));

$node_subscriber->safe_psql(
	'postgres', qq(
	CREATE TABLE tab3 ("a'" int PRIMARY KEY, "c'" int)
));

# test_part: partitioned table (including multi-level partitioning), with
# fewer columns on the subscriber (no column "c")
$node_publisher->safe_psql(
	'postgres', qq(
	CREATE TABLE test_part (a int PRIMARY KEY, b text, c timestamptz) PARTITION BY LIST (a);
	CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3,4,5,6);
	CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (7,8,9,10,11,12) PARTITION BY LIST (a);
	CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (7,8,9,10);
));

$node_subscriber->safe_psql(
	'postgres', qq(
	CREATE TABLE test_part (a int PRIMARY KEY, b text) PARTITION BY LIST (a);
	CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3,4,5,6);
	CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (7,8,9,10,11,12) PARTITION BY LIST (a);
	CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (7,8,9,10);
));

# tab4: table with user-defined enum types (the type has to exist on both
# nodes; the subscriber has no column "c")
$node_publisher->safe_psql(
	'postgres', qq(
	CREATE TYPE test_typ AS ENUM ('blue', 'red');
	CREATE TABLE tab4 (a INT PRIMARY KEY, b test_typ, c int, d text);
));

$node_subscriber->safe_psql(
	'postgres', qq(
	CREATE TYPE test_typ AS ENUM ('blue', 'red');
	CREATE TABLE tab4 (a INT PRIMARY KEY, b test_typ, d text);
));


# TEST: create publication and subscription for some of the tables with
# column lists
#
# tab2 is intentionally left out here; it is added to the publication by a
# later test.
$node_publisher->safe_psql(
	'postgres', qq(
	CREATE PUBLICATION pub1
	   FOR TABLE tab1 (a, "B"), tab3 ("a'", "c'"), test_part (a, b), tab4 (a, b, d)
	  WITH (publish_via_partition_root = 'true');
));

# check that we got the right prattrs values for the publication in the
# pg_publication_rel catalog (order by relname, to get stable ordering)
my $result = $node_publisher->safe_psql(
	'postgres', qq(
	SELECT relname, prattrs
	FROM pg_publication_rel pb JOIN pg_class pc ON(pb.prrelid = pc.oid)
	ORDER BY relname
));

# the expected numbers are the 1-based positions of the listed columns in
# the publisher's table definitions (e.g. tab4 (a, b, d) => 1 2 4)
is( $result, qq(tab1|1 2
tab3|1 3
tab4|1 2 4
test_part|1 2), 'publication relation updated');

# TEST: insert data into the tables, create subscription and see if sync
# replicates the right columns
#
# All rows below exist before the subscription is created, so they can
# only reach the subscriber through the initial table synchronization.
$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO tab1 VALUES (1, 2, 3);
	INSERT INTO tab1 VALUES (4, 5, 6);
));

$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO tab3 VALUES (1, 2, 3);
	INSERT INTO tab3 VALUES (4, 5, 6);
));

$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO tab4 VALUES (1, 'red', 3, 'oh my');
	INSERT INTO tab4 VALUES (2, 'blue', 4, 'hello');
));

# replication of partitioned table (values 1,2 and 7,8 belong to different
# partitions)
$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO test_part VALUES (1, 'abc', '2021-07-04 12:00:00');
	INSERT INTO test_part VALUES (2, 'bcd', '2021-07-03 11:12:13');
	INSERT INTO test_part VALUES (7, 'abc', '2021-07-04 12:00:00');
	INSERT INTO test_part VALUES (8, 'bcd', '2021-07-03 11:12:13');
));

# create subscription for the publication, wait for sync to complete,
# then check the sync results
$node_subscriber->safe_psql(
	'postgres', qq(
	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1
));

$node_subscriber->wait_for_subscription_sync;

# tab1: only (a,b) is replicated; the subscriber has column c too, so it
# shows up as an empty (NULL) trailing field
$result =
  $node_subscriber->safe_psql('postgres', "SELECT * FROM tab1 ORDER BY a");
is( $result, qq(1|2|
4|5|), 'insert on column tab1.c is not replicated');

# tab3: only (a,c) is replicated; the subscriber table has just those two
# columns
$result = $node_subscriber->safe_psql('postgres',
	qq(SELECT * FROM tab3 ORDER BY "a'"));
is( $result, qq(1|3
4|6), 'insert on column tab3.b is not replicated');

# tab4: only (a,b,d) is replicated
$result =
  $node_subscriber->safe_psql('postgres', "SELECT * FROM tab4 ORDER BY a");
is( $result, qq(1|red|oh my
2|blue|hello), 'insert on column tab4.c is not replicated');

# test_part: (a,b) is replicated
$result = $node_subscriber->safe_psql('postgres',
	"SELECT * FROM test_part ORDER BY a");
is( $result, qq(1|abc
2|bcd
7|abc
8|bcd), 'insert on column test_part.c columns is not replicated');


# TEST: now insert more data into the tables, and wait until we replicate
# them (not by tablesync, but regular decoding and replication)
#
# The expected results below contain both the rows copied by the initial
# sync and the rows inserted here.

$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO tab1 VALUES (2, 3, 4);
	INSERT INTO tab1 VALUES (5, 6, 7);
));

$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO tab3 VALUES (2, 3, 4);
	INSERT INTO tab3 VALUES (5, 6, 7);
));

$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO tab4 VALUES (3, 'red', 5, 'foo');
	INSERT INTO tab4 VALUES (4, 'blue', 6, 'bar');
));

# replication of partitioned table
$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO test_part VALUES (3, 'xxx', '2022-02-01 10:00:00');
	INSERT INTO test_part VALUES (4, 'yyy', '2022-03-02 15:12:13');
	INSERT INTO test_part VALUES (9, 'zzz', '2022-04-03 21:00:00');
	INSERT INTO test_part VALUES (10, 'qqq', '2022-05-04 22:12:13');
));

# wait for catchup before checking the subscriber
$node_publisher->wait_for_catchup('sub1');

# tab1: only (a,b) is replicated
$result =
  $node_subscriber->safe_psql('postgres', "SELECT * FROM tab1 ORDER BY a");
is( $result, qq(1|2|
2|3|
4|5|
5|6|), 'insert on column tab1.c is not replicated');

# tab3: only (a,c) is replicated
$result = $node_subscriber->safe_psql('postgres',
	qq(SELECT * FROM tab3 ORDER BY "a'"));
is( $result, qq(1|3
2|4
4|6
5|7), 'insert on column tab3.b is not replicated');

# tab4: only (a,b,d) is replicated
$result =
  $node_subscriber->safe_psql('postgres', "SELECT * FROM tab4 ORDER BY a");
is( $result, qq(1|red|oh my
2|blue|hello
3|red|foo
4|blue|bar), 'insert on column tab4.c is not replicated');

# test_part: (a,b) is replicated
$result = $node_subscriber->safe_psql('postgres',
	"SELECT * FROM test_part ORDER BY a");
is( $result, qq(1|abc
2|bcd
3|xxx
4|yyy
7|abc
8|bcd
9|zzz
10|qqq), 'insert on column test_part.c columns is not replicated');


# TEST: do some updates on some of the tables, both on columns included
# in the column list and other

# tab1: update of replicated column
$node_publisher->safe_psql('postgres',
	qq(UPDATE tab1 SET "B" = 2 * "B" where a = 1));

# tab1: update of non-replicated column
$node_publisher->safe_psql('postgres',
	qq(UPDATE tab1 SET c = 2*c where a = 4));

# tab3: update of non-replicated column
$node_publisher->safe_psql('postgres',
	qq(UPDATE tab3 SET "B" = "B" || ' updated' where "a'" = 4));

# tab3: update of replicated column
$node_publisher->safe_psql('postgres',
	qq(UPDATE tab3 SET "c'" = 2 * "c'" where "a'" = 1));

# tab4: single update touching replicated columns (b, d) together with the
# non-replicated column c
$node_publisher->safe_psql('postgres',
	qq(UPDATE tab4 SET b = 'blue', c = c * 2, d = d || ' updated' where a = 1)
);

# tab4: same kind of mixed update on a second row
$node_publisher->safe_psql('postgres',
	qq(UPDATE tab4 SET b = 'red', c = c * 2, d = d || ' updated' where a = 2)
);

# wait for the replication to catch up, and check the UPDATE results got
# replicated correctly, with the right column list
$node_publisher->wait_for_catchup('sub1');

# tab1: row 1 shows the doubled "B"; row 4 is unchanged because only c was
# updated on the publisher
$result =
  $node_subscriber->safe_psql('postgres', qq(SELECT * FROM tab1 ORDER BY a));
is( $result,
	qq(1|4|
2|3|
4|5|
5|6|), 'only update on column tab1.b is replicated');

# tab3: row 1 shows the doubled "c'"; row 4 is unchanged
$result = $node_subscriber->safe_psql('postgres',
	qq(SELECT * FROM tab3 ORDER BY "a'"));
is( $result,
	qq(1|6
2|4
4|6
5|7), 'only update on column tab3.c is replicated');

# tab4: rows 1 and 2 show the new values of b and d
$result =
  $node_subscriber->safe_psql('postgres', qq(SELECT * FROM tab4 ORDER BY a));

is( $result, qq(1|blue|oh my updated
2|red|hello updated
3|red|foo
4|blue|bar), 'update on column tab4.c is not replicated');


# TEST: add table with a column list, insert data, replicate
#
# tab2 has columns (a, b, c) on the publisher but only (a, b) on the
# subscriber, and it is published with the column list (a, b).

# insert some data before adding it to the publication (this row has to
# arrive through the initial table sync)
$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO tab2 VALUES (1, 'abc', 3);
));

$node_publisher->safe_psql('postgres',
	"ALTER PUBLICATION pub1 ADD TABLE tab2 (a, b)");

# make the subscription pick up the table added to the publication
$node_subscriber->safe_psql('postgres',
	"ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION");

# wait for the tablesync to complete, add a bit more data and then check
# the results of the replication
$node_subscriber->wait_for_subscription_sync;

$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO tab2 VALUES (2, 'def', 6);
));

$node_publisher->wait_for_catchup('sub1');

$result =
  $node_subscriber->safe_psql('postgres', "SELECT * FROM tab2 ORDER BY a");
is( $result, qq(1|abc
2|def), 'insert on column tab2.c is not replicated');

# do a couple updates, check the correct stuff gets replicated: the first
# update touches only the non-replicated column c, the second one the
# replicated column b
$node_publisher->safe_psql(
	'postgres', qq(
	UPDATE tab2 SET c = 5 where a = 1;
	UPDATE tab2 SET b = 'xyz' where a = 2;
));

$node_publisher->wait_for_catchup('sub1');

$result =
  $node_subscriber->safe_psql('postgres', "SELECT * FROM tab2 ORDER BY a");
is( $result, qq(1|abc
2|xyz), 'update on column tab2.c is not replicated');


# TEST: add a table to two publications with same column lists, and
# create a single subscription replicating both publications
$node_publisher->safe_psql(
	'postgres', qq(
	CREATE TABLE tab5 (a int PRIMARY KEY, b int, c int, d int);
	CREATE PUBLICATION pub2 FOR TABLE tab5 (a, b);
	CREATE PUBLICATION pub3 FOR TABLE tab5 (a, b);

	-- insert a couple initial records
	INSERT INTO tab5 VALUES (1, 11, 111, 1111);
	INSERT INTO tab5 VALUES (2, 22, 222, 2222);
));

# the subscriber table has column d (not in the column list) but lacks c
$node_subscriber->safe_psql(
	'postgres', qq(
	CREATE TABLE tab5 (a int PRIMARY KEY, b int, d int);
));

# switch the existing subscription over to the two new publications
$node_subscriber->safe_psql(
	'postgres', qq(
	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub3
));

# NOTE(review): unlike the other calls in this file, this one passes the
# publisher node and subscription name explicitly - presumably so the
# helper can also wait for the publisher to catch up; confirm against
# PostgreSQL::Test::Cluster.
$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub1');

# insert data and make sure the columns in column list get fully replicated
$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO tab5 VALUES (3, 33, 333, 3333);
	INSERT INTO tab5 VALUES (4, 44, 444, 4444);
));

$node_publisher->wait_for_catchup('sub1');

# a and b are replicated; the subscriber's column d stays empty (NULL)
is( $node_subscriber->safe_psql('postgres', "SELECT * FROM tab5 ORDER BY a"),
	qq(1|11|
2|22|
3|33|
4|44|),
	'overlapping publications with overlapping column lists');


# TEST: create a table with a column list, then change the replica
# identity by replacing a primary key (but use a different column in
# the column list)
$node_publisher->safe_psql(
	'postgres', qq(
	CREATE TABLE tab6 (a int PRIMARY KEY, b int, c int, d int);
	CREATE PUBLICATION pub4 FOR TABLE tab6 (a, b);

	-- initial data
	INSERT INTO tab6 VALUES (1, 22, 333, 4444);
));

# the subscriber table has all four columns; c and d are never replicated,
# so they stay empty (NULL) in the expected results below
$node_subscriber->safe_psql(
	'postgres', qq(
	CREATE TABLE tab6 (a int PRIMARY KEY, b int, c int, d int);
));

$node_subscriber->safe_psql(
	'postgres', qq(
	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4
));

$node_subscriber->wait_for_subscription_sync;

# insert one more row and update all rows, touching both replicated (b)
# and non-replicated (c, d) columns
$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO tab6 VALUES (2, 33, 444, 5555);
	UPDATE tab6 SET b = b * 2, c = c * 3, d = d * 4;
));

$node_publisher->wait_for_catchup('sub1');

is( $node_subscriber->safe_psql('postgres', "SELECT * FROM tab6 ORDER BY a"),
	qq(1|44||
2|66||), 'replication with the original primary key');

# now redefine the constraint - move the primary key to a different column
# (which is still covered by the column list, though)

$node_publisher->safe_psql(
	'postgres', qq(
	ALTER TABLE tab6 DROP CONSTRAINT tab6_pkey;
	ALTER TABLE tab6 ADD PRIMARY KEY (b);
));

# we need to do the same thing on the subscriber
# XXX What would happen if this happens before the publisher ALTER? Or
# interleaved, somehow? But that seems unrelated to column lists.
$node_subscriber->safe_psql(
	'postgres', qq(
	ALTER TABLE tab6 DROP CONSTRAINT tab6_pkey;
	ALTER TABLE tab6 ADD PRIMARY KEY (b);
));

# refresh the subscription and wait for any table sync to finish before
# generating more changes
$node_subscriber->safe_psql(
	'postgres', qq(
	ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION
));

$node_subscriber->wait_for_subscription_sync;

# the UPDATE now modifies the primary key column (b) of every row
$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO tab6 VALUES (3, 55, 666, 8888);
	UPDATE tab6 SET b = b * 2, c = c * 3, d = d * 4;
));

$node_publisher->wait_for_catchup('sub1');

is( $node_subscriber->safe_psql('postgres', "SELECT * FROM tab6 ORDER BY a"),
	qq(1|88||
2|132||
3|110||),
	'replication with the modified primary key');


# TEST: create a table with a column list, then change the replica
# identity by replacing a primary key with a key on multiple columns
# (all of them covered by the column list)
$node_publisher->safe_psql(
	'postgres', qq(
	CREATE TABLE tab7 (a int PRIMARY KEY, b int, c int, d int);
	CREATE PUBLICATION pub5 FOR TABLE tab7 (a, b);

	-- some initial data
	INSERT INTO tab7 VALUES (1, 22, 333, 4444);
));

# the subscriber table has all four columns; c and d are never replicated,
# so they stay empty (NULL) in the expected results below
$node_subscriber->safe_psql(
	'postgres', qq(
	CREATE TABLE tab7 (a int PRIMARY KEY, b int, c int, d int);
));

$node_subscriber->safe_psql(
	'postgres', qq(
	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub5
));

$node_subscriber->wait_for_subscription_sync;

$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO tab7 VALUES (2, 33, 444, 5555);
	UPDATE tab7 SET b = b * 2, c = c * 3, d = d * 4;
));

$node_publisher->wait_for_catchup('sub1');

is( $node_subscriber->safe_psql('postgres', "SELECT * FROM tab7 ORDER BY a"),
	qq(1|44||
2|66||), 'replication with the original primary key');

# now redefine the constraint - replace the primary key with a two-column
# key on (a, b); both columns are covered by the column list. The
# subscriber keeps its original primary key on (a) throughout this test.
$node_publisher->safe_psql(
	'postgres', qq(
	ALTER TABLE tab7 DROP CONSTRAINT tab7_pkey;
	ALTER TABLE tab7 ADD PRIMARY KEY (a, b);
));

$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO tab7 VALUES (3, 55, 666, 7777);
	UPDATE tab7 SET b = b * 2, c = c * 3, d = d * 4;
));

$node_publisher->wait_for_catchup('sub1');

is( $node_subscriber->safe_psql('postgres', "SELECT * FROM tab7 ORDER BY a"),
	qq(1|88||
2|132||
3|110||),
	'replication with the modified primary key');

# now switch the primary key again, this time to (b, a) - still covered by
# the column list - but also generate writes between the drop and creation
# of the new constraint. Only an INSERT happens while there is no primary
# key; the UPDATE and DELETE run after the key has been re-created.

$node_publisher->safe_psql(
	'postgres', qq(
	ALTER TABLE tab7 DROP CONSTRAINT tab7_pkey;
	INSERT INTO tab7 VALUES (4, 77, 888, 9999);
	-- update/delete is not allowed for tables without RI
	ALTER TABLE tab7 ADD PRIMARY KEY (b, a);
	UPDATE tab7 SET b = b * 2, c = c * 3, d = d * 4;
	DELETE FROM tab7 WHERE a = 1;
));

$node_publisher->wait_for_catchup('sub1');

# row 1 is gone (deleted), the remaining rows have b doubled once more
is( $node_subscriber->safe_psql('postgres', "SELECT * FROM tab7 ORDER BY a"),
	qq(2|264||
3|220||
4|154||),
	'replication with the modified primary key');


# TEST: partitioned tables and replica identity. The (leaf) partitions may
# have different RI, so the column list has to be checked against the RI
# of each partition. Note that the publication created below uses
# publish_via_partition_root = true.

# First, let's create a partitioned table with two partitions, each with
# a different RI: (a) for the first partition, (b) for the second one.
# The INSERTs supply only two values, so column c stays NULL.

$node_publisher->safe_psql(
	'postgres', qq(
	CREATE TABLE test_part_a (a int, b int, c int) PARTITION BY LIST (a);

	CREATE TABLE test_part_a_1 PARTITION OF test_part_a FOR VALUES IN (1,2,3,4,5);
	ALTER TABLE test_part_a_1 ADD PRIMARY KEY (a);
	ALTER TABLE test_part_a_1 REPLICA IDENTITY USING INDEX test_part_a_1_pkey;

	CREATE TABLE test_part_a_2 PARTITION OF test_part_a FOR VALUES IN (6,7,8,9,10);
	ALTER TABLE test_part_a_2 ADD PRIMARY KEY (b);
	ALTER TABLE test_part_a_2 REPLICA IDENTITY USING INDEX test_part_a_2_pkey;

	-- initial data, one row in each partition
	INSERT INTO test_part_a VALUES (1, 3);
	INSERT INTO test_part_a VALUES (6, 4);
));

# do the same thing on the subscriber (with the opposite column order, and
# without column c)
$node_subscriber->safe_psql(
	'postgres', qq(
	CREATE TABLE test_part_a (b int, a int) PARTITION BY LIST (a);

	CREATE TABLE test_part_a_1 PARTITION OF test_part_a FOR VALUES IN (1,2,3,4,5);
	ALTER TABLE test_part_a_1 ADD PRIMARY KEY (a);
	ALTER TABLE test_part_a_1 REPLICA IDENTITY USING INDEX test_part_a_1_pkey;

	CREATE TABLE test_part_a_2 PARTITION OF test_part_a FOR VALUES IN (6,7,8,9,10);
	ALTER TABLE test_part_a_2 ADD PRIMARY KEY (b);
	ALTER TABLE test_part_a_2 REPLICA IDENTITY USING INDEX test_part_a_2_pkey;
));

# create a publication with column list (b, a) on the partition root, which
# covers the RI of both partitions, and then add each partition with a
# column list holding just its own RI column. The expected result below
# has both columns for every row, i.e. the root's column list is the one
# that takes effect.
$node_publisher->safe_psql(
	'postgres', qq(
	CREATE PUBLICATION pub6 FOR TABLE test_part_a (b, a) WITH (publish_via_partition_root = true);
	ALTER PUBLICATION pub6 ADD TABLE test_part_a_1 (a);
	ALTER PUBLICATION pub6 ADD TABLE test_part_a_2 (b);
));

# switch the subscription to the new publication, wait for sync to complete
$node_subscriber->safe_psql(
	'postgres', qq(
	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub6
));

$node_subscriber->wait_for_subscription_sync;

$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO test_part_a VALUES (2, 5);
	INSERT INTO test_part_a VALUES (7, 6);
));

$node_publisher->wait_for_catchup('sub1');

# select the columns explicitly, as the subscriber's column order differs
is( $node_subscriber->safe_psql(
		'postgres', "SELECT a, b FROM test_part_a ORDER BY a, b"),
	qq(1|3
2|5
6|4
7|6),
	'partitions with different replica identities replicated correctly');

# This time use a column list (a, b) on the partition root that covers the
# RI of all partitions: (a) for the first one, (b) for the second one.
# The column list is not changed afterwards in this section.

$node_publisher->safe_psql(
	'postgres', qq(
	CREATE TABLE test_part_b (a int, b int) PARTITION BY LIST (a);

	CREATE TABLE test_part_b_1 PARTITION OF test_part_b FOR VALUES IN (1,2,3,4,5);
	ALTER TABLE test_part_b_1 ADD PRIMARY KEY (a);
	ALTER TABLE test_part_b_1 REPLICA IDENTITY USING INDEX test_part_b_1_pkey;

	CREATE TABLE test_part_b_2 PARTITION OF test_part_b FOR VALUES IN (6,7,8,9,10);
	ALTER TABLE test_part_b_2 ADD PRIMARY KEY (b);
	ALTER TABLE test_part_b_2 REPLICA IDENTITY USING INDEX test_part_b_2_pkey;

	-- initial data, one row in each partitions
	INSERT INTO test_part_b VALUES (1, 1);
	INSERT INTO test_part_b VALUES (6, 2);
));

# do the same thing on the subscriber
$node_subscriber->safe_psql(
	'postgres', qq(
	CREATE TABLE test_part_b (a int, b int) PARTITION BY LIST (a);

	CREATE TABLE test_part_b_1 PARTITION OF test_part_b FOR VALUES IN (1,2,3,4,5);
	ALTER TABLE test_part_b_1 ADD PRIMARY KEY (a);
	ALTER TABLE test_part_b_1 REPLICA IDENTITY USING INDEX test_part_b_1_pkey;

	CREATE TABLE test_part_b_2 PARTITION OF test_part_b FOR VALUES IN (6,7,8,9,10);
	ALTER TABLE test_part_b_2 ADD PRIMARY KEY (b);
	ALTER TABLE test_part_b_2 REPLICA IDENTITY USING INDEX test_part_b_2_pkey;
));

# create a publication replicating both columns, which is sufficient for
# both partitions
$node_publisher->safe_psql(
	'postgres', qq(
	CREATE PUBLICATION pub7 FOR TABLE test_part_b (a, b) WITH (publish_via_partition_root = true);
));

# switch the subscription to the new publication, wait for sync to complete
$node_subscriber->safe_psql(
	'postgres', qq(
	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub7
));

$node_subscriber->wait_for_subscription_sync;

$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO test_part_b VALUES (2, 3);
	INSERT INTO test_part_b VALUES (7, 4);
));

$node_publisher->wait_for_catchup('sub1');

# both the initially synced rows (1, 6) and the streamed rows (2, 7) must
# be present, with both columns
is( $node_subscriber->safe_psql(
		'postgres', "SELECT * FROM test_part_b ORDER BY a, b"),
	qq(1|1
2|3
6|2
7|4),
	'partitions with different replica identities replicated correctly');


# TEST: Partitions with different RI - (a) for the first partition, (b) for
# the second one - published without publish_via_partition_root, so that
# each partition is replicated with its own column list.
# NOTE(review): the RI of the partitions is not changed anywhere in this
# section, even though an earlier version of this comment announced that.

$node_publisher->safe_psql(
	'postgres', qq(
	CREATE TABLE test_part_c (a int, b int, c int) PARTITION BY LIST (a);

	CREATE TABLE test_part_c_1 PARTITION OF test_part_c FOR VALUES IN (1,3);
	ALTER TABLE test_part_c_1 ADD PRIMARY KEY (a);
	ALTER TABLE test_part_c_1 REPLICA IDENTITY USING INDEX test_part_c_1_pkey;

	CREATE TABLE test_part_c_2 PARTITION OF test_part_c FOR VALUES IN (2,4);
	ALTER TABLE test_part_c_2 ADD PRIMARY KEY (b);
	ALTER TABLE test_part_c_2 REPLICA IDENTITY USING INDEX test_part_c_2_pkey;

	-- initial data, one row for each partition
	INSERT INTO test_part_c VALUES (1, 3, 5);
	INSERT INTO test_part_c VALUES (2, 4, 6);
));

# do the same thing on the subscriber
$node_subscriber->safe_psql(
	'postgres', qq(
	CREATE TABLE test_part_c (a int, b int, c int) PARTITION BY LIST (a);

	CREATE TABLE test_part_c_1 PARTITION OF test_part_c FOR VALUES IN (1,3);
	ALTER TABLE test_part_c_1 ADD PRIMARY KEY (a);
	ALTER TABLE test_part_c_1 REPLICA IDENTITY USING INDEX test_part_c_1_pkey;

	CREATE TABLE test_part_c_2 PARTITION OF test_part_c FOR VALUES IN (2,4);
	ALTER TABLE test_part_c_2 ADD PRIMARY KEY (b);
	ALTER TABLE test_part_c_2 REPLICA IDENTITY USING INDEX test_part_c_2_pkey;
));

# create a publication not replicating data through partition root, without
# a column list on the root, and then add the partitions one by one with
# separate column lists: (a,c) for the first partition and (a,b) for the
# second one
$node_publisher->safe_psql(
	'postgres', qq(
	CREATE PUBLICATION pub8 FOR TABLE test_part_c WITH (publish_via_partition_root = false);
	ALTER PUBLICATION pub8 ADD TABLE test_part_c_1 (a,c);
	ALTER PUBLICATION pub8 ADD TABLE test_part_c_2 (a,b);
));

# re-create the subscription for the new publication, wait for sync to
# complete
$node_subscriber->safe_psql(
	'postgres', qq(
	DROP SUBSCRIPTION sub1;
	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8;
));

$node_subscriber->wait_for_subscription_sync;

$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO test_part_c VALUES (3, 7, 8);
	INSERT INTO test_part_c VALUES (4, 9, 10);
));

$node_publisher->wait_for_catchup('sub1');

# rows 1 and 3 (first partition) carry only (a,c), rows 2 and 4 (second
# partition) carry only (a,b)
is( $node_subscriber->safe_psql(
		'postgres', "SELECT * FROM test_part_c ORDER BY a, b"),
	qq(1||5
2|4|
3||8
4|9|),
	'partitions with different replica identities replicated correctly');


# create a publication not replicating data through partition root, without
# a column list on the root, and then add the partitions one by one with
# separate column lists; this time the first partition publishes only (a)
$node_publisher->safe_psql(
	'postgres', qq(
	DROP PUBLICATION pub8;
	CREATE PUBLICATION pub8 FOR TABLE test_part_c WITH (publish_via_partition_root = false);
	ALTER PUBLICATION pub8 ADD TABLE test_part_c_1 (a);
	ALTER PUBLICATION pub8 ADD TABLE test_part_c_2 (a,b);
));

# refresh the subscription for the re-created publication and empty the
# subscriber's copy of the table, then wait for sync to complete
$node_subscriber->safe_psql(
	'postgres', qq(
	ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION;
	TRUNCATE test_part_c;
));

$node_subscriber->wait_for_subscription_sync;

# start from an empty table on the publisher as well, then insert one row
# into each partition
$node_publisher->safe_psql(
	'postgres', qq(
	TRUNCATE test_part_c;
	INSERT INTO test_part_c VALUES (1, 3, 5);
	INSERT INTO test_part_c VALUES (2, 4, 6);
));

$node_publisher->wait_for_catchup('sub1');

# row 1 (first partition) carries only (a), row 2 (second partition) only
# (a,b)
is( $node_subscriber->safe_psql(
		'postgres', "SELECT * FROM test_part_c ORDER BY a, b"),
	qq(1||
2|4|),
	'partitions with different replica identities replicated correctly');


# TEST: Start with a single partition on the publisher, with RI compatible
# with the column list.
# NOTE(review): no second partition is attached on the publisher anywhere
# in this section, even though an earlier version of this comment
# announced attaching one with an incompatible RI.

$node_publisher->safe_psql(
	'postgres', qq(
	CREATE TABLE test_part_d (a int, b int) PARTITION BY LIST (a);

	CREATE TABLE test_part_d_1 PARTITION OF test_part_d FOR VALUES IN (1,3);
	ALTER TABLE test_part_d_1 ADD PRIMARY KEY (a);
	ALTER TABLE test_part_d_1 REPLICA IDENTITY USING INDEX test_part_d_1_pkey;

	INSERT INTO test_part_d VALUES (1, 2);
));

# do the same thing on the subscriber (in fact, create both partitions right
# away, no need to delay that)
$node_subscriber->safe_psql(
	'postgres', qq(
	CREATE TABLE test_part_d (a int, b int) PARTITION BY LIST (a);

	CREATE TABLE test_part_d_1 PARTITION OF test_part_d FOR VALUES IN (1,3);
	ALTER TABLE test_part_d_1 ADD PRIMARY KEY (a);
	ALTER TABLE test_part_d_1 REPLICA IDENTITY USING INDEX test_part_d_1_pkey;

	CREATE TABLE test_part_d_2 PARTITION OF test_part_d FOR VALUES IN (2,4);
	ALTER TABLE test_part_d_2 ADD PRIMARY KEY (a);
	ALTER TABLE test_part_d_2 REPLICA IDENTITY USING INDEX test_part_d_2_pkey;
));

# create a publication replicating only column a, which covers the RI of
# the partition
$node_publisher->safe_psql(
	'postgres', qq(
	CREATE PUBLICATION pub9 FOR TABLE test_part_d (a) WITH (publish_via_partition_root = true);
));

# switch the subscription to the new publication, wait for sync to complete
$node_subscriber->safe_psql(
	'postgres', qq(
	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub9
));

$node_subscriber->wait_for_subscription_sync;

$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO test_part_d VALUES (3, 4);
));

$node_publisher->wait_for_catchup('sub1');

# only column a is replicated, b stays empty (NULL) on the subscriber
is( $node_subscriber->safe_psql(
		'postgres', "SELECT * FROM test_part_d ORDER BY a, b"),
	qq(1|
3|),
	'partitions with different replica identities replicated correctly');


# TEST: With a table included in the publications which is FOR ALL TABLES, it
# means replicate all columns.

# drop unnecessary tables, so as not to interfere with the FOR ALL TABLES
$node_publisher->safe_psql(
	'postgres', qq(
	DROP TABLE tab1, tab2, tab3, tab4, tab5, tab6, tab7,
			   test_part, test_part_a, test_part_b, test_part_c, test_part_d;
));

# the same table is published twice: once with an explicit column list and
# once through FOR ALL TABLES
$node_publisher->safe_psql(
	'postgres', qq(
	CREATE TABLE test_mix_2 (a int PRIMARY KEY, b int, c int);
	CREATE PUBLICATION pub_mix_3 FOR TABLE test_mix_2 (a, b, c);
	CREATE PUBLICATION pub_mix_4 FOR ALL TABLES;

	-- initial data
	INSERT INTO test_mix_2 VALUES (1, 2, 3);
));

$node_subscriber->safe_psql(
	'postgres', qq(
	CREATE TABLE test_mix_2 (a int PRIMARY KEY, b int, c int);
	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub_mix_3, pub_mix_4;
	ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION;
));

$node_subscriber->wait_for_subscription_sync;

$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO test_mix_2 VALUES (4, 5, 6);
));

$node_publisher->wait_for_catchup('sub1');

# ORDER BY makes the row order deterministic; without it the comparison
# with the two-row expected string relies on unspecified ordering
is( $node_subscriber->safe_psql(
		'postgres', "SELECT * FROM test_mix_2 ORDER BY a"),
	qq(1|2|3
4|5|6),
	'all columns should be replicated');


# TEST: With a table included in the publication which is FOR TABLES IN
# SCHEMA, it means replicate all columns.

# start over with a fresh subscription (re-created further down)
$node_subscriber->safe_psql(
	'postgres', qq(
	DROP SUBSCRIPTION sub1;
	CREATE TABLE test_mix_3 (a int PRIMARY KEY, b int, c int);
));

# the same table is published twice: once with an explicit column list and
# once through its schema
$node_publisher->safe_psql(
	'postgres', qq(
	DROP TABLE test_mix_2;
	CREATE TABLE test_mix_3 (a int PRIMARY KEY, b int, c int);
	CREATE PUBLICATION pub_mix_5 FOR TABLE test_mix_3 (a, b, c);
	CREATE PUBLICATION pub_mix_6 FOR TABLES IN SCHEMA public;

	-- initial data
	INSERT INTO test_mix_3 VALUES (1, 2, 3);
));

$node_subscriber->safe_psql(
	'postgres', qq(
	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6;
));

$node_subscriber->wait_for_subscription_sync;

$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO test_mix_3 VALUES (4, 5, 6);
));

$node_publisher->wait_for_catchup('sub1');

# ORDER BY makes the row order deterministic; without it the comparison
# with the two-row expected string relies on unspecified ordering
is( $node_subscriber->safe_psql(
		'postgres', "SELECT * FROM test_mix_3 ORDER BY a"),
	qq(1|2|3
4|5|6),
	'all columns should be replicated');


# TEST: Check handling of publish_via_partition_root - if a partition is
# published through partition root, we should only apply the column list
# defined for the whole table (not the partitions) - both during the initial
# sync and when replicating changes. This is what we do for row filters.

# start over with a fresh subscription (re-created further down) and
# create the partitioned table on the subscriber
$node_subscriber->safe_psql(
	'postgres', qq(
	DROP SUBSCRIPTION sub1;

	CREATE TABLE test_root (a int PRIMARY KEY, b int, c int) PARTITION BY RANGE (a);
	CREATE TABLE test_root_1 PARTITION OF test_root FOR VALUES FROM (1) TO (10);
	CREATE TABLE test_root_2 PARTITION OF test_root FOR VALUES FROM (10) TO (20);
));

# same table on the publisher, published through the root with a column
# list containing only column a; one initial row goes into each partition
$node_publisher->safe_psql(
	'postgres', qq(
	CREATE TABLE test_root (a int PRIMARY KEY, b int, c int) PARTITION BY RANGE (a);
	CREATE TABLE test_root_1 PARTITION OF test_root FOR VALUES FROM (1) TO (10);
	CREATE TABLE test_root_2 PARTITION OF test_root FOR VALUES FROM (10) TO (20);

	CREATE PUBLICATION pub_root_true FOR TABLE test_root (a) WITH (publish_via_partition_root = true);

	-- initial data
	INSERT INTO test_root VALUES (1, 2, 3);
	INSERT INTO test_root VALUES (10, 20, 30);
));

$node_subscriber->safe_psql(
	'postgres', qq(
	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true;
));

$node_subscriber->wait_for_subscription_sync;

# one more row per partition, replicated as regular changes
$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO test_root VALUES (2, 3, 4);
	INSERT INTO test_root VALUES (11, 21, 31);
));

$node_publisher->wait_for_catchup('sub1');

# only column a is expected for all four rows, synced and streamed alike
is( $node_subscriber->safe_psql(
		'postgres', "SELECT * FROM test_root ORDER BY a, b, c"),
	qq(1||
2||
10||
11||),
	'publication via partition root applies column list');


# TEST: Multiple publications which publish schema of parent table and
# partition. The partition is published through two publications, once
# through a schema (so no column list) containing the parent, and then
# also directly (with all columns). The expected outcome is there is
# no column list.

# drop the publications created by the earlier tests (the names pub1 and
# pub2 are reused right below); the parent table lives in schema s1, while
# its partition t_1 is created without a schema qualification
$node_publisher->safe_psql(
	'postgres', qq(
	DROP PUBLICATION pub1, pub2, pub3, pub4, pub5, pub6, pub7, pub8;

	CREATE SCHEMA s1;
	CREATE TABLE s1.t (a int, b int, c int) PARTITION BY RANGE (a);
	CREATE TABLE t_1 PARTITION OF s1.t FOR VALUES FROM (1) TO (10);

	CREATE PUBLICATION pub1 FOR TABLES IN SCHEMA s1;
	CREATE PUBLICATION pub2 FOR TABLE t_1(a, b, c);

	-- initial data
	INSERT INTO s1.t VALUES (1, 2, 3);
));

$node_subscriber->safe_psql(
	'postgres', qq(
	CREATE SCHEMA s1;
	CREATE TABLE s1.t (a int, b int, c int) PARTITION BY RANGE (a);
	CREATE TABLE t_1 PARTITION OF s1.t FOR VALUES FROM (1) TO (10);

	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub1, pub2;
));

$node_subscriber->wait_for_subscription_sync;

$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO s1.t VALUES (4, 5, 6);
));

$node_publisher->wait_for_catchup('sub1');

# all three columns are expected, for the synced and the streamed row
is( $node_subscriber->safe_psql('postgres', "SELECT * FROM s1.t ORDER BY a"),
	qq(1|2|3
4|5|6),
	'two publications, publishing the same relation');

# Now resync the subscription, but with publications in the opposite order.
# The result should be the same.

$node_subscriber->safe_psql(
	'postgres', qq(
	TRUNCATE s1.t;

	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub1;
));

$node_subscriber->wait_for_subscription_sync;

$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO s1.t VALUES (7, 8, 9);
));

$node_publisher->wait_for_catchup('sub1');

# only the row inserted after the TRUNCATE is expected here, again with all
# three columns
# NOTE(review): presumably the earlier rows are not copied again because
# the tables were already part of the subscription - confirm.
is( $node_subscriber->safe_psql('postgres', "SELECT * FROM s1.t ORDER BY a"),
	qq(7|8|9),
	'two publications, publishing the same relation');


# TEST: One publication, containing both the parent and child relations.
# The expected outcome is list "a", because that's the column list defined
# for the top-most ancestor added to the publication.

# replace the s1 tables with a two-level hierarchy t -> t_1 -> t_2; the
# publication contains t_1 with column list (a) and t_2 without any column
# list (the root t itself is not added)
$node_publisher->safe_psql(
	'postgres', qq(
	DROP SCHEMA s1 CASCADE;
	CREATE TABLE t (a int, b int, c int) PARTITION BY RANGE (a);
	CREATE TABLE t_1 PARTITION OF t FOR VALUES FROM (1) TO (10)
		   PARTITION BY RANGE (a);
	CREATE TABLE t_2 PARTITION OF t_1 FOR VALUES FROM (1) TO (10);

	CREATE PUBLICATION pub3 FOR TABLE t_1 (a), t_2
	  WITH (PUBLISH_VIA_PARTITION_ROOT);

	-- initial data
	INSERT INTO t VALUES (1, 2, 3);
));

# same tables on the subscriber, then switch the subscription to pub3
$node_subscriber->safe_psql(
	'postgres', qq(
	DROP SCHEMA s1 CASCADE;
	CREATE TABLE t (a int, b int, c int) PARTITION BY RANGE (a);
	CREATE TABLE t_1 PARTITION OF t FOR VALUES FROM (1) TO (10)
		   PARTITION BY RANGE (a);
	CREATE TABLE t_2 PARTITION OF t_1 FOR VALUES FROM (1) TO (10);

	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub3;
));

$node_subscriber->wait_for_subscription_sync;

$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO t VALUES (4, 5, 6);
));

$node_publisher->wait_for_catchup('sub1');

# only column a is expected, for both the synced and the streamed row
is( $node_subscriber->safe_psql(
		'postgres', "SELECT * FROM t ORDER BY a, b, c"),
	qq(1||
4||),
	'publication containing both parent and child relation');


# TEST: One publication, containing both the parent and child relations.
# The expected outcome is list "a", because that's the column list defined
# for the top-most ancestor added to the publication.
# Note: The difference from the preceding test is that in this case both
# relations have a column list defined.

# Publisher side: rebuild the t -> t_1 -> t_2 hierarchy from scratch.  pub4
# lists t_1 restricted to column "a" and t_2 restricted to column "b".
$node_publisher->safe_psql(
	'postgres', qq(
	DROP TABLE t;
	CREATE TABLE t (a int, b int, c int) PARTITION BY RANGE (a);
	CREATE TABLE t_1 PARTITION OF t FOR VALUES FROM (1) TO (10)
		   PARTITION BY RANGE (a);
	CREATE TABLE t_2 PARTITION OF t_1 FOR VALUES FROM (1) TO (10);

	CREATE PUBLICATION pub4 FOR TABLE t_1 (a), t_2 (b)
	  WITH (PUBLISH_VIA_PARTITION_ROOT);

	-- initial data
	INSERT INTO t VALUES (1, 2, 3);
));

# Subscriber side: mirror the hierarchy and switch sub1 over to pub4.
$node_subscriber->safe_psql(
	'postgres', qq(
	DROP TABLE t;
	CREATE TABLE t (a int, b int, c int) PARTITION BY RANGE (a);
	CREATE TABLE t_1 PARTITION OF t FOR VALUES FROM (1) TO (10)
		   PARTITION BY RANGE (a);
	CREATE TABLE t_2 PARTITION OF t_1 FOR VALUES FROM (1) TO (10);

	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4;
));

$node_subscriber->wait_for_subscription_sync;

# One more row after the subscription has synchronized.
$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO t VALUES (4, 5, 6);
));

$node_publisher->wait_for_catchup('sub1');

# Only column "a" is expected to carry data; "b" and "c" stay NULL.  The
# description differs from the preceding test's so that a failure report
# identifies which of the two scenarios is affected.
is( $node_subscriber->safe_psql(
		'postgres', "SELECT * FROM t ORDER BY a, b, c"),
	qq(1||
4||),
	'publication containing both parent and child relation, both with column lists'
);

# TEST: Only columns in the column list should exist in the old tuple of UPDATE
# and DELETE.

# Publish only (a, b) of a three-column table, create a pgoutput slot, and
# then generate one UPDATE and one DELETE for the slot to decode.
$node_publisher->safe_psql(
	'postgres', qq(
	CREATE TABLE test_oldtuple_col (a int PRIMARY KEY, b int, c int);
	CREATE PUBLICATION pub_check_oldtuple FOR TABLE test_oldtuple_col (a, b);
	INSERT INTO test_oldtuple_col VALUES(1, 2, 3);
	SELECT * FROM pg_create_logical_replication_slot('test_slot', 'pgoutput');
	UPDATE test_oldtuple_col SET a = 2;
	DELETE FROM test_oldtuple_col;
));


# The number of columns in the old tuple is a 2-byte value starting at the
# 7th byte of each message (substr() counts from 1).  It is preceded by:
#
#   1 byte  message type
#   4 bytes relid
#   1 byte  flag for old key
#
# Only UPDATE (message type 85, 'U') and DELETE (message type 68, 'D')
# messages are inspected; each must report 2 columns.
my $old_tuple_natts_ok = $node_publisher->safe_psql(
	'postgres', qq(
		SELECT substr(data, 7, 2) = int2send(2::smallint)
		FROM pg_logical_slot_peek_binary_changes('test_slot', NULL, NULL,
			'proto_version', '1',
			'publication_names', 'pub_check_oldtuple')
		WHERE get_byte(data, 0) = 85 OR get_byte(data, 0) = 68
));

# One "t" per inspected message: the UPDATE and the DELETE.
is( $old_tuple_natts_ok,
	qq(t
t),
	'check the number of columns in the old tuple');


# TEST: With a table included in multiple publications with different column
# lists, we should catch the error when creating the subscription.

# Publisher side: the same table is published twice, with column lists
# (a, b) and (a, c) respectively.
$node_publisher->safe_psql(
	'postgres', qq(
	CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int);
	CREATE PUBLICATION pub_mix_1 FOR TABLE test_mix_1 (a, b);
	CREATE PUBLICATION pub_mix_2 FOR TABLE test_mix_1 (a, c);
));

# Subscriber side: drop the subscription used so far and create the target
# table.
$node_subscriber->safe_psql(
	'postgres', qq(
	DROP SUBSCRIPTION sub1;
	CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int);
));

# The CREATE SUBSCRIPTION below is expected to fail, so psql() is used
# rather than safe_psql(), and only its stderr is examined.
my (undef, undef, $stderr) = $node_subscriber->psql(
	'postgres', qq(
	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2;
));

# like() reports the actual stderr contents when the pattern does not match.
like(
	$stderr,
	qr/cannot use different column lists for table "public.test_mix_1" in different publications/,
	'different column lists detected');

# TEST: If the column list is changed after creating the subscription, we
# should catch the error reported by walsender.

# Error message the publisher is expected to log once the column lists of
# the two publications disagree.
my $column_list_mismatch =
  qr/cannot use different column lists for table "public.test_mix_1" in different publications/;

# Give pub_mix_1 the column list (a, c) so that creating the subscription on
# both publications succeeds.
$node_publisher->safe_psql(
	'postgres', qq(
	ALTER PUBLICATION pub_mix_1 SET TABLE test_mix_1 (a, c);
));

$node_subscriber->safe_psql(
	'postgres', qq(
	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2;
));

$node_publisher->wait_for_catchup('sub1');

# Switch pub_mix_1 back to (a, b) and insert a row that has to be
# replicated.
$node_publisher->safe_psql(
	'postgres', qq(
	ALTER PUBLICATION pub_mix_1 SET TABLE test_mix_1 (a, b);
	INSERT INTO test_mix_1 VALUES(1, 1, 1);
));

# Block until the message shows up in the publisher's log, searching from
# $offset onwards, and keep the value wait_for_log() returns in $offset.
$offset = $node_publisher->wait_for_log($column_list_mismatch, $offset);

# Fast-stop both nodes, subscriber first and then the publisher, and tell
# Test::More that all tests have been run.
foreach my $node ($node_subscriber, $node_publisher)
{
	$node->stop('fast');
}

done_testing();
