Files
ai-web/app/Services/Crawler/CrawlExecutionService.php
cjd 260460df03
Some checks failed
Tests / PHP 8.2 (push) Has been cancelled
Tests / PHP 8.3 (push) Has been cancelled
Tests / PHP 8.4 (push) Has been cancelled
爬虫开发
2026-02-18 12:56:36 +08:00

335 lines
12 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
declare(strict_types=1);
namespace App\Services\Crawler;
use App\Enums\CrawlAlertSeverity;
use App\Enums\CrawlRunItemStatus;
use App\Enums\CrawlRunStatus;
use App\Enums\CrawlTriggerType;
use App\Models\CrawlRule;
use App\Models\CrawlRun;
use App\Models\CrawlRunItem;
use Illuminate\Support\Arr;
use Illuminate\Support\Str;
/**
 * Executes a crawl rule end to end and records the outcome.
 *
 * For every configured entry URL the list page is fetched, detail URLs are
 * extracted from it, and each detail page is fetched, turned into a payload
 * (XPath and/or AI extraction), validated for required fields and upserted.
 * Every step is persisted as a CrawlRunItem on the CrawlRun created for the
 * execution.
 */
class CrawlExecutionService
{
    public function __construct(
        private readonly CrawlFetcherService $fetcher,
        private readonly XPathExtractor $extractor,
        private readonly OpenAiFallbackExtractor $aiFallbackExtractor,
        private readonly CrawlEntityUpsertService $upsertService,
        private readonly CrawlAlertService $alertService,
        private readonly CrawlRuleScheduleService $scheduleService,
    ) {
    }

    /**
     * Run a crawl rule once and return the persisted, refreshed run record.
     *
     * The number of detail pages processed is capped at the rule's max_pages
     * (at least 1) across all entry URLs. When a list page yields no detail
     * URLs, the entry URL itself is processed as the detail page. A failed
     * list fetch increments the failure counter but not total_urls.
     *
     * After the crawl the run is finalised, the rule's last_run_at/next_run_at
     * are updated, and an alert is sent when anything failed or an error was
     * recorded.
     *
     * @param array<string, mixed> $metrics Caller-supplied metrics stored on the run;
     *                                      'entry_url_count' is added on completion.
     */
    public function runRule(
        CrawlRule $rule,
        CrawlTriggerType $triggerType,
        ?int $createdBy = null,
        array $metrics = [],
    ): CrawlRun {
        $run = CrawlRun::query()->create([
            'rule_id' => $rule->id,
            'trigger_type' => $triggerType,
            'status' => CrawlRunStatus::Running,
            'started_at' => now(),
            'metrics' => $metrics,
            'created_by' => $createdBy,
        ]);

        $successCount = 0;
        $failedCount = 0;
        $skippedCount = 0;
        $totalUrls = 0;
        $errors = [];

        $entryUrls = collect($rule->entry_urls)
            ->filter(static fn ($url): bool => is_string($url) && filter_var($url, FILTER_VALIDATE_URL) !== false)
            ->values()
            ->all();
        if ($entryUrls === []) {
            // Recorded as an error so the run is finalised as Failed and an alert is raised.
            $errors[] = 'No valid entry urls configured';
        }

        $extractorConfig = is_array($rule->extractor_config) ? $rule->extractor_config : [];
        $maxPages = max(1, (int) $rule->max_pages);

        foreach ($entryUrls as $entryUrl) {
            [$listResult, $listAttempt] = $this->fetchWithRetry($rule, $entryUrl);
            $listItem = [
                'url' => $entryUrl,
                'stage' => 'list',
                ...$this->fetchMeta($listResult, $listAttempt),
            ];

            if (! $listResult['ok']) {
                $failedCount++;
                $errors[] = sprintf('List fetch failed: %s', (string) ($listResult['error'] ?? 'unknown'));
                $this->createRunItem($run, [
                    ...$listItem,
                    'status' => CrawlRunItemStatus::Failed,
                    'error_code' => 'fetch_failed',
                    'error_message' => (string) ($listResult['error'] ?? 'Fetch failed'),
                ]);

                continue;
            }

            $this->createRunItem($run, [
                ...$listItem,
                'status' => CrawlRunItemStatus::Success,
            ]);

            $detailUrls = $this->extractor->extractListUrls($listResult['body'], $entryUrl, $extractorConfig);
            if ($detailUrls === []) {
                // No links found: treat the entry page itself as the detail page.
                $detailUrls = [$entryUrl];
            }

            foreach ($detailUrls as $detailUrl) {
                if ($totalUrls >= $maxPages) {
                    // The page budget is shared by all entry URLs, so stop the whole crawl.
                    break 2;
                }
                $totalUrls++;

                [$itemStatus, $error] = $this->processDetailUrl($rule, $run, $detailUrl);

                if ($itemStatus === CrawlRunItemStatus::Success) {
                    $successCount++;
                } elseif ($itemStatus === CrawlRunItemStatus::Skipped) {
                    $skippedCount++;
                } else {
                    $failedCount++;
                }

                if ($error !== null) {
                    $errors[] = $error;
                }
            }
        }

        $run->fill([
            'status' => $this->finalizeStatus($successCount, $failedCount, $errors),
            'finished_at' => now(),
            'total_urls' => $totalUrls,
            'success_count' => $successCount,
            'failed_count' => $failedCount,
            'skipped_count' => $skippedCount,
            'error_summary' => $errors !== [] ? Str::limit(implode(' | ', $errors), 1000) : null,
            'metrics' => array_merge($metrics, ['entry_url_count' => count($entryUrls)]),
        ]);
        $run->save();

        $rule->last_run_at = now();
        $rule->next_run_at = $this->scheduleService->nextRunAt($rule);
        $rule->save();

        if ($failedCount > 0 || $errors !== []) {
            $this->alertService->notify(
                $failedCount > 0 ? CrawlAlertSeverity::Error : CrawlAlertSeverity::Warning,
                'run_failed_or_partial',
                sprintf('规则[%s]执行完成,成功%d失败%d跳过%d', $rule->name, $successCount, $failedCount, $skippedCount),
                $rule,
                $run,
                [
                    'errors' => array_slice($errors, 0, 10),
                ],
            );
        }

        return $run->refresh();
    }

