AGEFreighter 0.5.2リリース

Azure

これ、なかなか大変でした。一番書きたかったやつです。ツイートしたりあちこちで書いたりしているんだけど、このAzureStorageFreighterというクラスが本命でして、なんでこんなものを書かないとならないかという話を少しだけ。

Apache AGEとNeo4j

CypherクエリにはUNWINDという句があります。以下のようにリストを展開して、代入することが出来る機能で、要するにPythonのリスト内包表記と同じです。

{
  "events" : [ {
    "year" : 2014,
    "id" : 1
  }, {
    "year" : 2014,
    "id" : 2
  } ]
}

UNWIND $events AS event
MERGE (y:Year {year: event.year})
MERGE (y)<-[:IN]-(e:Event {id: event.id})
RETURN e.id AS x ORDER BY x

Apache AGEにももちろんUNWINDは実装されているので、例えばpsqlで接続してCypherクエリを直接叩くのであれば、UNWINDを利用できます。

一方で、これをPythonから利用するとなると大きな問題があります。PythonでPostgreSQLに接続するには、psycopgパッケージを用いるわけですが、psycopgのparameterized queryの機能は、以下のような書式をとります。

id = 10
created_at = datetime.date(2020, 11, 18)
last_name = "O'Reilly"
cur.execute("""
    INSERT INTO some_table (id, created_at, last_name)
    VALUES (%s, %s, %s);
    """,
    (id, created_at, last_name))

ここで渡している変数は全てimmutableであって、タプル等のmutableを渡すことができません。以下のように書いてもエラーになるだけです。

cur.execute("""
    SELECT * FROM cypher('graph_name' $$
    UNWIND $events AS event
    MERGE (y:Year {year: event.year})
    MERGE (y)<-[:IN]-(e:Event {id: event.id})
    RETURN e.id AS x ORDER BY x
    $$...
    """,
    (events))

色々試してみましたが、そもそもCypherクエリをパースしてPostgreSQLのテーブルにINSERTするAGEの仕組み上、UNWINDが出来てもあまり速くなるとは思えず、諦めました。

AzureにおけるApache AGEの問題

Apache AGEには、load_labels_from_file() / load_edges_from_file()というユーザ定義関数があり、事前に定義された書式のCSVファイルからグラフをロードする機能があります。この機能の実装(age/src/backend/utils/load/ag_load_labels.c)を見ると、はい、ファイルポインタですよね…。

