Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: snowpipe streaming #5110

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open

feat: snowpipe streaming #5110

wants to merge 3 commits into from

Conversation

achettyiitr
Copy link
Member

Description

  • Integrated new destination Snowpipe Streaming as part of async destination.

Security

  • The code changed/added as part of this pull request won't create any security issues with how the software is being used.

@achettyiitr achettyiitr force-pushed the feat.snowpipe-streaming branch 8 times, most recently from 68163a6 to 5607206 Compare September 23, 2024 08:53
@achettyiitr achettyiitr marked this pull request as draft September 23, 2024 20:02
@achettyiitr achettyiitr force-pushed the feat.snowpipe-streaming branch 5 times, most recently from 444da09 to f569ead Compare September 23, 2024 23:36
Copy link

codecov bot commented Sep 24, 2024

Codecov Report

Attention: Patch coverage is 77.84553% with 218 lines in your changes missing coverage. Please review.

Project coverage is 74.51%. Comparing base (e7599ca) to head (91341e3).

Files with missing lines Patch % Lines
...tionmanager/snowpipestreaming/snowpipestreaming.go 80.66% 59 Missing and 17 partials ⚠️
...yncdestinationmanager/snowpipestreaming/channel.go 64.88% 40 Missing and 19 partials ⚠️
...ncdestinationmanager/snowpipestreaming/discards.go 73.25% 16 Missing and 7 partials ⚠️
...ationmanager/snowpipestreaming/internal/api/api.go 82.78% 14 Missing and 7 partials ⚠️
warehouse/utils/uploader.go 15.00% 17 Missing ⚠️
...destinationmanager/snowpipestreaming/apiadapter.go 86.04% 9 Missing and 3 partials ⚠️
...manager/snowpipestreaming/testhelper/testhelper.go 62.96% 7 Missing and 3 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #5110      +/-   ##
==========================================
+ Coverage   73.57%   74.51%   +0.93%     
==========================================
  Files         424      433       +9     
  Lines       59935    60902     +967     
==========================================
+ Hits        44099    45382    +1283     
+ Misses      13373    12966     -407     
- Partials     2463     2554      +91     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@github-actions github-actions bot added Stale and removed Stale labels Oct 15, 2024
@achettyiitr achettyiitr force-pushed the feat.snowpipe-streaming branch 5 times, most recently from 788f64d to d44d456 Compare October 28, 2024 08:51
@achettyiitr achettyiitr marked this pull request as ready for review October 28, 2024 08:52
@achettyiitr achettyiitr force-pushed the feat.snowpipe-streaming branch 3 times, most recently from 74e9353 to 0ccab91 Compare October 28, 2024 10:33
@rudderlabs rudderlabs deleted a comment from github-actions bot Oct 28, 2024
@achettyiitr achettyiitr force-pushed the feat.snowpipe-streaming branch 11 times, most recently from 7525987 to 3619fb8 Compare November 14, 2024 07:48
@achettyiitr achettyiitr force-pushed the feat.snowpipe-streaming branch 2 times, most recently from 6773278 to 69e98aa Compare November 14, 2024 09:03
@achettyiitr achettyiitr force-pushed the feat.snowpipe-streaming branch 3 times, most recently from 7a44095 to 06e0dd3 Compare November 26, 2024 05:28
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

