From 0ff75fe5a07cdf0b8d48e3d1deabfde98863514a Mon Sep 17 00:00:00 2001
From: Di Wang
Date: Thu, 14 Nov 2024 21:21:32 +0000
Subject: [PATCH] DAOS-1 test: test dfuse

test dfuse

Required-githooks: true

Signed-off-by: Di Wang
---
 src/cart/README.env              |   6 +
 src/client/api/init.c            |   3 +-
 src/client/api/metrics.c         |  11 +-
 src/client/dfuse/dfuse.h         |  62 +++-
 src/client/dfuse/dfuse_core.c    |   8 +-
 src/client/dfuse/dfuse_fuseops.c |   6 +-
 src/client/dfuse/ops/create.c    |   8 +-
 src/client/dfuse/ops/ioctl.c     |   2 +-
 src/client/dfuse/ops/lookup.c    |   1 +
 src/client/dfuse/ops/open.c      | 155 ++++----
 src/client/dfuse/ops/read.c      | 617 +++++++++++++++++++++++++++----
 src/client/dfuse/ops/readdir.c   |  10 +-
 src/client/dfuse/ops/write.c     |   9 +-
 src/include/daos/metrics.h       |   1 +
 src/tests/ftest/dfuse/read.py    |   4 +-
 15 files changed, 721 insertions(+), 182 deletions(-)

diff --git a/src/cart/README.env b/src/cart/README.env
index b90939c8c72..290d8638043 100644
--- a/src/cart/README.env
+++ b/src/cart/README.env
@@ -184,6 +184,12 @@ This file lists the environment variables used in CaRT.
     a file with the pattern /-.csv. As a convenience, setting this variable
     automatically sets D_CLIENT_METRICS_ENABLE=1.
 
+  . D_CLIENT_METRICS_MAX_ENTRY_CNT
+    Sets the maximum metrics entry count for client-side metrics; it only takes effect if
+    D_CLIENT_METRICS_ENABLE is set to 1. The value defaults to 1024 and only needs to be
+    increased when there are many concurrent DAOS client threads, for example when the
+    thread-count for dfuse is more than 20.
+
   D_POST_INIT
     (server only) Controls the initial number of requests that are posted on context
     creation. When using a transport that supports multi-recv, also controls the maximum size

diff --git a/src/client/api/init.c b/src/client/api/init.c
index 831b6c0c482..9a8e80a8a86 100644
--- a/src/client/api/init.c
+++ b/src/client/api/init.c
@@ -334,6 +334,8 @@ daos_fini(void)
 		D_GOTO(unlock, rc);
 	}
 
+	daos_hhash_fini();
+
 	/** clean up all registered per-module metrics */
 	daos_metrics_fini();
 #if BUILD_PIPELINE
@@ -354,7 +356,6 @@ daos_fini(void)
 	dc_job_fini();
 	pl_fini();
 
-	daos_hhash_fini();
 	daos_debug_fini();
 	module_initialized = 0;
 unlock:

diff --git a/src/client/api/metrics.c b/src/client/api/metrics.c
index 0ee629be16d..d44981ee25d 100644
--- a/src/client/api/metrics.c
+++ b/src/client/api/metrics.c
@@ -20,11 +20,15 @@
 #include 
 #include 
 
-#define INIT_JOB_NUM 1024
 bool     daos_client_metric;
 bool     daos_client_metric_retain;
+uint32_t max_entry_cnt = 1024;
 
-#define MAX_IDS_SIZE(num) (num * D_TM_METRIC_SIZE)
+static inline uint32_t
+client_metrics_shm_size(uint32_t num)
+{
+	return num * D_TM_METRIC_SIZE;
+}
+
 /* The client side metrics structure looks like
  * root/job_id/pid/....
  */
@@ -66,8 +70,9 @@ init_root(const char *name, pid_t pid, int flags)
 	key_t key;
 	int   rc;
 
+	d_getenv_uint(DAOS_CLIENT_METRICS_MAX_ENTRY_CNT, &max_entry_cnt);
 	key = d_tm_cli_pid_key(pid);
-	rc  = d_tm_init_with_name(key, MAX_IDS_SIZE(INIT_JOB_NUM), flags, name);
+	rc  = d_tm_init_with_name(key, client_metrics_shm_size(max_entry_cnt), flags, name);
 	if (rc != 0) {
 		DL_ERROR(rc, "failed to initialize root for %s.", name);
 		return rc;

diff --git a/src/client/dfuse/dfuse.h b/src/client/dfuse/dfuse.h
index 58847287391..a2ac113b8e9 100644
--- a/src/client/dfuse/dfuse.h
+++ b/src/client/dfuse/dfuse.h
@@ -137,7 +137,6 @@ struct dfuse_inode_entry;
 * when EOF is returned to the kernel. If it's still present on release then it's freed then.
 */
 struct dfuse_pre_read {
-	pthread_mutex_t     dra_lock;
 	struct dfuse_event *dra_ev;
 	int                 dra_rc;
 };
@@ -149,8 +148,6 @@ struct dfuse_obj_hdl {
 	/** the DFS object handle. Not created for directories. */
 	dfs_obj_t                *doh_obj;
 
-	struct dfuse_pre_read    *doh_readahead;
-
 	/** the inode entry for the file */
 	struct dfuse_inode_entry *doh_ie;
@@ -176,27 +173,26 @@ struct dfuse_obj_hdl {
 	 * to false), the expected position of the next read and a boolean for if EOF has been
 	 * detected.
 	 */
-	off_t                     doh_linear_read_pos;
-	bool                      doh_linear_read;
-	bool                      doh_linear_read_eof;
-
-	/** True if caching is enabled for this file. */
-	bool                      doh_caching;
-
+	off_t    doh_linear_read_pos;
+	uint64_t doh_linear_read:1,
+		 doh_linear_read_eof:1,
+		 /** True if caching is enabled for this file. */
+		 doh_caching:1,
 	/* True if the file handle is writable - used for cache invalidation */
-	bool                      doh_writeable;
-
+		 doh_writeable:1,
 	/* Track possible kernel cache of readdir on this directory */
 	/* Set to true if there is any reason the kernel will not use this directory handle as the
	 * basis for a readdir cache.  Includes if seekdir or rewind are used.
	 */
-	bool                      doh_kreaddir_invalid;
+		 doh_kreaddir_invalid:1,
 	/* Set to true if readdir calls are made on this handle */
-	bool                      doh_kreaddir_started;
+		 doh_kreaddir_started:1,
 	/* Set to true if readdir calls reach EOF made on this handle */
-	bool                      doh_kreaddir_finished;
+		 doh_kreaddir_finished:1,
+		 doh_evict_on_close:1,
+		 doh_readahead_inflight:1;
 
-	bool                      doh_evict_on_close;
+	int      doh_flags;
 };

 /* Readdir support.
@@ -401,10 +397,18 @@ struct dfuse_event {
 	d_iov_t          de_iov;
 	d_sg_list_t      de_sgl;
 	d_list_t         de_list;
+
+	/* Position in a list of events; this will be either on ie->ie_open_reads or
+	 * de->de_read_slaves.
+	 */
+	d_list_t         de_read_list;
+	/* List of slave events */
+	d_list_t         de_read_slaves;
 	struct dfuse_eq *de_eqt;
 	union {
 		struct dfuse_obj_hdl     *de_oh;
 		struct dfuse_inode_entry *de_ie;
+		struct read_chunk_data   *de_cd;
 	};
 	off_t de_req_position; /**< The file position requested by fuse */
 	union {
@@ -474,6 +478,10 @@ struct dfuse_pool {
 	ACTION(OPEN)          \
 	ACTION(PRE_READ)      \
 	ACTION(READ)          \
+	ACTION(READ_EOF_M)    \
+	ACTION(READ_CON)      \
+	ACTION(READ_BUCKET)   \
+	ACTION(READ_BUCKET_M) \
 	ACTION(WRITE)         \
 	ACTION(STATFS)
@@ -1011,6 +1019,13 @@ struct dfuse_inode_entry {
 
 	/* Entry on the evict list */
 	d_list_t                 ie_evict_entry;
+
+	/* ie read lock is used to protect ie_open_reads and ie_readahead */
+	d_list_t                 ie_open_reads;
+	struct dfuse_pre_read   *ie_readahead;
+	pthread_mutex_t          ie_read_lock;
+
+	struct read_chunk_core  *ie_chunk;
 };
 
 /* Flush write-back cache writes to a inode.  It does this by waiting for and then releasing an
@@ -1108,6 +1123,13 @@ dfuse_compute_inode(struct dfuse_cont *dfs,
 void
 dfuse_cache_evict_dir(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie);
 
+/* Free any read chunk data for an inode.
+ *
+ * Returns true if the feature was used.
+ */
+bool
+read_chunk_close(struct dfuse_inode_entry *ie);
+
 /* Metadata caching functions.
 */
 
 /* Mark the cache as up-to-date from now */
@@ -1170,8 +1192,13 @@ dfuse_cache_evict(struct dfuse_inode_entry *ie);
 bool
 dfuse_dcache_get_valid(struct dfuse_inode_entry *ie, double max_age);
 
+int
+dfuse_pre_read_init(struct dfuse_info *info, struct dfuse_inode_entry *ie,
+		    struct dfuse_event **evp);
+
 void
-dfuse_pre_read(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh);
+dfuse_pre_read(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh,
+	       struct dfuse_event *ev);
 
 int
 check_for_uns_ep(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie, char *attr,
@@ -1183,6 +1210,7 @@ dfuse_ie_init(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie);
 #define dfuse_ie_free(_di, _ie)                                                                    \
 	do {                                                                                       \
 		atomic_fetch_sub_relaxed(&(_di)->di_inode_count, 1);                               \
+		D_MUTEX_DESTROY(&(_ie)->ie_read_lock);                                             \
 		D_FREE(_ie);                                                                       \
 	} while (0)

diff --git a/src/client/dfuse/dfuse_core.c b/src/client/dfuse/dfuse_core.c
index 6397b283e97..b56cf8ac64d 100644
--- a/src/client/dfuse/dfuse_core.c
+++ b/src/client/dfuse/dfuse_core.c
@@ -1044,7 +1044,6 @@ dfuse_mcache_get_valid(struct dfuse_inode_entry *ie, double max_age, double *tim
 		if (timeout)
 			*timeout = time_left;
 	}
-
 	return use;
 }
@@ -1237,7 +1236,7 @@ dfuse_open_handle_init(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh,
 {
 	oh->doh_dfs = ie->ie_dfs->dfs_ns;
 	oh->doh_ie  = ie;
-	oh->doh_linear_read     = true;
+	oh->doh_linear_read     = 1;
 	oh->doh_linear_read_pos = 0;
 	atomic_init(&oh->doh_il_calls, 0);
 	atomic_init(&oh->doh_write_count, 0);
@@ -1254,7 +1253,9 @@ dfuse_ie_init(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie)
 	atomic_init(&ie->ie_linear_read, true);
 	atomic_fetch_add_relaxed(&dfuse_info->di_inode_count, 1);
 	D_INIT_LIST_HEAD(&ie->ie_evict_entry);
+	D_INIT_LIST_HEAD(&ie->ie_open_reads);
 	D_RWLOCK_INIT(&ie->ie_wlock, 0);
+	D_MUTEX_INIT(&ie->ie_read_lock, 0);
 }
 
 void
@@ -1317,6 +1318,9 @@ dfuse_read_event_size(void *arg, size_t size)
 		ev->de_sgl.sg_nr = 1;
 	}
 
+	/* Initialise the list positions so events can safely be list-deleted even if never queued */
+	D_INIT_LIST_HEAD(&ev->de_read_list);
+	D_INIT_LIST_HEAD(&ev->de_read_slaves);
+
 	rc = daos_event_init(&ev->de_ev, ev->de_eqt->de_eq, NULL);
 	if (rc != -DER_SUCCESS) {
 		return false;

diff --git a/src/client/dfuse/dfuse_fuseops.c b/src/client/dfuse/dfuse_fuseops.c
index 980e71ac9af..960dfe1ca74 100644
--- a/src/client/dfuse/dfuse_fuseops.c
+++ b/src/client/dfuse/dfuse_fuseops.c
@@ -1,5 +1,5 @@
 /**
- * (C) Copyright 2016-2023 Intel Corporation.
+ * (C) Copyright 2016-2024 Intel Corporation.
 *
 * SPDX-License-Identifier: BSD-2-Clause-Patent
 */
@@ -88,7 +88,7 @@ dfuse_fuse_init(void *arg, struct fuse_conn_info *conn)
 	DFUSE_TRA_INFO(dfuse_info, "kernel readdir cache support compiled in");
 
 	conn->want |= FUSE_CAP_READDIRPLUS;
-	conn->want |= FUSE_CAP_READDIRPLUS_AUTO;
+	conn->want &= ~FUSE_CAP_READDIRPLUS_AUTO;
 
 #ifdef FUSE_CAP_CACHE_SYMLINKS
 	conn->want |= FUSE_CAP_CACHE_SYMLINKS;
@@ -197,7 +197,7 @@ df_ll_setattr(fuse_req_t req, fuse_ino_t ino, struct stat *attr, int to_set,
 
 	if (handle) {
 		inode                   = handle->doh_ie;
-		handle->doh_linear_read = false;
+		handle->doh_linear_read = 0;
 
 		DFUSE_IE_STAT_ADD(inode, DS_FSETATTR);
 	} else {

diff --git a/src/client/dfuse/ops/create.c b/src/client/dfuse/ops/create.c
index a389c253795..e24642f8e1d 100644
--- a/src/client/dfuse/ops/create.c
+++ b/src/client/dfuse/ops/create.c
@@ -1,5 +1,5 @@
 /**
- * (C) Copyright 2016-2023 Intel Corporation.
+ * (C) Copyright 2016-2024 Intel Corporation.
 *
 * SPDX-License-Identifier: BSD-2-Clause-Patent
 */
@@ -170,7 +170,7 @@ dfuse_cb_create(fuse_req_t req, struct dfuse_inode_entry *parent, const char *na
 	if (flags & O_APPEND)
 		flags &= ~O_APPEND;
 
-	oh->doh_linear_read = false;
+	oh->doh_linear_read = 0;
 
 	if (!dfuse_info->di_multi_user) {
 		rc = _dfuse_mode_update(req, parent, &mode);
@@ -192,7 +192,7 @@ dfuse_cb_create(fuse_req_t req, struct dfuse_inode_entry *parent, const char *na
 	if (rc)
 		D_GOTO(release, rc);
 
-	oh->doh_writeable = true;
+	oh->doh_writeable = 1;
 
 	if (dfs->dfc_data_timeout != 0 || ie->ie_dfs->dfc_data_otoc) {
 		if (fi->flags & O_DIRECT)
@@ -205,7 +205,7 @@ dfuse_cb_create(fuse_req_t req, struct dfuse_inode_entry *parent, const char *na
 		fi_out.direct_io = 0;
 
 	if (!fi_out.direct_io)
-		oh->doh_caching = true;
+		oh->doh_caching = 1;
 
 	fi_out.fh = (uint64_t)oh;
 
diff --git a/src/client/dfuse/ops/ioctl.c b/src/client/dfuse/ops/ioctl.c
index 8d073b859ff..08bfae59e82 100644
--- a/src/client/dfuse/ops/ioctl.c
+++ b/src/client/dfuse/ops/ioctl.c
@@ -353,7 +353,7 @@ handle_cont_query_ioctl(struct dfuse_obj_hdl *oh, fuse_req_t req, const void *in
 static void
 handle_cont_evict_ioctl(fuse_req_t req, struct dfuse_obj_hdl *oh)
 {
-	oh->doh_evict_on_close = true;
+	oh->doh_evict_on_close = 1;
 
 	handle_cont_qe_ioctl_helper(oh, req, NULL);
 }
diff --git a/src/client/dfuse/ops/lookup.c b/src/client/dfuse/ops/lookup.c
index 8e168102cb4..76fbfc32e42 100644
--- a/src/client/dfuse/ops/lookup.c
+++ b/src/client/dfuse/ops/lookup.c
@@ -266,6 +266,7 @@ dfuse_cb_lookup(fuse_req_t req, struct dfuse_inode_entry *parent,
 	DFUSE_TRA_UP(ie, parent, "inode");
 
 	dfuse_ie_init(dfuse_info, ie);
+	dfuse_dcache_set_time(ie);
 	ie->ie_parent = parent->ie_stat.st_ino;
 	ie->ie_dfs    = parent->ie_dfs;
diff --git a/src/client/dfuse/ops/open.c b/src/client/dfuse/ops/open.c
index 46a49cf0767..ade81e9650d 100644
--- a/src/client/dfuse/ops/open.c
+++ b/src/client/dfuse/ops/open.c
@@ -13,11 +13,11 @@ dfuse_cb_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
 	struct dfuse_info        *dfuse_info = fuse_req_userdata(req);
 	struct dfuse_inode_entry *ie;
 	struct dfuse_obj_hdl     *oh;
-	struct fuse_file_info     fi_out = {0};
-	int                       rc;
-	bool                      prefetch = false;
-	bool                      preread  = false;
-	int                       flags;
+	struct dfuse_event       *ev;
+	struct fuse_file_info     fi_out = {0};
+	int                       rc;
+	bool                      preread = false;
+	int                       flags;
 
 	ie = dfuse_inode_lookup_nf(dfuse_info, ino);
 
@@ -45,17 +45,13 @@ dfuse_cb_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
 	LOG_FLAGS(ie, fi->flags);
 
 	flags = fi->flags;
 
 	if (flags & O_APPEND)
 		flags &= ~O_APPEND;
+	/* Save the stripped flags; they are used for the deferred dfs_dup() on first I/O */
+	oh->doh_flags = flags;
 
-	/** duplicate the file handle for the fuse handle */
-	rc = dfs_dup(ie->ie_dfs->dfs_ns, ie->ie_obj, flags, &oh->doh_obj);
-	if (rc)
-		D_GOTO(err, rc);
-
 	if ((fi->flags & O_ACCMODE) != O_RDONLY)
-		oh->doh_writeable = true;
+		oh->doh_writeable = 1;
 
 	if (ie->ie_dfs->dfc_data_timeout != 0) {
 		if (fi->flags & O_DIRECT)
@@ -66,13 +62,33 @@ dfuse_cb_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
 		 * This should mean that pre-read is only used on the first read, and on files
 		 * which pre-existed in the container.
		 */
-
-		if (atomic_load_relaxed(&ie->ie_open_count) > 0 ||
+		if (atomic_load_relaxed(&ie->ie_open_count) > 1 ||
 		    ((ie->ie_dcache_last_update.tv_sec != 0) &&
 		     dfuse_dcache_get_valid(ie, ie->ie_dfs->dfc_data_timeout))) {
 			fi_out.keep_cache = 1;
 		} else {
-			prefetch = true;
+			D_MUTEX_LOCK(&ie->ie_read_lock);
+			if ((oh->doh_parent_dir &&
+			     atomic_load_relaxed(&oh->doh_parent_dir->ie_linear_read)) ||
+			    (ie->ie_stat.st_size > 0 &&
+			     ie->ie_stat.st_size <= DFUSE_MAX_PRE_READ)) {
+				if (ie->ie_readahead == NULL) {
+					D_ALLOC_PTR(ie->ie_readahead);
+					if (ie->ie_readahead == NULL) {
+						D_MUTEX_UNLOCK(&ie->ie_read_lock);
+						D_GOTO(err, rc = -DER_NOMEM);
+					}
+					/* Queue the read event on the open-reads list so that
+					 * any subsequent read checks the readahead list first.
+					 */
+					rc = dfuse_pre_read_init(dfuse_info, ie, &ev);
+					if (rc != 0) {
+						D_MUTEX_UNLOCK(&ie->ie_read_lock);
+						D_GOTO(err, rc);
+					}
+					oh->doh_readahead_inflight = 1;
+					preread = true;
+				}
+			}
+			D_MUTEX_UNLOCK(&ie->ie_read_lock);
 		}
 	} else if (ie->ie_dfs->dfc_data_otoc) {
 		/* Open to close caching, this allows the use of shared mmap */
@@ -89,33 +105,22 @@ dfuse_cb_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
 		fi_out.direct_io = 0;
 
 	if (!fi_out.direct_io)
-		oh->doh_caching = true;
+		oh->doh_caching = 1;
 
 	fi_out.fh = (uint64_t)oh;
 
+	atomic_fetch_add_relaxed(&ie->ie_open_count, 1);
 	/*
	 * dfs_dup() just locally duplicates the file handle. If we have
	 * O_TRUNC flag, we need to truncate the file manually.
	 */
 	if (fi->flags & O_TRUNC) {
 		rc = dfs_punch(ie->ie_dfs->dfs_ns, ie->ie_obj, 0, DFS_MAX_FSIZE);
-		if (rc)
+		if (rc) {
+			atomic_fetch_sub_relaxed(&ie->ie_open_count, 1);
 			D_GOTO(err, rc);
-		dfuse_dcache_evict(oh->doh_ie);
-	}
-
-	atomic_fetch_add_relaxed(&ie->ie_open_count, 1);
-
-	/* Enable this for files up to the max read size. */
-	if (prefetch && oh->doh_parent_dir &&
-	    atomic_load_relaxed(&oh->doh_parent_dir->ie_linear_read) && ie->ie_stat.st_size > 0 &&
-	    ie->ie_stat.st_size <= DFUSE_MAX_PRE_READ) {
-		D_ALLOC_PTR(oh->doh_readahead);
-		if (oh->doh_readahead) {
-			D_MUTEX_INIT(&oh->doh_readahead->dra_lock, 0);
-			D_MUTEX_LOCK(&oh->doh_readahead->dra_lock);
-			preread = true;
 		}
+		dfuse_dcache_evict(oh->doh_ie);
 	}
 
 	DFUSE_REPLY_OPEN(oh, req, &fi_out);
@@ -124,10 +129,12 @@ dfuse_cb_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
	 * release from completing which also holds open the inode.
	 */
 	if (preread)
-		dfuse_pre_read(dfuse_info, oh);
+		dfuse_pre_read(dfuse_info, oh, ev);
 
 	return;
 err:
+	if (preread) {
+		/* Undo dfuse_pre_read_init(): take the event back off the open-reads list and
+		 * return it to the slab so a later open can retry the pre-read.
+		 */
+		D_MUTEX_LOCK(&ie->ie_read_lock);
+		oh->doh_readahead_inflight = 0;
+		d_list_del_init(&ev->de_read_list);
+		ie->ie_readahead->dra_ev = NULL;
+		D_MUTEX_UNLOCK(&ie->ie_read_lock);
+		daos_event_fini(&ev->de_ev);
+		d_slab_release(ev->de_eqt->de_pre_read_slab, ev);
+	}
 	dfuse_oh_free(dfuse_info, oh);
 	DFUSE_REPLY_ERR_RAW(ie, req, rc);
 }
@@ -137,8 +144,10 @@ dfuse_cb_release(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
 {
 	struct dfuse_info        *dfuse_info = fuse_req_userdata(req);
 	struct dfuse_obj_hdl     *oh         = (struct dfuse_obj_hdl *)fi->fh;
-	struct dfuse_inode_entry *ie         = NULL;
-	int                       rc;
+	struct dfuse_inode_entry *ie             = oh->doh_ie;
+	bool                      evict_on_close = false;
+	int                       rc             = 0;
+	uint32_t                  oc;
 	uint32_t                  il_calls;
 
 	/* Perform the opposite of what the ioctl call does, always change the open handle count
@@ -147,27 +156,7 @@ dfuse_cb_release(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
 
 	DFUSE_TRA_DEBUG(oh, "Closing %d", oh->doh_caching);
 
-	DFUSE_IE_WFLUSH(oh->doh_ie);
-
-	if (oh->doh_readahead) {
-		struct dfuse_event *ev;
-
-		/* Grab this lock first to ensure that the read cb has been completed.  The
-		 * callback might register an error and release ev so do not read it's value
-		 * until after this has completed.
-		 */
-		D_MUTEX_LOCK(&oh->doh_readahead->dra_lock);
-		D_MUTEX_UNLOCK(&oh->doh_readahead->dra_lock);
-
-		ev = oh->doh_readahead->dra_ev;
-
-		D_MUTEX_DESTROY(&oh->doh_readahead->dra_lock);
-		if (ev) {
-			daos_event_fini(&ev->de_ev);
-			d_slab_release(ev->de_eqt->de_pre_read_slab, ev);
-		}
-		D_FREE(oh->doh_readahead);
-	}
+	DFUSE_IE_WFLUSH(ie);
 
 	/* If the file was read from then set the data cache time for future use, however if the
	 * file was written to then evict the metadata cache.
@@ -183,37 +172,71 @@ dfuse_cb_release(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
 		if (oh->doh_caching) {
 			if (il_calls == 0) {
 				DFUSE_TRA_DEBUG(oh, "Evicting metadata cache, setting data cache");
-				dfuse_mcache_evict(oh->doh_ie);
-				dfuse_dcache_set_time(oh->doh_ie);
+				dfuse_mcache_evict(ie);
+				dfuse_dcache_set_time(ie);
 			} else {
 				DFUSE_TRA_DEBUG(oh, "Evicting cache");
-				dfuse_cache_evict(oh->doh_ie);
+				dfuse_cache_evict(ie);
 			}
 		}
-		atomic_fetch_sub_relaxed(&oh->doh_ie->ie_open_write_count, 1);
+		atomic_fetch_sub_relaxed(&ie->ie_open_write_count, 1);
 	} else {
 		if (oh->doh_caching) {
 			if (il_calls == 0) {
 				DFUSE_TRA_DEBUG(oh, "Saving data cache");
-				dfuse_dcache_set_time(oh->doh_ie);
+				dfuse_dcache_set_time(ie);
 			} else {
 				DFUSE_TRA_DEBUG(oh, "Evicting cache");
-				dfuse_cache_evict(oh->doh_ie);
+				dfuse_cache_evict(ie);
 			}
 		}
 	}
-	DFUSE_TRA_DEBUG(oh, "il_calls %d, caching %d,", il_calls, oh->doh_caching);
 	if (il_calls != 0) {
-		atomic_fetch_sub_relaxed(&oh->doh_ie->ie_il_count, 1);
+		atomic_fetch_sub_relaxed(&ie->ie_il_count, 1);
+	}
+
+wait_readahead:
+	/* Busy-wait for any inflight readahead RPC to finish before releasing */
+	D_MUTEX_LOCK(&ie->ie_read_lock);
+	if (oh->doh_readahead_inflight) {
+		D_MUTEX_UNLOCK(&ie->ie_read_lock);
+		goto wait_readahead;
+	}
+	D_MUTEX_UNLOCK(&ie->ie_read_lock);
+
+	oc = atomic_fetch_sub_relaxed(&ie->ie_open_count, 1);
+	DFUSE_TRA_DEBUG(oh, "il_calls %d, caching %d, open count %d", il_calls, oh->doh_caching,
+			oc - 1);
+	if (oc == 1) {
+		struct dfuse_pre_read *readahead;
+		struct dfuse_event    *ev;
+
+		if (read_chunk_close(ie))
+			oh->doh_linear_read = 1;
+
+		D_MUTEX_LOCK(&ie->ie_read_lock);
+		readahead        = ie->ie_readahead;
+		ie->ie_readahead = NULL;
+		D_MUTEX_UNLOCK(&ie->ie_read_lock);
+
+		if (readahead != NULL) {
+			ev = readahead->dra_ev;
+			if (ev) {
+				daos_event_fini(&ev->de_ev);
+				d_slab_release(ev->de_eqt->de_pre_read_slab, ev);
+			}
+			D_FREE(readahead);
+		}
 	}
-	atomic_fetch_sub_relaxed(&oh->doh_ie->ie_open_count, 1);
 
 	if (oh->doh_evict_on_close) {
-		ie = oh->doh_ie;
+		evict_on_close = true;
 		atomic_fetch_add_relaxed(&ie->ie_ref, 1);
 	}
 
-	rc = dfs_release(oh->doh_obj);
+	if (oh->doh_obj != NULL)
+		rc = dfs_release(oh->doh_obj);
+
 	if (rc == 0) {
 		DFUSE_REPLY_ZERO_OH(oh, req);
 	} else {
@@ -242,7 +265,7 @@ dfuse_cb_release(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
 		dfuse_inode_decref(dfuse_info, oh->doh_parent_dir);
 	}
 
-	if (ie) {
+	if (evict_on_close) {
 		rc = fuse_lowlevel_notify_inval_entry(dfuse_info->di_session, ie->ie_parent,
 						      ie->ie_name,
 						      strnlen(ie->ie_name, NAME_MAX));
diff --git a/src/client/dfuse/ops/read.c b/src/client/dfuse/ops/read.c
index 1edf0a959ee..9edb0e79332 100644
--- a/src/client/dfuse/ops/read.c
+++ b/src/client/dfuse/ops/read.c
@@ -7,8 +7,25 @@
 #include "dfuse_common.h"
 #include "dfuse.h"
 
+/* Global lock for all chunk read operations.  Each inode has a struct read_chunk_core * entry
+ * which is checked for NULL and set whilst holding this lock.  Each read_chunk_core then has
+ * a list of read_chunk_data and again, this lock protects all lists on all inodes.  This avoids
+ * the need for a per-inode lock which for many files would consume considerable memory but does
+ * mean there is potentially some lock contention.  The lock however is only held for list
+ * manipulation, no dfs or kernel calls are made whilst holding the lock.
+ *
+ * Note: the ie_open_reads list is protected by the per-inode ie_read_lock, not by this lock.
+ */
+static pthread_mutex_t rc_lock = PTHREAD_MUTEX_INITIALIZER;
+
+struct read_chunk_core {
+	d_list_t entries;
+	size_t   file_size;
+	bool     seen_eof;
+};
+
 static void
-dfuse_cb_read_complete(struct dfuse_event *ev)
+cb_read_helper(struct dfuse_event *ev, void *buff)
 {
 	struct dfuse_obj_hdl *oh = ev->de_oh;
 
@@ -19,35 +36,69 @@ dfuse_cb_read_complete(struct dfuse_event *ev)
 
 	if (oh->doh_linear_read) {
 		if (oh->doh_linear_read_pos != ev->de_req_position) {
-			oh->doh_linear_read = false;
+			oh->doh_linear_read = 0;
 		} else {
 			oh->doh_linear_read_pos = ev->de_req_position + ev->de_len;
 			if (ev->de_len < ev->de_req_len) {
-				oh->doh_linear_read_eof = true;
+				oh->doh_linear_read_eof = 1;
 			}
 		}
 	}
 
-	if (ev->de_len == 0) {
-		DFUSE_TRA_DEBUG(oh, "%#zx-%#zx requested (EOF)", ev->de_req_position,
-				ev->de_req_position + ev->de_req_len - 1);
-
-		DFUSE_REPLY_BUFQ(oh, ev->de_req, ev->de_iov.iov_buf, ev->de_len);
-		D_GOTO(release, 0);
-	}
-
-	if (ev->de_len == ev->de_req_len)
+	if (ev->de_len == ev->de_req_len) {
 		DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read", ev->de_req_position,
 				ev->de_req_position + ev->de_req_len - 1);
-	else
-		DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read %#zx-%#zx not read (truncated)",
-				ev->de_req_position, ev->de_req_position + ev->de_len - 1,
-				ev->de_req_position + ev->de_len,
-				ev->de_req_position + ev->de_req_len - 1);
+	} else {
+		struct dfuse_inode_entry *ie = oh->doh_ie;
+
+		if (ie->ie_chunk) {
+			ie->ie_chunk->seen_eof = true;
+			/* A short read places EOF at exactly position + len */
+			ie->ie_chunk->file_size = ev->de_req_position + ev->de_len;
+		}
+		if (ev->de_len == 0)
+			DFUSE_TRA_DEBUG(oh, "%#zx-%#zx requested (EOF)", ev->de_req_position,
+					ev->de_req_position + ev->de_req_len - 1);
+		else
+			DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read %#zx-%#zx not read (truncated)",
+					ev->de_req_position, ev->de_req_position + ev->de_len - 1,
+					ev->de_req_position + ev->de_len,
+					ev->de_req_position + ev->de_req_len - 1);
+	}
 
-	DFUSE_REPLY_BUFQ(oh, ev->de_req, ev->de_iov.iov_buf, ev->de_len);
-release:
+	if (ev->de_req)
+		DFUSE_REPLY_BUFQ(oh, ev->de_req, buff, ev->de_len);
+
 	daos_event_fini(&ev->de_ev);
+}
+
+static void
+dfuse_cb_read_complete(struct dfuse_event *ev)
+{
+	struct dfuse_event       *evs, *evn;
+	struct dfuse_inode_entry *ie = ev->de_oh->doh_ie;
+
+	D_MUTEX_LOCK(&ie->ie_read_lock);
+	d_list_del_init(&ev->de_read_list);
+	d_list_for_each_entry(evs, &ev->de_read_slaves, de_read_list) {
+		/* Reply from the master buffer at the slave's offset within the master read */
+		size_t off = evs->de_req_position - ev->de_req_position;
+
+		DFUSE_TRA_DEBUG(ev->de_oh, "concurrent network read %p", evs->de_oh);
+		evs->de_len = ev->de_len > off ? min(ev->de_len - off, evs->de_req_len) : 0;
+		evs->de_ev.ev_error = ev->de_ev.ev_error;
+		DFUSE_IE_STAT_ADD(ie, DS_READ_CON);
+		cb_read_helper(evs, ev->de_iov.iov_buf + off);
+	}
+
+	cb_read_helper(ev, ev->de_iov.iov_buf);
+
+	d_list_for_each_entry_safe(evs, evn, &ev->de_read_slaves, de_read_list) {
+		d_list_del_init(&evs->de_read_list);
+		d_slab_restock(evs->de_eqt->de_read_slab);
+		d_slab_release(evs->de_eqt->de_read_slab, evs);
+	}
+	D_MUTEX_UNLOCK(&ie->ie_read_lock);
+
 	d_slab_restock(ev->de_eqt->de_read_slab);
 	d_slab_release(ev->de_eqt->de_read_slab, ev);
 }
@@ -59,35 +110,41 @@ dfuse_readahead_reply(fuse_req_t req, size_t len, off_t position, struct dfuse_o
 {
 	size_t reply_len;
 
-	if (oh->doh_readahead->dra_rc) {
-		DFUSE_REPLY_ERR_RAW(oh, req, oh->doh_readahead->dra_rc);
+	if (oh->doh_ie->ie_readahead->dra_rc) {
+		DFUSE_TRA_DEBUG(oh, "failed readahead");
+		DFUSE_REPLY_ERR_RAW(oh, req, oh->doh_ie->ie_readahead->dra_rc);
 		return true;
 	}
 
-	if (!oh->doh_linear_read || oh->doh_readahead->dra_ev == NULL) {
+	if (!oh->doh_linear_read || oh->doh_ie->ie_readahead->dra_ev == NULL) {
 		DFUSE_TRA_DEBUG(oh, "Pre read disabled");
 		return false;
 	}
 
+	if (!d_list_empty(&oh->doh_ie->ie_readahead->dra_ev->de_read_list)) {
+		DFUSE_TRA_DEBUG(oh, "still on read %lu", oh->doh_ie->ie_stat.st_ino);
+		return false;
+	}
+
 	if (((position % K128) == 0) && ((len % K128) == 0)) {
 		DFUSE_TRA_INFO(oh, "allowing out-of-order pre read");
 		/* Do not closely track the read position in this case, just the maximum,
		 * later checks will determine if the file is read to the end.
		 */
 		oh->doh_linear_read_pos = max(oh->doh_linear_read_pos, position + len);
-	} else if (oh->doh_linear_read_pos != position) {
+	} else if (oh->doh_linear_read_pos != position &&
+		   0 /* WIP: branch disabled so pre-read stays enabled for non-linear reads */) {
 		DFUSE_TRA_DEBUG(oh, "disabling pre read");
-		daos_event_fini(&oh->doh_readahead->dra_ev->de_ev);
-		d_slab_release(oh->doh_readahead->dra_ev->de_eqt->de_pre_read_slab,
-			       oh->doh_readahead->dra_ev);
-		oh->doh_readahead->dra_ev = NULL;
+		daos_event_fini(&oh->doh_ie->ie_readahead->dra_ev->de_ev);
+		d_slab_release(oh->doh_ie->ie_readahead->dra_ev->de_eqt->de_pre_read_slab,
+			       oh->doh_ie->ie_readahead->dra_ev);
+		oh->doh_ie->ie_readahead->dra_ev = NULL;
 		return false;
 	} else {
 		oh->doh_linear_read_pos = position + len;
 	}
 
-	if (position + len >= oh->doh_readahead->dra_ev->de_readahead_len) {
-		oh->doh_linear_read_eof = true;
+	if (position + len >= oh->doh_ie->ie_readahead->dra_ev->de_readahead_len) {
+		oh->doh_linear_read_eof = 1;
 	}
 
 	/* At this point there is a buffer of known length that contains the data, and a read
@@ -96,21 +153,328 @@ dfuse_readahead_reply(fuse_req_t req, size_t len, off_t position, struct dfuse_o
	 * It the attempted read is smaller than the buffer it will be met in full.
	 */
 
-	if (position + len < oh->doh_readahead->dra_ev->de_readahead_len) {
+	if (position + len < oh->doh_ie->ie_readahead->dra_ev->de_readahead_len) {
 		reply_len = len;
 		DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read", position, position + reply_len - 1);
 	} else {
 		/* The read will be truncated */
-		reply_len = oh->doh_readahead->dra_ev->de_readahead_len - position;
+		reply_len = oh->doh_ie->ie_readahead->dra_ev->de_readahead_len - position;
 		DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read %#zx-%#zx not read (truncated)", position,
 				position + reply_len - 1, position + reply_len,
 				position + len - 1);
 	}
 
 	DFUSE_IE_STAT_ADD(oh->doh_ie, DS_PRE_READ);
-	DFUSE_REPLY_BUFQ(oh, req, oh->doh_readahead->dra_ev->de_iov.iov_buf + position, reply_len);
+	DFUSE_REPLY_BUFQ(oh, req, oh->doh_ie->ie_readahead->dra_ev->de_iov.iov_buf + position,
+			 reply_len);
 	return true;
 }
 
+static struct dfuse_eq *
+pick_eqt(struct dfuse_info *dfuse_info)
+{
+	uint64_t eqt_idx;
+
+	eqt_idx = atomic_fetch_add_relaxed(&dfuse_info->di_eqt_idx, 1);
+	return &dfuse_info->di_eqt[eqt_idx % dfuse_info->di_eq_count];
+}
+
+/* Chunk read and coalescing
+ *
+ * This code attempts to predict application and kernel I/O patterns and preemptively read file
+ * data ahead of when it's requested.
+ *
+ * For some kernels read I/O size is limited to 128k when using the page cache or 1Mb when using
+ * direct I/O.  To get around this performance impact, detect when well-aligned 128k reads are
+ * received and read an entire bucket's worth, so that for future requests the data should
+ * already be in cache.  (The bucket/slot arithmetic is illustrated in a standalone sketch after
+ * the patch.)
+ *
+ * This code is entered when caching is enabled and reads are correctly size/aligned and not in
+ * the last CHUNK_SIZE of a file.  When open, the inode contains a single read_chunk_core pointer
+ * and this contains a list of read_chunk_data entries, one for each bucket.  Buckets where all
+ * slots have been requested are removed from the list and closed when the last request is
+ * completed.
+ *
+ * TODO: Currently there is no code to remove partially read buckets from the list so reading
+ * one slot every chunk would leave the entire file contents in memory until close and mean long
+ * list traversal times.
+ */
+
+#define CHUNK_SIZE (1024 * 1024)
+
+#define MAX_REQ_COUNT 256
+
+struct read_chunk_data {
+	struct dfuse_event *ev;
+	bool                slot_done[8];
+	d_list_t            list;
+	uint64_t            bucket;
+	struct dfuse_eq    *eqt;
+	int                 rc;
+	int                 entered;
+	bool                complete;
+	int                 idx;
+	struct {
+		int                   slot;
+		fuse_req_t            req;
+		struct dfuse_obj_hdl *oh;
+	} reqs[MAX_REQ_COUNT];
+};
+
+/* Called when the last open file handle on an inode is closed.  This needs to free everything
+ * which is complete and, for anything that isn't, flag it for deletion in the callback.
+ *
+ * Returns true if the feature was used.
+ */
+bool
+read_chunk_close(struct dfuse_inode_entry *ie)
+{
+	struct read_chunk_data *cd, *cdn;
+	bool                    rcb = false;
+
+	D_MUTEX_LOCK(&rc_lock);
+	if (!ie->ie_chunk)
+		goto out;
+
+	rcb = true;
+
+	d_list_for_each_entry_safe(cd, cdn, &ie->ie_chunk->entries, list) {
+		D_ASSERT(cd->complete);
+		d_list_del(&cd->list);
+		d_slab_release(cd->eqt->de_read_slab, cd->ev);
+		D_FREE(cd);
+	}
+	D_FREE(ie->ie_chunk);
+out:
+	D_MUTEX_UNLOCK(&rc_lock);
+	return rcb;
+}
+
+static void
+chunk_cb(struct dfuse_event *ev)
+{
+	struct read_chunk_data *cd = ev->de_cd;
+
+	cd->rc = ev->de_ev.ev_error;
+
+	if (cd->rc == 0 && (ev->de_len != CHUNK_SIZE)) {
+		cd->rc = EIO;
+		DS_WARN(cd->rc, "Unexpected short read bucket %ld (%#zx) expected %i got %zi",
+			cd->bucket, cd->bucket * CHUNK_SIZE, CHUNK_SIZE, ev->de_len);
+	}
+
+	daos_event_fini(&ev->de_ev);
+
+	/* Mark as complete so no more get put on list */
+	D_MUTEX_LOCK(&rc_lock);
+	cd->complete = true;
+	D_MUTEX_UNLOCK(&rc_lock);
+
+	for (int i = 0; i < cd->idx; i++) {
+		int        slot;
+		fuse_req_t req;
+		size_t     position;
+
+		slot = cd->reqs[i].slot;
+		req  = cd->reqs[i].req;
+
+		DFUSE_TRA_DEBUG(cd->reqs[i].oh, "Replying idx %d/%d for %ld[%d]", i, cd->idx,
+				cd->bucket, slot);
+
+		position = (cd->bucket * CHUNK_SIZE) + (slot * K128);
+		DFUSE_IE_STAT_ADD(cd->reqs[i].oh->doh_ie, DS_READ_BUCKET);
+		if (cd->rc != 0) {
+			DFUSE_REPLY_ERR_RAW(cd->reqs[i].oh, req, cd->rc);
+		} else {
+			DFUSE_TRA_DEBUG(cd->reqs[i].oh, "%#zx-%#zx read", position,
+					position + K128 - 1);
+			DFUSE_REPLY_BUFQ(cd->reqs[i].oh, req, ev->de_iov.iov_buf + (slot * K128),
+					 K128);
+		}
+	}
+}
+
+/* Submit a read to dfs.
+ *
+ * Returns true on success.
+ */ +static bool +chunk_fetch(fuse_req_t req, struct dfuse_inode_entry *ie, struct read_chunk_data *cd) +{ + struct dfuse_info *dfuse_info = fuse_req_userdata(req); + struct dfuse_event *ev; + struct dfuse_eq *eqt; + int rc; + daos_off_t position = cd->bucket * CHUNK_SIZE; + + eqt = pick_eqt(dfuse_info); + + ev = d_slab_acquire(eqt->de_read_slab); + if (ev == NULL) { + cd->rc = ENOMEM; + return false; + } + + ev->de_iov.iov_len = CHUNK_SIZE; + ev->de_req = req; + ev->de_cd = cd; + ev->de_sgl.sg_nr = 1; + ev->de_len = 0; + ev->de_complete_cb = chunk_cb; + + cd->ev = ev; + cd->eqt = eqt; + + rc = dfs_read(ie->ie_dfs->dfs_ns, ie->ie_obj, &ev->de_sgl, position, &ev->de_len, + &ev->de_ev); + if (rc != 0) + goto err; + + /* Send a message to the async thread to wake it up and poll for events */ + sem_post(&eqt->de_sem); + + /* Now ensure there are more descriptors for the next request */ + d_slab_restock(eqt->de_read_slab); + + return true; + +err: + daos_event_fini(&ev->de_ev); + d_slab_release(eqt->de_read_slab, ev); + cd->ev = NULL; + cd->rc = rc; + return false; +} + +/* Try and do a bulk read. + * + * Returns true if it was able to handle the read. + */ +static bool +chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh) +{ + struct dfuse_inode_entry *ie = oh->doh_ie; + struct read_chunk_data *cd; + off_t last; + uint64_t bucket; + int slot; + bool submit = false; + bool rcb; + + if (len != K128) + return false; + + if ((position % K128) != 0) + return false; + + last = D_ALIGNUP(position + len - 1, CHUNK_SIZE); + + if (last > oh->doh_ie->ie_stat.st_size) + return false; + + bucket = D_ALIGNUP(position + len, CHUNK_SIZE); + bucket = (bucket / CHUNK_SIZE) - 1; + + slot = (position / K128) % 8; + + DFUSE_TRA_DEBUG(oh, "read bucket %#zx-%#zx last %#zx size %#zx bucket %ld slot %d", + position, position + len - 1, last, ie->ie_stat.st_size, bucket, slot); + + D_MUTEX_LOCK(&rc_lock); + if (ie->ie_chunk == NULL) { + D_ALLOC_PTR(ie->ie_chunk); + if (ie->ie_chunk == NULL) + goto err; + D_INIT_LIST_HEAD(&ie->ie_chunk->entries); + } + + d_list_for_each_entry(cd, &ie->ie_chunk->entries, list) + if (cd->bucket == bucket) { + goto found; + } + + D_ALLOC_PTR(cd); + if (cd == NULL) + goto err; + + cd->bucket = bucket; + submit = true; + + d_list_add(&cd->list, &ie->ie_chunk->entries); + +found: + +#if 0 + /* Keep track of which slots have been requested, but allow for multiple reads per + * slot */ + if (!cd->slot_done[slot]) { + cd->entered++; + cd->slot_done[slot] = true; + } else { + DFUSE_TRA_DEBUG(oh, "concurrent read on %ld[%d] complete %d", bucket, slot, + cd->complete); + } + + if (cd->entered < 8) { + /* Put on front of list for efficient searching */ + DFUSE_TRA_DEBUG(oh, "%d slots requested, putting on front of list", cd->entered); + d_list_add(&cd->list, &ie->ie_chunk->entries); + } +#endif + + if (submit) { + cd->reqs[0].req = req; + cd->reqs[0].oh = oh; + cd->reqs[0].slot = slot; + cd->idx++; + + D_MUTEX_UNLOCK(&rc_lock); + + DFUSE_TRA_DEBUG(oh, "submit for bucket %ld[%d]", bucket, slot); + + rcb = chunk_fetch(req, ie, cd); + } else if (cd->complete) { + D_MUTEX_UNLOCK(&rc_lock); + + if (cd->rc != 0) { + /* Don't pass fuse an error here, rather return false and + * the read will be tried over the network. 
+ */ + rcb = false; + } else { + oh->doh_linear_read_pos = max(oh->doh_linear_read_pos, position + K128); + + DFUSE_IE_STAT_ADD(oh->doh_ie, DS_READ_BUCKET_M); + DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read", position, position + K128 - 1); + DFUSE_REPLY_BUFQ(oh, req, cd->ev->de_iov.iov_buf + (slot * K128), K128); + rcb = true; + } + + } else if (cd->idx < MAX_REQ_COUNT) { + DFUSE_TRA_DEBUG(oh, "Using idx %d for %ld[%d]", cd->idx, bucket, slot); + + cd->reqs[cd->idx].req = req; + cd->reqs[cd->idx].oh = oh; + cd->reqs[cd->idx].slot = slot; + cd->idx++; + D_MUTEX_UNLOCK(&rc_lock); + rcb = true; + } else { + D_MUTEX_UNLOCK(&rc_lock); + + /* Handle concurrent reads of the same slot, this wasn't + * expected but can happen so for now reject it at this + * level and have read perform a separate I/O for this slot. + * TODO: Handle DAOS-16686 here. + */ + rcb = false; + DFUSE_TRA_WARNING(oh, "Too many outstanding reads"); + } + + return rcb; + +err: + D_MUTEX_UNLOCK(&rc_lock); + return false; +} + void dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct fuse_file_info *fi) { @@ -120,53 +484,70 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct struct dfuse_eq *eqt; int rc; struct dfuse_event *ev; - uint64_t eqt_idx; + struct dfuse_inode_entry *ie = oh->doh_ie; + bool reached_eof = false; - DFUSE_IE_STAT_ADD(oh->doh_ie, DS_READ); + DFUSE_IE_STAT_ADD(ie, DS_READ); if (oh->doh_linear_read_eof && position == oh->doh_linear_read_pos) { + reached_eof = true; + } else if (ie->ie_chunk && ie->ie_chunk->seen_eof) { + if (position >= ie->ie_chunk->file_size) + reached_eof = true; + } + + if (reached_eof) { DFUSE_TRA_DEBUG(oh, "Returning EOF early without round trip %#zx", position); - oh->doh_linear_read_eof = false; + oh->doh_linear_read_eof = 0; +#if 0 + /* Release uses this to set the bit on the directory so do not turn it off here + * but I do need to check why it was set before. 
+ */ oh->doh_linear_read = false; +#endif - if (oh->doh_readahead) { - D_MUTEX_LOCK(&oh->doh_readahead->dra_lock); - ev = oh->doh_readahead->dra_ev; - - oh->doh_readahead->dra_ev = NULL; - D_MUTEX_UNLOCK(&oh->doh_readahead->dra_lock); + if (ie->ie_readahead) { + D_MUTEX_LOCK(&ie->ie_read_lock); + ev = ie->ie_readahead->dra_ev; + ie->ie_readahead->dra_ev = NULL; + D_MUTEX_UNLOCK(&ie->ie_read_lock); if (ev) { daos_event_fini(&ev->de_ev); d_slab_release(ev->de_eqt->de_pre_read_slab, ev); - DFUSE_IE_STAT_ADD(oh->doh_ie, DS_PRE_READ); + DFUSE_IE_STAT_ADD(ie, DS_PRE_READ); } } + DFUSE_IE_STAT_ADD(ie, DS_READ_EOF_M); DFUSE_REPLY_BUFQ(oh, req, NULL, 0); return; } - if (oh->doh_readahead) { + if (ie->ie_readahead) { bool replied; - D_MUTEX_LOCK(&oh->doh_readahead->dra_lock); + D_MUTEX_LOCK(&ie->ie_read_lock); replied = dfuse_readahead_reply(req, len, position, oh); - D_MUTEX_UNLOCK(&oh->doh_readahead->dra_lock); + D_MUTEX_UNLOCK(&ie->ie_read_lock); if (replied) return; } - eqt_idx = atomic_fetch_add_relaxed(&dfuse_info->di_eqt_idx, 1); - eqt = &dfuse_info->di_eqt[eqt_idx % dfuse_info->di_eq_count]; + if (chunk_read(req, len, position, oh)) + return; + + eqt = pick_eqt(dfuse_info); ev = d_slab_acquire(eqt->de_read_slab); - if (ev == NULL) - D_GOTO(err, rc = ENOMEM); + if (ev == NULL) { + DFUSE_REPLY_ERR_RAW(oh, req, ENOMEM); + return; + } - if (oh->doh_ie->ie_truncated && position + len < oh->doh_ie->ie_stat.st_size && - ((oh->doh_ie->ie_start_off == 0 && oh->doh_ie->ie_end_off == 0) || - position >= oh->doh_ie->ie_end_off || position + len <= oh->doh_ie->ie_start_off)) { + if (ie->ie_truncated && position + len < ie->ie_stat.st_size && + ((ie->ie_start_off == 0 && ie->ie_end_off == 0) || + position >= ie->ie_end_off || position + len <= ie->ie_start_off)) { DFUSE_TRA_DEBUG(oh, "Returning zeros"); mock_read = true; } @@ -182,6 +563,8 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct ev->de_iov.iov_buf_len); } + if (position + len > ie->ie_stat.st_size) + len = ie->ie_stat.st_size - position; ev->de_iov.iov_len = len; ev->de_req = req; ev->de_sgl.sg_nr = 1; @@ -197,11 +580,41 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct ev->de_complete_cb = dfuse_cb_read_complete; - DFUSE_IE_WFLUSH(oh->doh_ie); + DFUSE_IE_WFLUSH(ie); + + /* Not check for outstanding read which matches */ + D_MUTEX_LOCK(&ie->ie_read_lock); + { + struct dfuse_event *evc; + + /* Check for concurrent overlapping reads and if so then add request to current op + */ + d_list_for_each_entry(evc, &ie->ie_open_reads, de_read_list) { + if (ev->de_req_position >= evc->de_req_position && + ev->de_req_len <= evc->de_req_len) { + d_list_add(&ev->de_read_list, &evc->de_read_slaves); + D_MUTEX_UNLOCK(&ie->ie_read_lock); + return; + } + } + d_list_add_tail(&ev->de_read_list, &ie->ie_open_reads); + } + D_MUTEX_UNLOCK(&ie->ie_read_lock); + + if (oh->doh_obj == NULL) { + /** duplicate the file handle for the fuse handle */ + rc = dfs_dup(oh->doh_dfs, oh->doh_ie->ie_obj, oh->doh_flags, &oh->doh_obj); + if (rc) { + ev->de_ev.ev_error = rc; + dfuse_cb_read_complete(ev); + return; + } + } rc = dfs_read(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_len, &ev->de_ev); if (rc != 0) { - D_GOTO(err, rc); + ev->de_ev.ev_error = rc; + dfuse_cb_read_complete(ev); return; } @@ -212,26 +625,44 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct d_slab_restock(eqt->de_read_slab); return; -err: - DFUSE_REPLY_ERR_RAW(oh, req, rc); - if (ev) { - 
-		daos_event_fini(&ev->de_ev);
-		d_slab_release(eqt->de_read_slab, ev);
-	}
 }
 
 static void
 dfuse_cb_pre_read_complete(struct dfuse_event *ev)
 {
-	struct dfuse_obj_hdl *oh = ev->de_oh;
+	struct dfuse_obj_hdl     *oh = ev->de_oh;
+	struct dfuse_event       *evs, *evn;
+	struct dfuse_inode_entry *ie = oh->doh_ie;
+	struct dfuse_pre_read    *readahead;
+
+	D_ASSERTF(ie != NULL, "%p", ev);
+	readahead = ie->ie_readahead;
+	D_ASSERTF(readahead != NULL, "%p", ev);
+
+	D_MUTEX_LOCK(&ie->ie_read_lock);
+	readahead->dra_rc = ev->de_ev.ev_error;
 
-	oh->doh_readahead->dra_rc = ev->de_ev.ev_error;
+	d_list_del_init(&ev->de_read_list);
+
+	d_list_for_each_entry(evs, &ev->de_read_slaves, de_read_list) {
+		/* The pre-read master starts at offset zero, so reply from the buffer at the
+		 * slave's own file position.
+		 */
+		size_t off = evs->de_req_position;
+
+		DFUSE_TRA_DEBUG(ev->de_oh, "concurrent network read %p", evs->de_oh);
+		evs->de_len = ev->de_len > off ? min(ev->de_len - off, evs->de_req_len) : 0;
+		evs->de_ev.ev_error = ev->de_ev.ev_error;
+		DFUSE_IE_STAT_ADD(ie, DS_READ_CON);
+		cb_read_helper(evs, ev->de_iov.iov_buf + off);
+	}
+
+	d_list_for_each_entry_safe(evs, evn, &ev->de_read_slaves, de_read_list) {
+		d_list_del_init(&evs->de_read_list);
+		d_slab_restock(evs->de_eqt->de_read_slab);
+		d_slab_release(evs->de_eqt->de_read_slab, evs);
+	}
 
 	if (ev->de_ev.ev_error != 0) {
-		oh->doh_readahead->dra_rc = ev->de_ev.ev_error;
+		readahead->dra_rc = ev->de_ev.ev_error;
 		daos_event_fini(&ev->de_ev);
 		d_slab_release(ev->de_eqt->de_pre_read_slab, ev);
-		oh->doh_readahead->dra_ev = NULL;
+		readahead->dra_ev = NULL;
 	}
 
 	/* If the length is not as expected then the file has been modified since the last stat so
@@ -241,38 +672,63 @@ dfuse_cb_pre_read_complete(struct dfuse_event *ev)
 	if (ev->de_len != ev->de_readahead_len) {
 		daos_event_fini(&ev->de_ev);
 		d_slab_release(ev->de_eqt->de_pre_read_slab, ev);
-		oh->doh_readahead->dra_ev = NULL;
+		readahead->dra_ev = NULL;
 	}
-
-	D_MUTEX_UNLOCK(&oh->doh_readahead->dra_lock);
+	oh->doh_readahead_inflight = 0;
+	D_MUTEX_UNLOCK(&ie->ie_read_lock);
 }
 
-void
-dfuse_pre_read(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh)
+int
+dfuse_pre_read_init(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie,
+		    struct dfuse_event **evp)
 {
 	struct dfuse_eq    *eqt;
-	int                 rc;
 	struct dfuse_event *ev;
-	uint64_t            eqt_idx;
-	size_t              len = oh->doh_ie->ie_stat.st_size;
-
-	eqt_idx = atomic_fetch_add_relaxed(&dfuse_info->di_eqt_idx, 1);
-	eqt     = &dfuse_info->di_eqt[eqt_idx % dfuse_info->di_eq_count];
+	size_t              len = ie->ie_stat.st_size;
 
+	eqt = pick_eqt(dfuse_info);
 	ev = d_slab_acquire(eqt->de_pre_read_slab);
 	if (ev == NULL)
-		D_GOTO(err, rc = ENOMEM);
+		return ENOMEM;
 
 	ev->de_iov.iov_len   = len;
 	ev->de_req           = 0;
 	ev->de_sgl.sg_nr     = 1;
-	ev->de_oh            = oh;
 	ev->de_readahead_len = len;
 	ev->de_req_position  = 0;
 
 	ev->de_complete_cb   = dfuse_cb_pre_read_complete;
-	oh->doh_readahead->dra_ev = ev;
+	ie->ie_readahead->dra_ev = ev;
+
+	/* NB: the inode entry has already been locked via ie_read_lock by the caller */
+	d_list_add_tail(&ev->de_read_list, &ie->ie_open_reads);
+
+	*evp = ev;
+	return 0;
+}
+
+void
+dfuse_pre_read(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh, struct dfuse_event *ev)
+{
+	struct dfuse_eq *eqt;
+	int              rc;
+	size_t           len = oh->doh_ie->ie_stat.st_size;
+
+	DFUSE_TRA_DEBUG(oh, "ino %llu prefetch len %zd event %p",
+			(unsigned long long)oh->doh_ie->ie_stat.st_ino, len, &ev->de_ev);
+
+	eqt = pick_eqt(dfuse_info);
+
+	if (oh->doh_obj == NULL) {
+		/** duplicate the file handle for the fuse handle */
+		rc = dfs_dup(oh->doh_dfs, oh->doh_ie->ie_obj, oh->doh_flags, &oh->doh_obj);
+		if (rc)
+			D_GOTO(err, rc);
+	}
+	ev->de_oh          = oh;
+	ev->de_complete_cb = dfuse_cb_pre_read_complete;
+	oh->doh_ie->ie_readahead->dra_ev = ev;
 
 	rc = dfs_read(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, 0, &ev->de_len, &ev->de_ev);
 	if (rc != 0) {
 		D_GOTO(err, rc);
 	}
@@ -287,11 +743,14 @@ dfuse_pre_read(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh)
 
 	return;
 
 err:
-	oh->doh_readahead->dra_rc = rc;
+	D_MUTEX_LOCK(&oh->doh_ie->ie_read_lock);
+	oh->doh_ie->ie_readahead->dra_rc = rc;
+	oh->doh_readahead_inflight       = 0;
 	if (ev) {
+		/* The event was queued on ie_open_reads by dfuse_pre_read_init() so remove it */
+		d_list_del_init(&ev->de_read_list);
 		daos_event_fini(&ev->de_ev);
+		DFUSE_TRA_DEBUG(oh, "release ev %p", ev);
 		d_slab_release(eqt->de_pre_read_slab, ev);
-		oh->doh_readahead->dra_ev = NULL;
+		oh->doh_ie->ie_readahead->dra_ev = NULL;
 	}
-	D_MUTEX_UNLOCK(&oh->doh_readahead->dra_lock);
+	D_MUTEX_UNLOCK(&oh->doh_ie->ie_read_lock);
 }
diff --git a/src/client/dfuse/ops/readdir.c b/src/client/dfuse/ops/readdir.c
index 10ea454fd42..e08a04410d5 100644
--- a/src/client/dfuse/ops/readdir.c
+++ b/src/client/dfuse/ops/readdir.c
@@ -354,9 +354,9 @@ dfuse_do_readdir(struct dfuse_info *dfuse_info, fuse_req_t req, struct dfuse_obj
	 */
 	if (offset == 0) {
 		if (oh->doh_kreaddir_started) {
-			oh->doh_kreaddir_invalid = true;
+			oh->doh_kreaddir_invalid = 1;
 		}
-		oh->doh_kreaddir_started = true;
+		oh->doh_kreaddir_started = 1;
 	}
 
 	D_RWLOCK_RDLOCK(&hdl->drh_lock);
@@ -502,6 +502,7 @@ dfuse_do_readdir(struct dfuse_info *dfuse_info, fuse_req_t req, struct dfuse_obj
 		}
 
 		set_entry_params(&entry, ie);
+		dfuse_mcache_set_time(ie);
 
 		written = FADP(req, &reply_buff[buff_offset], size - buff_offset, drc->drc_name,
 			       &entry, drc->drc_next_offset);
@@ -566,7 +567,7 @@ dfuse_do_readdir(struct dfuse_info *dfuse_info, fuse_req_t req, struct dfuse_obj
 			DFUSE_TRA_DEBUG(oh, "Seeking from offset %#lx to %#lx", oh->doh_rd_offset,
 					offset);
 
-			oh->doh_kreaddir_invalid = true;
+			oh->doh_kreaddir_invalid = 1;
 
 			/* Drop if shared */
 			if (oh->doh_rd->drh_caching) {
@@ -733,6 +734,7 @@ dfuse_do_readdir(struct dfuse_info *dfuse_info, fuse_req_t req, struct dfuse_obj
 		}
 
 		set_entry_params(&entry, ie);
+		dfuse_mcache_set_time(ie);
 
 		written = FADP(req, &reply_buff[buff_offset], size - buff_offset, dre->dre_name,
 			       &entry, dre->dre_next_offset);
@@ -847,7 +849,7 @@ dfuse_cb_readdir(fuse_req_t req, struct dfuse_obj_hdl *oh, size_t size, off_t of
	 * reply early in this case.
*/ if (offset == READDIR_EOD) { - oh->doh_kreaddir_finished = true; + oh->doh_kreaddir_finished = 1; DFUSE_TRA_DEBUG(oh, "End of directory %#lx", offset); size = 0; diff --git a/src/client/dfuse/ops/write.c b/src/client/dfuse/ops/write.c index 943f9b75e78..f9fbc5751a5 100644 --- a/src/client/dfuse/ops/write.c +++ b/src/client/dfuse/ops/write.c @@ -38,7 +38,7 @@ dfuse_cb_write(fuse_req_t req, fuse_ino_t ino, struct fuse_bufvec *bufv, off_t p DFUSE_IE_STAT_ADD(oh->doh_ie, DS_WRITE); - oh->doh_linear_read = false; + oh->doh_linear_read = 0; eqt_idx = atomic_fetch_add_relaxed(&dfuse_info->di_eqt_idx, 1); @@ -101,6 +101,13 @@ dfuse_cb_write(fuse_req_t req, fuse_ino_t ino, struct fuse_bufvec *bufv, off_t p if (len + position > oh->doh_ie->ie_stat.st_size) oh->doh_ie->ie_stat.st_size = len + position; + if (oh->doh_obj == NULL) { + /** duplicate the file handle for the fuse handle */ + rc = dfs_dup(oh->doh_dfs, oh->doh_ie->ie_obj, oh->doh_flags, &oh->doh_obj); + if (rc) + D_GOTO(err, rc); + } + rc = dfs_write(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_ev); if (rc != 0) D_GOTO(err, rc); diff --git a/src/include/daos/metrics.h b/src/include/daos/metrics.h index 690602e9790..61966ae5b6d 100644 --- a/src/include/daos/metrics.h +++ b/src/include/daos/metrics.h @@ -22,6 +22,7 @@ #define DAOS_CLIENT_METRICS_DUMP_DIR "D_CLIENT_METRICS_DUMP_DIR" #define DAOS_CLIENT_METRICS_ENABLE "D_CLIENT_METRICS_ENABLE" #define DAOS_CLIENT_METRICS_RETAIN "D_CLIENT_METRICS_RETAIN" +#define DAOS_CLIENT_METRICS_MAX_ENTRY_CNT "D_CLIENT_METRICS_MAX_ENTRY_CNT" extern bool daos_client_metric; extern bool daos_client_metric_retain; diff --git a/src/tests/ftest/dfuse/read.py b/src/tests/ftest/dfuse/read.py index 607b8f99f9f..71a13a14044 100644 --- a/src/tests/ftest/dfuse/read.py +++ b/src/tests/ftest/dfuse/read.py @@ -23,7 +23,9 @@ def test_dfuse_pre_read(self): Read one large file entirely using pre-read. Read a second smaller file to ensure that the first file leaves the flag enabled. - :avocado: tags=all,full_regression + TODO: Check this with 128k I/O size and non-128K I/O size. + + :avocado: tags=all,daily_regression :avocado: tags=vm :avocado: tags=dfuse :avocado: tags=DFusePreReadTest,test_dfuse_pre_read
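
For reference, the bucket and slot arithmetic used by chunk_read() above can be exercised in
isolation. The following is a minimal standalone sketch, assuming only the constants from the
patch (CHUNK_SIZE of 1MiB, K128 of 128KiB); chunk_index() and chunk_slot() are illustrative
names, not symbols from the patch:

#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <sys/types.h>

#define CHUNK_SIZE (1024 * 1024) /* one read bucket, as in ops/read.c */
#define K128       (128 * 1024)  /* kernel read size this path targets */

/* Bucket that a well-aligned 128k read at 'position' falls into.  Equivalent to the
 * D_ALIGNUP(position + len, CHUNK_SIZE) / CHUNK_SIZE - 1 form used in chunk_read() when
 * len == K128 and position is 128k-aligned.
 */
static uint64_t
chunk_index(off_t position)
{
	return position / CHUNK_SIZE;
}

/* Slot within the bucket: eight 128k slots per 1MiB bucket */
static int
chunk_slot(off_t position)
{
	return (position / K128) % 8;
}

int
main(void)
{
	/* A read at 1MiB + 256KiB lands in bucket 1, slot 2 */
	assert(chunk_index(CHUNK_SIZE + 2 * K128) == 1);
	assert(chunk_slot(CHUNK_SIZE + 2 * K128) == 2);

	/* The last 128k of bucket 0 */
	assert(chunk_index(7 * K128) == 0);
	assert(chunk_slot(7 * K128) == 7);

	printf("bucket/slot arithmetic ok\n");
	return 0;
}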
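
The coalescing logic in dfuse_cb_read() only joins a new read onto an in-flight one when the new
request is fully contained within it, and the reply is then served from the master buffer at the
slave's offset. Below is a self-contained sketch of that predicate, written on the assumption
that containment is the intended rule; struct read_span and the function names are illustrative,
not from the patch:

#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <sys/types.h>

struct read_span {
	off_t  position; /* file offset of the request */
	size_t len;      /* requested length */
};

/* True when 'slave' lies entirely within 'master', so the slave can be answered
 * from the master's buffer without a second network round trip.
 */
static bool
span_contains(const struct read_span *master, const struct read_span *slave)
{
	return slave->position >= master->position &&
	       slave->position + slave->len <= master->position + master->len;
}

/* Offset into the master's buffer at which the slave's data starts */
static size_t
span_offset(const struct read_span *master, const struct read_span *slave)
{
	return slave->position - master->position;
}

int
main(void)
{
	struct read_span master = {.position = 0, .len = 1 << 20};
	struct read_span slave  = {.position = 128 << 10, .len = 128 << 10};

	assert(span_contains(&master, &slave));
	assert(span_offset(&master, &slave) == (size_t)(128 << 10));

	/* Overlapping but not contained: must not be coalesced */
	struct read_span tail = {.position = (1 << 20) - 4096, .len = 8192};

	assert(!span_contains(&master, &tail));
	return 0;
}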
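
The client metrics shared-memory segment in src/client/api/metrics.c is sized directly from the
entry count, so D_CLIENT_METRICS_MAX_ENTRY_CNT translates linearly into segment size. A hedged
sketch of that sizing logic, with ENTRY_SIZE standing in for D_TM_METRIC_SIZE (whose real value
comes from the telemetry headers) and parse_max_entries() as an illustrative stand-in for the
d_getenv_uint() call in the patch:

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#define ENTRY_SIZE 1024 /* placeholder for D_TM_METRIC_SIZE, not the real value */

/* Mirror of the patch's default: 1024 entries unless the variable overrides it */
static uint32_t
parse_max_entries(void)
{
	const char *val = getenv("D_CLIENT_METRICS_MAX_ENTRY_CNT");
	uint32_t    cnt = 1024;

	if (val != NULL) {
		unsigned long parsed = strtoul(val, NULL, 10);

		if (parsed > 0 && parsed <= UINT32_MAX)
			cnt = (uint32_t)parsed;
	}
	return cnt;
}

int
main(void)
{
	uint32_t cnt = parse_max_entries();

	/* Same shape as client_metrics_shm_size() in the patch */
	printf("max entries %u -> shm size %lu bytes\n", cnt,
	       (unsigned long)cnt * ENTRY_SIZE);
	return 0;
}

Raising the count only grows the shared-memory segment created at init time; as the README.env
note says, the default of 1024 is only a concern with many concurrent client threads.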
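
Finally, dfuse_cb_open() above only arms the pre-read when the data cache cannot simply be reused
and either the parent directory is still seen as linearly read or the whole file fits within one
pre-read buffer. Reduced to a pure predicate for clarity (the struct and names are illustrative,
and DFUSE_MAX_PRE_READ is assumed to be 1MiB for the sketch):

#include <stdbool.h>
#include <stdio.h>

#define DFUSE_MAX_PRE_READ (1024 * 1024) /* assumed value, defined elsewhere in dfuse */

/* Inputs the open path consults before arming a pre-read */
struct open_state {
	bool cache_valid;        /* keep_cache case, no pre-read needed */
	int  open_count;         /* handles open on the inode, including this one */
	bool parent_linear_read; /* parent directory still seen as linearly read */
	long file_size;
};

static bool
should_preread(const struct open_state *st)
{
	if (st->cache_valid || st->open_count > 1)
		return false;
	return st->parent_linear_read ||
	       (st->file_size > 0 && st->file_size <= DFUSE_MAX_PRE_READ);
}

int
main(void)
{
	struct open_state st = {.open_count = 1, .parent_linear_read = true,
				.file_size  = 4096};

	printf("preread: %s\n", should_preread(&st) ? "yes" : "no");
	return 0;
}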