package rpcx import ( "context" "go_dreamfactory/lego/sys/log" "net" "time" "github.com/rcrowley/go-metrics" "github.com/smallnest/rpcx/client" "github.com/smallnest/rpcx/server" "github.com/smallnest/rpcx/serverplugin" ) func newService(rpcx *RPCX) (s *Service, err error) { s = &Service{ server: server.NewServer(), rpcx: rpcx, // clients: make(map[string]net.Conn), // clientmeta: make(map[string]string), } r := &serverplugin.ConsulRegisterPlugin{ ServiceAddress: "tcp@" + rpcx.options.ServiceAddr, ConsulServers: []string{"10.0.0.9:8500"}, BasePath: rpcx.options.ServiceTag, Metrics: metrics.NewRegistry(), UpdateInterval: time.Minute, } if err = r.Start(); err != nil { return } s.server.Plugins.Add(r) return } type Service struct { rpcx *RPCX server *server.Server selector client.Selector clients map[string]net.Conn clientmeta map[string]string } func (this *Service) Start() (err error) { go func() { if err = this.server.Serve("tcp", this.rpcx.options.ServiceAddr); err != nil { log.Errorf("rpcx server exit!") } }() return } func (this *Service) Stop() (err error) { err = this.server.Close() return } func (this *Service) Register(rcvr interface{}) (err error) { err = this.server.RegisterName(this.rpcx.options.ServiceType, rcvr, this.rpcx.metadata) return } func (this *Service) RegisterFunction(fn interface{}) (err error) { err = this.server.RegisterFunction(this.rpcx.options.ServiceType, fn, this.rpcx.metadata) return } func (this *Service) RegisterFunctionName(name string, fn interface{}) (err error) { err = this.server.RegisterFunctionName(this.rpcx.options.ServiceType, name, fn, this.rpcx.metadata) return } func (this *Service) UnregisterAll() (err error) { err = this.server.UnregisterAll() return } //同步调用 func (this *Service) Call(servicePath string, ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) { // var ( // spath string // clientaddr string // conn net.Conn // ok bool // ) // if servicePath == "" { // err = errors.New("servicePath no cant null") // return // } // spath := strings.Split(servicePath, "/") // ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ // CallRoutRulesKey: servicePath, // ServiceAddrKey: "tcp@" + this.options.ServiceAddr, // ServiceMetaKey: this.metadata, // }) // if clientaddr = this.selector.Select(ctx, spath[0], serviceMethod, args); clientaddr == "" { // err = fmt.Errorf("on found routRules:%s", routRules) // return // } // if conn, ok = this.clients[clientaddr]; !ok { // err = fmt.Errorf("on found clientaddr:%s", clientaddr) // return // } // err := this.server.SendMessage(conn, spath[0], serviceMethod, nil, []byte("abcde")){ // } return } //异步调用 func (this *Service) Go(routRules string, ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) { return } //发现服务 func (this *Service) Discovery(addr string, conn net.Conn, meta string) { this.clientmeta[addr] = meta this.clients[addr] = conn this.selector.UpdateServer(this.clientmeta) } // //监听客户端链接到服务上 保存客户端的连接对象 // func (this *Service) PreHandleRequest(ctx context.Context, r *protocol.Message) error { // if smeta, ok := ctx.Value(share.ReqMetaDataKey).(map[string]string)[ServiceAddrKey]; ok { // // log.Errorf("smeta:%s err:%v", smeta, ok) // if node, err := smetaToServiceNode(smeta); err == nil { // if _, ok = this.clientmeta[node.ServiceId]; !ok { // this.clientmeta[node.ServiceId] = smeta // } // } // } // // clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn) // // fmt.Printf("PreReadRequest RoutRules:%s RemoteAddr:%v \n", RoutRules, clientConn.RemoteAddr().String()) // return nil // }