Skip to content

Commit

Permalink
Merge pull request #10 from earth-mover/fix-lint
Browse files Browse the repository at this point in the history
Fix formatting
  • Loading branch information
paraseba authored Aug 15, 2024
2 parents 6391312 + 791cb45 commit 17b7e55
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 233 deletions.
91 changes: 36 additions & 55 deletions src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use std::{collections::HashMap, sync::Arc};

use crate::{
AddNodeError, ArrayIndices, AttributesTable, ChunkPayload, Dataset, ManifestRef,
ManifestsTable, NodeData, NodeId, NodeStructure, ObjectId, Path, Storage, StructureTable,
UpdateNodeError, UserAttributes, UserAttributesStructure, ZarrArrayMetadata,
ManifestsTable, NodeData, NodeId, NodeStructure, ObjectId, Path, Storage,
StructureTable, UpdateNodeError, UserAttributes, UserAttributesStructure,
ZarrArrayMetadata,
};

/// FIXME: what do we want to do with implicit groups?
Expand Down Expand Up @@ -62,10 +63,7 @@ impl Dataset {
) -> Result<(), UpdateNodeError> {
match self.get_node(&path).await {
None => Err(UpdateNodeError::NotFound),
Some(NodeStructure {
node_data: NodeData::Array(..),
..
}) => {
Some(NodeStructure { node_data: NodeData::Array(..), .. }) => {
self.updated_arrays.insert(path, metadata);
Ok(())
}
Expand Down Expand Up @@ -99,10 +97,7 @@ impl Dataset {
) -> Result<(), UpdateNodeError> {
match self.get_node(&path).await {
None => Err(UpdateNodeError::NotFound),
Some(NodeStructure {
node_data: NodeData::Array(..),
..
}) => {
Some(NodeStructure { node_data: NodeData::Array(..), .. }) => {
self.set_chunks.insert((path, coord), data);
Ok(())
}
Expand All @@ -114,17 +109,14 @@ impl Dataset {
// FIXME: errors
match self.storage.fetch_structure(&self.structure_id).await.ok() {
None => 0,
Some(structure) => structure
.iter()
.max_by_key(|s| s.id)
.map_or(0, |node| node.id),
Some(structure) => {
structure.iter().max_by_key(|s| s.id).map_or(0, |node| node.id)
}
}
}

async fn reserve_node_id(&mut self) -> NodeId {
let last = self
.last_node_id
.unwrap_or(self.compute_last_node_id().await);
let last = self.last_node_id.unwrap_or(self.compute_last_node_id().await);
let new = last + 1;
self.last_node_id = Some(new);
new
Expand All @@ -134,16 +126,11 @@ impl Dataset {

// FIXME: we should have errros here, not only None
pub async fn get_node(&self, path: &Path) -> Option<NodeStructure> {
self.get_new_node(path)
.or(self.get_existing_node(path).await)
self.get_new_node(path).or(self.get_existing_node(path).await)
}

async fn get_existing_node(&self, path: &Path) -> Option<NodeStructure> {
let structure = self
.storage
.fetch_structure(&self.structure_id)
.await
.ok()?;
let structure = self.storage.fetch_structure(&self.structure_id).await.ok()?;
let session_atts = self
.updated_attributes
.get(path)
Expand Down Expand Up @@ -199,23 +186,26 @@ impl Dataset {
})
}

pub async fn get_chunk(&self, path: &Path, coords: &ArrayIndices) -> Option<ChunkPayload> {
pub async fn get_chunk(
&self,
path: &Path,
coords: &ArrayIndices,
) -> Option<ChunkPayload> {
// FIXME: better error type
let node = self.get_node(path).await?;
match node.node_data {
NodeData::Group => None,
NodeData::Array(_, manifests) => {
// check the chunks modified in this session first
// TODO: I hate rust forces me to clone to search in a hashmap. How to do better?
let session_chunk = self
.set_chunks
.get(&(path.clone(), coords.clone()))
.cloned();
let session_chunk =
self.set_chunks.get(&(path.clone(), coords.clone())).cloned();
// If session_chunk is not None we have to return it, because is the update the
// user made in the current session
// If session_chunk == None, user hasn't modified the chunk in this session and we
// need to fallback to fetching the manifests
session_chunk.unwrap_or(self.get_old_chunk(manifests.as_slice(), coords).await)
session_chunk
.unwrap_or(self.get_old_chunk(manifests.as_slice(), coords).await)
}
}
}
Expand All @@ -227,11 +217,8 @@ impl Dataset {
) -> Option<ChunkPayload> {
// FIXME: use manifest extents
for manifest in manifests {
let manifest_structure = self
.storage
.fetch_manifests(&manifest.object_id)
.await
.ok()?;
let manifest_structure =
self.storage.fetch_manifests(&manifest.object_id).await.ok()?;
if let Some(payload) = manifest_structure
.get_chunk_info(coords, &manifest.location)
.map(|info| info.payload)
Expand All @@ -250,11 +237,7 @@ impl Dataset {
/// Files that are reused from previous commits are not returned because they don't need saving
pub async fn consolidate(
&mut self,
) -> (
Arc<StructureTable>,
Vec<Arc<AttributesTable>>,
Vec<Arc<ManifestsTable>>,
) {
) -> (Arc<StructureTable>, Vec<Arc<AttributesTable>>, Vec<Arc<ManifestsTable>>) {
todo!()
}
}
Expand All @@ -264,9 +247,10 @@ mod tests {
use std::{error::Error, num::NonZeroU64, path::PathBuf};

use crate::{
manifest::mk_manifests_table, storage::InMemoryStorage, structure::mk_structure_table,
ChunkInfo, ChunkKeyEncoding, ChunkRef, ChunkShape, Codecs, DataType, FillValue, Flags,
ManifestExtents, StorageTransformers, TableRegion,
manifest::mk_manifests_table, storage::InMemoryStorage,
structure::mk_structure_table, ChunkInfo, ChunkKeyEncoding, ChunkRef, ChunkShape,
Codecs, DataType, FillValue, Flags, ManifestExtents, StorageTransformers,
TableRegion,
};

use super::*;
Expand Down Expand Up @@ -335,7 +319,9 @@ mod tests {
NodeStructure {
path: array1_path.clone(),
id: array_id,
user_attributes: Some(UserAttributesStructure::Inline("{foo:1}".to_string())),
user_attributes: Some(UserAttributesStructure::Inline(
"{foo:1}".to_string(),
)),
node_data: NodeData::Array(zarr_meta1.clone(), vec![manifest_ref]),
},
];
Expand Down Expand Up @@ -394,7 +380,9 @@ mod tests {
Some(NodeStructure {
path: "/group/array2".into(),
id: 4,
user_attributes: Some(UserAttributesStructure::Inline("{n:42}".to_string(),)),
user_attributes: Some(UserAttributesStructure::Inline(
"{n:42}".to_string(),
)),
node_data: NodeData::Array(zarr_meta2.clone(), vec![]),
})
);
Expand Down Expand Up @@ -422,16 +410,11 @@ mod tests {
let node = ds.get_node(&array1_path).await.unwrap();
assert_eq!(
node.user_attributes,
Some(UserAttributesStructure::Inline(
"{updated: true}".to_string()
))
Some(UserAttributesStructure::Inline("{updated: true}".to_string()))
);

// update old array zarr metadata and check it
let new_zarr_meta1 = ZarrArrayMetadata {
shape: vec![2, 2, 3],
..zarr_meta1
};
let new_zarr_meta1 = ZarrArrayMetadata { shape: vec![2, 2, 3], ..zarr_meta1 };
ds.update_array(array1_path.clone(), new_zarr_meta1)
.await
.map_err(|err| format!("{err:#?}"))?;
Expand All @@ -455,9 +438,7 @@ mod tests {
.await
.map_err(|err| format!("{err:#?}"))?;

let chunk = ds
.get_chunk(&array1_path, &ArrayIndices(vec![0, 0, 0]))
.await;
let chunk = ds.get_chunk(&array1_path, &ArrayIndices(vec![0, 0, 0])).await;
assert_eq!(chunk, Some(ChunkPayload::Inline(vec![0, 0, 0, 99])));

Ok(())
Expand Down
33 changes: 21 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,14 @@ pub mod structure;

use async_trait::async_trait;
use manifest::ManifestsTable;
use std::{collections::HashMap, fmt::Display, num::NonZeroU64, path::PathBuf, sync::Arc};
use std::{
collections::HashMap, fmt::Display, num::NonZeroU64, path::PathBuf, sync::Arc,
};
use structure::StructureTable;

#[derive(Debug, Clone)]
pub enum IcechunkFormatError {
FillValueDecodeError {
found_size: usize,
target_size: usize,
target_type: DataType,
},
FillValueDecodeError { found_size: usize, target_size: usize, target_type: DataType },
NullFillValueError,
}

Expand Down Expand Up @@ -181,7 +179,10 @@ pub enum FillValue {
}

impl FillValue {
fn from_data_type_and_value(dt: &DataType, value: &[u8]) -> Result<Self, IcechunkFormatError> {
fn from_data_type_and_value(
dt: &DataType,
value: &[u8],
) -> Result<Self, IcechunkFormatError> {
use IcechunkFormatError::FillValueDecodeError;

match dt {
Expand Down Expand Up @@ -415,8 +416,7 @@ impl TryFrom<&[u8]> for ObjectId {

fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
let buf = value.try_into();
buf.map(ObjectId)
.map_err(|_| "Invalid ObjectId buffer length")
buf.map(ObjectId).map_err(|_| "Invalid ObjectId buffer length")
}
}

Expand Down Expand Up @@ -543,9 +543,18 @@ pub enum StorageError {
/// Implementations are free to assume files are never overwritten.
#[async_trait]
pub trait Storage {
async fn fetch_structure(&self, id: &ObjectId) -> Result<Arc<StructureTable>, StorageError>; // FIXME: format flags
async fn fetch_attributes(&self, id: &ObjectId) -> Result<Arc<AttributesTable>, StorageError>; // FIXME: format flags
async fn fetch_manifests(&self, id: &ObjectId) -> Result<Arc<ManifestsTable>, StorageError>; // FIXME: format flags
async fn fetch_structure(
&self,
id: &ObjectId,
) -> Result<Arc<StructureTable>, StorageError>; // FIXME: format flags
async fn fetch_attributes(
&self,
id: &ObjectId,
) -> Result<Arc<AttributesTable>, StorageError>; // FIXME: format flags
async fn fetch_manifests(
&self,
id: &ObjectId,
) -> Result<Arc<ManifestsTable>, StorageError>; // FIXME: format flags

async fn write_structure(
&self,
Expand Down
Loading

0 comments on commit 17b7e55

Please sign in to comment.