From 055635cfe4b65e31f964f910c1fb53d101c4c217 Mon Sep 17 00:00:00 2001 From: muXxer Date: Fri, 1 Dec 2023 19:42:33 +0100 Subject: [PATCH] Improve locking --- .golangci.yml | 2 + components/app/app.go | 2 +- components/faucet/component.go | 7 +- pkg/faucet/faucet.go | 262 ++++++++++++++++++++++----------- 4 files changed, 177 insertions(+), 96 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index d5d30a1..1381ee2 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -16,6 +16,8 @@ linters-settings: locale: US staticcheck: checks: ["all"] + nlreturn: + block-size: 3 stylecheck: initialisms: ["ACL", "API", "ASCII", "CPU", "CSS", "DNS", "EOF", "GUID", "HTML", "HTTP", "HTTPS", "ID", "IP", "JSON", "QPS", "RAM", "RPC", "SLA", "SMTP", "SQL", "SSH", "TCP", "TLS", "TTL", "UDP", "UI", "GID", "UID", "UUID", "URI", "URL", "UTF8", "VM", "XML", "XMPP", "XSRF", "XSS", "SIP", "RTP", "AMQP", "DB", "TS"] diff --git a/components/app/app.go b/components/app/app.go index 831e082..af95268 100644 --- a/components/app/app.go +++ b/components/app/app.go @@ -13,7 +13,7 @@ var ( Name = "inx-faucet" // Version of the app. - Version = "2.0.0-alpha.14" + Version = "2.0.0-alpha.15" ) func App() *app.App { diff --git a/components/faucet/component.go b/components/faucet/component.go index ac5e6e5..a87d7b0 100644 --- a/components/faucet/component.go +++ b/components/faucet/component.go @@ -306,12 +306,9 @@ func run() error { consumedOutputs[output.OutputID] = types.Void } - err := deps.Faucet.ApplyAcceptedTransaction(createdOutputs, consumedOutputs) - if err != nil { - deps.ShutdownHandler.SelfShutdown(fmt.Sprintf("faucet plugin hit a critical error while applying new accepted transaction: %s", err.Error()), true) - } + deps.Faucet.ApplyAcceptedTransaction(createdOutputs, consumedOutputs) - return err + return nil }); err != nil { deps.ShutdownHandler.SelfShutdown(fmt.Sprintf("Listening to AcceptedTransactions failed, error: %s", err), false) } diff --git a/pkg/faucet/faucet.go b/pkg/faucet/faucet.go index 3a22ed4..25e12aa 100644 --- a/pkg/faucet/faucet.go +++ b/pkg/faucet/faucet.go @@ -3,6 +3,7 @@ package faucet import ( "context" + "fmt" "time" "github.com/labstack/echo/v4" @@ -119,7 +120,7 @@ type EnqueueResponse struct { // Faucet is used to issue transaction to users that requested funds via a REST endpoint. type Faucet struct { // lock used to secure the state of the faucet. - syncutils.Mutex + syncutils.RWMutex // the logger used to log events. *logger.WrappedLogger // used to access the global daemon. @@ -130,7 +131,7 @@ type Faucet struct { // used to fetch metadata of a transaction from the node. fetchTransactionMetadataFunc FetchTransactionMetadataFunc // used to collect the unlockable outputs and the balance of the faucet. - // write lock must be acquired outside. + // write lock must be acquired outside because we read from queueMap and we want to set the faucet balance without modifications to the map. collectUnlockableFaucetOutputsAndBalanceFuncWithoutLocking CollectUnlockableFaucetOutputsAndBalanceFunc // used to compute the unlockable balance of an address. computeUnlockableAddressBalanceFunc ComputeUnlockableAddressBalanceFunc @@ -309,7 +310,7 @@ func New( }, } - // write lock must be acquired outside. + // write lock must be acquired outside because we read from queueMap and we want to set the faucet balance without modifications to the map faucet.collectUnlockableFaucetOutputsAndBalanceFuncWithoutLocking = func() ([]UTXOBasicOutput, iotago.BaseToken, error) { // get all outputs of the faucet unspentOutputs, err := collectUnlockableFaucetOutputsFunc() @@ -400,10 +401,7 @@ func (f *Faucet) Enqueue(bech32Addr string) (*EnqueueResponse, error) { return nil, ierrors.Wrap(echo.ErrInternalServerError, "Faucet node is not synchronized/healthy. Please try again later!") } - f.Lock() - defer f.Unlock() - - if _, exists := f.queueMap[bech32Addr]; exists { + if exists := f.isAlreadyinQueue(bech32Addr); exists { //nolint:stylecheck,revive // this error message is shown to the user return nil, ierrors.Wrap(httpserver.ErrInvalidParameter, "Address is already in the queue.") } @@ -419,6 +417,11 @@ func (f *Faucet) Enqueue(bech32Addr string) (*EnqueueResponse, error) { } } + // we already need to lock here to have the correct faucet balance + // and we need to add the request to the queueMap + f.Lock() + defer f.Unlock() + if baseTokenAmount > f.faucetBalance { //nolint:stylecheck,revive // this error message is shown to the user return nil, ierrors.Wrap(echo.ErrInternalServerError, "Faucet does not have enough funds to process your request. Please try again later!") @@ -475,6 +478,16 @@ func (f *Faucet) parseBech32Address(bech32Addr string) (iotago.Address, error) { return bech32Address, nil } +// isAlreadyinQueue checks if the given address is already in the queue. +func (f *Faucet) isAlreadyinQueue(bech32Addr string) bool { + f.RLock() + defer f.RUnlock() + + _, exists := f.queueMap[bech32Addr] + + return exists +} + // clearRequestWithoutLocking clear the old request from the map. // this is necessary to be able to send a new request to the same address. // write lock must be acquired outside. @@ -709,9 +722,9 @@ func (f *Faucet) createTransactionBuilder(api iotago.API, unspentOutputs []UTXOB return txBuilder, consumedInputs, remainderOutputIndex } -// sendFaucetBlock creates a faucet transaction payload and sends it to the block issuer. +// sendFaucetBlockWithoutLocking creates a faucet transaction payload and sends it to the block issuer. // write lock must be acquired outside. -func (f *Faucet) sendFaucetBlock(ctx context.Context, unspentOutputs []UTXOBasicOutput, batchedRequests []*queueItem) error { +func (f *Faucet) sendFaucetBlockWithoutLocking(ctx context.Context, unspentOutputs []UTXOBasicOutput, batchedRequests []*queueItem) error { api := f.apiProvider.CommittedAPI() txBuilder, consumedInputs, remainderOutputIndex := f.createTransactionBuilder(api, unspentOutputs, batchedRequests) @@ -743,8 +756,8 @@ func (f *Faucet) sendFaucetBlock(ctx context.Context, unspentOutputs []UTXOBasic return nil } -// computeAndSetFaucetBalance computes the faucet balance minus the storage deposit for a single basic output. -func (f *Faucet) computeAndSetFaucetBalance() error { +// computeAndSetInitialFaucetBalance computes the faucet balance minus the storage deposit for a single basic output. +func (f *Faucet) computeAndSetInitialFaucetBalance() error { f.Lock() defer f.Unlock() @@ -760,13 +773,15 @@ func (f *Faucet) computeAndSetFaucetBalance() error { // collectRequestsAndSendFaucetBlock collects the requests and sends a faucet block. func (f *Faucet) collectRequestsAndSendFaucetBlock(ctx context.Context) error { - f.Lock() - defer f.Unlock() - f.LogDebug("entering collectRequestsAndSendFaucetBlock...") + defer f.LogDebug("leaving collectRequestsAndSendFaucetBlock...") + + f.RLock() + pendingTx := f.pendingTransaction + f.RUnlock() // check if there is a pending transaction before issuing the next one - if f.pendingTransaction != nil { + if pendingTx != nil { f.LogDebugf("skip processing of new requests because a pending tx was found, blockID: %s, txID: %s", f.pendingTransaction.BlockID, f.pendingTransaction.TransactionID) select { @@ -774,7 +789,7 @@ func (f *Faucet) collectRequestsAndSendFaucetBlock(ctx context.Context) error { // faucet was stopped return nil case <-time.After(time.Second): - // wait until the next loop + // cooldown return nil } } @@ -797,7 +812,8 @@ func (f *Faucet) collectRequestsAndSendFaucetBlock(ctx context.Context) error { f.LogDebugf("collected %d requests", len(batchedRequests)) - processRequests := func() ([]UTXOBasicOutput, []*queueItem, error) { + // write lock must be acquired outside + processRequestsWithoutLocking := func() ([]UTXOBasicOutput, []*queueItem, error) { unspentOutputs, balance, err := f.collectUnlockableFaucetOutputsAndBalanceFuncWithoutLocking() if err != nil { return nil, nil, err @@ -814,7 +830,11 @@ func (f *Faucet) collectRequestsAndSendFaucetBlock(ctx context.Context) error { return unspentOutputs, processableRequests, nil } - unspentOutputs, processableRequests, err := processRequests() + // we need to acquire a write lock here to be able to modify the requests in the queue + f.Lock() + defer f.Unlock() + + unspentOutputs, processableRequests, err := processRequestsWithoutLocking() if err != nil { if !ierrors.Is(err, ErrNothingToProcess) { if IsCriticalError(err) != nil { @@ -838,7 +858,7 @@ func (f *Faucet) collectRequestsAndSendFaucetBlock(ctx context.Context) error { f.LogDebugf(" processable request %d, address: %s, amount: %d", i, processableRequest.Bech32, processableRequest.BaseTokenAmount) } - if err := f.sendFaucetBlock(ctx, unspentOutputs, processableRequests); err != nil { + if err := f.sendFaucetBlockWithoutLocking(ctx, unspentOutputs, processableRequests); err != nil { if IsCriticalError(err) != nil { // error is a critical error // => stop the faucet @@ -856,7 +876,7 @@ func (f *Faucet) collectRequestsAndSendFaucetBlock(ctx context.Context) error { func (f *Faucet) RunFaucetLoop(ctx context.Context) error { // set initial faucet balance - if err := f.computeAndSetFaucetBalance(); err != nil { + if err := f.computeAndSetInitialFaucetBalance(); err != nil { return CriticalError(ierrors.Errorf("reading faucet address balance failed: %s, error: %w", f.address.Bech32(f.apiProvider.CommittedAPI().ProtocolParameters().Bech32HRP()), err)) } @@ -885,58 +905,94 @@ func (f *Faucet) RunFaucetLoop(ctx context.Context) error { // If a problem is found, all requests are readded to the queue. func (f *Faucet) checkPendingTransactionState() { f.LogDebug("entering checkPendingTransactionState...") + defer f.LogDebug("leaving checkPendingTransactionState...") - f.Lock() - defer f.Unlock() + // nolint: nonamedreturns // easier to read in this case + checkPendingTransaction := func(pendingTx *pendingTransaction) (clearPending bool, readdPending bool, logMessage string, softError error) { + if pendingTx == nil { + // no pending transaction so there is no need for additional checks + return false, false, "no pending transaction found", nil + } - pendingTx := f.pendingTransaction + metadata, err := f.fetchTransactionMetadataFunc(pendingTx.TransactionID) + if err != nil { + // an error occurred => re-add the items to the queue and delete the pending transaction + return false, true, "", ierrors.Errorf("failed to fetch metadata of the pending transaction, blockID: %s, txID: %s", pendingTx.BlockID, pendingTx.TransactionID) + } + + if metadata == nil { + // metadata unknown, this can only happen if the block was orphaned. + // => re-add the items to the queue and delete the pending transaction + return false, true, "", ierrors.Errorf("metadata of the pending transaction is unknown, blockID: %s, txID: %s", pendingTx.BlockID, pendingTx.TransactionID) + } - if pendingTx == nil { - // no transaction pending so there is no need for additional checks - f.LogDebug("checkPendingTransactionState: no pending transaction found") + switch metadata.TransactionState { + case api.TransactionStateNoTransaction: + // transaction is not known, so the block must have been filtered + // => re-add the items to the queue and delete the pending transaction + return false, true, "", ierrors.Errorf("metadata of the pending transaction is no transaction, blockID: %s, txID: %s", pendingTx.BlockID, pendingTx.TransactionID) - return + case api.TransactionStatePending: + // transaction is still pending + // => do nothing + return false, false, fmt.Sprintf("transaction still pending, blockID: %s, txID: %s", pendingTx.BlockID, pendingTx.TransactionID), nil + + case api.TransactionStateAccepted, api.TransactionStateConfirmed, api.TransactionStateFinalized: + // transaction was confirmed + // => delete the requests and the pending transaction + return true, false, fmt.Sprintf("transaction successful, blockID: %s, txID: %s", pendingTx.BlockID, pendingTx.TransactionID), nil + + case api.TransactionStateFailed: + // transaction failed + // => re-add the items to the queue and delete the pending transaction + return false, true, "", ierrors.Errorf("transaction failed, blockID: %s, txID: %s, reason: %d", pendingTx.BlockID, pendingTx.TransactionID, metadata.TransactionFailureReason) + + default: + // unknown transaction state + panic(ierrors.Errorf("unknown transaction state: %d", metadata.TransactionState)) + } } - metadata, err := f.fetchTransactionMetadataFunc(pendingTx.TransactionID) - if err != nil { - // an error occurred => re-add the items to the queue and delete the pending transaction - f.logSoftError(ierrors.Errorf("failed to fetch metadata of the pending transaction, blockID: %s, txID: %s", pendingTx.BlockID, pendingTx.TransactionID)) - f.readdPendingRequestsWithoutLocking() + f.RLock() + pendingTx := f.pendingTransaction + f.RUnlock() + + clearPending, readdPending, logMessage, softError := checkPendingTransaction(pendingTx) + if !(clearPending || readdPending) { + // no pending transaction or transaction is still pending + if softError != nil { + f.logSoftError(ierrors.Wrap(softError, "checkPendingTransactionState failed")) + } + + if logMessage != "" { + f.LogDebugf("checkPendingTransactionState: %s", logMessage) + } return } - if metadata == nil { - // metadata unknown, this can only happen if the block was orphaned. - // => re-add the items to the queue and delete the pending transaction - f.logSoftError(ierrors.Errorf("metadata of the pending transaction is unknown, blockID: %s, txID: %s", pendingTx.BlockID, pendingTx.TransactionID)) - f.readdPendingRequestsWithoutLocking() + // we need to acquire a write lock here and check again if there is a pending transaction. + f.Lock() + defer f.Unlock() - return + if pendingTx != f.pendingTransaction { + // the pending transaction changed, check again + clearPending, readdPending, logMessage, softError = checkPendingTransaction(f.pendingTransaction) } - switch metadata.TransactionState { - case api.TransactionStateNoTransaction: - // transaction is not known, so the block must have been filtered - // => re-add the items to the queue and delete the pending transaction - f.logSoftError(ierrors.Errorf("metadata of the pending transaction is no transaction, blockID: %s, txID: %s", pendingTx.BlockID, pendingTx.TransactionID)) - f.readdPendingRequestsWithoutLocking() + if softError != nil { + f.logSoftError(ierrors.Wrap(softError, "checkPendingTransactionState failed")) + } - case api.TransactionStatePending: - // transaction is still pending - f.LogDebugf("checkPendingTransactionState: transaction still pending, blockID: %s, txID: %s", pendingTx.BlockID, pendingTx.TransactionID) + if logMessage != "" { + f.LogDebugf("checkPendingTransactionState: %s", logMessage) + } - case api.TransactionStateAccepted, api.TransactionStateConfirmed, api.TransactionStateFinalized: - // transaction was confirmed - // => delete the requests and the pending transaction - f.LogDebugf("checkPendingTransactionState: transaction successful, blockID: %s, txID: %s", pendingTx.BlockID, pendingTx.TransactionID) + if clearPending { f.clearPendingRequestsWithoutLocking() - - case api.TransactionStateFailed: - // transaction failed - // => re-add the items to the queue and delete the pending transaction - f.logSoftError(ierrors.Errorf("transaction failed, blockID: %s, txID: %s, reason: %d", pendingTx.BlockID, pendingTx.TransactionID, metadata.TransactionFailureReason)) + return + } + if readdPending { f.readdPendingRequestsWithoutLocking() } } @@ -944,52 +1000,78 @@ func (f *Faucet) checkPendingTransactionState() { // ApplyAcceptedTransaction applies an accepted transaction to the faucet. // If there is a pending transaction, it is checked if the transaction was confirmed or conflicting. // If a conflict is found, all requests are readded to the queue. -func (f *Faucet) ApplyAcceptedTransaction(createdOutputs map[iotago.OutputID]struct{}, consumedOutputs map[iotago.OutputID]struct{}) error { +func (f *Faucet) ApplyAcceptedTransaction(createdOutputs map[iotago.OutputID]struct{}, consumedOutputs map[iotago.OutputID]struct{}) { f.LogDebug("entering ApplyAcceptedTransaction...") + defer f.LogDebug("leaving ApplyAcceptedTransaction...") - f.Lock() - defer f.Unlock() + // nolint: nonamedreturns // easier to read in this case + checkPendingTransaction := func(pendingTx *pendingTransaction) (clearPending bool, readdPending bool, logMessage string) { + if pendingTx == nil { + // no pending transaction so there is no need for additional checks + return false, false, "no pending transaction found" + } - pendingTx := f.pendingTransaction + // check if the pending transaction was confirmed. + // we can easily check this by searching for output index 0. + txOutputIndexZero := iotago.UTXOInput{ + TransactionID: pendingTx.TransactionID, + TransactionOutputIndex: 0, + } + txOutputIDIndexZero := txOutputIndexZero.OutputID() - if pendingTx == nil { - // no transaction pending so there is no need for additional checks - f.LogDebug("ApplyAcceptedTransaction: no pending transaction found") + // if this output was created, the rest of the outputs were created as well because transactions are atomic. + if _, created := createdOutputs[txOutputIDIndexZero]; created { + // transaction was confirmed + // => delete the requests and the pending transaction + return true, false, "transaction successful" + } - return nil - } + // check if the inputs of the pending transaction were affected by the ledger update. + for _, consumedInput := range pendingTx.ConsumedInputs { + if _, spent := consumedOutputs[consumedInput]; spent { + // a referenced input of the pending transaction was spent, so it is affected by this ledger update. + // since the output index 0 of the pending transaction was not created, + // it means that the transaction was conflicting with another one. + // => readd the items to the queue and delete the pending transaction + return false, true, "transaction conflicting, inputs consumed in another transaction" + } + } - // check if the pending transaction was confirmed. - // we can easily check this by searching for output index 0. - txOutputIndexZero := iotago.UTXOInput{ - TransactionID: pendingTx.TransactionID, - TransactionOutputIndex: 0, + return false, false, "" } - txOutputIDIndexZero := txOutputIndexZero.OutputID() - // if this output was created, the rest of the outputs were created as well because transactions are atomic. - if _, created := createdOutputs[txOutputIDIndexZero]; created { - // transaction was confirmed - // => delete the requests and the pending transaction - f.LogDebug("ApplyAcceptedTransaction: transaction successful") - f.clearPendingRequestsWithoutLocking() + f.RLock() + pendingTx := f.pendingTransaction + f.RUnlock() - return nil + clearPending, readdPending, logMessage := checkPendingTransaction(pendingTx) + if !(clearPending || readdPending) { + // no pending transaction or transaction is not affected by the update + if logMessage != "" { + f.LogDebugf("ApplyAcceptedTransaction: %s", logMessage) + } + + return } - // check if the inputs of the pending transaction were affected by the ledger update. - for _, consumedInput := range pendingTx.ConsumedInputs { - if _, spent := consumedOutputs[consumedInput]; spent { - // a referenced input of the pending transaction was spent, so it is affected by this ledger update. - // since the output index 0 of the pending transaction was not created, - // it means that the transaction was conflicting with another one. - // => readd the items to the queue and delete the pending transaction - f.LogDebug("ApplyAcceptedTransaction: transaction conflicting, inputs consumed in another transaction") - f.readdPendingRequestsWithoutLocking() + // we need to acquire a write lock here and check again if there is a pending transaction. + f.Lock() + defer f.Unlock() - return nil - } + if pendingTx != f.pendingTransaction { + // the pending transaction changed, check again + clearPending, readdPending, logMessage = checkPendingTransaction(f.pendingTransaction) } - return nil + if logMessage != "" { + f.LogDebugf("ApplyAcceptedTransaction: %s", logMessage) + } + + if clearPending { + f.clearPendingRequestsWithoutLocking() + return + } + if readdPending { + f.readdPendingRequestsWithoutLocking() + } }