Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better workload splitting #381

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions examples/cluster.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
$: << File.expand_path('../lib', File.dirname(__FILE__))
require 'sneakers'
require 'sneakers/cluster'


class WorkerA
include Sneakers::Worker
from_queue 'downloads'

def work(msg)
sleep 1
ack!
end
end

class WorkerB
include Sneakers::Worker
from_queue 'downloads'

workgroup :transactions

def work(msg)
sleep 1
ack!
end
end

Sneakers::Cluster.configure_workrgoups(
default: {
workers: 2
},
transactions: {
workers: 1,
share_threads: true,
threads: 10
}
)

Sneakers::Cluster.start(nil) # start all groups
73 changes: 73 additions & 0 deletions lib/sneakers/cluster.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
require 'sneakers/runner'

module Sneakers
module Cluster
class << self
attr_reader :after_fork_hook, :current_workgroup

def configure_workrgoups(hash)
@config = hash
end

def after_fork(&block)
@after_fork_hook = block
end

def apply_workgroup_config!
return unless @config
Sneakers.configure(@config.fetch(current_workgroup) { {} })
end

def start(workgroups = nil)
workgroups ||= Sneakers::Worker::Classes.map(&:workgroup).uniq
workgroups = Array(workgroups)
if workgroups.count == 1
run_workgroup(workgroups.first)
else
fork_servers(workgroups)
end
end

private

def fork_servers(workgroups)
hook = Sneakers::CONFIG[:hooks][:before_fork]
hook.call if hook
pids = workgroups.map do |workgroup|
fork do
$0 = "sneakers-#{workgroup}" # set name for supervisor process and childs
run_workgroup(workgroup)
end
end
forward_signals(pids)
Process.waitall
end

def forward_signals(pids)
%w[TERM USR1 HUP USR2 INT].each do |signal|
Signal.trap(signal) do
pids.each do |pid|
begin
Process.kill(signal, pid)
rescue Errno::ESRCH, RangeError # don't crash if child is dead
end
end
end
end
end

def run_workgroup(workgroup)
@current_workgroup = workgroup
apply_workgroup_config!
after_fork_hook.call if after_fork_hook
worker_classes = Sneakers::Worker::Classes.select { |klass| klass.workgroup == workgroup }
Sneakers.logger.info "Running workgroup #{workgroup} with config #{Sneakers::CONFIG.inspect}"
run_sneakers(worker_classes)
end

def run_sneakers(worker_classes)
Sneakers::Runner.new(worker_classes, workers: Sneakers::CONFIG[:workers]).run
end
end
end
end
19 changes: 19 additions & 0 deletions lib/sneakers/tasks.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require 'sneakers'
require 'sneakers/runner'
require 'sneakers/cluster'

task :environment

Expand Down Expand Up @@ -41,4 +42,22 @@

r.run
end

desc "Start in cluster mode (set $SNEAKERS_WORKGROUPS=WG1,WG2)"
task :cluster do |task, args|
Sneakers.server = true
Rake::Task['environment'].invoke
::Rails.application.eager_load! if defined?(::Rails)

workgroups =
if args.to_a.any?
args.to_a.map(&:to_sym)
elsif (wg_string = ENV["SNEAKERS_WORKGROUPS"])
wg_string.split(",").map(&:to_sym)
else
nil
end

Sneakers::Cluster.start(workgroups)
end
end
8 changes: 8 additions & 0 deletions lib/sneakers/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ def enqueue(msg, opts={})
publisher.publish(msg, opts)
end

def workgroup(arg = nil)
if arg
@_sneakers_workgroup = arg
else
@_sneakers_workgroup || :default
end
end

private

def publisher
Expand Down
69 changes: 69 additions & 0 deletions spec/sneakers/cluster_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
require 'spec_helper'
require 'sneakers'
require 'sneakers/cluster'

describe Sneakers::Cluster do
describe "#start" do
before do
Sneakers.configure(log: Logger.new(STDOUT))
Sneakers.logger.level = Logger::WARN
stub(Sneakers::Cluster).fork { |block| block.call }
any_instance_of(Sneakers::Runner) { |runner| stub(runner).run }
stub(Sneakers::CONFIG[:hooks][:before_fork]).call
end

it "calls Sneakers::Runner#run" do
fake_runner = mock(Object.new).run
stub(Sneakers::Runner).new { fake_runner }
Sneakers::Cluster.start(:test)
end

it "calls fork hook" do
called = :no
Sneakers::Cluster.after_fork { called = :yes }
Sneakers::Cluster.start(:test)
called.must_equal :yes
end

it "applies workgroup config" do
mock(Sneakers::Cluster).apply_workgroup_config!
Sneakers::Cluster.start(:test)
end

it "sets @current_workgroup" do
Sneakers::Cluster.start(:test)
Sneakers::Cluster.current_workgroup.must_equal :test
Sneakers::Cluster.start(:test2)
Sneakers::Cluster.current_workgroup.must_equal :test2
end
end

describe "#configure_workrgoups" do
before do
Sneakers.clear!
Sneakers::Cluster.configure_workrgoups(
test1: { somekey: :someval },
test2: { somekey: :someval2 }
)
end

it "does not change config if workgroup not set" do
before = Sneakers::CONFIG
stub(Sneakers::Cluster).current_workgroup { nil }
Sneakers::Cluster.apply_workgroup_config!
Sneakers::CONFIG.must_equal before
end

it "applies config when called" do
stub(Sneakers::Cluster).current_workgroup { :test1 }
Sneakers::Cluster.apply_workgroup_config!
Sneakers::CONFIG[:somekey].must_equal :someval
end

it "scopes config by workgroups" do
stub(Sneakers::Cluster).current_workgroup { :test2 }
Sneakers::Cluster.apply_workgroup_config!
Sneakers::CONFIG[:somekey].must_equal :someval2
end
end
end
14 changes: 14 additions & 0 deletions spec/sneakers/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ def work(msg)
end
end

class InTestWorkgroupWorker
include Sneakers::Worker
workgroup :test_group
end

TestPool ||= Concurrent::ImmediateExecutor

describe Sneakers::Worker do
Expand Down Expand Up @@ -159,6 +164,15 @@ def work(msg)
end
end

describe ".workgroup" do
it "sets and gets workgroup" do
InTestWorkgroupWorker.workgroup.must_equal :test_group
end
it "is :default by default" do
DummyWorker.workgroup.must_equal :default
end
end

describe "#initialize" do
describe "builds an internal queue" do
it "should build a queue with correct configuration given defaults" do
Expand Down