Azure Database for PostgreSQLでApache AGEを試してみる(続き)

未分類

上記エントリーをまとめて、書き直したコードが以下。

#!/usr/bin/env python3.11
# -*- coding: utf-8 -*-

# builtin modules
import argparse
import json
import os
import resource
import sys
import threading
import time

# third party modules
# Each third-party package is imported once; if the module is missing it is
# installed with pip for the running interpreter and the import is retried a
# single time. A failed install aborts the script, and a second import failure
# propagates as a normal traceback instead of reinstalling in an endless loop.
def _pip_install(package: str) -> None:
    """Install *package* with pip for the running interpreter; exit if pip fails."""
    import importlib

    if os.system(f"{sys.executable} -m pip install {package}") != 0:
        sys.exit(f"Failed to install {package}.")
    # Make sure the import system notices files created while the process is running.
    importlib.invalidate_caches()

try:
    import pandas as pd
except ModuleNotFoundError:
    _pip_install("pandas")
    import pandas as pd

try:
    import psycopg as pg
    from psycopg.rows import dict_row, namedtuple_row
except ModuleNotFoundError:
    _pip_install("psycopg")
    import psycopg as pg
    from psycopg.rows import dict_row, namedtuple_row

try:
    import psycopg_pool
except ModuleNotFoundError:
    _pip_install("psycopg-pool")
    import psycopg_pool

try:
    from pyvis.network import Network
except ModuleNotFoundError:
    _pip_install("pyvis")
    from pyvis.network import Network

# Constants
# Name of the AGE graph; also used as the base name of the input CSV and the output HTML file.
AG_GRAPH_NAME = "actorfilms"

# Global Variables
# libpq connection string. The host, user and password below are placeholders to be replaced.
# search_path lists ag_catalog first; the queries in this script reference
# ag_graph, cypher(), create_graph() and drop_graph() without a schema prefix.
connection_string = """
    host=your_server.postgres.database.azure.com
    port=5432
    dbname=postgres
    user=your_account
    password=admin_password
    options='-c search_path=ag_catalog,\"$user\",public'
"""

# See, https://docs.python.org/ja/3/library/resource.html
# Raise only the soft open-file limit (to 8192 where allowed) and keep the
# current hard limit: an unprivileged process cannot raise its hard limit, and
# the numeric value of "unlimited" differs between platforms.
_nofile_soft, _nofile_hard = resource.getrlimit(resource.RLIMIT_NOFILE)
if _nofile_hard == resource.RLIM_INFINITY:
    _nofile_wanted = 8192
else:
    _nofile_wanted = min(8192, _nofile_hard)
# Never lower a soft limit that is already higher than the requested value.
resource.setrlimit(resource.RLIMIT_NOFILE, (max(_nofile_soft, _nofile_wanted), _nofile_hard))
# See, https://learn.microsoft.com/en-us/azure/postgresql/flexible-server/concepts-limits
# If number of connections exceeds 8192, you will get the error.
# NOTE(review): max_size = 1703 presumably matches the server's connection limit - confirm against the page above.
pg_pool = psycopg_pool.ConnectionPool(connection_string, max_size = 1703, min_size = 128)

# exit with a message rendered in red
def exit_with_error(msg) -> None:
    """Terminate the process with *msg* wrapped in ANSI red escape codes.

    The wrapped text becomes the SystemExit payload, so the interpreter
    prints it to stderr and exits with a non-zero status.
    """
    red_on = "\033[31m"
    color_off = "\033[0m"
    raise SystemExit(red_on + msg + color_off)

# initialize graph database
def initGraph() -> None:
    """Drop the graph named AG_GRAPH_NAME if it already exists, then create it.

    The graph is looked up in ag_graph first. If that lookup raises
    UndefinedTable the script exits with an error message suggesting that the
    AGE extension is not installed.
    """
    print("Creating graph database...")
    lookup_sql = f"SELECT * FROM ag_graph WHERE name='{AG_GRAPH_NAME}'"
    drop_sql = f"SELECT drop_graph('{AG_GRAPH_NAME}', true)"
    create_sql = f"SELECT create_graph('{AG_GRAPH_NAME}')"

    with pg_pool.connection() as conn, conn.cursor() as cur:
        try:
            cur.execute(lookup_sql)
        except pg.errors.UndefinedTable:
            exit_with_error("Failed to create graph database.\nPlease check if AGE extension is installed.")
        # A returned row means the graph already exists: remove it before recreating.
        if cur.fetchone() is not None:
            cur.execute(drop_sql)
        cur.execute(create_sql)

