diff --git a/.vscode/launch.json b/.vscode/launch.json index 73988f5b7..52ddd6165 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -5,28 +5,15 @@ "version": "0.2.0", "configurations": [ { - "name": "gate_1", + "name": "gateway_1", "type": "go", "request": "launch", "mode": "debug", - "program": "${workspaceFolder}/services/gate", //配置Go项目启动文件路径,即main函数所在的.go文件的路径,${workspaceRoot}代表项目的根目录,也就是 /bin /pkg /src这三个文件夹所在的目录 - "args": ["-conf","./conf/gate_1.yaml"], + "program": "${workspaceFolder}/services/gateway", //配置Go项目启动文件路径,即main函数所在的.go文件的路径,${workspaceRoot}代表项目的根目录,也就是 /bin /pkg /src这三个文件夹所在的目录 + "args": ["-conf","./conf/gateway_1.yaml"], "cwd": "${workspaceFolder}/bin", //设置工作目录 "internalConsoleOptions": "openOnSessionStart", - "output": "${workspaceFolder}/bin/vsdebug_gate", //设置vscode调试时生成文件的路径 - "showGlobalVariables": true, - "env": {}, //可以用来配置调试启动时所用的环境变量参数,比如gopath临时设置为某个参数就可以在这里指定,如果有多个gopath,用英文冒号:来连接多个gopath - }, - { - "name": "gate_2", - "type": "go", - "request": "launch", - "mode": "debug", - "program": "${workspaceFolder}/services/gate", //配置Go项目启动文件路径,即main函数所在的.go文件的路径,${workspaceRoot}代表项目的根目录,也就是 /bin /pkg /src这三个文件夹所在的目录 - "args": ["-conf","./conf/gate_2.yaml"], - "cwd": "${workspaceFolder}/bin", //设置工作目录 - "internalConsoleOptions": "openOnSessionStart", - "output": "${workspaceFolder}/bin/vsdebug_gate", //设置vscode调试时生成文件的路径 + "output": "${workspaceFolder}/bin/vsdebug_gateway", //设置vscode调试时生成文件的路径 "showGlobalVariables": true, "env": {}, //可以用来配置调试启动时所用的环境变量参数,比如gopath临时设置为某个参数就可以在这里指定,如果有多个gopath,用英文冒号:来连接多个gopath }, diff --git a/bin/conf/gate_2.yaml b/bin/conf/gate_2.yaml deleted file mode 100644 index da7d3f2db..000000000 --- a/bin/conf/gate_2.yaml +++ /dev/null @@ -1,28 +0,0 @@ -id : "gate_2" #服务的唯一id 在集群服务下不能重复即可 建议配置文件名与服务id一致 -ip : "172.0.0.1" #运行主机Ip 集群通信中 按ip节点通行的查询字段 此字段可以注释掉 注释后取当前运行主机公网ip地址 -port : 9568 #服务监听端口 RPC服务 -tag : "demo" #服务集群标签 相同标签 且sys.registry 和 sys.rpc 配置一致 即可互相发现和通信 -type : "dreamfactory" #服务类型 相同服务类型相同 如 多个login 服务 相同服务多开 服务类型一致 -category : "BusinessService" #服务类别 例如 网关服务器 或者 游戏服务器 以及普通业务服务器 -version : 1.0 #服务版本 多服务器相同的服务类型 高版本比低版本拥有更高的访问优先级 使用场景 热更新机制 - -#系统配置 -sys: - log: #日志系统 - FileName: "./log/gate_2.log" #日志文件存放地址 - Loglevel: 0 #日志文件输出级别 - LogMaxSize: 128 #日志文件最大Size - LogMaxAge: 7 #日志文件最多保留天数 - registry: #注册表系统 服务发现 - RegistryType: 0 #服务发现系统 0 Consul 组件 - Consul_Addr: "10.0.0.9:8500" - Consul_RegisterInterval: 5 - Consul_RegisterTTL: 7 - cache: #缓存系统 - Redis_Addr: ["10.0.0.9:9001","10.0.0.9:9002","10.0.0.9:9003","10.0.1.45:9004","10.0.1.45:9005","10.0.1.45:9006"] - Redis_Password: "" - -#模块配置 -modules: - SM_GateModule: - WSAddr: ":7891" diff --git a/bin/conf/gate_1.yaml b/bin/conf/gateway_1.yaml similarity index 63% rename from bin/conf/gate_1.yaml rename to bin/conf/gateway_1.yaml index 54babd44f..ab732c059 100644 --- a/bin/conf/gate_1.yaml +++ b/bin/conf/gateway_1.yaml @@ -1,23 +1,23 @@ -id : "gate_1" #服务的唯一id 在集群服务下不能重复即可 建议配置文件名与服务id一致 -ip : "172.0.0.1" #运行主机Ip 集群通信中 按ip节点通行的查询字段 此字段可以注释掉 注释后取当前运行主机公网ip地址 +id : "gateway_1" #服务的唯一id 在集群服务下不能重复即可 建议配置文件名与服务id一致 +ip : "127.0.0.1" #运行主机Ip 集群通信中 按ip节点通行的查询字段 此字段可以注释掉 注释后取当前运行主机公网ip地址 port : 9567 #服务监听端口 RPC服务 -tag : "demo" #服务集群标签 相同标签 且sys.registry 和 sys.rpc 配置一致 即可互相发现和通信 -type : "dreamfactory" #服务类型 相同服务类型相同 如 多个login 服务 相同服务多开 服务类型一致 +tag : "dreamfactory" #服务集群标签 相同标签 且sys.registry 和 sys.rpc 配置一致 即可互相发现和通信 +type : "gate" #服务类型 相同服务类型相同 如 多个login 服务 相同服务多开 服务类型一致 category : "BusinessService" #服务类别 例如 网关服务器 或者 游戏服务器 以及普通业务服务器 version : 1.0 #服务版本 多服务器相同的服务类型 高版本比低版本拥有更高的访问优先级 使用场景 热更新机制 #系统配置 sys: log: #日志系统 - FileName: "./log/gate_1.log" #日志文件存放地址 + FileName: "./log/gateway_1.log" #日志文件存放地址 Loglevel: 0 #日志文件输出级别 LogMaxSize: 128 #日志文件最大Size LogMaxAge: 7 #日志文件最多保留天数 registry: #注册表系统 服务发现 RegistryType: 0 #服务发现系统 0 Consul 组件 Consul_Addr: "10.0.0.9:8500" - Consul_RegisterInterval: 5 - Consul_RegisterTTL: 7 + Consul_RegisterInterval: 15 + Consul_RegisterTTL: 30 cache: #缓存系统 Redis_Addr: ["10.0.0.9:9001","10.0.0.9:9002","10.0.0.9:9003","10.0.1.45:9004","10.0.1.45:9005","10.0.1.45:9006"] Redis_Password: "" @@ -25,4 +25,4 @@ sys: #模块配置 modules: SM_GateModule: - WSAddr: ":7891" + ListenPort: 7891 diff --git a/bin/conf/worker_1.yaml b/bin/conf/worker_1.yaml index 87dd0d669..3526d41a4 100644 --- a/bin/conf/worker_1.yaml +++ b/bin/conf/worker_1.yaml @@ -1,23 +1,23 @@ -id : "worker_1" #服务的唯一id 在集群服务下不能重复即可 建议配置文件名与服务id一致 -ip : "172.0.0.1" #运行主机Ip 集群通信中 按ip节点通行的查询字段 此字段可以注释掉 注释后取当前运行主机公网ip地址 +id : "worker_2" #服务的唯一id 在集群服务下不能重复即可 建议配置文件名与服务id一致 +ip : "127.0.0.1" #运行主机Ip 集群通信中 按ip节点通行的查询字段 此字段可以注释掉 注释后取当前运行主机公网ip地址 port : 9568 #服务监听端口 RPC服务 -tag : "demo" #服务集群标签 相同标签 且sys.registry 和 sys.rpc 配置一致 即可互相发现和通信 -type : "dreamfactory" #服务类型 相同服务类型相同 如 多个login 服务 相同服务多开 服务类型一致 +tag : "dreamfactory" #服务集群标签 相同标签 且sys.registry 和 sys.rpc 配置一致 即可互相发现和通信 +type : "worker" #服务类型 相同服务类型相同 如 多个login 服务 相同服务多开 服务类型一致 category : "BusinessService" #服务类别 例如 网关服务器 或者 游戏服务器 以及普通业务服务器 version : 1.0 #服务版本 多服务器相同的服务类型 高版本比低版本拥有更高的访问优先级 使用场景 热更新机制 #系统配置 sys: log: #日志系统 - FileName: "./log/worker_1.log" #日志文件存放地址 + FileName: "./log/worker_2.log" #日志文件存放地址 Loglevel: 0 #日志文件输出级别 LogMaxSize: 128 #日志文件最大Size LogMaxAge: 7 #日志文件最多保留天数 registry: #注册表系统 服务发现 RegistryType: 0 #服务发现系统 0 Consul 组件 Consul_Addr: "10.0.0.9:8500" - Consul_RegisterInterval: 5 - Consul_RegisterTTL: 7 + Consul_RegisterInterval: 15 + Consul_RegisterTTL: 30 cache: #缓存系统 Redis_Addr: ["10.0.0.9:9001","10.0.0.9:9002","10.0.0.9:9003","10.0.1.45:9004","10.0.1.45:9005","10.0.1.45:9006"] Redis_Password: "" diff --git a/bin/log/gate_1.log b/bin/log/gate_1.log deleted file mode 100644 index b63e6cc2f..000000000 --- a/bin/log/gate_1.log +++ /dev/null @@ -1,12 +0,0 @@ -2022/05/30 16:23:48.421 info rpcx/service.go:90 Sys log Init success ! -2022/05/30 16:23:48.437 info rpcx/service.go:95 Sys event Init success ! -2022/05/30 16:23:48.438 info rpcx/service.go:100 Sys registry Init success ! -2022/05/30 16:23:48.438 info rpcx/service.go:105 Sys rpcx Init success ! -2022/05/30 16:23:48.442 info services/servicebase.go:20 init sys.cache success! -2022/05/30 16:23:48.443 info cbase/servicebase.go:58 服务[gate_1] 初始化完成! -2022/05/30 16:23:48.443 info cbase/servicebase.go:80 服务[gate_1:1.0.0.0] 启动完成! -2022/05/30 16:23:48.443 debug gate/module.go:32 Module.Gate Init -2022/05/30 16:23:48.443 debug gate/module.go:38 Module.Gate Start -2022/05/30 16:23:48.604 info registry/consul.go:253 发现新的服务【gate_1:1.0.0.0】 -2022/05/30 16:24:01.384 info registry/consul.go:253 发现新的服务【gate_2:1.0.0.0】 -2022/05/30 16:24:08.403 info registry/consul.go:288 丢失服务【gate_2】 diff --git a/bin/log/gate_2.log b/bin/log/gate_2.log deleted file mode 100644 index b989ff581..000000000 --- a/bin/log/gate_2.log +++ /dev/null @@ -1,11 +0,0 @@ -2022/05/30 16:24:01.257 info rpcx/service.go:90 Sys log Init success ! -2022/05/30 16:24:01.273 info rpcx/service.go:95 Sys event Init success ! -2022/05/30 16:24:01.273 info rpcx/service.go:100 Sys registry Init success ! -2022/05/30 16:24:01.273 info rpcx/service.go:105 Sys rpcx Init success ! -2022/05/30 16:24:01.278 info services/servicebase.go:20 init sys.cache success! -2022/05/30 16:24:01.279 info cbase/servicebase.go:58 服务[gate_2] 初始化完成! -2022/05/30 16:24:01.279 info cbase/servicebase.go:80 服务[gate_2:1.0.0.0] 启动完成! -2022/05/30 16:24:01.279 debug gate/module.go:32 Module.Gate Init -2022/05/30 16:24:01.279 debug gate/module.go:38 Module.Gate Start -2022/05/30 16:24:01.280 info registry/consul.go:253 发现新的服务【gate_1:1.0.0.0】 -2022/05/30 16:24:01.386 info registry/consul.go:253 发现新的服务【gate_2:1.0.0.0】 diff --git a/bin/log/worker_1.log b/bin/log/worker_1.log deleted file mode 100644 index ffcb6f8c7..000000000 --- a/bin/log/worker_1.log +++ /dev/null @@ -1,10 +0,0 @@ -2022/05/31 09:47:11.195 info rpcx/service.go:90 Sys log Init success ! -2022/05/31 09:47:11.211 info rpcx/service.go:95 Sys event Init success ! -2022/05/31 09:47:11.211 info rpcx/service.go:100 Sys registry Init success ! -2022/05/31 09:47:11.211 info rpcx/service.go:105 Sys rpcx Init success ! -2022/05/31 09:47:11.216 info services/servicebase.go:20 init sys.cache success! -2022/05/31 09:47:11.216 info cbase/servicebase.go:58 服务[worker_1] 初始化完成! -2022/05/31 09:47:11.216 info cbase/servicebase.go:80 服务[worker_1:1.0.0.0] 启动完成! -2022/05/31 09:47:13.793 debug s_comps/comp_gateroute.go:53 注册用户路由【Login】 -2022/05/31 09:47:13.796 info registry/consul.go:253 发现新的服务【gate_1:1.0.0.0】 -2022/05/31 09:47:13.958 info registry/consul.go:253 发现新的服务【worker_1:1.0.0.0】 diff --git a/comm/core.go b/comm/core.go index 68668c33c..3e6eb90b2 100644 --- a/comm/core.go +++ b/comm/core.go @@ -21,7 +21,7 @@ const ( //Rpc type ISC_GateRouteComp interface { core.IServiceComp - RegisterRoute(methodName string, msg reflect.Type, fn reflect.Method) + RegisterRoute(methodName string, comp reflect.Value, msg reflect.Type, fn reflect.Method) } //用户会话 @@ -31,6 +31,7 @@ type IUserSession interface { GetGatewayServiceId() string SendMsg(ServiceMethod string, msg interface{}) (err error) Close() (err error) + ToString() string } //消息体 diff --git a/comm/usersession.go b/comm/usersession.go index 690bc9ffd..5c259ed60 100644 --- a/comm/usersession.go +++ b/comm/usersession.go @@ -1,6 +1,8 @@ package comm import ( + "fmt" + "github.com/liwei1dao/lego/base" ) @@ -38,3 +40,6 @@ func (this *UserSession) SendMsg(ServiceMethod string, msg interface{}) (err err func (this *UserSession) Close() (err error) { return } +func (this *UserSession) ToString() string { + return fmt.Sprintf("SessionId:%s UserId:%d GatewayServiceId:%s", this.SessionId, this.UserId, this.GatewayServiceId) +} diff --git a/go.mod b/go.mod index f2807f361..30a4e5698 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,8 @@ module go_dreamfactory go 1.18 require ( - github.com/liwei1dao/lego v0.0.0-20220530082438-d1a47a89b5d1 + github.com/gorilla/websocket v1.4.2 + github.com/liwei1dao/lego v0.0.0-20220531033739-03f821663a48 google.golang.org/protobuf v1.28.0 ) @@ -28,6 +29,9 @@ require ( github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534 // indirect + github.com/go-playground/locales v0.14.0 // indirect + github.com/go-playground/universal-translator v0.18.0 // indirect + github.com/go-playground/validator/v10 v10.10.1 // indirect github.com/go-redis/redis/v8 v8.11.5 // indirect github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect github.com/gogo/protobuf v1.3.2 // indirect @@ -50,6 +54,7 @@ require ( github.com/kavu/go_reuseport v1.5.0 // indirect github.com/klauspost/cpuid/v2 v2.0.12 // indirect github.com/klauspost/reedsolomon v1.9.16 // indirect + github.com/leodido/go-urn v1.2.1 // indirect github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f // indirect github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect github.com/lucas-clemente/quic-go v0.27.0 // indirect @@ -73,8 +78,10 @@ require ( github.com/robfig/cron/v3 v3.0.1 // indirect github.com/rpcxio/libkv v0.5.1-0.20210420120011-1fceaedca8a5 // indirect github.com/rs/cors v1.8.2 // indirect + github.com/rs/xid v1.3.0 // indirect github.com/rubyist/circuitbreaker v2.2.1+incompatible // indirect github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414 // indirect + github.com/satori/go.uuid v1.2.0 // indirect github.com/smallnest/quick v0.0.0-20220103065406-780def6371e6 // indirect github.com/smallnest/rpcx v1.7.4 // indirect github.com/soheilhy/cmux v0.1.5 // indirect @@ -83,6 +90,7 @@ require ( github.com/tinylib/msgp v1.1.6 // indirect github.com/tjfoc/gmsm v1.4.1 // indirect github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 // indirect + github.com/ugorji/go/codec v1.2.7 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fastrand v1.1.0 // indirect github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect @@ -106,4 +114,4 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect ) -// replace github.com/liwei1dao/lego => F:\work\go\lego +replace github.com/liwei1dao/lego => F:\work\go\lego diff --git a/go.sum b/go.sum index 2add18b55..4045c5439 100644 --- a/go.sum +++ b/go.sum @@ -208,9 +208,13 @@ github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34 github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534 h1:dhy9OQKGBh4zVXbjwbxxHjRxMJtLXj3zfgpBYQaR4Q4= github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534/go.mod h1:xIFjORFzTxqIV/tDVGO4eDy/bLuSyawEeojSm3GfRGk= +github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU= github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs= +github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/jYrnRPArHwAcmLoJZxyho= github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA= +github.com/go-playground/validator/v10 v10.10.1 h1:uA0+amWMiglNZKZ9FJRKUAe9U3RX91eVn1JYXMWt7ig= github.com/go-playground/validator/v10 v10.10.1/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU= github.com/go-redis/redis/v8 v8.8.2/go.mod h1:F7resOH5Kdug49Otu24RjHWwgK7u9AmtqWMnCV1iP5Y= github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w= @@ -349,6 +353,7 @@ github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7 github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grandcat/zeroconf v1.0.0 h1:uHhahLBKqwWBV6WZUDAT71044vwOTL+McW0mBJvo6kE= github.com/grandcat/zeroconf v1.0.0/go.mod h1:lTKmG1zh86XyCoUeIHSA4FJMBwCJiQmGfcP2PdzytEs= @@ -499,6 +504,7 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4FW1e6jwpg= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= +github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 h1:0iQektZGS248WXmGIYOwRXSQhD4qn3icjMpuxwO7qlo= github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570/go.mod h1:BLt8L9ld7wVsvEWQbuLrUZnCMnUmLZ+CGDzKtclrTlE= @@ -510,8 +516,6 @@ github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.9.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/linkedin/goavro/v2 v2.9.8/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= github.com/liwei1dao/dm v0.0.0-20211103094420-938edf103cf0/go.mod h1:YH8wwRWv57a88ZbPtflEhwCQDrcm9L9S8wl9y1m8SnQ= -github.com/liwei1dao/lego v0.0.0-20220530082438-d1a47a89b5d1 h1:HqV3CviJUJ965K2PPZMjD0aCyTZb/ytMZQ87lJ25irU= -github.com/liwei1dao/lego v0.0.0-20220530082438-d1a47a89b5d1/go.mod h1:uWnARu9OrAi4qQdPZoG96NtfIaBBAltWIBS72HOdZMA= github.com/lucas-clemente/quic-go v0.24.0/go.mod h1:paZuzjXCE5mj6sikVLMvqXk8lJV2AsqtJ6bDhjEfxx0= github.com/lucas-clemente/quic-go v0.27.0 h1:v6WY87q9zD4dKASbG8hy/LpzAVNzEQzw8sEIeloJsc4= github.com/lucas-clemente/quic-go v0.27.0/go.mod h1:AzgQoPda7N+3IqMMMkywBKggIFo2KT6pfnlrQ2QieeI= @@ -679,6 +683,7 @@ github.com/rpcxio/libkv v0.5.1-0.20210420120011-1fceaedca8a5 h1:oGficf/KJp1y22zT github.com/rpcxio/libkv v0.5.1-0.20210420120011-1fceaedca8a5/go.mod h1:zHGgtLr3cFhGtbalum0BrMPOjhFZFJXCKiws/25ewls= github.com/rs/cors v1.8.2 h1:KCooALfAYGs415Cwu5ABvv9n9509fSiG5SQJn/AQo4U= github.com/rs/cors v1.8.2/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= +github.com/rs/xid v1.3.0 h1:6NjYksEUlhurdVehpc7S7dk6DAmcKv8V9gG0FsVN2U4= github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rubyist/circuitbreaker v2.2.1+incompatible h1:KUKd/pV8Geg77+8LNDwdow6rVCAYOp8+kHUyFvL6Mhk= github.com/rubyist/circuitbreaker v2.2.1+incompatible/go.mod h1:Ycs3JgJADPuzJDwffe12k6BZT8hxVi6lFK+gWYJLN4A= @@ -689,6 +694,7 @@ github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFo github.com/saintfish/chardet v0.0.0-20120816061221-3af4cd4741ca/go.mod h1:uugorj2VCxiV1x+LzaIdVa9b4S4qGAcH6cbhh4qVxOU= github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414 h1:AJNDS0kP60X8wwWFvbLPwDuojxubj9pbfK7pjHw0vKg= github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= +github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= @@ -778,6 +784,7 @@ github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 h1:kF/7m/ZU+0D github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3/go.mod h1:QDlpd3qS71vYtakd2hmdpqhJ9nwv6mD6A30bQ1BPBFE= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/ugorji/go v1.2.7/go.mod h1:nF9osbDWLy6bDVv/Rtoh6QgnvNDpmCalQV5urGCCS6M= +github.com/ugorji/go/codec v1.2.7 h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0= github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY= github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= diff --git a/modules/gate/module.go b/modules/gate/module.go deleted file mode 100644 index 06beaa52c..000000000 --- a/modules/gate/module.go +++ /dev/null @@ -1,40 +0,0 @@ -package gate - -import ( - "go_dreamfactory/comm" - "go_dreamfactory/modules" - - "github.com/liwei1dao/lego/core" - "github.com/liwei1dao/lego/sys/log" -) - -func NewModule() core.IModule { - m := new(Gate) - return m -} - -type Gate struct { - modules.ModuleBase - options *Options -} - -func (this *Gate) GetType() core.M_Modules { - return comm.SM_GateModule -} - -func (this *Gate) NewOptions() (options core.IModuleOptions) { - return new(Options) -} - -func (this *Gate) Init(service core.IService, module core.IModule, options core.IModuleOptions) (err error) { - err = this.ModuleBase.Init(service, module, options) - this.options = options.(*Options) - log.Debugf("Module.Gate Init") - return -} - -func (this *Gate) Start() (err error) { - err = this.ModuleBase.Start() - log.Debugf("Module.Gate Start") - return -} diff --git a/modules/gateway/agent.go b/modules/gateway/agent.go new file mode 100644 index 000000000..00738942c --- /dev/null +++ b/modules/gateway/agent.go @@ -0,0 +1,137 @@ +package gateway + +import ( + "context" + "go_dreamfactory/comm" + "go_dreamfactory/pb" + "sync" + "sync/atomic" + + "github.com/gorilla/websocket" + "github.com/liwei1dao/lego/sys/log" + "github.com/liwei1dao/lego/utils/container/id" + "google.golang.org/protobuf/proto" +) + +func newAgent(gateway IGateway, conn *websocket.Conn) *Agent { + agent := &Agent{ + gateway: gateway, + wsConn: conn, + sessionId: id.NewUUId(), + uId: 0, + writeChan: make(chan *pb.Message, 2), + closeSignal: make(chan bool), + state: 1, + } + agent.wg.Add(2) + go agent.readLoop() + go agent.writeLoop() + return agent +} + +//用户代理 +type Agent struct { + gateway IGateway + wsConn *websocket.Conn + sessionId string + uId uint32 + writeChan chan *pb.Message + closeSignal chan bool + state int32 //状态 0 关闭 1 运行 2 关闭中 + wg sync.WaitGroup +} + +func (this *Agent) readLoop() { + defer this.wg.Done() + var ( + data []byte + msg *pb.Message = &pb.Message{} + err error + ) +locp: + for { + if _, data, err = this.wsConn.ReadMessage(); err != nil { + log.Errorf("agent:%s uId:%d ReadMessage err:%v", this.sessionId, this.uId, err) + go this.Close() + break locp + } + if err = proto.Unmarshal(data, msg); err != nil { + log.Errorf("agent:%s uId:%d Unmarshal err:%v", this.sessionId, this.uId, err) + go this.Close() + break locp + } else { + this.messageDistribution(msg) + } + } + log.Debugf("agent:%s uId:%d readLoop end!", this.sessionId, this.uId) +} + +func (this *Agent) writeLoop() { + defer this.wg.Done() + var ( + data []byte + err error + ) +locp: + for { + select { + case <-this.closeSignal: + break locp + case msg, ok := <-this.writeChan: + if ok { + data, err = proto.Marshal(msg) + if err = this.wsConn.WriteMessage(websocket.BinaryMessage, data); err != nil { + log.Errorf("agent:%s uId:%d WriteMessage err:%v", this.sessionId, this.uId, err) + go this.Close() + } + } else { + go this.Close() + } + } + } + log.Debugf("agent:%s uId:%d writeLoop end!", this.sessionId, this.uId) +} + +func (this *Agent) SessionId() string { + return this.sessionId +} + +func (this *Agent) IP() string { + return this.wsConn.RemoteAddr().String() +} +func (this *Agent) UserId() uint32 { + return this.uId +} +func (this *Agent) WriteMsg(msg *pb.UserMessage) (err error) { + return +} + +//外部代用关闭 +func (this *Agent) Close() { + if !atomic.CompareAndSwapInt32(&this.state, 1, 2) { + return + } + this.wsConn.Close() + this.closeSignal <- true + this.wg.Wait() + atomic.StoreInt32(&this.state, 0) + this.gateway.DisConnect(this) +} + +//分发用户消息 +func (this *Agent) messageDistribution(msg *pb.Message) { + reply := &pb.UserMessageReply{} + log.Debugf("agent:%s uId:%d MessageDistribution msg:%s", this.sessionId, this.uId, msg.Head.ServiceMethod) + if err := this.gateway.Service().RpcCallByType("worker", string(comm.Rpc_GateRoute), context.Background(), &pb.UserMessage{ + Ip: this.IP(), + UserSessionId: this.sessionId, + UserId: this.uId, + GatewayServiceId: this.gateway.Service().GetId(), + Method: msg.Head.ServiceMethod, + Message: msg.Data, + }, reply); err != nil { + log.Errorf("agent:%s uId:%d MessageDistribution err:%v", this.sessionId, this.uId, err) + } else { + log.Debugf("agent:%s uId:%d MessageDistribution reply:%v", this.sessionId, this.uId, reply) + } +} diff --git a/modules/gateway/agentmgr_comp.go b/modules/gateway/agentmgr_comp.go new file mode 100644 index 000000000..8f3542dc3 --- /dev/null +++ b/modules/gateway/agentmgr_comp.go @@ -0,0 +1,25 @@ +package gateway + +import ( + "sync" + + "github.com/liwei1dao/lego/core" + "github.com/liwei1dao/lego/core/cbase" +) + +type AgentMgr_Comp struct { + cbase.ModuleCompBase + agents *sync.Map +} + +func (this *AgentMgr_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) { + err = this.ModuleCompBase.Init(service, module, comp, options) + this.agents = new(sync.Map) + return +} +func (this *AgentMgr_Comp) Connect(a IAgent) { + this.agents.Store(a.SessionId(), a) +} +func (this *AgentMgr_Comp) DisConnect(a IAgent) { + this.agents.Delete(a.SessionId()) +} diff --git a/modules/gateway/client_test.go b/modules/gateway/client_test.go new file mode 100644 index 000000000..150ac28f6 --- /dev/null +++ b/modules/gateway/client_test.go @@ -0,0 +1,34 @@ +package gateway_test + +import ( + "fmt" + "go_dreamfactory/pb" + "testing" + "time" + + "github.com/gorilla/websocket" + "google.golang.org/protobuf/proto" +) + +func Test_WebSocket(t *testing.T) { + url := "ws://localhost:7891/gateway" //服务器地址 + ws, _, err := websocket.DefaultDialer.Dial(url, nil) + if err != nil { + fmt.Printf("err:%v", err) + return + } + loginreq := &pb.UserLoginReq{ + Name: "liwei", + } + logindata, _ := proto.Marshal(loginreq) + message := &pb.Message{ + Head: &pb.MessageHead{ServiceMethod: "Login"}, + Data: logindata, + } + data, _ := proto.Marshal(message) + err = ws.WriteMessage(websocket.BinaryMessage, data) + if err != nil { + fmt.Printf("err:%v", err) + } + time.Sleep(time.Second * 2) +} diff --git a/modules/gateway/core.go b/modules/gateway/core.go new file mode 100644 index 000000000..0d662ff16 --- /dev/null +++ b/modules/gateway/core.go @@ -0,0 +1,24 @@ +package gateway + +import ( + "go_dreamfactory/pb" + + "github.com/liwei1dao/lego/base" + "github.com/liwei1dao/lego/core" +) + +type ( + IAgent interface { + SessionId() string + IP() string + UserId() uint32 + WriteMsg(msg *pb.UserMessage) (err error) + Close() //主动关闭接口 + } + IGateway interface { + core.IModule + Service() base.IRPCXService + Connect(a IAgent) + DisConnect(a IAgent) + } +) diff --git a/modules/gateway/module.go b/modules/gateway/module.go new file mode 100644 index 000000000..f2753846c --- /dev/null +++ b/modules/gateway/module.go @@ -0,0 +1,65 @@ +package gateway + +import ( + "go_dreamfactory/comm" + "go_dreamfactory/modules" + + "github.com/liwei1dao/lego/base" + "github.com/liwei1dao/lego/core" + "github.com/liwei1dao/lego/sys/log" +) + +func NewModule() core.IModule { + m := new(Gateway) + return m +} + +type Gateway struct { + modules.ModuleBase + service base.IRPCXService + wsservice_comp *WSService_Comp + agentmgr_comp *AgentMgr_Comp +} + +func (this *Gateway) GetType() core.M_Modules { + return comm.SM_GateModule +} + +func (this *Gateway) NewOptions() (options core.IModuleOptions) { + return new(Options) +} + +func (this *Gateway) Service() base.IRPCXService { + return this.service +} + +func (this *Gateway) Init(service core.IService, module core.IModule, options core.IModuleOptions) (err error) { + err = this.ModuleBase.Init(service, module, options) + this.service = service.(base.IRPCXService) + log.Debugf("Module.Gate Init") + return +} + +func (this *Gateway) Start() (err error) { + err = this.ModuleBase.Start() + log.Debugf("Module.Gate Start") + return +} + +func (this *Gateway) OnInstallComp() { + this.ModuleBase.OnInstallComp() + this.agentmgr_comp = this.RegisterComp(new(AgentMgr_Comp)).(*AgentMgr_Comp) + this.wsservice_comp = this.RegisterComp(new(WSService_Comp)).(*WSService_Comp) +} + +//有新的连接对象进入 +func (this *Gateway) Connect(a IAgent) { + log.Debugf("[Module.Gateway] have new connect:Ip[%s] SessionId:[%s]", a.IP(), a.SessionId()) + this.agentmgr_comp.Connect(a) +} + +//有新的连接对象进入 +func (this *Gateway) DisConnect(a IAgent) { + log.Debugf("[Module.Gateway] have disConnect:Ip[%s] SessionId:[%s] uid:[%d]", a.IP(), a.SessionId(), a.UserId()) + this.agentmgr_comp.DisConnect(a) +} diff --git a/modules/gate/options.go b/modules/gateway/options.go similarity index 88% rename from modules/gate/options.go rename to modules/gateway/options.go index cdcad7f85..2cc769d01 100644 --- a/modules/gate/options.go +++ b/modules/gateway/options.go @@ -1,4 +1,4 @@ -package gate +package gateway import ( "github.com/liwei1dao/lego/utils/mapstructure" @@ -6,7 +6,7 @@ import ( type ( Options struct { - WSAddr string + ListenPort int } ) diff --git a/modules/gateway/wservice_comp.go b/modules/gateway/wservice_comp.go new file mode 100644 index 000000000..d77a9f696 --- /dev/null +++ b/modules/gateway/wservice_comp.go @@ -0,0 +1,46 @@ +package gateway + +import ( + "net/http" + + "github.com/gorilla/websocket" + "github.com/liwei1dao/lego/core" + "github.com/liwei1dao/lego/core/cbase" + "github.com/liwei1dao/lego/sys/gin" + "github.com/liwei1dao/lego/sys/gin/engine" + "github.com/liwei1dao/lego/sys/log" +) + +type WSService_Comp struct { + cbase.ModuleCompBase + options *Options + + module IGateway + gin gin.ISys +} + +func (this *WSService_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) { + err = this.ModuleCompBase.Init(service, module, comp, options) + this.options = options.(*Options) + this.module = module.(IGateway) + this.gin, err = gin.NewSys(gin.SetListenPort(this.options.ListenPort)) + this.gin.GET("/gateway", this.ws) + return +} + +func (this *WSService_Comp) ws(c *engine.Context) { + upGrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + ReadBufferSize: 1024, + WriteBufferSize: 1024, + } + if wsConn, err := upGrader.Upgrade(c.Writer, c.Request, nil); err != nil { + log.Errorf("accept faile client:%s err:%v", c.RemoteIP(), err) + return + } else { + agent := newAgent(this.module, wsConn) + this.module.Connect(agent) + } +} diff --git a/modules/m_comps/gate_comp.go b/modules/m_comps/gate_comp.go index a142ddbd9..5932d0ad5 100644 --- a/modules/m_comps/gate_comp.go +++ b/modules/m_comps/gate_comp.go @@ -86,7 +86,7 @@ func (this *MComp_GateComp) suitableMethods(scomp comm.ISC_GateRouteComp, typ re if returnType := mtype.Out(0); returnType != typeOfError { continue } - scomp.RegisterRoute(mname, argType, method) + scomp.RegisterRoute(mname, reflect.ValueOf(this.comp), replyType, method) } } diff --git a/modules/web/user_comp.go b/modules/web/user_comp.go index 5ef06b9bc..ee6f82943 100644 --- a/modules/web/user_comp.go +++ b/modules/web/user_comp.go @@ -7,6 +7,7 @@ import ( "go_dreamfactory/pb" "github.com/liwei1dao/lego/core" + "github.com/liwei1dao/lego/sys/log" ) type User_Comp struct { @@ -19,6 +20,6 @@ func (this *User_Comp) Init(service core.IService, module core.IModule, comp cor } func (this *User_Comp) Login(ctx context.Context, session comm.IUserSession, rsp *pb.UserLoginReq) error { - + log.Debugf("User_Comp Login: session:%v rsp:%v", session.ToString(), rsp) return nil } diff --git a/pb/comm.pb.go b/pb/comm.pb.go index a449138fd..0afdb6900 100644 --- a/pb/comm.pb.go +++ b/pb/comm.pb.go @@ -20,6 +20,110 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +//消息体 +type MessageHead struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ServiceMethod string `protobuf:"bytes,1,opt,name=ServiceMethod,proto3" json:"ServiceMethod,omitempty"` //服务名 +} + +func (x *MessageHead) Reset() { + *x = MessageHead{} + if protoimpl.UnsafeEnabled { + mi := &file_comm_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MessageHead) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MessageHead) ProtoMessage() {} + +func (x *MessageHead) ProtoReflect() protoreflect.Message { + mi := &file_comm_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MessageHead.ProtoReflect.Descriptor instead. +func (*MessageHead) Descriptor() ([]byte, []int) { + return file_comm_proto_rawDescGZIP(), []int{0} +} + +func (x *MessageHead) GetServiceMethod() string { + if x != nil { + return x.ServiceMethod + } + return "" +} + +//处理JSON消息 +type Message struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Head *MessageHead `protobuf:"bytes,1,opt,name=Head,proto3" json:"Head,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=Data,proto3" json:"Data,omitempty"` +} + +func (x *Message) Reset() { + *x = Message{} + if protoimpl.UnsafeEnabled { + mi := &file_comm_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Message) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Message) ProtoMessage() {} + +func (x *Message) ProtoReflect() protoreflect.Message { + mi := &file_comm_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Message.ProtoReflect.Descriptor instead. +func (*Message) Descriptor() ([]byte, []int) { + return file_comm_proto_rawDescGZIP(), []int{1} +} + +func (x *Message) GetHead() *MessageHead { + if x != nil { + return x.Head + } + return nil +} + +func (x *Message) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + type UserMessage struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -36,7 +140,7 @@ type UserMessage struct { func (x *UserMessage) Reset() { *x = UserMessage{} if protoimpl.UnsafeEnabled { - mi := &file_comm_proto_msgTypes[0] + mi := &file_comm_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -49,7 +153,7 @@ func (x *UserMessage) String() string { func (*UserMessage) ProtoMessage() {} func (x *UserMessage) ProtoReflect() protoreflect.Message { - mi := &file_comm_proto_msgTypes[0] + mi := &file_comm_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -62,7 +166,7 @@ func (x *UserMessage) ProtoReflect() protoreflect.Message { // Deprecated: Use UserMessage.ProtoReflect.Descriptor instead. func (*UserMessage) Descriptor() ([]byte, []int) { - return file_comm_proto_rawDescGZIP(), []int{0} + return file_comm_proto_rawDescGZIP(), []int{2} } func (x *UserMessage) GetIp() string { @@ -119,7 +223,7 @@ type UserMessageReply struct { func (x *UserMessageReply) Reset() { *x = UserMessageReply{} if protoimpl.UnsafeEnabled { - mi := &file_comm_proto_msgTypes[1] + mi := &file_comm_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -132,7 +236,7 @@ func (x *UserMessageReply) String() string { func (*UserMessageReply) ProtoMessage() {} func (x *UserMessageReply) ProtoReflect() protoreflect.Message { - mi := &file_comm_proto_msgTypes[1] + mi := &file_comm_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -145,7 +249,7 @@ func (x *UserMessageReply) ProtoReflect() protoreflect.Message { // Deprecated: Use UserMessageReply.ProtoReflect.Descriptor instead. func (*UserMessageReply) Descriptor() ([]byte, []int) { - return file_comm_proto_rawDescGZIP(), []int{1} + return file_comm_proto_rawDescGZIP(), []int{3} } func (x *UserMessageReply) GetCode() int32 { @@ -165,24 +269,31 @@ func (x *UserMessageReply) GetMsg() string { var File_comm_proto protoreflect.FileDescriptor var file_comm_proto_rawDesc = []byte{ - 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x6d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xb9, 0x01, 0x0a, - 0x0b, 0x55, 0x73, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x0e, 0x0a, 0x02, - 0x49, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x49, 0x70, 0x12, 0x24, 0x0a, 0x0d, - 0x55, 0x73, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x0d, 0x55, 0x73, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, - 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x55, 0x73, 0x65, 0x72, 0x49, 0x64, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x0d, 0x52, 0x06, 0x55, 0x73, 0x65, 0x72, 0x49, 0x64, 0x12, 0x2a, 0x0a, 0x10, 0x47, 0x61, - 0x74, 0x65, 0x77, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x64, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x53, 0x65, 0x72, - 0x76, 0x69, 0x63, 0x65, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, - 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x18, - 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x38, 0x0a, 0x10, 0x55, 0x73, 0x65, 0x72, - 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x12, 0x0a, 0x04, - 0x43, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x43, 0x6f, 0x64, 0x65, - 0x12, 0x10, 0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x4d, - 0x73, 0x67, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x3b, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x6d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x33, 0x0a, 0x0b, + 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x65, 0x61, 0x64, 0x12, 0x24, 0x0a, 0x0d, 0x53, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4d, 0x65, 0x74, 0x68, 0x6f, + 0x64, 0x22, 0x3f, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x20, 0x0a, 0x04, + 0x48, 0x65, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x48, 0x65, 0x61, 0x64, 0x52, 0x04, 0x48, 0x65, 0x61, 0x64, 0x12, 0x12, + 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61, + 0x74, 0x61, 0x22, 0xb9, 0x01, 0x0a, 0x0b, 0x55, 0x73, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x49, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, + 0x49, 0x70, 0x12, 0x24, 0x0a, 0x0d, 0x55, 0x73, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, + 0x6e, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x55, 0x73, 0x65, 0x72, 0x53, + 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x55, 0x73, 0x65, 0x72, + 0x49, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x55, 0x73, 0x65, 0x72, 0x49, 0x64, + 0x12, 0x2a, 0x0a, 0x10, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, + 0x63, 0x65, 0x49, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x47, 0x61, 0x74, 0x65, + 0x77, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, + 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x4d, 0x65, + 0x74, 0x68, 0x6f, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, + 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x38, + 0x0a, 0x10, 0x55, 0x73, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x70, + 0x6c, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x43, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, + 0x52, 0x04, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x03, 0x4d, 0x73, 0x67, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x3b, 0x70, 0x62, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -197,17 +308,20 @@ func file_comm_proto_rawDescGZIP() []byte { return file_comm_proto_rawDescData } -var file_comm_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_comm_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_comm_proto_goTypes = []interface{}{ - (*UserMessage)(nil), // 0: UserMessage - (*UserMessageReply)(nil), // 1: UserMessageReply + (*MessageHead)(nil), // 0: MessageHead + (*Message)(nil), // 1: Message + (*UserMessage)(nil), // 2: UserMessage + (*UserMessageReply)(nil), // 3: UserMessageReply } var file_comm_proto_depIdxs = []int32{ - 0, // [0:0] is the sub-list for method output_type - 0, // [0:0] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name + 0, // 0: Message.Head:type_name -> MessageHead + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name } func init() { file_comm_proto_init() } @@ -217,7 +331,7 @@ func file_comm_proto_init() { } if !protoimpl.UnsafeEnabled { file_comm_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*UserMessage); i { + switch v := v.(*MessageHead); i { case 0: return &v.state case 1: @@ -229,6 +343,30 @@ func file_comm_proto_init() { } } file_comm_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Message); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_comm_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*UserMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_comm_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*UserMessageReply); i { case 0: return &v.state @@ -247,7 +385,7 @@ func file_comm_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_comm_proto_rawDesc, NumEnums: 0, - NumMessages: 2, + NumMessages: 4, NumExtensions: 0, NumServices: 0, }, diff --git a/pb/proto/comm.proto b/pb/proto/comm.proto index ef7de12f3..0ce5656f8 100644 --- a/pb/proto/comm.proto +++ b/pb/proto/comm.proto @@ -1,6 +1,18 @@ syntax = "proto3"; option go_package = ".;pb"; + +//消息体 +message MessageHead { + string ServiceMethod =1; //服务名 +} + +//处理JSON消息 +message Message { + MessageHead Head =1; + bytes Data = 2; +} + message UserMessage { string Ip = 1; string UserSessionId = 2; diff --git a/services/gate/main.go b/services/gateway/main.go similarity index 76% rename from services/gate/main.go rename to services/gateway/main.go index 0d41e1afb..6fd0e8387 100644 --- a/services/gate/main.go +++ b/services/gateway/main.go @@ -2,7 +2,7 @@ package main import ( "flag" - "go_dreamfactory/modules/gate" + "go_dreamfactory/modules/gateway" "go_dreamfactory/services" "github.com/liwei1dao/lego" @@ -11,7 +11,7 @@ import ( ) var ( - conf = flag.String("conf", "./conf/gate.yaml", "获取需要启动的服务配置文件") //启动服务的Id + conf = flag.String("conf", "./conf/gateway_1.yaml", "获取需要启动的服务配置文件") //启动服务的Id ) func main() { @@ -23,7 +23,7 @@ func main() { s.OnInstallComp( //装备组件 ) lego.Run(s, //运行模块 - gate.NewModule(), + gateway.NewModule(), ) } diff --git a/services/s_comps/comp_gateroute.go b/services/s_comps/comp_gateroute.go index 1f7e6ee6d..4338d9065 100644 --- a/services/s_comps/comp_gateroute.go +++ b/services/s_comps/comp_gateroute.go @@ -20,6 +20,7 @@ func NewGateRouteComp() comm.ISC_GateRouteComp { } type msghandle struct { + rcvr reflect.Value msgType reflect.Type fn reflect.Method } @@ -43,13 +44,13 @@ func (this *SComp_GateRouteComp) Init(service core.IService, comp core.IServiceC } func (this *SComp_GateRouteComp) Start() (err error) { - err = this.ServiceCompBase.Start() this.service.RegisterFunctionName(string(comm.Rpc_GateRoute), this.ReceiveMsg) //注册网关路由接收接口 + err = this.ServiceCompBase.Start() return } //注册路由 -func (this *SComp_GateRouteComp) RegisterRoute(methodName string, msg reflect.Type, fn reflect.Method) { +func (this *SComp_GateRouteComp) RegisterRoute(methodName string, comp reflect.Value, msg reflect.Type, fn reflect.Method) { log.Debugf("注册用户路由【%s】", methodName) if _, ok := this.msghandles[methodName]; ok { log.Errorf("重复 注册网关消息【%s】", methodName) @@ -57,6 +58,7 @@ func (this *SComp_GateRouteComp) RegisterRoute(methodName string, msg reflect.Ty } this.mrlock.Lock() this.msghandles[methodName] = &msghandle{ + rcvr: comp, msgType: msg, fn: fn, } @@ -64,6 +66,7 @@ func (this *SComp_GateRouteComp) RegisterRoute(methodName string, msg reflect.Ty } func (this *SComp_GateRouteComp) ReceiveMsg(ctx context.Context, args *pb.UserMessage, reply *pb.UserMessageReply) error { + log.Debugf("SComp_GateRouteComp ReceiveMsg agent:%s uId:%d MessageDistribution msg:%s", args.UserSessionId, args.UserId, args.Method) this.mrlock.RLock() msghandle, ok := this.msghandles[args.Method] this.mrlock.RUnlock() @@ -74,7 +77,7 @@ func (this *SComp_GateRouteComp) ReceiveMsg(ctx context.Context, args *pb.UserMe log.Errorf("UserMessage:%s Unmarshal err:%v", args.Method, err) return err } - msghandle.fn.Func.Call([]reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(session), reflect.ValueOf(msg)}) + msghandle.fn.Func.Call([]reflect.Value{msghandle.rcvr, reflect.ValueOf(ctx), reflect.ValueOf(session), reflect.ValueOf(msg)}) } else { reply.Code = int32(core.ErrorCode_ReqParameterError) } diff --git a/services/worker/main.go b/services/worker/main.go index 6fbe4630b..b480fb481 100644 --- a/services/worker/main.go +++ b/services/worker/main.go @@ -25,7 +25,8 @@ func main() { s_comps.NewGateRouteComp(), ) lego.Run(s, //运行模块 - web.NewModule()) + web.NewModule(), + ) }