Skip to content

Commit

Permalink
feat: support configure channel name (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
JTrancender authored Aug 26, 2021
1 parent 5d0bd28 commit 84176dc
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 13 deletions.
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func NsqConsumerSettings() instance.Settings {
runFlags.String("etcd-path", "/config/nsq_consumer/default", "etcd path")
runFlags.String("etcd-username", "root", "etcd username")
runFlags.String("etcd-password", "root", "etcd password")
runFlags.String("channel", "NsqConsumer", "channel name of this nsqd consumer")
return instance.Settings{
RunFlags: runFlags,
Name: Name,
Expand Down
15 changes: 10 additions & 5 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,33 +44,38 @@ type etcdConfig struct {
// New creates a new Consumer pointer instance.
func New(settings instance.Settings) consumer.Creator {
return func(c *consumer.ConsumerEntity, rawConfig *common.Config) (consumer.Consumer, error) {
return newConsumer(c, rawConfig)
return newConsumer(c, settings, rawConfig)
}
}

func newConsumer(c *consumer.ConsumerEntity, rawConfig *common.Config) (consumer.Consumer, error) {
func newConsumer(c *consumer.ConsumerEntity, settings instance.Settings, rawConfig *common.Config) (consumer.Consumer, error) {
consumerType, err := rawConfig.String("consumer-type", -1)
if err != nil {
return nil, err
}

switch consumerType {
case "nsq":
return newNSQConsumer(c, rawConfig)
return newNSQConsumer(c, settings, rawConfig)
default:
return nil, fmt.Errorf("consumer name [%s] is invalid", consumerType)
}
}

func newNSQConsumer(c *consumer.ConsumerEntity, rawConfig *common.Config) (consumer.Consumer, error) {
func newNSQConsumer(c *consumer.ConsumerEntity, settings instance.Settings, rawConfig *common.Config) (consumer.Consumer, error) {
opts := newOptions()
cfg := nsq.NewConfig()
cfg.UserAgent = fmt.Sprintf("nsq-consumer/%s go-nsq/%s", version.GetDefaultVersion(), nsq.VERSION)
cfg.MaxInFlight = opts.MaxInFlight
cfg.DialTimeout = 10 * time.Second

queue := make(chan *Message)
channel, _ := settings.RunFlags.GetString("channel")
if rawConfig.HasField(("channel")) {
channel, _ = rawConfig.String("channel", -1)
}
opts.Channel = channel

queue := make(chan *Message)
consumer := &NSQConsumer{
done: make(chan struct{}),
opts: opts,
Expand Down
12 changes: 10 additions & 2 deletions consumer/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
package consumer

import "time"
import (
"time"

"github.com/spf13/pflag"
)

var (
channel = pflag.String("channel", "NsqConsumer", "channel name of this nsq consumer")
)

// Options options for config
type Options struct {
Expand All @@ -24,7 +32,7 @@ func newOptions() *Options {
return &Options{
LogPrefix: "[NsqConsumer] ",
LogLevel: "INFO",
Channel: "NsqConsumer",
Channel: *channel,
MaxInFlight: 200,
OutputDir: "/tmp",
SyncInterval: 30 * time.Second,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/elastic/go-ucfg v0.8.3
github.com/google/uuid v1.2.0 // indirect
github.com/hashicorp/go-multierror v1.0.0
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869
github.com/nsqio/go-nsq v1.0.8
github.com/olivere/elastic/v7 v7.0.27
github.com/pkg/errors v0.9.1
Expand Down
2 changes: 1 addition & 1 deletion libconsumer/cmd/instance/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (c *Consumer) createConsumer(ct consumer.Creator) (consumer.Consumer, error

// handleFlags parses the command line flags
func (c *Consumer) handleFlags() error {
// flag.Parse()
// pflag.Parse()
return nil
}

Expand Down
12 changes: 8 additions & 4 deletions libconsumer/logp/configure/logging.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package configure

import (
"flag"
"fmt"

"github.com/JieTrancender/nsq_consumer/libconsumer/common"
"github.com/JieTrancender/nsq_consumer/libconsumer/logp"
"github.com/spf13/pflag"
)

var (
Expand All @@ -17,9 +17,9 @@ var (
type environmentVar logp.Environment

func init() {
flag.BoolVar(&verbose, "v", false, "Log at INFO level")
flag.BoolVar(&toStderr, "e", false, "Log to stderr and disable syslog/file output")
flag.Var((*environmentVar)(&environment), "environment", "set environment being ran in")
pflag.BoolVar(&verbose, "verbose", false, "Log at INFO level")
pflag.BoolVar(&toStderr, "toStderr", false, "Log to stderr and disable syslog/file output")
pflag.Var((*environmentVar)(&environment), "environment", "set environment being ran in")
}

// Logging builds a logp.Config based on the given common.Config and the specified CLI flags.
Expand Down Expand Up @@ -59,3 +59,7 @@ func (v *environmentVar) Set(in string) error {
func (v *environmentVar) String() string {
return (*logp.Environment)(v).String()
}

func (v *environmentVar) Type() string {
return (*logp.Environment)(v).Type()
}
4 changes: 4 additions & 0 deletions libconsumer/logp/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (v Environment) String() string {
}
}

func (v Environment) Type() string {
return "Environment"
}

// ParseEnvironment returns the environment type by name.
// The parse is case insensitive.
// InvalidEnvironment is returned if the environment type is unknown.
Expand Down

0 comments on commit 84176dc

Please sign in to comment.