int create_labels_from_csv_file(char *file_path,
                                char *graph_name,
                                Oid graph_oid,
                                char *label_name,
                                int label_id,
                                bool id_field_exists,
                                bool load_as_agtype)
{

    FILE *fp;
    struct csv_parser p;
    char buf[1024];
    size_t bytes_read;
    unsigned char options = 0;
    csv_vertex_reader cr;
    char *label_seq_name;
    Oid temp_table_relid;

つまり、これらのUDFを実行しているマシンのローカルファイルにしかアクセス出来ない、ということです。違う言い方をすると、PostgreSQLが動いているマシンにログインして実行することしか出来ません。PaaSではちょっと無理、ということになります。

そこで、AGEFreighterの開発に着手して、「手元にあるファイル」をどうにか高速にApache AGEにロード出来ないかという試行錯誤してきました。

azure_storage Extension

さて、Azure Database for PostgreSQLには(より正確には、Azure Cosmos DB for PostgreSQL「にも」)、azure_storageというエクステンションがあって、これを使うとAzure Storage AccountのBlobコンテナにあるファイルから、PostgreSQLにデータを読み込むことが可能になります。

SELECT * FROM azure_storage.blob_get
        ('<storage_account>'
        ,'<blob_container>'
        ,'events.csv'
        , NULL::events)
LIMIT 5;

つまり、あたかもローカルのファイルにアクセスしているかのように、ファイルからの読み込みが可能になります。なので、下記のようなユーザ定義関数を作って、Azure StorageからCSVを読み込んでテンポラリテーブルにぶち込み、そこからAGEのテーブルに良い感じにぶち込めば良いわけです。それをやっているのが、今回のAzureStorageFreighterというクラスです。

CREATE OR REPLACE FUNCTION load_graph_from_azure_storage()
RETURNS VOID AS $$
DECLARE
    chunk RECORD;
    chunk_size BIGINT := 100000;
    num_offset BIGINT := 0;
    total_rows BIGINT;

    ENTRY_ID_BITS INTEGER := 32 + 16;
    ENTRY_ID_MASK BIGINT := 0x0000FFFFFFFFFFFF;
    oid BIGINT;
    first_id_actor BIGINT;
    first_id_film BIGINT;
BEGIN
    SET search_path = ag_catalog, "$user", public;

    -- create a temporary table to store the data from the Azure Storage
    CREATE TEMP TABLE temp_from_azure_storage (
        Actor TEXT,
        ActorID TEXT,
        Film TEXT,
        Year TEXT,
        Votes TEXT,
        Rating TEXT,
        FilmID TEXT
    );

    -- bulk load from the Azure Storage into the temporary table
    INSERT INTO temp_from_azure_storage
    SELECT *
    FROM azure_storage.blob_get(
        'saagefreighter6894b1e8',
        'bcagefreighter6894b1e8',
        'actorfilms.csv',
        options := azure_storage.options_csv_get(header := 'true'))
    AS res (Actor TEXT,ActorID TEXT,Film TEXT,Year TEXT,Votes TEXT,Rating TEXT,FilmID TEXT);

    -- create a temporary table to store the mapping between the entryID and the id
    CREATE TEMP TABLE temp_id_map (entryID TEXT, id BIGINT);

    SELECT COUNT(*) INTO total_rows FROM temp_from_azure_storage;

    -- determine the first id for the Actor vertex
    SELECT id INTO oid FROM ag_label WHERE name='Actor';
    first_id_actor := ((oid << ENTRY_ID_BITS) | (1 & ENTRY_ID_MASK));

    -- determine the first id for the Film vertex
    SELECT id INTO oid FROM ag_label WHERE name='Film';
    first_id_film := ((oid << ENTRY_ID_BITS) | (1 & ENTRY_ID_MASK));

    WHILE num_offset < total_rows LOOP
        -- bulk insert the Actor data
        INSERT INTO "AgeFreighter"."Actor" (properties)
        SELECT format('{"id":"%s", "Actor":"%s"}', ActorID, Actor)::agtype
        FROM (
            SELECT DISTINCT ActorID, Actor
            FROM temp_from_azure_storage
            OFFSET num_offset LIMIT chunk_size
        ) AS distinct_actors;

        -- bulk insert the mapping between the entryID and the id
        INSERT INTO temp_id_map (entryID, id)
        SELECT distinct_actors.ActorID, first_id_actor + ROW_NUMBER() OVER () - 1
        FROM (
            SELECT DISTINCT ActorID
            FROM temp_from_azure_storage
            OFFSET num_offset LIMIT chunk_size
        ) AS distinct_actors;

        -- bulk insert the Film data
        INSERT INTO "AgeFreighter"."Film" (properties)
        SELECT format('{"id":"%s", "Film":"%s", "Year":"%s", "Votes":"%s", "Rating":"%s"}', FilmID, Film, Year, Votes, Rating)::agtype
        FROM (
            SELECT DISTINCT FilmID, Film, Year, Votes, Rating
            FROM temp_from_azure_storage
            OFFSET num_offset LIMIT chunk_size
        ) AS distinct_films;

        -- bulk insert the mapping between the entryID and the id
        INSERT INTO temp_id_map (entryID, id)
        SELECT distinct_films.FilmID, first_id_film + ROW_NUMBER() OVER () - 1
        FROM (
            SELECT DISTINCT FilmID
            FROM temp_from_azure_storage
            OFFSET num_offset LIMIT chunk_size
        ) AS distinct_films;

        -- bulk insert the edge data
        INSERT INTO "AgeFreighter"."ACTED_IN" (start_id, end_id)
        SELECT actor_map.id::agtype::graphid, film_map.id::agtype::graphid
        FROM (
            SELECT DISTINCT ActorID, FilmID
            FROM temp_from_azure_storage
            OFFSET num_offset LIMIT chunk_size
        ) AS af
        JOIN temp_id_map AS actor_map ON af.ActorID = actor_map.entryID
        JOIN temp_id_map AS film_map ON af.FilmID = film_map.entryID;

        num_offset := num_offset + chunk_size;
    END LOOP;

    CREATE INDEX ON "AgeFreighter"."Actor" USING GIN (properties);
    CREATE INDEX ON "AgeFreighter"."Actor" USING BTREE (id);

    CREATE INDEX ON "AgeFreighter"."Film" USING GIN (properties);
    CREATE INDEX ON "AgeFreighter"."Film" USING BTREE (id);

    CREATE INDEX ON "AgeFreighter"."ACTED_IN" USING BTREE (start_id);
    CREATE INDEX ON "AgeFreighter"."ACTED_IN" USING BTREE (end_id);

END;
$$ LANGUAGE plpgsql;

もちろん、ローカルのストレージと言っても実際にはネットワーク経由でアクセスするストレージなので、レイテンシーが大きいのは否めません。また、手元にあるファイルをAzure Storageにアップロードするための準備やアップロードそのものにも時間がかかるので、小さなファイルをロードするには不向きです。

テストした結果から推測すると、300万レコード、200MB程度のファイルあたりから、CSVFreighterクラスより時間がかからなくなるんじゃないかと思ってますが、そのテストはまた後日。

0.5.2 Release -AzureStorageFreighter-

  • AzureStorageFreighter class is used to load data from Azure Storage into the graph database. It’s totally different from other classes. The class works as follows:
    • If the argument, ‘subscription_id’ is not set, the class tries to find the Azure Subscription ID from your local environment using the ‘az’ command.
    • Creates an Azure Storage account and a blob container under the resource group where the PostgreSQL server runs in.
    • Enables the ‘azure_storage’ extension in the PostgreSQL server, if it’s not enabled.
    • Uploads the CSV file to the blob container.
    • Creates a UDF (User Defined Function) named ‘load_from_azure_storage’ in the PostgreSQL server. The UDF loads data from the Azure Storage into the graph database.
    • Executes the UDF.
  • The above process takes time to prepare for loading data, making it unsuitable for loading small files, but effective for loading large files. For instance, it takes under 3 seconds to load ‘actorfilms.csv’ after uploading.
  • However, please note that it is still in the early stages of implementation, so there is room for optimization and potential issues due to insufficient testing.
タイトルとURLをコピーしました