func (m *Manager) prepareChannelResponse(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

name doesn't seems to be apt as it is doing lot of operations inside

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to initializeChannelWithSchema.

@lvrach
Copy link
Member

lvrach commented Nov 28, 2024

@coderabbitai review

Copy link
Contributor

coderabbitai bot commented Nov 28, 2024

Walkthrough

This pull request introduces a series of changes across multiple files, primarily enhancing the functionality and testability of the Snowpipe streaming integration within the batch router. Key modifications include the addition of new methods and structs for managing Snowpipe channels, improved error handling, and updates to existing methods for better time management and job processing. The changes also expand the set of recognized asynchronous destinations and introduce new constants and interfaces to support the Snowpipe streaming context. Overall, the updates aim to refine data handling and improve the robustness of the integration.

Changes

File Path Change Summary
gateway/handle.go Updated getJobDataFromRequest for timestamp handling and error checks.
gateway/handle_lifecycle.go Replaced time.Now with timeutil.Now in Setup method.
processor/transformer/transformer.go Added handling for warehouseutils.SnowpipeStreaming in destTransformURL method.
router/batchrouter/asyncdestinationmanager/common/utils.go Updated asyncDestinations to include "SNOWPIPE_STREAMING".
router/batchrouter/asyncdestinationmanager/manager.go Added case for handling "SNOWPIPE_STREAMING" in newRegularManager.
router/batchrouter/asyncdestinationmanager/snowpipestreaming/apiadapter.go Introduced apiAdapter struct for Snowpipe streaming API interactions.
router/batchrouter/asyncdestinationmanager/snowpipestreaming/channel.go Added Manager struct for managing Snowpipe channels.
router/batchrouter/asyncdestinationmanager/snowpipestreaming/discards.go Introduced functionality for managing discarded records in Snowpipe.
router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api.go Added API struct for remote service interactions related to channel management.
router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api_integration_test.go Added integration tests for Snowpipe streaming API.
router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api_test.go Introduced unit tests for API functionality.
router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/codes.go Added constants for error handling in API interactions.
router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/model/model.go Introduced data structures for managing Snowpipe operations.
router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/model/model_test.go Added unit tests for schema generation and JSON unmarshalling.
router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go Added Manager struct for Snowpipe streaming operations.
router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming_test.go Added unit tests for Snowpipe streaming functionalities.
router/batchrouter/asyncdestinationmanager/snowpipestreaming/testhelper/testhelper.go Introduced utility functions for managing Snowpipe test credentials.
router/batchrouter/asyncdestinationmanager/snowpipestreaming/types.go Added key structures and interfaces for Snowpipe streaming operations.
router/batchrouter/handle.go Enhanced Handle struct with new fields and improved upload processing logic.
router/batchrouter/handle_async.go Updated asyncUploadWorker logic and improved error handling in job status updates.
router/batchrouter/handle_lifecycle.go Modified Setup method to include new timeout variable and update existing variables for configurability.
utils/misc/misc.go Added "SNOWPIPE_STREAMING" to the list of batch destinations.
warehouse/integrations/manager/manager.go Updated factory functions to handle warehouseutils.SnowpipeStreaming.
warehouse/integrations/snowflake/datatype_mapper.go Renamed calculateDataType to CalculateDataType and updated its signature.
warehouse/integrations/snowflake/datatype_mapper_test.go Updated tests to reflect changes in CalculateDataType function signature.
warehouse/integrations/snowflake/snowflake.go Updated FetchSchema method to use CalculateDataType with new signature.
warehouse/slave/worker.go Renamed handleSchemaChange to HandleSchemaChange for visibility.
warehouse/slave/worker_test.go Updated tests to reflect the renaming of handleSchemaChange.
warehouse/utils/reservedkeywords.go Added "SNOWPIPE_STREAMING" to ReservedKeywords map.
warehouse/utils/uploader.go Introduced Uploader interface and noopUploader implementation for schema management.
warehouse/utils/utils.go Added SnowpipeStreaming constant and removed Uploader interface.

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai or @coderabbitai title anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 21

🧹 Outside diff range and nitpick comments (44)
warehouse/integrations/snowflake/datatype_mapper_test.go (1)

Line range hint 11-27: Consider adding edge case test scenarios

While the current test cases cover basic scenarios, consider adding tests for:

  • Maximum/minimum INT values
  • Edge cases where scale affects precision
  • Different VARCHAR lengths

Here's a suggested addition to the test cases:

 func TestCalculateDataType(t *testing.T) {
 	testCases := []struct {
 		columnType   string
 		numericScale sql.NullInt64
 		expected     string
 		exists       bool
 	}{
 		{"VARCHAR", sql.NullInt64{}, "string", true},
 		{"INT", sql.NullInt64{}, "int", true},
 		{"INT", sql.NullInt64{Int64: 2, Valid: true}, "float", true},
 		{"UNKNOWN", sql.NullInt64{}, "", false},
+		{"VARCHAR(MAX)", sql.NullInt64{}, "string", true},
+		{"INT", sql.NullInt64{Int64: 10, Valid: true}, "float", true},
+		{"NUMBER", sql.NullInt64{Int64: 0, Valid: true}, "int", true},
 	}
router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/codes.go (1)

3-12: Consider adding documentation and grouping related errors

The error constants are well-defined and follow a consistent naming pattern. However, consider these improvements for better maintainability:

  1. Add godoc comments for each error constant to explain when they occur
  2. Group related constants (e.g., authorization errors together)
  3. Consider adding package documentation with examples of error handling

Here's a suggested improvement:

 package api

+// Error codes returned by the Snowpipe Streaming API
 const (
+       // General errors
        ErrUnknownError                        = "ERR_UNKNOWN_ERROR"
        ErrValidationError                     = "ERR_VALIDATION_ERROR"
        ErrSchemaConflict                      = "ERR_SCHEMA_CONFLICT"

+       // Authentication and authorization errors
        ErrAuthenticationFailed                = "ERR_AUTHENTICATION_FAILED"
        ErrRoleDoesNotExistOrNotAuthorized     = "ERR_ROLE_DOES_NOT_EXIST_OR_NOT_AUTHORIZED"
        ErrDatabaseDoesNotExistOrNotAuthorized = "ERR_DATABASE_DOES_NOT_EXIST_OR_NOT_AUTHORIZED"
        ErrSchemaDoesNotExistOrNotAuthorized   = "ERR_SCHEMA_DOES_NOT_EXIST_OR_NOT_AUTHORIZED"
        ErrTableDoesNotExistOrNotAuthorized    = "ERR_TABLE_DOES_NOT_EXIST_OR_NOT_AUTHORIZED"
 )
warehouse/integrations/snowflake/datatype_mapper.go (1)

Line range hint 47-51: Consider adding validation for negative scale values

While the logic for converting int to float when scale > 0 is correct, consider adding validation for negative scale values which could indicate data issues.

 func CalculateDataType(columnType string, numericScale int64) (string, bool) {
 	if datatype, ok := dataTypesMapToRudder[columnType]; ok {
+		if numericScale < 0 {
+			return "", false
+		}
 		if datatype == "int" && numericScale > 0 {
 			datatype = "float"
 		}
 		return datatype, true
 	}
 	return "", false
 }
router/batchrouter/asyncdestinationmanager/snowpipestreaming/testhelper/testhelper.go (1)

54-70: Consider improving error handling in schema cleanup

The implementation is good with retry logic, but consider adding more context to error messages and handling specific Snowflake error codes.

Consider this enhancement:

 func DropSchema(t testing.TB, db *sql.DB, namespace string) {
 	t.Helper()
-	t.Log("dropping schema", namespace)
+	t.Logf("attempting to drop schema %q", namespace)
 
 	require.Eventually(t,
 		func() bool {
 			_, err := db.ExecContext(context.Background(), fmt.Sprintf(`DROP SCHEMA %q CASCADE;`, namespace))
 			if err != nil {
-				t.Logf("error deleting schema %q: %v", namespace, err)
+				t.Logf("failed to drop schema %q: %v (will retry)", namespace, err)
 				return false
 			}
+			t.Logf("successfully dropped schema %q", namespace)
 			return true
 		},
 		time.Minute,
 		time.Second,
 	)
 }
warehouse/utils/uploader.go (1)

10-24: Consider breaking down the interface and adding documentation

While the interface is well-structured, consider the following improvements:

  1. The interface seems to violate the Interface Segregation Principle by combining multiple responsibilities (schema management, file operations, configuration). Consider splitting it into smaller, focused interfaces.
  2. Add GoDoc comments for the interface and its methods as this appears to be a public API.

Example refactor:

+// SchemaManager handles warehouse schema operations
+type SchemaManager interface {
+    IsWarehouseSchemaEmpty() bool
+    GetLocalSchema(ctx context.Context) (model.Schema, error)
+    UpdateLocalSchema(ctx context.Context, schema model.Schema) error
+    GetTableSchemaInWarehouse(tableName string) model.TableSchema
+    GetTableSchemaInUpload(tableName string) model.TableSchema
+}
+
+// FileManager handles load file operations
+type FileManager interface {
+    GetLoadFilesMetadata(ctx context.Context, options GetLoadFilesOptions) ([]LoadFile, error)
+    GetSampleLoadFileLocation(ctx context.Context, tableName string) (string, error)
+    GetSingleLoadFile(ctx context.Context, tableName string) (LoadFile, error)
+    GetLoadFileType() string
+}
+
+// ConfigProvider handles warehouse-specific configurations
+type ConfigProvider interface {
+    ShouldOnDedupUseNewRecord() bool
+    UseRudderStorage() bool
+    CanAppend() bool
+}
+
+// Uploader combines all warehouse operations
+type Uploader interface {
+    SchemaManager
+    FileManager
+    ConfigProvider
+}
router/batchrouter/asyncdestinationmanager/snowpipestreaming/apiadapter.go (2)

21-33: Add input validation in constructor

Consider validating required dependencies to prevent nil pointer dereferences at runtime.

 func newApiAdapter(
 	logger logger.Logger,
 	statsFactory stats.Stats,
 	api api,
 	destination *backendconfig.DestinationT,
 ) api {
+	if logger == nil {
+		panic("logger is required")
+	}
+	if statsFactory == nil {
+		panic("statsFactory is required")
+	}
+	if api == nil {
+		panic("api is required")
+	}
+	if destination == nil {
+		panic("destination is required")
+	}
 	return &apiAdapter{
 		logger:       logger,
 		statsFactory: statsFactory,
 		api:         api,
 		destination: destination,
 	}
 }

122-124: Define metric name as a constant

Consider moving the metric name to a constant to maintain consistency and ease future updates.

+const (
+	snowpipeStreamingAPIResponseTimeMetric = "snowpipe_streaming_api_response_time"
+)

 func (a *apiAdapter) recordDuration(tags stats.Tags) func() {
-	return a.statsFactory.NewTaggedStat("snowpipe_streaming_api_response_time", stats.TimerType, tags).RecordDuration()
+	return a.statsFactory.NewTaggedStat(snowpipeStreamingAPIResponseTimeMetric, stats.TimerType, tags).RecordDuration()
 }
router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/model/model_test.go (2)

12-79: Consider adding error handling test cases

The test cases are well-structured and cover various scenarios, but consider adding test cases for:

  1. Invalid scale values (negative or extremely large)
  2. Malformed type strings
  3. Case sensitivity in type names

Example additional test case:

{
    name: "Invalid scale value",
    tableSchema: map[string]ColumnInfo{
        "column1": {Type: lo.ToPtr("NUMBER(2,0)"), Scale: lo.ToPtr(-1.0)},
    },
    expected: whutils.ModelTableSchema{},
},

1-125: Add package and function documentation

Consider adding:

  1. Package documentation explaining the purpose of the model package
  2. Function documentation for both test functions describing what aspects they're testing

Example documentation:

// Package model provides types and functions for handling Snowpipe streaming operations

// TestChannelResponse_GenerateSnowPipeSchema validates the schema generation logic
// for various Snowflake data types and edge cases

// TestChannelResponse_UnmarshalJSON ensures proper parsing of Snowpipe channel
// responses including both successful and error scenarios
router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming_test.go (1)

12-113: Well-structured test cases with good coverage.

The table-driven tests effectively cover the main scenarios for column comparison, including type differences, case sensitivity, and edge cases with empty/nil schemas.

Consider adding edge cases for column names.

While the current test cases are comprehensive for typical scenarios, consider adding tests for:

  • Column names with special characters
  • Very long column names
  • Unicode characters in column names
 		{
+			name: "column names with special characters and unicode",
+			eventSchema: whutils.ModelTableSchema{
+				"special@#$":    "STRING",
+				"very_long_column_name_exceeding_typical_length": "STRING",
+				"column_名字":    "STRING",
+			},
+			snowPipeSchema: whutils.ModelTableSchema{},
+			expected: []whutils.ColumnInfo{
+				{Name: "special@#$", Type: "STRING"},
+				{Name: "very_long_column_name_exceeding_typical_length", Type: "STRING"},
+				{Name: "column_名字", Type: "STRING"},
+			},
+		},
router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api_integration_test.go (3)

49-60: Consider adding documentation for required environment variables.

While the code correctly checks for required environment variables, it would be helpful to add a comment explaining what TestKeyPairUnencrypted represents and its expected format.


61-156: Consider extracting test data and configuration to test fixtures.

The test is well-structured but could benefit from some improvements:

  1. Move the test data (rows) to a separate test fixture file
  2. Extract configuration values (ports, timeouts) to named constants
  3. Consider using a test helper for common setup steps (creating namespace and table)

Example for constants:

const (
    snowpipeClientPort = 9078
    statusCheckTimeout = 30 * time.Second
    statusCheckInterval = 300 * time.Millisecond
)

227-250: Add documentation for helper functions.

While the functions are well-implemented, they would benefit from documentation explaining their purpose and parameters:

// prepareDestination creates a new destination configuration for Snowpipe streaming
// using the provided credentials.
func prepareDestination(credentials *testhelper.TestCredentials) backendconfig.DestinationT {
    // ... existing implementation ...
}

// convertRowsToRecord converts a slice of model.Row to a slice of string slices,
// maintaining the order specified by the columns parameter. Missing values are
// represented as empty strings.
func convertRowsToRecord(rows []model.Row, columns []string) [][]string {
    // ... existing implementation ...
}
warehouse/slave/worker.go (1)

Line range hint 525-573: Consider improving error handling and type safety

The function could benefit from several improvements:

  1. Add more descriptive error messages including the actual value that failed conversion
  2. Add panic recovery for type assertions
  3. Consider breaking down the function into smaller, more focused functions for each type conversion

Here's a suggested improvement:

 func HandleSchemaChange(existingDataType, currentDataType model.SchemaType, value any) (any, error) {
+    // Recover from any panics during type assertions
+    defer func() {
+        if r := recover(); r != nil {
+            err = fmt.Errorf("panic during type conversion: %v", r)
+        }
+    }()
+
     var (
         newColumnVal any
         err          error
     )
 
     if existingDataType == model.StringDataType || existingDataType == model.TextDataType {
         // only stringify if the previous type is non-string/text/json
         if currentDataType != model.StringDataType && currentDataType != model.TextDataType && currentDataType != model.JSONDataType {
             newColumnVal = fmt.Sprintf("%v", value)
         } else {
             newColumnVal = value
         }
     } else if (currentDataType == model.IntDataType || currentDataType == model.BigIntDataType) && existingDataType == model.FloatDataType {
         intVal, ok := value.(int)
         if !ok {
-            err = fmt.Errorf("incompatible schema conversion from %v to %v", existingDataType, currentDataType)
+            err = fmt.Errorf("incompatible schema conversion from %v to %v (value: %v of type %T)", 
+                existingDataType, currentDataType, value, value)
         } else {
             newColumnVal = float64(intVal)
         }
     } else if currentDataType == model.FloatDataType && (existingDataType == model.IntDataType || existingDataType == model.BigIntDataType) {
         floatVal, ok := value.(float64)
         if !ok {
-            err = fmt.Errorf("incompatible schema conversion from %v to %v", existingDataType, currentDataType)
+            err = fmt.Errorf("incompatible schema conversion from %v to %v (value: %v of type %T)", 
+                existingDataType, currentDataType, value, value)
         } else {
             newColumnVal = int(floatVal)
         }
     } else if existingDataType == model.JSONDataType {
         var interfaceSliceSample []any
         if currentDataType == model.IntDataType || currentDataType == model.FloatDataType || currentDataType == model.BooleanDataType {
             newColumnVal = fmt.Sprintf("%v", value)
         } else if reflect.TypeOf(value) == reflect.TypeOf(interfaceSliceSample) {
             newColumnVal = value
         } else {
             newColumnVal = fmt.Sprintf(`"%v"`, value)
         }
     } else {
-        err = fmt.Errorf("incompatible schema conversion from %v to %v", existingDataType, currentDataType)
+        err = fmt.Errorf("incompatible schema conversion from %v to %v (value: %v of type %T)", 
+            existingDataType, currentDataType, value, value)
     }
 
     return newColumnVal, err
 }

Additionally, consider extracting type-specific conversions into separate functions:

func handleStringConversion(value any) any {
    return fmt.Sprintf("%v", value)
}

func handleFloatToIntConversion(value any) (any, error) {
    floatVal, ok := value.(float64)
    if !ok {
        return nil, fmt.Errorf("value %v is not a float64", value)
    }
    return int(floatVal), nil
}
gateway/handle_lifecycle.go (4)

Line range hint 269-278: Consider adding timeout to errgroup context

The errgroup is created without a timeout, which could lead to goroutines running indefinitely if the parent context isn't cancelled. Consider adding a timeout to the errgroup context.

-g, _ := errgroup.WithContext(ctx)
+g, gctx := errgroup.WithContext(ctx)
+gctx, cancel := context.WithTimeout(gctx, 30*time.Second)
+defer cancel()

Line range hint 449-452: Enhance error handling in Shutdown method

The Shutdown method should handle errors from backgroundWait() more gracefully and potentially implement a timeout to prevent hanging.

 func (gw *Handle) Shutdown() error {
 	gw.backgroundCancel()
+	shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	errChan := make(chan error, 1)
+	go func() {
+		errChan <- gw.webhook.Shutdown()
+	}()
+
+	select {
+	case err := <-errChan:
+		if err != nil {
+			return fmt.Errorf("webhook shutdown failed: %w", err)
+		}
+	case <-shutdownCtx.Done():
+		return fmt.Errorf("shutdown timed out")
+	}
+
+	gw.inFlightRequests.Wait()
+
+	for _, worker := range gw.userWebRequestWorkers {
+		close(worker.webRequestQ)
+	}
+
+	if err := gw.backgroundWait(); err != nil {
+		return fmt.Errorf("background tasks shutdown failed: %w", err)
+	}
+	return nil
-	if err := gw.webhook.Shutdown(); err != nil {
-		return err
-	}
-
-	gw.inFlightRequests.Wait()
-
-	// UserWebRequestWorkers
-	for _, worker := range gw.userWebRequestWorkers {
-		close(worker.webRequestQ)
-	}
-
-	return gw.backgroundWait()
 }

Line range hint 308-321: Consider implementing rate limiting for batch operations

The batch processing logic in userWorkerRequestBatcher could benefit from rate limiting to prevent overwhelming the system during high load scenarios.

Consider implementing a token bucket rate limiter or similar mechanism to control the rate of batch processing. This would help prevent resource exhaustion during traffic spikes.


Line range hint 366-368: Add metrics for job processing errors

The error handling in dbWriterWorkerProcess should include metrics to track job processing failures for better observability.

 if err != nil {
+    gw.stats.NewStat("gateway.db_writer_errors", stats.CountType).Count(1)
     errorMessage := err.Error()
     if ctx.Err() != nil {
         errorMessage = ctx.Err().Error()
     }
processor/transformer/transformer.go (1)

540-542: Consider refactoring URL parameter construction to reduce duplication.

The URL parameter construction for warehouse destinations, including Snowpipe Streaming, follows a similar pattern. Consider extracting the common logic into a helper function to improve maintainability.

Here's a suggested refactor:

+func (trans *handle) getWarehouseQueryParams() string {
+    return fmt.Sprintf("whSchemaVersion=%s&whIDResolve=%v",
+        trans.conf.GetString("Warehouse.schemaVersion", "v1"),
+        warehouseutils.IDResolutionEnabled())
+}

 func (trans *handle) destTransformURL(destType string) string {
     destinationEndPoint := fmt.Sprintf("%s/v0/destinations/%s", trans.config.destTransformationURL, strings.ToLower(destType))

     if _, ok := warehouseutils.WarehouseDestinationMap[destType]; ok {
-        whSchemaVersionQueryParam := fmt.Sprintf("whSchemaVersion=%s&whIDResolve=%v", trans.conf.GetString("Warehouse.schemaVersion", "v1"), warehouseutils.IDResolutionEnabled())
+        whSchemaVersionQueryParam := trans.getWarehouseQueryParams()
         switch destType {
         case warehouseutils.RS:
             return destinationEndPoint + "?" + whSchemaVersionQueryParam
         case warehouseutils.CLICKHOUSE:
             enableArraySupport := fmt.Sprintf("chEnableArraySupport=%s", fmt.Sprintf("%v", trans.conf.GetBool("Warehouse.clickhouse.enableArraySupport", false)))
             return destinationEndPoint + "?" + whSchemaVersionQueryParam + "&" + enableArraySupport
         default:
             return destinationEndPoint + "?" + whSchemaVersionQueryParam
         }
     }

     if destType == warehouseutils.SnowpipeStreaming {
-        return fmt.Sprintf("%s?whSchemaVersion=%s&whIDResolve=%t", destinationEndPoint, trans.conf.GetString("Warehouse.schemaVersion", "v1"), warehouseutils.IDResolutionEnabled())
+        return destinationEndPoint + "?" + trans.getWarehouseQueryParams()
     }
     return destinationEndPoint
 }
warehouse/utils/utils.go (1)

222-225: Add documentation for type aliases

While the type aliases are well-named, adding documentation comments would improve clarity by explaining the purpose of these aliases and when to use them over the original types.

 type (
+	// ModelWarehouse is an alias for model.Warehouse to simplify type references
 	ModelWarehouse   = model.Warehouse
+	// ModelTableSchema is an alias for model.TableSchema to simplify type references
 	ModelTableSchema = model.TableSchema
 )
gateway/handle.go (1)

491-491: Consider refactoring for better maintainability.

The getJobDataFromRequest function handles multiple responsibilities and is quite long. Consider breaking it down into smaller, focused functions:

  • Event validation
  • Batch processing
  • Error handling
  • User suppression checks

Example refactoring approach:

func (gw *Handle) getJobDataFromRequest(req *webRequestT) (jobData *jobFromReq, err error) {
+    if err := gw.validateRequest(req); err != nil {
+        return nil, err
+    }
+
+    jobData, err = gw.processEventBatch(req)
+    if err != nil {
+        return nil, err
+    }
+
+    if err := gw.applyRateLimiting(req, jobData); err != nil {
+        return nil, err
+    }
+
+    return jobData, nil
}

+func (gw *Handle) validateRequest(req *webRequestT) error {
+    // Extract request validation logic
+}
+
+func (gw *Handle) processEventBatch(req *webRequestT) (*jobFromReq, error) {
+    // Extract batch processing logic
+}
+
+func (gw *Handle) applyRateLimiting(req *webRequestT, jobData *jobFromReq) error {
+    // Extract rate limiting logic
+}
warehouse/utils/reservedkeywords.go (1)

97-189: Consider reducing code duplication with SNOWFLAKE keywords.

Since SNOWPIPE_STREAMING uses identical keywords to SNOWFLAKE, consider refactoring to reuse the same keyword map. This would reduce maintenance overhead and prevent potential divergence.

Example refactor:

var snowflakeKeywords = map[string]bool{
    "ACCOUNT":           true,
    "ALL":               true,
    // ... rest of the keywords
}

var ReservedKeywords = map[string]map[string]bool{
    "SNOWFLAKE":          snowflakeKeywords,
-   "SNOWPIPE_STREAMING": {
-       "ACCOUNT":           true,
-       "ALL":               true,
-       // ... duplicate keywords
-   },
+   "SNOWPIPE_STREAMING": snowflakeKeywords,
    // ... rest of the destinations
}
router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/model/model.go (3)

93-95: Wrap unmarshalling errors with context.

Consider wrapping the error returned by json.Unmarshal to provide additional context for debugging.

Apply this diff:

 if err := json.Unmarshal(data, &temp); err != nil {
-	return err
+	return fmt.Errorf("failed to unmarshal ChannelResponse: %w", err)
 }

Also, add the necessary import:

import (
	"strings"
	"sync"
	"fmt" // Added import

	jsoniter "github.com/json-iterator/go"

	"github.com/rudderlabs/rudder-server/warehouse/integrations/snowflake"
	whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

81-83: Re-evaluate the use of a package-level variable for dataTypeCache.

Consider encapsulating dataTypeCache within a struct or providing access methods to improve modularity and prevent unintended access.


126-134: Handle type assertion in extractDataType safely.

Although the type assertion cachedResult.(string) is expected to be safe, consider adding a type check to prevent potential panics.

Apply this diff:

 if cachedResult, found := dataTypeCache.Load(input); found {
-	return cachedResult.(string)
+	if resultStr, ok := cachedResult.(string); ok {
+		return resultStr
+	}
 }
router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api.go (2)

42-70: Refactor to reduce code duplication in HTTP request handling

The methods CreateChannel, DeleteChannel, GetChannel, Insert, and GetStatus share similar patterns for creating HTTP requests, setting headers, sending requests, and handling responses. This code duplication can be minimized by abstracting common logic into helper functions.

Consider creating helper functions for:

  • Building request URLs: A function to construct URLs based on endpoint paths and parameters.
  • Creating HTTP requests: A helper to create requests with common headers and context.
  • Executing requests and handling responses: A generic function to execute the request, check status codes, read the response body, and handle errors.
  • Decoding JSON responses: A utility to decode JSON responses into the appropriate structs.

This refactoring will improve maintainability and reduce the potential for inconsistencies or errors.

Also applies to: 72-96, 98-121, 123-151, 153-176


78-78: Remove unnecessary Content-Type header from requests without a body

Setting the Content-Type header on HTTP requests that do not include a body (such as GET and DELETE requests without a payload) is unnecessary and can be omitted.

Consider removing the Content-Type header from these requests to simplify them:

-req.Header.Set("Content-Type", "application/json")

This change clarifies that these requests do not send any content in the body.

Also applies to: 104-104, 159-159

router/batchrouter/asyncdestinationmanager/snowpipestreaming/channel.go (2)

65-67: Simplify defer statements by calling Cleanup directly

You can simplify your defer statements by directly deferring the Cleanup call without wrapping it in an anonymous function:

-	defer func() {
-		snowflakeManager.Cleanup(ctx)
-	}()
+	defer snowflakeManager.Cleanup(ctx)

This improves code readability and maintains the same functionality.

Also applies to: 158-160, 186-188


142-168: Refactor duplicated code in handleSchemaError and handleTableError

The functions handleSchemaError and handleTableError share similar logic for creating the Snowflake manager, deferring cleanup, and creating the schema or table. Consider refactoring the common code into a helper function to reduce duplication and enhance maintainability.

You could create a helper function like this:

func (m *Manager) handleError(
	ctx context.Context,
	channelReq *model.CreateChannelRequest,
	eventSchema whutils.ModelTableSchema,
	createSchema bool,
) (*model.ChannelResponse, error) {
	snowflakeManager, err := m.createSnowflakeManager(ctx, channelReq.TableConfig.Schema)
	if err != nil {
		return nil, fmt.Errorf("creating snowflake manager: %w", err)
	}
	defer snowflakeManager.Cleanup(ctx)

	if createSchema {
		if err := snowflakeManager.CreateSchema(ctx); err != nil {
			return nil, fmt.Errorf("creating schema: %w", err)
		}
	}

	if err := snowflakeManager.CreateTable(ctx, channelReq.TableConfig.Table, eventSchema); err != nil {
		return nil, fmt.Errorf("creating table: %w", err)
	}

	return m.api.CreateChannel(ctx, channelReq)
}

And then modify handleSchemaError and handleTableError to use this helper:

func (m *Manager) handleSchemaError(
	ctx context.Context,
	channelReq *model.CreateChannelRequest,
	eventSchema whutils.ModelTableSchema,
) (*model.ChannelResponse, error) {
	m.logger.Infon("Handling schema error",
		logger.NewStringField("schema", channelReq.TableConfig.Schema),
		logger.NewStringField("table", channelReq.TableConfig.Table),
	)

	return m.handleError(ctx, channelReq, eventSchema, true)
}

func (m *Manager) handleTableError(
	ctx context.Context,
	channelReq *model.CreateChannelRequest,
	eventSchema whutils.ModelTableSchema,
) (*model.ChannelResponse, error) {
	m.logger.Infon("Handling table error",
		logger.NewStringField("schema", channelReq.TableConfig.Schema),
		logger.NewStringField("table", channelReq.TableConfig.Table),
	)

	return m.handleError(ctx, channelReq, eventSchema, false)
}

Also applies to: 170-193

router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api_test.go (4)

359-359: Fix typo in test name: "Get Statu" should be "Get Status"

There's a typo in the test name. It should be "Get Status" to correctly reflect the method being tested.


41-84: Refactor repeated httptest.NewServer setups into helper functions

The setup code for httptest.NewServer with similar handler functions is repeated across multiple test cases (Create Channel, Delete Channel, Get Channel, Insert, Get Status). Refactoring this code into reusable helper functions would reduce code duplication and improve maintainability.


55-56: Set Content-Type header in HTTP responses

In the test HTTP handlers, it's good practice to set the Content-Type header when sending a JSON response to simulate real-world API behavior. Please add w.Header().Set("Content-Type", "application/json") before writing the response. This applies to all handlers returning JSON responses.


127-127: Correct the test case description

The test case description should be "Request failure (non-200 status code)" without the apostrophe. This change improves clarity and grammatical correctness.

router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go (6)

13-13: Nitpick: Unnecessary Import of strconv

The strconv package is imported on line 12~ but does not appear to be used anywhere in the code.

Consider removing the unused import to keep the code clean:

-	"strconv"

287-289: Error Handling Improvement: Clarify Error Message When Creating Channel

In the error message returned when failing to create a channel, consider including the destinationID or more context to aid in debugging.

Update the error message as follows:

-	return nil, nil, fmt.Errorf("creating channel %s: %w", info.tableName, err)
+	return nil, nil, fmt.Errorf("creating channel for table %s in destination %s: %w", info.tableName, destinationID, err)

464-465: Error Handling Improvement: More Informative Error on Invalid Status Response

When the status response is invalid or unsuccessful, returning a generic errInvalidStatusResponse may not provide enough context.

Enhance the error message to include details from statusRes:

	if !statusRes.Valid || !statusRes.Success {
-		return false, errInvalidStatusResponse
+		return false, fmt.Errorf("invalid status response: success=%v, valid=%v", statusRes.Success, statusRes.Valid)
	}

319-319: Error Message Enhancement: Include Error Details

In the error message constructed when inserting data fails, consider including the error messages from insertRes.Errors for better troubleshooting.

Modify the error message to include detailed errors:

	return nil, nil, fmt.Errorf(
-		"inserting data %s failed: %s %v",
+		"inserting data for table %s failed: code=%s, errors=%v",
		info.tableName,
		insertRes.Code,
		errorMessages,
	)

12-12: Unused Import: slices Package Not Utilized

The slices package is imported but not used in the code.

Consider removing the unused import to clean up dependencies:

-	"slices"

181-184: Variable Shadowing: Overlapping Variable Names

In lines 181-184, the variables importingJobIDs and failedJobIDs are declared, which might shadow any variables with the same names in the broader scope.

Ensure that variable names are unique within their scope to prevent shadowing and potential bugs.

router/batchrouter/handle_async.go (1)

Line range hint 323-348: Replace time.After with time.Ticker to prevent timer leaks

Using time.After() inside a loop can lead to unbounded accumulation of timers, causing memory leaks, because each call creates a new timer that cannot be garbage collected until it fires. It is recommended to use time.Ticker for periodic actions within loops.

Apply this diff to refactor the code:

 func (brt *Handle) asyncUploadWorker(ctx context.Context) {
 	if !asynccommon.IsAsyncDestination(brt.destType) {
 		return
 	}

+	ticker := time.NewTicker(brt.asyncUploadWorkerTimeout.Load())
+	defer ticker.Stop()
+
 	for {
 		select {
 		case <-ctx.Done():
 			return
-		case <-time.After(brt.asyncUploadWorkerTimeout.Load()):
+		case <-ticker.C:
 			brt.configSubscriberMu.RLock()
 			destinationsMap := brt.destinationsMap
 			uploadIntervalMap := brt.uploadIntervalMap
 			brt.configSubscriberMu.RUnlock()

 			for destinationID := range destinationsMap {
 				_, ok := brt.asyncDestinationStruct[destinationID]
 				if !ok || brt.asyncDestinationStruct[destinationID].UploadInProgress {
 					continue
 				}

 				timeElapsed := time.Since(brt.asyncDestinationStruct[destinationID].CreatedAt)
 				brt.asyncDestinationStruct[destinationID].UploadMutex.Lock()

 				timeout := uploadIntervalMap[destinationID]
 				if brt.asyncDestinationStruct[destinationID].Exists && (brt.asyncDestinationStruct[destinationID].CanUpload || timeElapsed > timeout) {
 					brt.asyncDestinationStruct[destinationID].CanUpload = true
 					brt.asyncDestinationStruct[destinationID].PartFileNumber++
 					uploadResponse := brt.asyncDestinationStruct[destinationID].Manager.Upload(brt.asyncDestinationStruct[destinationID])
 					if uploadResponse.ImportingParameters != nil && len(uploadResponse.ImportingJobIDs) > 0 {
 						brt.asyncDestinationStruct[destinationID].UploadInProgress = true
 					}
 					brt.setMultipleJobStatus(uploadResponse, true, brt.asyncDestinationStruct[destinationID].AttemptNums, brt.asyncDestinationStruct[destinationID].FirstAttemptedAts, brt.asyncDestinationStruct[destinationID].OriginalJobParameters)
 					brt.asyncStructCleanUp(destinationID)
 				}
 				brt.asyncDestinationStruct[destinationID].UploadMutex.Unlock()
 			}
 		}
 	}
 }
router/batchrouter/handle.go (4)

81-81: Add documentation for asyncUploadWorkerTimeout.

The new field asyncUploadWorkerTimeout config.ValueLoader[time.Duration] has been added to the Handle struct. Adding a comment to explain its purpose, default value, and how it influences the async upload worker would enhance code readability and maintainability.


154-154: Initialize now function in a thread-safe manner.

The now function is initialized in the mainLoop method with brt.now = timeutil.Now if it is nil. Ensure that this lazy initialization is thread-safe. Alternatively, consider initializing brt.now during the construction of the Handle struct to avoid the nil check in the main loop.


Line range hint 422-436: Replace panic calls with proper error handling.

In the upload method, several error conditions are handled using panic(err). Using panic is discouraged in production code as it can crash the entire application. Instead, handle the errors gracefully by returning them to the caller or logging them appropriately.

Apply this diff to handle errors without panicking:

 err = os.MkdirAll(filepath.Dir(gzipFilePath), os.ModePerm)
 if err != nil {
-	panic(err)
+	brt.logger.Errorf("Failed to create directories for %s: %v", gzipFilePath, err)
+	return UploadResult{
+	    Error:          err,
+	    LocalFilePaths: []string{gzipFilePath},
+	}
 }

 err = misc.CreateGZ(gzipFilePath)
 if err != nil {
-	panic(err)
+	brt.logger.Errorf("Failed to create gzip writer for %s: %v", gzipFilePath, err)
+	return UploadResult{
+	    Error:          err,
+	    LocalFilePaths: []string{gzipFilePath},
+	}
 }

Ensure all instances of panic(err) are replaced with appropriate error handling throughout the method.


Line range hint 655-655: Fix typo in comment for skipFetchingJobs method.

There is a typo in the comment:

-// skipFetchingJobs returns true if the destination type is async and the there are still jobs in [importing] state for this destination type
+// skipFetchingJobs returns true if the destination type is async and there are still jobs in [importing] state for this destination type
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between e7599ca and 91341e3.

⛔ Files ignored due to path filters (3)
  • .github/workflows/tests.yaml is excluded by !**/*.yaml
  • integration_test/snowpipestreaming/testdata/docker-compose.rudder-transformer.yml is excluded by !**/*.yml
  • router/batchrouter/asyncdestinationmanager/snowpipestreaming/testdata/docker-compose.rudder-snowpipe-clients.yml is excluded by !**/*.yml
📒 Files selected for processing (31)
  • gateway/handle.go (1 hunks)
  • gateway/handle_lifecycle.go (3 hunks)
  • processor/transformer/transformer.go (1 hunks)
  • router/batchrouter/asyncdestinationmanager/common/utils.go (1 hunks)
  • router/batchrouter/asyncdestinationmanager/manager.go (2 hunks)
  • router/batchrouter/asyncdestinationmanager/snowpipestreaming/apiadapter.go (1 hunks)
  • router/batchrouter/asyncdestinationmanager/snowpipestreaming/channel.go (1 hunks)
  • router/batchrouter/asyncdestinationmanager/snowpipestreaming/discards.go (1 hunks)
  • router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api.go (1 hunks)
  • router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api_integration_test.go (1 hunks)
  • router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api_test.go (1 hunks)
  • router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/codes.go (1 hunks)
  • router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/model/model.go (1 hunks)
  • router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/model/model_test.go (1 hunks)
  • router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go (1 hunks)
  • router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming_test.go (1 hunks)
  • router/batchrouter/asyncdestinationmanager/snowpipestreaming/testhelper/testhelper.go (1 hunks)
  • router/batchrouter/asyncdestinationmanager/snowpipestreaming/types.go (1 hunks)
  • router/batchrouter/handle.go (4 hunks)
  • router/batchrouter/handle_async.go (3 hunks)
  • router/batchrouter/handle_lifecycle.go (2 hunks)
  • utils/misc/misc.go (1 hunks)
  • warehouse/integrations/manager/manager.go (2 hunks)
  • warehouse/integrations/snowflake/datatype_mapper.go (1 hunks)
  • warehouse/integrations/snowflake/datatype_mapper_test.go (1 hunks)
  • warehouse/integrations/snowflake/snowflake.go (1 hunks)
  • warehouse/slave/worker.go (2 hunks)
  • warehouse/slave/worker_test.go (1 hunks)
  • warehouse/utils/reservedkeywords.go (1 hunks)
  • warehouse/utils/uploader.go (1 hunks)
  • warehouse/utils/utils.go (3 hunks)
🔇 Additional comments (47)
router/batchrouter/asyncdestinationmanager/common/utils.go (1)

6-6: LGTM! Verify constant usage consistency.

The addition of "SNOWPIPE_STREAMING" to asyncDestinations is appropriate for the new feature. However, let's verify the constant usage across the codebase.

✅ Verification successful

LGTM! The constant usage is consistent across the codebase.

The verification shows that "SNOWPIPE_STREAMING" is consistently used across the codebase:

  • Defined as a constant in warehouse/utils/utils.go as SnowpipeStreaming = "SNOWPIPE_STREAMING"
  • Used consistently in batch destinations list in utils/misc/misc.go
  • Referenced correctly in all integration points including warehouse manager, transformer, and async destination manager
  • Properly used in test files with the exact same string value
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify consistent usage of SNOWPIPE_STREAMING constant
# Expected: All occurrences should use the exact same string "SNOWPIPE_STREAMING"

# Search for any variations of the constant name to ensure consistency
rg -i "snowpipe.*streaming" --type go

# Search for destination type declarations to ensure alignment
ast-grep --pattern 'const $_ = "SNOWPIPE_STREAMING"'

Length of output: 40376

warehouse/integrations/snowflake/datatype_mapper_test.go (1)

24-24: Verify null handling safety

The change from passing tc.numericScale to tc.numericScale.Int64 moves null checking responsibility to the caller. Ensure that CalculateDataType properly handles zero values when numericScale.Valid is false.

✅ Verification successful

Let me gather more information about the implementation to verify null handling safety.


Let me fix the regex and try again to check the implementation.


Accessing numericScale.Int64 is safe in this context

The implementation is safe because:

  1. CalculateDataType only uses numericScale to convert "int" to "float" when scale > 0
  2. When numericScale.Valid is false, Int64 defaults to 0, which maintains the correct behavior
  3. Test cases cover both scenarios:
    • sql.NullInt64{} (zero value, Valid=false) for VARCHAR and INT types
    • sql.NullInt64{Int64: 2, Valid: true} for INT with scale
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if CalculateDataType is called with invalid sql.NullInt64 values in the codebase

# Search for all calls to CalculateDataType
ast-grep --pattern 'CalculateDataType($_, $_)'

# Search for null checks before calls
rg -B 5 'CalculateDataType\(' 

Length of output: 2992


Script:

#!/bin/bash
# Check the implementation of CalculateDataType and how it handles numericScale
ast-grep --pattern 'func CalculateDataType($_, $_) {
  $$$
}'

# Check if numericScale is used in any conditional logic
rg -A 10 'numericScale.*{' warehouse/integrations/snowflake/

Length of output: 220


Script:

#!/bin/bash
# Check the implementation of CalculateDataType and its handling of numericScale
cat warehouse/integrations/snowflake/datatype_mapper.go

# Check all usages of numericScale in conditional statements
rg -A 5 'numericScale' warehouse/integrations/snowflake/

Length of output: 4346

router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/codes.go (2)

1-2: LGTM! Package structure follows Go conventions

The package is appropriately named and correctly placed under the internal directory, indicating it's meant for internal use only.


3-12: Verify coverage of all Snowpipe error scenarios

Let's ensure all potential Snowpipe streaming error scenarios are covered by these error codes.

✅ Verification successful

Let me analyze the error handling patterns in the codebase to verify if all necessary error scenarios are covered by the error codes.


Based on the analysis of the codebase, I can now provide a comprehensive verification of the error codes. Let me generate the final response.

Error codes are comprehensive and properly mapped

The current error codes in codes.go cover all critical Snowpipe streaming error scenarios:

  • Authentication failures: ErrAuthenticationFailed
  • Authorization/access issues:
    • ErrRoleDoesNotExistOrNotAuthorized
    • ErrDatabaseDoesNotExistOrNotAuthorized
    • ErrSchemaDoesNotExistOrNotAuthorized
    • ErrTableDoesNotExistOrNotAuthorized
  • Schema validation:
    • ErrValidationError
    • ErrSchemaConflict
  • Generic errors: ErrUnknownError

The error codes align with:

  • Snowflake's API error responses (seen in API tests)
  • Real-world error scenarios (validated through integration tests)
  • Standard error handling patterns in the codebase
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for any additional error handling patterns in the codebase
# that might indicate missing error scenarios

# Look for error handling in Snowpipe related files
rg -g '**/*snow*.go' -B 2 -A 2 'error|err|fail|invalid'

# Look for TODO/FIXME comments related to error handling
rg -g '**/*snow*.go' 'TODO|FIXME|XXX' -C 2

Length of output: 134196


Script:

#!/bin/bash
# Search for error handling patterns in the Snowpipe API code
ast-grep --pattern 'const (
  $$$
  Err$_ = $_
  $$$
)'

# Look for error handling in Snowpipe API related files
rg -g '**/snowpipestreaming/internal/api/*.go' -B 2 -A 2 'error|err|fail|invalid'

Length of output: 57187

warehouse/integrations/snowflake/datatype_mapper.go (1)

45-45: Verify null handling in callers after signature change

The function has been exported and the parameter type changed from sql.NullInt64 to int64. While this simplifies the function, ensure all callers properly handle NULL values before passing the scale.

✅ Verification successful

Let me check how numericScale is handled before being passed to CalculateDataType.


Let me check the complete context of the numericScale handling in the Snowpipe streaming model file to ensure proper null handling.


Function signature change is safe and properly handled

The change from sql.NullInt64 to int64 is well-handled across all callers:

  • In snowflake.go, the caller safely accesses .Int64 from a sql.NullInt64
  • In model.go, proper initialization with default 0 and safe pointer dereferencing is implemented
  • Test cases cover both valid and invalid scenarios
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for all callers of CalculateDataType to verify NULL handling
rg -A 5 "CalculateDataType\(" 

Length of output: 2729


Script:

#!/bin/bash
# Check how numericScale is handled in the calling contexts
rg -B 5 "CalculateDataType\(" 

Length of output: 2677


Script:

#!/bin/bash
# Check the complete context of numericScale handling in the model file
rg -B 10 -A 5 "numericScale :=" router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/model/model.go

Length of output: 432

router/batchrouter/asyncdestinationmanager/snowpipestreaming/testhelper/testhelper.go (5)

1-17: LGTM! Clean package structure and imports

The package structure follows Go conventions, and imports are well-organized with a clear separation between standard library, external, and internal packages.


19-22: LGTM! Well-defined constants

Constants are clearly named and follow the standard naming conventions for environment variables.


34-46: LGTM! Well-implemented credentials retrieval

The function has proper error handling and clear error messages. The use of os.LookupEnv is appropriate for test credentials.


48-52: LGTM! Robust schema name generation

Good implementation with:

  • Secure random string generation
  • Timestamp inclusion for uniqueness
  • Proper case conversion for Snowflake compatibility

24-32: Consider secure handling of sensitive credentials

The struct contains sensitive information (private key and passphrase). Ensure that these credentials are properly secured in the testing environment and not accidentally logged or exposed.

Let's verify the usage of these sensitive fields:

Consider implementing the following security measures:

  1. Add documentation comments about the security implications
  2. Consider implementing secure cleanup after tests
✅ Verification successful

Based on the search results, I can generate the final response as I have enough context about how these credentials are handled in the codebase.

Sensitive credentials are properly handled in test environment

The codebase demonstrates secure handling of sensitive credentials in test environments:

  • Test credentials are sourced from environment variables (not hardcoded)
  • Private keys are stored in temporary directories during tests using t.TempDir()
  • Private keys are properly formatted and escaped when used in configurations
  • Test credentials are scoped to test functions and cleaned up after tests complete
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for potential logging or exposure of sensitive credentials
rg -i "privatekey|passphrase" -A 5 -B 5

Length of output: 70435

warehouse/utils/uploader.go (2)

1-9: LGTM: Clean package structure and imports

The package name and imports are appropriate for the functionality being implemented.


1-55: Verify the migration from old Uploader interface

According to the AI summary, this interface replaces a previous Uploader interface from warehouse/utils/utils.go. Let's verify the migration is complete.

✅ Verification successful

Based on the search results, I can now provide a final response about the migration of the Uploader interface:

Migration to new Uploader interface is complete

The codebase search shows that:

  • The new Uploader interface in warehouse/utils/uploader.go is being used extensively across the warehouse integrations (Snowflake, Redshift, PostgreSQL, etc.)
  • No references to the old interface location (warehouse/utils/utils.go) were found
  • All implementations (noopUploader, dummyUploader, source.Uploader) use the new interface
  • Mock implementations in tests are generated from the new interface location
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any remaining references to the old Uploader interface

# Search for old interface usage
rg -l "warehouse/utils/utils" | while read -r file; do
  echo "Checking $file for old Uploader interface usage..."
  rg "Uploader" "$file"
done

# Verify all implementations of the new interface
ast-grep --pattern 'type $_ struct {
  $$$
}

func ($_ $_) IsWarehouseSchemaEmpty() bool {
  $$$
}'

Length of output: 146


Script:

#!/bin/bash
# Let's try a different approach to verify the interface implementations and usage

# Search for any implementations of the Uploader interface
echo "=== Searching for Uploader interface implementations ==="
rg "type.*struct.*{" -A 5 | rg -B 5 "IsWarehouseSchemaEmpty\(\).*bool"

# Search for imports of the old utils package
echo -e "\n=== Searching for old utils package imports ==="
rg "\"github.com/rudderlabs/rudder-server/warehouse/utils\""

# Search for Uploader type usage
echo -e "\n=== Searching for Uploader type usage ==="
rg "Uploader" -A 2 -B 2

# Check if there are any references to the old location
echo -e "\n=== Checking references to potential old location ==="
fd "utils.go" --type f | while read -r file; do
    echo "Checking $file..."
    rg "type.*Uploader.*interface" "$file"
done

Length of output: 110347

router/batchrouter/asyncdestinationmanager/manager.go (2)

19-19: LGTM! Import statement is correctly placed and follows conventions.

The snowpipestreaming package import is properly grouped with other internal destination manager imports.


45-46: ⚠️ Potential issue

Review error handling and parameter consistency

A few concerns with the new case implementation:

  1. Error Handling: Other destination managers (e.g., BINGADS_AUDIENCE) return potential errors from their New() function, but this implementation always returns nil error. Consider updating the snowpipestreaming.New() signature to return an error if appropriate.

  2. Parameter Consistency: Other destination managers (e.g., BINGADS_AUDIENCE) receive backendConfig, but it's not passed to snowpipestreaming.New(). Verify if this parameter is needed.

Let's verify the integration:

router/batchrouter/asyncdestinationmanager/snowpipestreaming/apiadapter.go (2)

14-19: LGTM! Constants are well-defined

The API operation constants follow a consistent naming convention and are used appropriately for metrics tagging.


24-24: Verify the api interface definition

The api type used here isn't immediately visible. Let's verify its definition and contract.

✅ Verification successful

Based on the code provided in the context, I can now generate the final response as I can see the complete definition of both the api interface and its implementation.

The api interface is well-defined and properly implemented

The api interface is defined within the same package and is implemented by the apiAdapter struct. The interface defines methods for managing Snowpipe streaming channels:

  • CreateChannel: Creates a new channel for streaming data
  • DeleteChannel: Deletes an existing channel
  • Insert: Inserts data into a channel
  • GetStatus: Gets the status of a channel

The implementation in apiAdapter properly wraps these operations with logging and metrics. The interface and its implementation are consistent with the adapter pattern being used.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for the api interface definition in the same package
ast-grep --pattern 'type api interface {
  $$$
}'

# Search for any references to this interface
rg -A 5 'type api interface'

Length of output: 82


Script:

#!/bin/bash
# Let's try searching for the api type in the same directory with a case-insensitive search
rg -i "type.*api" router/batchrouter/asyncdestinationmanager/snowpipestreaming/

# Also search for any files that might define this API interface
fd -e go . router/batchrouter/asyncdestinationmanager/snowpipestreaming/ --exec cat {}

Length of output: 85907


Script:

#!/bin/bash
# Let's search for the api interface implementation in the same package
rg -A 10 'type api interface' router/batchrouter/asyncdestinationmanager/snowpipestreaming/

# Also search for the api adapter implementation
rg -A 10 'type apiAdapter struct' router/batchrouter/asyncdestinationmanager/snowpipestreaming/

Length of output: 191

warehouse/integrations/manager/manager.go (2)

92-92: LGTM! Consistent implementation

The implementation in NewWarehouseOperations matches the one in New, maintaining consistency in how both destination types are handled.


67-67: Verify snowflake manager's compatibility with SnowpipeStreaming

The code reuses the snowflake manager for both SNOWFLAKE and SnowpipeStreaming destination types. While this suggests shared functionality, we should verify that the snowflake manager can handle both modes correctly.

✅ Verification successful

Let me gather more information about how Snowflake manager handles different destination types.


Let me try a different approach to check the snowflake implementation.


Let me check one more aspect to verify how the snowflake manager handles different destination types.


Based on the gathered information, I can now provide the final response:

Snowflake manager safely handles both destination types

The snowflake manager implementation is destination-type agnostic and handles both SNOWFLAKE and SnowpipeStreaming types correctly. The manager's core functionality:

  • Uses the same connection and configuration setup for both types
  • Handles data loading and schema management uniformly
  • Relies on common utilities for cloud provider and object storage configuration
  • Maintains consistent warehouse operations regardless of the destination type
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify if snowflake manager has specific handling for SnowpipeStreaming

# Check for any specific handling of SnowpipeStreaming in snowflake package
echo "Checking snowflake package for SnowpipeStreaming handling..."
rg -l "SnowpipeStreaming" warehouse/integrations/snowflake/

# Check for any configuration or setup differences
echo "Checking for configuration differences..."
ast-grep --pattern 'if destType == warehouseutils.SnowpipeStreaming {
  $$$
}'

