-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathdigraph.py
363 lines (290 loc) · 13.3 KB
/
digraph.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
from typing import Any, Callable, ClassVar
import networkx as nx
from arango.database import StandardDatabase
import nx_arangodb as nxadb
from nx_arangodb.classes.graph import Graph
from nx_arangodb.logger import logger
from .dict.adj import AdjListOuterDict
from .enum import TraversalDirection
from .function import get_node_id
networkx_api = nxadb.utils.decorators.networkx_class(nx.DiGraph) # type: ignore
__all__ = ["DiGraph"]
class DiGraph(Graph, nx.DiGraph):
"""
Base class for directed graphs.
Subclasses ``nxadb.Graph`` and ``nx.DiGraph``.
In order to connect to an ArangoDB instance, the following environment
variables must be set:
1. ``DATABASE_HOST``
2. ``DATABASE_USERNAME``
3. ``DATABASE_PASSWORD``
4. ``DATABASE_NAME``
Furthermore, the ``name`` parameter is required to create a new graph
or to connect to an existing graph in the database.
Example
-------
>>> import os
>>> import networkx as nx
>>> import nx_arangodb as nxadb
>>>
>>> os.environ["DATABASE_HOST"] = "http://localhost:8529"
>>> os.environ["DATABASE_USERNAME"] = "root"
>>> os.environ["DATABASE_PASSWORD"] = "openSesame"
>>> os.environ["DATABASE_NAME"] = "_system"
>>>
>>> G = nxadb.DiGraph(name="MyGraph")
>>> ...
Parameters
----------
incoming_graph_data : input graph (optional, default: None)
Data to initialize graph. If None (default) an empty
graph is created. Must be used in conjunction with **name** if
the user wants to persist the graph in ArangoDB. NOTE: It is
recommended for incoming_graph_data to be a NetworkX graph due
to faster loading times.
name : str (optional, default: None)
Name of the graph in the database. If the graph already exists,
the user can pass the name of the graph to connect to it. If
the graph does not exist, a General Graph will be created by
passing the **name**. NOTE: Must be used in conjunction with
**incoming_graph_data** if the user wants to persist the graph
in ArangoDB.
default_node_type : str (optional, default: None)
Default node type for the graph. In ArangoDB terms, this is the
default vertex collection. If the graph already exists, the user can
omit this parameter and the default node type will be set to the
first vertex collection in the graph. If the graph does not exist,
the user can pass the default node type to create the default vertex
collection.
edge_type_key : str (optional, default: "_edge_type")
Key used to store the edge type when inserting edges into the graph.
Useful for working with Heterogeneous Graphs.
edge_type_func : Callable[[str, str], str] (optional, default: None)
Function to determine the edge type between two nodes. If the graph
already exists, the user can omit this parameter and the edge type
function will be set based on the existing edge definitions. If the
graph does not exist, the user can pass a function that determines
the edge type between two nodes.
edge_collections_attributes : set[str] (optional, default: None)
Set of edge attributes to fetch when executing a NetworkX algorithm.
Useful if the user has edge weights or other edge attributes that
they want to use in a NetworkX algorithm.
db : arango.database.StandardDatabase (optional, default: None)
ArangoDB database object. If the user has an existing python-arango
connection to the database, they can pass the database object to the graph.
If not provided, a database object will be created using the environment
variables DATABASE_HOST, DATABASE_USERNAME, DATABASE_PASSWORD, and
DATABASE_NAME.
read_parallelism : int (optional, default: 10)
Number of parallel threads to use when reading data from ArangoDB.
Used for fetching node and edge data from the database.
read_batch_size : int (optional, default: 100000)
Number of documents to fetch in a single batch when reading data from ArangoDB.
Used for fetching node and edge data from the database.
write_batch_size : int (optional, default: 50000)
Number of documents to insert in a single batch when writing data to ArangoDB.
Used for inserting node and edge data into the database if and only if
**incoming_graph_data** is a NetworkX graph.
write_async : bool (optional, default: False)
Whether to insert data into ArangoDB asynchronously. Used for inserting
node and edge data into the database if and only if **incoming_graph_data**
is a NetworkX graph.
symmetrize_edges : bool (optional, default: False)
Whether to symmetrize the edges in the graph when fetched from the database.
Only applies to directed graphs, thereby converting them to undirected graphs.
use_arango_views : bool (optional, default: False)
Whether to use experimental work-in-progress ArangoDB Views for the
nodes, adjacency list, and edges. These views are designed to improve
data processing performance by delegating CRUD operations to the database
whenever possible. NOTE: This feature is experimental and may not work
as expected.
overwrite_graph : bool (optional, default: False)
Whether to overwrite the graph in the database if it already exists. If
set to True, the graph collections will be dropped and recreated. Note that
this operation is irreversible and will result in the loss of all data in
the graph. NOTE: If set to True, Collection Indexes will also be lost.
args: positional arguments for nx.Graph
Additional arguments passed to nx.Graph.
kwargs: keyword arguments for nx.Graph
Additional arguments passed to nx.Graph.
"""
__networkx_backend__: ClassVar[str] = "arangodb" # nx >=3.2
__networkx_plugin__: ClassVar[str] = "arangodb" # nx <3.2
@classmethod
def to_networkx_class(cls) -> type[nx.DiGraph]:
return nx.DiGraph # type: ignore[no-any-return]
def __init__(
self,
incoming_graph_data: Any = None,
name: str | None = None,
default_node_type: str | None = None,
edge_type_key: str = "_edge_type",
edge_type_func: Callable[[str, str], str] | None = None,
edge_collections_attributes: set[str] | None = None,
db: StandardDatabase | None = None,
read_parallelism: int = 10,
read_batch_size: int = 100000,
write_batch_size: int = 50000,
write_async: bool = False,
symmetrize_edges: bool = False,
use_arango_views: bool = False,
overwrite_graph: bool = False,
*args: Any,
**kwargs: Any,
):
super().__init__(
incoming_graph_data,
name,
default_node_type,
edge_type_key,
edge_type_func,
edge_collections_attributes,
db,
read_parallelism,
read_batch_size,
write_batch_size,
write_async,
symmetrize_edges,
use_arango_views,
overwrite_graph,
*args,
**kwargs,
)
if self.graph_exists_in_db:
self.clear_edges = self.clear_edges_override
self.add_node = self.add_node_override
self.add_nodes_from = self.add_nodes_from_override
self.remove_node = self.remove_node_override
self.reverse = self.reverse_override
assert isinstance(self._succ, AdjListOuterDict)
assert isinstance(self._pred, AdjListOuterDict)
self._succ.mirror = self._pred
self._pred.mirror = self._succ
self._succ.traversal_direction = TraversalDirection.OUTBOUND
self._pred.traversal_direction = TraversalDirection.INBOUND
if (
not self.is_multigraph()
and incoming_graph_data is not None
and not self._loaded_incoming_graph_data
):
nx.convert.to_networkx_graph(incoming_graph_data, create_using=self)
self._loaded_incoming_graph_data = True
#######################
# nx.DiGraph Overides #
#######################
# TODO?
# If we want to continue with "Experimental Views" we need to implement the
# InEdgeView and OutEdgeView classes.
# @cached_property
# def in_edges(self):
# pass
# TODO?
# @cached_property
# def out_edges(self):
# pass
def reverse_override(self, copy: bool = True) -> Any:
if copy is False:
raise NotImplementedError("In-place reverse is not supported yet.")
return super().reverse(copy=True)
def clear_edges_override(self):
logger.info("Note that clearing edges ony erases the edges in the local cache")
for predecessor_dict in self._pred.data.values():
predecessor_dict.clear()
super().clear_edges()
def add_node_override(self, node_for_adding, **attr):
if node_for_adding is None:
raise ValueError("None cannot be a node")
if node_for_adding not in self._succ:
self._succ[node_for_adding] = self.adjlist_inner_dict_factory()
self._pred[node_for_adding] = self.adjlist_inner_dict_factory()
######################
# NOTE: monkey patch #
######################
# Old:
# attr_dict = self._node[node_for_adding] = self.node_attr_dict_factory()
# attr_dict.update(attr)
# New:
node_attr_dict = self.node_attr_dict_factory()
node_attr_dict.data = attr
self._node[node_for_adding] = node_attr_dict
# Reason:
# We can optimize the process of adding a node by creating avoiding
# the creation of a new dictionary and updating it with the attributes.
# Instead, we can create a new node_attr_dict object and set the attributes
# directly. This only makes 1 network call to the database instead of 2.
###########################
else:
self._node[node_for_adding].update(attr)
nx._clear_cache(self)
def add_nodes_from_override(self, nodes_for_adding, **attr):
for n in nodes_for_adding:
try:
newnode = n not in self._node
newdict = attr
except TypeError:
n, ndict = n
newnode = n not in self._node
newdict = attr.copy()
newdict.update(ndict)
if newnode:
if n is None:
raise ValueError("None cannot be a node")
self._succ[n] = self.adjlist_inner_dict_factory()
self._pred[n] = self.adjlist_inner_dict_factory()
######################
# NOTE: monkey patch #
######################
# Old:
# self._node[n] = self.node_attr_dict_factory()
#
# self._node[n].update(newdict)
# New:
node_attr_dict = self.node_attr_dict_factory()
node_attr_dict.data = newdict
self._node[n] = node_attr_dict
else:
self._node[n].update(newdict)
# Reason:
# We can optimize the process of adding a node by creating avoiding
# the creation of a new dictionary and updating it with the attributes.
# Instead, we create a new node_attr_dict object and set the attributes
# directly. This only makes 1 network call to the database instead of 2.
###########################
nx._clear_cache(self)
def remove_node_override(self, n):
if isinstance(n, (str, int)):
n = get_node_id(str(n), self.default_node_type)
try:
######################
# NOTE: monkey patch #
######################
# Old:
# nbrs = self._succ[n]
# New:
nbrs_succ = list(self._succ[n])
nbrs_pred = list(self._pred[n])
# Reason:
# We need to fetch the outbound/inbound edges _prior_ to deleting the node,
# as node deletion will already take care of deleting edges
###########################
del self._node[n]
except KeyError as err: # NetworkXError if n not in self
raise nx.NetworkXError(f"The node {n} is not in the digraph.") from err
for u in nbrs_succ:
del self._pred[u][n] # remove all edges n-u in digraph
del self._succ[n] # remove node from succ
for u in nbrs_pred:
######################
# NOTE: Monkey patch #
######################
# Old: Nothing
# New:
if u == n:
continue # skip self loops
# Reason: We need to skip self loops, as they are
# already taken care of in the previous step. This
# avoids getting a KeyError on the next line.
###########################
del self._succ[u][n] # remove all edges n-u in digraph
del self._pred[n] # remove node from pred
nx._clear_cache(self)