上传消息分发处理

This commit is contained in:
liwei1dao 2022-05-31 16:35:44 +08:00
parent 834e9b4221
commit 670342ec91
26 changed files with 576 additions and 183 deletions

21
.vscode/launch.json vendored
View File

@ -5,28 +5,15 @@
"version": "0.2.0",
"configurations": [
{
"name": "gate_1",
"name": "gateway_1",
"type": "go",
"request": "launch",
"mode": "debug",
"program": "${workspaceFolder}/services/gate", //Gomain.go,${workspaceRoot}, /bin /pkg /src
"args": ["-conf","./conf/gate_1.yaml"],
"program": "${workspaceFolder}/services/gateway", //Gomain.go,${workspaceRoot}, /bin /pkg /src
"args": ["-conf","./conf/gateway_1.yaml"],
"cwd": "${workspaceFolder}/bin", //
"internalConsoleOptions": "openOnSessionStart",
"output": "${workspaceFolder}/bin/vsdebug_gate", //vscode
"showGlobalVariables": true,
"env": {}, //,gopathgopath:gopath
},
{
"name": "gate_2",
"type": "go",
"request": "launch",
"mode": "debug",
"program": "${workspaceFolder}/services/gate", //Gomain.go,${workspaceRoot}, /bin /pkg /src
"args": ["-conf","./conf/gate_2.yaml"],
"cwd": "${workspaceFolder}/bin", //
"internalConsoleOptions": "openOnSessionStart",
"output": "${workspaceFolder}/bin/vsdebug_gate", //vscode
"output": "${workspaceFolder}/bin/vsdebug_gateway", //vscode
"showGlobalVariables": true,
"env": {}, //,gopathgopath:gopath
},

View File

@ -1,28 +0,0 @@
id : "gate_2" #服务的唯一id 在集群服务下不能重复即可 建议配置文件名与服务id一致
ip : "172.0.0.1" #运行主机Ip 集群通信中 按ip节点通行的查询字段 此字段可以注释掉 注释后取当前运行主机公网ip地址
port : 9568 #服务监听端口 RPC服务
tag : "demo" #服务集群标签 相同标签 且sys.registry 和 sys.rpc 配置一致 即可互相发现和通信
type : "dreamfactory" #服务类型 相同服务类型相同 如 多个login 服务 相同服务多开 服务类型一致
category : "BusinessService" #服务类别 例如 网关服务器 或者 游戏服务器 以及普通业务服务器
version : 1.0 #服务版本 多服务器相同的服务类型 高版本比低版本拥有更高的访问优先级 使用场景 热更新机制
#系统配置
sys:
log: #日志系统
FileName: "./log/gate_2.log" #日志文件存放地址
Loglevel: 0 #日志文件输出级别
LogMaxSize: 128 #日志文件最大Size
LogMaxAge: 7 #日志文件最多保留天数
registry: #注册表系统 服务发现
RegistryType: 0 #服务发现系统 0 Consul 组件
Consul_Addr: "10.0.0.9:8500"
Consul_RegisterInterval: 5
Consul_RegisterTTL: 7
cache: #缓存系统
Redis_Addr: ["10.0.0.9:9001","10.0.0.9:9002","10.0.0.9:9003","10.0.1.45:9004","10.0.1.45:9005","10.0.1.45:9006"]
Redis_Password: ""
#模块配置
modules:
SM_GateModule:
WSAddr: ":7891"

View File

@ -1,23 +1,23 @@
id : "gate_1" #服务的唯一id 在集群服务下不能重复即可 建议配置文件名与服务id一致
ip : "172.0.0.1" #运行主机Ip 集群通信中 按ip节点通行的查询字段 此字段可以注释掉 注释后取当前运行主机公网ip地址
id : "gateway_1" #服务的唯一id 在集群服务下不能重复即可 建议配置文件名与服务id一致
ip : "127.0.0.1" #运行主机Ip 集群通信中 按ip节点通行的查询字段 此字段可以注释掉 注释后取当前运行主机公网ip地址
port : 9567 #服务监听端口 RPC服务
tag : "demo" #服务集群标签 相同标签 且sys.registry 和 sys.rpc 配置一致 即可互相发现和通信
type : "dreamfactory" #服务类型 相同服务类型相同 如 多个login 服务 相同服务多开 服务类型一致
tag : "dreamfactory" #服务集群标签 相同标签 且sys.registry 和 sys.rpc 配置一致 即可互相发现和通信
type : "gate" #服务类型 相同服务类型相同 如 多个login 服务 相同服务多开 服务类型一致
category : "BusinessService" #服务类别 例如 网关服务器 或者 游戏服务器 以及普通业务服务器
version : 1.0 #服务版本 多服务器相同的服务类型 高版本比低版本拥有更高的访问优先级 使用场景 热更新机制
#系统配置
sys:
log: #日志系统
FileName: "./log/gate_1.log" #日志文件存放地址
FileName: "./log/gateway_1.log" #日志文件存放地址
Loglevel: 0 #日志文件输出级别
LogMaxSize: 128 #日志文件最大Size
LogMaxAge: 7 #日志文件最多保留天数
registry: #注册表系统 服务发现
RegistryType: 0 #服务发现系统 0 Consul 组件
Consul_Addr: "10.0.0.9:8500"
Consul_RegisterInterval: 5
Consul_RegisterTTL: 7
Consul_RegisterInterval: 15
Consul_RegisterTTL: 30
cache: #缓存系统
Redis_Addr: ["10.0.0.9:9001","10.0.0.9:9002","10.0.0.9:9003","10.0.1.45:9004","10.0.1.45:9005","10.0.1.45:9006"]
Redis_Password: ""
@ -25,4 +25,4 @@ sys:
#模块配置
modules:
SM_GateModule:
WSAddr: ":7891"
ListenPort: 7891

View File

@ -1,23 +1,23 @@
id : "worker_1" #服务的唯一id 在集群服务下不能重复即可 建议配置文件名与服务id一致
ip : "172.0.0.1" #运行主机Ip 集群通信中 按ip节点通行的查询字段 此字段可以注释掉 注释后取当前运行主机公网ip地址
id : "worker_2" #服务的唯一id 在集群服务下不能重复即可 建议配置文件名与服务id一致
ip : "127.0.0.1" #运行主机Ip 集群通信中 按ip节点通行的查询字段 此字段可以注释掉 注释后取当前运行主机公网ip地址
port : 9568 #服务监听端口 RPC服务
tag : "demo" #服务集群标签 相同标签 且sys.registry 和 sys.rpc 配置一致 即可互相发现和通信
type : "dreamfactory" #服务类型 相同服务类型相同 如 多个login 服务 相同服务多开 服务类型一致
tag : "dreamfactory" #服务集群标签 相同标签 且sys.registry 和 sys.rpc 配置一致 即可互相发现和通信
type : "worker" #服务类型 相同服务类型相同 如 多个login 服务 相同服务多开 服务类型一致
category : "BusinessService" #服务类别 例如 网关服务器 或者 游戏服务器 以及普通业务服务器
version : 1.0 #服务版本 多服务器相同的服务类型 高版本比低版本拥有更高的访问优先级 使用场景 热更新机制
#系统配置
sys:
log: #日志系统
FileName: "./log/worker_1.log" #日志文件存放地址
FileName: "./log/worker_2.log" #日志文件存放地址
Loglevel: 0 #日志文件输出级别
LogMaxSize: 128 #日志文件最大Size
LogMaxAge: 7 #日志文件最多保留天数
registry: #注册表系统 服务发现
RegistryType: 0 #服务发现系统 0 Consul 组件
Consul_Addr: "10.0.0.9:8500"
Consul_RegisterInterval: 5
Consul_RegisterTTL: 7
Consul_RegisterInterval: 15
Consul_RegisterTTL: 30
cache: #缓存系统
Redis_Addr: ["10.0.0.9:9001","10.0.0.9:9002","10.0.0.9:9003","10.0.1.45:9004","10.0.1.45:9005","10.0.1.45:9006"]
Redis_Password: ""

View File

@ -1,12 +0,0 @@
2022/05/30 16:23:48.421 info rpcx/service.go:90 Sys log Init success !
2022/05/30 16:23:48.437 info rpcx/service.go:95 Sys event Init success !
2022/05/30 16:23:48.438 info rpcx/service.go:100 Sys registry Init success !
2022/05/30 16:23:48.438 info rpcx/service.go:105 Sys rpcx Init success !
2022/05/30 16:23:48.442 info services/servicebase.go:20 init sys.cache success!
2022/05/30 16:23:48.443 info cbase/servicebase.go:58 服务[gate_1] 初始化完成!
2022/05/30 16:23:48.443 info cbase/servicebase.go:80 服务[gate_1:1.0.0.0] 启动完成!
2022/05/30 16:23:48.443 debug gate/module.go:32 Module.Gate Init
2022/05/30 16:23:48.443 debug gate/module.go:38 Module.Gate Start
2022/05/30 16:23:48.604 info registry/consul.go:253 发现新的服务【gate_1:1.0.0.0】
2022/05/30 16:24:01.384 info registry/consul.go:253 发现新的服务【gate_2:1.0.0.0】
2022/05/30 16:24:08.403 info registry/consul.go:288 丢失服务【gate_2】

View File

@ -1,11 +0,0 @@
2022/05/30 16:24:01.257 info rpcx/service.go:90 Sys log Init success !
2022/05/30 16:24:01.273 info rpcx/service.go:95 Sys event Init success !
2022/05/30 16:24:01.273 info rpcx/service.go:100 Sys registry Init success !
2022/05/30 16:24:01.273 info rpcx/service.go:105 Sys rpcx Init success !
2022/05/30 16:24:01.278 info services/servicebase.go:20 init sys.cache success!
2022/05/30 16:24:01.279 info cbase/servicebase.go:58 服务[gate_2] 初始化完成!
2022/05/30 16:24:01.279 info cbase/servicebase.go:80 服务[gate_2:1.0.0.0] 启动完成!
2022/05/30 16:24:01.279 debug gate/module.go:32 Module.Gate Init
2022/05/30 16:24:01.279 debug gate/module.go:38 Module.Gate Start
2022/05/30 16:24:01.280 info registry/consul.go:253 发现新的服务【gate_1:1.0.0.0】
2022/05/30 16:24:01.386 info registry/consul.go:253 发现新的服务【gate_2:1.0.0.0】

View File

@ -1,10 +0,0 @@
2022/05/31 09:47:11.195 info rpcx/service.go:90 Sys log Init success !
2022/05/31 09:47:11.211 info rpcx/service.go:95 Sys event Init success !
2022/05/31 09:47:11.211 info rpcx/service.go:100 Sys registry Init success !
2022/05/31 09:47:11.211 info rpcx/service.go:105 Sys rpcx Init success !
2022/05/31 09:47:11.216 info services/servicebase.go:20 init sys.cache success!
2022/05/31 09:47:11.216 info cbase/servicebase.go:58 服务[worker_1] 初始化完成!
2022/05/31 09:47:11.216 info cbase/servicebase.go:80 服务[worker_1:1.0.0.0] 启动完成!
2022/05/31 09:47:13.793 debug s_comps/comp_gateroute.go:53 注册用户路由【Login】
2022/05/31 09:47:13.796 info registry/consul.go:253 发现新的服务【gate_1:1.0.0.0】
2022/05/31 09:47:13.958 info registry/consul.go:253 发现新的服务【worker_1:1.0.0.0】

View File

@ -21,7 +21,7 @@ const ( //Rpc
type ISC_GateRouteComp interface {
core.IServiceComp
RegisterRoute(methodName string, msg reflect.Type, fn reflect.Method)
RegisterRoute(methodName string, comp reflect.Value, msg reflect.Type, fn reflect.Method)
}
//用户会话
@ -31,6 +31,7 @@ type IUserSession interface {
GetGatewayServiceId() string
SendMsg(ServiceMethod string, msg interface{}) (err error)
Close() (err error)
ToString() string
}
//消息体

View File

@ -1,6 +1,8 @@
package comm
import (
"fmt"
"github.com/liwei1dao/lego/base"
)
@ -38,3 +40,6 @@ func (this *UserSession) SendMsg(ServiceMethod string, msg interface{}) (err err
func (this *UserSession) Close() (err error) {
return
}
func (this *UserSession) ToString() string {
return fmt.Sprintf("SessionId:%s UserId:%d GatewayServiceId:%s", this.SessionId, this.UserId, this.GatewayServiceId)
}

12
go.mod
View File

@ -3,7 +3,8 @@ module go_dreamfactory
go 1.18
require (
github.com/liwei1dao/lego v0.0.0-20220530082438-d1a47a89b5d1
github.com/gorilla/websocket v1.4.2
github.com/liwei1dao/lego v0.0.0-20220531033739-03f821663a48
google.golang.org/protobuf v1.28.0
)
@ -28,6 +29,9 @@ require (
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-playground/validator/v10 v10.10.1 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
@ -50,6 +54,7 @@ require (
github.com/kavu/go_reuseport v1.5.0 // indirect
github.com/klauspost/cpuid/v2 v2.0.12 // indirect
github.com/klauspost/reedsolomon v1.9.16 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f // indirect
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect
github.com/lucas-clemente/quic-go v0.27.0 // indirect
@ -73,8 +78,10 @@ require (
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/rpcxio/libkv v0.5.1-0.20210420120011-1fceaedca8a5 // indirect
github.com/rs/cors v1.8.2 // indirect
github.com/rs/xid v1.3.0 // indirect
github.com/rubyist/circuitbreaker v2.2.1+incompatible // indirect
github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414 // indirect
github.com/satori/go.uuid v1.2.0 // indirect
github.com/smallnest/quick v0.0.0-20220103065406-780def6371e6 // indirect
github.com/smallnest/rpcx v1.7.4 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
@ -83,6 +90,7 @@ require (
github.com/tinylib/msgp v1.1.6 // indirect
github.com/tjfoc/gmsm v1.4.1 // indirect
github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 // indirect
github.com/ugorji/go/codec v1.2.7 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect
@ -106,4 +114,4 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
)
// replace github.com/liwei1dao/lego => F:\work\go\lego
replace github.com/liwei1dao/lego => F:\work\go\lego

11
go.sum
View File

@ -208,9 +208,13 @@ github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34
github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534 h1:dhy9OQKGBh4zVXbjwbxxHjRxMJtLXj3zfgpBYQaR4Q4=
github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534/go.mod h1:xIFjORFzTxqIV/tDVGO4eDy/bLuSyawEeojSm3GfRGk=
github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU=
github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs=
github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/jYrnRPArHwAcmLoJZxyho=
github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA=
github.com/go-playground/validator/v10 v10.10.1 h1:uA0+amWMiglNZKZ9FJRKUAe9U3RX91eVn1JYXMWt7ig=
github.com/go-playground/validator/v10 v10.10.1/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU=
github.com/go-redis/redis/v8 v8.8.2/go.mod h1:F7resOH5Kdug49Otu24RjHWwgK7u9AmtqWMnCV1iP5Y=
github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w=
@ -349,6 +353,7 @@ github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grandcat/zeroconf v1.0.0 h1:uHhahLBKqwWBV6WZUDAT71044vwOTL+McW0mBJvo6kE=
github.com/grandcat/zeroconf v1.0.0/go.mod h1:lTKmG1zh86XyCoUeIHSA4FJMBwCJiQmGfcP2PdzytEs=
@ -499,6 +504,7 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4FW1e6jwpg=
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w=
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 h1:0iQektZGS248WXmGIYOwRXSQhD4qn3icjMpuxwO7qlo=
github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570/go.mod h1:BLt8L9ld7wVsvEWQbuLrUZnCMnUmLZ+CGDzKtclrTlE=
@ -510,8 +516,6 @@ github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.9.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/linkedin/goavro/v2 v2.9.8/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
github.com/liwei1dao/dm v0.0.0-20211103094420-938edf103cf0/go.mod h1:YH8wwRWv57a88ZbPtflEhwCQDrcm9L9S8wl9y1m8SnQ=
github.com/liwei1dao/lego v0.0.0-20220530082438-d1a47a89b5d1 h1:HqV3CviJUJ965K2PPZMjD0aCyTZb/ytMZQ87lJ25irU=
github.com/liwei1dao/lego v0.0.0-20220530082438-d1a47a89b5d1/go.mod h1:uWnARu9OrAi4qQdPZoG96NtfIaBBAltWIBS72HOdZMA=
github.com/lucas-clemente/quic-go v0.24.0/go.mod h1:paZuzjXCE5mj6sikVLMvqXk8lJV2AsqtJ6bDhjEfxx0=
github.com/lucas-clemente/quic-go v0.27.0 h1:v6WY87q9zD4dKASbG8hy/LpzAVNzEQzw8sEIeloJsc4=
github.com/lucas-clemente/quic-go v0.27.0/go.mod h1:AzgQoPda7N+3IqMMMkywBKggIFo2KT6pfnlrQ2QieeI=
@ -679,6 +683,7 @@ github.com/rpcxio/libkv v0.5.1-0.20210420120011-1fceaedca8a5 h1:oGficf/KJp1y22zT
github.com/rpcxio/libkv v0.5.1-0.20210420120011-1fceaedca8a5/go.mod h1:zHGgtLr3cFhGtbalum0BrMPOjhFZFJXCKiws/25ewls=
github.com/rs/cors v1.8.2 h1:KCooALfAYGs415Cwu5ABvv9n9509fSiG5SQJn/AQo4U=
github.com/rs/cors v1.8.2/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/rs/xid v1.3.0 h1:6NjYksEUlhurdVehpc7S7dk6DAmcKv8V9gG0FsVN2U4=
github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rubyist/circuitbreaker v2.2.1+incompatible h1:KUKd/pV8Geg77+8LNDwdow6rVCAYOp8+kHUyFvL6Mhk=
github.com/rubyist/circuitbreaker v2.2.1+incompatible/go.mod h1:Ycs3JgJADPuzJDwffe12k6BZT8hxVi6lFK+gWYJLN4A=
@ -689,6 +694,7 @@ github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFo
github.com/saintfish/chardet v0.0.0-20120816061221-3af4cd4741ca/go.mod h1:uugorj2VCxiV1x+LzaIdVa9b4S4qGAcH6cbhh4qVxOU=
github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414 h1:AJNDS0kP60X8wwWFvbLPwDuojxubj9pbfK7pjHw0vKg=
github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
@ -778,6 +784,7 @@ github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 h1:kF/7m/ZU+0D
github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3/go.mod h1:QDlpd3qS71vYtakd2hmdpqhJ9nwv6mD6A30bQ1BPBFE=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/ugorji/go v1.2.7/go.mod h1:nF9osbDWLy6bDVv/Rtoh6QgnvNDpmCalQV5urGCCS6M=
github.com/ugorji/go/codec v1.2.7 h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0=
github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY=
github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=

View File

@ -1,40 +0,0 @@
package gate
import (
"go_dreamfactory/comm"
"go_dreamfactory/modules"
"github.com/liwei1dao/lego/core"
"github.com/liwei1dao/lego/sys/log"
)
func NewModule() core.IModule {
m := new(Gate)
return m
}
type Gate struct {
modules.ModuleBase
options *Options
}
func (this *Gate) GetType() core.M_Modules {
return comm.SM_GateModule
}
func (this *Gate) NewOptions() (options core.IModuleOptions) {
return new(Options)
}
func (this *Gate) Init(service core.IService, module core.IModule, options core.IModuleOptions) (err error) {
err = this.ModuleBase.Init(service, module, options)
this.options = options.(*Options)
log.Debugf("Module.Gate Init")
return
}
func (this *Gate) Start() (err error) {
err = this.ModuleBase.Start()
log.Debugf("Module.Gate Start")
return
}

137
modules/gateway/agent.go Normal file
View File

@ -0,0 +1,137 @@
package gateway
import (
"context"
"go_dreamfactory/comm"
"go_dreamfactory/pb"
"sync"
"sync/atomic"
"github.com/gorilla/websocket"
"github.com/liwei1dao/lego/sys/log"
"github.com/liwei1dao/lego/utils/container/id"
"google.golang.org/protobuf/proto"
)
func newAgent(gateway IGateway, conn *websocket.Conn) *Agent {
agent := &Agent{
gateway: gateway,
wsConn: conn,
sessionId: id.NewUUId(),
uId: 0,
writeChan: make(chan *pb.Message, 2),
closeSignal: make(chan bool),
state: 1,
}
agent.wg.Add(2)
go agent.readLoop()
go agent.writeLoop()
return agent
}
//用户代理
type Agent struct {
gateway IGateway
wsConn *websocket.Conn
sessionId string
uId uint32
writeChan chan *pb.Message
closeSignal chan bool
state int32 //状态 0 关闭 1 运行 2 关闭中
wg sync.WaitGroup
}
func (this *Agent) readLoop() {
defer this.wg.Done()
var (
data []byte
msg *pb.Message = &pb.Message{}
err error
)
locp:
for {
if _, data, err = this.wsConn.ReadMessage(); err != nil {
log.Errorf("agent:%s uId:%d ReadMessage err:%v", this.sessionId, this.uId, err)
go this.Close()
break locp
}
if err = proto.Unmarshal(data, msg); err != nil {
log.Errorf("agent:%s uId:%d Unmarshal err:%v", this.sessionId, this.uId, err)
go this.Close()
break locp
} else {
this.messageDistribution(msg)
}
}
log.Debugf("agent:%s uId:%d readLoop end!", this.sessionId, this.uId)
}
func (this *Agent) writeLoop() {
defer this.wg.Done()
var (
data []byte
err error
)
locp:
for {
select {
case <-this.closeSignal:
break locp
case msg, ok := <-this.writeChan:
if ok {
data, err = proto.Marshal(msg)
if err = this.wsConn.WriteMessage(websocket.BinaryMessage, data); err != nil {
log.Errorf("agent:%s uId:%d WriteMessage err:%v", this.sessionId, this.uId, err)
go this.Close()
}
} else {
go this.Close()
}
}
}
log.Debugf("agent:%s uId:%d writeLoop end!", this.sessionId, this.uId)
}
func (this *Agent) SessionId() string {
return this.sessionId
}
func (this *Agent) IP() string {
return this.wsConn.RemoteAddr().String()
}
func (this *Agent) UserId() uint32 {
return this.uId
}
func (this *Agent) WriteMsg(msg *pb.UserMessage) (err error) {
return
}
//外部代用关闭
func (this *Agent) Close() {
if !atomic.CompareAndSwapInt32(&this.state, 1, 2) {
return
}
this.wsConn.Close()
this.closeSignal <- true
this.wg.Wait()
atomic.StoreInt32(&this.state, 0)
this.gateway.DisConnect(this)
}
//分发用户消息
func (this *Agent) messageDistribution(msg *pb.Message) {
reply := &pb.UserMessageReply{}
log.Debugf("agent:%s uId:%d MessageDistribution msg:%s", this.sessionId, this.uId, msg.Head.ServiceMethod)
if err := this.gateway.Service().RpcCallByType("worker", string(comm.Rpc_GateRoute), context.Background(), &pb.UserMessage{
Ip: this.IP(),
UserSessionId: this.sessionId,
UserId: this.uId,
GatewayServiceId: this.gateway.Service().GetId(),
Method: msg.Head.ServiceMethod,
Message: msg.Data,
}, reply); err != nil {
log.Errorf("agent:%s uId:%d MessageDistribution err:%v", this.sessionId, this.uId, err)
} else {
log.Debugf("agent:%s uId:%d MessageDistribution reply:%v", this.sessionId, this.uId, reply)
}
}

View File

@ -0,0 +1,25 @@
package gateway
import (
"sync"
"github.com/liwei1dao/lego/core"
"github.com/liwei1dao/lego/core/cbase"
)
type AgentMgr_Comp struct {
cbase.ModuleCompBase
agents *sync.Map
}
func (this *AgentMgr_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
err = this.ModuleCompBase.Init(service, module, comp, options)
this.agents = new(sync.Map)
return
}
func (this *AgentMgr_Comp) Connect(a IAgent) {
this.agents.Store(a.SessionId(), a)
}
func (this *AgentMgr_Comp) DisConnect(a IAgent) {
this.agents.Delete(a.SessionId())
}

View File

@ -0,0 +1,34 @@
package gateway_test
import (
"fmt"
"go_dreamfactory/pb"
"testing"
"time"
"github.com/gorilla/websocket"
"google.golang.org/protobuf/proto"
)
func Test_WebSocket(t *testing.T) {
url := "ws://localhost:7891/gateway" //服务器地址
ws, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
fmt.Printf("err:%v", err)
return
}
loginreq := &pb.UserLoginReq{
Name: "liwei",
}
logindata, _ := proto.Marshal(loginreq)
message := &pb.Message{
Head: &pb.MessageHead{ServiceMethod: "Login"},
Data: logindata,
}
data, _ := proto.Marshal(message)
err = ws.WriteMessage(websocket.BinaryMessage, data)
if err != nil {
fmt.Printf("err:%v", err)
}
time.Sleep(time.Second * 2)
}

24
modules/gateway/core.go Normal file
View File

@ -0,0 +1,24 @@
package gateway
import (
"go_dreamfactory/pb"
"github.com/liwei1dao/lego/base"
"github.com/liwei1dao/lego/core"
)
type (
IAgent interface {
SessionId() string
IP() string
UserId() uint32
WriteMsg(msg *pb.UserMessage) (err error)
Close() //主动关闭接口
}
IGateway interface {
core.IModule
Service() base.IRPCXService
Connect(a IAgent)
DisConnect(a IAgent)
}
)

65
modules/gateway/module.go Normal file
View File

@ -0,0 +1,65 @@
package gateway
import (
"go_dreamfactory/comm"
"go_dreamfactory/modules"
"github.com/liwei1dao/lego/base"
"github.com/liwei1dao/lego/core"
"github.com/liwei1dao/lego/sys/log"
)
func NewModule() core.IModule {
m := new(Gateway)
return m
}
type Gateway struct {
modules.ModuleBase
service base.IRPCXService
wsservice_comp *WSService_Comp
agentmgr_comp *AgentMgr_Comp
}
func (this *Gateway) GetType() core.M_Modules {
return comm.SM_GateModule
}
func (this *Gateway) NewOptions() (options core.IModuleOptions) {
return new(Options)
}
func (this *Gateway) Service() base.IRPCXService {
return this.service
}
func (this *Gateway) Init(service core.IService, module core.IModule, options core.IModuleOptions) (err error) {
err = this.ModuleBase.Init(service, module, options)
this.service = service.(base.IRPCXService)
log.Debugf("Module.Gate Init")
return
}
func (this *Gateway) Start() (err error) {
err = this.ModuleBase.Start()
log.Debugf("Module.Gate Start")
return
}
func (this *Gateway) OnInstallComp() {
this.ModuleBase.OnInstallComp()
this.agentmgr_comp = this.RegisterComp(new(AgentMgr_Comp)).(*AgentMgr_Comp)
this.wsservice_comp = this.RegisterComp(new(WSService_Comp)).(*WSService_Comp)
}
//有新的连接对象进入
func (this *Gateway) Connect(a IAgent) {
log.Debugf("[Module.Gateway] have new connect:Ip[%s] SessionId:[%s]", a.IP(), a.SessionId())
this.agentmgr_comp.Connect(a)
}
//有新的连接对象进入
func (this *Gateway) DisConnect(a IAgent) {
log.Debugf("[Module.Gateway] have disConnect:Ip[%s] SessionId:[%s] uid:[%d]", a.IP(), a.SessionId(), a.UserId())
this.agentmgr_comp.DisConnect(a)
}

View File

@ -1,4 +1,4 @@
package gate
package gateway
import (
"github.com/liwei1dao/lego/utils/mapstructure"
@ -6,7 +6,7 @@ import (
type (
Options struct {
WSAddr string
ListenPort int
}
)

View File

@ -0,0 +1,46 @@
package gateway
import (
"net/http"
"github.com/gorilla/websocket"
"github.com/liwei1dao/lego/core"
"github.com/liwei1dao/lego/core/cbase"
"github.com/liwei1dao/lego/sys/gin"
"github.com/liwei1dao/lego/sys/gin/engine"
"github.com/liwei1dao/lego/sys/log"
)
type WSService_Comp struct {
cbase.ModuleCompBase
options *Options
module IGateway
gin gin.ISys
}
func (this *WSService_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
err = this.ModuleCompBase.Init(service, module, comp, options)
this.options = options.(*Options)
this.module = module.(IGateway)
this.gin, err = gin.NewSys(gin.SetListenPort(this.options.ListenPort))
this.gin.GET("/gateway", this.ws)
return
}
func (this *WSService_Comp) ws(c *engine.Context) {
upGrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
if wsConn, err := upGrader.Upgrade(c.Writer, c.Request, nil); err != nil {
log.Errorf("accept faile client:%s err:%v", c.RemoteIP(), err)
return
} else {
agent := newAgent(this.module, wsConn)
this.module.Connect(agent)
}
}

View File

@ -86,7 +86,7 @@ func (this *MComp_GateComp) suitableMethods(scomp comm.ISC_GateRouteComp, typ re
if returnType := mtype.Out(0); returnType != typeOfError {
continue
}
scomp.RegisterRoute(mname, argType, method)
scomp.RegisterRoute(mname, reflect.ValueOf(this.comp), replyType, method)
}
}

View File

@ -7,6 +7,7 @@ import (
"go_dreamfactory/pb"
"github.com/liwei1dao/lego/core"
"github.com/liwei1dao/lego/sys/log"
)
type User_Comp struct {
@ -19,6 +20,6 @@ func (this *User_Comp) Init(service core.IService, module core.IModule, comp cor
}
func (this *User_Comp) Login(ctx context.Context, session comm.IUserSession, rsp *pb.UserLoginReq) error {
log.Debugf("User_Comp Login: session:%v rsp:%v", session.ToString(), rsp)
return nil
}

View File

@ -20,6 +20,110 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
//消息体
type MessageHead struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
ServiceMethod string `protobuf:"bytes,1,opt,name=ServiceMethod,proto3" json:"ServiceMethod,omitempty"` //服务名
}
func (x *MessageHead) Reset() {
*x = MessageHead{}
if protoimpl.UnsafeEnabled {
mi := &file_comm_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *MessageHead) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*MessageHead) ProtoMessage() {}
func (x *MessageHead) ProtoReflect() protoreflect.Message {
mi := &file_comm_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use MessageHead.ProtoReflect.Descriptor instead.
func (*MessageHead) Descriptor() ([]byte, []int) {
return file_comm_proto_rawDescGZIP(), []int{0}
}
func (x *MessageHead) GetServiceMethod() string {
if x != nil {
return x.ServiceMethod
}
return ""
}
//处理JSON消息
type Message struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Head *MessageHead `protobuf:"bytes,1,opt,name=Head,proto3" json:"Head,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=Data,proto3" json:"Data,omitempty"`
}
func (x *Message) Reset() {
*x = Message{}
if protoimpl.UnsafeEnabled {
mi := &file_comm_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Message) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Message) ProtoMessage() {}
func (x *Message) ProtoReflect() protoreflect.Message {
mi := &file_comm_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Message.ProtoReflect.Descriptor instead.
func (*Message) Descriptor() ([]byte, []int) {
return file_comm_proto_rawDescGZIP(), []int{1}
}
func (x *Message) GetHead() *MessageHead {
if x != nil {
return x.Head
}
return nil
}
func (x *Message) GetData() []byte {
if x != nil {
return x.Data
}
return nil
}
type UserMessage struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@ -36,7 +140,7 @@ type UserMessage struct {
func (x *UserMessage) Reset() {
*x = UserMessage{}
if protoimpl.UnsafeEnabled {
mi := &file_comm_proto_msgTypes[0]
mi := &file_comm_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -49,7 +153,7 @@ func (x *UserMessage) String() string {
func (*UserMessage) ProtoMessage() {}
func (x *UserMessage) ProtoReflect() protoreflect.Message {
mi := &file_comm_proto_msgTypes[0]
mi := &file_comm_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -62,7 +166,7 @@ func (x *UserMessage) ProtoReflect() protoreflect.Message {
// Deprecated: Use UserMessage.ProtoReflect.Descriptor instead.
func (*UserMessage) Descriptor() ([]byte, []int) {
return file_comm_proto_rawDescGZIP(), []int{0}
return file_comm_proto_rawDescGZIP(), []int{2}
}
func (x *UserMessage) GetIp() string {
@ -119,7 +223,7 @@ type UserMessageReply struct {
func (x *UserMessageReply) Reset() {
*x = UserMessageReply{}
if protoimpl.UnsafeEnabled {
mi := &file_comm_proto_msgTypes[1]
mi := &file_comm_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -132,7 +236,7 @@ func (x *UserMessageReply) String() string {
func (*UserMessageReply) ProtoMessage() {}
func (x *UserMessageReply) ProtoReflect() protoreflect.Message {
mi := &file_comm_proto_msgTypes[1]
mi := &file_comm_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -145,7 +249,7 @@ func (x *UserMessageReply) ProtoReflect() protoreflect.Message {
// Deprecated: Use UserMessageReply.ProtoReflect.Descriptor instead.
func (*UserMessageReply) Descriptor() ([]byte, []int) {
return file_comm_proto_rawDescGZIP(), []int{1}
return file_comm_proto_rawDescGZIP(), []int{3}
}
func (x *UserMessageReply) GetCode() int32 {
@ -165,24 +269,31 @@ func (x *UserMessageReply) GetMsg() string {
var File_comm_proto protoreflect.FileDescriptor
var file_comm_proto_rawDesc = []byte{
0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x6d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xb9, 0x01, 0x0a,
0x0b, 0x55, 0x73, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x0e, 0x0a, 0x02,
0x49, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x49, 0x70, 0x12, 0x24, 0x0a, 0x0d,
0x55, 0x73, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x02, 0x20,
0x01, 0x28, 0x09, 0x52, 0x0d, 0x55, 0x73, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e,
0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x55, 0x73, 0x65, 0x72, 0x49, 0x64, 0x18, 0x03, 0x20, 0x01,
0x28, 0x0d, 0x52, 0x06, 0x55, 0x73, 0x65, 0x72, 0x49, 0x64, 0x12, 0x2a, 0x0a, 0x10, 0x47, 0x61,
0x74, 0x65, 0x77, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x64, 0x18, 0x04,
0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x53, 0x65, 0x72,
0x76, 0x69, 0x63, 0x65, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64,
0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x18,
0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52,
0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x38, 0x0a, 0x10, 0x55, 0x73, 0x65, 0x72,
0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x12, 0x0a, 0x04,
0x43, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x43, 0x6f, 0x64, 0x65,
0x12, 0x10, 0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x4d,
0x73, 0x67, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x3b, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x6d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x33, 0x0a, 0x0b,
0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x65, 0x61, 0x64, 0x12, 0x24, 0x0a, 0x0d, 0x53,
0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4d, 0x65, 0x74, 0x68, 0x6f,
0x64, 0x22, 0x3f, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x20, 0x0a, 0x04,
0x48, 0x65, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x4d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x65, 0x48, 0x65, 0x61, 0x64, 0x52, 0x04, 0x48, 0x65, 0x61, 0x64, 0x12, 0x12,
0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61,
0x74, 0x61, 0x22, 0xb9, 0x01, 0x0a, 0x0b, 0x55, 0x73, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x49, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02,
0x49, 0x70, 0x12, 0x24, 0x0a, 0x0d, 0x55, 0x73, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f,
0x6e, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x55, 0x73, 0x65, 0x72, 0x53,
0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x55, 0x73, 0x65, 0x72,
0x49, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x55, 0x73, 0x65, 0x72, 0x49, 0x64,
0x12, 0x2a, 0x0a, 0x10, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69,
0x63, 0x65, 0x49, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x47, 0x61, 0x74, 0x65,
0x77, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06,
0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x4d, 0x65,
0x74, 0x68, 0x6f, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18,
0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x38,
0x0a, 0x10, 0x55, 0x73, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x70,
0x6c, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x43, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05,
0x52, 0x04, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18, 0x02, 0x20,
0x01, 0x28, 0x09, 0x52, 0x03, 0x4d, 0x73, 0x67, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x3b, 0x70, 0x62,
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -197,17 +308,20 @@ func file_comm_proto_rawDescGZIP() []byte {
return file_comm_proto_rawDescData
}
var file_comm_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_comm_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_comm_proto_goTypes = []interface{}{
(*UserMessage)(nil), // 0: UserMessage
(*UserMessageReply)(nil), // 1: UserMessageReply
(*MessageHead)(nil), // 0: MessageHead
(*Message)(nil), // 1: Message
(*UserMessage)(nil), // 2: UserMessage
(*UserMessageReply)(nil), // 3: UserMessageReply
}
var file_comm_proto_depIdxs = []int32{
0, // [0:0] is the sub-list for method output_type
0, // [0:0] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
0, // 0: Message.Head:type_name -> MessageHead
1, // [1:1] is the sub-list for method output_type
1, // [1:1] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}
func init() { file_comm_proto_init() }
@ -217,7 +331,7 @@ func file_comm_proto_init() {
}
if !protoimpl.UnsafeEnabled {
file_comm_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*UserMessage); i {
switch v := v.(*MessageHead); i {
case 0:
return &v.state
case 1:
@ -229,6 +343,30 @@ func file_comm_proto_init() {
}
}
file_comm_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Message); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_comm_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*UserMessage); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_comm_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*UserMessageReply); i {
case 0:
return &v.state
@ -247,7 +385,7 @@ func file_comm_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_comm_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumMessages: 4,
NumExtensions: 0,
NumServices: 0,
},

View File

@ -1,6 +1,18 @@
syntax = "proto3";
option go_package = ".;pb";
//
message MessageHead {
string ServiceMethod =1; //
}
//JSON消息
message Message {
MessageHead Head =1;
bytes Data = 2;
}
message UserMessage {
string Ip = 1;
string UserSessionId = 2;

View File

@ -2,7 +2,7 @@ package main
import (
"flag"
"go_dreamfactory/modules/gate"
"go_dreamfactory/modules/gateway"
"go_dreamfactory/services"
"github.com/liwei1dao/lego"
@ -11,7 +11,7 @@ import (
)
var (
conf = flag.String("conf", "./conf/gate.yaml", "获取需要启动的服务配置文件") //启动服务的Id
conf = flag.String("conf", "./conf/gateway_1.yaml", "获取需要启动的服务配置文件") //启动服务的Id
)
func main() {
@ -23,7 +23,7 @@ func main() {
s.OnInstallComp( //装备组件
)
lego.Run(s, //运行模块
gate.NewModule(),
gateway.NewModule(),
)
}

View File

@ -20,6 +20,7 @@ func NewGateRouteComp() comm.ISC_GateRouteComp {
}
type msghandle struct {
rcvr reflect.Value
msgType reflect.Type
fn reflect.Method
}
@ -43,13 +44,13 @@ func (this *SComp_GateRouteComp) Init(service core.IService, comp core.IServiceC
}
func (this *SComp_GateRouteComp) Start() (err error) {
err = this.ServiceCompBase.Start()
this.service.RegisterFunctionName(string(comm.Rpc_GateRoute), this.ReceiveMsg) //注册网关路由接收接口
err = this.ServiceCompBase.Start()
return
}
//注册路由
func (this *SComp_GateRouteComp) RegisterRoute(methodName string, msg reflect.Type, fn reflect.Method) {
func (this *SComp_GateRouteComp) RegisterRoute(methodName string, comp reflect.Value, msg reflect.Type, fn reflect.Method) {
log.Debugf("注册用户路由【%s】", methodName)
if _, ok := this.msghandles[methodName]; ok {
log.Errorf("重复 注册网关消息【%s】", methodName)
@ -57,6 +58,7 @@ func (this *SComp_GateRouteComp) RegisterRoute(methodName string, msg reflect.Ty
}
this.mrlock.Lock()
this.msghandles[methodName] = &msghandle{
rcvr: comp,
msgType: msg,
fn: fn,
}
@ -64,6 +66,7 @@ func (this *SComp_GateRouteComp) RegisterRoute(methodName string, msg reflect.Ty
}
func (this *SComp_GateRouteComp) ReceiveMsg(ctx context.Context, args *pb.UserMessage, reply *pb.UserMessageReply) error {
log.Debugf("SComp_GateRouteComp ReceiveMsg agent:%s uId:%d MessageDistribution msg:%s", args.UserSessionId, args.UserId, args.Method)
this.mrlock.RLock()
msghandle, ok := this.msghandles[args.Method]
this.mrlock.RUnlock()
@ -74,7 +77,7 @@ func (this *SComp_GateRouteComp) ReceiveMsg(ctx context.Context, args *pb.UserMe
log.Errorf("UserMessage:%s Unmarshal err:%v", args.Method, err)
return err
}
msghandle.fn.Func.Call([]reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(session), reflect.ValueOf(msg)})
msghandle.fn.Func.Call([]reflect.Value{msghandle.rcvr, reflect.ValueOf(ctx), reflect.ValueOf(session), reflect.ValueOf(msg)})
} else {
reply.Code = int32(core.ErrorCode_ReqParameterError)
}

View File

@ -25,7 +25,8 @@ func main() {
s_comps.NewGateRouteComp(),
)
lego.Run(s, //运行模块
web.NewModule())
web.NewModule(),
)
}