Length of output: 364


Script:

#!/bin/bash
# Let's check for any differences in handling between SNOWFLAKE and SnowpipeStreaming

# Check snowflake manager implementation
echo "Checking snowflake manager implementation..."
rg -A 5 "type SnowflakeManager struct" warehouse/integrations/snowflake/

# Check initialization and configuration
echo "Checking initialization and configuration..."
rg -A 10 "func NewSnowflake\(" warehouse/integrations/snowflake/

# Check if there are any mode-specific configurations
echo "Checking for destination type specific handling..."
rg -A 3 "destType.*=.*SnowpipeStreaming" warehouse/integrations/snowflake/
rg -A 3 "DestinationType.*=.*SnowpipeStreaming" warehouse/integrations/snowflake/

# Check for any snowpipe specific methods or configurations
echo "Checking for snowpipe specific implementations..."
rg -l "snowpipe" warehouse/integrations/snowflake/

Length of output: 771


Script:

#!/bin/bash
# Let's examine the snowflake package more thoroughly

# First, list all files in the snowflake package to understand its structure
echo "Listing snowflake package files..."
fd . warehouse/integrations/snowflake/

# Check for any streaming-related implementations
echo "Checking for streaming implementations..."
rg -l "Stream" warehouse/integrations/snowflake/

