爬虫开发
This commit is contained in:
335
app/Services/Crawler/CrawlExecutionService.php
Normal file
335
app/Services/Crawler/CrawlExecutionService.php
Normal file
@@ -0,0 +1,335 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Services\Crawler;
|
||||
|
||||
use App\Enums\CrawlAlertSeverity;
|
||||
use App\Enums\CrawlRunItemStatus;
|
||||
use App\Enums\CrawlRunStatus;
|
||||
use App\Enums\CrawlTriggerType;
|
||||
use App\Models\CrawlRule;
|
||||
use App\Models\CrawlRun;
|
||||
use App\Models\CrawlRunItem;
|
||||
use Illuminate\Support\Arr;
|
||||
use Illuminate\Support\Str;
|
||||
|
||||
/**
 * Executes a single crawl rule end to end.
 *
 * For every valid entry URL the service fetches the list page, discovers detail
 * URLs, fetches each detail page, extracts a payload (XPath and/or AI), upserts
 * it, and records one CrawlRunItem per step. When the loop ends it finalizes the
 * CrawlRun, reschedules the rule and raises an alert for runs that did not
 * complete cleanly.
 */
class CrawlExecutionService
{
    public function __construct(
        private readonly CrawlFetcherService $fetcher,
        private readonly XPathExtractor $extractor,
        private readonly OpenAiFallbackExtractor $aiFallbackExtractor,
        private readonly CrawlEntityUpsertService $upsertService,
        private readonly CrawlAlertService $alertService,
        private readonly CrawlRuleScheduleService $scheduleService,
    ) {
    }

