AGEFreighterそのものには手を加えていないけれど、tests/にneo2mcsv.pyというツールを追加した。何をするものかというと、Neo4jにあるグラフを、AGEFreighterのMultiCSVFreighterクラス用のCSVとしてexportして、それを読み込むための’importer.py’というスクリプトを生成する。
Neo4jFreighterは、Neo4jとPostgreSQL双方とやり取りするためにあまり動作が速いとは言えず、一度ダンプしちゃった方がトータルでは時間がかからない。そして、Neo4jのAPOCでexportすると、MultiCSVFreighterで読み込めるCSVに変換するのがかなり面倒なことが分かったので、こういうツールを書いた、と。
cd tests
chmod 755 neo2mcsv.py
./neo2mcsv.py --help
usage: neo2mcsv.py [-h] [--uri URI] [--user USER] [--password PASSWORD] [--database DATABASE] [--trial] [--chunk-size CHUNK_SIZE] [--progress] [--graphname GRAPHNAME] output_dir
Export data from Neo4j to CSV
positional arguments:
output_dir Output directory
optional arguments:
-h, --help show this help message and exit
--uri URI The URI of the Neo4j database
--user USER The username of the Neo4j database
--password PASSWORD The password of the Neo4j database
--database DATABASE The database of the Neo4j database
--trial Extract only 100 edges per relationship type
--chunk-size CHUNK_SIZE
Chunk size
--progress Show progress
--graphname GRAPHNAME
Name of the graph to be embedded in 'importer.py'
例えば、自分の手元ではDocker DesktopでNeo4jが動いているので、’exported’というディレクトリを指定すれば、ここにCSVファイルと’importer.py’を出力する。
./neo2mcsv.py exported
INFO:root:Exporting 8679 nodes for label 'Customer'
INFO:root:Exporting 20000 edges for relationship type 'BOUGHT'
INFO:root:Exported 8679 records to /Users/rifujita/ownCloud/bin/agefreighter/tests/exported/customer.csv
INFO:root:Exporting 1000 nodes for label 'Product'
INFO:root:Exported 1000 records to /Users/rifujita/ownCloud/bin/agefreighter/tests/exported/product.csv
INFO:root:Exporting 2 nodes for label 'Person'
INFO:root:Exported 2 records to /Users/rifujita/ownCloud/bin/agefreighter/tests/exported/person.csv
INFO:root:Exported 20000 records to /Users/rifujita/ownCloud/bin/agefreighter/tests/exported/bought.csv
INFO:root:Exporting 1 edges for relationship type 'KNOWS'
INFO:root:Exported 1 records to /Users/rifujita/ownCloud/bin/agefreighter/tests/exported/knows.csv
importer.py file under /Users/rifujita/ownCloud/bin/agefreighter/tests/exported created successfully
で、’importer.py’は出力したCSVファイルと、ノードのラベル、エッジのタイプを埋め込んであるので、
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import os
from agefreighter import Factory
async def main():
instance = Factory.create_instance("MultiCSVFreighter")
await instance.connect(
dsn=os.environ["PG_CONNECTION_STRING"],
max_connections=64,
min_connections=4,
)
await instance.load(
graph_name="FROM_NEO4J",
vertex_csv_paths=[
"/Users/rifujita/ownCloud/bin/agefreighter/tests/exported/customer.csv",
"/Users/rifujita/ownCloud/bin/agefreighter/tests/exported/product.csv",
"/Users/rifujita/ownCloud/bin/agefreighter/tests/exported/person.csv"
],
vertex_labels=["Customer", "Product", "Person"],
edge_csv_paths=[
"/Users/rifujita/ownCloud/bin/agefreighter/tests/exported/bought.csv",
"/Users/rifujita/ownCloud/bin/agefreighter/tests/exported/knows.csv"
],
edge_types=["BOUGHT", "KNOWS"],
use_copy=True,
drop_graph=True,
create_graph=True,
progress=True,
)
if __name__ == "__main__":
import asyncio
import sys
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())
あとは実行するだけ。
chmod 755 exported/importer.py
./exported/importer.py
一応、非同期・マルチプロセッシングに書いたので、そこそこ速度は出るんじゃないかと。