No way to call methods on bolts/spouts in the topology DSL #99
Comments
Thanks for your suggestion @tlrobinson. Yes, this is an interesting caveat with the way I imagined the DSL and the use case you refer to. I will need to think about this a bit. A temporary workaround could be to write a simple Java spout wrapper class that initializes itself correctly and that you could use as-is in the JRuby DSL. Or you could try the same idea with a JRuby wrapper spout class.
That's basically what I'm trying to do, however I'm getting a somewhat opaque error message about
Here's basically the code I'm running:

java_import 'backtype.storm.contrib.jms.JmsProvider'
java_import 'backtype.storm.contrib.jms.JmsTupleProducer'
java_import 'backtype.storm.contrib.jms.spout.JmsSpout'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Values'
java_import 'javax.jms.Session'
java_import 'javax.jms.TextMessage'
java_import 'javax.jms.Topic'
java_import 'javax.jms.TopicSession'
java_import 'org.apache.activemq.ActiveMQConnectionFactory'
java_import 'storm.kafka.KafkaConfig'
java_import 'storm.kafka.KafkaSpout'
java_import 'storm.kafka.SpoutConfig'
java_import 'storm.kafka.ZkHosts'

require 'red_storm'

class CreateEventBolt < RedStorm::DSL::Bolt
  on_receive do |tuple|
    object = JSON.parse(tuple[:bytes].to_s)
    puts object.inspect
    [json]
  end
end

class MyTopic
  include Topic
  attr_accessor :topic_name

  def initialize(topic)
    @topic_name = topic
  end

  def getTopicName
    @topic_name
  end
end

class MyJmsProvider
  include JmsProvider
  attr_accessor :connectionFactory
  attr_accessor :destination

  def initialize(url, topic)
    @connectionFactory = ActiveMQConnectionFactory.new(url)
    @destination = MyTopic.new(topic)
  end

  def connectionFactory
    @connectionFactory
  end

  def destination
    @destination
  end
end

class JsonTupleProducer
  include JmsTupleProducer

  def toTuple(msg)
    if msg.java_kind_of? TextMessage
      Values.new(msg.getText())
    else
      nil
    end
  end

  def declareOutputFields(declarer)
    declarer.declare(Fields.new("json"))
  end
end

class MyJmsSpout < JmsSpout
  def initialize(provider, tuple_producer, ack_mode=Session.CLIENT_ACKNOWLEDGE, distributed=true)
    super
    self.setJmsProvider provider
    self.setJmsTupleProducer tuple_producer
    self.setJmsAcknowledgeMode ack_mode
    self.setDistributed distributed
  end
end

class MyTopology < RedStorm::DSL::Topology
  jms_provider = MyJmsProvider.new("tcp://localhost:61616", "virtual.events")
  tuple_producer = JsonTupleProducer.new()

  spout MyJmsSpout, [jms_provider, tuple_producer] do
    output_fields :bytes
  end

  bolt CreateEventBolt, :parallelism => 2 do
    output_fields :json
    source MyJmsSpout, :shuffle
  end

  configure self.topology_name do |env|
  end
end

It needs (some of?) these dependencies:

<dependency org="com.github.ptgoetz" name="storm-jms" rev="0.9.0" conf="default" transitive="true"/>
<dependency org="org.apache.activemq" name="activemq-spring" rev="5.9.1" conf="default" transitive="true"/>
<dependency org="org.springframework" name="spring-core" rev="3.2.5.RELEASE" conf="default" transitive="true"/>
<dependency org="org.springframework" name="spring-beans" rev="3.2.5.RELEASE" conf="default" transitive="true"/>
<dependency org="org.springframework" name="spring-test" rev="3.2.5.RELEASE" conf="default" transitive="true"/>
The problem is here: https://github.com/colinsurprenant/redstorm/blob/master/lib/red_storm/dsl/topology.rb#L46 Since So what can we do. Well, you could write I'm afraid that there's no simple workaround for this other than writing a few lines of Java... but we should totally offer something to make this work; I'm not sure exactly how at this point.
I think we could make it work the way you suggested with

spout JmsSpout, :id => :events_jms do |spout|
  spout.setJmsProvider(jmsQueueProvider)
  # etc
  output_fields :bytes
end

by creating the spout/bolt instance in the self.spout/self.bolt methods and passing the instance to the block, instead of delaying the instance creation until topology build. I'll create a branch shortly that you can test. The potential caveat is that the spout/bolt block is already called in the configurator context, but I'm guessing that you will typically only use the |spout| param to call methods on it... thoughts?
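To make the proposal and its caveat concrete, here is a minimal plain-Ruby sketch of the idea. It is not RedStorm's actual implementation: `MiniTopology`, `Configurator`, `FakeJmsSpout`, and the `Component` struct are all hypothetical stand-ins invented for illustration. It assumes the DSL instantiates the component eagerly inside `self.spout` and evaluates the block with `instance_exec`, so the configurator stays `self` while the instance arrives as the block parameter.

```ruby
# Hypothetical stand-in for a Java spout that is configured through setters.
class FakeJmsSpout
  attr_reader :provider

  def setJmsProvider(provider)
    @provider = provider
  end
end

# Hypothetical configurator: the object whose methods (output_fields, ...)
# are callable without a receiver inside a spout/bolt block.
class Configurator
  attr_reader :fields

  def output_fields(*fields)
    @fields = fields
  end
end

# Hypothetical miniature of a topology DSL (not RedStorm's real code).
class MiniTopology
  Component = Struct.new(:id, :instance, :fields)

  def self.components
    @components ||= []
  end

  def self.spout(klass, options = {}, &block)
    instance = klass.new            # created here, not later at topology build
    configurator = Configurator.new
    # The block runs with the configurator as self, so bare DSL calls such as
    # output_fields still work. The instance is reachable only through the
    # block parameter. Blocks that declare no parameter simply ignore it.
    configurator.instance_exec(instance, &block) if block
    components << Component.new(options[:id] || klass.name, instance, configurator.fields)
  end
end

class DemoTopology < MiniTopology
  spout FakeJmsSpout, :id => :events_jms do |spout|
    spout.setJmsProvider("provider-placeholder") # call on the instance
    output_fields :bytes                          # call on the configurator
  end
end

component = DemoTopology.components.first
puts component.instance.provider
puts component.fields.inspect
```

In this sketch the caveat shows up as the split inside the block: a call without a receiver goes to the configurator, while anything meant for the spout has to go through the `|spout|` parameter.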
That would be fine. I'm not sure I understand the caveat you mentioned, though.
In the meantime, I've gotten a JMSSpout subclass working, but where would you recommend storing Java source files? Currently I have it in |
You can put java sources under |
Thanks. What do I need to do to set up Rake to build from |
There doesn't appear to be a way to call instance methods on bolts or spouts in the DSL, e.g. as is required here: https://github.com/ptgoetz/storm-jms/blob/master/examples/src/main/java/backtype/storm/contrib/jms/example/ExampleJmsTopology.java#L63-L67
Perhaps the instance should be passed to the block, so you can do something like this: