Summarized issue
A customer who uses Azure Cosmos DB for PostgreSQL (Citus) asked me about the issue in the subject. Their situation was as follows.
- Using Azure Cosmos DB for PostgreSQL (Citus)
- Using multiple distributed tables on a Citus cluster
- Created a user-defined function (UDF) that extracts a record from a distributed table under complex conditions. It returns exactly one record.
- When the UDF is called directly, like ‘SELECT my_function()’ from psql / pgAdmin, no error occurs.
- But when the UDF is called from another query, like ‘SELECT * from another_distributed_table WHERE my_function() = 1’, it fails.
OK, let’s reproduce it and fix it.
Reproduce
Preparation
Create tables and indices, make tables distributed, and load the test data
CREATE TABLE github_events(
    event_id bigint,
    event_type text,
    event_public boolean,
    repo_id bigint,
    payload jsonb,
    repo jsonb,
    user_id bigint,
    org jsonb,
    created_at timestamp
);
CREATE TABLE github_users(
    user_id bigint,
    url text,
    login text,
    avatar_url text,
    gravatar_id text,
    display_login text
);
CREATE INDEX event_type_index ON github_events (event_type);
CREATE INDEX payload_index ON github_events USING GIN (payload jsonb_path_ops);
SELECT create_distributed_table('github_events', 'user_id');
SELECT create_distributed_table('github_users', 'user_id');
\! curl -O https://examples.citusdata.com/users.csv
\! curl -O https://examples.citusdata.com/events.csv
\copy github_events from 'events.csv' WITH CSV
\copy github_users from 'users.csv' WITH CSV
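Before going further, it can help to confirm that both tables are really distributed on ‘user_id’. The following is a minimal sketch, assuming a Citus version that ships the citus_tables view (Citus 10 or later); it is run on the coordinator.

```sql
-- Sketch: list the two tables with their distribution column and co-location group.
-- Assumes the citus_tables view is available (Citus 10+).
SELECT table_name,
       citus_table_type,
       distribution_column,
       colocation_id,
       shard_count
FROM citus_tables
WHERE table_name IN ('github_events'::regclass, 'github_users'::regclass);
```

If both rows show the same colocation_id, rows with the same user_id in the two tables are stored on the same worker node.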
Create a function
The following function refers to a distributed table, ‘github_events’.
CREATE OR REPLACE FUNCTION get_repo_id(id BIGINT) RETURNS BIGINT AS $$
BEGIN
    RETURN (SELECT repo_id FROM github_events WHERE github_events.user_id = id LIMIT 1);
END;
$$ LANGUAGE plpgsql;
Reproduce an error
citus=> SELECT * FROM github_users WHERE user_id=17686159 AND get_repo_id(17686159)=59659257;
ERROR: cannot execute a distributed query from a query on a shard
DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results.
HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness.
CONTEXT: SQL expression "(SELECT repo_id FROM github_events WHERE github_events.user_id = id LIMIT 1)"
PL/pgSQL function get_repo_id(bigint) line 3 at RETURN
while executing command on private-w1-riowemextest.qn4l7dxt7nr5nu.postgres.cosmos.azure.com:5432
The error message is fairly self-explanatory, and its HINT gives two options for resolving the error.
- Avoid nesting distributed queries
- Set citus.allow_nested_distributed_execution to on
Unfortunately, citus.allow_nested_distributed_execution didn’t work for me this time.
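For reference, the setting named in the HINT can be tried roughly as below. This is only a sketch of the syntax; as noted above, it did not resolve the error in this environment, and whether the role-level form is permitted may depend on the service's configuration.

```sql
-- Session-level: applies only to the current connection.
SET citus.allow_nested_distributed_execution TO on;

-- Role-level, as written in the HINT: takes effect for new sessions of this role.
ALTER USER current_user SET citus.allow_nested_distributed_execution TO on;

-- Check the value seen by the current session.
SHOW citus.allow_nested_distributed_execution;
```

Note that the HINT itself warns that this setting allows nested execution "with possible incorrectness", so it is a workaround rather than a fix.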
Check which worker node has the shard
citus=> EXPLAIN SELECT * FROM github_users WHERE user_id=17686159;
QUERY PLAN
------------------------------------------------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=private-w1-riowemextest.qn4l7dxt7nr5nu.postgres.cosmos.azure.com port=5432 dbname=citus
-> Seq Scan on github_users_102067 github_users (cost=0.00..254.74 rows=1 width=117)
Filter: (user_id = 17686159)
(7 rows)
citus=> EXPLAIN SELECT * FROM github_events WHERE user_id=17686159;
QUERY PLAN
------------------------------------------------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=private-w1-riowemextest.qn4l7dxt7nr5nu.postgres.cosmos.azure.com port=5432 dbname=citus
-> Seq Scan on github_events_102035 github_events (cost=0.00..391.86 rows=2 width=811)
Filter: (user_id = 17686159)
(7 rows)
The EXPLAIN results show that, for both tables, the shard holding user_id=17686159 is on w1.
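The same information can also be looked up from Citus metadata instead of reading EXPLAIN output. The following is a sketch, assuming the citus_shards view is available (Citus 10 or later); it is run on the coordinator.

```sql
-- Shard ID that stores user_id = 17686159 in each table.
SELECT get_shard_id_for_distribution_column('github_users', 17686159)  AS users_shard,
       get_shard_id_for_distribution_column('github_events', 17686159) AS events_shard;

-- Node that hosts those shards, according to the citus_shards view.
SELECT table_name, shardid, shard_name, nodename, nodeport
FROM citus_shards
WHERE shardid IN (
    get_shard_id_for_distribution_column('github_users', 17686159),
    get_shard_id_for_distribution_column('github_events', 17686159)
);
```

In this environment the shard IDs should match the ones in the EXPLAIN output (102067 and 102035), and nodename should be the w1 host.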
The flow I guessed
These facts suggest what happened to the failing query.
- The query ‘SELECT * FROM github_users WHERE user_id=17686159 AND get_repo_id(17686159)=59659257’ was pushed down to w1 only, not to w0, because the distribution column, ‘user_id’, was specified in the WHERE clause.
- The coordinator node rewrote the table name ‘github_users’ in the query to the shard name ‘github_users_102067’. So, the query that w1 received was ‘SELECT * FROM github_users_102067 WHERE user_id=17686159 AND get_repo_id(17686159)=59659257’.
- w1 tried to execute the received query. More precisely, w1 tried to build a query plan before executing it, and in doing so tried to execute the function. Let’s look at the function again.
CREATE OR REPLACE FUNCTION get_repo_id(id BIGINT) RETURNS BIGINT AS $$
BEGIN
    RETURN (SELECT repo_id FROM github_events WHERE github_events.user_id = id LIMIT 1);
END;
$$ LANGUAGE plpgsql;
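No! On w1, ‘github_events’ is not a local table that holds the rows. Selecting from it there would start another distributed query from inside a query on a shard, which is exactly what the error reports. The rows for user_id = 17686159 are instead stored on w1 in the shard table ‘github_events_102035’.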
Proof for the guess
To check the guess, I rewrote the function to read from the shard table directly.
CREATE OR REPLACE FUNCTION get_repo_id(id BIGINT) RETURNS BIGINT AS $$
BEGIN
    RETURN (SELECT repo_id FROM github_events_102035 WHERE github_events_102035.user_id = id LIMIT 1);
END;
$$ LANGUAGE plpgsql;
citus=> SELECT * FROM github_users WHERE user_id=17686159 AND get_repo_id(17686159)=59659257;
user_id | url | login | avatar_url | gravatar_id | display_login
----------+----------------------------------------+-----------+---------------------------------------------------+-------------+---------------
17686159 | https://api.github.com/users/MarkChan7 | MarkChan7 | https://avatars.githubusercontent.com/u/17686159? | | MarkChan7
(1 row)
It worked!
Conclusion
Turn the function into a dynamic query
In the end, the workaround was as follows. Citus provides a function, get_shard_id_for_distribution_column(), which returns the ID of the shard that stores a given distribution column value. It fits this kind of scenario.
CREATE OR REPLACE FUNCTION get_repo_id(id BIGINT) RETURNS bigint AS $$
DECLARE
    shard_id BIGINT;
    repo_id BIGINT;
BEGIN
    -- Look up the shard that stores the rows for this distribution column value.
    shard_id := get_shard_id_for_distribution_column('github_events', id);
    RAISE NOTICE 'shard_id: %', shard_id;  -- debug output only
    -- Build the shard table name (github_events_<shard_id>) and query it directly.
    EXECUTE format('SELECT repo_id FROM github_events_%s WHERE user_id=%s LIMIT 1', shard_id, id) INTO repo_id;
    RETURN repo_id;
END;
$$ LANGUAGE plpgsql;
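For comparison, the first option from the HINT, avoiding nesting, could look like the sketch below: the lookup is written inline as a scalar subquery instead of being hidden in a function. This assumes the two tables are co-located on ‘user_id’, and that the real UDF's "complex conditions" can be expressed in plain SQL, which may not hold for the customer's actual function.

```sql
-- Sketch: same filter as get_repo_id(17686159) = 59659257, without a function call.
-- Both tables are filtered on the same distribution column value, so the
-- coordinator can plan the whole statement as one query instead of nesting
-- a distributed query inside a shard query.
SELECT u.*
FROM github_users AS u
WHERE u.user_id = 17686159
  AND (
      SELECT e.repo_id
      FROM github_events AS e
      WHERE e.user_id = 17686159
      LIMIT 1
  ) = 59659257;
```

As in the original function, LIMIT 1 without ORDER BY picks an arbitrary matching row, so the result is only deterministic if the conditions identify a single record.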