# wd_to_neo4j.py
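"""Export the contents of a Wikibase instance to CSV files for bulk loading into Neo4j.

Items that carry exactly one "type" statement become node rows, and their statements
become edge rows (one row per reference). External IDs (the oboInOwl DbXref values)
are used as the Neo4j node IDs, and the CSV headers follow the Neo4j bulk-import
conventions (':START_ID', ':END_ID', 'id:ID', ':LABEL', ...).
"""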
import configargparse
import pandas as pd
from tqdm import tqdm
from wikidataintegrator import wdi_core, wdi_helpers
from more_itertools import chunked


class Bot:
    edge_columns = [':START_ID', ':TYPE', ':END_ID', 'reference_uri', 'reference_supporting_text',
                    'reference_date', 'property_label', 'property_description:IGNORE', 'property_uri']
    node_columns = ['id:ID', ':LABEL', 'preflabel', 'synonyms:IGNORE', 'name', 'description']

    def __init__(self, sparql_endpoint_url, mediawiki_api_url, node_out_path, edge_out_path):
        self.sparql_endpoint_url = sparql_endpoint_url
        self.mediawiki_api_url = mediawiki_api_url
        self.node_out_path = node_out_path
        self.edge_out_path = edge_out_path
        # map the URI recorded in property P2 on each entity to its local ID, and the reverse
        uri_pid = wdi_helpers.id_mapper("P2", endpoint=sparql_endpoint_url)
        self.pid_uri = {v: k for k, v in uri_pid.items()}
        # map local QIDs/PIDs to their external database cross-references (DbXref)
        dbxref_pid = uri_pid['http://www.geneontology.org/formats/oboInOwl#DbXref']
        dbxref_qid = wdi_helpers.id_mapper(dbxref_pid, endpoint=sparql_endpoint_url)
        self.qid_dbxref = {v: k for k, v in dbxref_qid.items()}
        self.ref_supp_text_pid = uri_pid["http://reference_supporting_text"]
        self.reference_uri_pid = uri_pid["http://www.wikidata.org/entity/P854"]
        self.type_pid = uri_pid["http://type"]
        # prop label and descriptions
        pids = {x for x in self.qid_dbxref if x.startswith("P")}
        props = wdi_core.WDItemEngine.generate_item_instances(list(pids), mediawiki_api_url)
        self.pid_label = {pid: item.get_label() for pid, item in props}
        self.pid_descr = {pid: item.get_description() for pid, item in props}
        # get all items and all statements
        qids = {x for x in self.qid_dbxref if x.startswith("Q")}
        self.item_iter = self.item_chunker(sorted(list(qids)))
        # self.item_iter = self.item_chunker(['Q94', "Q347"])
        self.edge_lines = []
        self.node_lines = []

    def item_chunker(self, qids) -> wdi_core.WDItemEngine:
        # iterate through item instances, getting 20 at a time
        chunks = chunked(qids, 20)
        for chunk in chunks:
            items = wdi_core.WDItemEngine.generate_item_instances(chunk, mediawiki_api_url=self.mediawiki_api_url)
            for item in items:
                yield item[1]

    def parse_node(self, item: wdi_core.WDItemEngine):
        # only items with exactly one "type" statement are written out as nodes
        type_statements = [s for s in item.statements if s.get_prop_nr() == self.type_pid]
        if len(type_statements) != 1:
            return None
        node_template = dict()
        node_template[':LABEL'] = self.qid_dbxref["Q" + str(type_statements[0].get_value())]
        node_template['id:ID'] = self.qid_dbxref[item.wd_item_id]
        node_template['preflabel'] = self.undo_id_parenthesis(item.get_label())
        node_template['name'] = item.get_label()
        node_template['description'] = item.get_description()
        node_template['synonyms:IGNORE'] = "|".join(item.get_aliases())
        return node_template

    @staticmethod
    def undo_id_parenthesis(s):
        # example "N-Acetyl-D-glucosamine (CHEBI:17411)" -> "N-Acetyl-D-glucosamine"
        if " (" in s and s.endswith(")"):
            idx1 = s.rindex(" (")
            s = s[:idx1]
        return s

    def write_out(self):
        # replace empty strings with None so they serialize as 'NA' in the CSVs
        for line in self.edge_lines:
            for key, value in line.items():
                if type(value) is str and value == '':
                    line[key] = None
        df_edges = pd.DataFrame(self.edge_lines)
        df_edges['reference_date'] = None
        df_edges = df_edges[self.edge_columns]
        df_edges.fillna('NA').to_csv(self.edge_out_path, index=None)
        for line in self.node_lines:
            for key, value in line.items():
                if type(value) is str and value == '':
                    line[key] = None
        df_nodes = pd.DataFrame(self.node_lines)
        df_nodes = df_nodes[self.node_columns]
        df_nodes.fillna('NA').to_csv(self.node_out_path, index=None)

    def handle_statement(self, s, start_id):
        # if a statement has multiple refs, it will return multiple lines
        skip_statements = {
            "http://www.geneontology.org/formats/oboInOwl#DbXref",
            "http://type"
        }
        edge_lines = []
        line = {":START_ID": start_id, 'property_uri': self.pid_uri[s.get_prop_nr()]}
        if line['property_uri'] in skip_statements:
            return edge_lines
        line['property_label'] = self.pid_label[s.get_prop_nr()]
        line['property_description:IGNORE'] = self.pid_descr[s.get_prop_nr()]
        line[':TYPE'] = self.qid_dbxref[s.get_prop_nr()]
        line[':END_ID'] = self.qid_dbxref["Q" + str(s.get_value())] if s.data_type == "wikibase-item" else s.get_value()
        if s.references:
            for ref in s.references:
                ref_supp_text_statements = [x for x in ref if x.get_prop_nr() == self.ref_supp_text_pid]
                ref_supp_text = " ".join([x.get_value() for x in ref_supp_text_statements])
                reference_uri_statements = [x for x in ref if x.get_prop_nr() == self.reference_uri_pid]
                reference_uri = "|".join([x.get_value() for x in reference_uri_statements])
                # todo: rejoin split pubmed urls
                line['reference_supporting_text'] = ref_supp_text
                line['reference_uri'] = reference_uri
                edge_lines.append(line.copy())
        else:
            edge_lines.append(line.copy())
        return edge_lines

    def run(self):
        edge_lines = []
        node_lines = []
        for item in tqdm(self.item_iter):
            sub_qid = item.wd_item_id
            start_id = self.qid_dbxref[sub_qid]
            for s in item.statements:
                edge_lines.extend(self.handle_statement(s, start_id))
            node_template = self.parse_node(item)
            if node_template:
                node_lines.append(node_template.copy())
        self.edge_lines = edge_lines
        self.node_lines = node_lines
        self.write_out()


def main(mediawiki_api_url, sparql_endpoint_url, node_out_path, edge_out_path):
    bot = Bot(sparql_endpoint_url, mediawiki_api_url, node_out_path, edge_out_path)
    bot.run()


if __name__ == '__main__':
    p = configargparse.ArgParser(default_config_files=['config.cfg'])
    p.add('-c', '--config', is_config_file=True, help='config file path')
    p.add("--mediawiki_api_url", required=True, help="Wikibase mediawiki api url")
    p.add("--sparql_endpoint_url", required=True, help="Wikibase sparql endpoint url")
    p.add("--node-out-path", required=True, help="path to output neo4j nodes csv")
    p.add("--edge-out-path", required=True, help="path to output neo4j edges csv")
    options, _ = p.parse_known_args()
    print(options)
    d = options.__dict__.copy()
    del d['config']
    main(**d)
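
# Example invocation (a sketch; the URLs and output paths below are placeholders,
# not values taken from this repository). The same options can instead be placed
# in config.cfg, which configargparse reads by default (see above):
#
#   python wd_to_neo4j.py \
#       --mediawiki_api_url https://wikibase.example.org/w/api.php \
#       --sparql_endpoint_url https://wikibase.example.org/sparql \
#       --node-out-path nodes.csv \
#       --edge-out-path edges.csv
#
# The resulting CSVs are intended for the Neo4j bulk importer; loading them with
# something like `neo4j-admin import --nodes=nodes.csv --relationships=edges.csv`
# is assumed here (the exact command and flags depend on your Neo4j version).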