Skip to content

Commit

Permalink
Merge pull request #623 from pulasthi/master
Browse files Browse the repository at this point in the history
TSet Iterative Kmeans fix and example
  • Loading branch information
pulasthi authored Sep 6, 2019
2 parents 8432fb3 + d3d0bb3 commit a74e185
Show file tree
Hide file tree
Showing 32 changed files with 425 additions and 136 deletions.
118 changes: 117 additions & 1 deletion dashboard/server/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

Expand All @@ -24,6 +25,18 @@
<java.version>1.8</java.version>
</properties>

<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
<repository>
<id>ossrh</id>
<url>https://oss.sonatype.org/service/local/staging/deploy/maven2/
</url>
</repository>
</distributionManagement>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down Expand Up @@ -97,8 +110,111 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
<executions>
<execution>
<id>default-deploy</id>
<phase>deploy</phase>
<goals>
<goal>deploy</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
<version>2.5.3</version>
<configuration>
<localCheckout>true</localCheckout>
<pushChanges>false</pushChanges>
<mavenExecutorId>forked-path</mavenExecutorId>
<arguments>-Dgpg.passphrase=${gpg.passphrase}</arguments>
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.maven.scm</groupId>
<artifactId>maven-scm-provider-gitexe</artifactId>
<version>1.9.5</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>1.6.7</version>
<extensions>true</extensions>
<configuration>
<serverId>ossrh</serverId>
<nexusUrl>https://oss.sonatype.org/</nexusUrl>
<autoReleaseAfterClose>true</autoReleaseAfterClose>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.0.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.10.4</version>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<id>attach-javadoc</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

<profiles>
<!-- GPG Signature on release -->
<profile>
<id>release-sign-artifacts</id>
<activation>
<property>
<name>performRelease</name>
<value>true</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>


