Merge pull request #306 from vibhatha/master
docs updated for svm and kmeans
supunkamburugamuve authored Mar 28, 2019
2 parents 857e9e4 + f722103 commit 506bde4
Showing 2 changed files with 46 additions and 22 deletions.
48 changes: 26 additions & 22 deletions docs/examples/ml/kmeans/kmeans.md
@@ -16,7 +16,7 @@ The implementation details of k-means clustering in Twister2 are pictorially repr…
The following constants are used by the k-means algorithm to specify the number of workers, the parallelism, the dimension, the sizes of the data points and centroids, the file system, the number of iterations, and the datapoints and centroids directories.

-```text
+```java
public static final String WORKERS = "workers";
public static final String DIMENSIONS = "dim";
public static final String PARALLELISM_VALUE = "parallelism";
@@ -38,7 +38,7 @@
parses the command line parameters submitted by the user for running the K-Means job.
It first sets the submitted variables in the JobConfig object, puts the JobConfig object into the
Twister2Job builder, sets the worker class (KMeansWorker.java in this example), and submits the job.

-```text
+```java
edu.iu.dsc.tws.examples.batch.kmeans.KMeansWorkerMain
```
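As a point of reference, the submission flow described above can be sketched as below. This is our illustration of the pattern, not the verbatim KMeansWorkerMain source: the job name, option keys, and resource values are assumptions, and imports from edu.iu.dsc.tws.* are omitted since their packages moved between Twister2 releases.

```java
// Hypothetical sketch (not the verbatim KMeansWorkerMain): pack the parsed
// CLI options into a JobConfig, point the Twister2Job builder at the worker
// class, and submit the job.
int workers = 2;        // stand-ins for the values parsed from the command line
int parallelism = 4;

JobConfig jobConfig = new JobConfig();
jobConfig.put("workers", Integer.toString(workers));
jobConfig.put("parallelism", Integer.toString(parallelism));

Twister2Job twister2Job = Twister2Job.newBuilder()
    .setJobName("kmeans-job")                      // illustrative name
    .setWorkerClass(KMeansWorker.class.getName())
    .addComputeResource(1, 512, workers)           // cpus, ram (MB), instances
    .setConfig(jobConfig)
    .build();
// 'config' is the base configuration loaded by the Twister2 resource allocator
Twister2Submitter.submitJob(twister2Job, config);
```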

@@ -58,7 +58,7 @@
The main functionality of the first task graph is to partition the data points, convert the
partitioned datapoints into a two-dimensional array, and write the two-dimensional array into their
respective task index values.

-```text
+```java
/* First Graph to partition and read the partitioned data points **/
DataObjectSource dataObjectSource = new DataObjectSource(Context.TWISTER2_DIRECT_EDGE,
dataDirectory);
@@ -70,7 +70,7 @@
First, add the source, compute, and sink tasks to the task graph builder for the first task graph.
Then, create the communication edges between the tasks for the first task graph.

-```text
+```java
taskGraphBuilder.addSource("datapointsource", dataObjectSource, parallelismValue);
ComputeConnection datapointComputeConnection = taskGraphBuilder.addCompute("datapointcompute",
dataObjectCompute, parallelismValue);
@@ -87,7 +87,7 @@

Finally, invoke the taskGraphBuilder to build the first task graph, get the task schedule plan and execution plan for the first task graph, and call the execute() method to execute the datapoints task graph. Once the execution is finished, the output values are retrieved in the "datapointsObject".

-```text
+```java
//Build the first taskgraph
DataFlowTaskGraph datapointsTaskGraph = taskGraphBuilder.build();
//Get the execution plan for the first task graph
@@ -106,7 +106,7 @@
Finally, write the partitioned datapoints into their respective edges. The LocalTextInputPartitioner
partitions the datapoints based on the block, whereas the LocalFixedInputPartitioner partitions the
datapoints based on the length of the file. For example, if the task parallelism is 4 and there are 16 data points, each task will get 4 datapoints to process, as sketched below.
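To make that arithmetic concrete, here is a tiny self-contained sketch (our own helper, not a Twister2 class) of how a fixed-length partitioner divides N data points among P parallel tasks:

```java
final class PartitionMath {
  // Each of the P tasks gets N / P data points; when N does not divide
  // evenly, the remainder is spread over the lower task indices.
  static int pointsForTask(int totalPoints, int parallelism, int taskIndex) {
    int base = totalPoints / parallelism;
    int remainder = totalPoints % parallelism;
    return taskIndex < remainder ? base + 1 : base;
  }
}
// PartitionMath.pointsForTask(16, 4, t) == 4 for every t, matching the example above.
```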

-```text
+```java
@Override
public void prepare(Config cfg, TaskContext context) {
super.prepare(cfg, context);
@@ -122,7 +122,7 @@
This class receives the partitioned datapoints as an "IMessage" and converts those datapoints into
two-dimensional format for the k-means clustering process. The converted datapoints are sent to the
KMeansDataObjectDirectSink through the "direct" edge.

-```text
+```java
while (((Iterator) message.getContent()).hasNext()) {
String val = String.valueOf(((Iterator) message.getContent()).next());
String[] data = val.split(",");
@@ -140,7 +140,7 @@
This class receives the message object from the DataObjectCompute and writes it into the respective
task index values. First, it stores the iterator values in an array list and then converts the array
list values into double array values.

-```text
+```java
@Override
public boolean execute(IMessage message) {
List<double[][]> values = new ArrayList<>();
@@ -158,7 +158,7 @@
Finally, write the appropriate data points into their respective task index values with the entity
partition values.

-```text
+```java
@Override
public DataPartition<double[][]> get() {
return new EntityPartition<>(context.taskIndex(), dataPointsLocal);
@@ -175,7 +175,7 @@
but with one major difference: it reads the complete file as one partition.
2. KMeansDataObjectCompute, and
3. KMeansDataObjectDirectSink

-```text
+```java
DataFileReplicatedReadSource dataFileReplicatedReadSource = new DataFileReplicatedReadSource(
Context.TWISTER2_DIRECT_EDGE, centroidDirectory);
KMeansDataObjectCompute centroidObjectCompute = new KMeansDataObjectCompute(
@@ -185,7 +185,7 @@

Similar to the first task graph, it adds the source, compute, and sink tasks to the task graph builder for the second task graph. Then, it creates the communication edges between the tasks for the second task graph.

-```text
+```java
//Add source, compute, and sink tasks to the task graph builder for the second task graph
taskGraphBuilder.addSource("centroidsource", dataFileReplicatedReadSource, parallelismValue);
ComputeConnection centroidComputeConnection = taskGraphBuilder.addCompute("centroidcompute",
@@ -203,7 +203,7 @@

Finally, invoke the build() method to build the second task graph, get the task schedule plan and execution plan for the second task graph, and call the execute() method to execute the centroids task graph. Once the execution is finished, the output values are retrieved in the "centroidsDataObject".

-```text
+```java
//Build the second taskgraph
DataFlowTaskGraph centroidsTaskGraph = taskGraphBuilder.build();
//Get the execution plan for the second task graph
@@ -221,7 +221,7 @@
This class uses the "LocalCompleteTextInputPartitioner" to read the whole file from the
directory and write it into the respective task index values using the "direct" task edge.
For example, if there are 16 centroid values, each task index receives all 16 centroid values.

-```text
+```java
public void prepare(Config cfg, TaskContext context) {
super.prepare(cfg, context);
ExecutionRuntime runtime = (ExecutionRuntime) cfg.get(ExecutorContext.TWISTER2_RUNTIME_OBJECT);
@@ -236,7 +236,7 @@
The third task graph has the following classes, namely KMeansSource, KMeansAllReduceTask, and
CentroidAggregator. Similar to the first and second task graphs, we first have to add the source,
sink, and communication edges to the third task graph.

-```text
+```java
/* Third Graph to do the actual calculation **/
KMeansSourceTask kMeansSourceTask = new KMeansSourceTask();
KMeansAllReduceTask kMeansAllReduceTask = new KMeansAllReduceTask();
@@ -259,7 +259,7 @@
The datapoint and centroid values are sent to the KMeansTaskGraph as the "points" object and "centroids"
object as an input for further processing. Finally, it invokes the execute() method of the task
executor to do the clustering process.

-```text
+```java
//Perform the iterations from 0 to 'n' number of iterations
for (int i = 0; i < iterations; i++) {
ExecutionPlan plan = taskExecutor.plan(kmeansTaskGraph);
@@ -280,7 +280,7 @@
This process repeats for ‘n’ iterations, as specified by the user. In every iteration, a
new centroid value is calculated and the calculated value is distributed across all the task instances.
At the end of every iteration, the centroid value is updated and the iteration continues with the new centroid value.

-```text
+```java
//retrieve the new centroid value for the next iterations
centroidsDataObject = taskExecutor.getOutput(kmeansTaskGraph, plan, "kmeanssink");
```
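Putting the preceding snippets together, the driver has roughly the following shape. This is a condensed sketch assembled from the calls shown above, not a verbatim copy of KMeansWorker; in particular, the addInput signature is an assumption inferred from the surrounding snippets and may differ between Twister2 releases.

```java
// Condensed sketch of the iterative driver: plan the graph, inject the
// partitioned points and the current centroids, execute, then feed the
// all-reduced centroids into the next iteration.
for (int i = 0; i < iterations; i++) {
  ExecutionPlan plan = taskExecutor.plan(kmeansTaskGraph);
  taskExecutor.addInput(kmeansTaskGraph, plan, "kmeanssource", "points", datapointsObject);
  taskExecutor.addInput(kmeansTaskGraph, plan, "kmeanssource", "centroids", centroidsDataObject);
  taskExecutor.execute(kmeansTaskGraph, plan);
  // retrieve the updated centroids for the next iteration
  centroidsDataObject = taskExecutor.getOutput(kmeansTaskGraph, plan, "kmeanssink");
}
```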
@@ -289,7 +289,7 @@

First, the execute method in KMeansJobSource retrieves the partitioned data points and the complete centroid values for their respective task index values.

-```text
+```java
@Override
public void execute() {
int dim = Integer.parseInt(config.getStringValue("dim"));
@@ -302,13 +302,13 @@
```
The retrieved data points and centroids are sent to the KMeansCalculator to perform the actual distance calculation using the Euclidean distance.

-```text
+```java
kMeansCalculator = new KMeansCalculator(datapoints, centroid, dim);
double[][] kMeansCenters = kMeansCalculator.calculate();
```
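For readers who want the math spelled out, the core of that calculation is the standard nearest-centroid assignment under Euclidean distance. The helper below is our own self-contained sketch, not the KMeansCalculator source:

```java
// Assignment step of k-means: return the index of the centroid closest to
// the point. Squared Euclidean distance preserves the ordering, so the
// square root can be skipped.
static int nearestCentroid(double[] point, double[][] centroids) {
  int best = 0;
  double bestDistance = Double.MAX_VALUE;
  for (int c = 0; c < centroids.length; c++) {
    double distance = 0.0;
    for (int d = 0; d < point.length; d++) {
      double diff = point[d] - centroids[c][d];
      distance += diff * diff;
    }
    if (distance < bestDistance) {
      bestDistance = distance;
      best = c;
    }
  }
  return best;
}
```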

Finally, each task instance writes its calculated centroid values as given below:
-```text
+```java
context.writeEnd("all-reduce", kMeansCenters);
}
```
@@ -317,7 +317,7 @@

The KMeansAllReduceTask writes the calculated centroid values of the partitioned datapoints into their respective task index values.

-```text
+```java
@Override
public boolean execute(IMessage message) {
LOG.log(Level.FINE, "Received centroids: " + context.getWorkerId()
@@ -343,13 +343,13 @@

The CentroidAggregator implements the IFunction interface, whose onMessage function accepts two objects as arguments.

-```text
+```java
public Object onMessage(Object object1, Object object2)
```

It sums the corresponding centroid values and returns the result.

-```text
+```java
ret.setCenters(newCentroids);
```
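Concretely, the reduction is an element-wise addition of equally shaped accumulators. The sketch below illustrates the idea with plain double[][] values; the real CentroidAggregator operates on its own centers type, so treat the types here as assumptions:

```java
// Illustrative all-reduce combiner for k-means: add the per-task centroid
// accumulators element-wise. Averaging into final centroids happens after
// the reduction, once the accumulated counts are known.
public Object onMessage(Object object1, Object object2) {
  double[][] left = (double[][]) object1;
  double[][] right = (double[][]) object2;
  double[][] sum = new double[left.length][left[0].length];
  for (int i = 0; i < left.length; i++) {
    for (int j = 0; j < left[i].length; j++) {
      sum[i][j] = left[i][j] + right[i][j];
    }
  }
  return sum;
}
```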

@@ -371,6 +371,7 @@ …K-Means clustering process.

### Sample Output

+```bash
[2019-03-25 15:27:01 -0400] [INFO] [worker-0] [main] edu.iu.dsc.tws.examples.batch.kmeans.KMeansWorker:
Final Centroids After 100 iterations [[0.2535406313735363, 0.25640515489554255],
[0.7236140928643464, 0.7530306848028933], [0.7481226889281528, 0.24480221871888594],
@@ -389,3 +389,6 @@
Worker finished executing - 0

[2019-03-25 15:27:01 -0400] [INFO] [-] [JM] edu.iu.dsc.tws.master.server.JobMaster: All 2 workers have completed.
JobMaster is stopping.
+```
20 changes: 20 additions & 0 deletions docs/examples/ml/svm/svm.md
@@ -350,6 +350,19 @@ …to convert data to the dense format.
./bin/twister2 submit standalone jar examples/libexamples-java.jar edu.iu.dsc.tws.examples.ml.svm.SVMRunner -ram_mb 4096 -disk_gb 2 -instances 1 -alpha 0.1 -C 1.0 -exp_name test-svm -features 22 -samples 35000 -iterations 10 -training_data_dir <path-to-training-csv> -testing_data_dir <path-to-testing-csv> -parallelism 8 -workers 1 -cpus 1 -threads 4
```

+#### Sample Output
+
+```bash
+Training Dataset [/home/vibhatha/data/svm/w8a/training.csv]
+Testing Dataset [/home/vibhatha/data/svm/w8a/testing.csv]
+Data Loading Time (Training + Testing) = 1.943881115 s
+Training Time = 7.978291269 s
+Testing Time = 0.828260105 s
+Total Time (Data Loading Time + Training Time + Testing Time) = 10.750432489 s
+Accuracy of the Trained Model = 88.904494382 %
+```


## Distributed SVM Batch Model - Tset Example
@@ -582,4 +595,11 @@
For that, a simple map function can be plugged into the TSetLink.

```
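As an illustration of such a map function, the sketch below converts a CSV line into a dense feature vector. To stay self-contained it declares its own stand-in functional interface; the real TSet API defines its own map-function type, so the names here are assumptions, not the SvmSgdTsetRunner source:

```java
// Stand-in for the TSet map-function interface, declared locally so the
// sketch compiles on its own.
interface MapFunc<I, O> {
  O map(I input);
}

// Record-wise dense conversion: split a CSV line and parse each token.
MapFunc<String, double[]> csvToDense = line -> {
  String[] tokens = line.split(",");
  double[] features = new double[tokens.length];
  for (int i = 0; i < tokens.length; i++) {
    features[i] = Double.parseDouble(tokens[i]);
  }
  return features;
};
```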

+#### Sample Output
+
+```bash
+[2019-03-28 16:40:31 -0400] [INFO] [worker-0] [main] edu.iu.dsc.tws.examples.ml.svm.job.SvmSgdTsetRunner: Training Accuracy : 88.049368
+```

###### Note: make sure you have formatted the CSV files as instructed in the SVM Task Example.
