Skip to content

Commit

Permalink
Add mode to read consolidated ZARR datasets
Browse files Browse the repository at this point in the history
  • Loading branch information
mannreis committed Oct 7, 2024
1 parent 150db23 commit 1d79947
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 63 deletions.
3 changes: 2 additions & 1 deletion libnczarr/zarr.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ ncz_open_dataset(NC_FILE_INFO_T* file, NClist* controls)
if (!(file->format_file_info = calloc(1, sizeof(NCZ_FILE_INFO_T))))
{stat = NC_ENOMEM; goto done;}
zinfo = file->format_file_info;

zinfo->consolidated = NULL;
/* Fill in NCZ_FILE_INFO_T */
zinfo->creating = 0;
zinfo->common.file = file;
Expand Down Expand Up @@ -279,6 +279,7 @@ applycontrols(NCZ_FILE_INFO_T* zinfo)
else if(strcasecmp(p,"zip")==0) zinfo->controls.mapimpl = NCZM_ZIP;
else if(strcasecmp(p,"file")==0) zinfo->controls.mapimpl = NCZM_FILE;
else if(strcasecmp(p,"s3")==0) zinfo->controls.mapimpl = NCZM_S3;
else if(strcasecmp(p,"consolidated")==0) zinfo->controls.flags |= FLAG_CONSOLIDATED;
}
/* Apply negative controls by turning off negative flags */
/* This is necessary to avoid order dependence of mode flags when both positive and negative flags are defined */
Expand Down
1 change: 1 addition & 0 deletions libnczarr/zclose.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ ncz_close_file(NC_FILE_INFO_T* file, int abort)
if((stat = nczmap_close(zinfo->map,(abort && zinfo->creating)?1:0)))
goto done;
nclistfreeall(zinfo->controllist);
NCJreclaim(zinfo->consolidated);
NC_authfree(zinfo->auth);
nullfree(zinfo);

Expand Down
27 changes: 16 additions & 11 deletions libnczarr/zinternal.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@
# define NC_MAX_PATH 4096
# endif
#endif

#define ZMETAROOT "/.zgroup"
#define ZMETAATTR "/.zattrs"
#define ZGROUP ".zgroup"
#define ZATTRS ".zattrs"
#define ZARRAY ".zarray"
#define ZMETAROOT "/.zgroup"
#define ZMETAGROUP "/.zgroup"
#define ZMETAATTR "/.zattrs"
#define ZMETAARRAY "/.zarray"
#define ZMETADATA ".zmetadata"
#define ZGROUP ".zgroup"
#define ZATTRS ".zattrs"
#define ZARRAY ".zarray"

/* V2 Reserved Attributes */
/*
Expand Down Expand Up @@ -101,6 +103,7 @@ Inserted into any .zattrs
#define ncidforx(file,grpid) ((file)->controller->ext_ncid | (grpid))
#define ncidfor(var) ncidforx((var)->container->nc4_info,(var)->container->hdr.id)

#define isconsolidaded(var) ( (var)->controls.flags & (FLAG_CONSOLIDATED) )
/**************************************************/
/* Forward */

