BaseJob.php 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. <?php
  2. declare(strict_types=1);
  3. namespace SixShop\Core\Job;
  4. use Closure;
  5. use Exception;
  6. use think\facade\Log;
  7. use think\queue\Job;
  8. use Throwable;
  9. use function Opis\Closure\{serialize, unserialize};
  10. /**
  11. * @template T
  12. */
  13. abstract class BaseJob
  14. {
  15. // 最大重试次数
  16. protected int $maxAttempts = 3;
  17. // 重试延迟时间(秒)
  18. protected int $retryDelay = 60;
  19. // 是否启用失败回调
  20. protected bool $enableFailedCallback = true;
  21. // 是否闭包
  22. protected bool $isClosure = false;
  23. /**
  24. * 分发任务
  25. *
  26. * @param T $data 任务数据
  27. * @param int $delay 延迟时间
  28. * @param string|null $queue 队列名称
  29. */
  30. public static function dispatch(mixed $data = '', int $delay = 0, ?string $queue = null): JobDispatcher
  31. {
  32. if ($data instanceof Closure) {
  33. $data = serialize($data);
  34. }
  35. return new JobDispatcher(static ::class, $data, $delay, $queue);
  36. }
  37. /**
  38. * 主要的处理方法 - 不需要子类重写
  39. *
  40. * @param Job $job 队列任务对象
  41. * @param T $data 任务数据
  42. */
  43. public function fire(Job $job, mixed $data): void
  44. {
  45. try {
  46. if ($this->isClosure) {
  47. $data = unserialize($data);
  48. }
  49. // 前置处理
  50. if (!$this->beforeExecute($data)) {
  51. $job->delete();
  52. return;
  53. }
  54. if (method_exists($this, 'execute')) {
  55. // 执行任务
  56. $result = $this->execute($data);
  57. } else {
  58. // 默认执行逻辑
  59. $result = $data;
  60. }
  61. // 后置处理
  62. $this->afterExecute($data, $result);
  63. // 标记任务完成
  64. $job->delete();
  65. } catch (Exception|Throwable $e) {
  66. $this->handleException($job, $data, $e);
  67. }
  68. }
  69. /**
  70. * 任务前置处理 - 子类可重写
  71. *
  72. * @param T $data 任务数据
  73. * @return bool 是否继续执行
  74. */
  75. protected function beforeExecute(mixed $data): bool
  76. {
  77. return true;
  78. }
  79. /**
  80. * 任务后置处理 - 子类可重写
  81. *
  82. * @param T $data 任务数据
  83. * @param mixed $result 执行结果
  84. */
  85. protected function afterExecute(mixed $data, mixed $result): void
  86. {
  87. // 可以在这里添加通用的后置处理逻辑
  88. }
  89. /**
  90. * 异常处理
  91. *
  92. * @param Job $job 队列任务对象
  93. * @param T $data 任务数据
  94. * @param Throwable|Exception $exception 异常对象
  95. */
  96. protected function handleException(Job $job, mixed $data, Throwable|Exception $exception): void
  97. {
  98. Log::error('队列任务执行异常:' . static::class . ' - ' . $exception->getMessage() . '{data},{trace}', [
  99. 'data' => json_encode($data),
  100. 'trace' => $exception->getTraceAsString()
  101. ]);
  102. // 判断是否需要重试
  103. if ($job->attempts() < $this->maxAttempts) {
  104. // 重新发布任务
  105. $job->release($this->retryDelay);
  106. } else {
  107. // 执行失败回调
  108. if ($this->enableFailedCallback) {
  109. try {
  110. $this->onFailed($data);
  111. } catch (Exception $e) {
  112. Log::error('任务失败回调执行异常:' . $e->getMessage());
  113. }
  114. }
  115. // 重新抛出异常,让 Worker 触发 JobFailed 事件并记录到数据库
  116. throw $exception;
  117. }
  118. }
  119. /**
  120. * 任务失败处理方法 - 子类可重写
  121. *
  122. * @param T $data 任务数据
  123. */
  124. protected function onFailed(mixed $data): void
  125. {
  126. // 默认失败处理逻辑
  127. Log::error('队列任务执行失败: ' . static::class . ' - ' . json_encode($data));
  128. }
  129. /**
  130. * 设置最大重试次数
  131. *
  132. * @param int $attempts
  133. * @return $this
  134. */
  135. protected function setMaxAttempts(int $attempts): static
  136. {
  137. $this->maxAttempts = $attempts;
  138. return $this;
  139. }
  140. /**
  141. * 设置重试延迟时间
  142. *
  143. * @param int $delay
  144. * @return $this
  145. */
  146. protected function setRetryDelay(int $delay): static
  147. {
  148. $this->retryDelay = $delay;
  149. return $this;
  150. }
  151. /**
  152. * 禁用失败回调
  153. *
  154. * @return $this
  155. */
  156. protected function disableFailedCallback(): static
  157. {
  158. $this->enableFailedCallback = false;
  159. return $this;
  160. }
  161. }