Files
nexus/tools/pi_control/worker.php
2026-03-06 00:54:03 +01:00

218 lines
7.1 KiB
PHP

<?php
declare(strict_types=1);
set_time_limit(0);
$root = dirname(__DIR__, 2);
chdir($root);
$fileload = $root . '/config/fileload.php';
if (!file_exists($fileload)) {
fwrite(STDERR, "[worker] Missing config: {$fileload}\n");
while (true) {
sleep(30);
}
}
require $fileload;
$module = 'pi_control';
$pdo = module_fn($module, 'pdo');
module_fn($module, 'ensure_schema');
$table = fn(string $name) => module_fn($module, 'table', $name);
$settingsReloadSec = (int)(getenv('PI_CONTROL_SETTINGS_RELOAD_SEC') !== false ? (int)getenv('PI_CONTROL_SETTINGS_RELOAD_SEC') : 30);
$settingsReloadSec = $settingsReloadSec > 0 ? $settingsReloadSec : 30;
$redis = null;
$queueName = 'pi_control:queue';
$defaultTimeout = 300;
$lastSettingsAt = 0;
$strictHostKey = getenv('PI_CONTROL_STRICT_HOSTKEY') === '1';
$driver = (string)$pdo->getAttribute(PDO::ATTR_DRIVER_NAME);
$nowExpr = $driver === 'pgsql' ? 'NOW()' : "DATETIME('now')";
while (true) {
if (time() - $lastSettingsAt >= $settingsReloadSec) {
$settings = modules()->settings($module);
$queueName = (string)($settings['redis']['queue'] ?? ($settings['redis.queue'] ?? (getenv('PI_CONTROL_REDIS_QUEUE') ?: 'pi_control:queue')));
$defaultTimeout = (int)($settings['exec_default_timeout'] ?? (getenv('PI_CONTROL_EXEC_DEFAULT_TIMEOUT') !== false ? (int)getenv('PI_CONTROL_EXEC_DEFAULT_TIMEOUT') : 300));
$defaultTimeout = $defaultTimeout > 0 ? $defaultTimeout : 300;
$redis = module_fn($module, 'redis');
$lastSettingsAt = time();
}
try {
$job = $redis->command(['BLPOP', $queueName, 5]);
} catch (\Throwable $e) {
fwrite(STDERR, '[worker] Redis error: ' . $e->getMessage() . PHP_EOL);
$lastSettingsAt = 0;
sleep(5);
continue;
}
if (!$job || !is_array($job) || count($job) < 2) {
continue;
}
$payload = (string)$job[1];
$data = json_decode($payload, true);
if (!is_array($data) || empty($data['run_id'])) {
continue;
}
$runId = (int)$data['run_id'];
try {
$runStmt = $pdo->prepare('SELECT * FROM ' . $table('runs') . ' WHERE id = :id LIMIT 1');
$runStmt->execute(['id' => $runId]);
$run = $runStmt->fetch(PDO::FETCH_ASSOC);
if (!$run || ($run['status'] ?? '') !== 'queued') {
continue;
}
} catch (\Throwable $e) {
fwrite(STDERR, '[worker] DB error: ' . $e->getMessage() . PHP_EOL);
sleep(2);
continue;
}
$hostId = (int)($run['host_id'] ?? 0);
if ($hostId <= 0) {
continue;
}
$timeoutSec = (int)($run['timeout_sec'] ?? 0);
$timeoutSec = $timeoutSec > 0 ? $timeoutSec : $defaultTimeout;
$lockTtl = max($timeoutSec + 60, 120);
$lockKey = 'pi_control:lock:host:' . $hostId;
try {
$lockOk = $redis->command(['SET', $lockKey, (string)$runId, 'NX', 'EX', (string)$lockTtl]);
} catch (\Throwable $e) {
fwrite(STDERR, '[worker] Redis lock error: ' . $e->getMessage() . PHP_EOL);
$lastSettingsAt = 0;
sleep(2);
continue;
}
if ($lockOk !== 'OK') {
try {
$redis->command(['RPUSH', $queueName, $payload]);
} catch (\Throwable $e) {
fwrite(STDERR, '[worker] Redis requeue error: ' . $e->getMessage() . PHP_EOL);
$lastSettingsAt = 0;
}
usleep(250000);
continue;
}
$pdo->exec('UPDATE ' . $table('runs') . ' SET status = \'running\', started_at = ' . $nowExpr . ' WHERE id = ' . (int)$runId);
$hostStmt = $pdo->prepare('SELECT * FROM ' . $table('hosts') . ' WHERE id = :id LIMIT 1');
$hostStmt->execute(['id' => $hostId]);
$host = $hostStmt->fetch(PDO::FETCH_ASSOC);
if (!$host) {
$pdo->exec('UPDATE ' . $table('runs') . ' SET status = \'failed\', error = \'Host not found\', finished_at = ' . $nowExpr . ' WHERE id = ' . (int)$runId);
try {
$redis->command(['DEL', $lockKey]);
} catch (\Throwable $e) {
fwrite(STDERR, '[worker] Redis unlock error: ' . $e->getMessage() . PHP_EOL);
$lastSettingsAt = 0;
}
continue;
}
$commandText = (string)($run['command_text'] ?? '');
[$status, $exitCode, $output, $error] = executeSsh($host, $commandText, $timeoutSec, $strictHostKey);
$output = truncateText($output, 20000);
$error = truncateText($error, 20000);
$stmt = $pdo->prepare(
'UPDATE ' . $table('runs') . ' SET status = :status, output = :output, error = :error, exit_code = :exit_code, finished_at = ' . $nowExpr . ' WHERE id = :id'
);
$stmt->execute([
'status' => $status,
'output' => $output !== '' ? $output : null,
'error' => $error !== '' ? $error : null,
'exit_code' => $exitCode,
'id' => $runId,
]);
try {
$redis->command(['DEL', $lockKey]);
} catch (\Throwable $e) {
fwrite(STDERR, '[worker] Redis unlock error: ' . $e->getMessage() . PHP_EOL);
$lastSettingsAt = 0;
}
}
function executeSsh(array $host, string $command, int $timeoutSec, bool $strictHostKey): array
{
$hostAddr = (string)($host['host'] ?? '');
$user = (string)($host['username'] ?? '');
$port = (int)($host['port'] ?? 22);
$authType = (string)($host['auth_type'] ?? 'key');
$keyPath = (string)($host['key_path'] ?? '');
$password = (string)($host['password'] ?? '');
$opts = $strictHostKey
? '-o StrictHostKeyChecking=accept-new -o UserKnownHostsFile=/root/.ssh/known_hosts'
: '-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null';
$target = escapeshellarg($user . '@' . $hostAddr);
$remoteCmd = escapeshellarg($command);
$cmd = 'ssh ' . $opts . ' -p ' . (int)$port . ' ';
if ($authType === 'key' && $keyPath !== '') {
$cmd .= '-i ' . escapeshellarg($keyPath) . ' ';
}
$cmd .= $target . ' -- ' . $remoteCmd;
if ($authType === 'pass' && $password !== '') {
$cmd = 'sshpass -p ' . escapeshellarg($password) . ' ' . $cmd;
}
$descriptors = [
1 => ['pipe', 'w'],
2 => ['pipe', 'w'],
];
$process = proc_open($cmd, $descriptors, $pipes);
if (!is_resource($process)) {
return ['failed', 255, '', 'proc_open failed'];
}
stream_set_blocking($pipes[1], false);
stream_set_blocking($pipes[2], false);
$output = '';
$error = '';
$start = time();
while (true) {
$status = proc_get_status($process);
$output .= stream_get_contents($pipes[1]);
$error .= stream_get_contents($pipes[2]);
if (!$status['running']) {
$exitCode = (int)$status['exitcode'];
proc_close($process);
$finalStatus = $exitCode === 0 ? 'success' : 'failed';
return [$finalStatus, $exitCode, $output, $error];
}
if (time() - $start > $timeoutSec) {
proc_terminate($process, 9);
proc_close($process);
return ['timeout', 124, $output, $error];
}
usleep(100000);
}
}
function truncateText(string $text, int $limit): string
{
if (strlen($text) <= $limit) {
return $text;
}
return substr($text, 0, $limit) . "\n...truncated...";
}
s