From 104c012ce19808f4d8b07c73309feaf5fc5c2889 Mon Sep 17 00:00:00 2001 From: Chathura Widanage Date: Wed, 28 Aug 2019 12:56:40 -0400 Subject: [PATCH 01/11] Adding maven:deploy related changes --- dashboard/server/pom.xml | 118 ++++++++++++++++++++++++- util/mvn/deploy-to-maven-central.sh | 65 +++++++------- util/mvn/execute-deploy.sh | 0 util/mvn/generate-latest-docs.sh | 0 util/mvn/publish-snapshot-on-commit.sh | 0 5 files changed, 150 insertions(+), 33 deletions(-) mode change 100644 => 100755 util/mvn/deploy-to-maven-central.sh mode change 100644 => 100755 util/mvn/execute-deploy.sh mode change 100644 => 100755 util/mvn/generate-latest-docs.sh mode change 100644 => 100755 util/mvn/publish-snapshot-on-commit.sh diff --git a/dashboard/server/pom.xml b/dashboard/server/pom.xml index 7518c2f843..9952c6b202 100644 --- a/dashboard/server/pom.xml +++ b/dashboard/server/pom.xml @@ -1,5 +1,6 @@ - 4.0.0 @@ -24,6 +25,18 @@ 1.8 + + + ossrh + https://oss.sonatype.org/content/repositories/snapshots + + + ossrh + https://oss.sonatype.org/service/local/staging/deploy/maven2/ + + + + org.springframework.boot @@ -97,8 +110,111 @@ org.springframework.boot spring-boot-maven-plugin + + maven-deploy-plugin + 2.8.2 + + + default-deploy + deploy + + deploy + + + + + + org.apache.maven.plugins + maven-release-plugin + 2.5.3 + + true + false + forked-path + -Dgpg.passphrase=${gpg.passphrase} + + + + org.apache.maven.scm + maven-scm-provider-gitexe + 1.9.5 + + + + + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.7 + true + + ossrh + https://oss.sonatype.org/ + true + + + + org.apache.maven.plugins + maven-source-plugin + 3.0.1 + + + attach-sources + + jar + + + + + + + org.apache.maven.plugins + maven-source-plugin + 2.10.4 + + UTF-8 + + + + attach-javadoc + + jar + + + + + + + + release-sign-artifacts + + + performRelease + true + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.6 + + + sign-artifacts + verify + + sign + + + + + + + + + diff --git a/util/mvn/deploy-to-maven-central.sh b/util/mvn/deploy-to-maven-central.sh old mode 100644 new mode 100755 index 87dff6affd..44919dcff8 --- a/util/mvn/deploy-to-maven-central.sh +++ b/util/mvn/deploy-to-maven-central.sh @@ -10,8 +10,8 @@ key=$1 version_name=$2 shift 2 -if [[ ! "$version_name" =~ ^2\. ]]; then - echo 'Version name must begin with "2."' +if [[ ! "$version_name" =~ ^0\. ]]; then + echo 'Version name must begin with "0."' exit 2 fi @@ -20,40 +20,41 @@ if [[ "$version_name" =~ " " ]]; then exit 3 fi -bazel test //... +#bazel test //... bash $(dirname $0)/execute-deploy.sh \ "gpg:sign-and-deploy-file" \ "$version_name" \ - "-DrepositoryId=sonatype-nexus-staging" \ - "-Durl=https://oss.sonatype.org/service/local/staging/deploy/maven2/" \ + "-DrepositoryId=ossrh" \ + "-Durl=https://oss.sonatype.org/content/repositories/snapshots" \ +# "-Durl=https://oss.sonatype.org/service/local/staging/deploy/maven2/" \ "-Dgpg.keyname=${key}" # Publish javadocs to gh-pages -bazel build //:user-docs.jar -git clone --quiet --branch gh-pages \ - https://github.com/google/dagger gh-pages > /dev/null -cd gh-pages -unzip ../bazel-bin/user-docs.jar -d api/$version_name -rm -rf api/$version_name/META-INF/ -git add api/$version_name -git commit -m "$version_name docs" -git push origin gh-pages -cd .. -rm -rf gh-pages -for generated_pom_file in dagger*pom.xml; do - rm "${generated_pom_file}" - rm "${generated_pom_file}.asc" -done - -git checkout --detach -# Set the version string that is used as a tag in all of our libraries. If another repo depends on -# a versioned tag of Dagger, their java_library.tags should match the versioned release. -sed -i s/'${project.version}'/"${version_name}"/g tools/maven.bzl -git commit -m "${version_name} release" tools/maven.bzl - -git tag -a -m "Dagger ${version_name}" dagger-"${version_name}" -git push origin tag dagger-"${version_name}" - -# Switch back to the original HEAD -git checkout - +#bazel build //:user-docs.jar +#git clone --quiet --branch gh-pages \ +# https://github.com/google/dagger gh-pages > /dev/null +#cd gh-pages +#unzip ../bazel-bin/user-docs.jar -d api/$version_name +#rm -rf api/$version_name/META-INF/ +#git add api/$version_name +#git commit -m "$version_name docs" +#git push origin gh-pages +#cd .. +#rm -rf gh-pages +#for generated_pom_file in dagger*pom.xml; do +# rm "${generated_pom_file}" +# rm "${generated_pom_file}.asc" +#done +# +#git checkout --detach +## Set the version string that is used as a tag in all of our libraries. If another repo depends on +## a versioned tag of Dagger, their java_library.tags should match the versioned release. +#sed -i s/'${project.version}'/"${version_name}"/g tools/maven.bzl +#git commit -m "${version_name} release" tools/maven.bzl +# +#git tag -a -m "Dagger ${version_name}" dagger-"${version_name}" +#git push origin tag dagger-"${version_name}" +# +## Switch back to the original HEAD +#git checkout - diff --git a/util/mvn/execute-deploy.sh b/util/mvn/execute-deploy.sh old mode 100644 new mode 100755 diff --git a/util/mvn/generate-latest-docs.sh b/util/mvn/generate-latest-docs.sh old mode 100644 new mode 100755 diff --git a/util/mvn/publish-snapshot-on-commit.sh b/util/mvn/publish-snapshot-on-commit.sh old mode 100644 new mode 100755 From d5e10d70f995f713ad6fa191e834f37dae598339 Mon Sep 17 00:00:00 2001 From: pulasthi Date: Wed, 28 Aug 2019 13:17:05 -0400 Subject: [PATCH 02/11] temporary kmeans tset --- .../iu/dsc/tws/api/tset/TSetEnvironment.java | 38 +++++++++--- .../edu/iu/dsc/tws/api/tset/TSetGraph.java | 10 ++-- .../api/tset/env/BatchTSetEnvironment.java | 20 ++++++- .../api/tset/link/batch/BIteratorLink.java | 25 +++++++- .../tws/api/tset/link/batch/BatchTLink.java | 13 +++- .../dsc/tws/api/tset/sets/CacheableTSet.java | 2 + .../tws/api/tset/sets/batch/BBaseTSet.java | 23 +++++++- .../tws/api/tset/sets/batch/BatchTSet.java | 3 +- .../tws/api/tset/sets/batch/CachedTSet.java | 2 +- .../edu/iu/dsc/tws/comms/dfw/OneToOne.java | 1 + .../runners/twister2/examples/WordCount.java | 2 +- .../examples/batch/kmeans/KMeansTsetJob.java | 59 +++++++++---------- .../examples/batch/kmeans/KMeansWorker.java | 5 +- 13 files changed, 146 insertions(+), 57 deletions(-) diff --git a/twister2/api/src/java/edu/iu/dsc/tws/api/tset/TSetEnvironment.java b/twister2/api/src/java/edu/iu/dsc/tws/api/tset/TSetEnvironment.java index 9c7aa70f44..0f9c340435 100644 --- a/twister2/api/src/java/edu/iu/dsc/tws/api/tset/TSetEnvironment.java +++ b/twister2/api/src/java/edu/iu/dsc/tws/api/tset/TSetEnvironment.java @@ -51,7 +51,8 @@ public abstract class TSetEnvironment { private WorkerEnvironment workerEnv; private TSetGraph tsetGraph; private TaskExecutor taskExecutor; - + private ComputeGraph itergraph; + private ExecutionPlan iterexecutionPlan; private int defaultParallelism = 1; private Map>> tSetInputMap = new HashMap<>(); @@ -109,6 +110,7 @@ public Config getConfig() { /** * Running worker ID + * * @return workerID */ public int getWorkerID() { @@ -120,7 +122,7 @@ public int getWorkerID() { */ public void run() { ComputeGraph graph = tsetGraph.build(); - executeDataFlowGraph(graph, null); + executeDataFlowGraph(graph, null, false); } protected TSetGraph getTSetGraph() { @@ -130,24 +132,37 @@ protected TSetGraph getTSetGraph() { /** * execute data flow graph * + * @param type of the output data object * @param dataflowGraph data flow graph * @param outputTset output tset. If null, then no output would be returned - * @param type of the output data object * @return output as a data object if outputTset is not null. Else null */ protected DataObject executeDataFlowGraph(ComputeGraph dataflowGraph, - BuildableTSet outputTset) { - ExecutionPlan executionPlan = taskExecutor.plan(dataflowGraph); + BuildableTSet outputTset, boolean isIterative) { + ExecutionPlan executionPlan = null; + if (isIterative && iterexecutionPlan != null) { + executionPlan = iterexecutionPlan; + } else { + executionPlan = taskExecutor.plan(dataflowGraph); + if (isIterative) { + iterexecutionPlan = executionPlan; + itergraph = dataflowGraph; + } + } LOG.fine(executionPlan::toString); LOG.fine(() -> "edges: " + dataflowGraph.getDirectedEdgesSet()); LOG.fine(() -> "vertices: " + dataflowGraph.getTaskVertexSet()); pushInputsToFunctions(dataflowGraph, executionPlan); - taskExecutor.execute(dataflowGraph, executionPlan); + if (isIterative) { + taskExecutor.itrExecute(dataflowGraph, executionPlan); + } else { + taskExecutor.execute(dataflowGraph, executionPlan); - // once a graph is built and executed, reset the underlying builder! - tsetGraph.resetDfwGraphBuilder(); + // once a graph is built and executed, reset the underlying builder! + tsetGraph.resetDfwGraphBuilder(); + } // output tset alone does not guarantees that there will be an output available. // Example: if the output is done after a reduce, parallelism(output tset) = 1. Then only @@ -160,6 +175,13 @@ protected DataObject executeDataFlowGraph(ComputeGraph dataflowGraph, return new EmptyDataObject<>(); } + public void finishIter() { + taskExecutor.waitFor(itergraph, iterexecutionPlan); + tsetGraph.resetDfwGraphBuilder(); + itergraph = null; + iterexecutionPlan = null; + } + /** * Adds inputs to tasks * diff --git a/twister2/api/src/java/edu/iu/dsc/tws/api/tset/TSetGraph.java b/twister2/api/src/java/edu/iu/dsc/tws/api/tset/TSetGraph.java index 312c4fc667..c503bb0308 100644 --- a/twister2/api/src/java/edu/iu/dsc/tws/api/tset/TSetGraph.java +++ b/twister2/api/src/java/edu/iu/dsc/tws/api/tset/TSetGraph.java @@ -168,7 +168,7 @@ public ComputeGraph build() { buildOrder.addAll(bfs(src, links, sets, this::getSuccessors)); } - LOG.info(() -> "Build order: " + buildOrder.toString()); + LOG.fine(() -> "Build order: " + buildOrder.toString()); return buildGraph(links, sets, false); } @@ -185,7 +185,7 @@ public ComputeGraph build(BuildableTSet leafTSet) { Set buildOrder = bfs(leafTSet, links, sets, this::getPredecessors); - LOG.info(() -> "Build order: " + buildOrder.toString()); + LOG.fine(() -> "Build order: " + buildOrder.toString()); return buildGraph(links, sets, true); } @@ -193,14 +193,14 @@ public ComputeGraph build(BuildableTSet leafTSet) { private ComputeGraph buildGraph(Collection links, Collection sets, boolean reverse) { - LOG.info(() -> "Node build order: " + sets + " reversed: " + reverse); + LOG.fine(() -> "Node build order: " + sets + " reversed: " + reverse); Iterator setsItr = reverse ? new LinkedList<>(sets).descendingIterator() : sets.iterator(); while (setsItr.hasNext()) { setsItr.next().build(this); } - LOG.info(() -> "Edge build order: " + links + " reversed: " + reverse); + LOG.fine(() -> "Edge build order: " + links + " reversed: " + reverse); // links need to be built in order. check issue #519 /* for (int i = 0; i < links.size(); i++) { links.get(links.size() - i - 1).build(this, sets); @@ -216,7 +216,7 @@ private ComputeGraph buildGraph(Collection links, Collection, I> SourceTSet createHadoopSource( */ public void run(BaseTSet leafTset) { ComputeGraph dataflowGraph = getTSetGraph().build(leafTset); - executeDataFlowGraph(dataflowGraph, null); + executeDataFlowGraph(dataflowGraph, null, false); } /** @@ -82,8 +85,19 @@ public void run(BaseTSet leafTset) { * @param type of the output data object * @return output result as a data object */ + public DataObject runAndGet(BaseTSet leafTset, boolean isIterative) { + ComputeGraph dataflowGraph; + if (isIterative && cachedLeaf != null && cachedLeaf == leafTset) { + dataflowGraph = cachedGraph; + } else { + dataflowGraph = getTSetGraph().build(leafTset); + cachedGraph = dataflowGraph; + cachedLeaf = leafTset; + } + return executeDataFlowGraph(dataflowGraph, leafTset, isIterative); + } + public DataObject runAndGet(BaseTSet leafTset) { - ComputeGraph dataflowGraph = getTSetGraph().build(leafTset); - return executeDataFlowGraph(dataflowGraph, leafTset); + return runAndGet(leafTset, false); } } diff --git a/twister2/api/src/java/edu/iu/dsc/tws/api/tset/link/batch/BIteratorLink.java b/twister2/api/src/java/edu/iu/dsc/tws/api/tset/link/batch/BIteratorLink.java index 079f23bd47..03ad578866 100644 --- a/twister2/api/src/java/edu/iu/dsc/tws/api/tset/link/batch/BIteratorLink.java +++ b/twister2/api/src/java/edu/iu/dsc/tws/api/tset/link/batch/BIteratorLink.java @@ -33,6 +33,8 @@ public abstract class BIteratorLink extends BBaseTLink, T> implements BatchTupleMappableLink { + private CachedTSet savedCacheTSet; + BIteratorLink(BatchTSetEnvironment env, String n, int sourceP) { this(env, n, sourceP, sourceP); } @@ -70,6 +72,24 @@ public KeyedTSet mapToTuple(MapFunc, T> mapToTupFn) { return set; } + @Override + public CachedTSet cache(boolean isIterative) { + CachedTSet cacheTSet; + if (isIterative && savedCacheTSet != null) { + cacheTSet = savedCacheTSet; + } else { + cacheTSet = new CachedTSet<>(getTSetEnv(), new CacheIterSink(), + getTargetParallelism()); + savedCacheTSet = cacheTSet; + addChildToGraph(cacheTSet); + } + + DataObject output = getTSetEnv().runAndGet(cacheTSet, isIterative); + cacheTSet.setData(output); + + return cacheTSet; + } + @Override public CachedTSet cache() { CachedTSet cacheTSet = new CachedTSet<>(getTSetEnv(), new CacheIterSink(), @@ -78,7 +98,10 @@ public CachedTSet cache() { DataObject output = getTSetEnv().runAndGet(cacheTSet); cacheTSet.setData(output); - return cacheTSet; } + + public void finishIter() { + getTSetEnv().finishIter(); + } } diff --git a/twister2/api/src/java/edu/iu/dsc/tws/api/tset/link/batch/BatchTLink.java b/twister2/api/src/java/edu/iu/dsc/tws/api/tset/link/batch/BatchTLink.java index 8c1362ba12..ac756831ed 100644 --- a/twister2/api/src/java/edu/iu/dsc/tws/api/tset/link/batch/BatchTLink.java +++ b/twister2/api/src/java/edu/iu/dsc/tws/api/tset/link/batch/BatchTLink.java @@ -42,7 +42,18 @@ public interface BatchTLink extends TLink { * * @return output TSet */ - BatchTSet cache(); + default BatchTSet cache(boolean isIterative) { + throw new UnsupportedOperationException("Operation not implemented"); + } + + /** + * Runs the dataflow graph and caches data in memory + * + * @return output TSet + */ + default BatchTSet cache() { + return cache(false); + } @Override void forEach(ApplyFunc applyFunction); diff --git a/twister2/api/src/java/edu/iu/dsc/tws/api/tset/sets/CacheableTSet.java b/twister2/api/src/java/edu/iu/dsc/tws/api/tset/sets/CacheableTSet.java index 407b6f28b1..8ab8427798 100644 --- a/twister2/api/src/java/edu/iu/dsc/tws/api/tset/sets/CacheableTSet.java +++ b/twister2/api/src/java/edu/iu/dsc/tws/api/tset/sets/CacheableTSet.java @@ -18,5 +18,7 @@ public interface CacheableTSet { * * @return the resulting TSet */ + TSet cache(boolean isIterative); + TSet cache(); } diff --git a/twister2/api/src/java/edu/iu/dsc/tws/api/tset/sets/batch/BBaseTSet.java b/twister2/api/src/java/edu/iu/dsc/tws/api/tset/sets/batch/BBaseTSet.java index efc65012f4..692dce4895 100644 --- a/twister2/api/src/java/edu/iu/dsc/tws/api/tset/sets/batch/BBaseTSet.java +++ b/twister2/api/src/java/edu/iu/dsc/tws/api/tset/sets/batch/BBaseTSet.java @@ -34,6 +34,8 @@ public abstract class BBaseTSet extends BaseTSet implements BatchTSet { + private DirectTLink iterdirect = null; + BBaseTSet(BatchTSetEnvironment tSetEnv, String name, int parallelism) { super(tSetEnv, name, parallelism); } @@ -155,9 +157,21 @@ public ReplicateTLink replicate(int replications) { return cloneTSet; } + @Override + public CachedTSet cache(boolean isIterative) { + if (isIterative && iterdirect != null) { + return iterdirect.cache(isIterative); + } else if (isIterative) { + iterdirect = direct(); + return iterdirect.cache(isIterative); + } else { + return direct().cache(isIterative); + } + } + @Override public CachedTSet cache() { - return direct().cache(); + return cache(false); } @Override @@ -165,4 +179,11 @@ public boolean addInput(String key, Cacheable input) { getTSetEnv().addInput(getId(), key, input); return true; } + + public void finishIter() { + if (iterdirect == null) { + throw new IllegalStateException("cache with iter needs to be called first"); + } + iterdirect.finishIter(); + } } diff --git a/twister2/api/src/java/edu/iu/dsc/tws/api/tset/sets/batch/BatchTSet.java b/twister2/api/src/java/edu/iu/dsc/tws/api/tset/sets/batch/BatchTSet.java index fa6ff26675..0c82e910ea 100644 --- a/twister2/api/src/java/edu/iu/dsc/tws/api/tset/sets/batch/BatchTSet.java +++ b/twister2/api/src/java/edu/iu/dsc/tws/api/tset/sets/batch/BatchTSet.java @@ -68,10 +68,11 @@ public interface BatchTSet extends TSet, CacheableTSet { @Override BatchTSet union(TSet unionTSet); - @Override BatchTSet union(Collection> tSets); + BatchTSet cache(boolean isIterative); + @Override BatchTSet cache(); } diff --git a/twister2/api/src/java/edu/iu/dsc/tws/api/tset/sets/batch/CachedTSet.java b/twister2/api/src/java/edu/iu/dsc/tws/api/tset/sets/batch/CachedTSet.java index 022e232bcb..b2cb7b83a2 100644 --- a/twister2/api/src/java/edu/iu/dsc/tws/api/tset/sets/batch/CachedTSet.java +++ b/twister2/api/src/java/edu/iu/dsc/tws/api/tset/sets/batch/CachedTSet.java @@ -140,7 +140,7 @@ public ReplicateTLink replicate(int replications) { } @Override - public CachedTSet cache() { + public CachedTSet cache(boolean isIterative) { return this; // throw new IllegalStateException("Calling Cache on an already cached Object"); } diff --git a/twister2/comms/src/java/edu/iu/dsc/tws/comms/dfw/OneToOne.java b/twister2/comms/src/java/edu/iu/dsc/tws/comms/dfw/OneToOne.java index 282998fcf5..44907a1515 100644 --- a/twister2/comms/src/java/edu/iu/dsc/tws/comms/dfw/OneToOne.java +++ b/twister2/comms/src/java/edu/iu/dsc/tws/comms/dfw/OneToOne.java @@ -320,6 +320,7 @@ public void close() { public void reset() { if (finalReceiver != null) { finalReceiver.clean(); + finishedSources.clear(); } } diff --git a/twister2/compatibility/beam/src/main/java/org/apache/beam/runners/twister2/examples/WordCount.java b/twister2/compatibility/beam/src/main/java/org/apache/beam/runners/twister2/examples/WordCount.java index 8eeff20701..e0905e0cea 100644 --- a/twister2/compatibility/beam/src/main/java/org/apache/beam/runners/twister2/examples/WordCount.java +++ b/twister2/compatibility/beam/src/main/java/org/apache/beam/runners/twister2/examples/WordCount.java @@ -37,7 +37,7 @@ public static void main(String[] args) { Config config = ResourceAllocator.loadConfig(new HashMap<>()); JobConfig jobConfig = new JobConfig(); int workers = 1; - + System.out.println("Start"); Twister2Job.Twister2JobBuilder jobBuilder = Twister2Job.newBuilder(); jobBuilder.setJobName("KMeans-job"); jobBuilder.setWorkerClass(WordCountWorker.class.getName()); diff --git a/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansTsetJob.java b/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansTsetJob.java index f9e72d105c..22fb9e4929 100644 --- a/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansTsetJob.java +++ b/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansTsetJob.java @@ -10,18 +10,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package edu.iu.dsc.tws.examples.batch.kmeans; import java.io.IOException; @@ -32,13 +20,13 @@ import edu.iu.dsc.tws.api.config.Config; import edu.iu.dsc.tws.api.data.Path; +import edu.iu.dsc.tws.api.dataset.DataPartition; import edu.iu.dsc.tws.api.tset.TSetContext; import edu.iu.dsc.tws.api.tset.env.BatchTSetEnvironment; import edu.iu.dsc.tws.api.tset.fn.BaseMapFunc; import edu.iu.dsc.tws.api.tset.fn.BaseSourceFunc; import edu.iu.dsc.tws.api.tset.fn.MapFunc; import edu.iu.dsc.tws.api.tset.fn.ReduceFunc; -import edu.iu.dsc.tws.api.tset.link.batch.AllReduceTLink; import edu.iu.dsc.tws.api.tset.sets.batch.CachedTSet; import edu.iu.dsc.tws.api.tset.sets.batch.ComputeTSet; import edu.iu.dsc.tws.api.tset.worker.BatchTSetIWorker; @@ -76,32 +64,37 @@ public void execute(BatchTSetEnvironment tc) { long startTime = System.currentTimeMillis(); CachedTSet points = - tc.createSource(new PointsSource(), parallelismValue).setName("dataSource").cache(); + tc.createSource(new PointsSource(), parallelismValue).setName("dataSource").cache(false); CachedTSet centers = - tc.createSource(new CenterSource(), parallelismValue).cache(); + tc.createSource(new CenterSource(), parallelismValue).cache(false); long endTimeData = System.currentTimeMillis(); + ComputeTSet> kmeansTSet = + points.direct().map(new KMeansMap()); + + ComputeTSet reduced = kmeansTSet.allReduce((ReduceFunc) + (t1, t2) -> { + double[][] newCentroids = new double[t1.length] + [t1[0].length]; + for (int j = 0; j < t1.length; j++) { + for (int k = 0; k < t1[0].length; k++) { + double newVal = t1[j][k] + t2[j][k]; + newCentroids[j][k] = newVal; + } + } + return newCentroids; + }).map(new AverageCenters()); for (int i = 0; i < iterations; i++) { - ComputeTSet> kmeansTSet = - points.direct().map(new KMeansMap()); - kmeansTSet.addInput("centers", centers); + centers = reduced.cache(true); + } + //reduced.finishIter(); - AllReduceTLink reduced = kmeansTSet.allReduce((ReduceFunc) - (t1, t2) -> { - double[][] newCentroids = new double[t1.length] - [t1[0].length]; - for (int j = 0; j < t1.length; j++) { - for (int k = 0; k < t1[0].length; k++) { - double newVal = t1[j][k] + t2[j][k]; - newCentroids[j][k] = newVal; - } - } - return newCentroids; - }); - - centers = reduced.map(new AverageCenters()).cache(); + DataPartition centroidPartition = centers.getDataObject().getPartition(workerId); + double[][] centroid = null; + if (centroidPartition.getConsumer().hasNext()) { + centroid = (double[][]) centroidPartition.getConsumer().next(); } long endTime = System.currentTimeMillis(); @@ -109,6 +102,8 @@ public void execute(BatchTSetEnvironment tc) { LOG.info("Data Load time : " + (endTimeData - startTime) + "\n" + "Total Time : " + (endTime - startTime) + "Compute Time : " + (endTime - endTimeData)); + LOG.info("Final Centroids After\t" + iterations + "\titerations\t" + + centroid.length); } } diff --git a/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansWorker.java b/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansWorker.java index 93584cd639..b33fa28a4d 100644 --- a/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansWorker.java +++ b/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansWorker.java @@ -11,7 +11,6 @@ // limitations under the License. package edu.iu.dsc.tws.examples.batch.kmeans; -import java.util.Arrays; import java.util.HashSet; import java.util.Set; import java.util.logging.Level; @@ -162,8 +161,8 @@ public void execute(Config config, int workerId, IWorkerController workerControl LOG.info("Total K-Means Execution Time: " + (endTime - startTime) + "\tData Load time : " + (endTimeData - startTime) + "\tCompute Time : " + (endTime - endTimeData)); - LOG.fine("Final Centroids After\t" + iterations + "\titerations\t" - + Arrays.deepToString(centroid)); + LOG.info("Final Centroids After\t" + iterations + "\titerations\t" + + centroid.length); } public static ComputeGraph buildDataPointsTG(String dataDirectory, int dsize, From 2c86425a6acf422475cccc158a0110d71fe8f5b0 Mon Sep 17 00:00:00 2001 From: pulasthi Date: Thu, 29 Aug 2019 09:35:52 -0400 Subject: [PATCH 03/11] update kmeans --- .../tws/examples/batch/kmeans/KMeansWorker.java | 17 +++++++++++------ .../examples/batch/kmeans/KMeansWorkerMain.java | 2 +- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansWorker.java b/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansWorker.java index b33fa28a4d..c839ff1364 100644 --- a/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansWorker.java +++ b/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansWorker.java @@ -11,6 +11,7 @@ // limitations under the License. package edu.iu.dsc.tws.examples.batch.kmeans; +import java.util.Arrays; import java.util.HashSet; import java.util.Set; import java.util.logging.Level; @@ -81,11 +82,12 @@ public void execute(Config config, int workerId, IWorkerController workerControl int csize = kMeansJobParameters.getCsize(); int iterations = kMeansJobParameters.getIterations(); - String dataDirectory = kMeansJobParameters.getDatapointDirectory() + workerId; - String centroidDirectory = kMeansJobParameters.getCentroidDirectory() + workerId; + String dataDirectory = kMeansJobParameters.getDatapointDirectory(); + String centroidDirectory = kMeansJobParameters.getCentroidDirectory(); - workerUtils.generateDatapoints(dimension, numFiles, dsize, csize, dataDirectory, - centroidDirectory); + +// workerUtils.generateDatapoints(dimension, numFiles, dsize, csize, dataDirectory, +// centroidDirectory); long startTime = System.currentTimeMillis(); @@ -161,8 +163,11 @@ public void execute(Config config, int workerId, IWorkerController workerControl LOG.info("Total K-Means Execution Time: " + (endTime - startTime) + "\tData Load time : " + (endTimeData - startTime) + "\tCompute Time : " + (endTime - endTimeData)); - LOG.info("Final Centroids After\t" + iterations + "\titerations\t" - + centroid.length); + if (workerId == 0) { + LOG.info("Final Centroids After\t" + iterations + "\titerations\t" + + Arrays.toString(centroid[0])); + + } } public static ComputeGraph buildDataPointsTG(String dataDirectory, int dsize, diff --git a/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansWorkerMain.java b/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansWorkerMain.java index df50050e46..476d121504 100644 --- a/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansWorkerMain.java +++ b/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansWorkerMain.java @@ -107,7 +107,7 @@ public static void main(String[] args) throws ParseException { } else if (Context.TWISTER2_TSET_JOB.equals(jobType)) { jobBuilder.setWorkerClass(KMeansTsetJob.class.getName()); } - jobBuilder.addComputeResource(2, 512, 1.0, workers); + jobBuilder.addComputeResource(2, 4096, 1.0, workers); jobBuilder.setConfig(jobConfig); // now submit the job From b9669fc70966186dd88f967bb4a33781c46022ab Mon Sep 17 00:00:00 2001 From: Gurhan Gunduz Date: Thu, 29 Aug 2019 16:55:37 +0300 Subject: [PATCH 04/11] fault tolerance improved --- .../jobmaster/JobMasterClientExample.java | 2 +- .../dsc/tws/master/server/WorkerMonitor.java | 44 +++++++++---------- .../dsc/tws/master/worker/JMWorkerAgent.java | 14 +++--- .../schedulers/k8s/mpi/MPIWorkerStarter.java | 2 +- .../k8s/worker/K8sWorkerStarter.java | 2 +- .../schedulers/mesos/MesosDockerWorker.java | 14 ++++-- .../mesos/mpi/MesosMPIMasterStarter.java | 2 +- .../mesos/mpi/MesosMPIWorkerStarter.java | 2 +- .../schedulers/nomad/NomadWorkerStarter.java | 2 +- .../schedulers/standalone/MPIWorker.java | 2 +- 10 files changed, 48 insertions(+), 38 deletions(-) diff --git a/twister2/examples/src/java/edu/iu/dsc/tws/examples/internal/jobmaster/JobMasterClientExample.java b/twister2/examples/src/java/edu/iu/dsc/tws/examples/internal/jobmaster/JobMasterClientExample.java index f9edb7e370..8bd1141b48 100644 --- a/twister2/examples/src/java/edu/iu/dsc/tws/examples/internal/jobmaster/JobMasterClientExample.java +++ b/twister2/examples/src/java/edu/iu/dsc/tws/examples/internal/jobmaster/JobMasterClientExample.java @@ -102,7 +102,7 @@ public static void simulateClient(Config config, JobAPI.Job job) { JMWorkerAgent client = JMWorkerAgent.createJMWorkerAgent( config, workerInfo, jobMasterAddress, jobMasterPort, numberOfWorkers); - client.startThreaded(); + client.startThreaded(false); IWorkerController workerController = client.getJMWorkerController(); diff --git a/twister2/master/src/java/edu/iu/dsc/tws/master/server/WorkerMonitor.java b/twister2/master/src/java/edu/iu/dsc/tws/master/server/WorkerMonitor.java index 3ab81a3af0..0ff28eea11 100644 --- a/twister2/master/src/java/edu/iu/dsc/tws/master/server/WorkerMonitor.java +++ b/twister2/master/src/java/edu/iu/dsc/tws/master/server/WorkerMonitor.java @@ -190,25 +190,25 @@ private void registerWorkerMessageReceived(RequestID id, JobMasterAPI.RegisterWo // if it is coming from failure // update the worker status and return - int registeredWorkerID = getRegisteredWorkerID(workerInfo.getWorkerIP(), workerInfo.getPort()); - if (registeredWorkerID >= 0) { - // update the worker status in the worker list - workers.get(registeredWorkerID).addWorkerState(JobMasterAPI.WorkerState.STARTING); - LOG.info("WorkerID: " + registeredWorkerID + " joined from failure."); - - // send the response message - sendRegisterWorkerResponse(id, workerInfo.getWorkerID(), true, null); - - // send worker registration message to dashboard - if (dashClient != null) { - dashClient.registerWorker(workerInfo); - } - // TO DO inform checkpoint master - // what should be the message - recoverFromFailure(workerInfo.getWorkerID(), workerInfo); - - return; - } +// int registeredWorkerID = getRegisteredWorkerID(workerInfo.getWorkerIP(), workerInfo.getPort()); +// if (registeredWorkerID >= 0) { +// // update the worker status in the worker list +// workers.get(registeredWorkerID).addWorkerState(JobMasterAPI.WorkerState.STARTING); +// LOG.info("WorkerID: " + registeredWorkerID + " joined from failure."); +// +// // send the response message +// sendRegisterWorkerResponse(id, workerInfo.getWorkerID(), true, null); +// +// // send worker registration message to dashboard +// if (dashClient != null) { +// dashClient.registerWorker(workerInfo); +// } +// // TO DO inform checkpoint master +// // what should be the message +// recoverFromFailure(workerInfo.getWorkerID(), workerInfo); +// +// return; +// } // if job master assigns workerIDs, get new id and update it in WorkerInfo // also set in RRServer @@ -317,7 +317,7 @@ private void reregisterWorkerMessageReceived(RequestID id, JobMasterAPI.Register return; } - // TODO: we need to let all workers and the driver know that a worker joined after failure + // if this is not a new registration and all workers joined, inform this worker only if (!newRegistration && allWorkersRegistered()) { @@ -329,8 +329,8 @@ private void reregisterWorkerMessageReceived(RequestID id, JobMasterAPI.Register //notify the worker when there was a worker coming from a failure. private void recoverFromFailure(int workerID, JobMasterAPI.WorkerInfo workerInfo) { - //rrServer.removeWorkerChannel(workerID); - //rrServer.setWorkerChannel(workerID); + rrServer.removeWorkerChannel(workerID); + rrServer.setWorkerChannel(workerID); JobMasterAPI.Recover recoverMessage = JobMasterAPI.Recover.newBuilder() .setWorkerID(workerID) .build(); diff --git a/twister2/master/src/java/edu/iu/dsc/tws/master/worker/JMWorkerAgent.java b/twister2/master/src/java/edu/iu/dsc/tws/master/worker/JMWorkerAgent.java index f516524a9c..e3d6c5b73d 100644 --- a/twister2/master/src/java/edu/iu/dsc/tws/master/worker/JMWorkerAgent.java +++ b/twister2/master/src/java/edu/iu/dsc/tws/master/worker/JMWorkerAgent.java @@ -266,7 +266,7 @@ private void startLooping() { /** * start the Job Master Client in a Thread */ - public Thread startThreaded() { + public Thread startThreaded(boolean fromFailure) { // first initialize the client, connect to Job Master init(); @@ -275,7 +275,7 @@ public Thread startThreaded() { jmThread.setName("JM Agent"); jmThread.start(); - boolean registered = registerWorker(); + boolean registered = registerWorker(fromFailure); if (!registered) { this.close(); throw new RuntimeException("Could not register JobMaster with Dashboard. Exiting ....."); @@ -287,13 +287,13 @@ public Thread startThreaded() { /** * start the Job Master Client in a blocking call */ - public void startBlocking() { + public void startBlocking(boolean fromFailure) { // first initialize the client, connect to Job Master init(); startLooping(); - boolean registered = registerWorker(); + boolean registered = registerWorker(fromFailure); if (!registered) { throw new RuntimeException("Could not register JobMaster with Dashboard. Exiting ....."); } @@ -395,11 +395,12 @@ public static boolean addJobListener(JobListener jobListener) { * send RegisterWorker message to Job Master * put WorkerInfo in this message */ - private boolean registerWorker() { + private boolean registerWorker(boolean fromFailure) { JobMasterAPI.RegisterWorker registerWorker = JobMasterAPI.RegisterWorker.newBuilder() .setWorkerID(thisWorker.getWorkerID()) .setWorkerInfo(thisWorker) + .setFromFailure(fromFailure) .build(); LOG.fine("Sending RegisterWorker message: \n" + registerWorker); @@ -592,7 +593,8 @@ public void onMessage(RequestID id, int workerId, Message message) { //TODO: going back to last checkpoint should be implemented here. public void setBackToLAstCheckpoint() { - //System.exit(3); + + System.exit(3); } diff --git a/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/k8s/mpi/MPIWorkerStarter.java b/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/k8s/mpi/MPIWorkerStarter.java index 20cb3c4552..cae5c15afb 100644 --- a/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/k8s/mpi/MPIWorkerStarter.java +++ b/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/k8s/mpi/MPIWorkerStarter.java @@ -165,7 +165,7 @@ public static void main(String[] args) { JobMasterContext.jobMasterPort(config), job.getNumberOfWorkers()); // start JMWorkerAgent - jobMasterAgent.startThreaded(); + jobMasterAgent.startThreaded(false); // we will be running the Worker, send running message jobMasterAgent.sendWorkerRunningMessage(); diff --git a/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/k8s/worker/K8sWorkerStarter.java b/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/k8s/worker/K8sWorkerStarter.java index 3b569d962c..fafaabf138 100644 --- a/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/k8s/worker/K8sWorkerStarter.java +++ b/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/k8s/worker/K8sWorkerStarter.java @@ -141,7 +141,7 @@ public static void main(String[] args) { JobMasterContext.jobMasterPort(config), job.getNumberOfWorkers()); // start JMWorkerAgent - jobMasterAgent.startThreaded(); + jobMasterAgent.startThreaded(false); // we will be running the Worker, send running message jobMasterAgent.sendWorkerRunningMessage(); diff --git a/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/mesos/MesosDockerWorker.java b/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/mesos/MesosDockerWorker.java index 04cfc397ad..d9152a4203 100644 --- a/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/mesos/MesosDockerWorker.java +++ b/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/mesos/MesosDockerWorker.java @@ -58,6 +58,10 @@ public static void main(String[] args) throws Exception { //gets the docker home directory //String homeDir = System.getenv("HOME"); + int restart = 0; + if (args != null) { + restart = Integer.parseInt(args[0]); + } workerId = Integer.parseInt(System.getenv("WORKER_ID")); jobName = System.getenv("JOB_NAME"); MesosDockerWorker worker = new MesosDockerWorker(); @@ -119,7 +123,7 @@ public static void main(String[] args) throws Exception { LOG.info(workerController.getWorkerInfo().toString()); //start job master client worker.startJobMasterAgent(workerController.getWorkerInfo(), jobMasterIP, jobMasterPort, - workerCount); + workerCount, restart); config = JobUtils.overrideConfigs(job, config); config = JobUtils.updateConfigs(job, config); @@ -182,13 +186,17 @@ public static void closeWorker() { } public void startJobMasterAgent(JobMasterAPI.WorkerInfo workerInfo, String jobMasterIP, - int jobMasterPort, int numberOfWorkers) { + int jobMasterPort, int numberOfWorkers, int restart) { LOG.info("JobMaster IP..: " + jobMasterIP); LOG.info("NETWORK INFO..: " + workerInfo.getWorkerIP()); jobMasterAgent = JMWorkerAgent.createJMWorkerAgent(config, workerInfo, jobMasterIP, jobMasterPort, numberOfWorkers); - jobMasterAgent.startThreaded(); + if (restart > 0) { + jobMasterAgent.startThreaded(true); + } else { + jobMasterAgent.startThreaded(false); + } // No need for sending workerStarting message anymore // that is called in startThreaded method } diff --git a/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/mesos/mpi/MesosMPIMasterStarter.java b/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/mesos/mpi/MesosMPIMasterStarter.java index cf54c7d0df..158e2b9786 100644 --- a/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/mesos/mpi/MesosMPIMasterStarter.java +++ b/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/mesos/mpi/MesosMPIMasterStarter.java @@ -183,7 +183,7 @@ public void startJobMasterAgent(JobMasterAPI.WorkerInfo workerInfo, String jobMa LOG.info("NETWORK INFO..: " + workerInfo.getWorkerIP()); jobMasterAgent = JMWorkerAgent.createJMWorkerAgent(config, workerInfo, jobMasterIP, jobMasterPort, numberOfWorkers); - jobMasterAgent.startThreaded(); + jobMasterAgent.startThreaded(false); // No need for sending workerStarting message anymore // that is called in startThreaded method } diff --git a/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/mesos/mpi/MesosMPIWorkerStarter.java b/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/mesos/mpi/MesosMPIWorkerStarter.java index 1b546bb90e..9fda982eb3 100644 --- a/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/mesos/mpi/MesosMPIWorkerStarter.java +++ b/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/mesos/mpi/MesosMPIWorkerStarter.java @@ -135,7 +135,7 @@ public static void startJobMasterAgent(JobMasterAPI.WorkerInfo workerInfo, Strin LOG.info("NETWORK INFO..: " + workerInfo.getWorkerIP().toString()); jobMasterAgent = JMWorkerAgent.createJMWorkerAgent(config, workerInfo, jobMasterIP, jobMasterPort, numberOfWorkers); - jobMasterAgent.startThreaded(); + jobMasterAgent.startThreaded(false); // No need for sending workerStarting message anymore // that is called in startThreaded method } diff --git a/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/nomad/NomadWorkerStarter.java b/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/nomad/NomadWorkerStarter.java index 7fe393159a..ce709348ee 100644 --- a/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/nomad/NomadWorkerStarter.java +++ b/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/nomad/NomadWorkerStarter.java @@ -309,7 +309,7 @@ private JMWorkerAgent createMasterAgent(Config cfg, String masterHost, int maste workerInfo, masterHost, masterPort, numberContainers); LOG.log(Level.INFO, String.format("Connecting to job master..: %s:%d", masterHost, masterPort)); - jobMasterAgent.startThreaded(); + jobMasterAgent.startThreaded(false); // No need for sending workerStarting message anymore // that is called in startThreaded method diff --git a/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/standalone/MPIWorker.java b/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/standalone/MPIWorker.java index 9917443821..c5bac659d4 100644 --- a/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/standalone/MPIWorker.java +++ b/twister2/resource-scheduler/src/java/edu/iu/dsc/tws/rsched/schedulers/standalone/MPIWorker.java @@ -238,7 +238,7 @@ private JMWorkerAgent createMasterAgent(Config cfg, String masterHost, int maste JMWorkerAgent jobMasterAgent = JMWorkerAgent.createJMWorkerAgent(cfg, workerInfo, masterHost, masterPort, numberContainers); LOG.log(Level.FINE, String.format("Connecting to job master %s:%d", masterHost, masterPort)); - jobMasterAgent.startThreaded(); + jobMasterAgent.startThreaded(false); // now lets send the starting message jobMasterAgent.sendWorkerRunningMessage(); From 68ed20bbe26efbd5d081ea2171e17693d7fbe629 Mon Sep 17 00:00:00 2001 From: supunkamburugamuva Date: Thu, 29 Aug 2019 11:21:09 -0400 Subject: [PATCH 05/11] fixing a compilation error --- .../src/java/edu/iu/dsc/tws/master/worker/JMWorkerAgent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/twister2/master/src/java/edu/iu/dsc/tws/master/worker/JMWorkerAgent.java b/twister2/master/src/java/edu/iu/dsc/tws/master/worker/JMWorkerAgent.java index e3d6c5b73d..205a6c56ce 100644 --- a/twister2/master/src/java/edu/iu/dsc/tws/master/worker/JMWorkerAgent.java +++ b/twister2/master/src/java/edu/iu/dsc/tws/master/worker/JMWorkerAgent.java @@ -594,7 +594,7 @@ public void onMessage(RequestID id, int workerId, Message message) { public void setBackToLAstCheckpoint() { - System.exit(3); + // System.exit(3); } From 1dba0e99080df67b6cab835c860669e45b87d629 Mon Sep 17 00:00:00 2001 From: pulasthi Date: Thu, 29 Aug 2019 14:15:40 -0400 Subject: [PATCH 06/11] updating kmeans --- .../tws/examples/batch/kmeans/KMeansWorker.java | 2 +- .../tws/task/dataobjects/DataObjectSource.java | 15 ++++++++++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansWorker.java b/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansWorker.java index c839ff1364..d6df3933ff 100644 --- a/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansWorker.java +++ b/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansWorker.java @@ -174,7 +174,7 @@ public static ComputeGraph buildDataPointsTG(String dataDirectory, int dsize, int parallelismValue, int dimension, Config conf) { DataObjectSource dataObjectSource = new DataObjectSource(Context.TWISTER2_DIRECT_EDGE, - dataDirectory); + dataDirectory, dsize); KMeansDataObjectCompute dataObjectCompute = new KMeansDataObjectCompute( Context.TWISTER2_DIRECT_EDGE, dsize, parallelismValue, dimension); KMeansDataObjectDirectSink dataObjectSink = new KMeansDataObjectDirectSink("points"); diff --git a/twister2/task/src/main/java/edu/iu/dsc/tws/task/dataobjects/DataObjectSource.java b/twister2/task/src/main/java/edu/iu/dsc/tws/task/dataobjects/DataObjectSource.java index 2ff255e3e9..9be4ce7c14 100644 --- a/twister2/task/src/main/java/edu/iu/dsc/tws/task/dataobjects/DataObjectSource.java +++ b/twister2/task/src/main/java/edu/iu/dsc/tws/task/dataobjects/DataObjectSource.java @@ -20,7 +20,7 @@ import edu.iu.dsc.tws.api.compute.nodes.BaseSource; import edu.iu.dsc.tws.api.config.Config; import edu.iu.dsc.tws.api.data.Path; -import edu.iu.dsc.tws.data.api.formatters.LocalTextInputPartitioner; +import edu.iu.dsc.tws.data.api.formatters.LocalFixedInputPartitioner; import edu.iu.dsc.tws.data.fs.io.InputSplit; import edu.iu.dsc.tws.dataset.DataSource; import edu.iu.dsc.tws.executor.core.ExecutionRuntime; @@ -47,10 +47,19 @@ public class DataObjectSource extends BaseSource { */ private String edgeName; private String dataDirectory; + private int dataSize; + + public DataObjectSource(String edgename, String dataDirectory, int dsize) { + this.edgeName = edgename; + this.dataDirectory = dataDirectory; + this.dataSize = dsize; + } + public DataObjectSource(String edgename, String dataDirectory) { this.edgeName = edgename; this.dataDirectory = dataDirectory; + this.dataSize = 1; } public String getDataDirectory() { @@ -102,7 +111,7 @@ public void execute() { public void prepare(Config cfg, TaskContext context) { super.prepare(cfg, context); ExecutionRuntime runtime = (ExecutionRuntime) cfg.get(ExecutorContext.TWISTER2_RUNTIME_OBJECT); - this.source = runtime.createInput(cfg, context, new LocalTextInputPartitioner( - new Path(getDataDirectory()), context.getParallelism(), cfg)); + this.source = runtime.createInput(cfg, context, new LocalFixedInputPartitioner( + new Path(getDataDirectory()), context.getParallelism(), cfg, dataSize)); } } From c329b7133b3ea846a168552f807274844f525bb4 Mon Sep 17 00:00:00 2001 From: pulasthi Date: Thu, 29 Aug 2019 21:19:22 -0400 Subject: [PATCH 07/11] kmeans update --- .../tws/examples/batch/kmeans/KMeansTsetJob.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansTsetJob.java b/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansTsetJob.java index 22fb9e4929..8f8fa78a71 100644 --- a/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansTsetJob.java +++ b/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansTsetJob.java @@ -56,11 +56,11 @@ public void execute(BatchTSetEnvironment tc) { int csize = kMeansJobParameters.getCsize(); int iterations = kMeansJobParameters.getIterations(); - String dataDirectory = kMeansJobParameters.getDatapointDirectory() + workerId; - String centroidDirectory = kMeansJobParameters.getCentroidDirectory() + workerId; + String dataDirectory = kMeansJobParameters.getDatapointDirectory(); + String centroidDirectory = kMeansJobParameters.getCentroidDirectory(); - workerUtils.generateDatapoints(dimension, numFiles, dsize, csize, dataDirectory, - centroidDirectory); +// workerUtils.generateDatapoints(dimension, numFiles, dsize, csize, dataDirectory, +// centroidDirectory); long startTime = System.currentTimeMillis(); CachedTSet points = @@ -164,8 +164,7 @@ public void prepare(TSetContext context) { Config cfg = context.getConfig(); this.dataSize = Integer.parseInt(cfg.getStringValue(DataObjectConstants.DSIZE)); this.dimension = Integer.parseInt(cfg.getStringValue(DataObjectConstants.DIMENSIONS)); - String datainputDirectory = cfg.getStringValue(DataObjectConstants.DINPUT_DIRECTORY) - + context.getWorkerId(); + String datainputDirectory = cfg.getStringValue(DataObjectConstants.DINPUT_DIRECTORY); int datasize = Integer.parseInt(cfg.getStringValue(DataObjectConstants.DSIZE)); //The +1 in the array size is because of a data balancing bug localPoints = new double[dataSize / para][dimension]; @@ -219,8 +218,7 @@ public void prepare(TSetContext context) { super.prepare(context); Config cfg = context.getConfig(); - String datainputDirectory = cfg.getStringValue(DataObjectConstants.CINPUT_DIRECTORY) - + context.getWorkerId(); + String datainputDirectory = cfg.getStringValue(DataObjectConstants.CINPUT_DIRECTORY); this.dimension = Integer.parseInt(cfg.getStringValue(DataObjectConstants.DIMENSIONS)); int csize = Integer.parseInt(cfg.getStringValue(DataObjectConstants.CSIZE)); From 4b166198e55aee7fdb852830d0a76de1711d2289 Mon Sep 17 00:00:00 2001 From: pulasthi Date: Fri, 30 Aug 2019 00:22:04 -0400 Subject: [PATCH 08/11] kmeans update --- .../iu/dsc/tws/examples/batch/kmeans/KMeansTsetJob.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansTsetJob.java b/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansTsetJob.java index 8f8fa78a71..200d9f7b6b 100644 --- a/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansTsetJob.java +++ b/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansTsetJob.java @@ -14,6 +14,7 @@ import java.io.IOException; import java.io.Serializable; +import java.util.Arrays; import java.util.Iterator; import java.util.logging.Level; import java.util.logging.Logger; @@ -102,8 +103,11 @@ public void execute(BatchTSetEnvironment tc) { LOG.info("Data Load time : " + (endTimeData - startTime) + "\n" + "Total Time : " + (endTime - startTime) + "Compute Time : " + (endTime - endTimeData)); - LOG.info("Final Centroids After\t" + iterations + "\titerations\t" - + centroid.length); + if (workerId == 0) { + LOG.info("Final Centroids After\t" + iterations + "\titerations\t" + + Arrays.toString(centroid[0])); + + } } } From b6d7db4cb24d12071ee469d20f2312bea68d0a26 Mon Sep 17 00:00:00 2001 From: pulasthi Date: Fri, 6 Sep 2019 14:30:55 -0400 Subject: [PATCH 09/11] adding direct iter example --- .../tset/batch/DirectIterExample.java | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 twister2/examples/src/java/edu/iu/dsc/tws/examples/tset/batch/DirectIterExample.java diff --git a/twister2/examples/src/java/edu/iu/dsc/tws/examples/tset/batch/DirectIterExample.java b/twister2/examples/src/java/edu/iu/dsc/tws/examples/tset/batch/DirectIterExample.java new file mode 100644 index 0000000000..143fc47c42 --- /dev/null +++ b/twister2/examples/src/java/edu/iu/dsc/tws/examples/tset/batch/DirectIterExample.java @@ -0,0 +1,52 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package edu.iu.dsc.tws.examples.tset.batch; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.logging.Logger; + +import edu.iu.dsc.tws.api.JobConfig; +import edu.iu.dsc.tws.api.config.Config; +import edu.iu.dsc.tws.api.tset.env.BatchTSetEnvironment; +import edu.iu.dsc.tws.api.tset.sets.batch.CachedTSet; +import edu.iu.dsc.tws.api.tset.sets.batch.ComputeTSet; +import edu.iu.dsc.tws.api.tset.sets.batch.SourceTSet; +import edu.iu.dsc.tws.rsched.core.ResourceAllocator; + + +public class DirectIterExample extends BatchTsetExample { + private static final Logger LOG = Logger.getLogger(DirectIterExample.class.getName()); + private static final long serialVersionUID = -2753072757838198105L; + + @Override + public void execute(BatchTSetEnvironment env) { + SourceTSet src = dummySource(env, COUNT, PARALLELISM).setName("src"); + LOG.info("test direct iteration"); + ComputeTSet> testmap = src.direct().map(input -> input); + CachedTSet cached = null; + for (int i = 0; i < 4; i++) { + cached = testmap.cache(true); + } + System.out.println(cached.getDataObject().getPartitions().length); + testmap.finishIter(); + } + + + public static void main(String[] args) { + Config config = ResourceAllocator.loadConfig(new HashMap<>()); + + JobConfig jobConfig = new JobConfig(); + BatchTsetExample.submitJob(config, PARALLELISM, jobConfig, DirectIterExample.class.getName()); + } +} From 33c54347ee64ca4cfa9173e41b940cda427dcc5d Mon Sep 17 00:00:00 2001 From: pulasthi Date: Fri, 6 Sep 2019 14:32:43 -0400 Subject: [PATCH 10/11] undo changes in example --- .../examples/batch/kmeans/KMeansTsetJob.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansTsetJob.java b/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansTsetJob.java index 200d9f7b6b..e6592d61b6 100644 --- a/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansTsetJob.java +++ b/twister2/examples/src/java/edu/iu/dsc/tws/examples/batch/kmeans/KMeansTsetJob.java @@ -57,11 +57,11 @@ public void execute(BatchTSetEnvironment tc) { int csize = kMeansJobParameters.getCsize(); int iterations = kMeansJobParameters.getIterations(); - String dataDirectory = kMeansJobParameters.getDatapointDirectory(); - String centroidDirectory = kMeansJobParameters.getCentroidDirectory(); + String dataDirectory = kMeansJobParameters.getDatapointDirectory() + workerId; + String centroidDirectory = kMeansJobParameters.getCentroidDirectory() + workerId; -// workerUtils.generateDatapoints(dimension, numFiles, dsize, csize, dataDirectory, -// centroidDirectory); + workerUtils.generateDatapoints(dimension, numFiles, dsize, csize, dataDirectory, + centroidDirectory); long startTime = System.currentTimeMillis(); CachedTSet points = @@ -90,7 +90,8 @@ public void execute(BatchTSetEnvironment tc) { kmeansTSet.addInput("centers", centers); centers = reduced.cache(true); } - //reduced.finishIter(); + + reduced.finishIter(); DataPartition centroidPartition = centers.getDataObject().getPartition(workerId); double[][] centroid = null; @@ -133,7 +134,7 @@ public double[][] map(double[][] data) { } - private class AverageCenters implements MapFunc { + private class AverageCenters implements MapFunc { @Override public double[][] map(double[][] centers) { //The centers that are received at this map is a the sum of all points assigned to each @@ -168,7 +169,8 @@ public void prepare(TSetContext context) { Config cfg = context.getConfig(); this.dataSize = Integer.parseInt(cfg.getStringValue(DataObjectConstants.DSIZE)); this.dimension = Integer.parseInt(cfg.getStringValue(DataObjectConstants.DIMENSIONS)); - String datainputDirectory = cfg.getStringValue(DataObjectConstants.DINPUT_DIRECTORY); + String datainputDirectory = cfg.getStringValue(DataObjectConstants.DINPUT_DIRECTORY) + + context.getWorkerId(); int datasize = Integer.parseInt(cfg.getStringValue(DataObjectConstants.DSIZE)); //The +1 in the array size is because of a data balancing bug localPoints = new double[dataSize / para][dimension]; @@ -222,7 +224,8 @@ public void prepare(TSetContext context) { super.prepare(context); Config cfg = context.getConfig(); - String datainputDirectory = cfg.getStringValue(DataObjectConstants.CINPUT_DIRECTORY); + String datainputDirectory = cfg.getStringValue(DataObjectConstants.CINPUT_DIRECTORY) + + context.getWorkerId(); this.dimension = Integer.parseInt(cfg.getStringValue(DataObjectConstants.DIMENSIONS)); int csize = Integer.parseInt(cfg.getStringValue(DataObjectConstants.CSIZE)); From d3d0bb39a9f2e0374199556cc2e7c14f528bdb92 Mon Sep 17 00:00:00 2001 From: pulasthi Date: Fri, 6 Sep 2019 14:33:03 -0400 Subject: [PATCH 11/11] fixing bugs --- twister2/comms/src/java/edu/iu/dsc/tws/comms/dfw/OneToOne.java | 1 + .../java/edu/iu/dsc/tws/executor/core/ExecutionPlanBuilder.java | 1 + 2 files changed, 2 insertions(+) diff --git a/twister2/comms/src/java/edu/iu/dsc/tws/comms/dfw/OneToOne.java b/twister2/comms/src/java/edu/iu/dsc/tws/comms/dfw/OneToOne.java index 44907a1515..f4ebdaee7f 100644 --- a/twister2/comms/src/java/edu/iu/dsc/tws/comms/dfw/OneToOne.java +++ b/twister2/comms/src/java/edu/iu/dsc/tws/comms/dfw/OneToOne.java @@ -321,6 +321,7 @@ public void reset() { if (finalReceiver != null) { finalReceiver.clean(); finishedSources.clear(); + pendingFinishSources.clear(); } } diff --git a/twister2/executor/src/java/edu/iu/dsc/tws/executor/core/ExecutionPlanBuilder.java b/twister2/executor/src/java/edu/iu/dsc/tws/executor/core/ExecutionPlanBuilder.java index baa341fe02..9f196f9275 100644 --- a/twister2/executor/src/java/edu/iu/dsc/tws/executor/core/ExecutionPlanBuilder.java +++ b/twister2/executor/src/java/edu/iu/dsc/tws/executor/core/ExecutionPlanBuilder.java @@ -213,6 +213,7 @@ public ExecutionPlan build(Config cfg, ComputeGraph taskGraph, parents = new HashSet<>(); } parents.add(inEdge); + inEdges.put(inEdge, parents); } }