Files
tk-electron-ai/js/rabbitmq-service.js
2026-01-12 16:08:42 +08:00

364 lines
12 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// rabbitmq-service.js (CommonJS)
const amqp = require('amqplib');
const { EventEmitter } = require('events');
/**
 * RabbitMQ connection settings.
 *
 * Each field is read from a RABBIT_* environment variable; the literal after
 * `||` is the fallback used when that variable is unset or empty.
 *
 * NOTE(review): the fallback host, user and password are committed in source.
 * Consider supplying them only through the environment.
 */
const CFG = (() => {
  const env = process.env;

  const protocol = env.RABBIT_PROTOCOL || 'amqp'; // 'amqp' | 'amqps'
  // Hosts that were used as the fallback previously:
  //   '192.168.1.144'
  //   'crawlclient.api.yolozs.com'
  const host = env.RABBIT_HOST || '47.79.98.113';
  const port = Number(env.RABBIT_PORT || 5672);
  const user = env.RABBIT_USER || 'tkdata';
  const pass = env.RABBIT_PASS || '6rARaRj8Z7UG3ahLzh';
  const vhost = env.RABBIT_VHOST || '/';
  // Key setting: heartbeat interval (logged in seconds on connect).
  const heartbeat = Number(env.RABBIT_HEARTBEAT || 60);
  // 0 = use the default; can be raised to reduce frame fragmentation.
  const frameMax = Number(env.RABBIT_FRAME_MAX || 0);

  return { protocol, host, port, user, pass, vhost, heartbeat, frameMax };
})();
// Module-level state shared by every function in this file.
let conn = null; // current connection, or null when there is none
let connectionLock = null; // connection lock: the in-flight connect promise, prevents concurrent connection creation
let pubCh = null; // confirm channel used for publishing
let conCh = null; // channel used for consuming
const emitter = new EventEmitter();
const consumers = new Map(); // queueName -> { onMessage, options, consumerTag }
let reconnecting = false; // true while a reconnect loop is scheduled or running
let closing = false; // true once close() has been called, until open() resets it
let backoff = 1000; // ms
const MAX_BACKOFF = 15000; // upper bound for backoff (ms)
let reconnectTimer = null; // handle of the pending reconnect setTimeout, if any
/**
 * Utility: serialize a message payload into a Buffer.
 *
 * @param {*} payload - A Buffer (returned unchanged), a string (encoded as
 *   is), or any other value (encoded as its JSON.stringify text).
 * @returns {Buffer} The bytes to publish.
 */
function toBuffer(payload) {
  if (Buffer.isBuffer(payload)) {
    return payload;
  }
  const text = typeof payload === 'string' ? payload : JSON.stringify(payload);
  return Buffer.from(text);
}
/**
 * Internal: open the shared AMQP connection (heartbeat, TCP keepalive and
 * event wiring), or reuse the one that already exists.
 *
 * Concurrent callers share a single attempt: while a connect is in flight its
 * promise is stored in `connectionLock` and handed to every caller.
 *
 * @returns {Promise<object>} The connection, also stored in `conn`.
 * @throws Whatever `amqp.connect` rejects with, or
 *   `Error('Connection aborted (closing)')` when `closing` became true while
 *   the connect was in flight.
 */
async function createConnection() {
  // 1. Reuse the existing connection.
  if (conn) return conn;
  // 2. A connect is already in flight: wait for it instead of starting another.
  if (connectionLock) {
    return connectionLock;
  }
  // 3. Start a new connect and publish its promise as the lock.
  connectionLock = (async () => {
    try {
      console.log(`[AMQP] 开始连接 ${CFG.host}...`);
      const connection = await amqp.connect({
        protocol: CFG.protocol,
        hostname: CFG.host,
        port: CFG.port,
        username: CFG.user,
        password: CFG.pass,
        vhost: CFG.vhost,
        heartbeat: CFG.heartbeat,
        // 0 (or less) means "not configured": leave the choice to the library.
        frameMax: CFG.frameMax > 0 ? CFG.frameMax : undefined,
      });
      // Race: shutdown was requested while connecting, so close right away and fail.
      if (closing) {
        console.warn('[AMQP] 连接刚建立但 Detected closing=true, closing now...');
        connection.close().catch(() => { });
        throw new Error('Connection aborted (closing)');
      }
      // Turn on TCP keepalive; best effort, the socket may not be exposed.
      try {
        const stream = connection.stream || connection.socket;
        if (stream?.setKeepAlive) stream.setKeepAlive(true, 15_000);
      } catch (_) { }
      connection.on('error', (e) => {
        const msg = e?.message || String(e);
        if (msg && /heartbeat/i.test(msg)) {
          console.error('[AMQP] 连接错误 (心跳):', msg);
        } else {
          console.error('[AMQP] 连接错误:', msg);
        }
        // Emitting 'error' on an EventEmitter that has no 'error' listener
        // throws, which would turn a recoverable connection error into an
        // uncaught exception. Forward it only when somebody subscribed.
        if (emitter.listenerCount('error') > 0) {
          emitter.emit('error', e);
        }
      });
      connection.on('close', () => {
        if (closing) return; // expected during shutdown
        console.error('[AMQP] 连接已关闭');
        conn = null; pubCh = null; conCh = null;
        scheduleReconnect();
      });
      connection.on('blocked', (reason) => {
        console.warn('[AMQP] 连接被代理阻塞::', reason);
        emitter.emit('blocked', reason);
      });
      connection.on('unblocked', () => {
        console.log('[AMQP] 链接解锁');
        emitter.emit('unblocked');
      });
      console.log(`[AMQP] 已连接到 ${CFG.host} (hb=${CFG.heartbeat}s)`);
      conn = connection; // publish to the module-level variable
      return connection;
    } catch (err) {
      console.error('[AMQP] createConnection failed:', err.message);
      throw err;
    } finally {
      connectionLock = null; // release the lock on success and on failure
    }
  })();
  return connectionLock;
}
// In-flight ensureChannels() run, shared by concurrent callers (null when idle).
let channelsLock = null;

/**
 * Internal: make sure the connection, the publish (confirm) channel and the
 * consume channel all exist, creating whichever is missing.
 *
 * Concurrent callers share one setup pass through `channelsLock`, so two
 * overlapping calls cannot each create a channel and overwrite one another.
 *
 * @returns {Promise<void>}
 * @throws Whatever `createConnection` or the channel factories reject with.
 */
async function ensureChannels() {
  if (conn && pubCh && conCh) return;
  if (channelsLock) return channelsLock;

  channelsLock = (async () => {
    try {
      // createConnection() stores the connection in `conn` itself.
      const connection = conn || await createConnection();

      if (!pubCh) {
        const ch = await connection.createConfirmChannel();
        ch.on('error', e => console.error('[AMQP] 通道错误:', e?.message || e));
        ch.on('close', () => {
          // Only clear the slot if it still holds this channel, so a late
          // close of an old channel cannot drop a newer one.
          if (pubCh === ch) pubCh = null;
          if (!closing) scheduleReconnect();
        });
        pubCh = ch;
      }

      if (!conCh) {
        const ch = await connection.createChannel();
        ch.on('error', e => console.error('[AMQP] 通道错误:', e?.message || e));
        ch.on('close', () => {
          if (conCh === ch) conCh = null;
          if (!closing) scheduleReconnect();
        });
        conCh = ch;
      }
    } finally {
      channelsLock = null;
    }
  })();
  return channelsLock;
}
/**
 * Internal: schedule a reconnect with exponential backoff plus jitter.
 * Only one retry loop runs at a time; calls made while `reconnecting` or
 * `closing` is set are ignored.
 */
function scheduleReconnect() {
  if (closing || reconnecting) return;
  reconnecting = true;
  if (reconnectTimer) {
    clearTimeout(reconnectTimer);
    reconnectTimer = null;
  }

  async function runAttempt() {
    if (closing) return;
    try {
      await ensureChannels();
      await resumeConsumers(); // re-register every known consumer
      reconnecting = false;
      backoff = 1000;
      emitter.emit('reconnected');
      console.log('[AMQP] 重新连接并恢复了消费者');
    } catch (err) {
      const capped = Math.min(backoff, MAX_BACKOFF);
      // Jitter between 75% and 125% of the capped delay, so that many
      // clients do not retry at the same instant.
      const delay = capped * (0.75 + Math.random() * 0.5);
      const reason = err?.message || err;
      console.warn(`[AMQP] 重连失败: ${reason}; retry in ${Math.round(delay)}ms`);
      backoff = Math.min(backoff * 1.6, MAX_BACKOFF);
      reconnectTimer = setTimeout(runAttempt, delay);
    }
  }

  reconnectTimer = setTimeout(runAttempt, backoff);
}
/**
 * Internal: re-register every consumer recorded in `consumers` on the current
 * consume channel. Does nothing when there is no consume channel.
 *
 * @returns {Promise<void>}
 */