    /**
     * Runs the given rule once and returns the persisted, refreshed run record.
     *
     * The number of detail pages processed is capped by the rule's `max_pages`
     * (minimum 1). Failures of individual pages (fetch, extraction, upsert) are
     * recorded as run items and do not abort the run.
     *
     * @param CrawlRule            $rule        Rule to execute; its last/next run timestamps are updated.
     * @param CrawlTriggerType     $triggerType What triggered this run; stored on the run.
     * @param int|null             $createdBy   Stored in the run's `created_by` attribute.
     * @param array<string, mixed> $metrics     Initial metrics; `entry_url_count` is added on completion.
     */
    public function runRule(
        CrawlRule $rule,
        CrawlTriggerType $triggerType,
        ?int $createdBy = null,
        array $metrics = [],
    ): CrawlRun {
        $run = CrawlRun::query()->create([
            'rule_id' => $rule->id,
            'trigger_type' => $triggerType,
            'status' => CrawlRunStatus::Running,
            'started_at' => now(),
            'metrics' => $metrics,
            'created_by' => $createdBy,
        ]);

        $successCount = 0;
        $failedCount = 0;
        $skippedCount = 0;
        $totalUrls = 0;
        $errors = [];

        $extractorConfig = is_array($rule->extractor_config) ? $rule->extractor_config : [];

        // Only syntactically valid string URLs are crawled; anything else is silently dropped.
        $entryUrls = collect($rule->entry_urls)
            ->filter(static fn ($url): bool => is_string($url) && filter_var($url, FILTER_VALIDATE_URL) !== false)
            ->values()
            ->all();

        if ($entryUrls === []) {
            $errors[] = 'No valid entry urls configured';
        }

        $maxPages = max(1, (int) $rule->max_pages);

        foreach ($entryUrls as $entryUrl) {
            // Once the page budget is spent there is nothing left to do for the remaining
            // entry URLs, so do not fetch (and possibly fail on) their list pages.
            if ($totalUrls >= $maxPages) {
                break;
            }

            [$listResult, $listAttempt] = $this->fetchWithRetry($rule, $entryUrl);

            if (! $listResult['ok']) {
                $failedCount++;
                $errors[] = sprintf('List fetch failed: %s', (string) ($listResult['error'] ?? 'unknown'));
                $this->createRunItem($run, [
                    ...self::baseItemAttributes(
                        $entryUrl,
                        'list',
                        $listAttempt,
                        CrawlRunItemStatus::Failed,
                        $listResult,
                    ),
                    'error_code' => 'fetch_failed',
                    'error_message' => (string) ($listResult['error'] ?? 'Fetch failed'),
                ]);

                continue;
            }

            $this->createRunItem($run, self::baseItemAttributes(
                $entryUrl,
                'list',
                $listAttempt,
                CrawlRunItemStatus::Success,
                $listResult,
            ));

            $detailUrls = $this->extractor->extractListUrls($listResult['body'], $entryUrl, $extractorConfig);

            // No links found: treat the entry page itself as the detail page.
            if ($detailUrls === []) {
                $detailUrls = [$entryUrl];
            }

            foreach ($detailUrls as $detailUrl) {
                if ($totalUrls >= $maxPages) {
                    break 2;
                }

                $totalUrls++;
                [$detailResult, $detailAttempt] = $this->fetchWithRetry($rule, $detailUrl);

                if (! $detailResult['ok']) {
                    $failedCount++;
                    $errors[] = sprintf(
                        'Detail fetch failed(%s): %s',
                        $detailUrl,
                        (string) ($detailResult['error'] ?? 'unknown'),
                    );
                    $this->createRunItem($run, [
                        ...self::baseItemAttributes(
                            $detailUrl,
                            'detail',
                            $detailAttempt,
                            CrawlRunItemStatus::Failed,
                            $detailResult,
                        ),
                        'error_code' => 'fetch_failed',
                        'error_message' => (string) ($detailResult['error'] ?? 'Fetch failed'),
                    ]);

                    continue;
                }

                // Extraction failures are handled per page, like upsert failures, so that one
                // bad page cannot abort the run and leave it stuck in the Running status.
                try {
                    $extracted = $this->extractPayload($rule, $detailResult['body']);
                } catch (\Throwable $exception) {
                    $failedCount++;
                    $errors[] = sprintf('Extract failed(%s): %s', $detailUrl, $exception->getMessage());
                    // NOTE(review): 'extract_failed' is a new error_code value; confirm the column accepts it.
                    $this->createRunItem($run, [
                        ...self::baseItemAttributes(
                            $detailUrl,
                            'extract',
                            $detailAttempt,
                            CrawlRunItemStatus::Failed,
                            $detailResult,
                        ),
                        'error_code' => 'extract_failed',
                        'error_message' => $exception->getMessage(),
                        'raw_payload' => ['html_length' => mb_strlen($detailResult['body'])],
                    ]);

                    continue;
                }

                $missing = $this->missingRequiredFields($rule, $extracted);

                if ($missing !== []) {
                    $skippedCount++;
                    $this->createRunItem($run, [
                        ...self::baseItemAttributes(
                            $detailUrl,
                            'extract',
                            $detailAttempt,
                            CrawlRunItemStatus::Skipped,
                            $detailResult,
                        ),
                        'error_code' => 'missing_fields',
                        'error_message' => 'Missing required fields: '.implode(', ', $missing),
                        'raw_payload' => ['html_length' => mb_strlen($detailResult['body'])],
                        'normalized_payload' => $extracted,
                    ]);

                    continue;
                }

                // Only the upsert itself is guarded: a failure while recording the success item
                // must not additionally count the same URL as failed.
                try {
                    $upsertResult = $this->upsertService->upsert($rule, $extracted, $detailUrl);
                } catch (\Throwable $exception) {
                    $failedCount++;
                    $errors[] = sprintf('Upsert failed(%s): %s', $detailUrl, $exception->getMessage());
                    $this->createRunItem($run, [
                        ...self::baseItemAttributes(
                            $detailUrl,
                            'upsert',
                            $detailAttempt,
                            CrawlRunItemStatus::Failed,
                            $detailResult,
                        ),
                        'error_code' => 'upsert_failed',
                        'error_message' => $exception->getMessage(),
                        'normalized_payload' => $extracted,
                    ]);

                    continue;
                }

                $successCount++;
                $this->createRunItem($run, [
                    ...self::baseItemAttributes(
                        $detailUrl,
                        'upsert',
                        $detailAttempt,
                        CrawlRunItemStatus::Success,
                        $detailResult,
                    ),
                    'normalized_payload' => $extracted,
                    'upsert_result' => $upsertResult,
                ]);
            }
        }

        $status = $this->finalizeStatus($successCount, $failedCount, $errors);

        $run->fill([
            'status' => $status,
            'finished_at' => now(),
            'total_urls' => $totalUrls,
            'success_count' => $successCount,
            'failed_count' => $failedCount,
            'skipped_count' => $skippedCount,
            'error_summary' => $errors !== [] ? Str::limit(implode(' | ', $errors), 1000) : null,
            'metrics' => array_merge($metrics, ['entry_url_count' => count($entryUrls)]),
        ]);
        $run->save();

        $rule->last_run_at = now();
        $rule->next_run_at = $this->scheduleService->nextRunAt($rule);
        $rule->save();

        // Alert for every run that is not Completed. This includes the "everything was skipped"
        // case, which is stored as Failed but has no failures and no error messages.
        if ($status !== CrawlRunStatus::Completed) {
            $this->alertService->notify(
                $failedCount > 0 ? CrawlAlertSeverity::Error : CrawlAlertSeverity::Warning,
                'run_failed_or_partial',
                sprintf('规则[%s]执行完成,成功%d,失败%d,跳过%d', $rule->name, $successCount, $failedCount, $skippedCount),
                $rule,
                $run,
                [
                    'errors' => array_slice($errors, 0, 10),
                ],
            );
        }

        return $run->refresh();
    }

