json | yaml |
---|---|
✔ | ✔ |
go get github.com/kitex-contrib/config-file
Local file configuration center adapter. Kitex converts the configuration in local files into Kitex governance feature configurations through `WithSuite`.
The usage can be divided into two steps: create a file watcher, then use `WithSuite` to introduce configuration monitoring.
type FileConfigServerSuite struct {
watcher monitor.ConfigMonitor
}
Function Signature:
func NewSuite(key string, watcher filewatcher.FileWatcher, opts ...utils.Option) *FileConfigServerSuite
Sample code(or visit here):
package main
import (
"context"
"encoding/json"
"log"
"github.com/cloudwego/kitex-examples/kitex_gen/api"
"github.com/cloudwego/kitex-examples/kitex_gen/api/echo"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/rpcinfo"
kitexserver "github.com/cloudwego/kitex/server"
"github.com/kitex-contrib/config-file/filewatcher"
"github.com/kitex-contrib/config-file/parser"
fileserver "github.com/kitex-contrib/config-file/server"
)
var _ api.Echo = &EchoImpl{}
// Settings used by the server sample.
const (
// filepath is the local configuration file handed to filewatcher.NewFileWatcher.
filepath = "kitex_server.json"
// key is the configuration key handed to fileserver.NewSuite.
key = "ServiceName"
// serviceName is the service name set in the server's EndpointBasicInfo.
serviceName = "ServiceName"
)
// EchoImpl implements the last service interface defined in the IDL.
type EchoImpl struct{}

// Echo implements the Echo interface: it logs the call and replies with the
// message carried by the request.
func (s *EchoImpl) Echo(ctx context.Context, req *api.Request) (resp *api.Response, err error) {
	klog.Info("echo called")
	resp = &api.Response{Message: req.Message}
	return resp, nil
}
// MyParser is a parser customized by the user.
type MyParser struct{}

// Decode is one example of a custom parser: it decodes data as JSON into
// config and does not look at kind. If the server config is json or yaml, the
// default parser is enough and no custom parser is needed.
func (p *MyParser) Decode(kind parser.ConfigType, data []byte, config interface{}) error {
	err := json.Unmarshal(data, config)
	return err
}
// main starts the echo server with the config-file suite attached, using the
// local file named by filepath as the configuration source.
func main() {
	klog.SetLevel(klog.LevelDebug)

	// Create a filewatcher object.
	watcher, err := filewatcher.NewFileWatcher(filepath)
	if err != nil {
		panic(err)
	}
	// Start monitoring file changes before the suite is built from the watcher.
	err = watcher.StartWatching()
	if err != nil {
		panic(err)
	}
	defer watcher.StopWatching()

	infoOpt := kitexserver.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName})
	suiteOpt := kitexserver.WithSuite(fileserver.NewSuite(key, watcher)) // Add watcher
	svr := echo.NewServer(new(EchoImpl), infoOpt, suiteOpt)

	runErr := svr.Run()
	if runErr == nil {
		log.Println("server stopped")
		return
	}
	log.Println("server stopped with error:", runErr)
}
// FileConfigClientSuite is the client-side suite returned by the client
// package's NewSuite and installed with kitexclient.WithSuite.
type FileConfigClientSuite struct {
// watcher is presumably built from the FileWatcher passed to NewSuite —
// TODO confirm against the package source.
watcher monitor.ConfigMonitor
// service is presumably the service argument of NewSuite — TODO confirm.
service string
}
Function Signature:
func NewSuite(service, key string, watcher filewatcher.FileWatcher,opts ...utils.Option)*FileConfigClientSuite
Sample code(or visit here):
package main
import (
"context"
"encoding/json"
"log"
"os"
"os/signal"
"time"
"github.com/cloudwego/kitex-examples/kitex_gen/api"
"github.com/cloudwego/kitex-examples/kitex_gen/api/echo"
kitexclient "github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/klog"
fileclient "github.com/kitex-contrib/config-file/client"
"github.com/kitex-contrib/config-file/filewatcher"
"github.com/kitex-contrib/config-file/parser"
)
// Settings used by the client sample.
const (
// filepath is the local configuration file handed to filewatcher.NewFileWatcher.
filepath = "kitex_client.json"
// key is the configuration key handed to fileclient.NewSuite.
key = "ClientName/ServiceName"
// serviceName is the target service name handed to echo.NewClient and fileclient.NewSuite.
serviceName = "ServiceName"
// clientName is the client's own name, the first component of key; the
// constant itself is not referenced by this sample's main.
clientName = "ClientName"
)
// MyParser is a parser customized by the user.
type MyParser struct{}