# Look for configuration or setup related code
echo "Checking for configuration setup..."
rg -A 5 "Config" warehouse/integrations/snowflake/

# Check for any type-specific handling in the upload process
echo "Checking upload process..."
rg -A 5 "Upload" warehouse/integrations/snowflake/

# Check for any initialization differences
echo "Checking initialization..."
rg -A 5 "Init" warehouse/integrations/snowflake/

Length of output: 86754


Script:

#!/bin/bash
# Check for any differences in handling between SNOWFLAKE and SnowpipeStreaming

# Check the main snowflake implementation file
echo "Checking snowflake.go for destination type handling..."
rg -A 5 "type Snowflake struct" warehouse/integrations/snowflake/snowflake.go

# Check for any configuration differences based on destination type
echo "Checking for destination type specific configuration..."
rg -A 5 "DestinationType.*=.*" warehouse/integrations/snowflake/snowflake.go

# Check for any setup differences
echo "Checking Setup method for destination type handling..."
rg -A 10 "func.*Setup.*" warehouse/integrations/snowflake/snowflake.go

Length of output: 1214

router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming_test.go (2)

1-10: LGTM! Clean import organization.

The package declaration and imports are well-organized, following Go conventions with proper grouping of standard library, external, and internal packages.


109-110: Verify the implementation of tested functions.

