Azure Database for PostgreSQLでApache AGEを試してみる

Azure

数日前からAzure Database for PostgreSQLでのApache AGEのプレビューが開始されたので、先に書いたエントリーと同じようなことをやってみる。

Azure Database for PostgreSQLを設定する

AzureポータルからAzure Database for PostgreSQLをデプロイする。デプロイが完了したら、[設定]→[サーバーパラメーター]と進み、

検索フィルタに”extension”を入力すると、[azure.extensions]が表示されるので、”AGE”にチェックを入れて保存する。

次に、同じく[設定]→[接続]と進み、psqlコマンドをコピーし、CloudShellあるいはローカルのターミナルから接続する。

接続できたら、CREATE EXTENSIONを実行して、AGEを有効化する。

postgres=> CREATE EXTENSION IF NOT EXISTS age CASCADE;
CREATE EXTENSION

以上でサーバ側の準備は完了。

データをダウンロードする

今回は、映画のタイトルと出演俳優のデータをグラフ化するので、kaggleのIMDB Films By Actor For 10K ActorsからCSVファイルをダウンロードする(要登録)。

Pythonコードを保存する

以下のPythonコードと、ダウンロードしたCSVファイルを同じディレクトリに置いて”-i”(初期化)オプションを付けて実行する。

#!/usr/bin/env python3.11
# -*- coding: utf-8 -*-

# builtin modules
import argparse
import json
import os
import sys

# third party modules
while True:
    try:
        import pandas as pd
        break
    except:
        os.system(f"{sys.executable} -m pip install pandas")

while True:
    try:
        import psycopg as pg
        from psycopg.rows import dict_row, namedtuple_row
        break
    except:
        os.system(f"{sys.executable} -m pip install psycopg")

while True:
    try:
        from pyvis.network import Network
        break
    except ModuleNotFoundError:
        os.system(f"{sys.executable} -m pip install pyvis")

# exit with redden message
def exit_with_error(msg) -> None:
    sys.exit("\033[31m" + msg + "\033[0m")

# Constants
AG_GRAPH_NAME = "actorfilms"

# create graph database and add data
def createGraph(cur: pg.cursor) -> None:
    print("Creating graph database...")
    try:
        cur.execute(f"SELECT * FROM ag_graph WHERE name='{AG_GRAPH_NAME}'")
    except pg.errors.UndefinedTable:
        exit_with_error("Failed to create graph database.\nPlease check if AGE extension is installed.")
    row = cur.fetchone()
    if row is not None:
        cur.execute(f"SELECT drop_graph('{AG_GRAPH_NAME}', true)")
    cur.execute(f"SELECT create_graph('{AG_GRAPH_NAME}')")

    print("Adding Graph Data...")
    # read csv file downloaded from https://www.kaggle.com/datasets/darinhawley/imdb-films-by-actor-for-10k-actors
    df = pd.read_csv("actorfilms.csv", usecols = ["Actor", "ActorID", "Film"])
    df["Actor"] = df["Actor"].str.replace("'", r"\'")
    df["Film"] = df["Film"].str.replace("'", r"\'")

    ln = len(df)
    row_num = 0
    queries = []
    saved_actor = ''
    for row in df.itertuples():
        row_num += 1
        # the file is sorted by actor, so we can avoid creating the same actor multiple times
        if row.Actor == saved_actor:
            queries.append(f"""
                SELECT * FROM cypher('{AG_GRAPH_NAME}', $$ MERGE (m:Film {{title: '{row.Film}'}}) $$) AS (a agtype);
                SELECT * FROM cypher('{AG_GRAPH_NAME}', $$ MATCH (n:Actor {{name: '{row.Actor}'}}), (m:Film {{title: '{row.Film}'}}) CREATE (n)-[:ACTED_IN]->(m) $$) AS (a agtype);
            """)
        else:
            queries.append(f"""
                SELECT * FROM cypher('{AG_GRAPH_NAME}', $$ CREATE (n:Actor {{name: '{row.Actor}'}}) $$) AS (a agtype);
                SELECT * FROM cypher('{AG_GRAPH_NAME}', $$ MERGE (m:Film {{title: '{row.Film}'}}) $$) AS (a agtype);
                SELECT * FROM cypher('{AG_GRAPH_NAME}', $$ MATCH (n:Actor {{name: '{row.Actor}'}}), (m:Film {{title: '{row.Film}'}}) CREATE (n)-[:ACTED_IN]->(m) $$) AS (a agtype);
            """)
            saved_actor = row.Actor
        # create nodes and edges in batches of 1000
        if row_num % 1000 == 0:
            print(f"Adding data {row_num}/{ln}...")
            cur.execute(''.join(queries))
            queries = []
    if len(queries) > 0:
        print(f"Adding data {row_num}/{ln}...")
        cur.execute(''.join(queries))

