Rate limiting is an essential technique for protecting a server: it prevents the server from being overloaded by a sudden surge of traffic from a client.
Kitex supports user-defined QPS limiters and connection limiters, and provides default implementations of both.
If server.WithLimit is used together with server.WithQPSLimiter, or server.WithLimit together with server.WithConnectionLimiter, only the latter takes effect.
In non-multiplexing scenarios with the default QPS limiter, the limiter takes effect at the OnRead hook; in multiplexing scenarios, or when a user-defined QPS limiter is used, it takes effect at the OnMessage hook instead. This ensures that a user-defined QPS limiter can obtain basic information about the request, such as the RPC method.
For gRPC, use WithGRPCInitialWindowSize and WithGRPCInitialConnWindowSize to set the flow-control window size of streams and connections, respectively. For details, see the official gRPC documentation.
The following example enables the default limiters through server.WithLimit:
import (
	"log"

	"github.com/cloudwego/kitex/pkg/limit"
	"github.com/cloudwego/kitex/server"
)

func main() {
	svr := xxxservice.NewServer(handler, server.WithLimit(&limit.Option{MaxConnections: 10000, MaxQPS: 1000}))
	if err := svr.Run(); err != nil {
		log.Println(err)
	}
}
Parameter description:
MaxConnections: maximum number of connections
MaxQPS: maximum QPS (queries per second)
UpdateControl: a callback that receives a limit.Updater, which lets you modify the rate limit thresholds dynamically, for example:
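import (
	"log"

	"github.com/cloudwego/kitex/pkg/limit"
	"github.com/cloudwego/kitex/server"
)

// MyLimiterUpdater stores the limit.Updater passed to UpdateControl
// so that the thresholds can be changed at runtime.
type MyLimiterUpdater struct {
	updater limit.Updater
}

func (lu *MyLimiterUpdater) YourChange() {
	if lu.updater == nil {
		return // UpdateControl has not been called yet
	}
	// your logic: set new option as needed
	newOpt := &limit.Option{
		MaxConnections: 20000,
		MaxQPS:         2000,
	}
	// update limit config; UpdateLimit reports whether the update was applied
	if isUpdated := lu.updater.UpdateLimit(newOpt); !isUpdated {
		log.Println("limit thresholds were not updated")
	}
}

func (lu *MyLimiterUpdater) UpdateControl(u limit.Updater) {
	lu.updater = u
}

func main() {
	lu := &MyLimiterUpdater{}
	svr := xxxservice.NewServer(handler, server.WithLimit(&limit.Option{
		MaxConnections: 10000, MaxQPS: 1000, UpdateControl: lu.UpdateControl,
	}))
	if err := svr.Run(); err != nil {
		log.Println(err)
	}
}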
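The UpdateControl hand-off can be hard to picture from the snippet alone. The runnable sketch below simulates it without Kitex: Option and Updater are simplified local copies of limit.Option and limit.Updater, and thresholds and newThresholds are hypothetical stand-ins for the framework side. It assumes, as the example above suggests, that the framework calls UpdateControl once with an Updater that the user keeps and calls later; it is not Kitex's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// Option and Updater are simplified local copies of limit.Option and
// limit.Updater so this sketch runs without Kitex.
type Option struct {
	MaxConnections int
	MaxQPS         int
	UpdateControl  func(u Updater)
}

type Updater interface {
	UpdateLimit(opt *Option) (updated bool)
}

// thresholds is a hypothetical stand-in for the framework side: it owns the
// current limits and hands itself to the user through UpdateControl.
type thresholds struct {
	mu             sync.Mutex
	maxConnections int
	maxQPS         int
}

// UpdateLimit replaces both thresholds; a nil option is rejected.
func (t *thresholds) UpdateLimit(opt *Option) bool {
	if opt == nil {
		return false
	}
	t.mu.Lock()
	defer t.mu.Unlock()
	t.maxConnections, t.maxQPS = opt.MaxConnections, opt.MaxQPS
	return true
}

func (t *thresholds) get() (int, int) {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.maxConnections, t.maxQPS
}

// newThresholds plays the role of server initialization: it applies the
// initial option and, if UpdateControl is set, passes the Updater to the user.
func newThresholds(opt *Option) *thresholds {
	t := &thresholds{maxConnections: opt.MaxConnections, maxQPS: opt.MaxQPS}
	if opt.UpdateControl != nil {
		opt.UpdateControl(t)
	}
	return t
}

func main() {
	var saved Updater
	t := newThresholds(&Option{
		MaxConnections: 10000,
		MaxQPS:         1000,
		UpdateControl:  func(u Updater) { saved = u },
	})
	saved.UpdateLimit(&Option{MaxConnections: 20000, MaxQPS: 2000})
	conns, qps := t.get()
	fmt.Println(conns, qps) // 20000 2000
}
```

Because the callback only stores the Updater, user code never touches the limiter internals; it just submits a new Option when its configuration changes.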
By default, ConcurrencyLimiter and RateLimiter are used to limit the maximum number of connections and the maximum QPS, respectively.
The default limiters report rate-limiting events, such as connection overload and QPS overload, through the LimitReporter interface, which can be used for monitoring.
If required, users can implement this interface and inject the implementation with WithLimitReporter.
// LimitReporter is the interface used to report (e.g. emit a metric or print a log) when a limit is triggered.
type LimitReporter interface {
	ConnOverloadReport()
	QPSOverloadReport()
}
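A reporter only needs the two methods above. The sketch below is a hypothetical countingReporter that records overload events with atomic counters, since reports may arrive from many connections concurrently. LimitReporter is redeclared locally so the example runs without Kitex; in a real server you would pass your implementation to WithLimitReporter and would typically emit metrics or logs instead of keeping counters.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// LimitReporter mirrors the interface shown above so the sketch runs without Kitex.
type LimitReporter interface {
	ConnOverloadReport()
	QPSOverloadReport()
}

// countingReporter is a hypothetical reporter that counts overload events.
type countingReporter struct {
	connOverloads int64
	qpsOverloads  int64
}

func (r *countingReporter) ConnOverloadReport() { atomic.AddInt64(&r.connOverloads, 1) }
func (r *countingReporter) QPSOverloadReport()  { atomic.AddInt64(&r.qpsOverloads, 1) }

// Counts returns the number of connection and QPS overload reports seen so far.
func (r *countingReporter) Counts() (conn, qps int64) {
	return atomic.LoadInt64(&r.connOverloads), atomic.LoadInt64(&r.qpsOverloads)
}

// Compile-time check that countingReporter satisfies LimitReporter.
var _ LimitReporter = (*countingReporter)(nil)

func main() {
	r := &countingReporter{}
	r.QPSOverloadReport()
	r.QPSOverloadReport()
	r.ConnOverloadReport()
	fmt.Println(r.Counts()) // 1 2
}
```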
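To use custom limiters, implement a QPS limiter (Acquire and Status) and a connection limiter (Acquire, Release and Status), then register them with server.WithQPSLimiter and server.WithConnectionLimiter:
import (
	"context"
	"log"
	"time"

	"github.com/cloudwego/kitex/pkg/limiter"
	"github.com/cloudwego/kitex/pkg/rpcinfo"
	"github.com/cloudwego/kitex/server"
)

// Compile-time checks that the custom types implement Kitex's limiter interfaces.
var (
	_ limiter.RateLimiter        = (*qpsLimiter)(nil)
	_ limiter.ConcurrencyLimiter = (*connectionLimiter)(nil)
)

// acquire and release are hypothetical placeholders for your own limiting logic.
func acquire(key string) bool { return true }
func release(key string)      {}

type qpsLimiter struct{}

func (l *qpsLimiter) Acquire(ctx context.Context) bool {
	ri := rpcinfo.GetRPCInfo(ctx)
	if ri == nil {
		return true
	}
	method := ri.Invocation().MethodName() // the RPC method being called
	return acquire(method)                 // return true to allow this request
}

func (l *qpsLimiter) Status(ctx context.Context) (max, current int, interval time.Duration) {
	// max: the maximum number of requests allowed in the interval;
	// current: the remaining number of requests allowed in the interval;
	return
}

type connectionLimiter struct{}

func (l *connectionLimiter) Acquire(ctx context.Context) bool {
	return acquire(remoteAddr(ctx)) // return true to allow this connection
}

// Release frees the resources occupied by the connection; it is only called after a successful Acquire.
func (l *connectionLimiter) Release(ctx context.Context) {
	release(remoteAddr(ctx))
}

func (l *connectionLimiter) Status(ctx context.Context) (limit, occupied int) {
	// limit: the maximum number of connections allowed.
	// occupied: the number of existing connections.
	return
}

// remoteAddr returns the client address recorded in the RPCInfo, or "" if it is unavailable.
func remoteAddr(ctx context.Context) string {
	if ri := rpcinfo.GetRPCInfo(ctx); ri != nil && ri.From().Address() != nil {
		return ri.From().Address().String()
	}
	return ""
}

func main() {
	myQPSLimiter := &qpsLimiter{}
	myConnectionLimiter := &connectionLimiter{}
	svr := xxxservice.NewServer(handler, server.WithQPSLimiter(myQPSLimiter), server.WithConnectionLimiter(myConnectionLimiter))
	if err := svr.Run(); err != nil {
		log.Println(err)
	}
}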
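The acquire and release helpers in the example above are left to the user. One possible backing for the connection limiter's pair is a hypothetical keyedLimiter that caps concurrent holders per key (for example, per client IP once the port is stripped from the address). It limits concurrency rather than rate, so it suits Acquire/Release pairs; the QPS limiter, which has no Release, would need a rate-based algorithm per key instead.

```go
package main

import (
	"fmt"
	"sync"
)

// keyedLimiter is a hypothetical helper that allows at most maxPerKey
// concurrent holders per key.
type keyedLimiter struct {
	mu        sync.Mutex
	maxPerKey int
	inUse     map[string]int
}

func newKeyedLimiter(maxPerKey int) *keyedLimiter {
	return &keyedLimiter{maxPerKey: maxPerKey, inUse: make(map[string]int)}
}

// acquire reserves a slot for key and reports whether it succeeded.
func (k *keyedLimiter) acquire(key string) bool {
	k.mu.Lock()
	defer k.mu.Unlock()
	if k.inUse[key] >= k.maxPerKey {
		return false
	}
	k.inUse[key]++
	return true
}

// release frees a slot previously reserved by a successful acquire.
func (k *keyedLimiter) release(key string) {
	k.mu.Lock()
	defer k.mu.Unlock()
	if k.inUse[key] <= 1 {
		delete(k.inUse, key) // drop idle keys so the map does not grow without bound
		return
	}
	k.inUse[key]--
}

// occupied returns the total number of slots held across all keys,
// which could back the occupied value returned by Status.
func (k *keyedLimiter) occupied() int {
	k.mu.Lock()
	defer k.mu.Unlock()
	n := 0
	for _, v := range k.inUse {
		n += v
	}
	return n
}

func main() {
	l := newKeyedLimiter(1)
	fmt.Println(l.acquire("Echo"), l.acquire("Echo"), l.acquire("Ping")) // true false true
	l.release("Echo")
	fmt.Println(l.occupied()) // 1
}
```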