The test file references two functions that should be implemented elsewhere:

  • findNewColumns function
  • destConfig.Decode method

Also applies to: 167-168

✅ Verification successful

Let me search for the findNewColumns function in the snowpipestreaming package specifically.


Both tested functions are properly implemented in the codebase

The verification confirms:

  • findNewColumns function is implemented in router/batchrouter/asyncdestinationmanager/snowpipestreaming/channel.go
  • destConfig.Decode method is implemented in router/batchrouter/asyncdestinationmanager/snowpipestreaming/types.go
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the implementation of tested functions exists

# Search for findNewColumns function implementation
echo "Searching for findNewColumns implementation..."
ast-grep --pattern 'func findNewColumns($_, $_) {
  $$$
}'

# Search for destConfig struct and Decode method
echo "Searching for destConfig struct and Decode method..."
ast-grep --pattern 'type destConfig struct {
  $$$
}'

ast-grep --pattern 'func ($_) Decode($_) error {
  $$$
}'

Length of output: 2336


Script:

#!/bin/bash
# Search for findNewColumns in snowpipestreaming package files
echo "Searching for findNewColumns in snowpipestreaming files..."
rg -l "snowpipestreaming" | xargs rg "findNewColumns" -A 5

# Also search for the function implementation using a more specific pattern
echo "Searching with specific pattern..."
ast-grep --pattern 'func findNewColumns(eventSchema []interface{}, snowPipeSchema []interface{}) []string {
  $$$
}'