async function resumeConsumers() {
  if (!conCh) return;
  for (const [queueName, entry] of consumers) {
    if (entry.consumerTag) {
      // Best effort: cancel the previous registration, ignoring any failure.
      try {
        await conCh.cancel(entry.consumerTag);
      } catch (_) { }
    }
    const { consumerTag } = await startOneConsumer(queueName, entry.onMessage, entry.options, true);
    entry.consumerTag = consumerTag;
  }
}
/**
 * Internal: start one consumer on the consume channel.
 *
 * Each delivery is wrapped in a payload `{ raw, text, json, fields,
 * properties, ack, nack }`. A 'message' event is emitted, then `onMessage` is
 * awaited. The message is acked when the handler resolves and nacked when it
 * throws (a 'handlerError' event is emitted first).
 *
 * `ack`/`nack` are bound to the channel the message was delivered on and
 * settle each message at most once. A handler may therefore settle the
 * message itself without the automatic ack/nack settling it a second time,
 * and a reconnect cannot redirect the acknowledgement to a different channel.
 *
 * @param {string} queueName - Queue to consume from.
 * @param {(payload: object) => any} onMessage - Business callback.
 * @param {object} [options]
 * @param {number} [options.prefetch=1] - Value passed to `channel.prefetch`.
 * @param {boolean} [options.durable=true] - Used when asserting the queue.
 * @param {boolean} [options.assertQueue=true] - Assert the queue first.
 * @param {boolean} [options.requeueOnError=false] - Default requeue flag for nack.
 * @param {boolean} [isResume=false] - When true, `consumers` is left untouched
 *   (the caller updates the existing entry).
 * @returns {Promise<object>} The result of `channel.consume` (carries `consumerTag`).
 */
async function startOneConsumer(queueName, onMessage, options = {}, isResume = false) {
  await ensureChannels();
  const {
    prefetch = 1,
    durable = true,
    assertQueue = true,
    requeueOnError = false,
    // optional: exclusive, arguments, dead-letter, etc.
  } = options;

  // Pin the channel this consumer is registered on; `conCh` may be replaced
  // by a reconnect while messages from this channel are still being handled.
  const channel = conCh;
  if (!channel) throw new Error('[AMQP] consume channel unavailable');

  if (assertQueue) {
    await channel.assertQueue(queueName, { durable });
  }
  await channel.prefetch(prefetch);

  const consumeResult = await channel.consume(
    queueName,
    async (msg) => {
      if (!msg) return;
      const raw = msg.content;
      const text = raw?.toString?.() ?? '';
      let json;
      try { json = JSON.parse(text); } catch (_) { /* not JSON: leave json undefined */ }

      let settled = false; // a delivery may be acked or nacked only once
      const payload = {
        raw, text, json,
        fields: msg.fields,
        properties: msg.properties,
        ack: () => {
          if (settled) return;
          settled = true;
          try { channel.ack(msg); } catch (_) { }
        },
        nack: (requeue = requeueOnError) => {
          if (settled) return;
          settled = true;
          try { channel.nack(msg, false, requeue); } catch (_) { }
        },
      };

      try {
        emitter.emit('message', queueName, payload);
        await onMessage(payload); // business callback
        payload.ack();
      } catch (err) {
        emitter.emit('handlerError', queueName, err, payload);
        payload.nack(requeueOnError);
      }
    },
    { noAck: false }
  );

  if (!isResume) {
    consumers.set(queueName, { onMessage, options, consumerTag: consumeResult.consumerTag });
  }
  console.log(`[*] 消费 "${queueName}" (预取=${prefetch})`);
  return consumeResult;
}
/**
 * Public API: start consuming a queue.
 *
 * @param {string} queueName - Queue to consume from (required).
 * @param {(payload: object) => any} onMessage - Message handler (required).
 * @param {object} [options] - Passed through to `startOneConsumer`.
 * @returns {Promise<{emitter: object, stop: () => Promise<object>}>} The shared
 *   emitter plus a `stop()` that cancels this queue's consumer, forgets it and
 *   resolves with the original consume result.
 * @throws {Error} When `queueName` is empty or `onMessage` is not a function.
 */
