diff --git a/js/preload.js b/js/preload.js index c6b49dd..c442006 100644 --- a/js/preload.js +++ b/js/preload.js @@ -7,6 +7,8 @@ contextBridge.exposeInMainWorld('electronAPI', { manualGc: () => ipcRenderer.invoke('manual-gc'), mqSend: (arg) => ipcRenderer.invoke('mq-send', arg), startMq: (tendid, id) => ipcRenderer.invoke('start-mq', tendid, id), + // 新增 toggle 接口 + toggleMq: (type, enabled) => ipcRenderer.invoke('mq-toggle', type, enabled), fileExists: (url) => ipcRenderer.invoke('file-exists', url), isiproxy: (url) => ipcRenderer.invoke('isiproxy', url), getVersion: () => ipcRenderer.invoke('getVersion'), diff --git a/main.js b/main.js index 2f1b22a..ba12c36 100644 --- a/main.js +++ b/main.js @@ -2,7 +2,6 @@ const { app, globalShortcut, BrowserWindow, net, dialog, ipcMain } = require('el const { startSSE } = require('./js/sse-server'); const { createBurstBroadcaster } = require('./js/burst-broadcast'); const mq = require('./js/rabbitmq-service'); -const https = require('https') const axios = require('axios'); const os = require('os'); const fsp = require('node:fs/promises') @@ -28,10 +27,6 @@ const LOCAL_HTTPS_WHITELIST = [ ] let userData = { tenantId: null, userId: null } -let mqEnabled = true; -let mqActive = false; -const mqQueueEnabled = { crawler: true, boss: true }; -const mqConsumers = new Map(); const { exec, spawn, execFile } = require('child_process'); // 如果你用 exec 启动 scrcpy,保留此行 // app.commandLine.appendSwitch('remote-debugging-port', '9222'); //远程控制台端口F12 @@ -314,75 +309,132 @@ function dumpAllMem() { // 在函数外定义计数器(或者放在函数内部,用闭包封装) let consumeCount = 0; -function syncMqEnabled() { - mqEnabled = Boolean(mqQueueEnabled.crawler || mqQueueEnabled.boss); -} +// 存储活跃的消费者 { key: configObject } +const activeConsumers = new Map(); +// 辅助函数:生成消费者 key +const getConsumerKey = (type, tenantId) => `${type}:${tenantId}`; -function getQueueNameByType(type, tenantId) { - if (type === 'crawler') return `q.tenant.${tenantId}`; - if (type === 'boss') return `b.tenant.${tenantId}`; - return null; -} +/** + * 启动指定类型的 MQ 消费者 + * @param {string} type 'crawler' (q.tenant.*) | 'boss' (b.tenant.*) + * @param {string} tenantId + * @param {Function} emitMessage 广播函数 + */ +async function startConsumerByType(type, tenantId, emitMessage) { + const key = getConsumerKey(type, tenantId); -async function startMQConsumer(queueName, emitMessage) { - if (!queueName || mqConsumers.has(queueName)) return; - const consumer = await mq.startConsumer( - queueName, - async (msg) => { - const payload = msg.json ?? msg.text; - consumeCount++; - - const isBurstQueue = queueName.startsWith('b.'); - const meta = isBurstQueue ? 2 : 1; - - console.log( - `[MQ] [${queueName}]`, - payload?.hostsId, - payload?.country, - 'count=', - consumeCount - ); - - const wrapped = { - ...payload, - _mqMeta: meta - }; - - emitMessage(wrapped); - }, - { prefetch: 1, requeueOnError: false, durable: true, assertQueue: true } - ); - mqConsumers.set(queueName, consumer); -} - -async function stopMQConsumer(queueName) { - const consumer = mqConsumers.get(queueName); - if (!consumer) { - console.warn('[MQ] stopMQConsumer: no consumer for', queueName); + // 防止重复启动 + if (activeConsumers.has(key)) { + console.log(`[MQ] ${type} 消费者已在运行,跳过启动`); return; } - if (consumer?.stop) { - await consumer.stop().catch(() => { }); + + let qName = ''; + // 根据类型决定队列名 + if (type === 'crawler') { + qName = `q.tenant.${tenantId}`; + } else if (type === 'boss') { + qName = `b.tenant.${tenantId}`; + } else { + console.warn(`[MQ] 未知消费者类型: ${type}`); + return; + } + + console.log(`[MQ] 正在启动消费者: ${type} -> ${qName}`); + + try { + const consumer = await mq.startConsumer( + qName, + async (msg) => { + const payload = msg.json ?? msg.text; // 原始业务数据 + consumeCount++; // 所有队列共用计数器 + + // 标记来源类型:爬虫=1 (q.tenant.*) / 大哥=2 (b.tenant.*) + const meta = (type === 'boss') ? 2 : 1; + + console.log( + `[MQ消费] [${qName}]`, + payload?.hostsId, + payload?.country, + 'Start' // 简单标记一下 + ); + + // ⚠️ 关键:在原有 payload 的基础上,增加 _mqMeta 字段 + const wrapped = { + ...payload, + _mqMeta: meta + }; + + // 广播到前端 + emitMessage(wrapped); + // 成功返回会在 mq 客户端内部自动 ack + }, + { prefetch: 1, requeueOnError: false, durable: true, assertQueue: true } + ); + console.log('启动成功') + // 记录下来,以便后续关闭 + activeConsumers.set(key, { + type, + tenantId, + qName, + consumer // 包含 .stop() 方法 + }); + + } catch (err) { + console.error(`[MQ] 启动消费者失败 (${type}):`, err); } - mqConsumers.delete(queueName); } -async function setupMQConsumerAndPublisher(emitMessage, tenantId) { - syncMqEnabled(); - if (!mqEnabled) return; - for (const type of Object.keys(mqQueueEnabled)) { - if (!mqQueueEnabled[type]) continue; - const qName = getQueueNameByType(type, tenantId); - await startMQConsumer(qName, emitMessage); +/** + * 停止指定类型的 MQ 消费者 + */ +async function stopConsumerByType(type, tenantId) { + const key = getConsumerKey(type, tenantId); + const item = activeConsumers.get(key); + + if (!item) { + // console.log(`[MQ] ${type} 消费者未运行,无需停止`); + return; } - mqActive = mqConsumers.size > 0; + console.log(`[MQ] 正在停止消费者: ${type} -> ${item.qName}`); + try { + if (item.consumer && typeof item.consumer.stop === 'function') { + await item.consumer.stop(); + console.log(`[MQ] 停止消费者成功: ${type} -> ${item.qName}`); + } + } catch (e) { + console.warn(`[MQ] 停止消费者异常 (${type}):`, e); + } finally { + activeConsumers.delete(key); + } +} + +/** + * 初始化 MQ 管理器(替代原来的 setupMQConsumerAndPublisher) + * 现在它主要负责初始化“发送端”,并注册 toggle 监听 + */ +async function setupMQManager(emitMessage, tenantId) { + // 供渲染进程发送消息到队列(保持原来的 q.tenant.* 不变) ipcMain.removeHandler('mq-send'); ipcMain.handle('mq-send', async (_event, user) => { console.log('消息已发送', user); await mq.publishToQueue(`q.tenant.${tenantId}`, user); return { ok: true }; }); + + // 注册切换监听 + ipcMain.removeHandler('mq-toggle'); + ipcMain.handle('mq-toggle', async (_event, type, enabled) => { + // type: 'crawler' | 'boss' + console.log(`[MQ-Toggle] ${type} -> ${enabled}`); + if (enabled) { + await startConsumerByType(type, tenantId, emitMessage); + } else { + await stopConsumerByType(type, tenantId); + } + return { ok: true }; + }); } @@ -437,10 +489,8 @@ function createWindow() { // 自动判断环境使用不同的页面地址 const isProd = app.isPackaged; - // const targetURL = isProd ? 'https://iosaitest.yolozs.com' : 'http://192.168.2.128:8081'; + // const targetURL = isProd ? 'https://iosaitest.yolozs.com' : 'http://192.168.1.128:8080'; const targetURL = isProd ? 'https://iosai.yolozs.com' : 'http://192.168.2.128:8080'; - // const targetURL = 'https://iosai.yolozs.com'; - // const targetURL = 'http://192.168.2.128:8080'; console.log('[页面加载] 使用地址:', targetURL); let retryIntervalId = null; @@ -532,10 +582,7 @@ function createWindow() { } }); - ipcMain.handle('getVersion', async (_event, opts = {}) => { - return app.getVersion(); - }); // 仅检查固定路径:C:\Users\Administrator\IOSAI\aiConfig.json ipcMain.handle('file-exists', async () => { @@ -635,19 +682,10 @@ function createWindow() { // 如果没有把加载页文件放到指定位置,给个兜底 win.loadURL('data:text/html;charset=utf-8,' + encodeURIComponent('