429 lines
16 KiB
PHP
429 lines
16 KiB
PHP
<?php
|
|
declare(strict_types=1);
|
|
|
|
namespace App;
|
|
|
|
use DateTimeImmutable;
|
|
use DateTimeZone;
|
|
|
|
final class ModuleCronScheduler
|
|
{
|
|
public function __construct(
|
|
private ModuleManager $modules,
|
|
private ?\PDO $pdo
|
|
) {
|
|
}
|
|
|
|
public function definitions(string $moduleName): array
|
|
{
|
|
$module = $this->modules->get($moduleName);
|
|
$tasks = is_array($module['scheduler_jobs'] ?? null) ? $module['scheduler_jobs'] : (is_array($module['cron_tasks'] ?? null) ? $module['cron_tasks'] : []);
|
|
$definitions = [];
|
|
|
|
foreach ($tasks as $task) {
|
|
if (!is_array($task)) {
|
|
continue;
|
|
}
|
|
|
|
$name = trim((string) ($task['name'] ?? ''));
|
|
$callback = trim((string) ($task['callback'] ?? ''));
|
|
if ($name === '' || $callback === '') {
|
|
continue;
|
|
}
|
|
|
|
$definitions[$name] = [
|
|
'name' => $name,
|
|
'label' => trim((string) ($task['label'] ?? $name)),
|
|
'callback' => $callback,
|
|
'mode' => in_array((string) ($task['mode'] ?? 'single'), ['single', 'multi'], true) ? (string) ($task['mode'] ?? 'single') : 'single',
|
|
'default_enabled' => array_key_exists('default_enabled', $task) ? (bool) $task['default_enabled'] : false,
|
|
'default_cron' => trim((string) ($task['default_cron'] ?? '0 * * * *')),
|
|
'default_timezone' => trim((string) ($task['default_timezone'] ?? 'UTC')) ?: 'UTC',
|
|
'timezone_setting' => trim((string) ($task['timezone_setting'] ?? '')),
|
|
'lock_minutes' => max(1, (int) ($task['lock_minutes'] ?? 10)),
|
|
'builder' => is_array($task['builder'] ?? null) ? $task['builder'] : [],
|
|
'help' => trim((string) ($task['help'] ?? '')),
|
|
];
|
|
}
|
|
|
|
return $definitions;
|
|
}
|
|
|
|
public function statuses(string $moduleName): array
|
|
{
|
|
$definitions = $this->definitions($moduleName);
|
|
if ($definitions === []) {
|
|
return [];
|
|
}
|
|
|
|
$settings = $this->modules->settings($moduleName);
|
|
$states = $this->fetchStates($moduleName);
|
|
$nowUtc = new DateTimeImmutable('now', new DateTimeZone('UTC'));
|
|
$statuses = [];
|
|
|
|
foreach ($definitions as $name => $definition) {
|
|
foreach ($this->jobConfigs($definition, $settings) as $entryIndex => $config) {
|
|
$stateKey = $this->stateKey($name, $entryIndex);
|
|
$state = $states[$stateKey] ?? [];
|
|
$timezone = $this->safeTimezone((string) ($config['timezone'] ?? 'UTC'));
|
|
$expression = trim((string) ($config['cron_expression'] ?? ''));
|
|
$isLocked = $this->isLockActive($state, $nowUtc->getTimestamp());
|
|
$parseError = null;
|
|
$previousDueUtc = null;
|
|
$nextDueUtc = null;
|
|
$isDue = false;
|
|
|
|
try {
|
|
$cron = CronExpression::parse($expression);
|
|
if (!empty($config['enabled'])) {
|
|
$previousDueUtc = $cron->previousRun($nowUtc, $timezone);
|
|
$lastScheduledFor = $this->parseUtc((string) ($state['last_scheduled_for'] ?? ''));
|
|
$isDue = !$isLocked
|
|
&& $previousDueUtc instanceof DateTimeImmutable
|
|
&& ($lastScheduledFor === null || $previousDueUtc > $lastScheduledFor);
|
|
$nextDueUtc = $isDue
|
|
? $previousDueUtc
|
|
: $cron->nextRun($nowUtc, $timezone);
|
|
}
|
|
} catch (\Throwable $exception) {
|
|
$parseError = $exception->getMessage();
|
|
}
|
|
|
|
$statuses[] = $definition + [
|
|
'job_name' => $name,
|
|
'entry_index' => $entryIndex,
|
|
'state_key' => $stateKey,
|
|
'config' => $config,
|
|
'state' => $state,
|
|
'last_started_at_local' => ($this->parseUtc((string) ($state['last_started_at'] ?? '')))?->setTimezone($timezone)->format('Y-m-d H:i:s'),
|
|
'last_success_at_local' => ($this->parseUtc((string) ($state['last_success_at'] ?? '')))?->setTimezone($timezone)->format('Y-m-d H:i:s'),
|
|
'enabled' => !empty($config['enabled']),
|
|
'cron_expression' => $expression,
|
|
'timezone' => $timezone->getName(),
|
|
'is_due' => $isDue,
|
|
'is_locked' => $isLocked,
|
|
'parse_error' => $parseError,
|
|
'previous_due_at' => $previousDueUtc?->format('Y-m-d H:i:s'),
|
|
'next_due_at' => $nextDueUtc?->format('Y-m-d H:i:s'),
|
|
'previous_due_at_local' => $previousDueUtc?->setTimezone($timezone)->format('Y-m-d H:i:s'),
|
|
'next_due_at_local' => $nextDueUtc?->setTimezone($timezone)->format('Y-m-d H:i:s'),
|
|
];
|
|
}
|
|
}
|
|
|
|
return $statuses;
|
|
}
|
|
|
|
public function runDue(string $moduleName): array
|
|
{
|
|
$results = [];
|
|
foreach ($this->statuses($moduleName) as $task) {
|
|
if (empty($task['enabled']) || empty($task['is_due']) || !empty($task['parse_error'])) {
|
|
continue;
|
|
}
|
|
|
|
if (!$this->modules->hasFunction($moduleName, (string) $task['callback'])) {
|
|
$results[] = [
|
|
'task' => $task['job_name'],
|
|
'entry_index' => $task['entry_index'],
|
|
'ok' => false,
|
|
'message' => 'Callback nicht registriert.',
|
|
];
|
|
continue;
|
|
}
|
|
|
|
if (!$this->acquireLock($moduleName, (string) $task['state_key'], (int) $task['lock_minutes'])) {
|
|
continue;
|
|
}
|
|
|
|
$startedAt = gmdate('Y-m-d H:i:s');
|
|
$scheduledForUtc = trim((string) ($task['previous_due_at'] ?? ''));
|
|
$timezone = $this->safeTimezone((string) ($task['timezone'] ?? 'UTC'));
|
|
|
|
$this->persistState($moduleName, (string) $task['state_key'], [
|
|
'last_started_at' => $startedAt,
|
|
'last_status' => 'running',
|
|
'last_message' => 'Cron-Lauf gestartet.',
|
|
]);
|
|
|
|
try {
|
|
$result = $this->modules->call($moduleName, (string) $task['callback'], [
|
|
'task' => $task,
|
|
'trigger' => 'cron_runner',
|
|
'started_at' => $startedAt,
|
|
'scheduled_for_utc' => $scheduledForUtc,
|
|
'scheduled_for_local' => $scheduledForUtc !== ''
|
|
? $this->parseUtc($scheduledForUtc)?->setTimezone($timezone)?->format('Y-m-d H:i:s')
|
|
: null,
|
|
'timezone' => $timezone->getName(),
|
|
]);
|
|
|
|
$ok = !is_array($result) || !array_key_exists('ok', $result) || !empty($result['ok']);
|
|
$skipped = is_array($result) && !empty($result['skipped']);
|
|
$message = is_array($result) ? trim((string) ($result['message'] ?? '')) : '';
|
|
$finishedAt = gmdate('Y-m-d H:i:s');
|
|
|
|
$payload = [
|
|
'last_finished_at' => $finishedAt,
|
|
'last_status' => $skipped ? 'skipped' : ($ok ? 'success' : 'error'),
|
|
'last_message' => $message,
|
|
'lock_until' => null,
|
|
'last_scheduled_for' => $scheduledForUtc !== '' ? $scheduledForUtc : null,
|
|
];
|
|
if ($ok && !$skipped) {
|
|
$payload['last_success_at'] = $finishedAt;
|
|
}
|
|
$this->persistState($moduleName, (string) $task['state_key'], $payload);
|
|
|
|
$results[] = [
|
|
'task' => $task['job_name'],
|
|
'entry_index' => $task['entry_index'],
|
|
'ok' => $ok,
|
|
'message' => $message,
|
|
];
|
|
} catch (\Throwable $e) {
|
|
$finishedAt = gmdate('Y-m-d H:i:s');
|
|
$this->persistState($moduleName, (string) $task['state_key'], [
|
|
'last_finished_at' => $finishedAt,
|
|
'last_status' => 'error',
|
|
'last_message' => $e->getMessage(),
|
|
'lock_until' => null,
|
|
'last_scheduled_for' => $scheduledForUtc !== '' ? $scheduledForUtc : null,
|
|
]);
|
|
$results[] = [
|
|
'task' => $task['job_name'],
|
|
'entry_index' => $task['entry_index'],
|
|
'ok' => false,
|
|
'message' => $e->getMessage(),
|
|
];
|
|
}
|
|
}
|
|
|
|
return $results;
|
|
}
|
|
|
|
private function jobConfigs(array $definition, array $settings): array
|
|
{
|
|
$jobs = is_array($settings['scheduler_jobs'] ?? null)
|
|
? $settings['scheduler_jobs']
|
|
: (is_array($settings['cron_jobs'] ?? null) ? $settings['cron_jobs'] : []);
|
|
$jobExists = array_key_exists($definition['name'], $jobs) && is_array($jobs[$definition['name']] ?? null);
|
|
$job = $jobExists ? $jobs[$definition['name']] : [];
|
|
$timezoneSetting = trim((string) ($definition['timezone_setting'] ?? ''));
|
|
$fallbackTimezone = $timezoneSetting !== '' ? trim((string) ($settings[$timezoneSetting] ?? '')) : '';
|
|
$defaultEntry = [
|
|
'enabled' => (bool) $definition['default_enabled'],
|
|
'cron_expression' => trim((string) ($definition['default_cron'] ?? '0 * * * *')),
|
|
'timezone' => trim((string) ($fallbackTimezone !== '' ? $fallbackTimezone : ($definition['default_timezone'] ?? 'UTC'))),
|
|
'builder' => [],
|
|
];
|
|
|
|
$entries = is_array($job['entries'] ?? null) ? $job['entries'] : [];
|
|
if (!$jobExists && $entries === []) {
|
|
$legacyEntry = [];
|
|
foreach (['enabled', 'cron_expression', 'timezone', 'builder'] as $field) {
|
|
if (array_key_exists($field, $job)) {
|
|
$legacyEntry[$field] = $job[$field];
|
|
}
|
|
}
|
|
$entries = [$legacyEntry !== [] ? $legacyEntry : $defaultEntry];
|
|
}
|
|
|
|
if ($jobExists && $entries === [] && ($definition['mode'] ?? 'single') === 'multi') {
|
|
return [];
|
|
}
|
|
|
|
$result = [];
|
|
foreach (array_values($entries) as $entry) {
|
|
if (!is_array($entry)) {
|
|
continue;
|
|
}
|
|
$result[] = [
|
|
'enabled' => array_key_exists('enabled', $entry) ? $this->settingBool($entry['enabled'], (bool) $defaultEntry['enabled']) : (bool) $defaultEntry['enabled'],
|
|
'cron_expression' => trim((string) ($entry['cron_expression'] ?? $defaultEntry['cron_expression'])),
|
|
'timezone' => trim((string) ($entry['timezone'] ?? $defaultEntry['timezone'])),
|
|
'builder' => is_array($entry['builder'] ?? null) ? $entry['builder'] : [],
|
|
];
|
|
}
|
|
|
|
if ($result === []) {
|
|
$result[] = $defaultEntry;
|
|
}
|
|
|
|
if (($definition['mode'] ?? 'single') !== 'multi') {
|
|
return [0 => $result[0]];
|
|
}
|
|
|
|
return $result;
|
|
}
|
|
|
|
private function stateKey(string $jobName, int $entryIndex): string
|
|
{
|
|
return $jobName . '#' . $entryIndex;
|
|
}
|
|
|
|
private function fetchStates(string $moduleName): array
|
|
{
|
|
if (!$this->pdo) {
|
|
return [];
|
|
}
|
|
|
|
$stmt = $this->pdo->prepare(
|
|
'SELECT *
|
|
FROM nexus_module_cron_runs
|
|
WHERE module_name = :module_name'
|
|
);
|
|
$stmt->execute(['module_name' => $moduleName]);
|
|
|
|
$states = [];
|
|
foreach ($stmt->fetchAll(\PDO::FETCH_ASSOC) ?: [] as $row) {
|
|
$name = trim((string) ($row['job_name'] ?? ''));
|
|
if ($name !== '') {
|
|
$states[$name] = $row;
|
|
}
|
|
}
|
|
|
|
return $states;
|
|
}
|
|
|
|
private function acquireLock(string $moduleName, string $jobName, int $lockMinutes): bool
|
|
{
|
|
if (!$this->pdo) {
|
|
return true;
|
|
}
|
|
|
|
$states = $this->fetchStates($moduleName);
|
|
$state = $states[$jobName] ?? [];
|
|
if ($this->isLockActive($state, time())) {
|
|
return false;
|
|
}
|
|
|
|
$lockUntil = gmdate('Y-m-d H:i:s', time() + ($lockMinutes * 60));
|
|
$this->persistState($moduleName, $jobName, [
|
|
'lock_until' => $lockUntil,
|
|
]);
|
|
|
|
return true;
|
|
}
|
|
|
|
private function persistState(string $moduleName, string $jobName, array $values): void
|
|
{
|
|
if (!$this->pdo) {
|
|
return;
|
|
}
|
|
|
|
$current = $this->fetchStates($moduleName)[$jobName] ?? [];
|
|
$payload = array_merge($current, $values, [
|
|
'module_name' => $moduleName,
|
|
'job_name' => $jobName,
|
|
'updated_at' => gmdate('Y-m-d H:i:s'),
|
|
]);
|
|
|
|
$params = [
|
|
'module_name' => $moduleName,
|
|
'job_name' => $jobName,
|
|
'last_scheduled_for' => $this->nullOrString($payload['last_scheduled_for'] ?? null),
|
|
'last_started_at' => $this->nullOrString($payload['last_started_at'] ?? null),
|
|
'last_finished_at' => $this->nullOrString($payload['last_finished_at'] ?? null),
|
|
'last_success_at' => $this->nullOrString($payload['last_success_at'] ?? null),
|
|
'last_status' => $this->nullOrString($payload['last_status'] ?? null),
|
|
'last_message' => $this->nullOrString($payload['last_message'] ?? null),
|
|
'lock_until' => $this->nullOrString($payload['lock_until'] ?? null),
|
|
'updated_at' => $payload['updated_at'],
|
|
];
|
|
|
|
$updateStmt = $this->pdo->prepare(
|
|
'UPDATE nexus_module_cron_runs
|
|
SET last_scheduled_for = :last_scheduled_for,
|
|
last_started_at = :last_started_at,
|
|
last_finished_at = :last_finished_at,
|
|
last_success_at = :last_success_at,
|
|
last_status = :last_status,
|
|
last_message = :last_message,
|
|
lock_until = :lock_until,
|
|
updated_at = :updated_at
|
|
WHERE module_name = :module_name
|
|
AND job_name = :job_name'
|
|
);
|
|
$updateStmt->execute($params);
|
|
if ($updateStmt->rowCount() > 0) {
|
|
return;
|
|
}
|
|
|
|
$insertStmt = $this->pdo->prepare(
|
|
'INSERT INTO nexus_module_cron_runs (
|
|
module_name,
|
|
job_name,
|
|
last_scheduled_for,
|
|
last_started_at,
|
|
last_finished_at,
|
|
last_success_at,
|
|
last_status,
|
|
last_message,
|
|
lock_until,
|
|
updated_at
|
|
) VALUES (
|
|
:module_name,
|
|
:job_name,
|
|
:last_scheduled_for,
|
|
:last_started_at,
|
|
:last_finished_at,
|
|
:last_success_at,
|
|
:last_status,
|
|
:last_message,
|
|
:lock_until,
|
|
:updated_at
|
|
)'
|
|
);
|
|
$insertStmt->execute($params);
|
|
}
|
|
|
|
private function nullOrString(mixed $value): ?string
|
|
{
|
|
$value = trim((string) $value);
|
|
return $value !== '' ? $value : null;
|
|
}
|
|
|
|
private function settingBool(mixed $value, bool $default): bool
|
|
{
|
|
if ($value === null || $value === '') {
|
|
return $default;
|
|
}
|
|
return in_array((string) $value, ['1', 'true', 'yes', 'on'], true);
|
|
}
|
|
|
|
private function isLockActive(array $state, int $nowTs): bool
|
|
{
|
|
$lockUntil = trim((string) ($state['lock_until'] ?? ''));
|
|
if ($lockUntil === '') {
|
|
return false;
|
|
}
|
|
|
|
$lockTs = strtotime($lockUntil);
|
|
return $lockTs !== false && $lockTs > $nowTs;
|
|
}
|
|
|
|
private function parseUtc(string $value): ?DateTimeImmutable
|
|
{
|
|
$value = trim($value);
|
|
if ($value === '') {
|
|
return null;
|
|
}
|
|
|
|
try {
|
|
return new DateTimeImmutable($value, new DateTimeZone('UTC'));
|
|
} catch (\Throwable) {
|
|
return null;
|
|
}
|
|
}
|
|
|
|
private function safeTimezone(string $timezone): DateTimeZone
|
|
{
|
|
try {
|
|
return new DateTimeZone(trim($timezone) !== '' ? trim($timezone) : 'UTC');
|
|
} catch (\Throwable) {
|
|
return new DateTimeZone('UTC');
|
|
}
|
|
}
|
|
}
|