    /**
     * Fetch, extract, validate and upsert a single detail page, recording one run item.
     *
     * Exceptions thrown while extracting or upserting are recorded as failed
     * items instead of propagating, so one bad page cannot abort the run and
     * leave it stuck in the Running state.
     *
     * @return array{0: CrawlRunItemStatus, 1: string|null} The item status and, for failures,
     *                                                     the message to add to the run's errors.
     */
    private function processDetailUrl(CrawlRule $rule, CrawlRun $run, string $detailUrl): array
    {
        [$detailResult, $detailAttempt] = $this->fetchWithRetry($rule, $detailUrl);
        $base = [
            'url' => $detailUrl,
            ...$this->fetchMeta($detailResult, $detailAttempt),
        ];

        if (! $detailResult['ok']) {
            $this->createRunItem($run, [
                ...$base,
                'stage' => 'detail',
                'status' => CrawlRunItemStatus::Failed,
                'error_code' => 'fetch_failed',
                'error_message' => (string) ($detailResult['error'] ?? 'Fetch failed'),
            ]);

            return [
                CrawlRunItemStatus::Failed,
                sprintf('Detail fetch failed(%s): %s', $detailUrl, (string) ($detailResult['error'] ?? 'unknown')),
            ];
        }

        $rawPayload = ['html_length' => mb_strlen($detailResult['body'])];

        try {
            $extracted = $this->extractPayload($rule, $detailResult['body']);
        } catch (\Throwable $exception) {
            // Handled like an upsert failure: record the item and keep crawling.
            $this->createRunItem($run, [
                ...$base,
                'stage' => 'extract',
                'status' => CrawlRunItemStatus::Failed,
                'error_code' => 'extract_failed',
                'error_message' => $exception->getMessage(),
                'raw_payload' => $rawPayload,
            ]);

            return [
                CrawlRunItemStatus::Failed,
                sprintf('Extract failed(%s): %s', $detailUrl, $exception->getMessage()),
            ];
        }

        $missing = $this->missingRequiredFields($rule, $extracted);
        if ($missing !== []) {
            $this->createRunItem($run, [
                ...$base,
                'stage' => 'extract',
                'status' => CrawlRunItemStatus::Skipped,
                'error_code' => 'missing_fields',
                'error_message' => 'Missing required fields: '.implode(', ', $missing),
                'raw_payload' => $rawPayload,
                'normalized_payload' => $extracted,
            ]);

            // Skipped pages are counted but do not add to the run's error list.
            return [CrawlRunItemStatus::Skipped, null];
        }

        try {
            $upsertResult = $this->upsertService->upsert($rule, $extracted, $detailUrl);
            $this->createRunItem($run, [
                ...$base,
                'stage' => 'upsert',
                'status' => CrawlRunItemStatus::Success,
                'normalized_payload' => $extracted,
                'upsert_result' => $upsertResult,
            ]);

            return [CrawlRunItemStatus::Success, null];
        } catch (\Throwable $exception) {
            $this->createRunItem($run, [
                ...$base,
                'stage' => 'upsert',
                'status' => CrawlRunItemStatus::Failed,
                'error_code' => 'upsert_failed',
                'error_message' => $exception->getMessage(),
                'normalized_payload' => $extracted,
            ]);

            return [
                CrawlRunItemStatus::Failed,
                sprintf('Upsert failed(%s): %s', $detailUrl, $exception->getMessage()),
            ];
        }
    }

    /**
     * Build the run-item attributes that every stage copies from a fetch result.
     *
     * @param array<string, mixed> $result Fetch result as returned by fetchWithRetry().
     * @return array{attempt: int, latency_ms: mixed, http_code: mixed}
     */
    private function fetchMeta(array $result, int $attempt): array
    {
        return [
            'attempt' => $attempt,
            'latency_ms' => $result['latency_ms'] ?? null,
            'http_code' => $result['http_code'] ?? null,
        ];
    }

    /**
     * Fetch a URL, retrying failed attempts with a linearly growing delay.
     *
     * At least one attempt is always made. Between attempts the call blocks
     * with sleep() for backoff * attempt seconds, capped at 15 seconds; the
     * backoff itself is at least 1 second.
     *
     * @return array{0: array{ok: bool, http_code: int|null, body: string, error: string|null, latency_ms: int}, 1: int}
     *         The last fetch result and the number of attempts made.
     */
    private function fetchWithRetry(CrawlRule $rule, string $url): array
    {
        $maxAttempts = max(1, (int) $rule->retry_max);
        $backoff = max(1, (int) $rule->retry_backoff_seconds);

        // Placeholder result; always overwritten because the loop runs at least once.
        $lastResult = [
            'ok' => false,
            'http_code' => null,
            'body' => '',
            'error' => 'not_started',
            'latency_ms' => 0,
        ];

        for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
            $lastResult = $this->fetcher->fetch($rule, $url);
            if ($lastResult['ok']) {
                return [$lastResult, $attempt];
            }

            // No delay after the final attempt.
            if ($attempt < $maxAttempts) {
                sleep(min($backoff * $attempt, 15));
            }
        }

