Skip to content
This repository has been archived by the owner on Jun 16, 2023. It is now read-only.

Commit

Permalink
Release 0.9.6
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongyan.feng committed Sep 23, 2014
1 parent 20aef2c commit 4e295a0
Show file tree
Hide file tree
Showing 127 changed files with 15,234 additions and 2,584 deletions.
Binary file modified README.md
Binary file not shown.
16 changes: 15 additions & 1 deletion history.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
[JStorm 0.9.0 ½éÉÜ](http://wenku.baidu.com/view/59e81017dd36a32d7375818b.html)
[JStorm 0.9.0 介绍](http://wenku.baidu.com/view/59e81017dd36a32d7375818b.html)

#Release 0.9.6
1. Update UI
- Display the metrics information of task and worker
- Add warning flag when errors occur for a topology
- Add link from supervisor page to task page
2. Send metrics data to Alimonitor
3. Add metrics interface for user
4. Add task.cleanup.timeout.sec setting to let task gently cleanup
5. Set the worker's log name as topologyName-worker-port.log
6. Add setting "worker.redirect.output.file", so worker can redirect System.out/System.err to one setting file
7. Add storm list command
8. Add closing channel check in netty client to avoid double close
9. Add connecting check in netty client to avoid connecting one server twice at one time

#Release 0.9.5.1
1. Add netty sync mode
Expand Down
18 changes: 14 additions & 4 deletions jstorm-client-extension/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@
<parent>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-all</artifactId>
<version>0.9.5.1</version>
<version>0.9.6</version>
<relativePath>..</relativePath>
</parent>
</parent>
<!--<parent>
<groupId>com.taobao</groupId>
<artifactId>parent</artifactId>
<version>1.0.2</version>
</parent>-->
</parent> -->
<modelVersion>4.0.0</modelVersion>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-client-extension</artifactId>
<version>0.9.5.1</version>
<version>0.9.6</version>
<packaging>jar</packaging>
<name>${project.artifactId}-${project.version}</name>

Expand Down Expand Up @@ -88,6 +88,16 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-jvm</artifactId>
<version>3.0.1</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ public static boolean getWorkerRedirectOutput(Map conf) {
return true;
return (Boolean) result;
}

protected static final String WOREKER_REDIRECT_OUTPUT_FILE = "worker.redirect.output.file";

public static void setWorkerRedirectOutputFile(Map conf, String outputPath) {
conf.put(WOREKER_REDIRECT_OUTPUT_FILE, outputPath);
}

public static String getWorkerRedirectOutputFile(Map conf) {
return (String)conf.get(WOREKER_REDIRECT_OUTPUT_FILE);
}

/**
* Usually, spout finish prepare before bolt, so spout need wait several
Expand Down Expand Up @@ -385,4 +395,26 @@ public static boolean isNettyASyncBlock(Map conf) {
public static void setNettyASyncBlock(Map conf, boolean block) {
conf.put(NETTY_ASYNC_BLOCK, block);
}

protected static String ALIMONITOR_METRICS_POST = "topology.alimonitor.metrics.post";

public static boolean isAlimonitorMetricsPost(Map conf) {
return JStormUtils.parseBoolean(conf.get(ALIMONITOR_METRICS_POST), true);
}

public static void setAlimonitorMetricsPost(Map conf, boolean post) {
conf.put(ALIMONITOR_METRICS_POST, post);
}

protected static String TASK_CLEANUP_TIMEOUT_SEC = "task.cleanup.timeout.sec";

public static int getTaskCleanupTimeoutSec(Map conf) {
return JStormUtils.parseInt(conf.get(TASK_CLEANUP_TIMEOUT_SEC), 10);
}

public static void setTaskCleanupTimeoutSec(Map conf, int timeout) {
conf.put(TASK_CLEANUP_TIMEOUT_SEC, timeout);
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.alibaba.jstorm.client.metric;

import com.codahale.metrics.Metric;

public interface MetricCallback<T extends Metric> {
void callback(T metric);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.alibaba.jstorm.client.metric;

import backtype.storm.task.TopologyContext;

import com.alibaba.jstorm.metric.Metrics;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.alibaba.jstorm.metric.JStormTimer;
import com.alibaba.jstorm.metric.JStormHistogram;

public class MetricClient {

private final int taskid;

public MetricClient(TopologyContext context) {
taskid = context.getThisTaskId();
}

private String getMetricName(Integer taskid, String name) {
return "task-" + String.valueOf(taskid) + ":" + name;
}

public Gauge<?> registerGauge(String name, Gauge<?> gauge, MetricCallback<Gauge<?>> callback) {
String userMetricName = getMetricName(taskid, name);
Gauge<?> ret = Metrics.registerGauge(userMetricName, gauge);
Metrics.registerUserDefine(userMetricName, gauge, callback);
return ret;
}

public Counter registerCounter(String name, MetricCallback<Counter> callback) {
String userMetricName = getMetricName(taskid, name);
Counter ret = Metrics.registerCounter(userMetricName);
Metrics.registerUserDefine(userMetricName, ret, callback);
return ret;
}

public Meter registerMeter(String name, MetricCallback<Meter> callback) {
String userMetricName = getMetricName(taskid, name);
Meter ret = Metrics.registerMeter(userMetricName);
Metrics.registerUserDefine(userMetricName, ret, callback);
return ret;
}

public JStormTimer registerTimer(String name, MetricCallback<Timer> callback) {
String userMetricName = getMetricName(taskid, name);
JStormTimer ret = Metrics.registerTimer(userMetricName);
Metrics.registerUserDefine(userMetricName, ret, callback);
return ret;
}

public JStormHistogram registerHistogram(String name, MetricCallback<Histogram> callback) {
String userMetricName = getMetricName(taskid, name);
JStormHistogram ret = Metrics.registerHistograms(userMetricName);
Metrics.registerUserDefine(userMetricName, ret, callback);
return ret;
}

public boolean unregister(String name, Integer taskid) {
String userMetricName = getMetricName(taskid, name);
return Metrics.unregisterUserDefine(userMetricName);
}

}
Original file line number Diff line number Diff line change
@@ -1,35 +1,39 @@
package com.alibaba.jstorm.daemon.worker.metrics;

import com.codahale.metrics.Histogram;

public class JStormHistogram {
private static boolean isEnable = true;

public static boolean isEnable() {
return isEnable;
}

public static void setEnable(boolean isEnable) {
JStormHistogram.isEnable = isEnable;
}

private Histogram instance;
private String name;

public JStormHistogram(String name, Histogram instance) {
this.name = name;
this.instance = instance;
}

public void update(int value) {
if (isEnable == true) {
instance.update(value);
}
}

public void update(long value) {
if (isEnable == true) {
instance.update(value);
}
}
}
package com.alibaba.jstorm.metric;

import com.codahale.metrics.Histogram;

public class JStormHistogram {
private static boolean isEnable = true;

public static boolean isEnable() {
return isEnable;
}

public static void setEnable(boolean isEnable) {
JStormHistogram.isEnable = isEnable;
}

private Histogram instance;
private String name;

public JStormHistogram(String name, Histogram instance) {
this.name = name;
this.instance = instance;
}

public void update(int value) {
if (isEnable == true) {
instance.update(value);
}
}

public void update(long value) {
if (isEnable == true) {
instance.update(value);
}
}

public Histogram getInstance() {
return instance;
}
}
Original file line number Diff line number Diff line change
@@ -1,64 +1,64 @@
package com.alibaba.jstorm.daemon.worker.metrics;


import java.util.concurrent.atomic.AtomicReference;

import org.apache.log4j.Logger;

import com.codahale.metrics.Timer;

public class JStormTimer {
private static final Logger LOG = Logger.getLogger(JStormTimer.class);
private static boolean isEnable = true;

public static boolean isEnable() {
return isEnable;
}

public static void setEnable(boolean isEnable) {
JStormTimer.isEnable = isEnable;
}


private Timer instance;
private String name;
public JStormTimer(String name, Timer instance) {
this.name = name;
this.instance = instance;
this.timerContext = new AtomicReference<Timer.Context>();
}

/**
* This logic isn't perfect, it will miss metrics when it is called
* in the same time. But this method performance is better than
* create a new instance wrapper Timer.Context
*/
private AtomicReference<Timer.Context> timerContext = null;
public void start() {
if (JStormTimer.isEnable == false) {
return ;
}

if (timerContext.get() != null) {
LOG.warn("Already start timer " + name);
return ;
}


timerContext.set(instance.time());

}

public void stop() {
Timer.Context context = timerContext.getAndSet(null);
if (context != null) {
context.stop();
}
}

public Timer getInstance() {
return instance;
}


}
package com.alibaba.jstorm.metric;


import java.util.concurrent.atomic.AtomicReference;

import org.apache.log4j.Logger;

import com.codahale.metrics.Timer;

public class JStormTimer {
private static final Logger LOG = Logger.getLogger(JStormTimer.class);
private static boolean isEnable = true;

public static boolean isEnable() {
return isEnable;
}

public static void setEnable(boolean isEnable) {
JStormTimer.isEnable = isEnable;
}


private Timer instance;
private String name;
public JStormTimer(String name, Timer instance) {
this.name = name;
this.instance = instance;
this.timerContext = new AtomicReference<Timer.Context>();
}

/**
* This logic isn't perfect, it will miss metrics when it is called
* in the same time. But this method performance is better than
* create a new instance wrapper Timer.Context
*/
private AtomicReference<Timer.Context> timerContext = null;
public void start() {
if (JStormTimer.isEnable == false) {
return ;
}

if (timerContext.get() != null) {
LOG.warn("Already start timer " + name);
return ;
}


timerContext.set(instance.time());

}

public void stop() {
Timer.Context context = timerContext.getAndSet(null);
if (context != null) {
context.stop();
}
}

public Timer getInstance() {
return instance;
}


}
Loading

0 comments on commit 4e295a0

Please sign in to comment.