上传rpc 客户端 发现节点自动连接握手协议

This commit is contained in:
liwei1dao 2022-06-20 11:37:49 +08:00
parent 3f9a16e01e
commit 92fb2df51a
5 changed files with 70 additions and 17 deletions

View File

@ -24,6 +24,7 @@ func newClient(options Options) (sys *Client, err error) {
metadata: fmt.Sprintf("stype=%s&sid=%s&version=%s&addr=%s", options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr), metadata: fmt.Sprintf("stype=%s&sid=%s&version=%s&addr=%s", options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr),
clients: make(map[string]client.XClient), clients: make(map[string]client.XClient),
conns: make(map[string]net.Conn), conns: make(map[string]net.Conn),
connecting: make(map[string]struct{}),
serviceMap: make(map[string]*service), serviceMap: make(map[string]*service),
msgChan: make(chan *protocol.Message, 1000), msgChan: make(chan *protocol.Message, 1000),
} }
@ -38,6 +39,8 @@ type Client struct {
clients map[string]client.XClient clients map[string]client.XClient
connsMapMu sync.RWMutex connsMapMu sync.RWMutex
conns map[string]net.Conn conns map[string]net.Conn
connectMapMu sync.RWMutex
connecting map[string]struct{}
serviceMapMu sync.RWMutex serviceMapMu sync.RWMutex
serviceMap map[string]*service serviceMap map[string]*service
msgChan chan *protocol.Message // 接收rpcXServer推送消息 msgChan chan *protocol.Message // 接收rpcXServer推送消息
@ -121,9 +124,12 @@ func (this *Client) Call(ctx context.Context, servicePath string, serviceMethod
} }
c = client.NewBidirectionalXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption, this.msgChan) c = client.NewBidirectionalXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption, this.msgChan)
c.GetPlugins().Add(this) c.GetPlugins().Add(this)
c.SetSelector(newSelector()) if this.options.RpcxStartType == RpcxStartByClient && this.options.AutoConnect {
c.SetSelector(newSelector(this.UpdateServer))
} else {
c.SetSelector(newSelector(nil))
}
this.clients[spath[0]] = c this.clients[spath[0]] = c
} }
ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{
CallRoutRulesKey: servicePath, CallRoutRulesKey: servicePath,
@ -154,7 +160,7 @@ func (this *Client) Go(ctx context.Context, servicePath string, serviceMethod st
} }
c = client.NewXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption) c = client.NewXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption)
c.GetPlugins().Add(this) c.GetPlugins().Add(this)
c.SetSelector(newSelector()) c.SetSelector(newSelector(this.UpdateServer))
this.clients[spath[0]] = c this.clients[spath[0]] = c
} }
ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{
@ -165,12 +171,45 @@ func (this *Client) Go(ctx context.Context, servicePath string, serviceMethod st
return c.Go(ctx, string(serviceMethod), args, reply, done) return c.Go(ctx, string(serviceMethod), args, reply, done)
} }
//监控服务发现,发现没有连接上的额服务端 就连接上去
func (this *Client) UpdateServer(servers map[string]*ServiceNode) {
for _, v := range servers {
this.connsMapMu.RLock()
_, ok := this.conns[v.ServiceAddr]
this.connsMapMu.RUnlock()
if !ok {
this.connectMapMu.RLock()
_, ok := this.connecting[v.ServiceAddr]
this.connectMapMu.RUnlock()
if !ok {
this.connectMapMu.Lock()
this.connecting[v.ServiceAddr] = struct{}{}
this.connectMapMu.Unlock()
if err := this.Call(context.Background(), fmt.Sprintf("%s/%s", v.ServiceType, v.ServiceId), RpcX_ShakeHands, &ServiceNode{
ServiceId: this.options.ServiceId,
ServiceType: this.options.ServiceType,
ServiceAddr: this.options.ServiceAddr},
&ServiceNode{}); err != nil {
this.Errorf("ShakeHands new node addr:%s err:%v", v.ServiceAddr, err)
this.connectMapMu.Lock()
delete(this.connecting, v.ServiceAddr)
this.connectMapMu.Unlock()
}
}
}
}
}
//监控连接建立 //监控连接建立
func (this *Client) ClientConnected(conn net.Conn) (net.Conn, error) { func (this *Client) ClientConnected(conn net.Conn) (net.Conn, error) {
addr := "tcp@" + conn.RemoteAddr().String() addr := "tcp@" + conn.RemoteAddr().String()
this.connsMapMu.Lock() this.connsMapMu.Lock()
this.conns[addr] = conn this.conns[addr] = conn
this.connsMapMu.Unlock() this.connsMapMu.Unlock()
this.connectMapMu.Lock()
delete(this.connecting, addr)
this.connectMapMu.Unlock()
this.Errorf("ClientConnected addr:%v", addr)
return conn, nil return conn, nil
} }

