Skip to content

Commit

Permalink
refactor: error reporting to drive category fron stats
Browse files Browse the repository at this point in the history
propagate statTags from transformer response to reporting.
  • Loading branch information
koladilip committed Nov 28, 2024
1 parent a9bd822 commit 5a0baac
Show file tree
Hide file tree
Showing 11 changed files with 240 additions and 137 deletions.
129 changes: 87 additions & 42 deletions enterprise/reporting/error_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,48 +109,72 @@ func (ext *ExtractorHandle) getSimpleMessage(jsonStr string) string {
}

for key, erRes := range jsonMap {
erResStr, isString := erRes.(string)
if !isString {
ext.log.Debugf("Type-assertion failed for %v with value %v: not a string", key, erRes)
if result := ext.handleKey(key, erRes); result != nil {
return *result
}
switch key {
case "reason":
return erResStr
case "Error":
if !IsJSON(erResStr) {
return strings.Split(erResStr, "\n")[0]
}
return ""
case responseKey, errorKey:
if IsJSON(erResStr) {
var unmarshalledJson interface{}
unmarshalledErr := json.Unmarshal([]byte(erResStr), &unmarshalledJson)
if unmarshalledErr != nil {
return erResStr
}
return getErrorMessageFromResponse(unmarshalledJson, ext.ErrorMessageKeys)
}
lowerErResStr := strings.ToLower(erResStr)
if strings.Contains(lowerErResStr, "<body") && strings.Contains(lowerErResStr, "</body>") {
return getHTMLErrorMessage(erResStr)
}
}
return ""
}

if len(erResStr) == 0 {
return ""
}
return erResStr
// Warehouse related errors
case "internal_processing_failed", "fetching_remote_schema_failed", "exporting_data_failed":
valAsMap, isMap := erRes.(map[string]interface{})
if !isMap {
ext.log.Debugf("Failed while type asserting to map[string]interface{} warehouse error with whKey:%s", key)
return ""
}
return getErrorFromWarehouse(valAsMap)
func (ext *ExtractorHandle) handleKey(key string, value interface{}) *string {
valueStr, isString := value.(string)
if !isString {
ext.log.Debugf("Type assertion failed for key: %v, value: %v", key, value)
return nil
}

switch key {
case "reason":
return &valueStr
case "Error":
return handleError(valueStr)
case responseKey, errorKey:
return ext.handleResponseOrErrorKey(valueStr)
case "internal_processing_failed", "fetching_remote_schema_failed", "exporting_data_failed":
return ext.handleWarehouseError(value, key)
}

return nil
}

func handleError(valueStr string) *string {
if !IsJSON(valueStr) {
firstLine := strings.Split(valueStr, "\n")[0]
return &firstLine
}
return nil
}

func (ext *ExtractorHandle) handleResponseOrErrorKey(valueStr string) *string {
if IsJSON(valueStr) {
var unmarshalledJSON interface{}
if err := json.Unmarshal([]byte(valueStr), &unmarshalledJSON); err != nil {
return &valueStr
}
result := getErrorMessageFromResponse(unmarshalledJSON, ext.ErrorMessageKeys)
return &result
}

return ""
lowerStr := strings.ToLower(valueStr)
if strings.Contains(lowerStr, "<body") && strings.Contains(lowerStr, "</body>") {
result := getHTMLErrorMessage(valueStr)
return &result
}

if len(valueStr) == 0 {
return nil
}
return &valueStr
}

func (ext *ExtractorHandle) handleWarehouseError(value interface{}, key string) *string {
valAsMap, isMap := value.(map[string]interface{})
if !isMap {
ext.log.Debugf("Failed type assertion to map[string]interface{} for warehouse error key: %s", key)
return nil
}
result := getErrorFromWarehouse(valAsMap)
return &result
}

func getHTMLErrorMessage(erResStr string) string {
Expand Down Expand Up @@ -302,10 +326,21 @@ func (ext *ExtractorHandle) CleanUpErrorMessage(errMsg string) string {
return regexdMsg
}

func (ext *ExtractorHandle) GetErrorCode(errorMessage string) string {
// version deprecation logic
func getErrorCodeFromStatTags(statTags map[string]string) string {
var errorCodeParts []string
if len(statTags) > 0 {
if errorCategory, ok := statTags["errorCategory"]; ok {
errorCodeParts = append(errorCodeParts, errorCategory)
}
if errorType, ok := statTags["errorType"]; ok {
errorCodeParts = append(errorCodeParts, errorType)
}
}
return strings.Join(errorCodeParts, ":")
}

func (ext *ExtractorHandle) isVersionDeprecationError(errorMessage string) bool {
var score int
var errorCode string

errorMessage = strings.ToLower(errorMessage)
for keyword, s := range lowercasedDeprecationKeywords {
Expand All @@ -315,7 +350,17 @@ func (ext *ExtractorHandle) GetErrorCode(errorMessage string) string {
}

if score > ext.versionDeprecationThresholdScore.Load() {
errorCode = "deprecation"
return true
}
return errorCode
return false
}

func (ext *ExtractorHandle) GetErrorCode(errorMessage string, statTags map[string]string) string {
if errorCode := getErrorCodeFromStatTags(statTags); errorCode != "" {
return errorCode
}
if ext.isVersionDeprecationError(errorMessage) {
return "deprecation"
}
return ""
}
10 changes: 5 additions & 5 deletions enterprise/reporting/error_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (edr *ErrorDetailReporter) Report(ctx context.Context, metrics []*types.PUR
edr.log.Debugn("DestinationId & DestDetail details", obskit.DestinationID(metric.ConnectionDetails.DestinationID), logger.NewField("destinationDetail", destinationDetail))

// extract error-message & error-code
errDets := edr.extractErrorDetails(metric.StatusDetail.SampleResponse)
errDets := edr.extractErrorDetails(metric.StatusDetail)

edr.stats.NewTaggedStat("error_detail_reporting_failures", stats.CountType, stats.Tags{
"errorCode": errDets.ErrorCode,
Expand All @@ -250,7 +250,7 @@ func (edr *ErrorDetailReporter) Report(ctx context.Context, metrics []*types.PUR
workspaceID,
edr.namespace,
edr.instanceID,
metric.ConnectionDetails.SourceDefinitionId,
metric.ConnectionDetails.SourceDefinitionID,
metric.ConnectionDetails.SourceID,
destinationDetail.destinationDefinitionID,
metric.ConnectionDetails.DestinationID,
Expand Down Expand Up @@ -309,10 +309,10 @@ func (edr *ErrorDetailReporter) migrate(c types.SyncerConfig) (*sql.DB, error) {
return dbHandle, nil
}

func (edr *ErrorDetailReporter) extractErrorDetails(sampleResponse string) errorDetails {
errMsg := edr.errorDetailExtractor.GetErrorMessage(sampleResponse)
func (edr *ErrorDetailReporter) extractErrorDetails(statusDetails *types.StatusDetail) errorDetails {
errMsg := edr.errorDetailExtractor.GetErrorMessage(statusDetails.SampleResponse)
cleanedErrMsg := edr.errorDetailExtractor.CleanUpErrorMessage(errMsg)
errorCode := edr.errorDetailExtractor.GetErrorCode(cleanedErrMsg)
errorCode := edr.errorDetailExtractor.GetErrorCode(cleanedErrMsg, statusDetails.StatTags)
return errorDetails{
ErrorMessage: cleanedErrMsg,
ErrorCode: errorCode,
Expand Down
12 changes: 6 additions & 6 deletions enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,10 @@ func (r *DefaultReporter) getReports(currentMs int64, syncerKey string) (reports
metricReport := types.ReportByStatus{StatusDetail: &types.StatusDetail{}}
err = rows.Scan(
&metricReport.InstanceDetails.WorkspaceID, &metricReport.InstanceDetails.Namespace, &metricReport.InstanceDetails.InstanceID,
&metricReport.ConnectionDetails.SourceDefinitionId,
&metricReport.ConnectionDetails.SourceDefinitionID,
&metricReport.ConnectionDetails.SourceCategory,
&metricReport.ConnectionDetails.SourceID,
&metricReport.ConnectionDetails.DestinationDefinitionId,
&metricReport.ConnectionDetails.DestinationDefinitionID,
&metricReport.ConnectionDetails.DestinationID,
&metricReport.ConnectionDetails.SourceTaskRunID,
&metricReport.ConnectionDetails.SourceJobID,
Expand Down Expand Up @@ -299,10 +299,10 @@ func (*DefaultReporter) getAggregatedReports(reports []*types.ReportByStatus) []
InstanceID: report.InstanceID,
},
ConnectionDetails: types.ConnectionDetails{
SourceDefinitionId: report.SourceDefinitionId,
SourceDefinitionID: report.SourceDefinitionID,
SourceCategory: report.SourceCategory,
SourceID: report.SourceID,
DestinationDefinitionId: report.DestinationDefinitionId,
DestinationDefinitionID: report.DestinationDefinitionID,
DestinationID: report.DestinationID,
SourceTaskRunID: report.SourceTaskRunID,
SourceJobID: report.SourceJobID,
Expand Down Expand Up @@ -665,10 +665,10 @@ func (r *DefaultReporter) Report(ctx context.Context, metrics []*types.PUReporte

_, err = stmt.Exec(
workspaceID, r.namespace, r.instanceID,
metric.ConnectionDetails.SourceDefinitionId,
metric.ConnectionDetails.SourceDefinitionID,
metric.ConnectionDetails.SourceCategory,
metric.ConnectionDetails.SourceID,
metric.ConnectionDetails.DestinationDefinitionId,
metric.ConnectionDetails.DestinationDefinitionID,
metric.ConnectionDetails.DestinationID,
metric.ConnectionDetails.SourceTaskRunID,
metric.ConnectionDetails.SourceJobID,
Expand Down
87 changes: 63 additions & 24 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1239,7 +1239,7 @@ func (proc *Handle) updateMetricMaps(
sourceCategory: event.Metadata.SourceCategory,
transformationID: event.Metadata.TransformationID,
transformationVersionID: event.Metadata.TransformationVersionID,
trackingPlanID: event.Metadata.TrackingPlanId,
trackingPlanID: event.Metadata.TrackingPlanID,
trackingPlanVersion: event.Metadata.TrackingPlanVersion,
}
}
Expand All @@ -1251,27 +1251,27 @@ func (proc *Handle) updateMetricMaps(
event.Metadata.SourceJobRunID,
event.Metadata.TransformationID,
event.Metadata.TransformationVersionID,
event.Metadata.TrackingPlanId,
event.Metadata.TrackingPlanID,
event.Metadata.TrackingPlanVersion,
status, event.StatusCode,
eventName, eventType,
)

if _, ok := connectionDetailsMap[key]; !ok {
connectionDetailsMap[key] = types.CreateConnectionDetail(
event.Metadata.SourceID,
event.Metadata.DestinationID,
event.Metadata.SourceTaskRunID,
event.Metadata.SourceJobID,
event.Metadata.SourceJobRunID,
event.Metadata.SourceDefinitionID,
event.Metadata.DestinationDefinitionID,
event.Metadata.SourceCategory,
event.Metadata.TransformationID,
event.Metadata.TransformationVersionID,
event.Metadata.TrackingPlanId,
event.Metadata.TrackingPlanVersion,
)
connectionDetailsMap[key] = &types.ConnectionDetails{
SourceID: event.Metadata.SourceID,
SourceTaskRunID: event.Metadata.SourceTaskRunID,
SourceJobID: event.Metadata.SourceJobID,
SourceJobRunID: event.Metadata.SourceJobRunID,
SourceDefinitionID: event.Metadata.SourceDefinitionID,
SourceCategory: event.Metadata.SourceCategory,
DestinationID: event.Metadata.DestinationID,
DestinationDefinitionID: event.Metadata.DestinationDefinitionID,
TransformationID: event.Metadata.TransformationID,
TransformationVersionID: event.Metadata.TransformationVersionID,
TrackingPlanID: event.Metadata.TrackingPlanID,
TrackingPlanVersion: event.Metadata.TrackingPlanVersion,
}
}

if _, ok := statusDetailsMap[key]; !ok {
Expand All @@ -1294,7 +1294,16 @@ func (proc *Handle) updateMetricMaps(

sd, ok := statusDetailsMap[key][sdkey]
if !ok {
sd = types.CreateStatusDetail(status, 0, 0, event.StatusCode, event.Error, payload(), eventName, eventType, ve.Type)
sd = &types.StatusDetail{
Status: status,
StatusCode: event.StatusCode,
SampleResponse: event.Error,
SampleEvent: payload(),
EventName: eventName,
EventType: eventType,
ErrorType: ve.Type,
StatTags: event.StatTags,
}
statusDetailsMap[key][sdkey] = sd
}
sd.ViolationCount += incrementCount
Expand All @@ -1307,7 +1316,15 @@ func (proc *Handle) updateMetricMaps(
sdkey := fmt.Sprintf("%s:%d:%s:%s:%s", status, event.StatusCode, eventName, eventType, "")
sd, ok := statusDetailsMap[key][sdkey]
if !ok {
sd = types.CreateStatusDetail(status, 0, 0, event.StatusCode, event.Error, payload(), eventName, eventType, "")
sd = &types.StatusDetail{
Status: status,
StatusCode: event.StatusCode,
SampleResponse: event.Error,
SampleEvent: payload(),
EventName: eventName,
EventType: eventType,
StatTags: event.StatTags,
}
statusDetailsMap[key][sdkey] = sd
}

Expand Down Expand Up @@ -1564,9 +1581,31 @@ func getDiffMetrics(
if diff != 0 {
metricMetadata := inCountMetadataMap[key]
metric := &types.PUReportedMetric{
ConnectionDetails: *types.CreateConnectionDetail(metricMetadata.sourceID, metricMetadata.destinationID, metricMetadata.sourceTaskRunID, metricMetadata.sourceJobID, metricMetadata.sourceJobRunID, metricMetadata.sourceDefinitionID, metricMetadata.destinationDefinitionID, metricMetadata.sourceCategory, metricMetadata.transformationID, metricMetadata.transformationVersionID, metricMetadata.trackingPlanID, metricMetadata.trackingPlanVersion),
PUDetails: *types.CreatePUDetails(inPU, pu, false, false),
StatusDetail: types.CreateStatusDetail(types.DiffStatus, diff, 0, 0, "", []byte(`{}`), eventName, eventType, ""),
ConnectionDetails: types.ConnectionDetails{
SourceID: metricMetadata.sourceID,
DestinationID: metricMetadata.destinationID,
SourceTaskRunID: metricMetadata.sourceTaskRunID,
SourceJobID: metricMetadata.sourceJobID,
SourceJobRunID: metricMetadata.sourceJobRunID,
SourceDefinitionID: metricMetadata.sourceDefinitionID,
DestinationDefinitionID: metricMetadata.destinationDefinitionID,
SourceCategory: metricMetadata.sourceCategory,
TransformationID: metricMetadata.transformationID,
TransformationVersionID: metricMetadata.transformationVersionID,
TrackingPlanID: metricMetadata.trackingPlanID,
TrackingPlanVersion: metricMetadata.trackingPlanVersion,
},
PUDetails: types.PUDetails{
InPU: inPU,
PU: pu,
},
StatusDetail: &types.StatusDetail{
Status: types.DiffStatus,
Count: diff,
SampleEvent: []byte(`{}`),
EventName: eventName,
EventType: eventType,
},
}
diffMetrics = append(diffMetrics, metric)
statFactory.NewTaggedStat(
Expand Down Expand Up @@ -1930,7 +1969,7 @@ func (proc *Handle) processJobsForDestV2(partition string, subJobs subJob) (*tra
trackingPlanVersion = int(version)
}
}
shallowEventCopy.Metadata.TrackingPlanId = trackingPlanID
shallowEventCopy.Metadata.TrackingPlanID = trackingPlanID
shallowEventCopy.Metadata.TrackingPlanVersion = trackingPlanVersion
shallowEventCopy.Metadata.SourceTpConfig = event.source.DgSourceTrackingPlanConfig.Config
shallowEventCopy.Metadata.MergedTpConfig = event.source.DgSourceTrackingPlanConfig.GetMergedConfig(commonMetadataFromSingularEvent.EventType)
Expand Down Expand Up @@ -2458,7 +2497,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) (*trans
trackingPlanVersion = int(version)
}
}
shallowEventCopy.Metadata.TrackingPlanId = trackingPlanID
shallowEventCopy.Metadata.TrackingPlanID = trackingPlanID
shallowEventCopy.Metadata.TrackingPlanVersion = trackingPlanVersion
shallowEventCopy.Metadata.SourceTpConfig = source.DgSourceTrackingPlanConfig.Config
shallowEventCopy.Metadata.MergedTpConfig = source.DgSourceTrackingPlanConfig.GetMergedConfig(commonMetadataFromSingularEvent.EventType)
Expand Down Expand Up @@ -2991,7 +3030,7 @@ func (proc *Handle) transformSrcDest(
sourceCategory: event.Metadata.SourceCategory,
transformationID: event.Metadata.TransformationID,
transformationVersionID: event.Metadata.TransformationVersionID,
trackingPlanID: event.Metadata.TrackingPlanId,
trackingPlanID: event.Metadata.TrackingPlanID,
trackingPlanVersion: event.Metadata.TrackingPlanVersion,
}
}
Expand Down
Loading

0 comments on commit 5a0baac

Please sign in to comment.