        return [$lastResult, $maxAttempts];
    }

    /**
     * Extract a field payload from a detail page according to the rule's extractor mode.
     *
     * Modes (from extractor_config['mode'], defaulting to 'xpath' for unknown values):
     *  - 'ai':     AI extraction only.
     *  - 'hybrid': XPath and AI are both run and merged.
     *  - 'xpath':  XPath only; when the rule has ai_fallback_enabled and required
     *              fields are missing, AI extraction is run and merged in.
     *
     * When merging, XPath values take precedence unless they are blank.
     *
     * @return array<string, mixed>
     */
    private function extractPayload(CrawlRule $rule, string $html): array
    {
        $extractorConfig = is_array($rule->extractor_config) ? $rule->extractor_config : [];
        $mode = strtolower((string) ($extractorConfig['mode'] ?? 'xpath'));
        if (! in_array($mode, ['xpath', 'ai', 'hybrid'], true)) {
            $mode = 'xpath';
        }
        $aiOptions = is_array($extractorConfig['ai'] ?? null) ? $extractorConfig['ai'] : [];

        if ($mode === 'ai') {
            return $this->aiFallbackExtractor->extract($rule, $html, $aiOptions);
        }

        $xpathPayload = $this->extractor->extractFields($html, $extractorConfig);

        $useAi = $mode === 'hybrid'
            || ($rule->ai_fallback_enabled && $this->missingRequiredFields($rule, $xpathPayload) !== []);
        if (! $useAi) {
            return $xpathPayload;
        }

        $aiPayload = $this->aiFallbackExtractor->extract($rule, $html, $aiOptions);

        return $this->mergePayloads($aiPayload, $xpathPayload);
    }

    /**
     * Merge AI and XPath payloads, preferring XPath values that carry content.
     *
     * A plain array_merge() would let an empty XPath value (null, '', whitespace
     * or []) overwrite the AI value for the same field, which defeats the AI
     * fallback for exactly the fields it was invoked to fill.
     *
     * @param array<string, mixed> $aiPayload
     * @param array<string, mixed> $xpathPayload
     * @return array<string, mixed>
     */
    private function mergePayloads(array $aiPayload, array $xpathPayload): array
    {
        $merged = $aiPayload;
        foreach ($xpathPayload as $field => $value) {
            if (! array_key_exists($field, $merged) || ! $this->isBlankValue($value)) {
                $merged[$field] = $value;
            }
        }

        return $merged;
    }

    /**
     * Whether an extracted value carries no content (null, empty/whitespace string, empty array).
     */
    private function isBlankValue(mixed $value): bool
    {
        return $value === null
            || $value === []
            || (is_string($value) && trim($value) === '');
    }

    /**
     * List the required fields that are absent or not a non-blank string in the payload.
     *
     * Rules whose target_module value is 'model' additionally require
     * 'modality' and 'deployment_mode'.
     *
     * @param array<string, mixed> $payload
     * @return list<string>
     */
    private function missingRequiredFields(CrawlRule $rule, array $payload): array
    {
        $required = $rule->target_module?->value === 'model'
            ? ['name', 'summary', 'modality', 'deployment_mode']
            : ['name', 'summary'];

        $missing = [];
        foreach ($required as $field) {
            $value = Arr::get($payload, $field);
            if (! is_string($value) || trim($value) === '') {
                $missing[] = $field;
            }
        }

        return $missing;
    }

    /**
     * Derive the final run status from the counters.
     *
     * No successes means Failed; successes with any failure or recorded error
     * mean Partial; otherwise Completed.
     *
     * @param list<string> $errors
     */
    private function finalizeStatus(int $successCount, int $failedCount, array $errors): CrawlRunStatus
    {
        if ($successCount === 0) {
            return CrawlRunStatus::Failed;
        }

        return $failedCount === 0 && $errors === []
            ? CrawlRunStatus::Completed
            : CrawlRunStatus::Partial;
    }

    /**
     * Persist one run item on the given run.
     *
     * @param array<string, mixed> $attributes
     */
    private function createRunItem(CrawlRun $run, array $attributes): CrawlRunItem
    {
        return $run->items()->create($attributes);
    }
}