fix: 过程数据线程处理
This commit is contained in:
		
							parent
							
								
									d56b107cbf
								
							
						
					
					
						commit
						db5ade8ba0
					
				| @ -22,7 +22,7 @@ export async function download() { | |||||||
|   }) |   }) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| interface WR { | export interface WR { | ||||||
|   message?: string |   message?: string | ||||||
|   code?: number |   code?: number | ||||||
|   keystr?: string |   keystr?: string | ||||||
| @ -112,7 +112,7 @@ export async function writeObjectOut(params: RegulatoryInterfaceParams, filePath | |||||||
|                </SOAP-ENV:Envelope>`, |                </SOAP-ENV:Envelope>`, | ||||||
|     method: http.RequestMethod.POST, |     method: http.RequestMethod.POST, | ||||||
|     xml: true |     xml: true | ||||||
|   },) |   }) | ||||||
|   if (filePath) { |   if (filePath) { | ||||||
|     const fileUtil = new FileUtils(context); |     const fileUtil = new FileUtils(context); | ||||||
|     await fileUtil.initFolder(filePath); |     await fileUtil.initFolder(filePath); | ||||||
|  | |||||||
							
								
								
									
										170
									
								
								entry/src/main/ets/pages/Judge/ProcessDataTaskPool.ets
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										170
									
								
								entry/src/main/ets/pages/Judge/ProcessDataTaskPool.ets
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,170 @@ | |||||||
|  | import { ProcessDataEnumType, RegulatoryInterfaceParams, WR, WuxiExamType } from '../../model'; | ||||||
|  | import taskpool from '@ohos.taskpool'; | ||||||
|  | import { dConsole } from '../../utils/LogWorker'; | ||||||
|  | import http from '@ohos.net.http'; | ||||||
|  | import Request from '../../utils/Request'; | ||||||
|  | 
 | ||||||
|  | export class ProcessDataTaskPool { | ||||||
|  |   private queue: RegulatoryInterfaceParams[] = [] | ||||||
|  |   private isProcessing: boolean = false; | ||||||
|  |   /** 最大重试次数。1次初次尝试 + 5次重试 = 总共6次尝试。 */ | ||||||
|  |   private readonly maxRetries = 5; | ||||||
|  | 
 | ||||||
|  |   public addTask(dataItem: RegulatoryInterfaceParams): void { | ||||||
|  |     console.info(`[Queue] 新任务已添加: ${JSON.stringify(dataItem)},当前队列长度: ${this.queue.length + 1}`); | ||||||
|  |     this.queue.push(dataItem); // 将任务添加到队尾 | ||||||
|  |     this.triggerProcessing(); // 尝试启动处理流程 | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   /** | ||||||
|  |    * 触发队列处理。 | ||||||
|  |    * 如果当前没有正在处理的任务,则启动一个新的处理循环。 | ||||||
|  |    */ | ||||||
|  |   private triggerProcessing(): void { | ||||||
|  |     if (this.isProcessing) { | ||||||
|  |       console.log('[Queue] 处理器正在运行中,新任务将在稍后被处理。'); | ||||||
|  |       return; // 如果已经在处理,则直接返回,新任务会被正在运行的循环消费掉 | ||||||
|  |     } | ||||||
|  |     // 使用 Promise.resolve().then() 来确保 processQueue 在下一个事件循环中异步执行 | ||||||
|  |     // 这可以防止阻塞当前的 addTask 调用 | ||||||
|  |     Promise.resolve().then(() => this.processQueue()); | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   private async processQueue(): Promise<void> { | ||||||
|  |     this.isProcessing = true; | ||||||
|  |     console.log(`[Queue] 启动处理器... 待处理任务数: ${this.queue.length}`); | ||||||
|  | 
 | ||||||
|  |     while (this.queue.length > 0) { | ||||||
|  |       const taskData = this.queue[0]; // 查看队首任务 | ||||||
|  | 
 | ||||||
|  |       try { | ||||||
|  |         console.log(`[Queue] 开始处理任务: ${JSON.stringify(taskData)}`); | ||||||
|  |         // 此方法若成功则正常返回,若永久失败则会抛出错误 | ||||||
|  |         let obj: WuxiExamType = { | ||||||
|  |           xtlb: taskData.xtlb, | ||||||
|  |           jkxlh: taskData.jkxlh, | ||||||
|  |           jkid: taskData.jkid, | ||||||
|  |           drvexam: { | ||||||
|  |             zp: "", | ||||||
|  |           }, | ||||||
|  |         } | ||||||
|  |         dConsole.writeProcessData(ProcessDataEnumType.WuxiExam, JSON.stringify(obj)) | ||||||
|  |         await this.processSingleTaskWithRetries(taskData); | ||||||
|  |         // 任务成功,将其从队列中移除 | ||||||
|  |         this.queue.shift(); | ||||||
|  |         console.log(`[Queue] ✅ 任务处理成功,已从队列移除。剩余任务: ${this.queue.length}`); | ||||||
|  |       } catch (error) { | ||||||
|  |         // 捕获到永久失败的错误 | ||||||
|  |         console.error(`[Queue] 🔥 致命错误: ${(error as Error).message}`); | ||||||
|  |         console.error('[Queue] 队列已停止,后续任务将不会被处理。'); | ||||||
|  | 
 | ||||||
|  |         // (可选)可以在此处清空队列,防止下次意外启动时处理旧任务 | ||||||
|  |         this.queue = []; | ||||||
|  | 
 | ||||||
|  |         // 终止循环 | ||||||
|  |         break; | ||||||
|  |       } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     this.isProcessing = false; | ||||||
|  |     console.log('[Queue] 处理器已停止。'); | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   private async processSingleTaskWithRetries(taskData: RegulatoryInterfaceParams): Promise<void> { | ||||||
|  |     // 1 次初次尝试 + 5 次重试 | ||||||
|  |     for (let attempt = 0; attempt <= this.maxRetries; attempt++) { | ||||||
|  |       const attemptNum = attempt + 1; | ||||||
|  |       try { | ||||||
|  |         const attemptType = attempt === 0 ? '初次尝试' : `重试 ${attempt}`; | ||||||
|  |         console.log(`[Queue] 开始上传 (${attemptType}, 总共第 ${attemptNum} 次): ${JSON.stringify(taskData)}`); | ||||||
|  |         const result: WR = await taskpool.execute(uploadWorkerTask, taskData); | ||||||
|  |         dConsole.writeProcessData(ProcessDataEnumType.WuxiExam, JSON.stringify(result)) | ||||||
|  |         if (result.code === 1) { | ||||||
|  |           console.log(`[Queue] ✔️ 上传成功 (在第 ${attemptNum} 次尝试)`); | ||||||
|  |           return; // 成功,立即返回 | ||||||
|  |         } | ||||||
|  |         console.warn(`[Queue] ❌ 上传失败 (第 ${attemptNum} 次)。响应: ${result.message}`); | ||||||
|  |       } catch (e) { | ||||||
|  |         console.error(`[Queue] ❌ TaskPool 执行错误 (第 ${attemptNum} 次): ${e}`); | ||||||
|  |       } | ||||||
|  | 
 | ||||||
|  |       // 如果这是最后一次尝试且依然失败,则不再等待,直接跳出循环去抛出错误 | ||||||
|  |       if (attempt === this.maxRetries) { | ||||||
|  |         break; | ||||||
|  |       } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     // 如果循环结束,意味着所有尝试都失败了 | ||||||
|  |     throw new Error(`任务 ${JSON.stringify(taskData)} 在 ${this.maxRetries + 1} 次尝试后永久失败。`); | ||||||
|  |   } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * 这是将在 Worker 线程中执行的任务函数。 | ||||||
|  |  * 它负责执行单次的上传尝试。 | ||||||
|  |  * | ||||||
|  |  * @param data 需要上传的单条数据项 | ||||||
|  |  * @returns 一个包含本次上传尝试结果的对象 | ||||||
|  |  */ | ||||||
|  | export async function uploadWorkerTask(data: RegulatoryInterfaceParams): Promise<WR> { | ||||||
|  |   let singlePlay: boolean = false | ||||||
|  |   let isJGNew = false | ||||||
|  |   try { | ||||||
|  |     const response = await sendProcessData(data, singlePlay, isJGNew); | ||||||
|  |     // 根据返回的 code 判断是否成功 | ||||||
|  |     return response | ||||||
|  |   } catch (err) { | ||||||
|  |     // 捕获请求过程中可能出现的异常 | ||||||
|  |     const error = err as Error; | ||||||
|  |     console.error(`[Worker] 上传时发生异常: ${error.message}`); | ||||||
|  |     return { code: 20038 }; | ||||||
|  |   } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | async function sendProcessData(data: RegulatoryInterfaceParams, singlePlay: boolean, isJGNew: boolean): Promise<WR> { | ||||||
|  |   if (singlePlay) { | ||||||
|  |     return { code: 1 } | ||||||
|  |   } | ||||||
|  |   if (isJGNew) { | ||||||
|  |     //   调用新监管 | ||||||
|  |   } | ||||||
|  |   data.drvexam = data.drvexam ?? {}; | ||||||
|  |   data.drvexam.zp = data.drvexam.zp === undefined ? undefined : encodeURIComponent(data.drvexam.zp); | ||||||
|  | 
 | ||||||
|  |   const drvexamArr = Object.entries(data.drvexam) | ||||||
|  |     .filter((item: [string, string]) => item[1] != undefined) | ||||||
|  |     .map((item: [string, object]) => `<${item[0]}>${item[1]}</${item[0]}>`) | ||||||
|  |   let JGHOST: string = AppStorage.get<string>("JGHOST") || "" | ||||||
|  |   return await Request<object>({ | ||||||
|  |     host: JGHOST, | ||||||
|  |     url: '/dems_ws/services/TmriOutAccess?wsdl', | ||||||
|  |     data: `<?xml version="1.0"?> | ||||||
|  |                 <SOAP-ENV:Envelope | ||||||
|  |                   xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/" | ||||||
|  |                   xmlns:xsd="http://www.w3.org/2001/XMLSchema" | ||||||
|  |                   xmlns:xs="http://www.w3.org/2001/XMLSchema-instance" | ||||||
|  |                  > | ||||||
|  |                  <SOAP-ENV:Body> | ||||||
|  |                     <writeObjectOut  xmlns="http://service.es.doron"> | ||||||
|  |                       <xtlb>${data.xtlb}</xtlb> | ||||||
|  |                       <jkxlh>${data.jkxlh}</jkxlh> | ||||||
|  |                       <jkid>${data.jkid}</jkid> | ||||||
|  |                       <UTF8XmlDoc> | ||||||
|  |                       <![CDATA[ | ||||||
|  |                         <?xm lversion="1.0 "encoding="GBK"?> | ||||||
|  |                         <root> | ||||||
|  |                           <drvexam> | ||||||
|  |                             ${drvexamArr} | ||||||
|  |                           </drvexam> | ||||||
|  |                         </root> | ||||||
|  |                       ]]> | ||||||
|  |                       </UTF8XmlDoc> | ||||||
|  |                     </writeObjectOut> | ||||||
|  |                  </SOAP-ENV:Body> | ||||||
|  |                </SOAP-ENV:Envelope>`, | ||||||
|  |     method: http.RequestMethod.POST, | ||||||
|  |     xml: true | ||||||
|  |   }) | ||||||
|  | } | ||||||
|  | 
 | ||||||
| @ -840,11 +840,11 @@ export const GetIsEndManualProject = (index: number, artSubject3Projects: string | |||||||
| export const DeductionProjectConversion = (code: string, markRuleListObj: object): DeductionProjectConversionType => { | export const DeductionProjectConversion = (code: string, markRuleListObj: object): DeductionProjectConversionType => { | ||||||
|   const thisMark: MarkRule = Reflect.get(markRuleListObj, code) |   const thisMark: MarkRule = Reflect.get(markRuleListObj, code) | ||||||
|   return { |   return { | ||||||
|     desc: thisMark.markshow, |     desc: thisMark.markshow || "", | ||||||
|     score: thisMark.markreal, |     score: thisMark.markreal!, | ||||||
|     markcatalog: thisMark.markcatalog, |     markcatalog: thisMark.markcatalog || "", | ||||||
|     markserial: thisMark.markserial, |     markserial: thisMark.markserial || "", | ||||||
|     kfxh: thisMark.kfxh |     kfxh: thisMark.kfxh || "" | ||||||
|   } |   } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -45,7 +45,7 @@ export default class TcpClient { | |||||||
|         console.log(TCPTag, 'bindTcp success:', this.localIp, this.localIpPort, this.oppositeIp, this.oppositeIpPort) |         console.log(TCPTag, 'bindTcp success:', this.localIp, this.localIpPort, this.oppositeIp, this.oppositeIpPort) | ||||||
|         resolve(true) |         resolve(true) | ||||||
|       }).catch((err: BusinessError) => { |       }).catch((err: BusinessError) => { | ||||||
|         console.log(TCPTag, 'bindTcp error:', JSON.stringify(err), this.localIp, this.localIpPort, this.oppositeIp, this.oppositeIpPort) |         console.error(TCPTag, 'bindTcp error:', JSON.stringify(err), this.localIp, this.localIpPort, this.oppositeIp, this.oppositeIpPort) | ||||||
|         reject(err) |         reject(err) | ||||||
|       }) |       }) | ||||||
|     }) |     }) | ||||||
| @ -72,8 +72,8 @@ export default class TcpClient { | |||||||
|         }) |         }) | ||||||
|         .catch((err: BusinessError) => { |         .catch((err: BusinessError) => { | ||||||
|           this.linkStatus = false |           this.linkStatus = false | ||||||
|           console.log(TCPTag, "tcp connect or keepAlive error: ", JSON.stringify(err)) |           console.error(TCPTag, "tcp connect or keepAlive error: ", JSON.stringify(err)) | ||||||
|           console.log(TCPTag, "tcp 重启服务") |           console.error(TCPTag, "tcp 重启服务") | ||||||
|           reject(err) |           reject(err) | ||||||
|         }) |         }) | ||||||
|     }) |     }) | ||||||
| @ -106,7 +106,7 @@ export default class TcpClient { | |||||||
|   // 监听tcp错误 |   // 监听tcp错误 | ||||||
|   onError(callback: Function) { |   onError(callback: Function) { | ||||||
|     this.tcp.on('error', err => { |     this.tcp.on('error', err => { | ||||||
|       console.log(TCPTag, 'tcp on error: ', JSON.stringify(err)) |       console.error(TCPTag, 'tcp on error: ', JSON.stringify(err)) | ||||||
|       callback?.() |       callback?.() | ||||||
|     }); |     }); | ||||||
|   } |   } | ||||||
| @ -119,7 +119,7 @@ export default class TcpClient { | |||||||
|   // 监听tcp消息 |   // 监听tcp消息 | ||||||
|   onMsg(callback: Function) { |   onMsg(callback: Function) { | ||||||
|     if (this.events.includes(callback)) { |     if (this.events.includes(callback)) { | ||||||
|       console.log(TCPTag, '已经存在这个获取消息方法了'); |       console.error(TCPTag, '已经存在这个获取消息方法了'); | ||||||
|       return; |       return; | ||||||
|     } |     } | ||||||
|     this.events.push(callback); |     this.events.push(callback); | ||||||
| @ -128,13 +128,13 @@ export default class TcpClient { | |||||||
|   // 接收tcp消息 |   // 接收tcp消息 | ||||||
|   sendMsg(data: string): Promise<void> { |   sendMsg(data: string): Promise<void> { | ||||||
|     if (!this.linkStatus) { |     if (!this.linkStatus) { | ||||||
|       console.log(TCPTag, '不允许发送: TCP未连接'); |       console.error(TCPTag, '不允许发送: TCP未连接'); | ||||||
|       return Promise.reject(new Error('TCP connection is not established')); |       return Promise.reject(new Error('TCP connection is not established')); | ||||||
|     } |     } | ||||||
|     return this.tcp?.send({ |     return this.tcp?.send({ | ||||||
|       data |       data | ||||||
|     }).catch(async (err: BusinessError) => { |     }).catch(async (err: BusinessError) => { | ||||||
|       console.log(TCPTag, 'sendMsg error:', JSON.stringify(err), this.oppositeIp, this.oppositeIpPort) |       console.error(TCPTag, 'sendMsg error:', JSON.stringify(err), this.oppositeIp, this.oppositeIpPort) | ||||||
|       this.tcpSendNum++ |       this.tcpSendNum++ | ||||||
|       if (this.tcpSendNum > 10) { |       if (this.tcpSendNum > 10) { | ||||||
|         this.tcpSendNum = 0 |         this.tcpSendNum = 0 | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user