</project>
38 changes: 30 additions & 8 deletions twister2/api/src/java/edu/iu/dsc/tws/api/tset/TSetEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public abstract class TSetEnvironment {
private WorkerEnvironment workerEnv;
private TSetGraph tsetGraph;
private TaskExecutor taskExecutor;

private ComputeGraph itergraph;
private ExecutionPlan iterexecutionPlan;
private int defaultParallelism = 1;

private Map<String, Map<String, Cacheable<?>>> tSetInputMap = new HashMap<>();
Expand Down Expand Up @@ -109,6 +110,7 @@ public Config getConfig() {

/**
* Running worker ID
*
* @return workerID
*/
public int getWorkerID() {
Expand All @@ -120,7 +122,7 @@ public int getWorkerID() {
*/
public void run() {
ComputeGraph graph = tsetGraph.build();
executeDataFlowGraph(graph, null);
executeDataFlowGraph(graph, null, false);
}

protected TSetGraph getTSetGraph() {
Expand All @@ -130,24 +132,37 @@ protected TSetGraph getTSetGraph() {
/**
* execute data flow graph
*
* @param <T> type of the output data object
* @param dataflowGraph data flow graph
* @param outputTset output tset. If null, then no output would be returned
* @param <T> type of the output data object
* @return output as a data object if outputTset is not null. Else null
*/
protected <T> DataObject<T> executeDataFlowGraph(ComputeGraph dataflowGraph,
BuildableTSet outputTset) {
ExecutionPlan executionPlan = taskExecutor.plan(dataflowGraph);
BuildableTSet outputTset, boolean isIterative) {
ExecutionPlan executionPlan = null;
if (isIterative && iterexecutionPlan != null) {
executionPlan = iterexecutionPlan;
} else {
executionPlan = taskExecutor.plan(dataflowGraph);
if (isIterative) {
iterexecutionPlan = executionPlan;
itergraph = dataflowGraph;
}
}

LOG.fine(executionPlan::toString);
LOG.fine(() -> "edges: " + dataflowGraph.getDirectedEdgesSet());
LOG.fine(() -> "vertices: " + dataflowGraph.getTaskVertexSet());

pushInputsToFunctions(dataflowGraph, executionPlan);
taskExecutor.execute(dataflowGraph, executionPlan);
if (isIterative) {
taskExecutor.itrExecute(dataflowGraph, executionPlan);
} else {
taskExecutor.execute(dataflowGraph, executionPlan);

// once a graph is built and executed, reset the underlying builder!
tsetGraph.resetDfwGraphBuilder();
// once a graph is built and executed, reset the underlying builder!
tsetGraph.resetDfwGraphBuilder();
}

// output tset alone does not guarantees that there will be an output available.
// Example: if the output is done after a reduce, parallelism(output tset) = 1. Then only
Expand All @@ -160,6 +175,13 @@ protected <T> DataObject<T> executeDataFlowGraph(ComputeGraph dataflowGraph,
return new EmptyDataObject<>();
}

public void finishIter() {
taskExecutor.waitFor(itergraph, iterexecutionPlan);
tsetGraph.resetDfwGraphBuilder();
itergraph = null;
iterexecutionPlan = null;
}

/**
* Adds inputs to tasks
*
Expand Down
10 changes: 5 additions & 5 deletions twister2/api/src/java/edu/iu/dsc/tws/api/tset/TSetGraph.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public ComputeGraph build() {
buildOrder.addAll(bfs(src, links, sets, this::getSuccessors));
}

LOG.info(() -> "Build order: " + buildOrder.toString());
LOG.fine(() -> "Build order: " + buildOrder.toString());

return buildGraph(links, sets, false);
}
Expand All @@ -185,22 +185,22 @@ public ComputeGraph build(BuildableTSet leafTSet) {

Set<TBase> buildOrder = bfs(leafTSet, links, sets, this::getPredecessors);

LOG.info(() -> "Build order: " + buildOrder.toString());
LOG.fine(() -> "Build order: " + buildOrder.toString());

return buildGraph(links, sets, true);
}

private ComputeGraph buildGraph(Collection<BuildableTLink> links, Collection<BuildableTSet> sets,
boolean reverse) {

LOG.info(() -> "Node build order: " + sets + " reversed: " + reverse);
LOG.fine(() -> "Node build order: " + sets + " reversed: " + reverse);
Iterator<BuildableTSet> setsItr = reverse ? new LinkedList<>(sets).descendingIterator()
: sets.iterator();
while (setsItr.hasNext()) {
setsItr.next().build(this);
}

LOG.info(() -> "Edge build order: " + links + " reversed: " + reverse);
LOG.fine(() -> "Edge build order: " + links + " reversed: " + reverse);
// links need to be built in order. check issue #519
/* for (int i = 0; i < links.size(); i++) {
links.get(links.size() - i - 1).build(this, sets);
Expand All @@ -216,7 +216,7 @@ private ComputeGraph buildGraph(Collection<BuildableTLink> links, Collection<Bui

// clean the upstream of the cached tsets
if (cleanUpstream(sets)) {
LOG.info("Some TSets have been cleaned up!");
LOG.fine("Some TSets have been cleaned up!");
}

return dataflowGraph;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@

public class BatchTSetEnvironment extends TSetEnvironment {

private BaseTSet cachedLeaf;
private ComputeGraph cachedGraph;

public BatchTSetEnvironment(WorkerEnvironment wEnv) {
super(wEnv);
}
Expand Down Expand Up @@ -72,7 +75,7 @@ public <K, V, F extends InputFormat<K, V>, I> SourceTSet<I> createHadoopSource(
*/
public void run(BaseTSet leafTset) {
ComputeGraph dataflowGraph = getTSetGraph().build(leafTset);
executeDataFlowGraph(dataflowGraph, null);
executeDataFlowGraph(dataflowGraph, null, false);
}

/**
Expand All @@ -82,8 +85,19 @@ public void run(BaseTSet leafTset) {
* @param <T> type of the output data object
* @return output result as a data object
*/
public <T> DataObject<T> runAndGet(BaseTSet leafTset, boolean isIterative) {
ComputeGraph dataflowGraph;
if (isIterative && cachedLeaf != null && cachedLeaf == leafTset) {
dataflowGraph = cachedGraph;
} else {
dataflowGraph = getTSetGraph().build(leafTset);
cachedGraph = dataflowGraph;
cachedLeaf = leafTset;
}
return executeDataFlowGraph(dataflowGraph, leafTset, isIterative);
}

public <T> DataObject<T> runAndGet(BaseTSet leafTset) {
ComputeGraph dataflowGraph = getTSetGraph().build(leafTset);
return executeDataFlowGraph(dataflowGraph, leafTset);
return runAndGet(leafTset, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
public abstract class BIteratorLink<T> extends BBaseTLink<Iterator<T>, T>
implements BatchTupleMappableLink<T> {

private CachedTSet<T> savedCacheTSet;

BIteratorLink(BatchTSetEnvironment env, String n, int sourceP) {
this(env, n, sourceP, sourceP);
}
Expand Down Expand Up @@ -70,6 +72,24 @@ public <K, V> KeyedTSet<K, V> mapToTuple(MapFunc<Tuple<K, V>, T> mapToTupFn) {
return set;
}

@Override
public CachedTSet<T> cache(boolean isIterative) {
CachedTSet<T> cacheTSet;
if (isIterative && savedCacheTSet != null) {
cacheTSet = savedCacheTSet;
} else {
cacheTSet = new CachedTSet<>(getTSetEnv(), new CacheIterSink<T>(),
getTargetParallelism());
savedCacheTSet = cacheTSet;
addChildToGraph(cacheTSet);
}

DataObject<T> output = getTSetEnv().runAndGet(cacheTSet, isIterative);
cacheTSet.setData(output);

return cacheTSet;
}

@Override
public CachedTSet<T> cache() {
CachedTSet<T> cacheTSet = new CachedTSet<>(getTSetEnv(), new CacheIterSink<T>(),
Expand All @@ -78,7 +98,10 @@ public CachedTSet<T> cache() {

DataObject<T> output = getTSetEnv().runAndGet(cacheTSet);
cacheTSet.setData(output);

return cacheTSet;
}

public void finishIter() {
getTSetEnv().finishIter();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,18 @@ public interface BatchTLink<T1, T0> extends TLink<T1, T0> {
*
* @return output TSet
*/
BatchTSet<T0> cache();
default BatchTSet<T0> cache(boolean isIterative) {
throw new UnsupportedOperationException("Operation not implemented");
}

/**
* Runs the dataflow graph and caches data in memory
*
* @return output TSet
*/
default BatchTSet<T0> cache() {
return cache(false);
}

@Override
void forEach(ApplyFunc<T0> applyFunction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ public interface CacheableTSet<T> {
*
* @return the resulting TSet
*/
TSet<T> cache(boolean isIterative);

TSet<T> cache();
}
Loading

0 comments on commit a74e185

Please sign in to comment.