Skip to content

Commit

Permalink
fix: consume no need reqid
Browse files Browse the repository at this point in the history
  • Loading branch information
zijiren233 committed Nov 23, 2024
1 parent 5a07975 commit 617df5b
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 47 deletions.
6 changes: 3 additions & 3 deletions service/aiproxy/common/balance/sealos.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (s *Sealos) getGroupRemainBalance(ctx context.Context, group string) (float
return decimal.NewFromInt(cache.Balance).Div(decimalBalancePrecision).InexactFloat64(),
newSealosPostGroupConsumer(s.accountURL, group, cache.UserUID, cache.Balance), nil
} else if err != nil && !errors.Is(err, redis.Nil) {
logger.Errorf(ctx, "get group (%s) balance cache failed: %s", group, err)
logger.SysErrorf("get group (%s) balance cache failed: %s", group, err)
}

balance, userUID, err := s.fetchBalanceFromAPI(ctx, group)
Expand All @@ -174,7 +174,7 @@ func (s *Sealos) getGroupRemainBalance(ctx context.Context, group string) (float
}

if err := cacheSetGroupBalance(ctx, group, balance, userUID); err != nil {
logger.Errorf(ctx, "set group (%s) balance cache failed: %s", group, err)
logger.SysErrorf("set group (%s) balance cache failed: %s", group, err)
}

