Skip to content
This repository has been archived by the owner on Jun 16, 2023. It is now read-only.

Commit

Permalink
Release 0.9.6.3
Browse files Browse the repository at this point in the history
  • Loading branch information
bastiliu committed Feb 16, 2015
1 parent df8d544 commit e6884c6
Show file tree
Hide file tree
Showing 155 changed files with 8,792 additions and 3,885 deletions.
Binary file modified README.md
Binary file not shown.
5 changes: 4 additions & 1 deletion example/sequence-split-merge/conf/conf.prop
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ topology.debug=false


spout.parallel=1
bolt.parallel=2
bolt.parallel=1
#send.sleep.second=100
check.sequence=true
kryo.enable=false
fall.back.on.java.serialization=true

enable.split=false

storm.cluster.mode=local
#topology.enable.classloader=true
43 changes: 31 additions & 12 deletions example/sequence-split-merge/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jstorm.version>0.9.6.1</jstorm.version>
<jstorm.version>0.9.6.3</jstorm.version>
<storm.version>storm-0.9.2-incubating</storm.version>
</properties>
<repositories>
Expand All @@ -29,38 +29,57 @@
</repository>
</repositories>
<dependencies>
<!--
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm</artifactId>
<version>${storm.version}</version>
<scope>provided</scope>
</dependency>
-->

<!-- <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm</artifactId>
<version>${storm.version}</version> <scope>provided</scope> </dependency> -->

<dependency>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-client-extension</artifactId>
<version>${jstorm.version}</version>
<scope>provided</scope>
</dependency>


