
# Copyright (c) 2021-2022, PostgreSQL Global Development Group

# Tests for subscription stats.
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# Create publisher node.
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->start;

# Create subscriber node.
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;


sub create_sub_pub_w_errors
{
	my ($node_publisher, $node_subscriber, $db, $table_name) = @_;
	# Initial table setup on both publisher and subscriber. On subscriber we
	# create the same tables but with primary keys. Also, insert some data that
	# will conflict with the data replicated from publisher later.
	$node_publisher->safe_psql(
		$db,
		qq[
	BEGIN;
	CREATE TABLE $table_name(a int);
	INSERT INTO $table_name VALUES (1);
	COMMIT;
	]);
	$node_subscriber->safe_psql(
		$db,
		qq[
	BEGIN;
	CREATE TABLE $table_name(a int primary key);
	INSERT INTO $table_name VALUES (1);
	COMMIT;
	]);

	# Set up publication.
	my $pub_name          = $table_name . '_pub';
	my $publisher_connstr = $node_publisher->connstr . qq( dbname=$db);

	$node_publisher->safe_psql($db,
		qq(CREATE PUBLICATION $pub_name FOR TABLE $table_name));

	# Create subscription. The tablesync for table on subscription will enter into
	# infinite error loop due to violating the unique constraint.
	my $sub_name = $table_name . '_sub';
	$node_subscriber->safe_psql($db,
		qq(CREATE SUBSCRIPTION $sub_name CONNECTION '$publisher_connstr' PUBLICATION $pub_name)
	);

	$node_publisher->wait_for_catchup($sub_name);

	# Wait for the tablesync error to be reported.
	$node_subscriber->poll_query_until(
		$db,
		qq[
	SELECT sync_error_count > 0
	FROM pg_stat_subscription_stats
	WHERE subname = '$sub_name'
	])
	  or die
	  qq(Timed out while waiting for tablesync errors for subscription '$sub_name');

	# Truncate test_tab1 so that tablesync worker can continue.
	$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));

	# Wait for initial tablesync to finish.
	$node_subscriber->poll_query_until(
		$db,
		qq[
	SELECT count(1) = 1 FROM pg_subscription_rel
	WHERE srrelid = '$table_name'::regclass AND srsubstate in ('r', 's')
	])
	  or die
	  qq(Timed out while waiting for subscriber to synchronize data for table '$table_name'.);

	# Check test table on the subscriber has one row.
	my $result =
	  $node_subscriber->safe_psql($db, qq(SELECT a FROM $table_name));
	is($result, qq(1), qq(Check that table '$table_name' now has 1 row.));

	# Insert data to test table on the publisher, raising an error on the
	# subscriber due to violation of the unique constraint on test table.
	$node_publisher->safe_psql($db, qq(INSERT INTO $table_name VALUES (1)));

	# Wait for the apply error to be reported.
	$node_subscriber->poll_query_until(
		$db,
		qq[
	SELECT apply_error_count > 0
	FROM pg_stat_subscription_stats
	WHERE subname = '$sub_name'
	])
	  or die
	  qq(Timed out while waiting for apply error for subscription '$sub_name');

	# Truncate test table so that apply worker can continue.
	$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));

	return ($pub_name, $sub_name);
}

my $db = 'postgres';

# There shouldn't be any subscription errors before starting logical replication.
my $result = $node_subscriber->safe_psql($db,
	qq(SELECT count(1) FROM pg_stat_subscription_stats));
is($result, qq(0),
	'Check that there are no subscription errors before starting logical replication.'
);

# Create the publication and subscription with sync and apply errors
my $table1_name = 'test_tab1';
my ($pub1_name, $sub1_name) =
  create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
	$table1_name);

# Apply and Sync errors are > 0 and reset timestamp is NULL
is( $node_subscriber->safe_psql(
		$db,
		qq(SELECT apply_error_count > 0,
	sync_error_count > 0,
	stats_reset IS NULL
	FROM pg_stat_subscription_stats
	WHERE subname = '$sub1_name')
	),
	qq(t|t|t),
	qq(Check that apply errors and sync errors are both > 0 and stats_reset is NULL for subscription '$sub1_name'.)
);

# Reset a single subscription
$node_subscriber->safe_psql($db,
	qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM pg_stat_subscription_stats WHERE subname = '$sub1_name')))
);

# Apply and Sync errors are 0 and stats reset is not NULL
is( $node_subscriber->safe_psql(
		$db,
		qq(SELECT apply_error_count = 0,
	sync_error_count = 0,
	stats_reset IS NOT NULL
	FROM pg_stat_subscription_stats
	WHERE subname = '$sub1_name')
	),
	qq(t|t|t),
	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
);

# Get reset timestamp
my $reset_time1 = $node_subscriber->safe_psql($db,
	qq(SELECT stats_reset FROM pg_stat_subscription_stats WHERE subname = '$sub1_name')
);

# Reset single sub again
$node_subscriber->safe_psql(
	$db,
	qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM
	pg_stat_subscription_stats WHERE subname = '$sub1_name')))
);