# create all vertices at once
def createVertices(df: pd.DataFrame) -> None:
    """Create one Actor vertex per distinct actor and one Film vertex per distinct film, then index both labels.

    Side effect: the "Actor" and "Film" columns of *df* are overwritten in
    place with single quotes backslash-escaped, so the caller's DataFrame is
    modified. createEdges() later embeds those same column values in quoted
    Cypher strings.

    Args:
        df: DataFrame with string columns "Actor" and "Film".
    """
    def cypher_create(var: str, label: str, prop: str, value: str) -> str:
        # One complete SQL statement that creates a single vertex through cypher().
        return f"SELECT * FROM cypher('{AG_GRAPH_NAME}', $$ CREATE ({var}:{label} {{{prop}: '{value}'}}) $$) AS (a agtype);"

    with pg_pool.connection() as conn, conn.cursor() as cur:
        # Escape single quotes so the values can sit inside '...' Cypher literals.
        for column in ("Actor", "Film"):
            df[column] = df[column].str.replace("'", r"\'")

        actor_names = df["Actor"].unique()
        film_titles = df["Film"].unique()

        # All statements for a label are concatenated and sent in a single execute().
        print(f"Creating {len(actor_names)} vertices...")
        cur.execute("".join(cypher_create("n", "Actor", "name", name) for name in actor_names))
        print(f"Creating {len(film_titles)} vertices...")
        cur.execute("".join(cypher_create("m", "Film", "title", title) for title in film_titles))

        print("Creating indices...")
        # Per label: a GIN index on properties and a default index on id.
        for label in ("Actor", "Film"):
            cur.execute(f'CREATE INDEX ON {AG_GRAPH_NAME}."{label}" USING GIN (properties);')
            cur.execute(f'CREATE INDEX ON {AG_GRAPH_NAME}."{label}" (id);')

# create edges
def createEdges(df: pd.DataFrame) -> None:
    """Create ACTED_IN edges, one worker thread per consecutive run of rows sharing an actor.

    Rows are scanned in order; each time the "Actor" value changes, the films
    collected for the previous actor are handed to createEdgesPerActor() in a
    new thread. After all threads have finished, indexes are created on
    start_id and end_id of the ACTED_IN label.

    Args:
        df: DataFrame with columns "Actor" and "Film". Rows for one actor are
            expected to be adjacent; non-adjacent rows of the same actor are
            processed as separate runs.
    """
    print("Creating edges...")
    ln = len(df)
    if ln == 0:
        # Nothing to link. No edge is created either, so there would be no
        # ACTED_IN label for the index statements below to act on.
        return

    threads = []

    def spawn(actor, actor_films, processed) -> None:
        # Thread ids are 1-based and follow the order in which threads are started.
        t = threading.Thread(target = createEdgesPerActor, args = (actor, actor_films, processed, ln, len(threads) + 1))
        t.start()
        threads.append(t)

    films = []
    # Positional access: works regardless of how the DataFrame is indexed.
    saved_actor = df["Actor"].iloc[0]
    for processed, row in enumerate(df.itertuples()):
        # `processed` is the number of rows before this one, i.e. the rows
        # consumed up to the end of the run that is about to be flushed.
        if row.Actor != saved_actor:
            spawn(saved_actor, films, processed)
            saved_actor = row.Actor
            films = []
        films.append(row.Film)
    # Flush the last run; by now every row has been consumed.
    spawn(saved_actor, films, ln)

    for t in threads:
        t.join()

    with pg_pool.connection() as conn:
        with conn.cursor() as cur:
            # added 2024.10.28
            cur.execute(f'CREATE INDEX ON {AG_GRAPH_NAME}."ACTED_IN" (start_id);')
            cur.execute(f'CREATE INDEX ON {AG_GRAPH_NAME}."ACTED_IN" (end_id);')

            cur.execute("COMMIT;")

