Skip to content

Commit

Permalink
Property to include SingleTable library in template in Java Processors
Browse files Browse the repository at this point in the history
  • Loading branch information
matusfaro committed Nov 20, 2024
1 parent 2e253b6 commit 16d2d43
Show file tree
Hide file tree
Showing 20 changed files with 113 additions and 132 deletions.
98 changes: 9 additions & 89 deletions .idea/compiler.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private Path getArtifactPath(Project project, Processor processor) {
Path codeZipPath;
if (processor instanceof JavaProcessor) {
codeZipPath = project.getProcessorDir(processor)
.resolve(Path.of("target", processor.getTaskId() + ".jar"));
.resolve(Path.of("target", processor.getProcessorId() + ".jar"));
} else if (processor instanceof TypescriptProcessor) {
codeZipPath = project.getProcessorDir(processor)
.resolve(Path.of("dist", "index.zip"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public String upload(Organization organization, Project project, String processo
UploadCodeResponse uploadCodeResponse = DataSprayClient.get(organization.toAccess())
.control()
.uploadCode(organization.getName(), new UploadCodeRequest()
.taskId(processor.getTaskId())
.taskId(processor.getProcessorId())
.contentLengthBytes(codeZipFile.length()));

// Upload to S3
Expand Down Expand Up @@ -161,7 +161,7 @@ public String publish(Organization organization, Project project, String process
handler);
TaskVersion deployedVersion = DataSprayClient.get(organization.toAccess())
.control()
.deployVersion(organization.getName(), processor.getTaskId(), new DeployRequest()
.deployVersion(organization.getName(), processor.getProcessorId(), new DeployRequest()
.runtime(runtime)
.handler(handler)
.inputQueueNames(processor.getInputStreams().stream()
Expand Down Expand Up @@ -206,7 +206,7 @@ public TaskStatus activateVersion(Organization organization, Project project, St
log.info("Activating version {} for task {}", version, processorName);
TaskStatus taskStatus = DataSprayClient.get(organization.toAccess())
.control()
.activateVersion(organization.getName(), processor.getTaskId(), version);
.activateVersion(organization.getName(), processor.getProcessorId(), version);
log.info("Version active!");

return taskStatus;
Expand All @@ -219,10 +219,10 @@ public TaskStatus pause(Organization organization, Project project, String proce
checkState(Processor.Target.DATASPRAY.equals(processor.getTarget()),
"Not yet implemented: %s", processor.getTarget());

log.info("Pausing {}", processor.getTaskId());
log.info("Pausing {}", processor.getProcessorId());
TaskStatus taskStatus = DataSprayClient.get(organization.toAccess())
.control()
.pause(organization.getName(), processor.getTaskId());
.pause(organization.getName(), processor.getProcessorId());
log.info("Task set to be paused");
printStatus(taskStatus);

Expand All @@ -236,10 +236,10 @@ public TaskStatus resume(Organization organization, Project project, String proc
checkState(Processor.Target.DATASPRAY.equals(processor.getTarget()),
"Not yet implemented: %s", processor.getTarget());

log.info("Resuming {}", processor.getTaskId());
log.info("Resuming {}", processor.getProcessorId());
TaskStatus taskStatus = DataSprayClient.get(organization.toAccess())
.control()
.resume(organization.getName(), processor.getTaskId());
.resume(organization.getName(), processor.getProcessorId());
log.info("Task set to be resumed");
printStatus(taskStatus);

Expand All @@ -255,7 +255,7 @@ public TaskVersions listVersions(Organization organization, Project project, Str

TaskVersions versions = DataSprayClient.get(organization.toAccess())
.control()
.getVersions(organization.getName(), processor.getTaskId());
.getVersions(organization.getName(), processor.getProcessorId());
printVersions(versions);

return versions;
Expand All @@ -270,7 +270,7 @@ public TaskStatus delete(Organization organization, Project project, String proc

TaskStatus status = DataSprayClient.get(organization.toAccess())
.control()
.delete(organization.getName(), processor.getTaskId());
.delete(organization.getName(), processor.getProcessorId());

printStatus(status);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ public String getNameUpper() {
return name.toUpperCase();
}

public String getTaskId() {
return getNameDir();
}

@Cacheable(lifetime = Definition.CACHEABLE_METHODS_LIFETIME_IN_MIN)
public String getNameCamelUpper() {
return StringUtil.camelCase(name, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
@RegisterForReflection
public class JavaProcessor extends Processor {

Boolean includeSingleTableLibrary;

@Override
public void initialize() {
super.initialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@
@NonFinal
@RegisterForReflection
public class Processor extends Item {

public String getProcessorId() {
return getNameDir();
}

@Nonnull
Target target;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<dependency>
<groupId>io.dataspray</groupId>
<artifactId>dataspray-runner</artifactId>
<version>0.0.8</version>
<version>0.0.9</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
Expand All @@ -65,6 +65,15 @@
<artifactId>aws-lambda-java-events</artifactId>
<version>3.11.1</version>
</dependency>
{{#processor.hasDynamoState}}
{{#processor.includeSingleTableLibrary}}
<dependency>
<groupId>io.dataspray</groupId>
<artifactId>single-table</artifactId>
<version>2.2.5</version>
</dependency>
{{/processor.includeSingleTableLibrary}}
{{/processor.hasDynamoState}}
{{^processor.jsonDataFormats.empty}}
<dependency>
<groupId>com.google.code.gson</groupId>
Expand Down Expand Up @@ -143,7 +152,7 @@
<plugin>
<groupId>org.jsonschema2pojo</groupId>
<artifactId>jsonschema2pojo-maven-plugin</artifactId>
<version>1.1.2</version>
<version>1.2.2</version>
<configuration>
<sourcePaths>
{{#processor.jsonDataFormats}}
Expand Down Expand Up @@ -173,6 +182,10 @@
<dateType>java.time.LocalDate</dateType>
<timeType>java.time.LocalTime</timeType>
<includeGeneratedAnnotation>false</includeGeneratedAnnotation>
<useJakartaValidation>true</useJakartaValidation>
<initializeCollections>true</initializeCollections>
<includeConstructors>true</includeConstructors>
<includeRequiredPropertiesConstructor>true</includeRequiredPropertiesConstructor>
</configuration>
<executions>
<execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import java.nio.charset.StandardCharsets;
import java.io.ByteArrayOutputStream;
{{#processor.hasDynamoState}}
import io.dataspray.runner.StateManager;
import io.dataspray.runner.StateManagerFactoryImpl;
import java.time.Duration;
import java.util.Optional;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
Expand Down Expand Up @@ -37,14 +38,14 @@ public class CoordinatorImpl implements {{#processor.web}}WebCoordinator{{#proce

@Override
public StateManager stateForMessageKey(Optional<Duration> ttl) {
return stateForNamespace(ttl, "task", "{{{processor.taskId}}}", "key", messageKey
return stateForNamespace(ttl, "task", "{{{processor.processorId}}}", "key", messageKey
.orElseThrow(() -> new IllegalStateException("messageKey is not set")));
}
{{/processor.hasInputStreams}}

@Override
public StateManager stateForTask(Optional<Duration> ttl) {
return stateForNamespace(ttl, "task", "{{{processor.taskId}}}");
return stateForNamespace(ttl, "task", "{{{processor.processorId}}}");
}

@Override
Expand All @@ -56,6 +57,16 @@ public class CoordinatorImpl implements {{#processor.web}}WebCoordinator{{#proce
public DynamoDbClient getDynamoClient() {
return rawCoordinator.getDynamoClient();
}
{{#processor.includeSingleTableLibrary}}

@Override
public SingleTable getSingleTable() {
return SingleTable.builder()
.tableName(System.getenv(StateManagerFactoryImpl.DATASPRAY_STATE_TABLE_NAME_ENV))
.overrideGson(GsonUtil.get())
.build()
}
{{/processor.includeSingleTableLibrary}}
{{/processor.hasDynamoState}}
{{#processor.outputStreams}}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ public interface StreamCoordinator {
* Use this method to perform advanced operations on DynamoDB.
*/
DynamoDbClient getDynamoClient();
{{#processor.includeSingleTableLibrary}}

/**
* Returns an instance of SingleTable which is a convenience wrapper on top of DynamoDB client.
*/
SingleTable getSingleTable();
{{/processor.includeSingleTableLibrary}}
{{/processor.hasDynamoState}}
{{#processor.outputStreams}}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ public interface WebCoordinator {
* Use this method to perform advanced operations on DynamoDB.
*/
DynamoDbClient getDynamoClient();
{{#processor.includeSingleTableLibrary}}

/**
* Returns an instance of SingleTable which is a convenience wrapper on top of DynamoDB client.
*/
SingleTable getSingleTable();
{{/processor.includeSingleTableLibrary}}
{{/processor.hasDynamoState}}
{{#processor.outputStreams}}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ public class TestCoordinator implements {{#processor.web}}WebCoordinator{{#proce
@Override
public StateManager stateForMessageKey(Optional<Duration> ttl) {
checkState(messageKey != null, "messageKey is not set");
return stateForNamespace(ttl, "task", "{{{processor.taskId}}}", "key", messageKey);
return stateForNamespace(ttl, "task", "{{{processor.processorId}}}", "key", messageKey);
}
{{/processor.hasInputStreams}}

@Override
public StateManager stateForTask(Optional<Duration> ttl) {
return stateForNamespace(ttl, "task", "{{{processor.taskId}}}");
return stateForNamespace(ttl, "task", "{{{processor.processorId}}}");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@
```mermaid
graph TB;
{{#definition.processors}}
{{taskId}}[{{name}}];
{{processorId}}[{{name}}];
{{#inputStreams}}
Queue{{{uniqueNameCamelUpper}}}([Queue {{{uniqueNameCamelUpper}}}]) --> {{taskId}};
Queue{{{uniqueNameCamelUpper}}}([Queue {{{uniqueNameCamelUpper}}}]) --> {{processorId}};
{{/inputStreams}}
{{#outputStreams}}
{{taskId}} --> Queue{{{uniqueNameCamelUpper}}}([Queue {{{uniqueNameCamelUpper}}}]);
{{processorId}} --> Queue{{{uniqueNameCamelUpper}}}([Queue {{{uniqueNameCamelUpper}}}]);
{{/outputStreams}}
{{#web}}
{{#endpoints}}
Web{{{nameCamelUpper}}}([Web {{{nameCamelUpper}}}]) --> {{taskId}};
Web{{{nameCamelUpper}}}([Web {{{nameCamelUpper}}}]) --> {{processorId}};
{{/endpoints}}
{{#endpoints.empty}}
Web{{{nameCamelUpper}}}([Web]) --> {{taskId}};
Web{{{nameCamelUpper}}}([Web]) --> {{processorId}};
{{/endpoints.empty}}
{{/web}}
{{/definition.processors}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,11 @@ export class CoordinatorImpl implements {{#processor.web}}WebCoordinator{{#proce
if (this.messageKey === undefined) {
throw new Error('messageKey is not set');
}
return this.stateForNamespace(ttlInSec, ["task", "{{{processor.taskId}}}", "key", this.messageKey]);
return this.stateForNamespace(ttlInSec, ["task", "{{{processor.processorId}}}", "key", this.messageKey]);
}

stateForTask(ttlInSec?: number): StateManager {
return this.stateForNamespace(ttlInSec, ["task", "{{{processor.taskId}}}"]);
return this.stateForNamespace(ttlInSec, ["task", "{{{processor.processorId}}}"]);
}

stateForNamespace(ttlInSec: number | undefined, namespace: string[]): StateManager {
Expand Down
Loading

0 comments on commit 16d2d43

Please sign in to comment.