From db5ade8ba0c1e3e25d95442477bd7dcab1ce5070 Mon Sep 17 00:00:00 2001 From: wangzhongjie Date: Mon, 18 Aug 2025 10:04:32 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E8=BF=87=E7=A8=8B=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- entry/src/main/ets/api/judge.ets | 4 +- .../ets/pages/Judge/ProcessDataTaskPool.ets | 170 ++++++++++++++++++ entry/src/main/ets/pages/Judge/utils.ets | 10 +- entry/src/main/ets/utils/TcpUtils.ets | 14 +- 4 files changed, 184 insertions(+), 14 deletions(-) create mode 100644 entry/src/main/ets/pages/Judge/ProcessDataTaskPool.ets diff --git a/entry/src/main/ets/api/judge.ets b/entry/src/main/ets/api/judge.ets index fab8d59..2100034 100644 --- a/entry/src/main/ets/api/judge.ets +++ b/entry/src/main/ets/api/judge.ets @@ -22,7 +22,7 @@ export async function download() { }) } -interface WR { +export interface WR { message?: string code?: number keystr?: string @@ -112,7 +112,7 @@ export async function writeObjectOut(params: RegulatoryInterfaceParams, filePath `, method: http.RequestMethod.POST, xml: true - },) + }) if (filePath) { const fileUtil = new FileUtils(context); await fileUtil.initFolder(filePath); diff --git a/entry/src/main/ets/pages/Judge/ProcessDataTaskPool.ets b/entry/src/main/ets/pages/Judge/ProcessDataTaskPool.ets new file mode 100644 index 0000000..a9ced7b --- /dev/null +++ b/entry/src/main/ets/pages/Judge/ProcessDataTaskPool.ets @@ -0,0 +1,170 @@ +import { ProcessDataEnumType, RegulatoryInterfaceParams, WR, WuxiExamType } from '../../model'; +import taskpool from '@ohos.taskpool'; +import { dConsole } from '../../utils/LogWorker'; +import http from '@ohos.net.http'; +import Request from '../../utils/Request'; + +export class ProcessDataTaskPool { + private queue: RegulatoryInterfaceParams[] = [] + private isProcessing: boolean = false; + /** 最大重试次数。1次初次尝试 + 5次重试 = 总共6次尝试。 */ + private readonly maxRetries = 5; + + public addTask(dataItem: RegulatoryInterfaceParams): void { + console.info(`[Queue] 新任务已添加: ${JSON.stringify(dataItem)},当前队列长度: ${this.queue.length + 1}`); + this.queue.push(dataItem); // 将任务添加到队尾 + this.triggerProcessing(); // 尝试启动处理流程 + } + + /** + * 触发队列处理。 + * 如果当前没有正在处理的任务,则启动一个新的处理循环。 + */ + private triggerProcessing(): void { + if (this.isProcessing) { + console.log('[Queue] 处理器正在运行中,新任务将在稍后被处理。'); + return; // 如果已经在处理,则直接返回,新任务会被正在运行的循环消费掉 + } + // 使用 Promise.resolve().then() 来确保 processQueue 在下一个事件循环中异步执行 + // 这可以防止阻塞当前的 addTask 调用 + Promise.resolve().then(() => this.processQueue()); + } + + private async processQueue(): Promise { + this.isProcessing = true; + console.log(`[Queue] 启动处理器... 待处理任务数: ${this.queue.length}`); + + while (this.queue.length > 0) { + const taskData = this.queue[0]; // 查看队首任务 + + try { + console.log(`[Queue] 开始处理任务: ${JSON.stringify(taskData)}`); + // 此方法若成功则正常返回,若永久失败则会抛出错误 + let obj: WuxiExamType = { + xtlb: taskData.xtlb, + jkxlh: taskData.jkxlh, + jkid: taskData.jkid, + drvexam: { + zp: "", + }, + } + dConsole.writeProcessData(ProcessDataEnumType.WuxiExam, JSON.stringify(obj)) + await this.processSingleTaskWithRetries(taskData); + // 任务成功,将其从队列中移除 + this.queue.shift(); + console.log(`[Queue] ✅ 任务处理成功,已从队列移除。剩余任务: ${this.queue.length}`); + } catch (error) { + // 捕获到永久失败的错误 + console.error(`[Queue] 🔥 致命错误: ${(error as Error).message}`); + console.error('[Queue] 队列已停止,后续任务将不会被处理。'); + + // (可选)可以在此处清空队列,防止下次意外启动时处理旧任务 + this.queue = []; + + // 终止循环 + break; + } + } + + this.isProcessing = false; + console.log('[Queue] 处理器已停止。'); + } + + private async processSingleTaskWithRetries(taskData: RegulatoryInterfaceParams): Promise { + // 1 次初次尝试 + 5 次重试 + for (let attempt = 0; attempt <= this.maxRetries; attempt++) { + const attemptNum = attempt + 1; + try { + const attemptType = attempt === 0 ? '初次尝试' : `重试 ${attempt}`; + console.log(`[Queue] 开始上传 (${attemptType}, 总共第 ${attemptNum} 次): ${JSON.stringify(taskData)}`); + const result: WR = await taskpool.execute(uploadWorkerTask, taskData); + dConsole.writeProcessData(ProcessDataEnumType.WuxiExam, JSON.stringify(result)) + if (result.code === 1) { + console.log(`[Queue] ✔️ 上传成功 (在第 ${attemptNum} 次尝试)`); + return; // 成功,立即返回 + } + console.warn(`[Queue] ❌ 上传失败 (第 ${attemptNum} 次)。响应: ${result.message}`); + } catch (e) { + console.error(`[Queue] ❌ TaskPool 执行错误 (第 ${attemptNum} 次): ${e}`); + } + + // 如果这是最后一次尝试且依然失败,则不再等待,直接跳出循环去抛出错误 + if (attempt === this.maxRetries) { + break; + } + } + + // 如果循环结束,意味着所有尝试都失败了 + throw new Error(`任务 ${JSON.stringify(taskData)} 在 ${this.maxRetries + 1} 次尝试后永久失败。`); + } +} + + +/** + * 这是将在 Worker 线程中执行的任务函数。 + * 它负责执行单次的上传尝试。 + * + * @param data 需要上传的单条数据项 + * @returns 一个包含本次上传尝试结果的对象 + */ +export async function uploadWorkerTask(data: RegulatoryInterfaceParams): Promise { + let singlePlay: boolean = false + let isJGNew = false + try { + const response = await sendProcessData(data, singlePlay, isJGNew); + // 根据返回的 code 判断是否成功 + return response + } catch (err) { + // 捕获请求过程中可能出现的异常 + const error = err as Error; + console.error(`[Worker] 上传时发生异常: ${error.message}`); + return { code: 20038 }; + } +} + +async function sendProcessData(data: RegulatoryInterfaceParams, singlePlay: boolean, isJGNew: boolean): Promise { + if (singlePlay) { + return { code: 1 } + } + if (isJGNew) { + // 调用新监管 + } + data.drvexam = data.drvexam ?? {}; + data.drvexam.zp = data.drvexam.zp === undefined ? undefined : encodeURIComponent(data.drvexam.zp); + + const drvexamArr = Object.entries(data.drvexam) + .filter((item: [string, string]) => item[1] != undefined) + .map((item: [string, object]) => `<${item[0]}>${item[1]}`) + let JGHOST: string = AppStorage.get("JGHOST") || "" + return await Request({ + host: JGHOST, + url: '/dems_ws/services/TmriOutAccess?wsdl', + data: ` + + + + ${data.xtlb} + ${data.jkxlh} + ${data.jkid} + + + + + ${drvexamArr} + + + ]]> + + + + `, + method: http.RequestMethod.POST, + xml: true + }) +} + diff --git a/entry/src/main/ets/pages/Judge/utils.ets b/entry/src/main/ets/pages/Judge/utils.ets index e3f1c5c..dfb88e2 100644 --- a/entry/src/main/ets/pages/Judge/utils.ets +++ b/entry/src/main/ets/pages/Judge/utils.ets @@ -840,11 +840,11 @@ export const GetIsEndManualProject = (index: number, artSubject3Projects: string export const DeductionProjectConversion = (code: string, markRuleListObj: object): DeductionProjectConversionType => { const thisMark: MarkRule = Reflect.get(markRuleListObj, code) return { - desc: thisMark.markshow, - score: thisMark.markreal, - markcatalog: thisMark.markcatalog, - markserial: thisMark.markserial, - kfxh: thisMark.kfxh + desc: thisMark.markshow || "", + score: thisMark.markreal!, + markcatalog: thisMark.markcatalog || "", + markserial: thisMark.markserial || "", + kfxh: thisMark.kfxh || "" } } diff --git a/entry/src/main/ets/utils/TcpUtils.ets b/entry/src/main/ets/utils/TcpUtils.ets index acae0f5..f4fd3c8 100644 --- a/entry/src/main/ets/utils/TcpUtils.ets +++ b/entry/src/main/ets/utils/TcpUtils.ets @@ -45,7 +45,7 @@ export default class TcpClient { console.log(TCPTag, 'bindTcp success:', this.localIp, this.localIpPort, this.oppositeIp, this.oppositeIpPort) resolve(true) }).catch((err: BusinessError) => { - console.log(TCPTag, 'bindTcp error:', JSON.stringify(err), this.localIp, this.localIpPort, this.oppositeIp, this.oppositeIpPort) + console.error(TCPTag, 'bindTcp error:', JSON.stringify(err), this.localIp, this.localIpPort, this.oppositeIp, this.oppositeIpPort) reject(err) }) }) @@ -72,8 +72,8 @@ export default class TcpClient { }) .catch((err: BusinessError) => { this.linkStatus = false - console.log(TCPTag, "tcp connect or keepAlive error: ", JSON.stringify(err)) - console.log(TCPTag, "tcp 重启服务") + console.error(TCPTag, "tcp connect or keepAlive error: ", JSON.stringify(err)) + console.error(TCPTag, "tcp 重启服务") reject(err) }) }) @@ -106,7 +106,7 @@ export default class TcpClient { // 监听tcp错误 onError(callback: Function) { this.tcp.on('error', err => { - console.log(TCPTag, 'tcp on error: ', JSON.stringify(err)) + console.error(TCPTag, 'tcp on error: ', JSON.stringify(err)) callback?.() }); } @@ -119,7 +119,7 @@ export default class TcpClient { // 监听tcp消息 onMsg(callback: Function) { if (this.events.includes(callback)) { - console.log(TCPTag, '已经存在这个获取消息方法了'); + console.error(TCPTag, '已经存在这个获取消息方法了'); return; } this.events.push(callback); @@ -128,13 +128,13 @@ export default class TcpClient { // 接收tcp消息 sendMsg(data: string): Promise { if (!this.linkStatus) { - console.log(TCPTag, '不允许发送: TCP未连接'); + console.error(TCPTag, '不允许发送: TCP未连接'); return Promise.reject(new Error('TCP connection is not established')); } return this.tcp?.send({ data }).catch(async (err: BusinessError) => { - console.log(TCPTag, 'sendMsg error:', JSON.stringify(err), this.oppositeIp, this.oppositeIpPort) + console.error(TCPTag, 'sendMsg error:', JSON.stringify(err), this.oppositeIp, this.oppositeIpPort) this.tcpSendNum++ if (this.tcpSendNum > 10) { this.tcpSendNum = 0