async function startConsumer(queueName, onMessage, options = {}) {
  if (!queueName) {
    throw new Error('queueName 必填');
  }
  if (typeof onMessage !== 'function') {
    throw new Error('onMessage 回调必填');
  }

  const res = await startOneConsumer(queueName, onMessage, options, false);

  const stop = async () => {
    const tag = consumers.get(queueName)?.consumerTag;
    if (tag && conCh) {
      // Cancellation failures are ignored; the entry is removed regardless.
      await conCh.cancel(tag).catch(() => { });
    }
    consumers.delete(queueName);
    return res;
  };

  return { emitter, stop };
}
/**
 * Public API: publish a message straight to a queue, optionally waiting for
 * the broker confirm.
 *
 * @param {string} queueName - Target queue (required).
 * @param {*} payload - Serialized with `toBuffer`.
 * @param {object} [options]
 * @param {boolean} [options.durable=true] - Used when asserting the queue.
 * @param {boolean} [options.persistent=true] - Publish option.
 * @param {boolean} [options.assertQueue=true] - Assert the queue first.
 * @param {boolean} [options.confirm=true] - Await `waitForConfirms()` afterwards.
 * @param {object} [options.headers={}] - Message headers.
 * @param {boolean} [options.mandatory=false] - Optional: return unroutable messages.
 * @returns {Promise<void>}
 * @throws {Error} When `queueName` is empty, when the publish channel is
 *   unavailable, or when the channel closes while waiting for 'drain'.
 */
async function publishToQueue(queueName, payload, options = {}) {
  if (!queueName) throw new Error('queueName 必填');
  await ensureChannels();
  const {
    durable = true,
    persistent = true,
    assertQueue = true,
    confirm = true,
    headers = {},
    mandatory = false, // optional: return unroutable messages
  } = options;

  // Pin the channel: the module-level `pubCh` is reset to null when the
  // channel closes, which may happen between the awaits below.
  const channel = pubCh;
  if (!channel) throw new Error('[AMQP] publish channel unavailable');

  if (assertQueue) {
    await channel.assertQueue(queueName, { durable });
  }
  const ok = channel.sendToQueue(queueName, toBuffer(payload), { persistent, headers, mandatory });
  if (!ok) {
    // Write buffer is full: wait for 'drain', but give up if the channel
    // closes first, because then 'drain' would never arrive.
    await new Promise((resolve, reject) => {
      const onDrain = () => {
        channel.removeListener('close', onClose);
        resolve();
      };
      const onClose = () => {
        channel.removeListener('drain', onDrain);
        reject(new Error('[AMQP] channel closed while waiting for drain'));
      };
      channel.once('drain', onDrain);
      channel.once('close', onClose);
    });
  }
  if (confirm) await channel.waitForConfirms();
}
/**
 * Public API: publish a message to an exchange, optionally waiting for the
 * broker confirm.
 *
 * @param {string} exchange - Target exchange (required).
 * @param {string} routingKey - Routing key; a falsy value is sent as ''.
 * @param {*} payload - Serialized with `toBuffer`.
 * @param {object} [options]
 * @param {string} [options.type='direct'] - Exchange type used when asserting.
 * @param {boolean} [options.durable=true] - Used when asserting the exchange.
 * @param {boolean} [options.assertExchange=true] - Assert the exchange first.
 * @param {boolean} [options.confirm=true] - Await `waitForConfirms()` afterwards.
 * @param {boolean} [options.persistent=true] - Publish option.
 * @param {object} [options.headers={}] - Message headers.
 * @param {boolean} [options.mandatory=false] - Publish option.
 * @returns {Promise<void>}
 * @throws {Error} When `exchange` is empty, when the publish channel is
 *   unavailable, or when the channel closes while waiting for 'drain'.
 */