// Decode is one example of a custom parser: it decodes data as JSON into
// config and does not look at kind. If the client config is json or yaml, the
// default parser is enough and no custom parser is needed.
func (p *MyParser) Decode(kind parser.ConfigType, data []byte, config interface{}) error {
	err := json.Unmarshal(data, config)
	return err
}
// main starts an echo client with the config-file suite attached, using the
// local file named by filepath as the configuration source, and then sends a
// request every ten seconds.
func main() {
	klog.SetLevel(klog.LevelDebug)

	// Create a file watcher object.
	fw, err := filewatcher.NewFileWatcher(filepath)
	if err != nil {
		panic(err)
	}
	// Start monitoring file changes.
	if err = fw.StartWatching(); err != nil {
		panic(err)
	}

	// Stop watching when the process is interrupted. The request loop below
	// never returns, so a deferred StopWatching would never run.
	go func() {
		sig := make(chan os.Signal, 1)
		// SIGKILL (os.Kill) cannot be trapped by a program, so registering it
		// has no effect; only os.Interrupt is registered.
		signal.Notify(sig, os.Interrupt)
		<-sig
		fw.StopWatching()
		os.Exit(1)
	}()

	client, err := echo.NewClient(
		serviceName,
		kitexclient.WithHostPorts("0.0.0.0:8888"),
		kitexclient.WithSuite(fileclient.NewSuite(serviceName, key, fw)), // Add watcher
	)
	if err != nil {
		log.Fatal(err)
	}

	for {
		req := &api.Request{Message: "my request"}
		resp, err := client.Echo(context.Background(), req)
		if err != nil {
			klog.Errorf("take request error: %v", err)
		} else {
			klog.Infof("receive response %v", resp)
		}
		time.Sleep(time.Second * 10)
	}
}
Create a local file watcher
Function Signature:
func NewFileWatcher(filePath string) (FileWatcher, error)
Sample code:
package main
import (
	"os"
	"os/signal"

	"github.com/kitex-contrib/config-file/filewatcher"
)
// main shows the life cycle of a file watcher: create it, start watching, and
// stop watching when the program is interrupted.
func main() {
	// Create a filewatcher object. filepath is the path of the local
	// configuration file; it is not defined in this snippet and must be
	// provided by the surrounding program.
	fw, err := filewatcher.NewFileWatcher(filepath)
	if err != nil {
		panic(err)
	}
	// Start file monitoring (should be started before importing the Suite).
	if err = fw.StartWatching(); err != nil {
		panic(err)
	}
	// Cancel watching when the program exits.
	go func() {
		sig := make(chan os.Signal, 1)
		// SIGKILL (os.Kill) cannot be trapped by a program, so registering it
		// has no effect; only os.Interrupt is registered.
		signal.Notify(sig, os.Interrupt)
		<-sig
		fw.StopWatching()
		os.Exit(1)
	}()
}
On the server side, due to the characteristics of the Kitex server, it is enough to call `defer fw.StopWatching()` after starting the watcher.
Define a custom format parser and pass it in through an option of `NewSuite`. The `json` and `yaml` formats are supported by default.
Interface Definition:
// ConfigParser is the parser for config files.
type ConfigParser interface {
// Decode parses data into config. kind carries the ConfigType of the data —
// presumably the file format such as json or yaml; TODO confirm against the
// parser package.
Decode(kind ConfigType, data []byte, config interface{}) error
}
Sample:
Extend parsing YAML types.
// MyParser is a parser customized by the user.
type MyParser struct{}

// Decode is one example of a custom parser: it decodes data as YAML into
// config and does not look at kind. If the client config is json or yaml, the
// default parser is enough and no custom parser is needed.
func (p *MyParser) Decode(kind parser.ConfigType, data []byte, config interface{}) error {
	err := yaml.Unmarshal(data, config)
	return err
}
// YAML is the config type set by withParser.
const YAML parser.ConfigType = "yaml"