# create edges
def createEdgesPerActor(actor: str, films: list[str], row_num: int, ln: int, threadId: int) -> None:
    """Create the ACTED_IN edges from one actor to each of the given films in a single Cypher query.

    The query is retried until it succeeds while the failure is an operational
    error. Other database errors are retried a limited number of times and
    then reported on stderr, so that a permanently failing query cannot keep
    the thread spinning forever.

    Args:
        actor: Actor name, already escaped for use inside a quoted Cypher string.
        films: Film titles for this actor, already escaped likewise.
        row_num: Number of input rows processed so far (progress output only).
        ln: Total number of input rows (progress output only).
        threadId: Identifier of the calling thread (progress output only).
    """
    query_films = ', '.join(f"(m{idx}:Film {{title: '{film}'}})" for idx, film in enumerate(films))
    query_rels = ', '.join(f"(n)-[:ACTED_IN]->(m{idx})" for idx in range(len(films)))
    query = f"SELECT * FROM cypher('{AG_GRAPH_NAME}', $$ MATCH (n:Actor {{name: '{actor}'}}), {query_films} CREATE {query_rels} $$) AS (a agtype);"

    max_failures = 10
    failures = 0
    while True:
        try:
            with pg_pool.connection() as conn:
                with conn.cursor() as cur:
                    cur.execute(query)
            break
        except pg.OperationalError:
            # Covers psycopg_pool.PoolTimeout while waiting for a free
            # connection, as well as lost connections: keep retrying.
            continue
        except pg.Error as exc:
            # Any other database error may still be a one-off caused by the
            # concurrent writers, so allow a few retries before giving up.
            failures += 1
            if failures >= max_failures:
                print(f"\nGiving up on '{actor}' after {failures} failed attempts: {exc}", file = sys.stderr)
                return
    print(f"{len(films)} films for '{actor}' created, {row_num} / {ln} processed by thread {threadId}.{' '*10}", end = '\r')

# show graph
def showGraph(limit: int, film: str, actor: str) -> None:
    """Query Actor-[ACTED_IN]->Film paths and render them as an interactive HTML graph.

    Writes "<AG_GRAPH_NAME>.html" with pyvis and, on macOS, opens it with the
    `open` command. Exits with an error message when the query returns no rows.

    Args:
        limit: Maximum number of paths to fetch.
        film: Film title to filter on; empty or None means no filter.
        actor: Actor name to filter on; empty or None means no filter.
    """
    def prop_filter(key: str, value) -> str:
        # Build " {key: 'value'}" for the MATCH pattern, or '' when no filter was requested.
        if not value:
            return ''
        # Escape backslashes first, then single quotes, so user input cannot
        # terminate the quoted Cypher string.
        # NOTE(review): input containing "$$" would still end the dollar-quoted
        # block around the Cypher text; it is not handled here.
        escaped = value.replace("\\", "\\\\").replace("'", "\\'")
        return f" {{{key}: '{escaped}'}}"

    wk_actor = prop_filter("name", actor)
    wk_film = prop_filter("title", film)

    query = f"""
        SELECT *
            FROM cypher('{AG_GRAPH_NAME}', $$
            MATCH (n:Actor{wk_actor})-[r:ACTED_IN]->(m:Film{wk_film})
            RETURN n,r,m
            LIMIT {limit}
            $$) AS (n agtype, r agtype, m agtype);
    """

    with pg_pool.connection() as conn:
        with conn.cursor(row_factory = namedtuple_row) as cur:
            cur.execute(query)
            rows = cur.fetchall()
    if not rows:
        exit_with_error("No records found.")

    # label -> (node color, property shown as the node caption)
    node_styles = {
        "Actor": ("#F79667", "name"),
        "Film": ("#57C7E3", "title"),
    }

    net = Network(height = "1600px", width = "100%")
    for row in rows:
        # Each value is JSON text followed by a "::vertex" / "::edge" suffix; drop the suffix before parsing.
        start_v = json.loads(row.n.removesuffix("::vertex"))
        end_v = json.loads(row.m.removesuffix("::vertex"))
        edge = json.loads(row.r.removesuffix("::edge"))
        for v in (start_v, end_v):
            style = node_styles.get(v["label"])
            if style is None:
                # Unknown label: still add the node so that add_edge() below can reference it.
                net.add_node(v["id"], label = v["label"], color = "blue")
            else:
                color, caption_key = style
                net.add_node(v["id"], label = v["properties"][caption_key], color = color)
        net.add_edge(start_v["id"], end_v["id"], label = edge["label"])

    net.toggle_physics(True)
    net.show(f"{AG_GRAPH_NAME}.html", notebook = False)

    # open the browser if the platform is macOS
    if sys.platform == "darwin":
        os.system(f"open {AG_GRAPH_NAME}.html")

