Files
tk-electron-ai/js/rabbitmq-service.js

311 lines
10 KiB
JavaScript
Raw Normal View History

2025-10-28 19:40:13 +08:00
// rabbitmq-live-client.js (CommonJS)
const amqp = require('amqplib');
const { EventEmitter } = require('events');
const CFG = {
protocol: process.env.RABBIT_PROTOCOL || 'amqp', // 'amqp' | 'amqps'
// host: process.env.RABBIT_HOST || '192.168.1.144',
host: process.env.RABBIT_HOST || 'crawlclient.api.yolozs.com',
port: Number(process.env.RABBIT_PORT || 5672),
user: process.env.RABBIT_USER || 'tkdata',
pass: process.env.RABBIT_PASS || '6rARaRj8Z7UG3ahLzh',
vhost: process.env.RABBIT_VHOST || '/',
heartbeat: Number(process.env.RABBIT_HEARTBEAT || 60), // <-- 关键:心跳
frameMax: Number(process.env.RABBIT_FRAME_MAX || 0), // 0=默认;可调大以减少分片
};
let conn = null;
let pubCh = null; // 发布 Confirm Channel
let conCh = null; // 消费 Channel
const emitter = new EventEmitter();
const consumers = new Map(); // queueName -> { onMessage, options, consumerTag }
let reconnecting = false;
let closing = false;
let backoff = 1000; // ms
const MAX_BACKOFF = 15000;
let reconnectTimer = null;
// —— 工具:序列化消息
function toBuffer(payload) {
if (Buffer.isBuffer(payload)) return payload;
if (typeof payload === 'string') return Buffer.from(payload);
return Buffer.from(JSON.stringify(payload));
}
// —— 内部建立连接含心跳、keepalive、事件
async function createConnection() {
const connection = await amqp.connect({
protocol: CFG.protocol,
hostname: CFG.host,
port: CFG.port,
username: CFG.user,
password: CFG.pass,
vhost: CFG.vhost,
heartbeat: CFG.heartbeat,
frameMax: CFG.frameMax > 0 ? CFG.frameMax : undefined,
// 也可用 URL 形式:`amqp://u:p@host:5672/vhost?heartbeat=60`
});
// 打开 TCP keepalive降低 NAT/空闲超时断开的概率
try {
const stream = connection.stream || connection.socket;
if (stream?.setKeepAlive) stream.setKeepAlive(true, 15_000); // 15s
} catch (_) { }
connection.on('error', (e) => {
// 心跳超时常见,避免重复噪音
const msg = e?.message || String(e);
if (msg && /heartbeat/i.test(msg)) {
console.error('[AMQP] connection error (heartbeat):', msg);
} else {
console.error('[AMQP] connection error:', msg);
}
emitter.emit('error', e);
});
connection.on('close', () => {
if (closing) return; // 正在关闭时不重连
console.error('[AMQP] connection closed');
conn = null; pubCh = null; conCh = null;
scheduleReconnect();
});
// Broker 侧内存/磁盘压力会 block 连接
connection.on('blocked', (reason) => {
console.warn('[AMQP] connection blocked by broker:', reason);
emitter.emit('blocked', reason);
});
connection.on('unblocked', () => {
console.log('[AMQP] connection unblocked');
emitter.emit('unblocked');
});
console.log(`[AMQP] connected to ${CFG.host} (hb=${CFG.heartbeat}s)`);
return connection;
}
// —— 内部:确保连接和通道存在
async function ensureChannels() {
if (!conn) conn = await createConnection();
if (!pubCh) {
pubCh = await conn.createConfirmChannel();
pubCh.on('error', e => console.error('[AMQP] pub channel error:', e?.message || e));
pubCh.on('close', () => { pubCh = null; if (!closing) scheduleReconnect(); });
}
if (!conCh) {
conCh = await conn.createChannel();
conCh.on('error', e => console.error('[AMQP] con channel error:', e?.message || e));
conCh.on('close', () => { conCh = null; if (!closing) scheduleReconnect(); });
}
}
// —— 内部:安排重连(指数退避 + 抖动,且只触发一个循环)
function scheduleReconnect() {
if (reconnecting || closing) return;
reconnecting = true;
if (reconnectTimer) { clearTimeout(reconnectTimer); reconnectTimer = null; }
const attempt = async () => {
if (closing) return;
try {
await ensureChannels();
await resumeConsumers(); // 恢复所有消费
reconnecting = false;
backoff = 1000;
emitter.emit('reconnected');
console.log('[AMQP] reconnected and consumers resumed');
} catch (e) {
const base = Math.min(backoff, MAX_BACKOFF);
// 加抖动,避免雪崩:在 75%~125% 之间浮动
const jitter = base * (0.75 + Math.random() * 0.5);
console.warn(`[AMQP] reconnect failed: ${e?.message || e}; retry in ${Math.round(jitter)}ms`);
backoff = Math.min(backoff * 1.6, MAX_BACKOFF);
reconnectTimer = setTimeout(attempt, jitter);
}
};
reconnectTimer = setTimeout(attempt, backoff);
}
// —— 内部:恢复所有消费者
async function resumeConsumers() {
if (!conCh) return;
for (const [queue, c] of consumers.entries()) {
try {
if (c.consumerTag) await conCh.cancel(c.consumerTag);
} catch (_) { }
const tag = await startOneConsumer(queue, c.onMessage, c.options, true);
c.consumerTag = tag.consumerTag;
}
}
// —— 内部:启动一个消费者
async function startOneConsumer(queueName, onMessage, options = {}, isResume = false) {
await ensureChannels();
const {
prefetch = 1,
durable = true,
assertQueue = true,
requeueOnError = false,
// 可选exclusive, arguments, dead-letter 等
} = options;
if (assertQueue) {
await conCh.assertQueue(queueName, { durable });
}
await conCh.prefetch(prefetch);
const consumeResult = await conCh.consume(
queueName,
async (msg) => {
if (!msg) return;
const raw = msg.content;
const text = raw?.toString?.() ?? '';
let json;
try { json = JSON.parse(text); } catch (_) { /* 忽略解析失败 */ }
const payload = {
raw, text, json,
fields: msg.fields,
properties: msg.properties,
ack: () => { try { conCh.ack(msg); } catch (_) { } },
nack: (requeue = requeueOnError) => { try { conCh.nack(msg, false, requeue); } catch (_) { } },
};
try {
emitter.emit('message', queueName, payload);
await onMessage(payload); // 业务回调
payload.ack();
} catch (err) {
emitter.emit('handlerError', queueName, err, payload);
payload.nack(requeueOnError);
}
},
{ noAck: false }
);
if (!isResume) {
consumers.set(queueName, { onMessage, options, consumerTag: consumeResult.consumerTag });
}
console.log(`[*] consuming "${queueName}" (prefetch=${prefetch})`);
return consumeResult;
}
// —— 对外 API启动消费
async function startConsumer(queueName, onMessage, options = {}) {
if (!queueName) throw new Error('queueName 必填');
if (typeof onMessage !== 'function') throw new Error('onMessage 回调必填');
const res = await startOneConsumer(queueName, onMessage, options, false);
return {
emitter,
async stop() {
const c = consumers.get(queueName);
if (c?.consumerTag && conCh) {
await conCh.cancel(c.consumerTag).catch(() => { });
}
consumers.delete(queueName);
return res;
}
};
}
// —— 对外 API发送到队列支持 confirm
async function publishToQueue(queueName, payload, options = {}) {
if (!queueName) throw new Error('queueName 必填');
await ensureChannels();
const {
durable = true,
persistent = true,
assertQueue = true,
confirm = true,
headers = {},
mandatory = false, // 可选:不可路由返回
} = options;
if (assertQueue) {
await pubCh.assertQueue(queueName, { durable });
}
const ok = pubCh.sendToQueue(queueName, toBuffer(payload), { persistent, headers, mandatory });
if (!ok) await new Promise(r => pubCh.once('drain', r));
if (confirm) await pubCh.waitForConfirms();
}
// —— 对外 API发送到交换机支持 confirm
async function publishToExchange(exchange, routingKey, payload, options = {}) {
if (!exchange) throw new Error('exchange 必填');
await ensureChannels();
const {
type = 'direct',
durable = true,
assertExchange = true,
confirm = true,
persistent = true,
headers = {},
mandatory = false,
} = options;
if (assertExchange) {
await pubCh.assertExchange(exchange, type, { durable });
}
const ok = pubCh.publish(exchange, routingKey || '', toBuffer(payload), { persistent, headers, mandatory });
if (!ok) await new Promise(r => pubCh.once('drain', r));
if (confirm) await pubCh.waitForConfirms();
}
// —— 对外 API主动重连给 Electron 恢复/网络变化时调用)
async function reconnectNow() {
if (closing) return;
if (reconnecting) return;
try {
if (pubCh) await pubCh.close().catch(() => { });
if (conCh) await conCh.close().catch(() => { });
if (conn) await conn.close().catch(() => { });
} catch (_) { }
pubCh = null; conCh = null; conn = null;
reconnecting = false;
scheduleReconnect();
}
// —— 关闭
async function close() {
closing = true;
if (reconnectTimer) { clearTimeout(reconnectTimer); reconnectTimer = null; }
try {
// 取消所有消费者
for (const [q, c] of consumers.entries()) {
if (c?.consumerTag && conCh) {
await conCh.cancel(c.consumerTag).catch(() => { });
}
}
} catch (_) { }
consumers.clear();
try { if (pubCh) await pubCh.close(); } catch (_) { }
try { if (conCh) await conCh.close(); } catch (_) { }
try { if (conn) await conn.close(); } catch (_) { }
pubCh = null; conCh = null; conn = null;
}
// —— 进程信号(可选)
process.once('SIGINT', async () => { try { await close(); } finally { process.exit(0); } });
process.once('SIGTERM', async () => { try { await close(); } finally { process.exit(0); } });
module.exports = {
startConsumer,
publishToQueue,
publishToExchange,
reconnectNow,
close,
emitter, // 可订阅 'message' / 'handlerError' / 'reconnected' / 'error' / 'blocked' / 'unblocked'
};