isClosure) { $data = unserialize($data); } // 前置处理 if (!$this->beforeExecute($data)) { $job->delete(); return; } if (method_exists($this, 'execute')) { // 执行任务 $result = $this->execute($data); } else { // 默认执行逻辑 $result = $data; } // 后置处理 $this->afterExecute($data, $result); // 标记任务完成 $job->delete(); } catch (Exception|Throwable $e) { $this->handleException($job, $data, $e); } } /** * 任务前置处理 - 子类可重写 * * @param T $data 任务数据 * @return bool 是否继续执行 */ protected function beforeExecute(mixed $data): bool { return true; } /** * 任务后置处理 - 子类可重写 * * @param T $data 任务数据 * @param mixed $result 执行结果 */ protected function afterExecute(mixed $data, mixed $result): void { // 可以在这里添加通用的后置处理逻辑 } /** * 异常处理 * * @param Job $job 队列任务对象 * @param T $data 任务数据 * @param Throwable|Exception $exception 异常对象 */ protected function handleException(Job $job, mixed $data, Throwable|Exception $exception): void { Log::error('队列任务执行异常:' . static::class . ' - ' . $exception->getMessage() . '{data},{trace}', [ 'data' => json_encode($data), 'trace' => $exception->getTraceAsString() ]); // 判断是否需要重试 if ($job->attempts() < $this->maxAttempts) { // 重新发布任务 $job->release($this->retryDelay); } else { // 执行失败回调 if ($this->enableFailedCallback) { try { $this->onFailed($data); } catch (Exception $e) { Log::error('任务失败回调执行异常:' . $e->getMessage()); } } // 重新抛出异常,让 Worker 触发 JobFailed 事件并记录到数据库 throw $exception; } } /** * 任务失败处理方法 - 子类可重写 * * @param T $data 任务数据 */ protected function onFailed(mixed $data): void { // 默认失败处理逻辑 Log::error('队列任务执行失败: ' . static::class . ' - ' . json_encode($data)); } /** * 设置最大重试次数 * * @param int $attempts * @return $this */ protected function setMaxAttempts(int $attempts): static { $this->maxAttempts = $attempts; return $this; } /** * 设置重试延迟时间 * * @param int $delay * @return $this */ protected function setRetryDelay(int $delay): static { $this->retryDelay = $delay; return $this; } /** * 禁用失败回调 * * @return $this */ protected function disableFailedCallback(): static { $this->enableFailedCallback = false; return $this; } }