    /**
     * Fetches a URL, retrying failed attempts with a linearly growing delay.
     *
     * At least one attempt is always made; `retry_max` is used as the total number of
     * attempts. The delay before attempt N+1 is `retry_backoff_seconds * N` seconds
     * (backoff floor of 1), capped at 15 seconds. This call blocks while sleeping.
     *
     * @return array{0: array{ok: bool, http_code: int|null, body: string, error: string|null, latency_ms: int}, 1: int}
     *         The last fetch result and the number of attempts made.
     */
    private function fetchWithRetry(CrawlRule $rule, string $url): array
    {
        $maxAttempts = max(1, (int) $rule->retry_max);
        $backoff = max(1, (int) $rule->retry_backoff_seconds);

        $lastResult = [
            'ok' => false,
            'http_code' => null,
            'body' => '',
            'error' => 'not_started',
            'latency_ms' => 0,
        ];

        for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
            $lastResult = $this->fetcher->fetch($rule, $url);

            if ($lastResult['ok']) {
                return [$lastResult, $attempt];
            }

            // No point in waiting after the final attempt.
            if ($attempt < $maxAttempts) {
                sleep(min($backoff * $attempt, 15));
            }
        }

        return [$lastResult, $maxAttempts];
    }

    /**
     * Extracts a field payload from a detail page according to the rule's extractor mode.
     *
     * Modes (from `extractor_config.mode`, unknown values fall back to `xpath`):
     * - `ai`:     only the AI extractor is used.
     * - `hybrid`: XPath and AI both run; non-blank XPath values take precedence.
     * - `xpath`:  XPath runs; if the rule has `ai_fallback_enabled` and required fields are
     *             missing, the AI extractor runs too and fills the blanks.
     *
     * @return array<string, mixed>
     */
    private function extractPayload(CrawlRule $rule, string $html): array
    {
        $extractorConfig = is_array($rule->extractor_config) ? $rule->extractor_config : [];

        $mode = strtolower((string) ($extractorConfig['mode'] ?? 'xpath'));
        if (! in_array($mode, ['xpath', 'ai', 'hybrid'], true)) {
            $mode = 'xpath';
        }

        $aiOptions = is_array($extractorConfig['ai'] ?? null) ? $extractorConfig['ai'] : [];

        $xpathPayload = $mode === 'ai'
            ? []
            : $this->extractor->extractFields($html, $extractorConfig);

        $shouldUseAi = match ($mode) {
            'ai', 'hybrid' => true,
            // Plain xpath mode only consults the AI when the fallback is enabled and needed.
            default => (bool) $rule->ai_fallback_enabled
                && $this->missingRequiredFields($rule, $xpathPayload) !== [],
        };

        $aiPayload = $shouldUseAi
            ? $this->aiFallbackExtractor->extract($rule, $html, $aiOptions)
            : [];

        if ($mode === 'ai') {
            return $aiPayload;
        }

        return self::mergePayloads($aiPayload, $xpathPayload);
    }

    /**
     * Merges an AI payload with an XPath payload, giving precedence to XPath values.
     *
     * Unlike a plain array_merge(), a blank XPath value (null or whitespace-only string)
     * does not overwrite a value the AI extractor supplied for the same field. Otherwise
     * the AI fallback would be discarded for exactly the fields it was asked to fill.
     *
     * @param array<string, mixed> $aiPayload
     * @param array<string, mixed> $xpathPayload
     * @return array<string, mixed>
     */
    private static function mergePayloads(array $aiPayload, array $xpathPayload): array
    {
        $merged = $aiPayload;

        foreach ($xpathPayload as $field => $value) {
            $isBlank = $value === null || (is_string($value) && trim($value) === '');

            if ($isBlank && array_key_exists($field, $merged)) {
                continue;
            }

            $merged[$field] = $value;
        }

        return $merged;
    }

    /**
     * Lists the required fields that are absent or blank in the payload.
     *
     * Rules whose `target_module` enum value is `model` additionally require
     * `modality` and `deployment_mode`. A field counts as present only when it is a
     * string that is non-empty after trimming.
     *
     * @param array<string, mixed> $payload
     * @return list<string>
     */
    private function missingRequiredFields(CrawlRule $rule, array $payload): array
    {
        $required = $rule->target_module?->value === 'model'
            ? ['name', 'summary', 'modality', 'deployment_mode']
            : ['name', 'summary'];

        return array_values(array_filter(
            $required,
            static function (string $field) use ($payload): bool {
                $value = Arr::get($payload, $field);

                return ! is_string($value) || trim($value) === '';
            },
        ));
    }

    /**
     * Derives the final run status from the counters.
     *
     * - Completed: at least one success, no failures and no error messages.
     * - Partial:   at least one success, but also failures or error messages.
     * - Failed:    no success at all (including runs where every page was skipped).
     *
     * @param list<string> $errors
     */
    private function finalizeStatus(int $successCount, int $failedCount, array $errors): CrawlRunStatus
    {
        if ($successCount === 0) {
            return CrawlRunStatus::Failed;
        }

        $isClean = $failedCount === 0 && $errors === [];

        return $isClean ? CrawlRunStatus::Completed : CrawlRunStatus::Partial;
    }

    /**
     * Builds the attributes shared by every run item of a fetched URL.
     *
     * @param array<string, mixed> $result Fetch result; `latency_ms` and `http_code` are read if present.
     * @return array<string, mixed>
     */
    private static function baseItemAttributes(
        string $url,
        string $stage,
        int $attempt,
        CrawlRunItemStatus $status,
        array $result,
    ): array {
        return [
            'url' => $url,
            'stage' => $stage,
            'attempt' => $attempt,
            'status' => $status,
            'latency_ms' => $result['latency_ms'] ?? null,
            'http_code' => $result['http_code'] ?? null,
        ];
    }

    /**
     * Persists one run item through the run's `items()` relation.
     *
     * @param array<string, mixed> $attributes
     */
    private function createRunItem(CrawlRun $run, array $attributes): CrawlRunItem
    {
        return $run->items()->create($attributes);
    }
}
|
||||
Reference in New Issue
Block a user