いったんクラスを書いてみたのだけれど汎用性が無いのと遅いのがダメなので、CSVから高速に突っ込めるように書き直して、テスト中。同じインスタンスサイズでテストした結果で比較すると、42秒だったのが4.6秒になったので9倍ぐらい速くなったか。最初のは37分かかってたから、480倍も速い!(最初の「一件ずつ投入する」のが遅すぎるだけ)
AGEというクラス名だとあちこち被るので、いったんAGELoaderとしていたら、それはそれでpsycopgに被る(たぶん、PostgreSQLでLOAD age.soするやつ)ので、「大量の荷物を空輸する」Freighterの意味で、AGEFreighterにしといた。ググっても無かったので名前空間的にも良いかと。
TestPyPIにアップロードしたのだけれどuvを使ったらえらい簡単だった。TestPyPIでAPIトークンを作成してenvに入れたあと、
uv build
uv publish --publish-url https://test.pypi.org/legacy/ --token $TEST_PYPI_TOKEN
するだけ。ディレクトリレイアウトなど、build段階で教えてくれるようになってる親切設計だったw
テストする際は、
mkdir test
cd test
uv vent
. ./.venv/bin/activate
uv pip install -i https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple/ agefreighter
本番PyPIで公開したら、下記だけで済むはず。
uv pip install agefreighter
クラスを書いた理由(技術背景を知りたい人向け)
さて、なんでこんなのを書いているかというと、AGEにはload_labels_from_file()とload_edges_from_file()というユーティリティが用意されていて、PostgreSQLがローカルストレージを読める環境であれば使える。つまり、Linuxの上でPostgreSQLが動いていてそのLinuxにログインが出来て、そこにファイルが置けるパターン限定。実際、Cで書かれた中身を見ると、ファイルポインタ開いてCSVとしてパースして、ってやってるので。
ただ、これだとリモートでは動かない。例えば、psqlでCOPYするならデータストリームとして渡しているから手元にあるファイルをかなり高速に入れられる。でも、COPYコマンドでCypherクエリは不可なので、vertices / edge relationに直接COPYするしか無い。ところが、verticesはともかく、edge relationは内部的なid(graphid型)でstart_id / end_idを保持するため、以下のような作業フローが必要となる。
- verticesにコピーする。ただし、propertiesに相当する部分はJSON同様の書式になっている必要がある。
\COPY vertices.csv TO graph_name."Type" WITH CSV
- 投入されたverticesのidを引き、propertiesに含まれてる一意に決められるデータと紐付けする(例:name)
SELECT id, properties->'"name"' as name FROM graph_name."Type";
- 得られたid / nameのマッピングをedgeのデータに適用して、edges.csvを作成する。
- edgesにコピーする。
\COPY edges.csv TO graph_name."Relation" WITH CSV
AGEFreighterにはdirect_loadingというフラグを設けて、FalseにするとCypherクエリで、Trueにすると上述のCOPY「みたいな」動作でグラフデータを投入するようにしていて、当然のことながらdirect_loading=Trueの方がかなり高速になります。
All tests are run on an instance of Azure Database for PostgreSQL Flexible Server, Standard_D32ds_v4 (32 vcpus, 128 GiB memory), with 512 GiB / 2300 iops storage.
test1 : time to loadFromSingleCSV, 50.37s, chunk_size: 64, direct_loading: False
test2 : time to loadFromSingleCSV, 5.45s, chunk_size: 64, direct_loading: True
test3 : time to loadFromCSVs, 10.52s, chunk_size: 64, direct_loading: False
test4 : time to loadFromCSVs, 4.68s, chunk_size: 64, direct_loading: True
test1 : time to loadFromSingleCSV, 12.17s, chunk_size: 96, direct_loading: False
test2 : time to loadFromSingleCSV, 5.45s, chunk_size: 96, direct_loading: True
test3 : time to loadFromCSVs, 10.48s, chunk_size: 96, direct_loading: False
test4 : time to loadFromCSVs, 4.59s, chunk_size: 96, direct_loading: True
test1 : time to loadFromSingleCSV, 23.78s, chunk_size: 128, direct_loading: False
test2 : time to loadFromSingleCSV, 5.46s, chunk_size: 128, direct_loading: True
test3 : time to loadFromCSVs, 10.55s, chunk_size: 128, direct_loading: False
test4 : time to loadFromCSVs, 4.69s, chunk_size: 128, direct_loading: True
psycopgにはcopyがあったりpipelinedをサポートしてたりはするんですが、試してみたところほとんど変わらなかったので、とりあえず今の実装、async pool connectionとVALUESを大量に並べる、にしてあります。これだけでもpsycopgはprepared statementにしてくれます。なお、server side cursorはpsycopg_poolではまだサポートしてないので、注意。