-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
use remote_hdt::error::RemoteHDTError; | ||
use remote_hdt::complement::layout::default::DefaultComplementLayout; | ||
use remote_hdt::complement::ops::Ops; | ||
use remote_hdt::complement::ComplementStorage; | ||
fn main() { | ||
|
||
let mut binding = ComplementStorage::new(DefaultComplementLayout); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
pub struct DefaultComplementLayout; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
pub mod default; | ||
|
||
use zarrs::storage::store::OpendalStore; | ||
use zarrs::array::Array; | ||
pub trait ComplementLayout { | ||
fn retrieve_attributes(&mut self, arr: &Array<OpendalStore>) { | ||
// 4. We get the attributes so we can obtain some values that we will need | ||
let attributes = arr.attributes(); | ||
|
||
let subjects = &value_to_term(match attributes.get("subjects") { | ||
Check failure on line 10 in src/complement/layout/mod.rs GitHub Actions / Clippy (stable)
Check failure on line 10 in src/complement/layout/mod.rs GitHub Actions / Check (stable)
|
||
Some(subjects) => subjects, | ||
None => return Err(RemoteHDTError::SubjectsNotInJSON), | ||
Check failure on line 12 in src/complement/layout/mod.rs GitHub Actions / Clippy (stable)
Check failure on line 12 in src/complement/layout/mod.rs GitHub Actions / Check (stable)
|
||
}); | ||
let predicates = &value_to_term(match attributes.get("predicates") { | ||
Check failure on line 14 in src/complement/layout/mod.rs GitHub Actions / Clippy (stable)
Check failure on line 14 in src/complement/layout/mod.rs GitHub Actions / Check (stable)
|
||
Some(predicates) => predicates, | ||
None => return Err(RemoteHDTError::PredicatesNotInJSON), | ||
Check failure on line 16 in src/complement/layout/mod.rs GitHub Actions / Clippy (stable)
Check failure on line 16 in src/complement/layout/mod.rs GitHub Actions / Check (stable)
|
||
}); | ||
let objects = &value_to_term(match attributes.get("objects") { | ||
Check failure on line 18 in src/complement/layout/mod.rs GitHub Actions / Clippy (stable)
Check failure on line 18 in src/complement/layout/mod.rs GitHub Actions / Check (stable)
|
||
Some(objects) => objects, | ||
None => return Err(RemoteHDTError::ObjectsNotInJSON), | ||
Check failure on line 20 in src/complement/layout/mod.rs GitHub Actions / Clippy (stable)
Check failure on line 20 in src/complement/layout/mod.rs GitHub Actions / Check (stable)
|
||
}); | ||
|
||
let reference_system: ReferenceSystem = match attributes.get("reference_system") { | ||
Check failure on line 23 in src/complement/layout/mod.rs GitHub Actions / Clippy (stable)
Check failure on line 23 in src/complement/layout/mod.rs GitHub Actions / Check (stable)
|
||
Some(reference_system) => reference_system, | ||
None => return Err(RemoteHDTError::ReferenceSystemNotInJSON), | ||
Check failure on line 25 in src/complement/layout/mod.rs GitHub Actions / Clippy (stable)
Check failure on line 25 in src/complement/layout/mod.rs GitHub Actions / Check (stable)
|
||
} | ||
.as_str() | ||
.unwrap() | ||
.into(); | ||
|
||
Ok(Dictionary::from_vec_str( | ||
Check failure on line 31 in src/complement/layout/mod.rs GitHub Actions / Clippy (stable)
Check failure on line 31 in src/complement/layout/mod.rs GitHub Actions / Check (stable)
|
||
reference_system, | ||
subjects, | ||
predicates, | ||
objects, | ||
)) | ||
} | ||
|
||
fn serialize(&mut self, arr: Array<OpendalStore>, graph: Graph) -> StorageResult<()> { | ||
Check failure on line 39 in src/complement/layout/mod.rs GitHub Actions / Clippy (stable)
Check failure on line 39 in src/complement/layout/mod.rs GitHub Actions / Check (stable)
|
||
let columns = arr.shape()[1] as usize; | ||
let count = AtomicU64::new(0); | ||
let binding = self.graph_iter(graph.to_owned()); | ||
let iter = binding.chunks_exact(rows_per_shard(&arr) as usize); | ||
let remainder = iter.remainder(); | ||
|
||
for chunk in iter { | ||
arr.store_chunk_elements( | ||
&[count.load(Ordering::Relaxed), 0], | ||
self.store_chunk_elements(chunk, columns), | ||
)?; | ||
count.fetch_add(1, Ordering::Relaxed); | ||
} | ||
|
||
if !remainder.is_empty() { | ||
arr.store_array_subset_elements( | ||
&ArraySubset::new_with_start_shape( | ||
vec![count.load(Ordering::Relaxed) * rows_per_shard(&arr), 0], | ||
vec![remainder.len() as u64, columns_per_shard(&arr)], | ||
)?, | ||
self.store_chunk_elements(remainder, columns), | ||
)?; | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
fn parse( | ||
&mut self, | ||
arr: &Array<OpendalStore>, | ||
dimensionality: &Dimensionality, | ||
) -> StorageResult<ZarrArray> { | ||
// First, we create the 2D matrix in such a manner that the number of | ||
// rows is the same as the size of the first terms; i.e, in the SPO | ||
// orientation, that will be equals to the number of subjects, while | ||
// the number of columns is equals to the size of the third terms; i.e, | ||
// following the same example as before, it will be equals to the number | ||
// of objects. In our case the dimensionality abstracts the process | ||
// of getting the size of the concrete dimension | ||
let matrix = Mutex::new(TriMat::new(( | ||
dimensionality.first_term_size, // we obtain the size of the first terms | ||
dimensionality.third_term_size, // we obtain the size of the third terms | ||
))); | ||
|
||
// We compute the number of shards; for us to achieve so, we have to obtain | ||
// first dimension of the chunk grid | ||
let number_of_shards = match arr.chunk_grid_shape() { | ||
Some(chunk_grid) => chunk_grid[0], | ||
|
||
None => 0, | ||
}; | ||
|
||
let number_of_columns = arr.shape()[1] as usize; | ||
|
||
// For each chunk in the Zarr array we retrieve it and parse it into a | ||
// matrix, inserting the triplet in its corresponding position. The idea | ||
// of parsing the array chunk-by-chunk allows us to keep the RAM usage | ||
// low, as instead of parsing the whole array, we process smaller pieces | ||
// of it. Once we have all the pieces processed, we will have parsed the | ||
// whole array | ||
for shard in 0..number_of_shards { | ||
arr.retrieve_chunk_elements(&[shard, 0])? | ||
// We divide each shard by the number of columns, as a shard is | ||
// composed of chunks having the size of [1, number of cols] | ||
.chunks(number_of_columns) | ||
.enumerate() | ||
.for_each(|(first_term_idx, chunk)| { | ||
self.retrieve_chunk_elements( | ||
&matrix, | ||
first_term_idx + (shard * rows_per_shard(arr)) as usize, | ||
chunk, | ||
); | ||
}) | ||
} | ||
|
||
// We use a CSC Matrix because typically, RDF knowledge graphs tend to | ||
// have more rows than columns; as such, CSC matrices are optimized | ||
// for that precise scenario | ||
let x = matrix.lock(); | ||
Ok(x.to_csc()) | ||
} | ||
|
||
fn graph_iter(&self, graph: Graph) -> Vec<C>; | ||
fn store_chunk_elements(&self, chunk: &[C], columns: usize) -> Vec<u64>; | ||
fn retrieve_chunk_elements( | ||
&mut self, | ||
matrix: &Mutex<TriMat<usize>>, | ||
first_term_idx: usize, | ||
chunk: &[usize], | ||
); | ||
fn sharding_factor(&self, dimensionality: &Dimensionality) -> usize; | ||
|
||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
pub mod layout; | ||
pub mod ops; | ||
pub mod params; | ||
|
||
use crate::dictionary::Dictionary; | ||
|
||
use self::params::Dimensionality; | ||
use self::layout::default::DefaultComplementLayout; | ||
Check warning on line 8 in src/complement/mod.rs GitHub Actions / Check (stable)
|
||
|
||
|
||
use zarrs::storage::store::OpendalStore; | ||
use zarrs::array::Array; | ||
|
||
|
||
pub struct ComplementStorage { | ||
dictionary: Dictionary, | ||
dimensionality: Dimensionality, | ||
array: Option<Array<OpendalStore>> | ||
|
||
} | ||
|
||
impl ComplementStorage { | ||
pub fn new(layout: impl ComplementLayout + 'static) -> Self { | ||
ComplementStorage { | ||
|
||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
pub trait Ops { | ||
fn get_chunk(&self, subject: &usize) -> Vec<u32>; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
#[derive(Default)] | ||
pub struct Dimensionality { | ||
graph_size: Option<usize>, | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,3 +4,4 @@ pub mod error; | |
mod io; | ||
pub mod storage; | ||
mod utils; | ||
pub mod complement; |
This file was deleted.