335 lines
12 KiB
PHP
335 lines
12 KiB
PHP
|
|
<?php
|
|||
|
|
|
|||
|
|
declare(strict_types=1);
|
|||
|
|
|
|||
|
|
namespace App\Services\Crawler;
|
|||
|
|
|
|||
|
|
use App\Enums\CrawlAlertSeverity;
|
|||
|
|
use App\Enums\CrawlRunItemStatus;
|
|||
|
|
use App\Enums\CrawlRunStatus;
|
|||
|
|
use App\Enums\CrawlTriggerType;
|
|||
|
|
use App\Models\CrawlRule;
|
|||
|
|
use App\Models\CrawlRun;
|
|||
|
|
use App\Models\CrawlRunItem;
|
|||
|
|
use Illuminate\Support\Arr;
|
|||
|
|
use Illuminate\Support\Str;
|
|||
|
|
|
|||
|
|
class CrawlExecutionService
|
|||
|
|
{
|
|||
|
|
/**
 * Wire up the collaborators used while executing a crawl run.
 *
 * @param CrawlFetcherService      $fetcher             Fetches list and detail pages (see fetchWithRetry()).
 * @param XPathExtractor           $extractor           Extracts detail URLs from list pages and fields from detail pages.
 * @param OpenAiFallbackExtractor  $aiFallbackExtractor Extracts fields from HTML in "ai"/"hybrid" mode or as a fallback.
 * @param CrawlEntityUpsertService $upsertService       Persists an extracted payload for a rule and URL.
 * @param CrawlAlertService        $alertService        Receives a notification when a run has failures or errors.
 * @param CrawlRuleScheduleService $scheduleService     Computes the rule's next run time after a run.
 */
public function __construct(
    private readonly CrawlFetcherService $fetcher,
    private readonly XPathExtractor $extractor,
    private readonly OpenAiFallbackExtractor $aiFallbackExtractor,
    private readonly CrawlEntityUpsertService $upsertService,
    private readonly CrawlAlertService $alertService,
    private readonly CrawlRuleScheduleService $scheduleService,
) {}
|
|||
|
|
|
|||
|
|
/**
 * Execute one crawl run for a rule and persist its outcome.
 *
 * For every valid entry URL the list page is fetched (with retries) and detail URLs are
 * extracted from it; when none are found the entry URL itself is treated as the detail page.
 * Each detail page is then fetched, extracted, checked for required fields and upserted.
 * One run item is recorded for every list fetch and for every detail-page outcome. At most
 * max(1, $rule->max_pages) detail pages are processed across all entry URLs.
 *
 * Afterwards the run is finalized with its counters and a truncated error summary, the
 * rule's last_run_at / next_run_at are updated, and an alert is sent when anything went wrong.
 *
 * @param CrawlRule            $rule        Rule to execute.
 * @param CrawlTriggerType     $triggerType Stored on the run as its trigger type.
 * @param int|null             $createdBy   Stored on the run as created_by.
 * @param array<string, mixed> $metrics     Initial metrics for the run; "entry_url_count" is added on completion.
 *
 * @return CrawlRun The refreshed, finalized run.
 *
 * @throws \Throwable Any unexpected error raised while crawling is rethrown after the run
 *                    has been marked as failed.
 */
public function runRule(
    CrawlRule $rule,
    CrawlTriggerType $triggerType,
    ?int $createdBy = null,
    array $metrics = [],
): CrawlRun {
    $run = CrawlRun::query()->create([
        'rule_id' => $rule->id,
        'trigger_type' => $triggerType,
        'status' => CrawlRunStatus::Running,
        'started_at' => now(),
        'metrics' => $metrics,
        'created_by' => $createdBy,
    ]);

    $successCount = 0;
    $failedCount = 0;
    $skippedCount = 0;
    $totalUrls = 0;
    $errors = [];

    $entryUrls = collect($rule->entry_urls)
        ->filter(static fn ($url): bool => is_string($url) && filter_var($url, FILTER_VALIDATE_URL) !== false)
        ->values()
        ->all();

    if ($entryUrls === []) {
        $errors[] = 'No valid entry urls configured';
    }

    $maxPages = max(1, (int) $rule->max_pages);
    $extractorConfig = is_array($rule->extractor_config) ? $rule->extractor_config : [];

    try {
        foreach ($entryUrls as $entryUrl) {
            [$listResult, $listAttempt] = $this->fetchWithRetry($rule, $entryUrl);

            // Attributes shared by the success and failure item of this list fetch.
            $listItem = [
                'url' => $entryUrl,
                'stage' => 'list',
                'attempt' => $listAttempt,
                'latency_ms' => $listResult['latency_ms'] ?? null,
                'http_code' => $listResult['http_code'] ?? null,
            ];

            if (! $listResult['ok']) {
                $failedCount++;
                $errors[] = sprintf('List fetch failed: %s', (string) ($listResult['error'] ?? 'unknown'));
                $this->createRunItem($run, $listItem + [
                    'status' => CrawlRunItemStatus::Failed,
                    'error_code' => 'fetch_failed',
                    'error_message' => (string) ($listResult['error'] ?? 'Fetch failed'),
                ]);

                continue;
            }

            $this->createRunItem($run, $listItem + ['status' => CrawlRunItemStatus::Success]);

            $detailUrls = $this->extractor->extractListUrls($listResult['body'], $entryUrl, $extractorConfig);

            // No links found: treat the entry page itself as the detail page.
            if ($detailUrls === []) {
                $detailUrls = [$entryUrl];
            }

            foreach ($detailUrls as $detailUrl) {
                // The page budget is global, so stop processing all entry URLs once it is spent.
                if ($totalUrls >= $maxPages) {
                    break 2;
                }

                $totalUrls++;
                [$detailResult, $detailAttempt] = $this->fetchWithRetry($rule, $detailUrl);

                // Attributes shared by every item recorded for this detail page.
                $detailItem = [
                    'url' => $detailUrl,
                    'attempt' => $detailAttempt,
                    'latency_ms' => $detailResult['latency_ms'] ?? null,
                    'http_code' => $detailResult['http_code'] ?? null,
                ];

                if (! $detailResult['ok']) {
                    $failedCount++;
                    $errors[] = sprintf('Detail fetch failed(%s): %s', $detailUrl, (string) ($detailResult['error'] ?? 'unknown'));
                    $this->createRunItem($run, $detailItem + [
                        'stage' => 'detail',
                        'status' => CrawlRunItemStatus::Failed,
                        'error_code' => 'fetch_failed',
                        'error_message' => (string) ($detailResult['error'] ?? 'Fetch failed'),
                    ]);

                    continue;
                }

                $extracted = $this->extractPayload($rule, $detailResult['body']);
                $missing = $this->missingRequiredFields($rule, $extracted);

                if ($missing !== []) {
                    $skippedCount++;
                    $this->createRunItem($run, $detailItem + [
                        'stage' => 'extract',
                        'status' => CrawlRunItemStatus::Skipped,
                        'error_code' => 'missing_fields',
                        'error_message' => 'Missing required fields: '.implode(', ', $missing),
                        'raw_payload' => ['html_length' => mb_strlen($detailResult['body'])],
                        'normalized_payload' => $extracted,
                    ]);

                    continue;
                }

                try {
                    $upsertResult = $this->upsertService->upsert($rule, $extracted, $detailUrl);
                    $successCount++;

                    $this->createRunItem($run, $detailItem + [
                        'stage' => 'upsert',
                        'status' => CrawlRunItemStatus::Success,
                        'normalized_payload' => $extracted,
                        'upsert_result' => $upsertResult,
                    ]);
                } catch (\Throwable $exception) {
                    $failedCount++;
                    $errors[] = sprintf('Upsert failed(%s): %s', $detailUrl, $exception->getMessage());

                    $this->createRunItem($run, $detailItem + [
                        'stage' => 'upsert',
                        'status' => CrawlRunItemStatus::Failed,
                        'error_code' => 'upsert_failed',
                        'error_message' => $exception->getMessage(),
                        'normalized_payload' => $extracted,
                    ]);
                }
            }
        }
    } catch (\Throwable $fatal) {
        // Without this, an unexpected error would leave the run in the Running state forever.
        // Record what is known so far, then let the original error propagate to the caller.
        $abortErrors = array_merge([sprintf('Run aborted: %s', $fatal->getMessage())], $errors);

        try {
            $run->fill([
                'status' => CrawlRunStatus::Failed,
                'finished_at' => now(),
                'total_urls' => $totalUrls,
                'success_count' => $successCount,
                'failed_count' => $failedCount,
                'skipped_count' => $skippedCount,
                'error_summary' => Str::limit(implode(' | ', $abortErrors), 1000),
            ]);
            $run->save();
        } catch (\Throwable) {
            // Best effort only: the original error thrown below must not be masked.
        }

        throw $fatal;
    }

    if ($successCount === 0 && $failedCount === 0 && $errors === []) {
        // Only reachable when every processed page was skipped. The run is reported as
        // failed, so give it an explanation (this also triggers the alert below).
        $errors[] = sprintf(
            'No records imported: all %d processed page(s) were skipped for missing required fields',
            $skippedCount,
        );
    }

    $status = $this->finalizeStatus($successCount, $failedCount, $errors);
    $run->fill([
        'status' => $status,
        'finished_at' => now(),
        'total_urls' => $totalUrls,
        'success_count' => $successCount,
        'failed_count' => $failedCount,
        'skipped_count' => $skippedCount,
        'error_summary' => $errors !== [] ? Str::limit(implode(' | ', $errors), 1000) : null,
        'metrics' => array_merge($metrics, ['entry_url_count' => count($entryUrls)]),
    ]);
    $run->save();

    $rule->last_run_at = now();
    $rule->next_run_at = $this->scheduleService->nextRunAt($rule);
    $rule->save();

    if ($failedCount > 0 || $errors !== []) {
        $this->alertService->notify(
            $failedCount > 0 ? CrawlAlertSeverity::Error : CrawlAlertSeverity::Warning,
            'run_failed_or_partial',
            sprintf('规则[%s]执行完成,成功%d,失败%d,跳过%d', $rule->name, $successCount, $failedCount, $skippedCount),
            $rule,
            $run,
            [
                'errors' => array_slice($errors, 0, 10),
            ],
        );
    }

    return $run->refresh();
}
|
|||
|
|
|
|||
|
|
/**
 * Fetch a URL, retrying failed attempts with a linearly growing, capped pause.
 *
 * At least one attempt is always made; the total number of attempts is
 * max(1, $rule->retry_max). Between failed attempts the method sleeps for
 * (backoff seconds x attempt number), capped at 15 seconds.
 *
 * @return array{0: array{ok: bool, http_code: int|null, body: string, error: string|null, latency_ms: int}, 1: int}
 *         The last fetch result and the number of the attempt that produced it.
 */
private function fetchWithRetry(CrawlRule $rule, string $url): array
{
    $attemptLimit = max(1, (int) $rule->retry_max);
    $backoffSeconds = max(1, (int) $rule->retry_backoff_seconds);
    $attempt = 0;

    do {
        $attempt++;
        $result = $this->fetcher->fetch($rule, $url);

        // Stop on success, or when the attempt budget is spent (no sleep after the last try).
        if ($result['ok'] || $attempt >= $attemptLimit) {
            break;
        }

        sleep(min($backoffSeconds * $attempt, 15));
    } while (true);

    return [$result, $attempt];
}
|
|||
|
|
|
|||
|
|
/**
 * Extract the normalized field payload from a detail page.
 *
 * The extraction mode comes from extractor_config["mode"] (case-insensitive, defaulting
 * to "xpath" for missing or unknown values):
 *  - "ai":     only the AI extractor is used.
 *  - "hybrid": both extractors run and their results are merged.
 *  - "xpath":  the XPath extractor runs; when the rule has AI fallback enabled and required
 *              fields are still missing, the AI extractor runs too and the results are merged.
 *
 * When results are merged, XPath values take precedence unless they are blank, in which
 * case the AI value for the same key is used.
 *
 * @return array<string, mixed>
 */
private function extractPayload(CrawlRule $rule, string $html): array
{
    $extractorConfig = is_array($rule->extractor_config) ? $rule->extractor_config : [];

    $mode = strtolower((string) ($extractorConfig['mode'] ?? 'xpath'));
    if (! in_array($mode, ['xpath', 'ai', 'hybrid'], true)) {
        $mode = 'xpath';
    }

    $aiOptions = is_array($extractorConfig['ai'] ?? null) ? $extractorConfig['ai'] : [];

    if ($mode === 'ai') {
        return $this->aiFallbackExtractor->extract($rule, $html, $aiOptions);
    }

    $xpathPayload = $this->extractor->extractFields($html, $extractorConfig);

    // Hybrid always consults the AI; plain xpath only does so as an opt-in fallback
    // when the XPath result is missing required fields.
    $shouldUseAi = $mode === 'hybrid'
        || ($rule->ai_fallback_enabled && $this->missingRequiredFields($rule, $xpathPayload) !== []);

    if (! $shouldUseAi) {
        return $xpathPayload;
    }

    $aiPayload = $this->aiFallbackExtractor->extract($rule, $html, $aiOptions);

    return $this->mergeWithAiFallback($xpathPayload, $aiPayload);
}

/**
 * Merge XPath and AI payloads, preferring XPath values except where they are blank.
 *
 * A plain array_merge() would let a blank XPath entry (null, empty/whitespace string or
 * empty array) overwrite the AI value for the same key, discarding exactly the fields the
 * AI extractor was asked to fill in.
 *
 * @param array<string, mixed> $xpathPayload
 * @param array<string, mixed> $aiPayload
 *
 * @return array<string, mixed>
 */
private function mergeWithAiFallback(array $xpathPayload, array $aiPayload): array
{
    $merged = array_merge($aiPayload, $xpathPayload);

    foreach ($xpathPayload as $field => $value) {
        $isBlank = $value === null
            || $value === []
            || (is_string($value) && trim($value) === '');

        if ($isBlank && array_key_exists($field, $aiPayload)) {
            $merged[$field] = $aiPayload[$field];
        }
    }

    return $merged;
}
|
|||
|
|
|
|||
|
|
/**
 * List the required fields that are absent or blank in an extracted payload.
 *
 * "name" and "summary" are always required; rules whose target module value is "model"
 * additionally require "modality" and "deployment_mode". A field counts as missing when
 * its value is not a string or is empty after trimming.
 *
 * @param array<string, mixed> $payload
 * @return list<string> Missing field names, in the order they are required.
 */
private function missingRequiredFields(CrawlRule $rule, array $payload): array
{
    $requiredFields = ['name', 'summary'];

    if ($rule->target_module?->value === 'model') {
        array_push($requiredFields, 'modality', 'deployment_mode');
    }

    $isMissing = static function (string $field) use ($payload): bool {
        $candidate = Arr::get($payload, $field);

        return ! is_string($candidate) || trim($candidate) === '';
    };

    // array_filter() preserves keys; re-index so the result is a proper list.
    return array_values(array_filter($requiredFields, $isMissing));
}
|
|||
|
|
|
|||
|
|
/**
 * Derive the final run status from the counters and collected errors.
 *
 * Without any success the run is Failed; with successes and neither failures nor errors
 * it is Completed; otherwise it is Partial.
 *
 * @param list<string> $errors
 */
private function finalizeStatus(int $successCount, int $failedCount, array $errors): CrawlRunStatus
{
    return match (true) {
        $successCount <= 0 => CrawlRunStatus::Failed,
        $failedCount === 0 && $errors === [] => CrawlRunStatus::Completed,
        default => CrawlRunStatus::Partial,
    };
}
|
|||
|
|
|
|||
|
|
/**
 * Create one item on the given run through its items() relation.
 *
 * @param array<string, mixed> $attributes Attributes of the item to create.
 */
private function createRunItem(CrawlRun $run, array $attributes): CrawlRunItem
{
    $itemsRelation = $run->items();

    return $itemsRelation->create($attributes);
}
|
|||
|
|
}
|