# show graph
def showGraph(cur: pg.cursor, limit: int, film: str, actor: str) -> None:
    # If the cursor was used before in createGraph() function, the error does not occur.
    # But this is the first time the cursor is used, the error occurs because 'age.so' is not properly loaded.
    # See, https://github.com/apache/age/issues/41
    try:
        cur.execute(f"SELECT * FROM cypher('{AG_GRAPH_NAME}', $$ MATCH (n:Actor)-[r:ACTED_IN]->(m:Film) RETURN n,r,m LIMIT 1 $$) AS (n agtype, r agtype, m agtype);")
    except pg.errors.InternalError_:
        pass

    if actor is not None and actor != '':
        wk_actor = f" {{name: '{actor}'}}"
    else:
        wk_actor = ''
    if film is not None and film != '':
        wk_film = f" {{title: '{film}'}}"
    else:
        wk_film = ''

    query = f"""
        SELECT *
            FROM cypher('{AG_GRAPH_NAME}', $$
            MATCH (n:Actor{wk_actor})-[r:ACTED_IN]->(m:Film{wk_film})
            RETURN n,r,m
            LIMIT {limit}
            $$) AS (n agtype, r agtype, m agtype);
    """

    cur.execute(query)

    rows = cur.fetchall()
    if len(rows) == 0:
        exit_with_error("No records found.")

    net = Network(height = "1600px", width = "100%")
    for row in rows:
        temp_v = [json.loads(row.n[:-len("::vertex")]), json.loads(row.m[:-len("::vertex")])]
        temp_e = json.loads(row.r[:-len("::edge")])
        for v in temp_v:
            print(v)
            if v["label"] == "Actor":
                color = "#F79667"
                net.add_node(v["id"], label = v["properties"]["name"], color = color)
            elif v["label"] == "Film":
                color = "#57C7E3"
                net.add_node(v["id"], label = v["properties"]["title"], color = color)
            else:
                color = "blue"
        net.add_edge(temp_v[0]["id"], temp_v[1]["id"], label = temp_e["label"])

    net.toggle_physics(True)
    net.show(f"{AG_GRAPH_NAME}.html", notebook = False)

    # open the browser if the platform is macOS
    if sys.platform == "darwin":
        os.system(f"open {AG_GRAPH_NAME}.html")

def main() -> None:
    # pass initdb to create the graph database
    parser = argparse.ArgumentParser(description = "IMDB Graph")
    parser.add_argument("--initdb", "-i", action = 'store_true', help = "Initialize the database if specified")
    parser.add_argument("--limit", "-l", type = int, default = 1000, help = "Number of records to display in the graph")
    parser.add_argument("--film", "-f", default = '', help = "Film to search for")
    parser.add_argument("--actor", "-a", default  = '', help = "Actor to search for")
    args = parser.parse_args()

    # Reason why autocommit refers args.initdb is because of an error for first time connection
    # When autocommit is set to True, the connection is not transactional,
    # so we can avoid the error just with try-except-pass flow
    try:
        con = pg.connect(
            host = "your_server.postgres.database.azure.com",
            port = 5432,
            dbname = "postgres",
            user = "your_account",
            password = "your_password",
            options = "-c search_path=ag_catalog,'$user',public",
            autocommit = not args.initdb
        )
    except:
        exit_with_error("Failed to connect to the database.")

    cur = con.cursor(row_factory = namedtuple_row)

    # Run once to create the graph database
    if args.initdb:
        createGraph(cur)

    # Show the graph
    showGraph(cur, args.limit, args.film, args.actor)

    cur.close()
    con.close()

if __name__ == "__main__":
    main()

con = pg.connect()のパラメータはAzureポータルの[接続]を見ながら、適宜修正する。

  • cypherでCREATEではなくMERGEを使う理由は以前に書いた通り。
  • psycopgは自動的にprepared statementにしてくれるが、データ毎行でcur.execute()すると大変遅い。このコードではデータ1,000行に対応するクエリーをまとめて実行するようにしている。
  • より高速にするにはcypherを使わず、直接PostgreSQLのテーブルを操作する方法もあるが、邪道なのでここでは触れない。
  • コードのコメントに記載した通り、age.soの初回ロード時にエラーが発生する問題があり、showGraph()銭湯のtry-except-passでエラーを握りつぶすには、autocommitをTrueに設定する=トランザクションにしない必要がある。データの初回ロード時には既にage.soはロードされているのでshowGraph()は問題なく実行される。
  • より大量のデータを投入するのであれば、asyncにするなりマルチスレッドにするなり、工夫をする必要がある。

コードを実行する

で、実行。結構時間がかかるので、しばらく放置しておくと良い。

% chmod 755 imdb-graph.py
% time ./imdb-graph.py -i
Creating graph database...
Adding Graph Data...
Adding data 1/191873...
Adding data 2/191873...
Adding data 3/191873...
......

./imdb-graph.py -i  8.07s user 2.02s system 0% cpu 37:21.04 total

データ投入が完了すれば、そのままhtmlを出力するので、ブラウザで開く(macOSの場合、自動的にブラウザが起動する)。

初回ロード時に生成されるグラフは以下のようになる。actorfilms.html

初回のロード後、以下のようにスクリプトを実行すると、

./imdb-graph.py -f 'Keanu Reeves'

このようなグラフが得られる。actorfilms_Keanu.html

より高度なクエリーは、Apache AGEのドキュメントが参考になる。

注:性能向上のためにインデックスを操作できないか調べてみたが、どうやらデフォルトで張られている様子。

postgres=# \d+ ag_label
                                    テーブル"ag_catalog.ag_label"
    列    |   タイプ   | 照合順序 | Null 値を許容 | デフォルト | ストレージ | 圧縮 | 統計目標 | 説明 
----------+------------+----------+---------------+------------+------------+------+----------+------
 name     | name       |          | not null      |            | plain      |      |          | 
 graph    | oid        |          | not null      |            | plain      |      |          | 
 id       | label_id   |          |               |            | plain      |      |          | 
 kind     | label_kind |          |               |            | plain      |      |          | 
 relation | regclass   |          | not null      |            | plain      |      |          | 
 seq_name | name       |          | not null      |            | plain      |      |          | 
インデックス:
    "ag_label_graph_oid_index" UNIQUE, btree (graph, id)
    "ag_label_name_graph_index" UNIQUE, btree (name, graph)
    "ag_label_relation_index" UNIQUE, btree (relation)
    "ag_label_seq_name_graph_index" UNIQUE, btree (seq_name, graph)
外部キー制約:
    "fk_graph_oid" FOREIGN KEY (graph) REFERENCES ag_graph(graphid)
アクセスメソッド: heap