Skip to content

Commit

Permalink
Changes:
Browse files Browse the repository at this point in the history
- Stricter check of the column value for fate reservations. If anything unexpected is seen, an error is now thrown.
- Simplified MetaFateStore.getActiveReservations() to only read from ZooKeeper once.
- Combined the two scans that were done in AbstractFateStore.runnable() into one. This meant adding FateReservation to FateIdStatus and refactoring Meta/UserFateStore.getTransactions().
- No longer use/store a string representation of the FateReservation (was used in UserFateStore). Now, only the serialized value is used. This keeps the usage of FateReservation consistent across Meta and UserFateStore. The string representation was also unnecessary, so removing it simplifies the code.
- Moved AbstractFateStore.createAndReserve() implementation into Meta and UserFateStore, and rewrote the impl for each to work with the new way reservations are stored. This also allowed me to delete methods that were only used by AFS.createAndReserve(): create(FateId, FateKey), getStatusAndKey(FateId), create(FateKey).
- Fixed how concurrentStatusChangeCallers was decremented in AbstractFateStore.waitForStatusChange()
- Small change to MetaFateStore.deleteDeadReservations() to avoid reading from ZK unnecessarily
- Added isReservedBy() method to MetaFateStore.NodeValue to avoid code duplication and make the code more clear.
- Since the FateIdStatus now has the FateReservation, I realized that Meta and UserFateStore.getActiveReservations() could be simplified to just call list(). This also made the two impls identical, so the method moved to AbstractFateStore. This also made me realize that I had put the getActiveReservations() method signature in FateStore, but it is better suited to ReadOnlyFateStore, so I moved it there.
- Deleted FateStore.isReserved(FateId)... No longer needed/used
- Moved UNKNOWN status check in AbstractFateStore.reserve() into waiting loop
- Now log when a dead reservation is detected and deleted
- Minor change to Fate: no longer create the executor for the dead reservation cleaner if it's not going to be used
  • Loading branch information
kevinrr888 committed Jul 23, 2024
1 parent 176b9c3 commit 6760035
Show file tree
Hide file tree
Showing 16 changed files with 358 additions and 381 deletions.
173 changes: 51 additions & 122 deletions core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.accumulo.core.fate;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.ALL_STATUSES;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand All @@ -41,11 +42,11 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.accumulo.core.fate.Fate.TxInfo;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.core.util.time.NanoTime;
import org.slf4j.Logger;
Expand Down Expand Up @@ -79,9 +80,9 @@ public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey) {
protected final ZooUtil.LockID lockID;
protected final Predicate<ZooUtil.LockID> isLockHeld;
protected final Map<FateId,NanoTime> deferred;
protected final FateIdGenerator fateIdGenerator;
private final int maxDeferred;
private final AtomicBoolean deferredOverflow = new AtomicBoolean();
private final FateIdGenerator fateIdGenerator;

// This is incremented each time a transaction is unreserved that was runnable
private final SignalCount unreservedRunnableCount = new SignalCount();
Expand Down Expand Up @@ -133,13 +134,13 @@ public static Object deserialize(byte[] ser) {

@Override
public FateTxStore<T> reserve(FateId fateId) {
Preconditions.checkState(!_getStatus(fateId).equals(TStatus.UNKNOWN),
"Attempted to reserve a tx that does not exist: " + fateId);
var retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(25))
.incrementBy(Duration.ofMillis(25)).maxWait(Duration.ofSeconds(30)).backOffFactor(1.5)
.logInterval(Duration.ofMinutes(3)).createRetry();
Optional<FateTxStore<T>> reserveAttempt = tryReserve(fateId);
while (reserveAttempt.isEmpty()) {
Preconditions.checkState(!_getStatus(fateId).equals(TStatus.UNKNOWN),
"Attempted to reserve a tx that does not exist: " + fateId);
try {
retry.waitForNextAttempt(log, "Attempting to reserve " + fateId);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -172,7 +173,8 @@ public void runnable(AtomicBoolean keepWaiting, Consumer<FateId> idConsumer) {
// first
var transactions = Stream.concat(inProgress, other);
transactions.filter(fateIdStatus -> isRunnable(fateIdStatus.getStatus()))
.map(FateIdStatus::getFateId).filter(fateId -> {
.filter(fateIdStatus -> {
var fateId = fateIdStatus.getFateId();
var deferredTime = deferred.get(fateId);
if (deferredTime != null) {
if (deferredTime.elapsed().isNegative()) {
Expand All @@ -182,10 +184,10 @@ public void runnable(AtomicBoolean keepWaiting, Consumer<FateId> idConsumer) {
deferred.remove(fateId);
}
}
return !isReserved(fateId);
}).forEach(fateId -> {
return fateIdStatus.getFateReservation().isEmpty();
}).forEach(fateIdStatus -> {
seen.incrementAndGet();
idConsumer.accept(fateId);
idConsumer.accept(fateIdStatus.getFateId());
});
}

Expand Down Expand Up @@ -236,6 +238,12 @@ public ReadOnlyFateTxStore<T> read(FateId fateId) {
return newUnreservedFateTxStore(fateId);
}

/**
 * Collects the reservations reported by {@code list()} into a map.
 *
 * <p>
 * Every entry returned by {@code list()} whose {@code getFateReservation()} is present
 * contributes one mapping from its fate id to that reservation; entries with an empty
 * reservation are skipped.
 *
 * @return a map from fate id to the reservation found on that entry
 */
@Override
public Map<FateId,FateReservation> getActiveReservations() {
// The filter guarantees the Optional is present, so orElseThrow() in the value mapper
// is not expected to throw.
return list().filter(entry -> entry.getFateReservation().isPresent()).collect(Collectors
.toMap(FateIdStatus::getFateId, entry -> entry.getFateReservation().orElseThrow()));
}

protected boolean isRunnable(TStatus status) {
return status == TStatus.IN_PROGRESS || status == TStatus.FAILED_IN_PROGRESS
|| status == TStatus.SUBMITTED;
Expand Down Expand Up @@ -267,89 +275,13 @@ public int getDeferredCount() {
return deferred.size();
}

private Optional<FateId> create(FateKey fateKey) {
FateId fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);

try {
create(fateId, fateKey);
} catch (IllegalStateException e) {
Pair<TStatus,Optional<FateKey>> statusAndKey = getStatusAndKey(fateId);
TStatus status = statusAndKey.getFirst();
Optional<FateKey> tFateKey = statusAndKey.getSecond();

// Case 1: Status is NEW so this is unseeded, we can return and allow the calling code
// to reserve/seed as long as the existing key is the same and not different as that would
// mean a collision
if (status == TStatus.NEW) {
Preconditions.checkState(tFateKey.isPresent(), "Tx Key is missing from tid %s",
fateId.getTxUUIDStr());
Preconditions.checkState(fateKey.equals(tFateKey.orElseThrow()),
"Collision detected for tid %s", fateId.getTxUUIDStr());
// Case 2: Status is some other state which means already in progress
// so we can just log and return empty optional
} else {
log.trace("Existing transaction {} already exists for key {} with status {}", fateId,
fateKey, status);
return Optional.empty();
}
}

return Optional.of(fateId);
/**
 * Checks that the fate key seen for a transaction matches the key the caller expects.
 *
 * <p>
 * Two state checks are made in order: the seen key must be present, and it must equal
 * {@code fateKeyExpected}. A failed check is reported through {@code Preconditions.checkState}
 * (presumably Guava's, which throws {@code IllegalStateException} - confirm against the
 * file's imports).
 *
 * @param fateId the fate id being verified; used only in the failure messages
 * @param fateKeySeen the fate key observed for {@code fateId}, possibly empty
 * @param fateKeyExpected the fate key the caller expects {@code fateId} to have
 */
protected void verifyFateKey(FateId fateId, Optional<FateKey> fateKeySeen,
FateKey fateKeyExpected) {
Preconditions.checkState(fateKeySeen.isPresent(), "fate key is missing from fate id " + fateId);
// A present but different key is reported as a collision for this fate id.
Preconditions.checkState(fateKeySeen.orElseThrow().equals(fateKeyExpected),
"Collision detected for fate id " + fateId);
}

@Override
public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
// TODO 4131 not confident about this new implementation of createAndReserve.
// Previously, you could reserve before creation, but with the new impl of reservations
// being stored in ZK (MetaFateStore) and the Accumulo Fate table (UserFateStore), creation
// is needed before reservation.
// TODO 4131 the comments in this method also need to be updated.
// Will wait until after review for this method
FateId fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
final Optional<FateTxStore<T>> txStore;

// First make sure we can reserve in memory the fateId, if not
// we can return an empty Optional as it is reserved and in progress
// This reverses the usual order of creation and then reservation but
// this prevents a race condition by ensuring we can reserve first.
// This will create the FateTxStore before creation but this object
// is not exposed until after creation is finished so there should not
// be any errors.

// If present we were able to reserve so try and create
if (!isReserved(fateId)) {
try {
var fateIdFromCreate = create(fateKey);
if (fateIdFromCreate.isPresent()) {
Preconditions.checkState(fateId.equals(fateIdFromCreate.orElseThrow()),
"Transaction creation returned unexpected %s, expected %s", fateIdFromCreate, fateId);
txStore = tryReserve(fateId);
} else {
// We already exist in a non-new state then un-reserve and an empty
// Optional will be returned. This is expected to happen when the
// system is busy and operations are not running, and we keep seeding them
txStore = Optional.empty();
}
} catch (Exception e) {
if (e instanceof IllegalStateException) {
throw e;
} else {
throw new IllegalStateException(e);
}
}
} else {
// Could not reserve so return empty
log.trace("Another thread currently has transaction {} key {} reserved", fateId, fateKey);
txStore = Optional.empty();
}

return txStore;
}

protected abstract void create(FateId fateId, FateKey fateKey);

protected abstract Pair<TStatus,Optional<FateKey>> getStatusAndKey(FateId fateId);

protected abstract Stream<FateIdStatus> getTransactions(Set<TStatus> statuses);

protected abstract TStatus _getStatus(FateId fateId);
Expand Down Expand Up @@ -388,35 +320,38 @@ public TStatus waitForStatusChange(EnumSet<TStatus> expected) {
verifyReserved(false);

int currNumCallers = concurrentStatusChangeCallers.incrementAndGet();
// TODO 4131
// TODO make the max time a function of the number of concurrent callers, as the number of
// concurrent callers increases then increase the max wait time
// TODO could support signaling within this instance for known events
// TODO made the maxWait low so this would be responsive... that may put a lot of load in the
// case there are lots of things waiting...
// Made maxWait = num of curr callers
var retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(25))
.incrementBy(Duration.ofMillis(25)).maxWait(Duration.ofSeconds(currNumCallers))
.backOffFactor(1.5).logInterval(Duration.ofMinutes(3)).createRetry();

while (true) {

TStatus status = _getStatus(fateId);
if (expected.contains(status)) {
retry.logCompletion(log, "Waiting on status change for " + fateId + " expected:"
+ expected + " status:" + status);
concurrentStatusChangeCallers.decrementAndGet();
return status;
}

try {
retry.waitForNextAttempt(log, "Waiting on status change for " + fateId + " expected:"
+ expected + " status:" + status);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
concurrentStatusChangeCallers.decrementAndGet();
throw new IllegalStateException(e);
try {
// TODO 4131
// TODO make the max time a function of the number of concurrent callers, as the number of
// concurrent callers increases then increase the max wait time
// TODO could support signaling within this instance for known events
// TODO made the maxWait low so this would be responsive... that may put a lot of load in
// the case there are lots of things waiting...
// Made maxWait = num of curr callers
var retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(25))
.incrementBy(Duration.ofMillis(25)).maxWait(Duration.ofSeconds(currNumCallers))
.backOffFactor(1.5).logInterval(Duration.ofMinutes(3)).createRetry();

while (true) {

TStatus status = _getStatus(fateId);
if (expected.contains(status)) {
retry.logCompletion(log, "Waiting on status change for " + fateId + " expected:"
+ expected + " status:" + status);
return status;
}

try {
retry.waitForNextAttempt(log, "Waiting on status change for " + fateId + " expected:"
+ expected + " status:" + status);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}
} finally {
concurrentStatusChangeCallers.decrementAndGet();
}
}

Expand Down Expand Up @@ -475,12 +410,6 @@ public Optional<FateKey> getKey() {
return AbstractFateStore.this.getKey(fateId);
}

@Override
public Pair<TStatus,Optional<FateKey>> getStatusAndKey() {
verifyReserved(false);
return AbstractFateStore.this.getStatusAndKey(fateId);
}

@Override
public FateId getID() {
return fateId;
Expand Down
35 changes: 22 additions & 13 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -387,16 +387,19 @@ public Fate(T environment, FateStore<T> store, Function<Repo<T>,String> toLogStr
// reservation cleaner is not already running for the given store type.
// TODO 4131 periodic cleanup runs every 30 seconds
// Should this be longer? Shorter? A configurable Property? A function of something?
ScheduledExecutorService deadResCleanerExecutor = ThreadPools.getServerThreadPools()
.createScheduledExecutorService(1, store.type() + "-dead-reservation-cleaner-pool");
if ((store.type() == FateInstanceType.USER && !userDeadReservationCleanerRunning)
|| (store.type() == FateInstanceType.META && !metaDeadReservationCleanerRunning)) {
ScheduledExecutorService deadResCleanerExecutor = null;
boolean isUserStore = store.type() == FateInstanceType.USER;
boolean isMetaStore = store.type() == FateInstanceType.META;
if ((isUserStore && !userDeadReservationCleanerRunning)
|| (isMetaStore && !metaDeadReservationCleanerRunning)) {
deadResCleanerExecutor = ThreadPools.getServerThreadPools().createScheduledExecutorService(1,
store.type() + "-dead-reservation-cleaner-pool");
ScheduledFuture<?> deadReservationCleaner = deadResCleanerExecutor
.scheduleWithFixedDelay(new DeadReservationCleaner(), 3, 30, SECONDS);
ThreadPools.watchCriticalScheduledTask(deadReservationCleaner);
if (store.type() == FateInstanceType.USER) {
if (isUserStore) {
userDeadReservationCleanerRunning = true;
} else if (store.type() == FateInstanceType.META) {
} else {
metaDeadReservationCleanerRunning = true;
}
}
Expand Down Expand Up @@ -572,21 +575,25 @@ public void shutdown(long timeout, TimeUnit timeUnit) {
fatePoolWatcher.shutdown();
transactionExecutor.shutdown();
workFinder.interrupt();
deadResCleanerExecutor.shutdown();
if (deadResCleanerExecutor != null) {
deadResCleanerExecutor.shutdown();
}
}

if (timeout > 0) {
long start = System.nanoTime();

while ((System.nanoTime() - start) < timeUnit.toNanos(timeout) && (workFinder.isAlive()
|| !transactionExecutor.isTerminated() || !deadResCleanerExecutor.isTerminated())) {
while ((System.nanoTime() - start) < timeUnit.toNanos(timeout)
&& (workFinder.isAlive() || !transactionExecutor.isTerminated()
|| (deadResCleanerExecutor != null && !deadResCleanerExecutor.isTerminated()))) {
try {
if (!transactionExecutor.awaitTermination(1, SECONDS)) {
log.debug("Fate {} is waiting for worker threads to terminate", store.type());
continue;
}

if (!deadResCleanerExecutor.awaitTermination(1, SECONDS)) {
if (deadResCleanerExecutor != null
&& !deadResCleanerExecutor.awaitTermination(1, SECONDS)) {
log.debug("Fate {} is waiting for dead reservation cleaner thread to terminate",
store.type());
continue;
Expand All @@ -603,18 +610,20 @@ public void shutdown(long timeout, TimeUnit timeUnit) {
}

if (workFinder.isAlive() || !transactionExecutor.isTerminated()
|| !deadResCleanerExecutor.isTerminated()) {
|| (deadResCleanerExecutor != null && !deadResCleanerExecutor.isTerminated())) {
log.warn(
"Waited for {}ms for all fate {} background threads to stop, but some are still running. workFinder:{} transactionExecutor:{} deadResCleanerExecutor:{}",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), store.type(),
workFinder.isAlive(), !transactionExecutor.isTerminated(),
!deadResCleanerExecutor.isTerminated());
(deadResCleanerExecutor != null && !deadResCleanerExecutor.isTerminated()));
}
}

// interrupt the background threads
transactionExecutor.shutdownNow();
deadResCleanerExecutor.shutdownNow();
if (deadResCleanerExecutor != null) {
deadResCleanerExecutor.shutdownNow();
}

// Update that USER/META dead reservation cleaner is no longer running
if (store.type() == FateInstanceType.USER && userDeadReservationCleanerRunning) {
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/fate/FateKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@

public class FateKey {

// TODO 4131 noticed FateKey is logged, but doesn't have a toString()
// a toString() method should be added.

private final FateKeyType type;
private final Optional<KeyExtent> keyExtent;
private final Optional<ExternalCompactionId> compactionId;
Expand Down
Loading

0 comments on commit 6760035

Please sign in to comment.