Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix formatting #10

Merged
merged 1 commit into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 36 additions & 55 deletions src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use std::{collections::HashMap, sync::Arc};

use crate::{
AddNodeError, ArrayIndices, AttributesTable, ChunkPayload, Dataset, ManifestRef,
ManifestsTable, NodeData, NodeId, NodeStructure, ObjectId, Path, Storage, StructureTable,
UpdateNodeError, UserAttributes, UserAttributesStructure, ZarrArrayMetadata,
ManifestsTable, NodeData, NodeId, NodeStructure, ObjectId, Path, Storage,
StructureTable, UpdateNodeError, UserAttributes, UserAttributesStructure,
ZarrArrayMetadata,
};

/// FIXME: what do we want to do with implicit groups?
Expand Down Expand Up @@ -62,10 +63,7 @@ impl Dataset {
) -> Result<(), UpdateNodeError> {
match self.get_node(&path).await {
None => Err(UpdateNodeError::NotFound),
Some(NodeStructure {
node_data: NodeData::Array(..),
..
}) => {
Some(NodeStructure { node_data: NodeData::Array(..), .. }) => {
self.updated_arrays.insert(path, metadata);
Ok(())
}
Expand Down Expand Up @@ -99,10 +97,7 @@ impl Dataset {
) -> Result<(), UpdateNodeError> {
match self.get_node(&path).await {
None => Err(UpdateNodeError::NotFound),
Some(NodeStructure {
node_data: NodeData::Array(..),
..
}) => {
Some(NodeStructure { node_data: NodeData::Array(..), .. }) => {
self.set_chunks.insert((path, coord), data);
Ok(())
}
Expand All @@ -114,17 +109,14 @@ impl Dataset {
// FIXME: errors
match self.storage.fetch_structure(&self.structure_id).await.ok() {
None => 0,
Some(structure) => structure
.iter()
.max_by_key(|s| s.id)
.map_or(0, |node| node.id),
Some(structure) => {
structure.iter().max_by_key(|s| s.id).map_or(0, |node| node.id)
}
}
}

async fn reserve_node_id(&mut self) -> NodeId {
let last = self
.last_node_id
.unwrap_or(self.compute_last_node_id().await);
let last = self.last_node_id.unwrap_or(self.compute_last_node_id().await);
let new = last + 1;
self.last_node_id = Some(new);
new
Expand All @@ -134,16 +126,11 @@ impl Dataset {

// FIXME: we should have errors here, not only None
pub async fn get_node(&self, path: &Path) -> Option<NodeStructure> {
self.get_new_node(path)
.or(self.get_existing_node(path).await)
self.get_new_node(path).or(self.get_existing_node(path).await)
}

async fn get_existing_node(&self, path: &Path) -> Option<NodeStructure> {
let structure = self
.storage
.fetch_structure(&self.structure_id)
.await
.ok()?;
let structure = self.storage.fetch_structure(&self.structure_id).await.ok()?;
let session_atts = self
.updated_attributes
.get(path)
Expand Down Expand Up @@ -199,23 +186,26 @@ impl Dataset {
})
}

pub async fn get_chunk(&self, path: &Path, coords: &ArrayIndices) -> Option<ChunkPayload> {
pub async fn get_chunk(
&self,
path: &Path,
coords: &ArrayIndices,
) -> Option<ChunkPayload> {
// FIXME: better error type
let node = self.get_node(path).await?;
match node.node_data {
NodeData::Group => None,
NodeData::Array(_, manifests) => {
// check the chunks modified in this session first
// TODO: I hate rust forces me to clone to search in a hashmap. How to do better?
let session_chunk = self
.set_chunks
.get(&(path.clone(), coords.clone()))
.cloned();
let session_chunk =
self.set_chunks.get(&(path.clone(), coords.clone())).cloned();
// If session_chunk is not None we have to return it, because it is the update the
// user made in the current session
// If session_chunk == None, user hasn't modified the chunk in this session and we
// need to fall back to fetching the manifests
session_chunk.unwrap_or(self.get_old_chunk(manifests.as_slice(), coords).await)
session_chunk
.unwrap_or(self.get_old_chunk(manifests.as_slice(), coords).await)
}
}
}
Expand All @@ -227,11 +217,8 @@ impl Dataset {
) -> Option<ChunkPayload> {
// FIXME: use manifest extents
for manifest in manifests {
let manifest_structure = self
.storage
.fetch_manifests(&manifest.object_id)
.await
.ok()?;
let manifest_structure =
self.storage.fetch_manifests(&manifest.object_id).await.ok()?;
if let Some(payload) = manifest_structure
.get_chunk_info(coords, &manifest.location)
.map(|info| info.payload)
Expand All @@ -250,11 +237,7 @@ impl Dataset {
/// Files that are reused from previous commits are not returned because they don't need saving
pub async fn consolidate(
&mut self,
) -> (
Arc<StructureTable>,
Vec<Arc<AttributesTable>>,
Vec<Arc<ManifestsTable>>,
) {
) -> (Arc<StructureTable>, Vec<Arc<AttributesTable>>, Vec<Arc<ManifestsTable>>) {
todo!()
}
}
Expand All @@ -264,9 +247,10 @@ mod tests {
use std::{error::Error, num::NonZeroU64, path::PathBuf};

use crate::{
manifest::mk_manifests_table, storage::InMemoryStorage, structure::mk_structure_table,
ChunkInfo, ChunkKeyEncoding, ChunkRef, ChunkShape, Codecs, DataType, FillValue, Flags,
ManifestExtents, StorageTransformers, TableRegion,
manifest::mk_manifests_table, storage::InMemoryStorage,
structure::mk_structure_table, ChunkInfo, ChunkKeyEncoding, ChunkRef, ChunkShape,
Codecs, DataType, FillValue, Flags, ManifestExtents, StorageTransformers,
TableRegion,
};

use super::*;
Expand Down Expand Up @@ -335,7 +319,9 @@ mod tests {
NodeStructure {
path: array1_path.clone(),
id: array_id,
user_attributes: Some(UserAttributesStructure::Inline("{foo:1}".to_string())),
user_attributes: Some(UserAttributesStructure::Inline(
"{foo:1}".to_string(),
)),
node_data: NodeData::Array(zarr_meta1.clone(), vec![manifest_ref]),
},
];
Expand Down Expand Up @@ -394,7 +380,9 @@ mod tests {
Some(NodeStructure {
path: "/group/array2".into(),
id: 4,
user_attributes: Some(UserAttributesStructure::Inline("{n:42}".to_string(),)),
user_attributes: Some(UserAttributesStructure::Inline(
"{n:42}".to_string(),
)),
node_data: NodeData::Array(zarr_meta2.clone(), vec![]),
})
);
Expand Down Expand Up @@ -422,16 +410,11 @@ mod tests {
let node = ds.get_node(&array1_path).await.unwrap();
assert_eq!(
node.user_attributes,
Some(UserAttributesStructure::Inline(
"{updated: true}".to_string()
))
Some(UserAttributesStructure::Inline("{updated: true}".to_string()))
);

// update old array zarr metadata and check it
let new_zarr_meta1 = ZarrArrayMetadata {
shape: vec![2, 2, 3],
..zarr_meta1
};
let new_zarr_meta1 = ZarrArrayMetadata { shape: vec![2, 2, 3], ..zarr_meta1 };
ds.update_array(array1_path.clone(), new_zarr_meta1)
.await
.map_err(|err| format!("{err:#?}"))?;
Expand All @@ -455,9 +438,7 @@ mod tests {
.await
.map_err(|err| format!("{err:#?}"))?;

let chunk = ds
.get_chunk(&array1_path, &ArrayIndices(vec![0, 0, 0]))
.await;
let chunk = ds.get_chunk(&array1_path, &ArrayIndices(vec![0, 0, 0])).await;
assert_eq!(chunk, Some(ChunkPayload::Inline(vec![0, 0, 0, 99])));

Ok(())
Expand Down
33 changes: 21 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,14 @@ pub mod structure;

use async_trait::async_trait;
use manifest::ManifestsTable;
use std::{collections::HashMap, fmt::Display, num::NonZeroU64, path::PathBuf, sync::Arc};
use std::{
collections::HashMap, fmt::Display, num::NonZeroU64, path::PathBuf, sync::Arc,
};
use structure::StructureTable;

#[derive(Debug, Clone)]
pub enum IcechunkFormatError {
FillValueDecodeError {
found_size: usize,
target_size: usize,
target_type: DataType,
},
FillValueDecodeError { found_size: usize, target_size: usize, target_type: DataType },
NullFillValueError,
}

Expand Down Expand Up @@ -181,7 +179,10 @@ pub enum FillValue {
}

impl FillValue {
fn from_data_type_and_value(dt: &DataType, value: &[u8]) -> Result<Self, IcechunkFormatError> {
fn from_data_type_and_value(
dt: &DataType,
value: &[u8],
) -> Result<Self, IcechunkFormatError> {
use IcechunkFormatError::FillValueDecodeError;

match dt {
Expand Down Expand Up @@ -415,8 +416,7 @@ impl TryFrom<&[u8]> for ObjectId {

fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
let buf = value.try_into();
buf.map(ObjectId)
.map_err(|_| "Invalid ObjectId buffer length")
buf.map(ObjectId).map_err(|_| "Invalid ObjectId buffer length")
}
}

Expand Down Expand Up @@ -543,9 +543,18 @@ pub enum StorageError {
/// Implementations are free to assume files are never overwritten.
#[async_trait]
pub trait Storage {
async fn fetch_structure(&self, id: &ObjectId) -> Result<Arc<StructureTable>, StorageError>; // FIXME: format flags
async fn fetch_attributes(&self, id: &ObjectId) -> Result<Arc<AttributesTable>, StorageError>; // FIXME: format flags
async fn fetch_manifests(&self, id: &ObjectId) -> Result<Arc<ManifestsTable>, StorageError>; // FIXME: format flags
async fn fetch_structure(
&self,
id: &ObjectId,
) -> Result<Arc<StructureTable>, StorageError>; // FIXME: format flags
async fn fetch_attributes(
&self,
id: &ObjectId,
) -> Result<Arc<AttributesTable>, StorageError>; // FIXME: format flags
async fn fetch_manifests(
&self,
id: &ObjectId,
) -> Result<Arc<ManifestsTable>, StorageError>; // FIXME: format flags

async fn write_structure(
&self,
Expand Down
Loading