fix: 过程数据线程处理
This commit is contained in:
parent
d56b107cbf
commit
db5ade8ba0
@ -22,7 +22,7 @@ export async function download() {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
interface WR {
|
export interface WR {
|
||||||
message?: string
|
message?: string
|
||||||
code?: number
|
code?: number
|
||||||
keystr?: string
|
keystr?: string
|
||||||
@ -112,7 +112,7 @@ export async function writeObjectOut(params: RegulatoryInterfaceParams, filePath
|
|||||||
</SOAP-ENV:Envelope>`,
|
</SOAP-ENV:Envelope>`,
|
||||||
method: http.RequestMethod.POST,
|
method: http.RequestMethod.POST,
|
||||||
xml: true
|
xml: true
|
||||||
},)
|
})
|
||||||
if (filePath) {
|
if (filePath) {
|
||||||
const fileUtil = new FileUtils(context);
|
const fileUtil = new FileUtils(context);
|
||||||
await fileUtil.initFolder(filePath);
|
await fileUtil.initFolder(filePath);
|
||||||
|
|||||||
170
entry/src/main/ets/pages/Judge/ProcessDataTaskPool.ets
Normal file
170
entry/src/main/ets/pages/Judge/ProcessDataTaskPool.ets
Normal file
@ -0,0 +1,170 @@
|
|||||||
|
import { ProcessDataEnumType, RegulatoryInterfaceParams, WR, WuxiExamType } from '../../model';
|
||||||
|
import taskpool from '@ohos.taskpool';
|
||||||
|
import { dConsole } from '../../utils/LogWorker';
|
||||||
|
import http from '@ohos.net.http';
|
||||||
|
import Request from '../../utils/Request';
|
||||||
|
|
||||||
|
export class ProcessDataTaskPool {
|
||||||
|
private queue: RegulatoryInterfaceParams[] = []
|
||||||
|
private isProcessing: boolean = false;
|
||||||
|
/** 最大重试次数。1次初次尝试 + 5次重试 = 总共6次尝试。 */
|
||||||
|
private readonly maxRetries = 5;
|
||||||
|
|
||||||
|
public addTask(dataItem: RegulatoryInterfaceParams): void {
|
||||||
|
console.info(`[Queue] 新任务已添加: ${JSON.stringify(dataItem)},当前队列长度: ${this.queue.length + 1}`);
|
||||||
|
this.queue.push(dataItem); // 将任务添加到队尾
|
||||||
|
this.triggerProcessing(); // 尝试启动处理流程
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 触发队列处理。
|
||||||
|
* 如果当前没有正在处理的任务,则启动一个新的处理循环。
|
||||||
|
*/
|
||||||
|
private triggerProcessing(): void {
|
||||||
|
if (this.isProcessing) {
|
||||||
|
console.log('[Queue] 处理器正在运行中,新任务将在稍后被处理。');
|
||||||
|
return; // 如果已经在处理,则直接返回,新任务会被正在运行的循环消费掉
|
||||||
|
}
|
||||||
|
// 使用 Promise.resolve().then() 来确保 processQueue 在下一个事件循环中异步执行
|
||||||
|
// 这可以防止阻塞当前的 addTask 调用
|
||||||
|
Promise.resolve().then(() => this.processQueue());
|
||||||
|
}
|
||||||
|
|
||||||
|
private async processQueue(): Promise<void> {
|
||||||
|
this.isProcessing = true;
|
||||||
|
console.log(`[Queue] 启动处理器... 待处理任务数: ${this.queue.length}`);
|
||||||
|
|
||||||
|
while (this.queue.length > 0) {
|
||||||
|
const taskData = this.queue[0]; // 查看队首任务
|
||||||
|
|
||||||
|
try {
|
||||||
|
console.log(`[Queue] 开始处理任务: ${JSON.stringify(taskData)}`);
|
||||||
|
// 此方法若成功则正常返回,若永久失败则会抛出错误
|
||||||
|
let obj: WuxiExamType = {
|
||||||
|
xtlb: taskData.xtlb,
|
||||||
|
jkxlh: taskData.jkxlh,
|
||||||
|
jkid: taskData.jkid,
|
||||||
|
drvexam: {
|
||||||
|
zp: "",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
dConsole.writeProcessData(ProcessDataEnumType.WuxiExam, JSON.stringify(obj))
|
||||||
|
await this.processSingleTaskWithRetries(taskData);
|
||||||
|
// 任务成功,将其从队列中移除
|
||||||
|
this.queue.shift();
|
||||||
|
console.log(`[Queue] ✅ 任务处理成功,已从队列移除。剩余任务: ${this.queue.length}`);
|
||||||
|
} catch (error) {
|
||||||
|
// 捕获到永久失败的错误
|
||||||
|
console.error(`[Queue] 🔥 致命错误: ${(error as Error).message}`);
|
||||||
|
console.error('[Queue] 队列已停止,后续任务将不会被处理。');
|
||||||
|
|
||||||
|
// (可选)可以在此处清空队列,防止下次意外启动时处理旧任务
|
||||||
|
this.queue = [];
|
||||||
|
|
||||||
|
// 终止循环
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.isProcessing = false;
|
||||||
|
console.log('[Queue] 处理器已停止。');
|
||||||
|
}
|
||||||
|
|
||||||
|
private async processSingleTaskWithRetries(taskData: RegulatoryInterfaceParams): Promise<void> {
|
||||||
|
// 1 次初次尝试 + 5 次重试
|
||||||
|
for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
|
||||||
|
const attemptNum = attempt + 1;
|
||||||
|
try {
|
||||||
|
const attemptType = attempt === 0 ? '初次尝试' : `重试 ${attempt}`;
|
||||||
|
console.log(`[Queue] 开始上传 (${attemptType}, 总共第 ${attemptNum} 次): ${JSON.stringify(taskData)}`);
|
||||||
|
const result: WR = await taskpool.execute(uploadWorkerTask, taskData);
|
||||||
|
dConsole.writeProcessData(ProcessDataEnumType.WuxiExam, JSON.stringify(result))
|
||||||
|
if (result.code === 1) {
|
||||||
|
console.log(`[Queue] ✔️ 上传成功 (在第 ${attemptNum} 次尝试)`);
|
||||||
|
return; // 成功,立即返回
|
||||||
|
}
|
||||||
|
console.warn(`[Queue] ❌ 上传失败 (第 ${attemptNum} 次)。响应: ${result.message}`);
|
||||||
|
} catch (e) {
|
||||||
|
console.error(`[Queue] ❌ TaskPool 执行错误 (第 ${attemptNum} 次): ${e}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果这是最后一次尝试且依然失败,则不再等待,直接跳出循环去抛出错误
|
||||||
|
if (attempt === this.maxRetries) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果循环结束,意味着所有尝试都失败了
|
||||||
|
throw new Error(`任务 ${JSON.stringify(taskData)} 在 ${this.maxRetries + 1} 次尝试后永久失败。`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 这是将在 Worker 线程中执行的任务函数。
|
||||||
|
* 它负责执行单次的上传尝试。
|
||||||
|
*
|
||||||
|
* @param data 需要上传的单条数据项
|
||||||
|
* @returns 一个包含本次上传尝试结果的对象
|
||||||
|
*/
|
||||||
|
export async function uploadWorkerTask(data: RegulatoryInterfaceParams): Promise<WR> {
|
||||||
|
let singlePlay: boolean = false
|
||||||
|
let isJGNew = false
|
||||||
|
try {
|
||||||
|
const response = await sendProcessData(data, singlePlay, isJGNew);
|
||||||
|
// 根据返回的 code 判断是否成功
|
||||||
|
return response
|
||||||
|
} catch (err) {
|
||||||
|
// 捕获请求过程中可能出现的异常
|
||||||
|
const error = err as Error;
|
||||||
|
console.error(`[Worker] 上传时发生异常: ${error.message}`);
|
||||||
|
return { code: 20038 };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function sendProcessData(data: RegulatoryInterfaceParams, singlePlay: boolean, isJGNew: boolean): Promise<WR> {
|
||||||
|
if (singlePlay) {
|
||||||
|
return { code: 1 }
|
||||||
|
}
|
||||||
|
if (isJGNew) {
|
||||||
|
// 调用新监管
|
||||||
|
}
|
||||||
|
data.drvexam = data.drvexam ?? {};
|
||||||
|
data.drvexam.zp = data.drvexam.zp === undefined ? undefined : encodeURIComponent(data.drvexam.zp);
|
||||||
|
|
||||||
|
const drvexamArr = Object.entries(data.drvexam)
|
||||||
|
.filter((item: [string, string]) => item[1] != undefined)
|
||||||
|
.map((item: [string, object]) => `<${item[0]}>${item[1]}</${item[0]}>`)
|
||||||
|
let JGHOST: string = AppStorage.get<string>("JGHOST") || ""
|
||||||
|
return await Request<object>({
|
||||||
|
host: JGHOST,
|
||||||
|
url: '/dems_ws/services/TmriOutAccess?wsdl',
|
||||||
|
data: `<?xml version="1.0"?>
|
||||||
|
<SOAP-ENV:Envelope
|
||||||
|
xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/"
|
||||||
|
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
|
||||||
|
xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
>
|
||||||
|
<SOAP-ENV:Body>
|
||||||
|
<writeObjectOut xmlns="http://service.es.doron">
|
||||||
|
<xtlb>${data.xtlb}</xtlb>
|
||||||
|
<jkxlh>${data.jkxlh}</jkxlh>
|
||||||
|
<jkid>${data.jkid}</jkid>
|
||||||
|
<UTF8XmlDoc>
|
||||||
|
<![CDATA[
|
||||||
|
<?xm lversion="1.0 "encoding="GBK"?>
|
||||||
|
<root>
|
||||||
|
<drvexam>
|
||||||
|
${drvexamArr}
|
||||||
|
</drvexam>
|
||||||
|
</root>
|
||||||
|
]]>
|
||||||
|
</UTF8XmlDoc>
|
||||||
|
</writeObjectOut>
|
||||||
|
</SOAP-ENV:Body>
|
||||||
|
</SOAP-ENV:Envelope>`,
|
||||||
|
method: http.RequestMethod.POST,
|
||||||
|
xml: true
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
@ -840,11 +840,11 @@ export const GetIsEndManualProject = (index: number, artSubject3Projects: string
|
|||||||
export const DeductionProjectConversion = (code: string, markRuleListObj: object): DeductionProjectConversionType => {
|
export const DeductionProjectConversion = (code: string, markRuleListObj: object): DeductionProjectConversionType => {
|
||||||
const thisMark: MarkRule = Reflect.get(markRuleListObj, code)
|
const thisMark: MarkRule = Reflect.get(markRuleListObj, code)
|
||||||
return {
|
return {
|
||||||
desc: thisMark.markshow,
|
desc: thisMark.markshow || "",
|
||||||
score: thisMark.markreal,
|
score: thisMark.markreal!,
|
||||||
markcatalog: thisMark.markcatalog,
|
markcatalog: thisMark.markcatalog || "",
|
||||||
markserial: thisMark.markserial,
|
markserial: thisMark.markserial || "",
|
||||||
kfxh: thisMark.kfxh
|
kfxh: thisMark.kfxh || ""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -45,7 +45,7 @@ export default class TcpClient {
|
|||||||
console.log(TCPTag, 'bindTcp success:', this.localIp, this.localIpPort, this.oppositeIp, this.oppositeIpPort)
|
console.log(TCPTag, 'bindTcp success:', this.localIp, this.localIpPort, this.oppositeIp, this.oppositeIpPort)
|
||||||
resolve(true)
|
resolve(true)
|
||||||
}).catch((err: BusinessError) => {
|
}).catch((err: BusinessError) => {
|
||||||
console.log(TCPTag, 'bindTcp error:', JSON.stringify(err), this.localIp, this.localIpPort, this.oppositeIp, this.oppositeIpPort)
|
console.error(TCPTag, 'bindTcp error:', JSON.stringify(err), this.localIp, this.localIpPort, this.oppositeIp, this.oppositeIpPort)
|
||||||
reject(err)
|
reject(err)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
@ -72,8 +72,8 @@ export default class TcpClient {
|
|||||||
})
|
})
|
||||||
.catch((err: BusinessError) => {
|
.catch((err: BusinessError) => {
|
||||||
this.linkStatus = false
|
this.linkStatus = false
|
||||||
console.log(TCPTag, "tcp connect or keepAlive error: ", JSON.stringify(err))
|
console.error(TCPTag, "tcp connect or keepAlive error: ", JSON.stringify(err))
|
||||||
console.log(TCPTag, "tcp 重启服务")
|
console.error(TCPTag, "tcp 重启服务")
|
||||||
reject(err)
|
reject(err)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
@ -106,7 +106,7 @@ export default class TcpClient {
|
|||||||
// 监听tcp错误
|
// 监听tcp错误
|
||||||
onError(callback: Function) {
|
onError(callback: Function) {
|
||||||
this.tcp.on('error', err => {
|
this.tcp.on('error', err => {
|
||||||
console.log(TCPTag, 'tcp on error: ', JSON.stringify(err))
|
console.error(TCPTag, 'tcp on error: ', JSON.stringify(err))
|
||||||
callback?.()
|
callback?.()
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -119,7 +119,7 @@ export default class TcpClient {
|
|||||||
// 监听tcp消息
|
// 监听tcp消息
|
||||||
onMsg(callback: Function) {
|
onMsg(callback: Function) {
|
||||||
if (this.events.includes(callback)) {
|
if (this.events.includes(callback)) {
|
||||||
console.log(TCPTag, '已经存在这个获取消息方法了');
|
console.error(TCPTag, '已经存在这个获取消息方法了');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
this.events.push(callback);
|
this.events.push(callback);
|
||||||
@ -128,13 +128,13 @@ export default class TcpClient {
|
|||||||
// 接收tcp消息
|
// 接收tcp消息
|
||||||
sendMsg(data: string): Promise<void> {
|
sendMsg(data: string): Promise<void> {
|
||||||
if (!this.linkStatus) {
|
if (!this.linkStatus) {
|
||||||
console.log(TCPTag, '不允许发送: TCP未连接');
|
console.error(TCPTag, '不允许发送: TCP未连接');
|
||||||
return Promise.reject(new Error('TCP connection is not established'));
|
return Promise.reject(new Error('TCP connection is not established'));
|
||||||
}
|
}
|
||||||
return this.tcp?.send({
|
return this.tcp?.send({
|
||||||
data
|
data
|
||||||
}).catch(async (err: BusinessError) => {
|
}).catch(async (err: BusinessError) => {
|
||||||
console.log(TCPTag, 'sendMsg error:', JSON.stringify(err), this.oppositeIp, this.oppositeIpPort)
|
console.error(TCPTag, 'sendMsg error:', JSON.stringify(err), this.oppositeIp, this.oppositeIpPort)
|
||||||
this.tcpSendNum++
|
this.tcpSendNum++
|
||||||
if (this.tcpSendNum > 10) {
|
if (this.tcpSendNum > 10) {
|
||||||
this.tcpSendNum = 0
|
this.tcpSendNum = 0
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user