これ、なかなか大変でした。一番書きたかったやつです。ツイートしたりあちこちで書いたりしているんだけど、このAzureStorageFreighterというクラスが本命でして、なんでこんなものを書かないとならないかという話を少しだけ。
Apache AGEとNeo4j
CypherクエリにはUNWINDという句があります。以下のようにリストを展開して、代入することが出来る機能で、要するにPythonのリスト内包表記と同じです。
{
"events" : [ {
"year" : 2014,
"id" : 1
}, {
"year" : 2014,
"id" : 2
} ]
}
UNWIND $events AS event
MERGE (y:Year {year: event.year})
MERGE (y)<-[:IN]-(e:Event {id: event.id})
RETURN e.id AS x ORDER BY x
Apache AGEにももちろんUNWINDは実装されているので、例えばpsqlで接続してCypherクエリを直接叩くのであれば、UNWINDを利用できます。
一方で、これをPythonから利用するとなると大きな問題があります。PythonでPostgreSQLに接続するには、psycopgパッケージを用いるわけですが、psycopgのparameterized queryの機能は、以下のような書式をとります。
id = 10
created_at = datetime.date(2020, 11, 18)
last_name = "O'Reilly"
cur.execute("""
INSERT INTO some_table (id, created_at, last_name)
VALUES (%s, %s, %s);
""",
(id, created_at, last_name))
ここで渡している変数は全てimmutableであって、タプル等のmutableを渡すことができません。以下のように書いてもエラーになるだけです。
cur.execute("""
SELECT * FROM cypher('graph_name' $$
UNWIND $events AS event
MERGE (y:Year {year: event.year})
MERGE (y)<-[:IN]-(e:Event {id: event.id})
RETURN e.id AS x ORDER BY x
$$...
""",
(events))
色々試してみましたが、そもそもCypherクエリをパースしてPostgreSQLのテーブルにINSERTするAGEの仕組み上、UNWINDが出来てもあまり速くなるとは思えず、諦めました。
AzureにおけるApache AGEの問題
Apache AGEには、load_labels_from_file() / load_edges_from_file()というユーザ定義関数があり、事前に定義された書式のCSVファイルからグラフをロードする機能があります。この機能の実装(age/src/backend/utils/load/ag_load_labels.c)を見ると、はい、ファイルポインタですよね…。
int create_labels_from_csv_file(char *file_path,
char *graph_name,
Oid graph_oid,
char *label_name,
int label_id,
bool id_field_exists,
bool load_as_agtype)
{
FILE *fp;
struct csv_parser p;
char buf[1024];
size_t bytes_read;
unsigned char options = 0;
csv_vertex_reader cr;
char *label_seq_name;
Oid temp_table_relid;
つまり、これらのUDFを実行しているマシンのローカルファイルにしかアクセス出来ない、ということです。違う言い方をすると、PostgreSQLが動いているマシンにログインして実行することしか出来ません。PaaSではちょっと無理、ということになります。
そこで、AGEFreighterの開発に着手して、「手元にあるファイル」をどうにか高速にApache AGEにロード出来ないかという試行錯誤してきました。
azure_storage Extension
さて、Azure Database for PostgreSQLには(より正確には、Azure Cosmos DB for PostgreSQL「にも」)、azure_storageというエクステンションがあって、これを使うとAzure Storage AccountのBlobコンテナにあるファイルから、PostgreSQLにデータを読み込むことが可能になります。
SELECT * FROM azure_storage.blob_get
('<storage_account>'
,'<blob_container>'
,'events.csv'
, NULL::events)
LIMIT 5;
つまり、あたかもローカルのファイルにアクセスしているかのように、ファイルからの読み込みが可能になります。なので、下記のようなユーザ定義関数を作って、Azure StorageからCSVを読み込んでテンポラリテーブルにぶち込み、そこからAGEのテーブルに良い感じにぶち込めば良いわけです。それをやっているのが、今回のAzureStorageFreighterというクラスです。
CREATE OR REPLACE FUNCTION load_graph_from_azure_storage()
RETURNS VOID AS $$
DECLARE
chunk RECORD;
chunk_size BIGINT := 100000;
num_offset BIGINT := 0;
total_rows BIGINT;
ENTRY_ID_BITS INTEGER := 32 + 16;
ENTRY_ID_MASK BIGINT := 0x0000FFFFFFFFFFFF;
oid BIGINT;
first_id_actor BIGINT;
first_id_film BIGINT;
BEGIN
SET search_path = ag_catalog, "$user", public;
-- create a temporary table to store the data from the Azure Storage
CREATE TEMP TABLE temp_from_azure_storage (
Actor TEXT,
ActorID TEXT,
Film TEXT,
Year TEXT,
Votes TEXT,
Rating TEXT,
FilmID TEXT
);
-- bulk load from the Azure Storage into the temporary table
INSERT INTO temp_from_azure_storage
SELECT *
FROM azure_storage.blob_get(
'saagefreighter6894b1e8',
'bcagefreighter6894b1e8',
'actorfilms.csv',
options := azure_storage.options_csv_get(header := 'true'))
AS res (Actor TEXT,ActorID TEXT,Film TEXT,Year TEXT,Votes TEXT,Rating TEXT,FilmID TEXT);
-- create a temporary table to store the mapping between the entryID and the id
CREATE TEMP TABLE temp_id_map (entryID TEXT, id BIGINT);
SELECT COUNT(*) INTO total_rows FROM temp_from_azure_storage;
-- determine the first id for the Actor vertex
SELECT id INTO oid FROM ag_label WHERE name='Actor';
first_id_actor := ((oid << ENTRY_ID_BITS) | (1 & ENTRY_ID_MASK));
-- determine the first id for the Film vertex
SELECT id INTO oid FROM ag_label WHERE name='Film';
first_id_film := ((oid << ENTRY_ID_BITS) | (1 & ENTRY_ID_MASK));
WHILE num_offset < total_rows LOOP
-- bulk insert the Actor data
INSERT INTO "AgeFreighter"."Actor" (properties)
SELECT format('{"id":"%s", "Actor":"%s"}', ActorID, Actor)::agtype
FROM (
SELECT DISTINCT ActorID, Actor
FROM temp_from_azure_storage
OFFSET num_offset LIMIT chunk_size
) AS distinct_actors;
-- bulk insert the mapping between the entryID and the id
INSERT INTO temp_id_map (entryID, id)
SELECT distinct_actors.ActorID, first_id_actor + ROW_NUMBER() OVER () - 1
FROM (
SELECT DISTINCT ActorID
FROM temp_from_azure_storage
OFFSET num_offset LIMIT chunk_size
) AS distinct_actors;
-- bulk insert the Film data
INSERT INTO "AgeFreighter"."Film" (properties)
SELECT format('{"id":"%s", "Film":"%s", "Year":"%s", "Votes":"%s", "Rating":"%s"}', FilmID, Film, Year, Votes, Rating)::agtype
FROM (
SELECT DISTINCT FilmID, Film, Year, Votes, Rating
FROM temp_from_azure_storage
OFFSET num_offset LIMIT chunk_size
) AS distinct_films;
-- bulk insert the mapping between the entryID and the id
INSERT INTO temp_id_map (entryID, id)
SELECT distinct_films.FilmID, first_id_film + ROW_NUMBER() OVER () - 1
FROM (
SELECT DISTINCT FilmID
FROM temp_from_azure_storage
OFFSET num_offset LIMIT chunk_size
) AS distinct_films;
-- bulk insert the edge data
INSERT INTO "AgeFreighter"."ACTED_IN" (start_id, end_id)
SELECT actor_map.id::agtype::graphid, film_map.id::agtype::graphid
FROM (
SELECT DISTINCT ActorID, FilmID
FROM temp_from_azure_storage
OFFSET num_offset LIMIT chunk_size
) AS af
JOIN temp_id_map AS actor_map ON af.ActorID = actor_map.entryID
JOIN temp_id_map AS film_map ON af.FilmID = film_map.entryID;
num_offset := num_offset + chunk_size;
END LOOP;
CREATE INDEX ON "AgeFreighter"."Actor" USING GIN (properties);
CREATE INDEX ON "AgeFreighter"."Actor" USING BTREE (id);
CREATE INDEX ON "AgeFreighter"."Film" USING GIN (properties);
CREATE INDEX ON "AgeFreighter"."Film" USING BTREE (id);
CREATE INDEX ON "AgeFreighter"."ACTED_IN" USING BTREE (start_id);
CREATE INDEX ON "AgeFreighter"."ACTED_IN" USING BTREE (end_id);
END;
$$ LANGUAGE plpgsql;
もちろん、ローカルのストレージと言っても実際にはネットワーク経由でアクセスするストレージなので、レイテンシーが大きいのは否めません。また、手元にあるファイルをAzure Storageにアップロードするための準備やアップロードそのものにも時間がかかるので、小さなファイルをロードするには不向きです。
テストした結果から推測すると、300万レコード、200MB程度のファイルあたりから、CSVFreighterクラスより時間がかからなくなるんじゃないかと思ってますが、そのテストはまた後日。
0.5.2 Release -AzureStorageFreighter-
- AzureStorageFreighter class is used to load data from Azure Storage into the graph database. It’s totally different from other classes. The class works as follows:
- If the argument, ‘subscription_id’ is not set, the class tries to find the Azure Subscription ID from your local environment using the ‘az’ command.
- Creates an Azure Storage account and a blob container under the resource group where the PostgreSQL server runs in.
- Enables the ‘azure_storage’ extension in the PostgreSQL server, if it’s not enabled.
- Uploads the CSV file to the blob container.
- Creates a UDF (User Defined Function) named ‘load_from_azure_storage’ in the PostgreSQL server. The UDF loads data from the Azure Storage into the graph database.
- Executes the UDF.
- The above process takes time to prepare for loading data, making it unsuitable for loading small files, but effective for loading large files. For instance, it takes under 3 seconds to load ‘actorfilms.csv’ after uploading.
- However, please note that it is still in the early stages of implementation, so there is room for optimization and potential issues due to insufficient testing.