Skip to content

Commit

Permalink
Allow pg_partman to be installed in any schema (#310)
Browse files Browse the repository at this point in the history
* support pg_partman in user defined schema

* pg_partman ud schema tweaks

* manual schema append to fq_qtable

* manual schema append to fq_atable

* manual schema append to qualified_a_table_name

* 1.4.3 - 1.4.4 migration

* bump default version to 1.4.4

* create or replace everywhere in upgrade script
  • Loading branch information
olirice authored Sep 16, 2024
1 parent 645a3c6 commit 9787166
Show file tree
Hide file tree
Showing 3 changed files with 394 additions and 37 deletions.
2 changes: 1 addition & 1 deletion pgmq-extension/pgmq.control
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
comment = 'A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.'
default_version = '1.4.3'
default_version = '1.4.4'
module_pathname = '$libdir/pgmq'
schema = 'pgmq'
relocatable = false
Expand Down
310 changes: 310 additions & 0 deletions pgmq-extension/sql/pgmq--1.4.3--1.4.4.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,310 @@
CREATE OR REPLACE FUNCTION pgmq._get_pg_partman_schema()
RETURNS TEXT AS $$
SELECT
extnamespace::regnamespace::text
FROM
pg_extension
WHERE
extname = 'pg_partman';
$$ LANGUAGE SQL;


CREATE OR REPLACE FUNCTION pgmq.drop_queue(queue_name TEXT, partitioned BOOLEAN DEFAULT FALSE)
RETURNS BOOLEAN AS $$
DECLARE
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
fq_qtable TEXT := 'pgmq.' || qtable;
atable TEXT := pgmq.format_table_name(queue_name, 'a');
fq_atable TEXT := 'pgmq.' || atable;
BEGIN
EXECUTE FORMAT(
$QUERY$
ALTER EXTENSION pgmq DROP TABLE pgmq.%I
$QUERY$,
qtable
);

EXECUTE FORMAT(
$QUERY$
ALTER EXTENSION pgmq DROP TABLE pgmq.%I
$QUERY$,
atable
);

EXECUTE FORMAT(
$QUERY$
DROP TABLE IF EXISTS pgmq.%I
$QUERY$,
qtable
);

EXECUTE FORMAT(
$QUERY$
DROP TABLE IF EXISTS pgmq.%I
$QUERY$,
atable
);

IF EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_name = 'meta' and table_schema = 'pgmq'
) THEN
EXECUTE FORMAT(
$QUERY$
DELETE FROM pgmq.meta WHERE queue_name = %L
$QUERY$,
queue_name
);
END IF;

IF partitioned THEN
EXECUTE FORMAT(
$QUERY$
DELETE FROM %I.part_config where parent_table in (%L, %L)
$QUERY$,
pgmq._get_pg_partman_schema(), fq_qtable, fq_atable
);
END IF;

RETURN TRUE;
END;
$$ LANGUAGE plpgsql;


CREATE OR REPLACE FUNCTION pgmq.create_partitioned(
queue_name TEXT,
partition_interval TEXT DEFAULT '10000',
retention_interval TEXT DEFAULT '100000'
)
RETURNS void AS $$
DECLARE
partition_col TEXT;
a_partition_col TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
atable TEXT := pgmq.format_table_name(queue_name, 'a');
fq_qtable TEXT := 'pgmq.' || qtable;
fq_atable TEXT := 'pgmq.' || atable;
BEGIN
PERFORM pgmq.validate_queue_name(queue_name);
PERFORM pgmq._ensure_pg_partman_installed();
SELECT pgmq._get_partition_col(partition_interval) INTO partition_col;

EXECUTE FORMAT(
$QUERY$
CREATE TABLE IF NOT EXISTS pgmq.%I (
msg_id BIGINT GENERATED ALWAYS AS IDENTITY,
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
) PARTITION BY RANGE (%I)
$QUERY$,
qtable, partition_col
);

IF NOT pgmq._belongs_to_pgmq(qtable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable);
END IF;

-- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md
-- p_parent_table - the existing parent table. MUST be schema qualified, even if in public schema.
EXECUTE FORMAT(
$QUERY$
SELECT %I.create_parent(
p_parent_table := %L,
p_control := %L,
p_interval := %L,
p_type := case
when pgmq._get_pg_partman_major_version() = 5 then 'range'
else 'native'
end
)
$QUERY$,
pgmq._get_pg_partman_schema(),
fq_qtable,
partition_col,
partition_interval
);

EXECUTE FORMAT(
$QUERY$
CREATE INDEX IF NOT EXISTS %I ON pgmq.%I (%I);
$QUERY$,
qtable || '_part_idx', qtable, partition_col
);

