Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix 64 Stream Info #67

Merged
merged 3 commits into from
Nov 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 117 additions & 76 deletions cmd/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,36 +59,48 @@ type StreamRetentionData []struct {
Duration string `json:"duration"`
}

// StreamAlertData is the data structure for stream alerts
type StreamAlertData struct {
Alerts []struct {
Message string `json:"message"`
Name string `json:"name"`
Rule struct {
Config struct {
Column string `json:"column"`
Operator string `json:"operator"`
Repeats int `json:"repeats"`
Value int `json:"value"`
} `json:"config"`
Type string `json:"type"`
} `json:"rule"`
Targets []struct {
Endpoint string `json:"endpoint"`
Password string `json:"password,omitempty"`
Repeat struct {
Interval string `json:"interval"`
Times int `json:"times"`
} `json:"repeat"`
SkipTLSCheck bool `json:"skip_tls_check,omitempty"`
Type string `json:"type"`
Username string `json:"username,omitempty"`
Headers struct {
Authorization string `json:"Authorization"`
} `json:"headers,omitempty"`
} `json:"targets"`
} `json:"alerts"`
Version string `json:"version"`
// AlertConfig structure
type AlertConfig struct {
Version string `json:"version"`
Alerts []Alert `json:"alerts"`
}

// Alert structure
type Alert struct {
Targets []Target `json:"targets"`
Name string `json:"name"`
Message string `json:"message"`
Rule Rule `json:"rule"`
}

// Target structure
type Target struct {
Type string `json:"type"`
Endpoint string `json:"endpoint"`
Headers map[string]string `json:"headers"`
SkipTLSCheck bool `json:"skip_tls_check"`
Repeat Repeat `json:"repeat"`
}

// Repeat structure
type Repeat struct {
Interval string `json:"interval"`
Times int `json:"times"`
}

// Rule structure
type Rule struct {
Type string `json:"type"`
Config RuleConfig `json:"config"`
}

// RuleConfig structure
type RuleConfig struct {
Column string `json:"column"`
Operator string `json:"operator"`
IgnoreCase bool `json:"ignoreCase"`
Value interface{} `json:"value"`
Repeats int `json:"repeats"`
}

// AddStreamCmd is the parent command for stream
Expand Down Expand Up @@ -132,10 +144,11 @@ var StatStreamCmd = &cobra.Command{
Example: " pb stream info backend_logs",
Short: "Get statistics for a stream",
Args: cobra.ExactArgs(1),
RunE: func(_ *cobra.Command, args []string) error {
RunE: func(cmd *cobra.Command, args []string) error {
name := args[0]
client := DefaultClient()

// Fetch stats data
stats, err := fetchStats(&client, name)
if err != nil {
return err
Expand All @@ -144,68 +157,96 @@ var StatStreamCmd = &cobra.Command{
ingestionCount := stats.Ingestion.Count
ingestionSize, _ := strconv.Atoi(strings.TrimRight(stats.Ingestion.Size, " Bytes"))
storageSize, _ := strconv.Atoi(strings.TrimRight(stats.Storage.Size, " Bytes"))
compressionRatio := 100 - (float64(storageSize) / float64(ingestionSize) * 100)

// Fetch retention data
retention, err := fetchRetention(&client, name)
if err != nil {
return err
}

isRetentionSet := len(retention) > 0

fmt.Println(StyleBold.Render("\nInfo:"))
fmt.Printf(" Event Count: %d\n", ingestionCount)
fmt.Printf(" Ingestion Size: %s\n", humanize.Bytes(uint64(ingestionSize)))
fmt.Printf(" Storage Size: %s\n", humanize.Bytes(uint64(storageSize)))
fmt.Printf(
" Compression Ratio: %.2f%s\n",
100-(float64(storageSize)/float64(ingestionSize))*100, "%")
fmt.Println()

if isRetentionSet {
fmt.Println(StyleBold.Render("Retention:"))
for _, item := range retention {
fmt.Printf(" Action: %s\n", StyleBold.Render(item.Action))
fmt.Printf(" Duration: %s\n", StyleBold.Render(item.Duration))
fmt.Println()
}
} else {
fmt.Println(StyleBold.Render("No retention period set on stream\n"))
}

// Fetch alerts data
alertsData, err := fetchAlerts(&client, name)
if err != nil {
return err
}
alerts := alertsData.Alerts

isAlertsSet := len(alerts) > 0

if isAlertsSet {
fmt.Println(StyleBold.Render("Alerts:"))
for _, alert := range alerts {
fmt.Printf(" Alert: %s\n", StyleBold.Render(alert.Name))
ruleFmt := fmt.Sprintf(
"%s %s %s repeated %d times",
alert.Rule.Config.Column,
alert.Rule.Config.Operator,
fmt.Sprint(alert.Rule.Config.Value),
alert.Rule.Config.Repeats,
)
fmt.Printf(" Rule: %s\n", ruleFmt)
fmt.Printf(" Targets: ")
for _, target := range alert.Targets {
fmt.Printf("%s, ", target.Type)
}
fmt.Print("\n\n")

// Check output format
output, _ := cmd.Flags().GetString("output")
if output == "json" {
// Prepare JSON response
data := map[string]interface{}{
"info": map[string]interface{}{
"event_count": ingestionCount,
"ingestion_size": humanize.Bytes(uint64(ingestionSize)),
"storage_size": humanize.Bytes(uint64(storageSize)),
"compression_ratio": fmt.Sprintf("%.2f%%", compressionRatio),
},
"retention": retention,
"alerts": alertsData.Alerts,
}

jsonData, err := json.MarshalIndent(data, "", " ")
if err != nil {
return err
}
fmt.Println(string(jsonData))
} else {
fmt.Println(StyleBold.Render("No alerts set on stream\n"))
// Default text output
isRetentionSet := len(retention) > 0
isAlertsSet := len(alertsData.Alerts) > 0

fmt.Println(StyleBold.Render("\nInfo:"))
fmt.Printf(" Event Count: %d\n", ingestionCount)
fmt.Printf(" Ingestion Size: %s\n", humanize.Bytes(uint64(ingestionSize)))
fmt.Printf(" Storage Size: %s\n", humanize.Bytes(uint64(storageSize)))
fmt.Printf(
" Compression Ratio: %.2f%s\n",
compressionRatio, "%")
fmt.Println()

if isRetentionSet {
fmt.Println(StyleBold.Render("Retention:"))
for _, item := range retention {
fmt.Printf(" Action: %s\n", StyleBold.Render(item.Action))
fmt.Printf(" Duration: %s\n", StyleBold.Render(item.Duration))
fmt.Println()
}
} else {
fmt.Println(StyleBold.Render("No retention period set on stream\n"))
}

if isAlertsSet {
fmt.Println(StyleBold.Render("Alerts:"))
for _, alert := range alertsData.Alerts {
fmt.Printf(" Alert: %s\n", StyleBold.Render(alert.Name))
ruleFmt := fmt.Sprintf(
"%s %s %s repeated %d times",
alert.Rule.Config.Column,
alert.Rule.Config.Operator,
fmt.Sprint(alert.Rule.Config.Value),
alert.Rule.Config.Repeats,
)
fmt.Printf(" Rule: %s\n", ruleFmt)
fmt.Printf(" Targets: ")
for _, target := range alert.Targets {
fmt.Printf("%s, ", target.Type)
}
fmt.Print("\n\n")
}
} else {
fmt.Println(StyleBold.Render("No alerts set on stream\n"))
}
}

return nil
},
}

func init() {
StatStreamCmd.Flags().String("output", "text", "Output format: text or json")
}

var RemoveStreamCmd = &cobra.Command{
Use: "remove stream-name",
Aliases: []string{"rm"},
Expand Down Expand Up @@ -345,7 +386,7 @@ func fetchRetention(client *HTTPClient, name string) (data StreamRetentionData,
return
}

func fetchAlerts(client *HTTPClient, name string) (data StreamAlertData, err error) {
func fetchAlerts(client *HTTPClient, name string) (data AlertConfig, err error) {
req, err := client.NewRequest("GET", fmt.Sprintf("logstream/%s/alert", name), nil)
if err != nil {
return
Expand Down
Loading