return decimal.NewFromInt(balance).Div(decimalBalancePrecision).InexactFloat64(),
Expand Down Expand Up @@ -237,7 +237,7 @@ func (s *SealosPostGroupConsumer) PostGroupConsume(ctx context.Context, tokenNam
amount := s.calculateAmount(usage)

if err := cacheDecreaseGroupBalance(ctx, s.group, amount.IntPart()); err != nil {
logger.Errorf(ctx, "decrease group (%s) balance cache failed: %s", s.group, err)
logger.SysErrorf("decrease group (%s) balance cache failed: %s", s.group, err)
}

if err := s.postConsume(ctx, amount.IntPart(), tokenName); err != nil {
Expand Down
35 changes: 25 additions & 10 deletions service/aiproxy/relay/controller/audio.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/labring/sealos/service/aiproxy/common"
"github.com/labring/sealos/service/aiproxy/common/balance"
"github.com/labring/sealos/service/aiproxy/common/ctxkey"
"github.com/labring/sealos/service/aiproxy/common/helper"
"github.com/labring/sealos/service/aiproxy/common/logger"
"github.com/labring/sealos/service/aiproxy/relay"
"github.com/labring/sealos/service/aiproxy/relay/adaptor/openai"
Expand Down Expand Up @@ -98,35 +97,51 @@ func RelayAudioHelper(c *gin.Context, relayMode int) *relaymodel.ErrorWithStatus

resp, err := adaptor.DoRequest(c, meta, body)
if err != nil {
logger.Errorf(c, "do request failed: %s", err.Error())
ConsumeWaitGroup.Add(1)
go postConsumeAmount(context.Background(),
&ConsumeWaitGroup,
postGroupConsumer,
http.StatusInternalServerError,
c.Request.URL.Path,
nil, meta, price, completionPrice, err.Error(),
)
return openai.ErrorWrapper(err, "do_request_failed", http.StatusInternalServerError)
}

consumeCtx := context.WithValue(context.Background(), helper.RequestIDKey, c.Value(helper.RequestIDKey))

if resp.StatusCode != http.StatusOK {
if isErrorHappened(meta, resp) {
err := RelayErrorHandler(resp)
ConsumeWaitGroup.Add(1)
go postConsumeAmount(consumeCtx,
go postConsumeAmount(context.Background(),
&ConsumeWaitGroup,
postGroupConsumer,
resp.StatusCode,
c.Request.URL.Path,
&relaymodel.Usage{
PromptTokens: 0,
CompletionTokens: 0,
}, meta, price, completionPrice,
nil,
meta,
price,
completionPrice,
err.String(),
)
return err
}

usage, respErr := adaptor.DoResponse(c, resp, meta)
if respErr != nil {
logger.Errorf(c, "do response failed: %s", respErr)
ConsumeWaitGroup.Add(1)
go postConsumeAmount(context.Background(),
&ConsumeWaitGroup,
postGroupConsumer,
respErr.StatusCode,
c.Request.URL.Path,
nil, meta, price, completionPrice, respErr.String(),
)
return respErr
}

ConsumeWaitGroup.Add(1)
go postConsumeAmount(consumeCtx,
go postConsumeAmount(context.Background(),
&ConsumeWaitGroup,
postGroupConsumer,
resp.StatusCode,
Expand Down
70 changes: 45 additions & 25 deletions service/aiproxy/relay/controller/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/labring/sealos/service/aiproxy/common"
"github.com/labring/sealos/service/aiproxy/common/balance"
"github.com/labring/sealos/service/aiproxy/common/helper"
"github.com/labring/sealos/service/aiproxy/common/logger"
"github.com/labring/sealos/service/aiproxy/model"
"github.com/labring/sealos/service/aiproxy/relay"
"github.com/labring/sealos/service/aiproxy/relay/adaptor/openai"
"github.com/labring/sealos/service/aiproxy/relay/channeltype"
Expand Down Expand Up @@ -166,38 +164,60 @@ func RelayImageHelper(c *gin.Context, _ int) *relaymodel.ErrorWithStatusCode {
resp, err := adaptor.DoRequest(c, meta, requestBody)
if err != nil {
logger.Errorf(ctx, "do request failed: %s", err.Error())
ConsumeWaitGroup.Add(1)
go postConsumeAmount(context.Background(),
&ConsumeWaitGroup,
postGroupConsumer,
http.StatusInternalServerError,
c.Request.URL.Path, nil, meta, imageCostPrice, 0, err.Error(),
)
return openai.ErrorWrapper(err, "do_request_failed", http.StatusInternalServerError)
}

defer func() {
if resp == nil || resp.StatusCode != http.StatusOK {
_ = model.RecordConsumeLog(ctx, meta.Group, resp.StatusCode, meta.ChannelID, imageRequest.N, 0, imageRequest.Model, meta.TokenID, meta.TokenName, 0, imageCostPrice, 0, c.Request.URL.Path, imageRequest.Size)
return
}

consumeCtx := context.WithValue(context.Background(), helper.RequestIDKey, ctx.Value(helper.RequestIDKey))
_amount, err := postGroupConsumer.PostGroupConsume(consumeCtx, meta.TokenName, amount)
if err != nil {
logger.Error(ctx, "error consuming token remain balance: "+err.Error())
err = model.CreateConsumeError(meta.Group, meta.TokenName, imageRequest.Model, err.Error(), amount, meta.TokenID)
if err != nil {
logger.Error(ctx, "failed to create consume error: "+err.Error())
}
} else {
amount = _amount
}
err = model.BatchRecordConsume(consumeCtx, meta.Group, resp.StatusCode, meta.ChannelID, imageRequest.N, 0, imageRequest.Model, meta.TokenID, meta.TokenName, amount, imageCostPrice, 0, c.Request.URL.Path, imageRequest.Size)
if err != nil {
logger.Error(ctx, "failed to record consume log: "+err.Error())
}
}()
if isErrorHappened(meta, resp) {
err := RelayErrorHandler(resp)
ConsumeWaitGroup.Add(1)
go postConsumeAmount(context.Background(),
&ConsumeWaitGroup,
postGroupConsumer,
resp.StatusCode,
c.Request.URL.Path,
nil,
meta,
imageCostPrice,
0,
err.String(),
)
return err
}

// do response
_, respErr := adaptor.DoResponse(c, resp, meta)
if respErr != nil {
logger.Errorf(ctx, "respErr is not nil: %+v", respErr)
logger.Errorf(ctx, "do response failed: %s", respErr)
ConsumeWaitGroup.Add(1)
go postConsumeAmount(context.Background(),
&ConsumeWaitGroup,
postGroupConsumer,
respErr.StatusCode,
c.Request.URL.Path,
nil,
meta,
imageCostPrice,
0,
respErr.String(),
)
return respErr
}

ConsumeWaitGroup.Add(1)
go postConsumeAmount(context.Background(),
&ConsumeWaitGroup,
postGroupConsumer,
resp.StatusCode,
c.Request.URL.Path,
nil, meta, imageCostPrice, 0, imageRequest.Size,
)

return nil
}
25 changes: 16 additions & 9 deletions service/aiproxy/relay/controller/text.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/gin-gonic/gin"
json "github.com/json-iterator/go"
"github.com/labring/sealos/service/aiproxy/common/helper"
"github.com/labring/sealos/service/aiproxy/common/logger"
"github.com/labring/sealos/service/aiproxy/relay"
"github.com/labring/sealos/service/aiproxy/relay/adaptor"
Expand Down Expand Up @@ -47,7 +46,7 @@ func RelayTextHelper(c *gin.Context) *model.ErrorWithStatusCode {
// pre-consume balance
promptTokens := getPromptTokens(textRequest, meta.Mode)
meta.PromptTokens = promptTokens
ok, postGroupConsume, err := preCheckGroupBalance(ctx, textRequest, promptTokens, price, meta)
ok, postGroupConsumer, err := preCheckGroupBalance(ctx, textRequest, promptTokens, price, meta)
if err != nil {
logger.Errorf(ctx, "get group (%s) balance failed: %s", meta.Group, err)
return openai.ErrorWrapper(
Expand Down Expand Up @@ -78,15 +77,23 @@ func RelayTextHelper(c *gin.Context) *model.ErrorWithStatusCode {
resp, err := adaptor.DoRequest(c, meta, requestBody)
if err != nil {
logger.Errorf(ctx, "do request failed: %s", err.Error())
ConsumeWaitGroup.Add(1)
go postConsumeAmount(context.Background(),
&ConsumeWaitGroup,
postGroupConsumer,
http.StatusInternalServerError,
c.Request.URL.Path,
nil, meta, price, completionPrice, err.Error(),
)
return openai.ErrorWrapper(err, "do_request_failed", http.StatusInternalServerError)
}
consumeCtx := context.WithValue(context.Background(), helper.RequestIDKey, ctx.Value(helper.RequestIDKey))

if isErrorHappened(meta, resp) {
err := RelayErrorHandler(resp)
ConsumeWaitGroup.Add(1)
go postConsumeAmount(consumeCtx,
go postConsumeAmount(context.Background(),
&ConsumeWaitGroup,
postGroupConsume,
postGroupConsumer,
resp.StatusCode,
c.Request.URL.Path,
nil,
Expand All @@ -103,9 +110,9 @@ func RelayTextHelper(c *gin.Context) *model.ErrorWithStatusCode {
if respErr != nil {
logger.Errorf(ctx, "do response failed: %s", respErr)
ConsumeWaitGroup.Add(1)
go postConsumeAmount(consumeCtx,
go postConsumeAmount(context.Background(),
&ConsumeWaitGroup,
postGroupConsume,
postGroupConsumer,
respErr.StatusCode,
c.Request.URL.Path,
usage,
Expand All @@ -118,9 +125,9 @@ func RelayTextHelper(c *gin.Context) *model.ErrorWithStatusCode {
}
// post-consume amount
ConsumeWaitGroup.Add(1)
go postConsumeAmount(consumeCtx,
go postConsumeAmount(context.Background(),
&ConsumeWaitGroup,
postGroupConsume,
postGroupConsumer,
resp.StatusCode,
c.Request.URL.Path,
usage, meta, price, completionPrice, "",
Expand Down

0 comments on commit 617df5b

Please sign in to comment.