EXECUTE FORMAT(
$QUERY$
UPDATE %I.part_config
SET
retention = %L,
retention_keep_table = false,
retention_keep_index = true,
automatic_maintenance = 'on'
WHERE parent_table = %L;
$QUERY$,
pgmq._get_pg_partman_schema(),
retention_interval,
'pgmq.' || qtable
);

EXECUTE FORMAT(
$QUERY$
INSERT INTO pgmq.meta (queue_name, is_partitioned, is_unlogged)
VALUES (%L, true, false)
ON CONFLICT
DO NOTHING;
$QUERY$,
queue_name
);

IF partition_col = 'enqueued_at' THEN
a_partition_col := 'archived_at';
ELSE
a_partition_col := partition_col;
END IF;

EXECUTE FORMAT(
$QUERY$
CREATE TABLE IF NOT EXISTS pgmq.%I (
msg_id BIGINT NOT NULL,
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
) PARTITION BY RANGE (%I);
$QUERY$,
atable, a_partition_col
);

IF NOT pgmq._belongs_to_pgmq(atable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable);
END IF;

-- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md
-- p_parent_table - the existing parent table. MUST be schema qualified, even if in public schema.
EXECUTE FORMAT(
$QUERY$
SELECT %I.create_parent(
p_parent_table := %L,
p_control := %L,
p_interval := %L,
p_type := case
when pgmq._get_pg_partman_major_version() = 5 then 'range'
else 'native'
end
)
$QUERY$,
pgmq._get_pg_partman_schema(),
fq_atable,
a_partition_col,
partition_interval
);

EXECUTE FORMAT(
$QUERY$
UPDATE %I.part_config
SET
retention = %L,
retention_keep_table = false,
retention_keep_index = true,
automatic_maintenance = 'on'
WHERE parent_table = %L;
$QUERY$,
pgmq._get_pg_partman_schema(),
retention_interval,
'pgmq.' || atable
);

EXECUTE FORMAT(
$QUERY$
CREATE INDEX IF NOT EXISTS %I ON pgmq.%I (archived_at);
$QUERY$,
'archived_at_idx_' || queue_name, atable
);

END;
$$ LANGUAGE plpgsql;


CREATE OR REPLACE FUNCTION pgmq.convert_archive_partitioned(
table_name TEXT,
partition_interval TEXT DEFAULT '10000',
retention_interval TEXT DEFAULT '100000',
leading_partition INT DEFAULT 10
)
RETURNS void AS $$
DECLARE
a_table_name TEXT := pgmq.format_table_name(table_name, 'a');
a_table_name_old TEXT := pgmq.format_table_name(table_name, 'a') || '_old';
qualified_a_table_name TEXT := format('pgmq.%I', a_table_name);
BEGIN

PERFORM c.relkind
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relname = a_table_name
AND c.relkind = 'p';

IF FOUND THEN
RAISE NOTICE 'Table %s is already partitioned', a_table_name;
RETURN;
END IF;

PERFORM c.relkind
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relname = a_table_name
AND c.relkind = 'r';

IF NOT FOUND THEN
RAISE NOTICE 'Table %s does not exists', a_table_name;
RETURN;
END IF;

EXECUTE 'ALTER TABLE ' || qualified_a_table_name || ' RENAME TO ' || a_table_name_old;

EXECUTE format( 'CREATE TABLE pgmq.%I (LIKE pgmq.%I including all) PARTITION BY RANGE (msg_id)', a_table_name, a_table_name_old );

EXECUTE 'ALTER INDEX pgmq.archived_at_idx_' || table_name || ' RENAME TO archived_at_idx_' || table_name || '_old';
EXECUTE 'CREATE INDEX archived_at_idx_'|| table_name || ' ON ' || qualified_a_table_name ||'(archived_at)';

-- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md
-- p_parent_table - the existing parent table. MUST be schema qualified, even if in public schema.
EXECUTE FORMAT(
$QUERY$
SELECT %I.create_parent(
p_parent_table := %L,
p_control := 'msg_id',
p_interval := %L,
p_type := case
when pgmq._get_pg_partman_major_version() = 5 then 'range'
else 'native'
end
)
$QUERY$,
pgmq._get_pg_partman_schema(),
qualified_a_table_name,
partition_interval
);

EXECUTE FORMAT(
$QUERY$
UPDATE %I.part_config
SET
retention = %L,
retention_keep_table = false,
retention_keep_index = false,
infinite_time_partitions = true
WHERE
parent_table = %L;
$QUERY$,
pgmq._get_pg_partman_schema(),
retention_interval,
qualified_a_table_name
);

END;
$$ LANGUAGE plpgsql;
Loading

0 comments on commit 9787166

Please sign in to comment.