Length of output: 2472

router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api_integration_test.go (1)

32-47: LGTM! Clean and minimal mock implementations.

The mock types are well-designed for testing HTTP interactions.

warehouse/slave/worker.go (1)

348-348: Verify the impact of exporting HandleSchemaChange

The function has been made public (exported). Please ensure this change is intentional and verify if other packages need this functionality.

✅ Verification successful

Function export is justified and properly used

The function HandleSchemaChange is correctly exported as it's being used in another package (router/batchrouter/asyncdestinationmanager/snowpipestreaming/discards.go). The change from private to public scope is intentional and necessary for this cross-package usage.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if HandleSchemaChange is used in other packages

# Search for usage of HandleSchemaChange across the codebase
rg "HandleSchemaChange" --type go

# Search for potential test files that might need to be updated
rg "handleSchemaChange" --type go

Length of output: 744

router/batchrouter/handle_lifecycle.go (2)

214-216: LGTM! Configuration keys properly scoped to destination type.

The modification to include destination type in configuration keys for minIdleSleep, uploadFreq, and mainLoopFreq follows the established pattern and allows for destination-specific timing configurations.

Let's verify the configuration usage across destinations:

✅ Verification successful

Based on the search results, I can now generate the final response since I have sufficient information to verify the configuration changes:

