import {RedisClientType, createClient} from "redis"; import {redisClient} from "./setRedis"; import {ConnectionStatus, WsServer} from "tsrpc"; import cluster from 'cluster'; import {clusterFun} from "./clusterFunction"; //维护当前uid和pid的对应关系 let uid2processId = {}; //订阅redis let subscribeRedis: RedisClientType let firstPid = null; //发布和可写redis复用原 setRedis 里的redisClient /** * 对外方法: * 集群接收订阅 * @param key 事件标记 * @param callback 回调函数 */ export function clusterSubscribe(key: string, callback: Function) { subscribeRedis.subscribe(G.redis.fromatKey(key), function (msg) { try { callback(msg); } catch (e) { console.error("clusterSubscribe error", e); } }); } /** * 对外方法: * 集群发布 * @param key 事件标记 * @param data 发送数据 */ export function clusterPublish(key: string, data: any) { redisClient.publish(G.redis.fromatKey(key), data); } /** * 对外方法: * 在集群的N个进程中,只运行一次,在业务逻辑中也可使用 */ export function clusterRunOnce(fun) { // console.log(`${process.pid}环境变量pm_id===>${process.env.pm_id}`); if (process.env.pm_id == null || process.env.pm_id === '0') { //非pm2启动的,或是pm2下启动的第一个进程 // console.log("run clusterRunOnce1 ===>", process.pid) fun(); return; } if(G.config.isG123)return; if (firstPid == process.pid) { //pm2的其中一个进程 // console.log("run clusterRunOnce2 ===>", process.pid) fun(); return; } } /** * 对外方法: * 在uid所在的进程执行fun方法,如果uid不在任何进程的话,则在当前进程执行 */ export function clusterFunctionRunAtUidProcess(uid: string, fun: string, ...arg: any[]) { if (!uid2processId[uid] || uid2processId[uid] == process.pid) { clusterFun[fun].call(this, ...arg); return; } clusterPublish("RunclusterFunction", JSON.stringify({ "uid": uid, "fun": fun, "arg": arg })) } /** * 设置玩家所在的processId * @param uid 玩家uid */ export async function setUidProcessId(uid: string, _processId?: number) { let pid = _processId != null ? _processId : process.pid; uid2processId[uid] = pid; redisClient.hSet(G.redis.fromatKey("uid2processId"), uid, pid); clusterPublish("setUid2ProcessId", JSON.stringify({ "uid": uid, "processId": pid })) return } /** * 清空玩家所在的processId,用于玩家下线时 * @param uid 玩家uid */ export async function delUidProcessId(uid: string) { // 查询是否本进程的,在本进程才允许清除 let pid = await redisClient.hGet(G.redis.fromatKey("uid2processId"), uid) if (~~pid == process.pid) { delete uid2processId[uid]; redisClient.hDel(G.redis.fromatKey("uid2processId"), uid); clusterPublish("delUidProcessId", JSON.stringify({ "uid": uid })) } } /** * 获取uid所在的processId * @param uid * @returns */ function getUidProcessId(uid: string) { return uid2processId[uid] || null; } /** * 从redis中还原玩家的pid信息 */ async function initUid2processId() { let info = await redisClient.hGetAll(G.redis.fromatKey("uid2processId")) for (let uid in info) { uid2processId[uid] = info[uid]; } } /** * 初始化订阅redis */ async function initSubscribeRedis() { subscribeRedis = createClient({url: G.config.redisUrl}); await subscribeRedis.connect(); //订阅其他进程用户登陆信息 clusterSubscribe('setUid2ProcessId', function (msg) { let data = JSON.parse(msg); uid2processId[data.uid] = data.processId; }); //订阅其他进程用户登出信息 clusterSubscribe('delUidProcessId', function (msg) { let data = JSON.parse(msg); delete uid2processId[data.uid]; }); //订阅其他进程要求的向指定用户发送信息 clusterSubscribe('sendMsgByUid', function (msg) { let data = JSON.parse(msg); // 消息内包含pid时,对比本地pid,本地过滤此消息。 if (data?.pid && data.pid != '0' && data.pid == process.pid) return //如果在本进程才发送,避免在sendMsgByUid时递归publish if (uid2processId[data.uid] == process.pid) { G.server.sendMsgByUid(data.uid, data.type, data.val); } }); //订阅其他进程广播信息 clusterSubscribe('broadcastClusterMsg', function (msg) { let data = JSON.parse(msg); if (!data.filter) { //如果不是有条件发送的话 G.server.broadcastMsg(data.msgName, data.msg); } else { //但是如果是有条件发送的话 //条件里可能需要进程内的信息,则需要每个进程都分别筛选后执行 if (data.filter.ghId != null) { //指定公会 let conns = G.server.connections; conns = conns.filter(c => c?.gud?.ghId == data.filter.ghId); conns.length > 0 && G.server.broadcastMsg(data.msgName, data.msg, conns); } } }); //订阅其他进程跨进程执行方法 clusterSubscribe('RunclusterFunction', function (msg) { let data = JSON.parse(msg); if (uid2processId[data.uid] == process.pid) { clusterFun[data.fun].call(this, ...data.arg); } }); } export async function clusterMain() { if (G.argv.serverType != "cross") { //玩家登出时,清理 G.on('PLAYER_DISCONNECT', function (uid) { delUidProcessId(uid); }); //初始化订阅redis await initSubscribeRedis(); if (process.env.pm_id != null) { //pm2启动的,设置key为我的pid firstPid = await redisClient.get(G.redis.fromatKey("firstPid")); if (!firstPid) { //设置key为我的pid,有效期10秒 await redisClient.set(G.redis.fromatKey("firstPid"), process.pid, {EX: 10}); } firstPid = await redisClient.get(G.redis.fromatKey("firstPid")); clusterRunOnce(() => { redisClient.del(G.redis.fromatKey("uid2processId")); }); } //初始化玩家pid信息 await initUid2processId(); } } // WsServer 扩展,向单个玩家推送消息 // 考虑集群情况 // checkPid: 本地pid,存在时,接收消息的处理会过滤本pid的处理。 WsServer.prototype.sendMsgByUid = async function (uid, type, val, checkPid = "0") { if (uid2processId[uid] == process.pid) { //这个用户在本进程,直接发送 if (G.server.uid_connections[uid]?.status == ConnectionStatus.Opened) { G.server.uid_connections[uid].sendMsg(type, val); } } else { clusterPublish("sendMsgByUid", JSON.stringify({ "uid": uid, "type": type, "val": val, "pid": checkPid })) } }; //向整个集群广播信息 WsServer.prototype.broadcastClusterMsg = async function (msgName, msg, filter): Promise<{ isSucc: true; } | { isSucc: false; errMsg: string; }> { clusterPublish("broadcastClusterMsg", JSON.stringify({ "msgName": msgName, "msg": msg, "filter": filter })) return { isSucc: true }; };