// withParser is an option for NewSuite: it installs MyParser as the parser and
// sets the config type in the parameters to YAML.
func withParser(o *utils.Options) {
	o.Params = &parser.ConfigParam{Type: YAML}
	o.Parser = &MyParser{}
}
// The custom parser option is passed in as the trailing argument of `NewSuite`.
// Server side: the suite is built from the key, the file watcher and withParser.
svr := echo.NewServer(
new(EchoImpl),
kitexserver.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}),
kitexserver.WithSuite(fileserver.NewSuite(key, fw, withParser)), // add watcher
)
// Client side: the suite additionally takes the service name as its first argument.
client, err := echo.NewClient(
serviceName,
kitexclient.WithHostPorts("0.0.0.0:8888"),
kitexclient.WithSuite(fileclient.NewSuite(serviceName, key, fw, withParser)),
)
In subsequent examples, we set the service name to `ServiceName` and the client name to `ClientName`.
Category=limit
Currently, rate limiting is only supported on the server side, so ClientServiceName is empty.
Variable | Introduction |
---|---|
connection_limit | Maximum concurrent connections |
qps_limit | Maximum request number every 100ms |
Example:
{
"ServiceName": {
"limit": {
"connection_limit": 300,
"qps_limit": 200
}
}
}
Note:
ServiceName
Category=retry
Variable | Introduction |
---|---|
type | 0: failure_policy 1: backup_policy |
failure_policy.backoff_policy | Must be one of `fixed`, `none`, `random` |
Example:
key is ClientName/ServiceName
{
"ClientName/ServiceName": {
"retry": {
"*": {
"enable": true,
"type": 0,
"failure_policy": {
"stop_policy": {
"max_retry_times": 3,
"max_duration_ms": 2000,
"cb_policy": {
"error_rate": 0.2
}
}
}
},
"Echo": {
"enable": true,
"type": 1,
"backup_policy": {
"retry_delay_ms": 200,
"stop_policy": {
"max_retry_times": 2,
"max_duration_ms": 1000,
"cb_policy": {
"error_rate": 0.3
}
}
}
}
}
}
}
Note: retry.Container has built-in support for specifying the default configuration using the `*` wildcard (see the getRetryer method for details).
Category=rpc_timeout
Example:
key is ClientName/ServiceName
{
"ClientName/ServiceName": {
"timeout": {
"*": {
"conn_timeout_ms": 100,
"rpc_timeout_ms": 2000
},
"Pay": {
"conn_timeout_ms": 50,
"rpc_timeout_ms": 1000
}
}
}
}
Category=circuit_break
Variable | Introduction |
---|---|
min_sample | Minimum statistical sample number |
The echo method uses the following configuration (0.3, 100) and other methods use the global default configuration (0.5, 200)
Example:
key is ClientName/ServiceName
{
"ClientName/ServiceName": {
"circuitbreaker": {
"Echo": {
"enable": true,
"err_rate": 0.3,
"min_sample": 100
}
}
}
}
Note: The circuit breaker implementation of kitex does not currently support changing the global default configuration (see initServiceCB for details).
Refer to example for more usage.
For client configuration, write all of the configurations under the same `$UserServiceName/$ServerServiceName` key, for example:
{
"ClientName/ServiceName": {
"timeout": {
"*": {
"conn_timeout_ms": 100,
"rpc_timeout_ms": 2000
},
"Pay": {
"conn_timeout_ms": 50,
"rpc_timeout_ms": 1000
}
},
"circuitbreaker": {
"Echo": {
"enable": true,
"err_rate": 0.3,
"min_sample": 100
}
},
"retry": {
"*": {
"enable": true,
"type": 0,
"failure_policy": {
"stop_policy": {
"max_retry_times": 3,
"max_duration_ms": 2000,
"cb_policy": {
"error_rate": 0.2
}
}
}
},
"Echo": {
"enable": true,
"type": 1,
"backup_policy": {
"retry_delay_ms": 200,
"stop_policy": {
"max_retry_times": 2,
"max_duration_ms": 1000,
"cb_policy": {
"error_rate": 0.3
}
}
}
}
}
}
}
The project uses the new features of `sync/atomic` added in Go 1.19, so the Go version must be >= 1.19.