Stream Error Handling Best Practices

Best practices for stream error handling in Kitex StreamX, covering TTHeader Streaming error codes and error handling mechanisms.

Preface

Unlike in PingPong RPC, an error in a stream can occur at any point during stream processing. For example, a server can return an error after it has already sent multiple messages. However, once a stream has sent an error, it cannot send any more messages.
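The rule above can be modelled with a toy stream built on a Go channel. This is only a sketch of the ordering guarantee, not Kitex code: the `frame` type and the `serve` and `drain` functions are hypothetical names invented for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// frame is a hypothetical unit on the toy stream: either a message or a terminal error.
type frame struct {
	msg string
	err error
}

// serve sends n messages followed by one terminal error. Closing the channel
// right after the error models the rule that nothing can follow an error.
func serve(n int, out chan<- frame) {
	defer close(out)
	for i := 1; i <= n; i++ {
		out <- frame{msg: fmt.Sprintf("msg-%d", i)}
	}
	out <- frame{err: errors.New("insufficient user balance")}
}

// drain reads frames until the first error and returns the messages seen before it.
func drain(in <-chan frame) ([]string, error) {
	var msgs []string
	for f := range in {
		if f.err != nil {
			return msgs, f.err
		}
		msgs = append(msgs, f.msg)
	}
	return msgs, nil
}

func main() {
	ch := make(chan frame)
	go serve(3, ch)
	msgs, err := drain(ch)
	fmt.Println(msgs, err)
}
```

The receiver keeps every message that arrived before the error and treats the error as the end of the stream, which is the same shape as the client loops later in this document.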

Error Types

Framework Exceptions

Meaning of the Error Description

[ttstream error, code=12007] [server-side stream] [canceled path: ServiceA] user code invoking stream RPC with context processed by context.WithCancel or context.WithTimeout, then invoking cancel() actively
| Error Description | Meaning | Notes |
| --- | --- | --- |
| [ttstream error, code=12007] | TTHeader Streaming error, error code 12007, corresponding to the scenario where upstream actively cancels | |
| [server-side stream] | Indicates that the error is thrown by the Stream on the server side | |
| [canceled path: ServiceA] | Indicates that ServiceA actively initiated cancel | |
| user code invoking stream RPC with context processed by context.WithCancel or context.WithTimeout, then invoking cancel() actively | Specific error description | |
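When only the error text is available (for example in logs), the code can be pulled out of the description shown above. The following is a minimal sketch that assumes the message keeps the `[ttstream error, code=N]` prefix from the example. The `parseStreamErrorCode` helper is a hypothetical name, not part of Kitex, and string parsing of this kind is a fallback rather than a documented API.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// codePattern matches the "[ttstream error, code=N]" prefix from the example message.
var codePattern = regexp.MustCompile(`\[ttstream error, code=(\d+)\]`)

// parseStreamErrorCode extracts the numeric error code from an error description.
// It reports false when the text does not contain the expected prefix.
func parseStreamErrorCode(desc string) (int, bool) {
	m := codePattern.FindStringSubmatch(desc)
	if m == nil {
		return 0, false
	}
	code, err := strconv.Atoi(m[1])
	if err != nil {
		return 0, false
	}
	return code, true
}

func main() {
	desc := "[ttstream error, code=12007] [server-side stream] [canceled path: ServiceA] " +
		"user code invoking stream RPC with context processed by context.WithCancel " +
		"or context.WithTimeout, then invoking cancel() actively"
	code, ok := parseStreamErrorCode(desc)
	fmt.Println(code, ok)
}
```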

Error Code Summary

TTHeader Streaming Error Summary

| Error Code | Error Description | Meaning | Notes |
| --- | --- | --- | --- |
| 12001 | application exception | Business exception, downstream handler returns err | |
| 12002 | unexpected header frame | Header Frame related errors | |
| 12003 | illegal biz err | Failed to parse business exception contained in Trailer Frame | |
| 12004 | illegal frame | Failed to parse basic information of Frame | |
| 12005 | illegal operation | Error due to improper Stream usage, such as calling Send after the Stream has already been closed with CloseSend | |
| 12006 | transport is closing | Connection exception, such as connection has been closed | |
| 12007 | user code invoking stream RPC with context processed by context.WithCancel or context.WithTimeout, then invoking cancel() actively | Upstream actively uses cancel() | |
| 12008 | user code canceled with cancelCause(error) | Upstream uses context.WithCancelCause and actively uses cancel(err) | |
| 12009 | canceled by downstream | Canceled by downstream service | |
| 12010 | canceled by upstream | Canceled by upstream service | |
| 12011 | Internal canceled | Cascade cancel scenario, such as gRPC handler ctx is canceled, cascade cancel TTHeader Streaming | |
| 12012 | canceled by business handler returning | Handler exits early, but there are still asynchronous goroutines using Recv/Send | |
| 12013 | canceled by connection closed | Stream lifecycle ends due to connection closure, common in server-side service migration/update | |
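For monitoring or alerting, it can help to group the codes in the table above into a few coarse buckets. The sketch below is one possible grouping. The `category` type, its values, and the `categorize` function are assumptions made for illustration and are not defined by Kitex.

```go
package main

import "fmt"

// category is a hypothetical coarse grouping of TTHeader Streaming error codes.
type category string

const (
	categoryBusiness  category = "business"  // 12001: handler returned an error
	categoryProtocol  category = "protocol"  // 12002-12004: frame parsing problems
	categoryUsage     category = "usage"     // 12005: improper Stream usage
	categoryTransport category = "transport" // 12006: connection exception
	categoryCanceled  category = "canceled"  // 12007-12013: cancel scenarios
	categoryUnknown   category = "unknown"   // anything outside the table
)

// categorize maps an error code from the table to one of the buckets above.
func categorize(code int) category {
	switch {
	case code == 12001:
		return categoryBusiness
	case code >= 12002 && code <= 12004:
		return categoryProtocol
	case code == 12005:
		return categoryUsage
	case code == 12006:
		return categoryTransport
	case code >= 12007 && code <= 12013:
		return categoryCanceled
	default:
		return categoryUnknown
	}
}

func main() {
	for _, code := range []int{12001, 12005, 12007, 12013, 99999} {
		fmt.Println(code, categorize(code))
	}
}
```

Treating 12007 through 12013 as one bucket reflects that they all describe a cancel path. Whether a given cancel is expected (a user aborting a request) or a symptom (connection closure during a service migration) still depends on the individual code.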

Business Exceptions

Usage example: in a ChatGPT-like scenario, the server must keep checking whether the user's account balance is sufficient to continue calling the large model and generating responses.

Server Implementation:

func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *echo.Request, stream echo.TestService_ServerStreamWithErrServer) error {
    // Keep sending while the user account still has balance
    for isHasBalance(req.UserId) {
        // res is the next response to send; how it is built is omitted here
        if err := stream.Send(ctx, res); err != nil {
            // Stop on the first send failure
            return err
        }
    }
    // Return insufficient user balance error
    bizErr := kerrors.NewBizStatusErrorWithExtra(
        10001, "insufficient user balance", map[string]string{"testKey": "testVal"},
    )
    return bizErr
}

Client Implementation:

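stream, err := cli.ServerStreamWithErr(ctx, req)
if err != nil {
    // Creating the stream failed; handle err and stop here
    return err
}
for {
    res, recvErr := stream.Recv(stream.Context())
    if recvErr != nil {
        // The first error ends the stream; keep it for inspection below
        err = recvErr
        break
    }
    _ = res // process the received message
}
// Check whether the error that ended the stream is a business exception
bizErr, ok := kerrors.FromBizStatusError(err)
if ok {
    fmt.Println(bizErr.BizStatusCode(), bizErr.BizMessage(), bizErr.BizExtra())
}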

Other Errors

If the error returned by the Server is not a business exception, the framework uniformly wraps it as (*thrift.ApplicationException). In this case, only the error message is available to the client.

Server Implementation:

func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *echo.Request, stream echo.TestService_ServerStreamWithErrServer) error {
    // ...
    return errors.New("test error")
}

Client Implementation:

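stream, err := cli.ServerStreamWithErr(ctx, req)
if err != nil {
    // Creating the stream failed; handle err and stop here
    return err
}
for {
    res, recvErr := stream.Recv(stream.Context())
    if recvErr != nil {
        // The first error ends the stream; keep it for inspection below
        err = recvErr
        break
    }
    _ = res // process the received message
}

// Non-business errors arrive as *thrift.ApplicationException
ex, ok := err.(*thrift.ApplicationException)
if ok {
    fmt.Println(ex.TypeID(), ex.Msg())
}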