async function publishToExchange(exchange, routingKey, payload, options = {}) {
  if (!exchange) throw new Error('exchange 必填');
  await ensureChannels();
  const {
    type = 'direct',
    durable = true,
    assertExchange = true,
    confirm = true,
    persistent = true,
    headers = {},
    mandatory = false,
  } = options;

  // Pin the channel: the module-level `pubCh` is reset to null when the
  // channel closes, which may happen between the awaits below.
  const channel = pubCh;
  if (!channel) throw new Error('[AMQP] publish channel unavailable');

  if (assertExchange) {
    await channel.assertExchange(exchange, type, { durable });
  }
  const ok = channel.publish(exchange, routingKey || '', toBuffer(payload), { persistent, headers, mandatory });
  if (!ok) {
    // Write buffer is full: wait for 'drain', but give up if the channel
    // closes first, because then 'drain' would never arrive.
    await new Promise((resolve, reject) => {
      const onDrain = () => {
        channel.removeListener('close', onClose);
        resolve();
      };
      const onClose = () => {
        channel.removeListener('drain', onDrain);
        reject(new Error('[AMQP] channel closed while waiting for drain'));
      };
      channel.once('drain', onDrain);
      channel.once('close', onClose);
    });
  }
  if (confirm) await channel.waitForConfirms();
}
/**
 * Public API: force a reconnect right away (intended to be called when
 * Electron resumes or the network changes).
 *
 * Does nothing while shutting down, while a reconnect loop is already active,
 * or while a connect is in flight. Otherwise it closes both channels and the
 * connection (ignoring failures), clears the module state and schedules a
 * reconnect.
 *
 * @returns {Promise<void>}
 */
async function reconnectNow() {
  if (closing || reconnecting || connectionLock) return;

  // Read each handle only when its turn comes, in publish -> consume ->
  // connection order.
  const handles = [() => pubCh, () => conCh, () => conn];
  try {
    for (const current of handles) {
      const handle = current();
      if (handle) {
        await handle.close().catch(() => { });
      }
    }
  } catch (_) { }

  pubCh = null;
  conCh = null;
  conn = null;
  // Cleared so that the scheduleReconnect() call below is not ignored.
  reconnecting = false;
  scheduleReconnect();
}
/**
 * Shut the client down: stop reconnecting, cancel every consumer, then close
 * both channels and the connection. Failures along the way are ignored.
 * `closing` stays true afterwards.
 *
 * @returns {Promise<void>}
 */
async function close() {
  closing = true;
  if (reconnectTimer) {
    clearTimeout(reconnectTimer);
    reconnectTimer = null;
  }

  // Key fix: when a connect is in flight, wait for it to finish (or fail)
  // before closing; otherwise amqplib frequently reports
  // "socket closed abruptly".
  if (connectionLock) {
    try {
      console.log('[AMQP] close() called while connecting, waiting...');
      await connectionLock;
    } catch (_) {
      // a failed connect is irrelevant here
    }
  }

  // Cancel all consumers.
  try {
    for (const entry of consumers.values()) {
      if (entry?.consumerTag && conCh) {
        await conCh.cancel(entry.consumerTag).catch(() => { });
      }
    }
  } catch (_) { }
  consumers.clear();

  // Close publish channel, consume channel, then the connection; each handle
  // is read only when its turn comes and its failure does not stop the rest.
  const closeQuietly = async (current) => {
    try {
      const handle = current();
      if (handle) await handle.close();
    } catch (_) { }
  };
  await closeQuietly(() => pubCh);
  await closeQuietly(() => conCh);
  await closeQuietly(() => conn);

  pubCh = null;
  conCh = null;
  conn = null;
}
/**
 * (Re)open the client: cancel any pending reconnect timer, reset the
 * closing / reconnecting flags and the backoff, then make sure the connection
 * and both channels exist.
 *
 * @returns {Promise<void>}
 * @throws Whatever `ensureChannels` rejects with.
 */
async function open() {
  if (reconnectTimer) {
    clearTimeout(reconnectTimer);
    reconnectTimer = null;
  }
  closing = false;
  reconnecting = false;
  backoff = 1000;
  await ensureChannels();
}
// Process signals: on the first SIGINT or SIGTERM, close the AMQP resources
// and exit with status 0, even when close() throws.
for (const signal of ['SIGINT', 'SIGTERM']) {
  process.once(signal, async () => {
    try {
      await close();
    } finally {
      process.exit(0);
    }
  });
}
// Public API of this module.
module.exports = {
startConsumer,
publishToQueue,
publishToExchange,
open,
reconnectNow,
close,
emitter, // events that can be subscribed to: 'message' / 'handlerError' / 'reconnected' / 'error' / 'blocked' / 'unblocked'
};