Configuration changes properly implemented and used consistently

The destination-specific configuration keys for minIdleSleep, uploadFreq, and mainLoopFreq are correctly implemented and used throughout the codebase:

  • The configuration values are properly loaded with destination type prefix in handle_lifecycle.go
  • The values are used consistently in worker scheduling and execution logic
  • Integration tests (especially snowpipestreaming_test.go) demonstrate proper usage of destination-specific configurations
  • No hardcoded values found that should be using these configurations instead
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify configuration patterns and usage across destinations
# Expected: Find consistent usage of these configurations with destination type prefixes

# Search for minIdleSleep usage
rg "minIdleSleep" --type go -B 2 -A 2

# Search for uploadFreq configuration patterns
rg "uploadFreq" --type go -B 2 -A 2

# Search for mainLoopFreq usage
rg "mainLoopFreq" --type go -B 2 -A 2

# Check for any hardcoded values that should use these configurations
ast-grep --pattern 'time.Sleep($duration)'

Length of output: 34133


205-205: LGTM! Verify timeout configuration.

The addition of asyncUploadWorkerTimeout with a default of 10 seconds follows the established pattern. The configuration is properly prefixed with destination type for flexibility.

Let's verify if similar timeout configurations exist in other async destinations:

✅ Verification successful

The timeout configuration is appropriate and consistent with the codebase patterns

The asyncUploadWorkerTimeout with a default of 10 seconds aligns well with similar timeout patterns in the codebase:

  • It follows the same configuration pattern as other BatchRouter timeouts
  • The 10-second default is consistent with similar worker/upload timeouts:
    • asyncUploadTimeout (30 minutes for overall upload)
    • pollStatusLoopSleep (10 seconds)
    • minIdleSleep (2 seconds)
    • uploadFreq (30 seconds)

The configuration is properly structured with destination-type specific override support (BatchRouter.<destType>.asyncUploadWorkerTimeout) and a global fallback (BatchRouter.asyncUploadWorkerTimeout).

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for similar worker timeout configurations across the codebase
# Expected: Find similar timeout patterns in other async destinations

# Search for worker timeout configurations
rg -A 2 "uploadWorkerTimeout|workerTimeout" --type go

# Search for default timeout values in async destinations
ast-grep --pattern 'config.GetReloadableDurationVar($_, $_, time.Second, $$$)'

Length of output: 136


Script:

#!/bin/bash
# Let's try a broader search for timeout configurations and worker-related settings

# Search for timeout configurations in batch router
rg -i "timeout.*=.*config" router/batchrouter/

# Search for worker-related configurations
rg -i "worker.*=.*config" router/batchrouter/

# Search for GetReloadableDurationVar usage
rg "GetReloadableDurationVar" -A 2 --type go

# Look for similar configuration patterns
rg "BatchRouter\." -A 2 --type go

Length of output: 42470

gateway/handle_lifecycle.go (1)

41-41: LGTM: Import addition is well-placed

The addition of the timeutil package import is correctly placed and aligns with the implementation changes.

processor/transformer/transformer.go (1)

540-542: LGTM! The implementation follows existing patterns.

The Snowpipe Streaming destination handling is properly integrated, following the same pattern as other warehouse destinations and reusing existing configuration values.

Let's verify the configuration documentation:

utils/misc/misc.go (1)

99-99: LGTM! Verify integration with destination handlers

The addition of "SNOWPIPE_STREAMING" to the batch destinations list is consistent with the PR's objective of adding Snowpipe Streaming support.

Let's verify that the new destination type is properly handled in the destination-specific code:

✅ Verification successful

Based on the comprehensive verification results, I can now generate the final response:

LGTM! Snowpipe Streaming integration is properly implemented

The addition of "SNOWPIPE_STREAMING" to the batch destinations list is well-integrated:

  • Complete implementation found in router/batchrouter/asyncdestinationmanager/snowpipestreaming/
  • Proper destination type handling in warehouse integrations manager
  • Comprehensive test coverage including integration tests
  • Full implementation of required interfaces (Upload, Poll, GetUploadStats)
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify that SNOWPIPE_STREAMING is properly integrated with destination handlers

# Check for destination handler implementation
ast-grep --pattern 'case "SNOWPIPE_STREAMING": $$$'

# Check for any configuration structs or interfaces related to Snowpipe
rg -l "(?i)snowpipe.*(?:config|options|settings)"

# Check for any test files related to Snowpipe
fd -g "*test*.go" | xargs rg -l "(?i)snowpipe"

Length of output: 816


Script:

#!/bin/bash
# Let's verify the integration by checking the implementation details

# Check the main implementation file for proper handling
cat router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go

# Check if the destination type is properly handled in router configuration
rg -A 5 "type DestinationType string" 

# Check for any switch/case statements handling destination types
ast-grep --pattern 'switch destType {
  $$$
}'

Length of output: 26001

warehouse/utils/utils.go (2)

41-52: LGTM: Constants block is well-structured

The addition of SnowpipeStreaming constant follows the established naming convention and is appropriately placed within the warehouse destinations block.


520-521: LGTM: Consistent handling of SnowpipeStreaming case

The modification to include SnowpipeStreaming in the uppercase conversion logic is consistent with the existing pattern and correctly groups it with SNOWFLAKE.

warehouse/slave/worker_test.go (3)

Line range hint 964-975: LGTM! Well-structured and comprehensive test cases

The test implementation demonstrates good practices:

  • Comprehensive coverage of data type conversions
  • Well-organized table-driven tests
  • Parallel test execution for better performance
  • Clear test case names and expectations

Line range hint 1-975: LGTM! Excellent test structure and coverage

The test file demonstrates excellent testing practices:

  • Logical organization using sub-tests
  • Proper isolation of integration tests using docker containers
  • Thorough error case coverage
  • Comprehensive assertions

964-964: Verify the exported function's usage and documentation

The function has been made public (exported) by renaming from handleSchemaChange to HandleSchemaChange. This change requires verification of its usage across the codebase and proper documentation.

✅ Verification successful

Based on the results, I can now generate the final response:

Function export change is properly implemented and documented

The HandleSchemaChange function is:

  • Properly documented in warehouse/slave/worker.go
  • Used in both test files and production code
  • Correctly exported as it's being used across packages (specifically in router/batchrouter/asyncdestinationmanager/snowpipestreaming/discards.go)
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for function usage and documentation
# Test 1: Find all usages of HandleSchemaChange
rg -p "HandleSchemaChange"

# Test 2: Check if the function is documented in any package documentation
fd -e go -x grep -l "^// HandleSchemaChange" {}

Length of output: 725

gateway/handle.go (1)

491-491: LGTM! Verify time handling consistency.

The change to use gw.now() instead of time.Now() improves testability. This is a good practice as it allows for time mocking in tests.

Let's verify the consistency of timestamp handling across the codebase:

✅ Verification successful

Based on the search results, I can now generate the final response:

LGTM! Time handling is consistent across the codebase

The change to use gw.now() instead of time.Now() is consistent with the codebase's time handling patterns:

  • The timestamp format misc.RFC3339Milli is consistently used across the codebase for event timestamps
  • The gateway package already uses gw.now() in other places for timestamp generation
  • The change maintains consistency with the existing pattern of dependency injection for time functions, which improves testability
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for direct time.Now() calls and timestamp format consistency

# Check for direct time.Now() calls in the gateway package
echo "Checking for direct time.Now() calls in gateway package..."
rg "time\.Now\(\)" --type go

# Check for RFC3339Milli format usage consistency
echo "Checking timestamp format consistency..."
rg "RFC3339|time\.RFC" --type go

Length of output: 66261

warehouse/integrations/snowflake/snowflake.go (1)

1403-1403: LGTM! Verify other usages of the exported function.

The change to use CalculateDataType with explicit Int64 access improves code reusability and type safety.

Let's verify other usages of this function across the codebase:

✅ Verification successful

Inconsistent usage of calculateDataType function across the codebase

The verification reveals that there are two different implementations:

  • calculateDataType in warehouse/integrations/redshift/redshift.go
  • CalculateDataType in warehouse/integrations/snowflake/datatype_mapper.go

