Skip to content

Commit

Permalink
Use simpler single table construction of requests
Browse files Browse the repository at this point in the history
  • Loading branch information
matusfaro committed Oct 28, 2024
1 parent cdaaec0 commit f5bb259
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 75 deletions.
1 change: 1 addition & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions dataspray-api-parent/dataspray-client-java/dataspray-client.iml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module version="4">
<component name="AdditionalModuleElements">
<content url="file://$MODULE_DIR$" dumb="true">
<excludeFolder url="file://$MODULE_DIR$/target/generated-sources/client-control/src/main/java" />
<excludeFolder url="file://$MODULE_DIR$/target/generated-sources/client-ingest/src/main/java" />
<excludeFolder url="file://$MODULE_DIR$/target/generated-sources/client-typescript" />
</content>
</component>
</module>
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import io.dataspray.core.definition.model.TypescriptProcessor;
import io.dataspray.stream.control.client.model.DeployRequest;
import io.dataspray.stream.control.client.model.DeployRequest.RuntimeEnum;
import io.dataspray.stream.control.client.model.DeployRequestEndpoint;
import io.dataspray.stream.control.client.model.DeployRequestEndpointCors;
import io.dataspray.stream.control.client.model.TaskStatus;
import io.dataspray.stream.control.client.model.TaskVersion;
import io.dataspray.stream.control.client.model.TaskVersions;
Expand Down Expand Up @@ -167,6 +169,18 @@ public String publish(Organization organization, Project project, String process
.map(StreamLink::getStreamName)
.collect(Collectors.toList()))
.codeUrl(codeUrl)
.endpoint(processor.getEndpoint()
.map(endpoint -> new DeployRequestEndpoint()
.isPublic(endpoint.getIsPublic())
.cors(endpoint.getCors()
.map(cors -> new DeployRequestEndpointCors()
.allowOrigins(cors.getAllowOrigins().stream().toList())
.allowMethods(cors.getAllowMethods().stream().toList())
.allowHeaders(cors.getAllowHeaders().stream().toList())
.maxAge(cors.getMaxAge())
.allowCredentials(cors.getAllowCredentials()))
.orElse(null)))
.orElse(null))
.switchToNow(activateVersion));

if (activateVersion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ public class Cors {
@Builder.Default
Set<String> exposeHeaders = Set.of();

@Builder.Default
Boolean allowCredentials = true;

@Builder.Default
long maxAge = 3600;
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@
import software.amazon.awssdk.services.apigateway.model.CreateUsagePlanKeyRequest;
import software.amazon.awssdk.services.apigateway.model.CreateUsagePlanKeyResponse;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;

import java.time.Duration;
Expand Down Expand Up @@ -201,12 +198,10 @@ public Optional<ApiAccess> getApiAccessByApiKey(String apiKey, boolean useCache)
}

// Fetch from DB
Optional<ApiAccess> apiAccessOpt = Optional.ofNullable(apiAccessSchema.fromAttrMap(dynamo.getItem(GetItemRequest.builder()
.tableName(apiAccessSchema.tableName())
.key(apiAccessSchema.primaryKey(Map.of(
"apiKey", apiKey)))
.consistentRead(!useCache)
.build()).item()))
Optional<ApiAccess> apiAccessOpt = apiAccessSchema.get()
.key(Map.of("apiKey", apiKey))
.builder(b -> b.consistentRead(!useCache))
.execute(dynamo)
.filter(ApiAccess::isTtlNotExpired);

// Update cache
Expand All @@ -217,11 +212,9 @@ public Optional<ApiAccess> getApiAccessByApiKey(String apiKey, boolean useCache)

@Override
public void revokeApiKey(String apiKey) {
dynamo.deleteItem(DeleteItemRequest.builder()
.tableName(apiAccessSchema.tableName())
.key(apiAccessSchema.primaryKey(Map.of(
"apiKey", apiKey)))
.build());
apiAccessSchema.delete()
.key(Map.of("apiKey", apiKey))
.execute(dynamo);
}

@Override
Expand Down Expand Up @@ -287,11 +280,9 @@ private String getOrCreateUsageKeyApiKey(UsageKeyType type, String apiKey) {
// We have fallen through, this means we are fetching an organization key

// Lookup mapping from dynamo
Optional<UsageKey> usageKeyOpt = Optional.ofNullable(usageKeyByApiKeySchema.fromAttrMap(dynamo.getItem(GetItemRequest.builder()
.tableName(usageKeyByApiKeySchema.tableName())
.key(usageKeyByApiKeySchema.primaryKey(Map.of(
"usageKeyApiKey", apiKey)))
.build()).item()));
Optional<UsageKey> usageKeyOpt = usageKeyByApiKeySchema.get()
.key(Map.of("usageKeyApiKey", apiKey))
.execute(dynamo);

// Return existing key if exists
if (usageKeyOpt.isPresent()) {
Expand All @@ -310,10 +301,9 @@ private String getOrCreateUsageKeyApiKey(UsageKeyType type, String apiKey) {

// Store mapping in dynamo
UsageKey usageKey = new UsageKey(apiKey, createApiKeyResponse.id());
dynamo.putItem(PutItemRequest.builder()
.tableName(usageKeyByApiKeySchema.tableName())
.item(usageKeyByApiKeySchema.toAttrMap(usageKey))
.build());
usageKeyByApiKeySchema.put()
.item(usageKey)
.execute(dynamo);

return usageKey.getUsageKeyApiKey();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;

import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -101,24 +99,20 @@ public LambdaRecord set(String organizationName,
endpointUrlOpt.orElse(null),
false,
null);
dynamo.putItem(PutItemRequest.builder()
.tableName(lambdaRecordSchema.tableName())
.item(lambdaRecordSchema.toAttrMap(lamdbaRecord))
.build());
lambdaRecordSchema.put()
.item(lamdbaRecord)
.execute(dynamo);
return lamdbaRecord;
}

@Override
public Optional<LambdaRecord> get(String organizationName, String taskId) {
return Optional.ofNullable(dynamo.getItem(GetItemRequest.builder()
.tableName(lambdaRecordSchema.tableName())
.key(lambdaRecordSchema.primaryKey(Map.of(
"organizationName", organizationName,
"taskId", taskId)))
.consistentRead(true)
.build())
.item())
.map(lambdaRecordSchema::fromAttrMap);
return lambdaRecordSchema.get()
.key(Map.of(
"organizationName", organizationName,
"taskId", taskId))
.builder(b -> b.consistentRead(true))
.execute(dynamo);
}

@Override
Expand Down Expand Up @@ -190,12 +184,11 @@ public Optional<AutoCloseable> acquireLock(String organizationName, String taskI
}

private void releaseLock(String organizationName, String taskId, String reservationId) {
dynamo.deleteItem(lambdaMutexSchema.delete()
lambdaMutexSchema.delete()
.conditionFieldEquals("reservationId", reservationId)
.builder()
.key(lambdaMutexSchema.primaryKey(Map.of(
.key(Map.of(
"organizationName", organizationName,
"taskId", taskId)))
.build());
"taskId", taskId))
.execute(dynamo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import io.quarkus.runtime.Startup;
import jakarta.inject.Inject;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.PutItemResponse;

import java.time.Duration;
Expand Down Expand Up @@ -76,20 +75,15 @@ public Targets getTargets(String organizationName, boolean useCache) {
}

// Fetch from DB
Targets targets = targetsSchema.fromAttrMap(dynamo.getItem(GetItemRequest.builder()
.tableName(targetsSchema.tableName())
.key(targetsSchema.primaryKey(Map.of(
"organizationName", organizationName)))
.consistentRead(!useCache)
.build())
.item());
// Construct default if not found
if (targets == null) {
targets = Targets.builder()
.organizationName(organizationName)
.version(INITIAL_VERSION)
.build();
}
Targets targets = targetsSchema.get()
.key(Map.of("organizationName", organizationName))
.builder(b -> b.consistentRead(!useCache))
.execute(dynamo)
// Construct default if not found
.orElseGet(() -> Targets.builder()
.organizationName(organizationName)
.version(INITIAL_VERSION)
.build());

// Update cache
targetsByOrganizationNameCache.put(organizationName, targets);
Expand All @@ -110,15 +104,14 @@ public Targets updateTargets(Targets targets) {
.version(targets.getVersion() + 1)
.build();

// To prevent concurrent modification, we check that the database entry matches our
// expected version using dynamo conditions
// Make the update
// Prevent concurrent modifications by ensuring expected version
PutBuilder<Targets> putBuilder = targetsSchema.put();
if (targets.getVersion() == INITIAL_VERSION) {
putBuilder.conditionNotExists().build();
putBuilder.conditionNotExists();
} else {
putBuilder.conditionFieldEquals("version", targets.getVersion()).build();
putBuilder.conditionFieldEquals("version", targets.getVersion());
}
// Make the update
PutItemResponse execute = putBuilder.item(targetsVersionBumped)
.execute(dynamo);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.apigateway.ApiGatewayClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;

import java.time.Instant;
import java.util.Map;
Expand Down Expand Up @@ -268,10 +266,9 @@ private ApiAccess createApiAccessInDb(
TableSchema<ApiAccess> apiAccessSchema = singleTable.parseTableSchema(ApiAccess.class);

// Insert into dynamo
dynamo.putItem(PutItemRequest.builder()
.tableName(apiAccessSchema.tableName())
.item(apiAccessSchema.toAttrMap(apiAccess))
.build());
apiAccessSchema.put()
.item(apiAccess)
.execute(dynamo);

return apiAccess;
}
Expand All @@ -283,11 +280,9 @@ private boolean usageKeyExists(ApiAccess apiAccess) {
Optional.of(apiAccess.getOwnerUsername()),
ImmutableSet.of(apiAccess.getOrganizationName()));
TableSchema<UsageKey> usageKeySchema = singleTable.parseTableSchema(UsageKey.class);
Optional<UsageKey> usageKeyOpt = Optional.ofNullable(usageKeySchema.fromAttrMap(dynamo.getItem(GetItemRequest.builder()
.tableName(usageKeySchema.tableName())
.key(usageKeySchema.primaryKey(Map.of(
"usageKeyApiKey", usageKeyApiKey)))
.build()).item()));
Optional<UsageKey> usageKeyOpt = usageKeySchema.get()
.key(Map.of("usageKeyApiKey", usageKeyApiKey))
.execute(dynamo);
return usageKeyOpt.isPresent();
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@
<dependency>
<groupId>io.dataspray</groupId>
<artifactId>single-table</artifactId>
<version>2.2.2</version>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
Expand Down

0 comments on commit f5bb259

Please sign in to comment.