View File

@ -2,6 +2,7 @@ package rpcx
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"net/url" "net/url"
@ -14,11 +15,12 @@ const (
CallRoutRulesKey = "callrules" CallRoutRulesKey = "callrules"
) )
const RpcX_ShakeHands = "RpcX_ShakeHands" //握手
type ( type (
ISys interface { ISys interface {
Start() (err error) Start() (err error)
Stop() (err error) Stop() (err error)
// Register(rcvr interface{}) (err error)
RegisterFunction(fn interface{}) (err error) RegisterFunction(fn interface{}) (err error)
RegisterFunctionName(name string, fn interface{}) (err error) RegisterFunctionName(name string, fn interface{}) (err error)
UnregisterAll() (err error) UnregisterAll() (err error)
@ -57,9 +59,6 @@ func Stop() (err error) {
return defsys.Stop() return defsys.Stop()
} }
// func Register(rcvr interface{}) (err error) {
// return defsys.Register(rcvr)
// }
func RegisterFunction(fn interface{}) (err error) { func RegisterFunction(fn interface{}) (err error) {
return defsys.RegisterFunction(fn) return defsys.RegisterFunction(fn)
} }
@ -82,7 +81,7 @@ func Go(ctx context.Context, servicePath string, serviceMethod string, args inte
//服务元数据转服务节点信息 //服务元数据转服务节点信息
func smetaToServiceNode(meta string) (node *ServiceNode, err error) { func smetaToServiceNode(meta string) (node *ServiceNode, err error) {
if meta == "" { if meta == "" {
fmt.Errorf("meta is nill") err = errors.New("meta is nill")
return return
} }
node = &ServiceNode{} node = &ServiceNode{}

View File

@ -25,6 +25,7 @@ type Options struct {
ServiceAddr string //服务地址 ServiceAddr string //服务地址
ConsulServers []string //Consul集群服务地址 ConsulServers []string //Consul集群服务地址
RpcxStartType RpcxStartType //Rpcx启动类型 RpcxStartType RpcxStartType //Rpcx启动类型
AutoConnect bool //自动连接 客户端启动模式下 主动连接发现的节点服务器
SerializeType protocol.SerializeType SerializeType protocol.SerializeType
Debug bool //日志是否开启 Debug bool //日志是否开启
Log log.ILog Log log.ILog
@ -86,6 +87,7 @@ func SetLog(v log.ILog) Option {
func newOptions(config map[string]interface{}, opts ...Option) (Options, error) { func newOptions(config map[string]interface{}, opts ...Option) (Options, error) {
options := Options{ options := Options{
AutoConnect: true,
SerializeType: protocol.MsgPack, SerializeType: protocol.MsgPack,
Debug: true, Debug: true,
Log: log.Clone(log.SetLoglayer(2)), Log: log.Clone(log.SetLoglayer(2)),
@ -104,6 +106,7 @@ func newOptions(config map[string]interface{}, opts ...Option) (Options, error)
func newOptionsByOption(opts ...Option) (Options, error) { func newOptionsByOption(opts ...Option) (Options, error) {
options := Options{ options := Options{
AutoConnect: true,
SerializeType: protocol.MsgPack, SerializeType: protocol.MsgPack,
Debug: true, Debug: true,
Log: log.Clone(log.SetLoglayer(2)), Log: log.Clone(log.SetLoglayer(2)),

View File

@ -14,8 +14,9 @@ var rex_nogather = regexp.MustCompile(`\!\[([^)]+)\]`)
var rex_noid = regexp.MustCompile(`\!([^)]+)`) var rex_noid = regexp.MustCompile(`\!([^)]+)`)
var rex_gather = regexp.MustCompile(`\[([^)]+)\]`) var rex_gather = regexp.MustCompile(`\[([^)]+)\]`)
func newSelector() *Selector { func newSelector(fn func(map[string]*ServiceNode)) *Selector {
return &Selector{ return &Selector{
updateServerEvent: fn,
servers: make(map[string]*ServiceNode), servers: make(map[string]*ServiceNode),
serversType: make(map[string][]*ServiceNode), serversType: make(map[string][]*ServiceNode),
i: make(map[string]int), i: make(map[string]int),
@ -30,6 +31,7 @@ type ServiceNode struct {
} }
type Selector struct { type Selector struct {
updateServerEvent func(map[string]*ServiceNode)
servers map[string]*ServiceNode servers map[string]*ServiceNode
serversType map[string][]*ServiceNode serversType map[string][]*ServiceNode
i map[string]int i map[string]int
@ -86,6 +88,9 @@ func (this *Selector) UpdateServer(servers map[string]string) {
} }
this.servers = ss this.servers = ss
this.serversType = sst this.serversType = sst
if this.updateServerEvent != nil {
go this.updateServerEvent(ss)
}
} }
//路由规则解析 //路由规则解析

View File

@ -23,7 +23,7 @@ func newService(options Options) (sys *Service, err error) {
options: options, options: options,
metadata: fmt.Sprintf("stype=%s&sid=%s&version=%s&addr=%s", options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr), metadata: fmt.Sprintf("stype=%s&sid=%s&version=%s&addr=%s", options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr),
server: server.NewServer(), server: server.NewServer(),
selector: newSelector(), selector: newSelector(nil),
clients: make(map[string]net.Conn), clients: make(map[string]net.Conn),
clientmeta: make(map[string]string), clientmeta: make(map[string]string),
pending: make(map[uint64]*client.Call), pending: make(map[uint64]*client.Call),
@ -41,6 +41,7 @@ func newService(options Options) (sys *Service, err error) {
} }
sys.server.Plugins.Add(r) sys.server.Plugins.Add(r)
sys.server.Plugins.Add(sys) sys.server.Plugins.Add(sys)
sys.RegisterFunctionName(RpcX_ShakeHands, sys.RpcxShakeHands) //注册握手函数
return return
} }
@ -148,7 +149,7 @@ func (this *Service) PreHandleRequest(ctx context.Context, r *protocol.Message)
} }
this.clientmutex.Unlock() this.clientmutex.Unlock()
this.selector.UpdateServer(servers) this.selector.UpdateServer(servers)
this.Debugf("PreReadRequest addr:%s smeta:%s \n", addr, smeta) this.Debugf("fond new node addr:%s smeta:%s \n", addr, smeta)
} }
} }
} }
@ -203,6 +204,12 @@ func (this *Service) PostReadRequest(ctx context.Context, r *protocol.Message, e
return nil return nil
} }
//Rpcx握手默认提供给client连接使用
func (this *Service) RpcxShakeHands(ctx context.Context, args *ServiceNode, reply *ServiceNode) error {
// this.Debugf("RpcxShakeHands:%+v", ctx.Value(share.ReqMetaDataKey).(map[string]string))
return nil
}
///日志*********************************************************************** ///日志***********************************************************************
func (this *Service) Debug() bool { func (this *Service) Debug() bool {
return this.options.Debug return this.options.Debug