Let's verify if these implementations are similar and should be consolidated:


Let me try a different pattern to fetch the implementations:


The change to CalculateDataType is correctly implemented but reveals separate concerns

The verification shows that while both functions share a similar structure, they handle different warehouse-specific logic:

  • Redshift's version handles string length constraints
  • Snowflake's version handles numeric scale for float conversion

These implementations should remain separate as they address warehouse-specific type mapping requirements. The change to export Snowflake's function is appropriate as it's now being used by the snowpipe streaming module.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find all usages of the calculateDataType/CalculateDataType function

# Search for both old and new function names to ensure complete migration
echo "Searching for old function name (calculateDataType)..."
rg "calculateDataType\("

echo "Searching for new function name (CalculateDataType)..."
rg "CalculateDataType\("

Length of output: 1186


Script:

#!/bin/bash
# Get implementation of both functions
echo "Redshift calculateDataType implementation:"
ast-grep --pattern 'func calculateDataType($_, $_) { $$$ }' warehouse/integrations/redshift/redshift.go

echo -e "\nSnowflake CalculateDataType implementation:"
ast-grep --pattern 'func CalculateDataType($_, $_) { $$$ }' warehouse/integrations/snowflake/datatype_mapper.go

Length of output: 417


Script:

#!/bin/bash
echo "Redshift calculateDataType implementation:"
rg -A 20 "^func calculateDataType" warehouse/integrations/redshift/redshift.go

echo -e "\nSnowflake CalculateDataType implementation:"
rg -A 20 "^func CalculateDataType" warehouse/integrations/snowflake/datatype_mapper.go

Length of output: 1277

warehouse/utils/reservedkeywords.go (1)

97-189: LGTM! The SNOWPIPE_STREAMING keywords are correctly defined.

The addition of SNOWPIPE_STREAMING keywords is appropriate as it matches the SNOWFLAKE keywords, which is expected since Snowpipe Streaming is a Snowflake feature using the same SQL dialect.

router/batchrouter/asyncdestinationmanager/snowpipestreaming/types.go (2)

32-43: Ensure Proper Initialization of Client Configuration Parameters

The client configuration parameters within the Manager struct's config field do not show default values in this snippet. Ensure that these fields are properly initialized elsewhere in the codebase. Uninitialized configuration settings can lead to runtime issues.


22-31: 🛠️ Refactor suggestion

Potential Confusion Between appConfig and config Fields in Manager Struct

The Manager struct contains both appConfig and config fields, which could lead to confusion due to their similar naming. Consider renaming the config field to something more specific, such as managerConfig, to improve clarity.

This is similar to a previous comment where conf was renamed to appConfig.

router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/model/model.go (1)

14-14: Consider defining types individually for better clarity.

Defining types individually rather than within a grouped type block can improve readability and make it easier to locate and maintain each type.

router/batchrouter/asyncdestinationmanager/snowpipestreaming/discards.go (1)

68-76: Function names should start with a verb for clarity

Consider renaming discardsTable and discardsSchema to getDiscardsTable and getDiscardsSchema to follow Go naming conventions, enhancing readability and consistency across the codebase.

router/batchrouter/asyncdestinationmanager/snowpipestreaming/channel.go (1)

229-248: Verify secure handling of authentication configuration

The code sets useKeyPairAuth to true, enforcing key pair authentication. Ensure that private keys and passphrases in destConf are securely managed and not exposed in logs or error messages throughout the codebase.

router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go (3)

463-466: Logic Clarification: Condition for Import Completion

The condition statusRes.Offset != info.Offset determines if the import is still in progress. Confirm that this logic correctly reflects the intended behavior, as it might be counterintuitive.

Please verify if the condition should be statusRes.Offset == info.Offset to indicate completion.


375-376: Typographical Error: Incorrect HTTP Status Code

The HTTP status code for a bad request should be http.StatusBadRequest, which is 400. However, in line 375, you might want to double-check if this status code aligns with the intended response in this context.

Ensure that returning http.StatusBadRequest is appropriate here, or if another status code like http.StatusInternalServerError would be more suitable.


236-265: Potential Performance Issue: Reading Large Files Without Limiting Buffer Size

When reading events from the file in eventsFromFile, the scanner buffer is set based on maxBufferCapacity. Ensure that maxBufferCapacity is appropriately sized to prevent potential memory issues with large files.

Run the following script to verify the usages and default value of maxBufferCapacity:

Ensure that maxBufferCapacity is set to a reasonable value and consider making it configurable if necessary.

✅ Verification successful

Buffer Size is Properly Configured and Limited

The verification shows that maxBufferCapacity is properly configured with a reasonable default value of 512KB and is implemented as a reloadable configuration variable. This is a safe and appropriate size for buffered scanning, as it:

  • Has a sensible default (512KB)
  • Is configurable at runtime through configuration
  • Uses proper unit handling through bytesize.KB
  • Is explicitly used to limit the scanner's buffer size
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the default value and usages of maxBufferCapacity.

# Check for the default value setting
rg -A 2 'm\.config\.maxBufferCapacity' -- 'snowpipestreaming.go'

# Check for other usages of maxBufferCapacity
rg 'maxBufferCapacity' -- 'snowpipestreaming.go'

Length of output: 233


Script:

#!/bin/bash
# Let's first find the correct path to the file
fd snowpipestreaming.go

# After finding the file, let's check for maxBufferCapacity usage
fd snowpipestreaming.go --exec rg -A 2 'maxBufferCapacity'

# Let's also check for any configuration struct definitions
fd snowpipestreaming.go --exec ast-grep --pattern 'type $_ struct {
  $$$
  maxBufferCapacity $_
  $$$
}'

Length of output: 508

router/batchrouter/handle_async.go (2)

18-18: Import statement added correctly

The addition of github.com/rudderlabs/rudder-go-kit/stats is appropriate and necessary for the updated functionality.


381-381: Standardize time retrieval using brt.now()

Replacing time.Now() with brt.now() enhances consistency in time handling across the codebase, allowing for easier testing and potential future adjustments to time retrieval mechanisms.

router/batchrouter/handle.go (2)

31-31: Verify the necessity of the new import asynccommon.

The import of asynccommon "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" has been added. Please ensure that this package is used in the code. Unused imports can clutter the codebase and introduce unnecessary dependencies.


44-44: Confirm usage of the timeutil package.

The timeutil package has been imported with github.com/rudderlabs/rudder-server/utils/timeutil. Verify that the utilities provided by this package are effectively utilized in the code. If it's only used for timeutil.Now, consider whether importing the entire package is necessary or if time.Now suffices.

warehouse/utils/uploader.go Show resolved Hide resolved
Comment on lines +453 to +466
statusRes, err := m.api.GetStatus(ctx, info.ChannelID)
if err != nil {
return false, fmt.Errorf("getting status: %w", err)
}
log.Infon("Polled import info",
logger.NewBoolField("success", statusRes.Success),
logger.NewStringField("polledOffset", statusRes.Offset),
logger.NewBoolField("valid", statusRes.Valid),
logger.NewBoolField("completed", statusRes.Offset == info.Offset),
)
if !statusRes.Valid || !statusRes.Success {
return false, errInvalidStatusResponse
}
return statusRes.Offset != info.Offset, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Possible Incorrect Import Progress Check

Within pollForImportInfo, the import is considered in progress if statusRes.Offset != info.Offset. However, this might lead to incorrect status reporting if offsets are not properly synchronized.

Evaluate the logic and ensure that the offset comparison accurately reflects the import status.

Would you like assistance in reviewing and correcting the logic for import status determination?

m.config.client.timeoutDuration = conf.GetDuration("SnowpipeStreaming.Client.timeout", 300, time.Second)
m.config.client.retryWaitMin = conf.GetDuration("SnowpipeStreaming.Client.retryWaitMin", 100, time.Millisecond)
m.config.client.retryWaitMax = conf.GetDuration("SnowpipeStreaming.Client.retryWaitMax", 10, time.Second)
m.config.client.retryMax = conf.GetInt("SnowpipeStreaming.Client.retryWaitMin", 5)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Critical Issue: Incorrect Configuration Key for retryMax

The configuration key used for setting m.config.client.retryMax is incorrect. Currently, it reads from "SnowpipeStreaming.Client.retryWaitMin", which is likely a typo.

Apply this diff to fix the configuration key:

-	m.config.client.retryMax = conf.GetInt("SnowpipeStreaming.Client.retryWaitMin", 5)
+	m.config.client.retryMax = conf.GetInt("SnowpipeStreaming.Client.retryMax", 5)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
m.config.client.retryMax = conf.GetInt("SnowpipeStreaming.Client.retryWaitMin", 5)
m.config.client.retryMax = conf.GetInt("SnowpipeStreaming.Client.retryMax", 5)

Comment on lines +321 to +338

var discardImInfo *importInfo
if len(discardInfos) > 0 {
discardImInfo, err = m.sendDiscardEventsToSnowpipe(ctx, offset, info.discardChannelResponse.ChannelID, discardInfos)
if err != nil {
return nil, nil, fmt.Errorf("sending discard events to Snowpipe: %w", err)
}
}
log.Infon("Sent events to Snowpipe")

imInfo := &importInfo{
ChannelID: channelResponse.ChannelID,
Offset: offset,
Table: info.tableName,
Count: len(info.events),
}
return imInfo, discardImInfo, nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Resource Management: Potential Missing Deletion of Discard Channels

After sending discard events, if an error occurs or after successful processing, the discard channel may not be deleted, potentially leading to resource leaks.

Consider modifying the code to ensure discard channels are properly cleaned up:

+	defer func() {
+		if deleteErr := m.deleteChannel(ctx, info.tableName, info.discardChannelResponse.ChannelID); deleteErr != nil {
+			m.logger.Warnn("Failed to delete discard channel", obskit.Error(deleteErr))
+		}
+	}()

Committable suggestion skipped: line range outside the PR's diff.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants