Two extra graph loading classes #2

Open · wants to merge 2 commits into base: main
73 changes: 72 additions & 1 deletion src/kgsteward/kgsteward.py
100644 → 100755
@@ -44,9 +44,13 @@
import requests # https://docs.python-requests.org/en/master/user/quickstart/
import getpass
from dumper import dump # just in case to debug
import gzip
import tempfile
import time

from .graphdb import GraphDBclient


# ---------------------------------------------------------#
# The RDF source graphs configuration
#
@@ -305,7 +309,7 @@ def update_dataset_info( gdb, config, name ) :

def main():
"""Main function of the kgsteward workflow."""

script_start_time = time.time()
args = get_user_input()


@@ -417,7 +421,70 @@ def main():
GRAPH {context} {{
{context} void:dataDump {path}
}}
}}""" )
if "enpkg_rdf_subfolders" in target:
for dir_path in target["enpkg_rdf_subfolders"]:
expanded_dir_path = replace_env_var(dir_path)
if os.path.isdir(expanded_dir_path):
# Find every 'rdf' subdirectory below this directory (at any depth), then collect the .ttl files inside each one
rdf_dir_paths = glob.glob(os.path.join(expanded_dir_path, '**', 'rdf'), recursive=True)
print(f"RDF directories found: {rdf_dir_paths}")

for rdf_dir_path in rdf_dir_paths:
ttl_files = glob.glob(os.path.join(rdf_dir_path, '*.ttl'), recursive=True)
print(f"TTL files found: {ttl_files}")

for filename in ttl_files:
if 'merged_graph' in filename:
continue
else:
start_time = time.time() # Start timer
file_uri = "<file://" + filename + ">"
gdb.sparql_update(f"LOAD {file_uri} INTO GRAPH {context}")
gdb.sparql_update( f"""PREFIX void: <http://rdfs.org/ns/void#>

INSERT DATA {{
GRAPH {context} {{
{context} void:dataDump {file_uri}
}}
}}""" )

end_time = time.time() # End timer
print(f"Time taken to load {filename}: {end_time - start_time:.2f} seconds")

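# Load pre-merged, gzip-compressed Turtle dumps listed under the enpkg_merged_gz_files key:
# each *.ttl.gz file is decompressed into a named temporary .ttl file, LOADed into the
# target graph, recorded as a void:dataDump, and the temporary file is removed afterwards.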
if "enpkg_merged_gz_files" in target:
for dir_path in target["enpkg_merged_gz_files"]:
expanded_dir_path = replace_env_var(dir_path)
if os.path.isdir(expanded_dir_path):
gz_files = glob.glob(os.path.join(expanded_dir_path, '*.ttl.gz'))
print(f"GZ files found: {gz_files}")

for gz_filename in gz_files:
# Extract base filename without '.gz'
base_filename = os.path.basename(gz_filename)[:-3] # Removes last 3 characters ('.gz')

start_time = time.time() # Start timer

# Decompress the .gz file and write its content to a named temporary .ttl file
with gzip.open(gz_filename, 'rt') as file_in, \
tempfile.NamedTemporaryFile(mode='w+', suffix='.ttl', prefix=base_filename, delete=False) as file_out:
ttl_content = file_in.read()
file_out.write(ttl_content)
temp_file_uri = "<file://" + file_out.name + ">"

# Load the TTL content into the graph
gdb.sparql_update(f"LOAD {temp_file_uri} INTO GRAPH {context}")
gdb.sparql_update( f"""PREFIX void: <http://rdfs.org/ns/void#>

INSERT DATA {{
GRAPH {context} {{
{context} void:dataDump {temp_file_uri}
}}
}}""" )
os.remove(file_out.name) # Clean up the temporary file
end_time = time.time() # End timer
print(f"Time taken to load {gz_filename}: {end_time - start_time:.2f} seconds")


if "update" in target :
for filename in target["update"] :
@@ -514,6 +581,10 @@ def main():
print('{:>20} : {:>12} {:>20} {}'.format( name, "", "", "UNKNOWN" ))
print( '----------------------------------------------------------')

script_end_time = time.time() # End the overall timer
total_duration = script_end_time - script_start_time
total_duration_in_min = total_duration / 60
print(f"Total execution time: {total_duration_in_min:.2f} minutes")
# --------------------------------------------------------- #
# Main
# --------------------------------------------------------- #
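For reference, here is a minimal sketch (not part of this PR) of a target entry that would exercise both new branches, written as the Python dict that main() reads. The directory paths and the ${ENPKG_ROOT} placeholder are made up for illustration, and the placeholder syntax accepted by replace_env_var() is an assumption; the actual kgsteward configuration file may declare these keys differently.

# Hypothetical target entry illustrating the two new keys added by this PR.
# Paths and the ${ENPKG_ROOT} placeholder are examples only.
target = {
    "enpkg_rdf_subfolders"  : [ "${ENPKG_ROOT}/samples" ],  # scanned recursively for rdf/ folders containing *.ttl files
    "enpkg_merged_gz_files" : [ "${ENPKG_ROOT}/merged"  ],  # scanned (non-recursively) for *.ttl.gz dumps
}

Both keys take a list of directory paths. For enpkg_rdf_subfolders, every rdf/ folder anywhere below each path is scanned for .ttl files, skipping any file whose name contains 'merged_graph'; for enpkg_merged_gz_files, only .ttl.gz files sitting directly in each listed directory are picked up.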