What’s the error ‘cannot execute a distributed query from a query on a shard’ on Citus?

Summarized issue

A customer using Azure Cosmos DB for PostgreSQL (Citus) asked me about this error. Their setup:

  • Using Azure Cosmos DB for PostgreSQL (Citus)
  • Using multiple distributed tables on a Citus cluster
  • Created a user-defined function (UDF) that extracts a record from a distributed table using complex conditions. It returns exactly one record.
  • When the UDF is called directly, like ‘SELECT my_function()’, from psql / pgAdmin, the error doesn’t occur.
  • But when the UDF is called from another query, like ‘SELECT * FROM another_distributed_table WHERE my_function() = 1’, it fails with this error.

OK, let’s reproduce it and fix it.

Reproduce

Preparation

Create the tables and indexes, distribute the tables, and load the test data.

CREATE TABLE github_events(
    event_id bigint,
    event_type text,
    event_public boolean,
    repo_id bigint,
    payload jsonb,
    repo jsonb,
    user_id bigint,
    org jsonb,
    created_at timestamp
);
CREATE TABLE github_users(
    user_id bigint,
    url text,
    login text,
    avatar_url text,
    gravatar_id text,
    display_login text
);

CREATE INDEX event_type_index ON github_events (event_type);
CREATE INDEX payload_index ON github_events USING GIN (payload jsonb_path_ops);

SELECT create_distributed_table('github_events', 'user_id');
SELECT create_distributed_table('github_users', 'user_id');

\! curl -O https://examples.citusdata.com/users.csv
\! curl -O https://examples.citusdata.com/events.csv

\copy github_events from 'events.csv' WITH CSV
\copy github_users from 'users.csv' WITH CSV
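Before going further, it may help to confirm how Citus distributed the two tables. The query below is a sketch that reads the citus_tables view (available in recent Citus versions). If both tables list user_id as the distribution column and share the same colocation_id, rows with the same user_id are stored on the same worker, which matters for what follows.

```sql
-- List distributed tables with their distribution column and colocation group.
-- Tables with the same colocation_id keep rows with equal user_id on the same node.
SELECT table_name, distribution_column, colocation_id, shard_count
FROM citus_tables
ORDER BY table_name;
```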

Create a function

The following function refers to a distributed table, ‘github_events’.

CREATE OR REPLACE FUNCTION get_repo_id(id BIGINT) RETURNS BIGINT AS $$
    BEGIN
        RETURN (SELECT repo_id FROM github_events WHERE github_events.user_id = id LIMIT 1);
    END;
$$ LANGUAGE plpgsql;
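Calling the function on its own, as the customer did from psql / pgAdmin, runs it on the coordinator. Its inner SELECT is then an ordinary distributed query rather than a nested one, so a call like this should not raise the error:

```sql
-- Runs on the coordinator; the inner SELECT is an ordinary (non-nested)
-- distributed query routed to the shard for user_id 17686159.
SELECT get_repo_id(17686159);
```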

Reproduce an error

citus=> SELECT * FROM github_users WHERE user_id=17686159 AND get_repo_id(17686159)=59659257;
ERROR:  cannot execute a distributed query from a query on a shard
DETAIL:  Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results.
HINT:  Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness.
CONTEXT:  SQL expression "(SELECT repo_id FROM github_events WHERE github_events.user_id = id LIMIT 1)"
PL/pgSQL function get_repo_id(bigint) line 3 at RETURN
while executing command on private-w1-riowemextest.qn4l7dxt7nr5nu.postgres.cosmos.azure.com:5432

The error message is fairly self-explanatory and suggests two options:

  • Avoid nesting of distributed queries
  • Setting citus.allow_nested_distributed_execution to on
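For the first option, when the function body is simple enough to inline, one way to avoid nesting is to move its SELECT into the calling query. This is a sketch for this example only, assuming both tables are colocated on user_id: because both tables are filtered on the same user_id value, Citus should be able to route the whole statement to a single worker, and no function is called on the shard.

```sql
-- Same filter as get_repo_id(17686159)=59659257, written as a scalar subquery,
-- so no distributed query is started from inside a shard query.
SELECT *
FROM github_users
WHERE user_id = 17686159
  AND (SELECT repo_id
       FROM github_events
       WHERE github_events.user_id = 17686159
       LIMIT 1) = 59659257;
```

This only works when the UDF's logic can be expressed inline; for the customer's "complex conditions" that may not be practical.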

Unfortunately, setting citus.allow_nested_distributed_execution to on didn’t work for me this time.

Check which worker node holds the shards

citus=> EXPLAIN SELECT * FROM github_users WHERE user_id=17686159;
                                                 QUERY PLAN                                                 
------------------------------------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=private-w1-riowemextest.qn4l7dxt7nr5nu.postgres.cosmos.azure.com port=5432 dbname=citus
         ->  Seq Scan on github_users_102067 github_users  (cost=0.00..254.74 rows=1 width=117)
               Filter: (user_id = 17686159)
(7 rows)

citus=> EXPLAIN SELECT * FROM github_events WHERE user_id=17686159;
                                                 QUERY PLAN
------------------------------------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=private-w1-riowemextest.qn4l7dxt7nr5nu.postgres.cosmos.azure.com port=5432 dbname=citus
         ->  Seq Scan on github_events_102035 github_events  (cost=0.00..391.86 rows=2 width=811)
               Filter: (user_id = 17686159)
(7 rows)

The EXPLAIN output shows that, for user_id=17686159, the shards of both tables (github_users_102067 and github_events_102035) are on the same worker, w1.
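Instead of reading EXPLAIN output, the shard IDs and their nodes can also be looked up directly. This sketch combines get_shard_id_for_distribution_column() with the citus_shards view, assuming a Citus version that provides both. The shard names it returns should match the ones in the EXPLAIN output.

```sql
-- Shard and worker node holding user_id 17686159 for each table
SELECT table_name, shardid, shard_name, nodename, nodeport
FROM citus_shards
WHERE shardid IN (
    get_shard_id_for_distribution_column('github_users', 17686159),
    get_shard_id_for_distribution_column('github_events', 17686159)
);
```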

The flow I guessed

This tells us what happened to the failing query.

  1. The query ‘SELECT * FROM github_users WHERE user_id=17686159 AND get_repo_id(17686159)=59659257’ was pushed down to w1 only, not w0, because the distribution column, ‘user_id’, was filtered with an equality condition in the WHERE clause.
  2. The coordinator rewrote the table name ‘github_users’ in that query to the shard name ‘github_users_102067’. So the query w1 received was roughly ‘SELECT * FROM github_users_102067 WHERE user_id=17686159 AND get_repo_id(17686159)=59659257’.
  3. w1 then executed the received query, which meant calling the function on w1 itself. Let’s see the function again.
CREATE OR REPLACE FUNCTION get_repo_id(id BIGINT) RETURNS BIGINT AS $$
    BEGIN
        RETURN (SELECT repo_id FROM github_events WHERE github_events.user_id = id LIMIT 1);
    END;
$$ LANGUAGE plpgsql;

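That is the problem. On w1, ‘github_events’ is known as a distributed table (Citus syncs its metadata to the workers), so the SELECT inside the function would start a new distributed query from within a query that is already running on a shard. Citus refuses this kind of nesting by default. Meanwhile, the rows for user_id = 17686159 are stored locally on w1, in the shard table ‘github_events_102035’.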
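One way to check where the shard table exists is to run a catalog lookup on every worker with run_command_on_workers(). This is a sketch that assumes the tables live in the public schema; to_regclass() returns NULL when a relation is not found, so the result column should show t only for w1.

```sql
-- Ask each worker whether the shard table github_events_102035 exists locally.
SELECT nodename, nodeport, success, result
FROM run_command_on_workers(
    $$SELECT to_regclass('public.github_events_102035') IS NOT NULL$$
);
```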

Proof for the guess

To test this, I changed the function to read the shard table directly.

CREATE OR REPLACE FUNCTION get_repo_id(id BIGINT) RETURNS BIGINT AS $$
    BEGIN
        RETURN (SELECT repo_id FROM github_events_102035 WHERE github_events_102035.user_id = id LIMIT 1);
    END;
$$ LANGUAGE plpgsql;

citus=> SELECT * FROM github_users WHERE user_id=17686159 AND get_repo_id(17686159)=59659257;
 user_id  |                  url                   |   login   |                    avatar_url                     | gravatar_id | display_login
----------+----------------------------------------+-----------+---------------------------------------------------+-------------+---------------
 17686159 | https://api.github.com/users/MarkChan7 | MarkChan7 | https://avatars.githubusercontent.com/u/17686159? |             | MarkChan7
(1 row)

It worked!

Conclusion

Turn the function into a dynamic query

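The final workaround is shown below. Citus provides get_shard_id_for_distribution_column(), which returns the ID of the shard that holds a given distribution column value. Since shard tables are named <table>_<shardid>, the function can build the shard table name at run time instead of hard-coding github_events_102035.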

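CREATE OR REPLACE FUNCTION get_repo_id(id BIGINT) RETURNS BIGINT AS $$
    DECLARE
        shard_id BIGINT;
        repo_id BIGINT;
    BEGIN
        -- Find the github_events shard that holds this distribution column value
        shard_id := get_shard_id_for_distribution_column('github_events', id);
        RAISE NOTICE 'shard_id: %', shard_id;  -- for debugging only
        -- Query the shard table (github_events_<shardid>) directly, so no nested
        -- distributed query is started; %I quotes the name, USING binds id safely
        EXECUTE format('SELECT repo_id FROM %I WHERE user_id = $1 LIMIT 1',
                       'github_events_' || shard_id)
            INTO repo_id
            USING id;
        RETURN repo_id;
    END;
$$ LANGUAGE plpgsql;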
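As a usage sketch, the failing query from the beginning can be re-run unchanged against the new version of get_repo_id(). One caveat, inferred from how the function works: it reads a shard table by name, so it only succeeds on the node that stores that shard. That holds here because the outer query filters on the same user_id and the tables are colocated. Calling the function directly on a coordinator that holds no shards, or with an id whose shard lives on another worker, would likely fail with a ‘relation does not exist’ error.

```sql
-- Outer query and function argument use the same user_id, so the shard
-- github_events_<shardid> is local to the worker that runs get_repo_id().
SELECT * FROM github_users WHERE user_id=17686159 AND get_repo_id(17686159)=59659257;
```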