修复定时器重复执行导致邮件重复发送问题
This commit is contained in:
xcy 2023-12-20 19:05:09 +08:00
parent dacd312bfe
commit 23e1941812

View File

@ -2,13 +2,14 @@ import {RedisClientType, createClient} from "redis";
import {redisClient} from "./setRedis"; import {redisClient} from "./setRedis";
import {ConnectionStatus, WsServer} from "tsrpc"; import {ConnectionStatus, WsServer} from "tsrpc";
import cluster from 'cluster'; import cluster from 'cluster';
import { clusterFun } from "./clusterFunction"; import {clusterFun} from "./clusterFunction";
//维护当前uid和pid的对应关系 //维护当前uid和pid的对应关系
let uid2processId = {}; let uid2processId = {};
//订阅redis //订阅redis
let subscribeRedis: RedisClientType let subscribeRedis: RedisClientType
let firstPid = null; let firstPid = null;
//发布和可写redis复用原 setRedis 里的redisClient //发布和可写redis复用原 setRedis 里的redisClient
@ -20,10 +21,10 @@ let firstPid = null;
*/ */
export function clusterSubscribe(key: string, callback: Function) { export function clusterSubscribe(key: string, callback: Function) {
subscribeRedis.subscribe(G.redis.fromatKey(key), function (msg) { subscribeRedis.subscribe(G.redis.fromatKey(key), function (msg) {
try{ try {
callback(msg); callback(msg);
}catch(e){ } catch (e) {
console.error("clusterSubscribe error",e); console.error("clusterSubscribe error", e);
} }
}); });
} }
@ -43,14 +44,19 @@ export function clusterPublish(key: string, data: any) {
* N个进程中使 * N个进程中使
*/ */
export function clusterRunOnce(fun) { export function clusterRunOnce(fun) {
console.log(`${process.pid}环境变量pm_id===>${process.env.pm_id}`);
if (process.env.pm_id == null || process.env.pm_id === '0') { if (process.env.pm_id == null || process.env.pm_id === '0') {
//非pm2启动的或是pm2下启动的第一个进程 //非pm2启动的或是pm2下启动的第一个进程
console.log("run clusterRunOnce1 ===>", process.pid)
fun(); fun();
return; return;
} }
if(firstPid == process.pid){ if(G.config.isG123)return;
if (firstPid == process.pid) {
//pm2的其中一个进程 //pm2的其中一个进程
console.log("run clusterRunOnce2 ===>", process.pid)
fun(); fun();
return; return;
} }
@ -60,16 +66,16 @@ export function clusterRunOnce(fun) {
* *
* uid所在的进程执行fun方法uid不在任何进程的话 * uid所在的进程执行fun方法uid不在任何进程的话
*/ */
export function clusterFunctionRunAtUidProcess(uid:string, fun:string, ...arg:any[]) { export function clusterFunctionRunAtUidProcess(uid: string, fun: string, ...arg: any[]) {
if(!uid2processId[uid] || uid2processId[uid] == process.pid){ if (!uid2processId[uid] || uid2processId[uid] == process.pid) {
clusterFun[ fun ].call(this, ...arg); clusterFun[fun].call(this, ...arg);
return; return;
} }
clusterPublish("RunclusterFunction", JSON.stringify({ clusterPublish("RunclusterFunction", JSON.stringify({
"uid": uid, "uid": uid,
"fun": fun, "fun": fun,
"arg" : arg "arg": arg
})) }))
} }
@ -159,17 +165,17 @@ async function initSubscribeRedis() {
clusterSubscribe('broadcastClusterMsg', function (msg) { clusterSubscribe('broadcastClusterMsg', function (msg) {
let data = JSON.parse(msg); let data = JSON.parse(msg);
if(!data.filter){ if (!data.filter) {
//如果不是有条件发送的话 //如果不是有条件发送的话
G.server.broadcastMsg(data.msgName, data.msg); G.server.broadcastMsg(data.msgName, data.msg);
}else { } else {
//但是如果是有条件发送的话 //但是如果是有条件发送的话
//条件里可能需要进程内的信息,则需要每个进程都分别筛选后执行 //条件里可能需要进程内的信息,则需要每个进程都分别筛选后执行
if (data.filter.ghId != null) { if (data.filter.ghId != null) {
//指定公会 //指定公会
let conns = G.server.connections; let conns = G.server.connections;
conns = conns.filter(c => c?.gud?.ghId == data.filter.ghId); conns = conns.filter(c => c?.gud?.ghId == data.filter.ghId);
conns.length>0 && G.server.broadcastMsg(data.msgName, data.msg, conns); conns.length > 0 && G.server.broadcastMsg(data.msgName, data.msg, conns);
} }
} }
}); });
@ -178,7 +184,7 @@ async function initSubscribeRedis() {
clusterSubscribe('RunclusterFunction', function (msg) { clusterSubscribe('RunclusterFunction', function (msg) {
let data = JSON.parse(msg); let data = JSON.parse(msg);
if (uid2processId[data.uid] == process.pid) { if (uid2processId[data.uid] == process.pid) {
clusterFun[ data.fun ].call(this, ...data.arg); clusterFun[data.fun].call(this, ...data.arg);
} }
}); });
} }
@ -194,7 +200,7 @@ export async function clusterMain() {
//初始化订阅redis //初始化订阅redis
await initSubscribeRedis(); await initSubscribeRedis();
if(process.env.pm_id != null){ if (process.env.pm_id != null) {
//pm2启动的设置key为我的pid //pm2启动的设置key为我的pid
firstPid = await redisClient.get(G.redis.fromatKey("firstPid")); firstPid = await redisClient.get(G.redis.fromatKey("firstPid"));
if (!firstPid) { if (!firstPid) {