# check reset timestamp is newer after reset
is( $node_subscriber->safe_psql(
		$db,
		qq(SELECT stats_reset > '$reset_time1'::timestamptz FROM
	pg_stat_subscription_stats WHERE subname = '$sub1_name')
	),
	qq(t),
	qq(Check reset timestamp for '$sub1_name' is newer after second reset.));

# Make second subscription and publication
my $table2_name = 'test_tab2';
my ($pub2_name, $sub2_name) =
  create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
	$table2_name);

# Apply and Sync errors are > 0 and reset timestamp is NULL
is( $node_subscriber->safe_psql(
		$db,
		qq(SELECT apply_error_count > 0,
	sync_error_count > 0,
	stats_reset IS NULL
	FROM pg_stat_subscription_stats
	WHERE subname = '$sub2_name')
	),
	qq(t|t|t),
	qq(Confirm that apply errors and sync errors are both > 0 and stats_reset is NULL for sub '$sub2_name'.)
);

# Reset all subscriptions
$node_subscriber->safe_psql($db,
	qq(SELECT pg_stat_reset_subscription_stats(NULL)));

# Apply and Sync errors are 0 and stats reset is not NULL
is( $node_subscriber->safe_psql(
		$db,
		qq(SELECT apply_error_count = 0,
	sync_error_count = 0,
	stats_reset IS NOT NULL
	FROM pg_stat_subscription_stats
	WHERE subname = '$sub1_name')
	),
	qq(t|t|t),
	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
);

is( $node_subscriber->safe_psql(
		$db,
		qq(SELECT apply_error_count = 0,
	sync_error_count = 0,
	stats_reset IS NOT NULL
	FROM pg_stat_subscription_stats
	WHERE subname = '$sub2_name')
	),
	qq(t|t|t),
	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
);

$reset_time1 = $node_subscriber->safe_psql($db,
	qq(SELECT stats_reset FROM pg_stat_subscription_stats WHERE subname = '$sub1_name')
);
my $reset_time2 = $node_subscriber->safe_psql($db,
	qq(SELECT stats_reset FROM pg_stat_subscription_stats WHERE subname = '$sub2_name')
);

# Reset all subscriptions
$node_subscriber->safe_psql($db,
	qq(SELECT pg_stat_reset_subscription_stats(NULL)));

# check reset timestamp for sub1 is newer after reset
is( $node_subscriber->safe_psql(
		$db,
		qq(SELECT stats_reset > '$reset_time1'::timestamptz FROM
	pg_stat_subscription_stats WHERE subname = '$sub1_name')
	),
	qq(t),
	qq(Confirm that reset timestamp for '$sub1_name' is newer after second reset.)
);

# check reset timestamp for sub2 is newer after reset
is( $node_subscriber->safe_psql(
		$db,
		qq(SELECT stats_reset > '$reset_time2'::timestamptz FROM
	pg_stat_subscription_stats WHERE subname = '$sub2_name')
	),
	qq(t),
	qq(Confirm that reset timestamp for '$sub2_name' is newer after second reset.)
);

# Get subscription 1 oid
my $sub1_oid = $node_subscriber->safe_psql($db,
	qq(SELECT oid FROM pg_subscription WHERE subname = '$sub1_name'));

# Drop subscription 1
$node_subscriber->safe_psql($db, qq(DROP SUBSCRIPTION $sub1_name));

# Subscription stats for sub1 should be gone
is( $node_subscriber->safe_psql(
		$db, qq(SELECT pg_stat_have_stats('subscription', 0, $sub1_oid))),
	qq(f),
	qq(Subscription stats for subscription '$sub1_name' should be removed.));

# Get subscription 2 oid
my $sub2_oid = $node_subscriber->safe_psql($db,
	qq(SELECT oid FROM pg_subscription WHERE subname = '$sub2_name'));

# Diassociate the subscription 2 from its replication slot and drop it
$node_subscriber->safe_psql(
	$db,
	qq(
ALTER SUBSCRIPTION $sub2_name DISABLE;
ALTER SUBSCRIPTION $sub2_name SET (slot_name = NONE);
DROP SUBSCRIPTION $sub2_name;
			    ));

# Subscription stats for sub2 should be gone
is( $node_subscriber->safe_psql(
		$db, qq(SELECT pg_stat_have_stats('subscription', 0, $sub2_oid))),
	qq(f),
	qq(Subscription stats for subscription '$sub2_name' should be removed.));

# Since disabling subscription doesn't wait for walsender to release the replication
# slot and exit, wait for the slot to become inactive.
$node_publisher->poll_query_until(
	$db,
	qq(SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = '$sub2_name' AND active_pid IS NULL))
) or die "slot never became inactive";

$node_publisher->safe_psql($db,
	qq(SELECT pg_drop_replication_slot('$sub2_name')));

$node_subscriber->stop('fast');
$node_publisher->stop('fast');

done_testing();
