COPYしたい
先日来、Apache AGEにどうにかしてデカいデータを流し込もうとしている途中で色々試しているのだけれど、やはりCypher / SQLではなく、COPYプロトコルでぶっ込むしかないかぁ、となり。
さて、Vertexはどういうテーブルに格納されるかというと、
\d cities_countries."Country"
テーブル"cities_countries.Country"
列 | タイプ | 照合順序 | Null 値を許容 | デフォルト
------------+---------+----------+---------------+---------------------------------------------------------------------------------------------------------------------------------
id | graphid | | not null | _graphid(_label_id('cities_countries'::name, 'Country'::name)::integer, nextval('cities_countries."Country_id_seq"'::regclass))
properties | agtype | | not null | agtype_build_map()
インデックス:
"Country_id_idx" btree (id)
"Country_properties_idx" gin (properties)
継承元: cities_countries._ag_label_vertex
まあidもpropertiesもnot null制約が付いてるわけです。なので、idとpropertiesを用意してあげないと、COPYを受け付けてくれない、と。
さて、ここに一つだけvertexを作成すると、
postgres=# SELECT id FROM cities_countries."Country";
id
-----------------
844424930131969
(1 行)
見ての通り、vertexを一つしか作成していないのに、何やら豪勢なidが付与されます。
さらに、vertexを格納するテーブルを作って、1件だけデータを入れると、以下のようになります。
postgres=# SELECT create_vlabel('cities_countries', 'City');
NOTICE: VLabel "City" has been created
create_vlabel
---------------
(1 行)
postgres=# INSERT INTO cities_countries."City" (properties) VALUES ('{"id":"123"}');
INSERT 0 1
postgres=# SELECT id FROM cities_countries."City";
id
------------------
1125899906842625
(1 行)
844424930131969と、1125899906842625、ずいぶん飛んだなぁ…。log2(1125899906842625 – 844424930131969) = 48、48ビット、シフトしてるな、これ。
AGEのコードを見てみる
PostgreSQLのエクステンションなので、AGEももちろんシーケンスを利用してます。ということは、どこかでnextval()を叩いてるだろ、という当たりが付きます。git cloneして
% grep -r nextval src/*
src/backend/utils/load/ag_load_edges.c: entry_id = nextval_internal(cr->label_seq_relid, true);
src/backend/utils/load/ag_load_labels.c: entry_id = nextval_internal(cr->label_seq_relid, true);
src/backend/utils/load/ag_load_labels.c: * We cant use currval because it will error out if nextval was
src/backend/utils/load/ag_load_labels.c: cr.curr_seq_num = nextval_internal(cr.label_seq_relid, true);
src/backend/utils/graph_generation.c:int64 get_nextval_internal(graph_cache_data* graph_cache,
src/backend/utils/graph_generation.c:int64 get_nextval_internal(graph_cache_data* graph_cache,
src/backend/utils/graph_generation.c: return nextval_internal(obj_seq_id, true);
src/backend/utils/graph_generation.c: vid = nextval_internal(vtx_seq_id, true);
src/backend/utils/graph_generation.c: eid = nextval_internal(edge_seq_id, true);
src/backend/utils/graph_generation.c: int64 start_node_index, end_node_index, nextval;
src/backend/utils/graph_generation.c: nextval = get_nextval_internal(graph_cache, edge_cache);
src/backend/utils/graph_generation.c: object_graph_id = make_graphid(edge_label_id, nextval);
src/backend/commands/label_commands.c: List *nextval_func_name;
src/backend/commands/label_commands.c: List *nextval_func_args;
src/backend/commands/label_commands.c: FuncCall *nextval_func;
src/backend/commands/label_commands.c: nextval_func_name = SystemFuncName("nextval");
src/backend/commands/label_commands.c: nextval_func_args = list_make1(regclass_cast);
src/backend/commands/label_commands.c: nextval_func = makeFuncCall(nextval_func_name, nextval_func_args, COERCE_SQL_SYNTAX, -1);
src/backend/commands/label_commands.c: graphid_func_args = list_make2(label_id_func, nextval_func);
src/backend/commands/label_commands.c: label_id = (int32) nextval_internal(seq_id, true);
graph_generation.cを開いてみると、まあこんな感じに書かれてます。最後の方を見ると、graph_oid, vtx_name_str, object_graph_id, propsを引数にとるinsert_vertex_simple()が呼ばれてると。
vtx_label_id = get_label_id(vtx_name_str, graph_oid);
edge_label_id = get_label_id(edge_name_str, graph_oid);
graph_cache = search_graph_name_cache(graph_name_str);
vertex_cache = search_label_name_graph_cache(vtx_name_str, graph_oid);
edge_cache = search_label_name_graph_cache(edge_name_str, graph_oid);
nsp_id = graph_cache->namespace;
vtx_seq_name = &(vertex_cache->seq_name);
vtx_seq_name_str = NameStr(*vtx_seq_name);
edge_seq_name = &(edge_cache->seq_name);
edge_seq_name_str = NameStr(*edge_seq_name);
vtx_seq_id = get_relname_relid(vtx_seq_name_str, nsp_id);
edge_seq_id = get_relname_relid(edge_seq_name_str, nsp_id);
props = create_empty_agtype();
/* Creating vertices*/
for (i=(int64)1; i<=no_vertices; i++)
{
vid = nextval_internal(vtx_seq_id, true);
object_graph_id = make_graphid(vtx_label_id, vid);
insert_vertex_simple(graph_oid, vtx_name_str, object_graph_id, props);
}
graph_oidはPostgreSQLでおなじみのやつ。
postgres=# SELECT oid,relname FROM pg_class;
oid | relname
-------+------------------------------------------------
16389 | ag_graph_graphid_index
16390 | ag_graph_name_index
16391 | ag_graph_namespace_index
16386 | ag_graph
16398 | ag_label
......
vtx_name_strは、vertexを格納するテーブル名で、propsはvertexのpropertiesだろうから、object_graph_idが、実際に割り当てられるidだろう、と。で、これは前の行のmake_graphid()なので、grepしてgraphid.cだなと。
% grep -r make_graphid src/*
src/backend/utils/adt/graphid.c:graphid make_graphid(const int32 label_id, const int64 entry_id)
src/backend/utils/adt/graphid.c: gid = make_graphid(label_id, entry_id);
src/backend/utils/load/ag_load_edges.c: edge_id = make_graphid(cr->label_id, entry_id);
src/backend/utils/load/ag_load_edges.c: start_vertex_graph_id = make_graphid(start_vertex_type_id, start_id_int);
src/backend/utils/load/ag_load_edges.c: end_vertex_graph_id = make_graphid(end_vertex_type_id, end_id_int);
src/backend/utils/load/ag_load_labels.c: vertex_id = make_graphid(cr->label_id, entry_id);
src/backend/utils/graph_generation.c: object_graph_id = make_graphid(vtx_label_id, vid);
src/backend/utils/graph_generation.c: object_graph_id = make_graphid(edge_label_id, eid);
src/backend/utils/graph_generation.c: start_vertex_graph_id = make_graphid(vtx_label_id, start_vid);
src/backend/utils/graph_generation.c: end_vertex_graph_id = make_graphid(vtx_label_id, end_vid);
src/backend/utils/graph_generation.c: object_graph_id = make_graphid(edge_label_id, nextval);
src/backend/utils/graph_generation.c: start_node_graph_id = make_graphid(node_label_id, start_node_index);
src/backend/utils/graph_generation.c: end_node_graph_id = make_graphid(node_label_id, end_node_index);
src/include/utils/graphid.h:graphid make_graphid(const int32 label_id, const int64 entry_id);
make_graphid()は以下。
graphid make_graphid(const int32 label_id, const int64 entry_id)
{
uint64 tmp;
if (!label_id_is_valid(label_id))
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("label_id must be %d .. %d",
LABEL_ID_MIN, LABEL_ID_MAX)));
}
if (!entry_id_is_valid(entry_id))
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("entry_id must be " INT64_FORMAT " .. " INT64_FORMAT,
ENTRY_ID_MIN, ENTRY_ID_MAX)));
}
tmp = (((uint64)label_id) << ENTRY_ID_BITS) |
(((uint64)entry_id) & ENTRY_ID_MASK);
return (graphid)tmp;
}
はい、ENTRY_ID_BITSでシフトして、ENTRY_ID_MASKでアンドとって、オアってますね。どちらの定数もgraphid.hに定義されてます。
% grep -r ENTRY_ID_BITS src/*
src/backend/catalog/ag_label.c: label_id = (int32)(((uint64)AG_GETARG_GRAPHID(1)) >> ENTRY_ID_BITS);
src/backend/utils/adt/graphid.c: tmp = (((uint64)label_id) << ENTRY_ID_BITS) |
src/backend/utils/adt/graphid.c: return (int32)(((uint64)gid) >> ENTRY_ID_BITS);
src/include/utils/graphid.h:#define ENTRY_ID_BITS (32 + 16)
src/include/utils/graphid.h: (((uint64)id) >> ENTRY_ID_BITS)
% grep -r ENTRY_ID_MASK src/*
src/backend/utils/adt/graphid.c: (((uint64)entry_id) & ENTRY_ID_MASK);
src/backend/utils/adt/graphid.c: return (int64)(((uint64)gid) & ENTRY_ID_MASK);
src/include/utils/graphid.h:#define ENTRY_ID_MASK INT64CONST(0x0000ffffffffffff)
で、このentry_idは、呼び出しているgraph_generation.cの頭の方で1が代入されてるので、新規にcreate_vlabel()が呼ばれたら常に1です。
int64 i,j,vid = 1, eid, start_vid, end_vid;
一方のlabel_idは呼び出しているgraph_generation.cをもう一度見ると、引用した最初の行でこう書かれてます。
vtx_label_id = get_label_id(vtx_name_str, graph_oid);
get_label_id()は、vetexを格納するテーブルだというのは前述した通り、graph_oidも既に説明した通り。これは、vertexのlabelのid、つまり、ag_labelのid値でしょうと推測できます。
postgres=# select * from ag_label;
name | graph | id | kind | relation | seq_name
------------------+-------+----+------+-----------------------------------+-------------------------
_ag_label_vertex | 17273 | 1 | v | cities_countries._ag_label_vertex | _ag_label_vertex_id_seq
_ag_label_edge | 17273 | 2 | e | cities_countries._ag_label_edge | _ag_label_edge_id_seq
Country | 17273 | 3 | v | cities_countries."Country" | Country_id_seq
City | 17273 | 4 | v | cities_countries."City" | City_id_seq
(4 行)
CountryとCityのidはそれぞれ、3と4、1違うので、48ビットシフトするとちょうど良いですね。
Pythonで書くと
新しく作成したグラフにvlabelを作成し、さぁ、これからvertexを入れるぞぉ、って時に割り当てられる最初のidはこう。
#!/usr/bin/env python3.11
import numpy as np
import os
import psycopg as pg
from psycopg.rows import namedtuple_row
connection_string = os.environ["PG_CONNECTION_STRING"]
graph_name = "test_graph"
label = "test_label"
with pg.connect(connection_string) as conn:
with conn.cursor(row_factory=namedtuple_row) as cur:
cur.execute("SET search_path TO ag_catalog")
cur.execute(f"SELECT create_graph('{graph_name}')")
cur.execute(f"SELECT create_vlabel('{graph_name}', '{label}')")
cur.execute(f"SELECT id FROM ag_label WHERE name='{label}'")
row = cur.fetchone()
ENTRY_ID_BITS = (32 + 16)
ENTRY_ID_MASK = np.uint64(0x0000ffffffffffff)
label_id = np.uint64(row.id)
tmp = ((label_id) << ENTRY_ID_BITS) | ((np.uint64(1)) & ENTRY_ID_MASK)
print(tmp)
実行してみる。
% python id.py
844424930131969
次にpsqlでINSERTしてみる。
postgres=# insert into test_graph.test_label (properties) VALUES ('{"id":"123"}');
INSERT 0 1
postgres=# select * from test_graph.test_label;
id | properties
-----------------+---------------
844424930131969 | {"id": "123"}
(1 行)
一致したので、Pythonで使えますね。
まとめ
COPY用のデータを生成できるようになりました。まる。なお、このブログエントリーは、夕食でウイスキーを飲んで良い気分で書いているので、間違っていても責任はもちません、あしからずご了承ください<(_ _)>