module_fn($module, 'table', $name); $settingsReloadSec = (int)(getenv('PI_CONTROL_SETTINGS_RELOAD_SEC') !== false ? (int)getenv('PI_CONTROL_SETTINGS_RELOAD_SEC') : 30); $settingsReloadSec = $settingsReloadSec > 0 ? $settingsReloadSec : 30; $redis = null; $queueName = 'pi_control:queue'; $defaultTimeout = 300; $lastSettingsAt = 0; $strictHostKey = getenv('PI_CONTROL_STRICT_HOSTKEY') === '1'; $driver = (string)$pdo->getAttribute(PDO::ATTR_DRIVER_NAME); $nowExpr = $driver === 'pgsql' ? 'NOW()' : "DATETIME('now')"; while (true) { if (time() - $lastSettingsAt >= $settingsReloadSec) { $settings = modules()->settings($module); $queueName = (string)($settings['redis']['queue'] ?? ($settings['redis.queue'] ?? (getenv('PI_CONTROL_REDIS_QUEUE') ?: 'pi_control:queue'))); $defaultTimeout = (int)($settings['exec_default_timeout'] ?? (getenv('PI_CONTROL_EXEC_DEFAULT_TIMEOUT') !== false ? (int)getenv('PI_CONTROL_EXEC_DEFAULT_TIMEOUT') : 300)); $defaultTimeout = $defaultTimeout > 0 ? $defaultTimeout : 300; $redis = module_fn($module, 'redis'); $lastSettingsAt = time(); } try { $job = $redis->command(['BLPOP', $queueName, 5]); } catch (\Throwable $e) { fwrite(STDERR, '[worker] Redis error: ' . $e->getMessage() . PHP_EOL); $lastSettingsAt = 0; sleep(5); continue; } if (!$job || !is_array($job) || count($job) < 2) { continue; } $payload = (string)$job[1]; $data = json_decode($payload, true); if (!is_array($data) || empty($data['run_id'])) { continue; } $runId = (int)$data['run_id']; try { $runStmt = $pdo->prepare('SELECT * FROM ' . $table('runs') . ' WHERE id = :id LIMIT 1'); $runStmt->execute(['id' => $runId]); $run = $runStmt->fetch(PDO::FETCH_ASSOC); if (!$run || ($run['status'] ?? '') !== 'queued') { continue; } } catch (\Throwable $e) { fwrite(STDERR, '[worker] DB error: ' . $e->getMessage() . PHP_EOL); sleep(2); continue; } $hostId = (int)($run['host_id'] ?? 0); if ($hostId <= 0) { continue; } $timeoutSec = (int)($run['timeout_sec'] ?? 0); $timeoutSec = $timeoutSec > 0 ? $timeoutSec : $defaultTimeout; $lockTtl = max($timeoutSec + 60, 120); $lockKey = 'pi_control:lock:host:' . $hostId; try { $lockOk = $redis->command(['SET', $lockKey, (string)$runId, 'NX', 'EX', (string)$lockTtl]); } catch (\Throwable $e) { fwrite(STDERR, '[worker] Redis lock error: ' . $e->getMessage() . PHP_EOL); $lastSettingsAt = 0; sleep(2); continue; } if ($lockOk !== 'OK') { try { $redis->command(['RPUSH', $queueName, $payload]); } catch (\Throwable $e) { fwrite(STDERR, '[worker] Redis requeue error: ' . $e->getMessage() . PHP_EOL); $lastSettingsAt = 0; } usleep(250000); continue; } $pdo->exec('UPDATE ' . $table('runs') . ' SET status = \'running\', started_at = ' . $nowExpr . ' WHERE id = ' . (int)$runId); $hostStmt = $pdo->prepare('SELECT * FROM ' . $table('hosts') . ' WHERE id = :id LIMIT 1'); $hostStmt->execute(['id' => $hostId]); $host = $hostStmt->fetch(PDO::FETCH_ASSOC); if (!$host) { $pdo->exec('UPDATE ' . $table('runs') . ' SET status = \'failed\', error = \'Host not found\', finished_at = ' . $nowExpr . ' WHERE id = ' . (int)$runId); try { $redis->command(['DEL', $lockKey]); } catch (\Throwable $e) { fwrite(STDERR, '[worker] Redis unlock error: ' . $e->getMessage() . PHP_EOL); $lastSettingsAt = 0; } continue; } $commandText = (string)($run['command_text'] ?? ''); [$status, $exitCode, $output, $error] = executeSsh($host, $commandText, $timeoutSec, $strictHostKey); $output = truncateText($output, 20000); $error = truncateText($error, 20000); $stmt = $pdo->prepare( 'UPDATE ' . $table('runs') . ' SET status = :status, output = :output, error = :error, exit_code = :exit_code, finished_at = ' . $nowExpr . ' WHERE id = :id' ); $stmt->execute([ 'status' => $status, 'output' => $output !== '' ? $output : null, 'error' => $error !== '' ? $error : null, 'exit_code' => $exitCode, 'id' => $runId, ]); try { $redis->command(['DEL', $lockKey]); } catch (\Throwable $e) { fwrite(STDERR, '[worker] Redis unlock error: ' . $e->getMessage() . PHP_EOL); $lastSettingsAt = 0; } } function executeSsh(array $host, string $command, int $timeoutSec, bool $strictHostKey): array { $hostAddr = (string)($host['host'] ?? ''); $user = (string)($host['username'] ?? ''); $port = (int)($host['port'] ?? 22); $authType = (string)($host['auth_type'] ?? 'key'); $keyPath = (string)($host['key_path'] ?? ''); $password = (string)($host['password'] ?? ''); $opts = $strictHostKey ? '-o StrictHostKeyChecking=accept-new -o UserKnownHostsFile=/root/.ssh/known_hosts' : '-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null'; $target = escapeshellarg($user . '@' . $hostAddr); $remoteCmd = escapeshellarg($command); $cmd = 'ssh ' . $opts . ' -p ' . (int)$port . ' '; if ($authType === 'key' && $keyPath !== '') { $cmd .= '-i ' . escapeshellarg($keyPath) . ' '; } $cmd .= $target . ' -- ' . $remoteCmd; if ($authType === 'pass' && $password !== '') { $cmd = 'sshpass -p ' . escapeshellarg($password) . ' ' . $cmd; } $descriptors = [ 1 => ['pipe', 'w'], 2 => ['pipe', 'w'], ]; $process = proc_open($cmd, $descriptors, $pipes); if (!is_resource($process)) { return ['failed', 255, '', 'proc_open failed']; } stream_set_blocking($pipes[1], false); stream_set_blocking($pipes[2], false); $output = ''; $error = ''; $start = time(); while (true) { $status = proc_get_status($process); $output .= stream_get_contents($pipes[1]); $error .= stream_get_contents($pipes[2]); if (!$status['running']) { $exitCode = (int)$status['exitcode']; proc_close($process); $finalStatus = $exitCode === 0 ? 'success' : 'failed'; return [$finalStatus, $exitCode, $output, $error]; } if (time() - $start > $timeoutSec) { proc_terminate($process, 9); proc_close($process); return ['timeout', 124, $output, $error]; } usleep(100000); } } function truncateText(string $text, int $limit): string { if (strlen($text) <= $limit) { return $text; } return substr($text, 0, $limit) . "\n...truncated..."; }