From 7b6c72b2a1a3be35463b38aa2ee7b73111b87324 Mon Sep 17 00:00:00 2001 From: JTrancender Date: Fri, 27 Aug 2021 16:28:43 +0800 Subject: [PATCH] feat: support configure in-flight option (#33) --- consumer/config.go | 63 ++++++++++++++++++++++++++++++++++++++++++++ consumer/consumer.go | 11 +++----- 2 files changed, 66 insertions(+), 8 deletions(-) create mode 100644 consumer/config.go diff --git a/consumer/config.go b/consumer/config.go new file mode 100644 index 0000000..9922720 --- /dev/null +++ b/consumer/config.go @@ -0,0 +1,63 @@ +package consumer + +import ( + "fmt" + "reflect" + "strings" + "time" + + "github.com/JieTrancender/nsq_consumer/internal/version" + "github.com/JieTrancender/nsq_consumer/libconsumer/common" + "github.com/JieTrancender/nsq_consumer/libconsumer/logp" + "github.com/nsqio/go-nsq" +) + +type config struct { + DialTimeout time.Duration `config:"dial_timeout"` + + ReadTimeout time.Duration `config:"read_timeout"` + WriteTimeout time.Duration `config:"write_timeout"` + + MaxInFlight int `config:"max_in_flight"` + + Test int `config:"test"` +} + +func newNSQConfig(rawConfig *common.Config, consumerType string) *nsq.Config { + cfg := nsq.NewConfig() + cfg.UserAgent = fmt.Sprintf("nsq_consumer/%s go-nsq/%s", version.GetDefaultVersion(), nsq.VERSION) + + configName := strings.ToLower(consumerType) + if !rawConfig.HasField(configName) { + return cfg + } + + sub, err := rawConfig.Child(configName, -1) + if err != nil { + return cfg + } + + c := &config{ + DialTimeout: 6 * time.Second, + ReadTimeout: 60 * time.Second, + WriteTimeout: 1 * time.Second, + MaxInFlight: 200, + } + if err := sub.Unpack(&c); err != nil { + logp.L().Errorf("newNSQConfig unpack fail: %v", err) + return cfg + } + + val := reflect.ValueOf(c).Elem() + typ := val.Type() + cfgVal := reflect.ValueOf(cfg).Elem() + for i := 0; i < typ.NumField(); i++ { + field := typ.Field(i) + fieldVal := cfgVal.FieldByName(field.Name) + if fieldVal.IsValid() { + fieldVal.Set(reflect.ValueOf(val.Field(i).Interface())) + } + } + + return cfg +} diff --git a/consumer/consumer.go b/consumer/consumer.go index af66a7e..ecee97c 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -3,9 +3,7 @@ package consumer import ( "fmt" "sync" - "time" - "github.com/JieTrancender/nsq_consumer/internal/version" "github.com/JieTrancender/nsq_consumer/libconsumer/cmd/instance" "github.com/JieTrancender/nsq_consumer/libconsumer/common" "github.com/JieTrancender/nsq_consumer/libconsumer/consumer" @@ -56,18 +54,15 @@ func newConsumer(c *consumer.ConsumerEntity, settings instance.Settings, rawConf switch consumerType { case "nsq": - return newNSQConsumer(c, settings, rawConfig) + return newNSQConsumer(c, settings, rawConfig, consumerType) default: return nil, fmt.Errorf("consumer name [%s] is invalid", consumerType) } } -func newNSQConsumer(c *consumer.ConsumerEntity, settings instance.Settings, rawConfig *common.Config) (consumer.Consumer, error) { +func newNSQConsumer(c *consumer.ConsumerEntity, settings instance.Settings, rawConfig *common.Config, consumerType string) (consumer.Consumer, error) { opts := newOptions() - cfg := nsq.NewConfig() - cfg.UserAgent = fmt.Sprintf("nsq-consumer/%s go-nsq/%s", version.GetDefaultVersion(), nsq.VERSION) - cfg.MaxInFlight = opts.MaxInFlight - cfg.DialTimeout = 10 * time.Second + cfg := newNSQConfig(rawConfig, consumerType) channel, _ := settings.RunFlags.GetString("channel") if rawConfig.HasField(("channel")) {