def main() -> None:
    """Parse the command line, optionally rebuild the graph from the CSV file, then display it.

    With --initdb the CSV "<AG_GRAPH_NAME>.csv" is loaded and the graph,
    vertices and edges are recreated before the graph is shown. The connection
    pool is closed on every exit path, including an early exit from showGraph().
    """
    # pass initdb to create the graph database
    parser = argparse.ArgumentParser(description = "IMDB Graph")
    parser.add_argument("--initdb", "-i", action = 'store_true', help = "Initialize the database if specified")
    parser.add_argument("--limit", "-l", type = int, default = 1000, help = "Number of records to display in the graph")
    parser.add_argument("--film", "-f", default = '', help = "Film to search for")
    parser.add_argument("--actor", "-a", default  = '', help = "Actor to search for")
    args = parser.parse_args()

    pg_pool.open()
    try:
        # Block until the pool has established its min_size connections (or 30 s have passed).
        pg_pool.wait(timeout = 30.0)

        # Run once to create the graph database
        if args.initdb:
            # read csv file downloaded from https://www.kaggle.com/datasets/darinhawley/imdb-films-by-actor-for-10k-actors
            df = pd.read_csv(f"{AG_GRAPH_NAME}.csv", usecols = ["Actor", "ActorID", "Film"])
            initGraph()
            createVertices(df)
            createEdges(df)

        # Show the graph
        showGraph(args.limit, args.film, args.actor)
    finally:
        # Runs for SystemExit and other exceptions too, so the pool is never left open.
        pg_pool.close()

if __name__ == "__main__":
    main()
./imdb-graph.py -i  14.87s user 27.34s system 25% cpu 2:44.01 total

最初は37:21.04 totalだったのが2:44.01 totalになったので、約13.7倍速くなった。

マルチスレッド化の前はサーバ側のCPUは最大でも25%ぐらいしか使えていなかったが(下のグラフの12:30頃のピーク)、これで使い切れるようになったのでこれ以上はスケールアップしないと無理なはず。

改修ポイントは以下。

  • 56〜60行:クライアントであるmacOSのファイルディスクリプタの上限を上げて、コネクションプールで張る接続数の上限までコネクションを生成できるようにした。
  • 95〜99行:Actor / FilmのpropertiesにGIN(汎用転置)インデックスを、idにb-treeインデックスを張るようにした。MATCH (n:Actor {name: 'actor_name'}), (m:Film {title: 'film_title'}) CREATE (n)-[:ACTED_IN]->(m)というcypherクエリでpropertiesのSeqScan(nameを探す)とCREATEする際のidのSeqScan(start_idとend_idを探す)を避けるため。
  • 115〜117行:threadingで、Actor毎のFilmへのedgeを並列に投入するようにした。139行のcreateEdgesPerActor()が本体。
  • 133〜136行:ACTED_INのstart_id / end_idにb-treeインデックスを張るようにした。これは後の検索処理でSeqScanを避けるため。
  • 143〜148行:60行で用意したコネクションプールを利用して、並列にedgeを投入する。
  • 211〜212行:pool.open()でプールを開き、接続数がmin_sizeに達するまでpool.wait()で待機する。