Expand Down Expand Up @@ -134,13 +137,15 @@ typedef struct NCZ_FILE_INFO {
int creating; /* 1=> created 0=>open */
int native_endianness; /* NC_ENDIAN_LITTLE | NC_ENDIAN_BIG */
NClist* controllist; /* Envv format */
NCjson * consolidated;
struct Controls {
size64_t flags;
# define FLAG_PUREZARR 1
# define FLAG_SHOWFETCH 2
# define FLAG_LOGGING 4
# define FLAG_XARRAYDIMS 8
# define FLAG_NCZARR_KEY 16 /* _nczarr_xxx keys are stored in object and not in _nczarr_attrs */
# define FLAG_PUREZARR 1
# define FLAG_SHOWFETCH 2
# define FLAG_LOGGING 4
# define FLAG_XARRAYDIMS 8
# define FLAG_NCZARR_KEY 16 /* _nczarr_xxx keys are stored in object and not in _nczarr_attrs */
# define FLAG_CONSOLIDATED 32
NCZM_IMPL mapimpl;
} controls;
int default_maxstrlen; /* default max str size for variables of type string */
Expand Down
28 changes: 18 additions & 10 deletions libnczarr/zmap_s3sdk.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,11 @@ zs3create(const char *path, int mode, size64_t flags, void* parameters, NCZMAP**
/* The problem with open is that there
no obvious way to test for existence.
So, we assume that the dataset must have
some content. We look for that */
some content. We look for that ....
UNLESS flags has CONSOLIDATED,
this means that we try to access
a single, unified metadata file (.zmetadata)
*/
static int
zs3open(const char *path, int mode, size64_t flags, void* parameters, NCZMAP** mapp)
{
Expand Down Expand Up @@ -221,15 +225,19 @@ zs3open(const char *path, int mode, size64_t flags, void* parameters, NCZMAP** m
{stat = NC_EURL; goto done;}

z3map->s3client = NC_s3sdkcreateclient(&z3map->s3);

/* Search the root for content */
content = nclistnew();
if((stat = NC_s3sdkgetkeys(z3map->s3client,z3map->s3.bucket,z3map->s3.rootkey,&nkeys,NULL,&z3map->errmsg)))
goto done;
if(nkeys == 0) {
/* dataset does not actually exist; we choose to return ENOOBJECT instead of EEMPTY */
stat = NC_ENOOBJECT;
goto done;

if (flags & FLAG_CONSOLIDATED == 0){
/* Search the root for content */
content = nclistnew();
if((stat = NC_s3sdkgetkeys(z3map->s3client,z3map->s3.bucket,z3map->s3.rootkey,&nkeys,NULL,&z3map->errmsg)))
goto done;
if(nkeys == 0) {
/* dataset does not actually exist; we choose to return ENOOBJECT instead of EEMPTY */
stat = NC_ENOOBJECT;
goto done;
}
}else {
// Lazy open, we don't need to make use at this stage that we have something usefull on the path, we only implement reads...
}
if(mapp) *mapp = (NCZMAP*)z3map;

Expand Down
169 changes: 128 additions & 41 deletions libnczarr/zsync.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ static int insert_attr(NCjson* jatts, NCjson* jtypes, const char* aname, NCjson*
static int insert_nczarr_attr(NCjson* jatts, NCjson* jtypes);
static int upload_attrs(NC_FILE_INFO_T* file, NC_OBJ* container, NCjson* jatts);
static int getnczarrkey(NC_OBJ* container, const char* name, const NCjson** jncxxxp);
static int downloadzarrobj(NC_FILE_INFO_T*, struct ZARROBJ* zobj, const char* fullpath, const char* objname);
static int downloadconsolidated(NC_FILE_INFO_T*);
static int get_zarrobj(NC_FILE_INFO_T*, struct ZARROBJ* zobj, const char* fullpath, const char* objname);
static int dictgetalt(const NCjson* jdict, const char* name, const char* alt, const NCjson** jvaluep);

/**************************************************/
Expand Down Expand Up @@ -1142,7 +1143,7 @@ define_grp(NC_FILE_INFO_T* file, NC_GRP_INFO_T* grp)
if((stat = NCZ_grpkey(grp,&fullpath))) goto done;

/* Download .zgroup and .zattrs */
if((stat = downloadzarrobj(file,&zgrp->zgroup,fullpath,ZGROUP))) goto done;
if((stat = get_zarrobj(file,&zgrp->zgroup,fullpath,ZGROUP))) goto done;
jgroup = zgrp->zgroup.obj;
jattrs = zgrp->zgroup.atts;

Expand Down Expand Up @@ -1448,7 +1449,7 @@ define_var1(NC_FILE_INFO_T* file, NC_GRP_INFO_T* grp, const char* varname)
goto done;

/* Download */
if((stat = downloadzarrobj(file,&zvar->zarray,varpath,ZARRAY))) goto done;
if((stat = get_zarrobj(file,&zvar->zarray,varpath,ZARRAY))) goto done;
jvar = zvar->zarray.obj;
jatts = zvar->zarray.atts;
assert(jvar == NULL || NCJsort(jvar) == NCJ_DICT);
Expand Down Expand Up @@ -1818,8 +1819,9 @@ ncz_read_superblock(NC_FILE_INFO_T* file, char** nczarrvp, char** zarrfp)
/* Construct grp key */
if((stat = NCZ_grpkey(root,&fullpath))) goto done;

int valid_consolidated = downloadconsolidated(file); //init zinfo->consolidated;
/* Download the root group .zgroup and associated .zattrs */
if((stat = downloadzarrobj(file, &zroot->zgroup, fullpath, ZGROUP))) goto done;
if((stat = get_zarrobj(file, &zroot->zgroup, fullpath, ZGROUP))) goto done;
jzgroup = zroot->zgroup.obj;

/* Look for superblock; first in .zattrs and then in .zgroup */
Expand All @@ -1834,7 +1836,7 @@ ncz_read_superblock(NC_FILE_INFO_T* file, char** nczarrvp, char** zarrfp)
file->no_write = 1;
}

if(jsuper == NULL) {
if(jsuper == NULL && valid_consolidated) {
/* See if this is looks like a NCZarr/Zarr dataset at all
by looking for anything here of the form ".z*" */
if((stat = ncz_validate(file))) goto done;
Expand Down Expand Up @@ -1979,19 +1981,45 @@ searchvars(NCZ_FILE_INFO_T* zfile, NC_GRP_INFO_T* grp, NClist* varnames)
/* Compute the key for the grp */
if((stat = NCZ_grpkey(grp,&grpkey))) goto done;
/* Get the map and search group */
if((stat = nczmap_search(zfile->map,grpkey,matches))) goto done;
for(i=0;i<nclistlength(matches);i++) {
const char* name = nclistget(matches,i);
if(name[0] == NCZM_DOT) continue; /* zarr/nczarr specific */
/* See if name/.zarray exists */
if((stat = nczm_concat(grpkey,name,&varkey))) goto done;
if((stat = nczm_concat(varkey,ZARRAY,&zarray))) goto done;
if((stat = nczmap_exists(zfile->map,zarray)) == NC_NOERR)
nclistpush(varnames,strdup(name));
stat = NC_NOERR;
nullfree(varkey); varkey = NULL;
nullfree(zarray); zarray = NULL;
}
if(zfile->consolidated) {
const char * group = grpkey + (grpkey[0] == '/');
size_t lgroup = strlen(group);

const NCjson * jmetadata = NULL;
NCJdictget(zfile->consolidated,"metadata", &jmetadata);
for(i=0;i<NCJlength(jmetadata);i+=2){
NCjson* jname = NCJith(jmetadata,i);
const char * fullname = NCJstring(jname);
size_t lfullname = strlen(fullname);
if(lfullname < lgroup || \
strncmp(fullname,group,lgroup) || \
(lgroup > 0 && fullname[lgroup] != NCZM_SEP[0]) ) {
continue;
}
char * start = fullname + lgroup + (lgroup>0);
char * end = strchr(start,NCZM_SEP[0]);
if(end == NULL) continue;
size_t lname = end - start;
//Ends with ".zarray"
if(strncmp(ZMETAARRAY,end,sizeof(ZMETAARRAY)) == 0){
nclistpush(varnames,strndup(start,lname));
}
}
}else{
if((stat = nczmap_search(zfile->map,grpkey,matches))) goto done;
for(i=0;i<nclistlength(matches);i++) {
const char* name = nclistget(matches,i);
if(name[0] == NCZM_DOT) continue; /* zarr/nczarr specific */
/* See if name/.zarray exists */
if((stat = nczm_concat(grpkey,name,&varkey))) goto done;
if((stat = nczm_concat(varkey,ZARRAY,&zarray))) goto done;
if((stat = nczmap_exists(zfile->map,zarray)) == NC_NOERR)
nclistpush(varnames,strdup(name));
stat = NC_NOERR;
nullfree(varkey); varkey = NULL;
nullfree(zarray); zarray = NULL;
}
}

done:
nullfree(grpkey);
Expand All @@ -2013,21 +2041,46 @@ searchsubgrps(NCZ_FILE_INFO_T* zfile, NC_GRP_INFO_T* grp, NClist* subgrpnames)

/* Compute the key for the grp */
if((stat = NCZ_grpkey(grp,&grpkey))) goto done;
/* Get the map and search group */
if((stat = nczmap_search(zfile->map,grpkey,matches))) goto done;
for(i=0;i<nclistlength(matches);i++) {
const char* name = nclistget(matches,i);
if(name[0] == NCZM_DOT) continue; /* zarr/nczarr specific */
/* See if name/.zgroup exists */
if((stat = nczm_concat(grpkey,name,&subkey))) goto done;
if((stat = nczm_concat(subkey,ZGROUP,&zgroup))) goto done;
if((stat = nczmap_exists(zfile->map,zgroup)) == NC_NOERR)
nclistpush(subgrpnames,strdup(name));
stat = NC_NOERR;
nullfree(subkey); subkey = NULL;
nullfree(zgroup); zgroup = NULL;
}

if(zfile->consolidated) {
const char * group = grpkey + (grpkey[0] == '/');
size_t lgroup = strlen(group);

const NCjson * jmetadata = NULL;
NCJdictget(zfile->consolidated,"metadata", &jmetadata);
for(i=0;i<NCJlength(jmetadata);i+=2){
NCjson* jname = NCJith(jmetadata,i);
const char * fullname = NCJstring(jname);
size_t lfullname = strlen(fullname);

if(lfullname < lgroup || \
strncmp(fullname,group,lgroup) || \
(lgroup > 0 && fullname[lgroup] != NCZM_SEP[0]) ) {
continue;
}
char * start = fullname + lgroup + (lgroup>0);
char * end = strchr(start,NCZM_SEP[0]);
if(end == NULL) continue;
size_t lname = end - start;
//Ends with "/.zgroup
if(strncmp(ZMETAGROUP,end,sizeof(ZMETAGROUP)) == 0){
nclistpush(subgrpnames,strndup(start,lname));
}
}
}else{/* Get the map and search group */
if((stat = nczmap_search(zfile->map,grpkey,matches))) goto done;
for(i=0;i<nclistlength(matches);i++) {
const char* name = nclistget(matches,i);
if(name[0] == NCZM_DOT) continue; /* zarr/nczarr specific */
/* See if name/.zgroup exists */
if((stat = nczm_concat(grpkey,name,&subkey))) goto done;
if((stat = nczm_concat(subkey,ZGROUP,&zgroup))) goto done;
if((stat = nczmap_exists(zfile->map,zgroup)) == NC_NOERR)
nclistpush(subgrpnames,strdup(name));
stat = NC_NOERR;
nullfree(subkey); subkey = NULL;
nullfree(zgroup); zgroup = NULL;
}
}
done:
nullfree(grpkey);
nullfree(subkey);
Expand Down Expand Up @@ -2587,22 +2640,56 @@ getnczarrkey(NC_OBJ* container, const char* name, const NCjson** jncxxxp)
}

static int
downloadzarrobj(NC_FILE_INFO_T* file, struct ZARROBJ* zobj, const char* fullpath, const char* objname)
downloadconsolidated(NC_FILE_INFO_T* file){
int stat = NC_NOERR;
NCjson * md = NULL;
NCZ_FILE_INFO_T* zinfo = ((NCZ_FILE_INFO_T*)file->format_file_info);
if (!isconsolidaded(zinfo)) {
return NC_ENCZARR;
}
// Download /.zmetadata
if((stat=NCZ_downloadjson(zinfo->map,ZMETADATA,&md))) goto done;
if(md == NULL){
return NC_ENCZARR;
}

zinfo->consolidated = md; md = NULL;
done:
return THROW(stat);
}


static int
get_zarrobj(NC_FILE_INFO_T* file, struct ZARROBJ* zobj, const char* fullpath, const char* objname)
{
int stat = NC_NOERR;
char* key = NULL;
NCZMAP* map = ((NCZ_FILE_INFO_T*)file->format_file_info)->map;

/* Download .zXXX and .zattrs */
nullfree(zobj->prefix);
zobj->prefix = strdup(fullpath);
NCJreclaim(zobj->obj); zobj->obj = NULL;
NCJreclaim(zobj->atts); zobj->obj = NULL;
if((stat = nczm_concat(fullpath,objname,&key))) goto done;
if((stat=NCZ_downloadjson(map,key,&zobj->obj))) goto done;
nullfree(key); key = NULL;
if((stat = nczm_concat(fullpath,ZATTRS,&key))) goto done;
if((stat=NCZ_downloadjson(map,key,&zobj->atts))) goto done;

NCZ_FILE_INFO_T* zinfo = ((NCZ_FILE_INFO_T*)file->format_file_info);
if(zinfo->consolidated){
NCjson * jtmp = NULL;
if(NCJdictget(zinfo->consolidated,"metadata",&jtmp) == 0 && jtmp){
NCjson * tmp = NULL;
if((stat = nczm_concat(fullpath,objname,&key))) goto done;
if((stat=NCJdictget(jtmp,key+(key[0]=='/'),&tmp))) goto done;
if(tmp)NCJclone(tmp,&zobj->obj);
nullfree(key); key = NULL;
if((stat = nczm_concat(fullpath,ZATTRS,&key))) goto done;
if((stat=NCJdictget(jtmp,key+(key[0]=='/'),&tmp))) goto done;
if(tmp)NCJclone(tmp,&zobj->atts);
}
}else{
if((stat = nczm_concat(fullpath,objname,&key))) goto done;
if((stat=NCZ_downloadjson(zinfo->map,key,&zobj->obj))) goto done;
nullfree(key); key = NULL;
if((stat = nczm_concat(fullpath,ZATTRS,&key))) goto done;
if((stat=NCZ_downloadjson(zinfo->map,key,&zobj->atts))) goto done;
}
done:
nullfree(key);
return THROW(stat);
Expand Down

0 comments on commit 1d79947

Please sign in to comment.