Skip to content

Commit

Permalink
fix: using a sync.Map for handling lockers per WATER transport
Browse files Browse the repository at this point in the history
  • Loading branch information
WendelHime committed Nov 19, 2024
1 parent 2079bb0 commit 3638a4b
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 11 deletions.
12 changes: 9 additions & 3 deletions chained/water_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var httpClient *http.Client

// waterLoadingWASMMutex prevents the WATER implementation to download/load the
// WASM file concurrently.
var waterLoadingWASMMutex = &sync.Mutex{}
var waterTransportLocker sync.Map

func newWaterImpl(dir, addr string, pc *config.ProxyConfig, reportDialCore reportDialCoreFn) (*waterImpl, error) {
ctx := context.Background()
Expand Down Expand Up @@ -98,8 +98,14 @@ func newWaterImpl(dir, addr string, pc *config.ProxyConfig, reportDialCore repor
}

func (d *waterImpl) loadWASM(ctx context.Context, transport string, dir string, wasmAvailableAt string) (io.ReadCloser, error) {
waterLoadingWASMMutex.Lock()
defer waterLoadingWASMMutex.Unlock()
locker, ok := waterTransportLocker.Load(transport)
if !ok {
waterTransportLocker.Store(transport, new(sync.Mutex))
locker, _ = waterTransportLocker.Load(transport)
}

locker.(sync.Locker).Lock()
defer locker.(sync.Locker).Unlock()
vc := newWaterVersionControl(dir)
cli := httpClient
if cli == nil {
Expand Down
20 changes: 12 additions & 8 deletions chained/water_version_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ func newWaterVersionControl(dir string) *waterVersionControl {
// GetWASM returns the WASM file for the given transport
// Remember to Close the io.ReadCloser after using it
func (vc *waterVersionControl) GetWASM(ctx context.Context, transport string, downloader waterWASMDownloader) (io.ReadCloser, error) {
// This function implements the following steps:
// 1. Check if the WASM file exists
// 2. If it does not exist, download it
// 3. If it exists, check if it was loaded correctly by checking if the last-loaded file existis
// 4. If it was not loaded correctly, download it again
// 5. If it was loaded correctly, return the file and mark the file as loaded
path := filepath.Join(vc.dir, transport+".wasm")
log.Debugf("trying to load file %q", path)
f, err := os.Open(path)
Expand All @@ -50,20 +56,20 @@ func (vc *waterVersionControl) GetWASM(ctx context.Context, transport string, do
return response, nil
}

fi, err := f.Stat()
if err != nil {
return nil, log.Errorf("failed loading file info")
lastLoaded, err := os.Open(filepath.Join(vc.dir, transport+".last-loaded"))
if err != nil && !os.IsNotExist(err) {
return nil, log.Errorf("failed to open file %s: %w", transport+".last-loaded", err)
}

if fi.Size() == 0 {
f.Close()
log.Debug("loaded empty WASM file, downloading again")
if errors.Is(err, fs.ErrNotExist) {
log.Debugf("%q WASM file exists but never loaded correctly, downloading again", transport)
response, err := vc.downloadWASM(ctx, transport, downloader)
if err != nil {
return nil, log.Errorf("failed to download WASM file: %w", err)
}
return response, nil
}
defer lastLoaded.Close()

_, err = f.Seek(0, 0)
if err != nil {
Expand All @@ -73,8 +79,6 @@ func (vc *waterVersionControl) GetWASM(ctx context.Context, transport string, do
if err = vc.markUsed(transport); err != nil {
return nil, log.Errorf("failed to update WASM history: %w", err)
}
log.Debugf("WASM file loaded, file size: %d", fi.Size())

return f, nil
}

Expand Down

0 comments on commit 3638a4b

Please sign in to comment.