<dependency>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-client</artifactId>
<version>${jstorm.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-server</artifactId>
<version>${jstorm.version}</version>
<scope>provided</scope>

</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.0.13</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.7.10</version>
</dependency>


<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>


<!-- <dependency> <groupId>org.clojure</groupId> <artifactId>clojure</artifactId>
<version>1.2.0</version> </dependency> <dependency> <groupId>org.clojure</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import java.util.Map;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

import backtype.storm.Config;
Expand All @@ -29,9 +31,10 @@
import com.alipay.dw.jstorm.example.sequence.spout.SequenceSpout;

public class SequenceTopology {
private static Logger LOG = LoggerFactory.getLogger(SequenceTopology.class);

private final static String TOPOLOGY_SPOUT_PARALLELISM_HINT = "spout.parallel";
private final static String TOPOLOGY_BOLT_PARALLELISM_HINT = "bolt.parallel";
public final static String TOPOLOGY_SPOUT_PARALLELISM_HINT = "spout.parallel";
public final static String TOPOLOGY_BOLT_PARALLELISM_HINT = "bolt.parallel";

public static void SetBuilder(TopologyBuilder builder, Map conf) {

Expand All @@ -54,7 +57,8 @@ SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
// localFirstGrouping is only for jstorm
// boltDeclarer.localFirstGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
boltDeclarer
.localOrShuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
.localOrShuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME)
.addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 3);
} else {

builder.setBolt(SequenceTopologyDef.SPLIT_BOLT_NAME,
Expand Down Expand Up @@ -115,15 +119,19 @@ SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),

public static void SetLocalTopology() throws Exception {
TopologyBuilder builder = new TopologyBuilder();

conf.put(TOPOLOGY_BOLT_PARALLELISM_HINT, Integer.valueOf(1));

SetBuilder(builder, conf);

LOG.debug("test");
LOG.info("Submit log");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("SplitMerge", conf, builder.createTopology());

Thread.sleep(60000);

cluster.killTopology("SplitMerge");

cluster.shutdown();
}
Expand Down Expand Up @@ -165,7 +173,7 @@ public static void SetDPRCTopology() throws AlreadyAliveException,

private static Map conf = new HashMap<Object, Object>();

private static void LoadProperty(String prop) {
public static void LoadProperty(String prop) {
Properties properties = new Properties();

try {
Expand All @@ -182,7 +190,7 @@ private static void LoadProperty(String prop) {
conf.putAll(properties);
}

private static void LoadYaml(String confPath) {
public static void LoadYaml(String confPath) {

Yaml yaml = new Yaml();

Expand All @@ -205,7 +213,7 @@ private static void LoadYaml(String confPath) {
return;
}

private static void LoadConf(String arg) {
public static void LoadConf(String arg) {
if (arg.endsWith("yaml")) {
LoadYaml(arg);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.alipay.dw.jstorm.example.sequence;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.topology.TopologyBuilder;

import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.client.WorkerAssignment;

public class SequenceTopologyUserDefine extends SequenceTopology {
private static Logger LOG = LoggerFactory
.getLogger(SequenceTopologyUserDefine.class);

private static Map conf = new HashMap<Object, Object>();

public static void SetBuilder(TopologyBuilder builder, Map conf) {

SequenceTopology.SetBuilder(builder, conf);

List<WorkerAssignment> userDefinedWorks = new ArrayList<WorkerAssignment>();
WorkerAssignment spoutWorkerAssignment = new WorkerAssignment();
spoutWorkerAssignment.addComponent(
SequenceTopologyDef.SEQUENCE_SPOUT_NAME, 1);
spoutWorkerAssignment.setHostName((String) conf.get("spout.host"));
userDefinedWorks.add(spoutWorkerAssignment);

WorkerAssignment totalWorkerAssignment = new WorkerAssignment();
totalWorkerAssignment.addComponent(SequenceTopologyDef.TOTAL_BOLT_NAME,
2);
totalWorkerAssignment.setHostName((String) conf.get("total.host"));
userDefinedWorks.add(totalWorkerAssignment);

ConfigExtension.setUserDefineAssignment(conf, userDefinedWorks);

}

public static void main(String[] args) throws Exception {
if (args.length == 0) {
System.err.println("Please input configuration file");
System.exit(-1);
}

LoadConf(args[0]);

if (local_mode(conf)) {
SetLocalTopology();
} else {
SetRemoteTopology();
}

}

}
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
package com.alipay.dw.jstorm.example.sequence.bolt;

import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.Constants;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.TupleHelpers;

import com.alibaba.jstorm.client.metric.MetricCallback;
import com.alibaba.jstorm.client.metric.MetricClient;
import com.alibaba.jstorm.metric.JStormHistogram;
import com.alibaba.jstorm.metric.JStormTimer;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alipay.dw.jstorm.example.TpsCounter;
import com.alipay.dw.jstorm.example.sequence.bean.TradeCustomer;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;

public class TotalCount implements IRichBolt {
public static Logger LOG = LoggerFactory.getLogger(TotalCount.class);
Expand All @@ -26,6 +37,15 @@ public class TotalCount implements IRichBolt {
private boolean checkTupleId = false;
private boolean slowDonw = false;

private MetricClient metricClient;
private Gauge<Integer> myGauge;
private JStormTimer myTimer;
private Counter myCounter;
private Meter myMeter;
private JStormHistogram myJStormHistogram;
private MetricCallback myCallback;


@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
Expand All @@ -39,41 +59,89 @@ public void prepare(Map stormConf, TopologyContext context,

slowDonw = JStormUtils.parseBoolean(stormConf.get("bolt.slow.down"), false);

LOG.info("Finished preparation");
metricClient = new MetricClient(context);
myCallback = new MetricCallback<Metric>() {

@Override
public void callback(Metric metric) {
LOG.info("Callback " + metric.getClass().getName() + ":" + metric);
}
};


myGauge = new Gauge<Integer>() {
private Random random = new Random();

@Override
public Integer getValue() {

return random.nextInt(100);
}

};
myGauge = (Gauge<Integer>) metricClient.registerGauge("name1", myGauge, myCallback);

myTimer = metricClient.registerTimer("name2", myCallback);

myCounter = metricClient.registerCounter("name3", myCallback);

myMeter = metricClient.registerMeter("name4", myCallback);

myJStormHistogram = metricClient.registerHistogram("name5", myCallback);




LOG.info("Finished preparation " + stormConf);
}

private AtomicLong tradeSum = new AtomicLong(0);
private AtomicLong customerSum = new AtomicLong(1);

@Override
public void execute(Tuple input) {

if (checkTupleId) {
Long tupleId = input.getLong(0);
if (tupleId <= lastTupleId) {
LOG.error("LastTupleId is " + lastTupleId + ", but now:" + tupleId);
}
lastTupleId = tupleId;
if (TupleHelpers.isTickTuple(input) ){
LOG.info("Receive one Ticket Tuple " + input.getSourceComponent());
return ;
}

TradeCustomer tradeCustomer = (TradeCustomer) input.getValue(1);

tradeSum.addAndGet(tradeCustomer.getTrade().getValue());
customerSum.addAndGet(tradeCustomer.getCustomer().getValue());

collector.ack(input);

long now = System.currentTimeMillis();
long spend = now - tradeCustomer.getTimestamp();

tpsCounter.count(spend);

// long spend = System.currentTimeMillis() - input.getLong(0);
// tpsCounter.count(spend);

if (slowDonw) {
JStormUtils.sleepMs(20);
}
long before = System.currentTimeMillis();
myTimer.start();
try {
//LOG.info(input.toString());
myCounter.inc();
myMeter.mark();

if (checkTupleId) {
Long tupleId = input.getLong(0);
if (tupleId <= lastTupleId) {
LOG.error("LastTupleId is " + lastTupleId + ", but now:" + tupleId);
}
lastTupleId = tupleId;
}

TradeCustomer tradeCustomer = (TradeCustomer) input.getValue(1);

tradeSum.addAndGet(tradeCustomer.getTrade().getValue());
customerSum.addAndGet(tradeCustomer.getCustomer().getValue());

collector.ack(input);

long now = System.currentTimeMillis();
long spend = now - tradeCustomer.getTimestamp();

tpsCounter.count(spend);

// long spend = System.currentTimeMillis() - input.getLong(0);
// tpsCounter.count(spend);

if (slowDonw) {
JStormUtils.sleepMs(20);
}
}finally {
myTimer.stop();
}
long after = System.currentTimeMillis();
myJStormHistogram.update(after - before);
}

public void cleanup() {
Expand Down
Loading

0 comments on commit e6884c6

Please sign in to comment.