adad
This commit is contained in:
@@ -32,6 +32,17 @@ $mm->registerFunction($moduleName, 'pdo', function () use ($moduleName): \PDO {
|
||||
return $base;
|
||||
});
|
||||
|
||||
$mm->registerFunction($moduleName, 'redis', function () use ($moduleName) {
|
||||
$settings = modules()->settings($moduleName);
|
||||
$redis = (array)($settings['redis'] ?? []);
|
||||
$host = (string)($redis['host'] ?? 'redis');
|
||||
$port = (int)($redis['port'] ?? 6379);
|
||||
$password = (string)($redis['password'] ?? '');
|
||||
$db = (int)($redis['db'] ?? 0);
|
||||
|
||||
return new \App\RedisClient($host, $port, $password !== '' ? $password : null, $db);
|
||||
});
|
||||
|
||||
$mm->registerFunction($moduleName, 'ensure_schema', function () use ($moduleName): void {
|
||||
$pdo = module_fn($moduleName, 'pdo');
|
||||
$table = fn(string $name) => module_fn($moduleName, 'table', $name);
|
||||
@@ -60,6 +71,7 @@ $mm->registerFunction($moduleName, 'ensure_schema', function () use ($moduleName
|
||||
label VARCHAR(160) NOT NULL,
|
||||
command TEXT NOT NULL,
|
||||
admin_only BOOLEAN NOT NULL DEFAULT false,
|
||||
timeout_sec INTEGER NULL,
|
||||
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
)");
|
||||
$pdo->exec("CREATE TABLE IF NOT EXISTS {$runTable} (
|
||||
@@ -69,7 +81,12 @@ $mm->registerFunction($moduleName, 'ensure_schema', function () use ($moduleName
|
||||
command_text TEXT NOT NULL,
|
||||
status VARCHAR(20) NOT NULL DEFAULT 'pending',
|
||||
output TEXT NULL,
|
||||
error TEXT NULL,
|
||||
exit_code INTEGER NULL,
|
||||
timeout_sec INTEGER NULL,
|
||||
created_by VARCHAR(120) NULL,
|
||||
started_at TIMESTAMP NULL,
|
||||
finished_at TIMESTAMP NULL,
|
||||
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
)");
|
||||
$pdo->exec("CREATE TABLE IF NOT EXISTS {$sessionTable} (
|
||||
@@ -99,6 +116,7 @@ $mm->registerFunction($moduleName, 'ensure_schema', function () use ($moduleName
|
||||
label VARCHAR(160) NOT NULL,
|
||||
command TEXT NOT NULL,
|
||||
admin_only INTEGER NOT NULL DEFAULT 0,
|
||||
timeout_sec INTEGER NULL,
|
||||
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
)");
|
||||
$pdo->exec("CREATE TABLE IF NOT EXISTS {$runTable} (
|
||||
@@ -108,7 +126,12 @@ $mm->registerFunction($moduleName, 'ensure_schema', function () use ($moduleName
|
||||
command_text TEXT NOT NULL,
|
||||
status VARCHAR(20) NOT NULL DEFAULT 'pending',
|
||||
output TEXT NULL,
|
||||
error TEXT NULL,
|
||||
exit_code INTEGER NULL,
|
||||
timeout_sec INTEGER NULL,
|
||||
created_by VARCHAR(120) NULL,
|
||||
started_at DATETIME NULL,
|
||||
finished_at DATETIME NULL,
|
||||
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
)");
|
||||
$pdo->exec("CREATE TABLE IF NOT EXISTS {$sessionTable} (
|
||||
@@ -123,6 +146,46 @@ $mm->registerFunction($moduleName, 'ensure_schema', function () use ($moduleName
|
||||
)");
|
||||
}
|
||||
|
||||
// Schema migrations for existing tables
|
||||
if ($driver === 'pgsql') {
|
||||
$pdo->exec("ALTER TABLE {$cmdTable} ADD COLUMN IF NOT EXISTS timeout_sec INTEGER NULL");
|
||||
$pdo->exec("ALTER TABLE {$runTable} ADD COLUMN IF NOT EXISTS error TEXT NULL");
|
||||
$pdo->exec("ALTER TABLE {$runTable} ADD COLUMN IF NOT EXISTS exit_code INTEGER NULL");
|
||||
$pdo->exec("ALTER TABLE {$runTable} ADD COLUMN IF NOT EXISTS timeout_sec INTEGER NULL");
|
||||
$pdo->exec("ALTER TABLE {$runTable} ADD COLUMN IF NOT EXISTS started_at TIMESTAMP NULL");
|
||||
$pdo->exec("ALTER TABLE {$runTable} ADD COLUMN IF NOT EXISTS finished_at TIMESTAMP NULL");
|
||||
} else {
|
||||
$columns = [];
|
||||
$stmt = $pdo->query('PRAGMA table_info(' . $cmdTable . ')');
|
||||
foreach ($stmt->fetchAll(PDO::FETCH_ASSOC) as $col) {
|
||||
$columns[$col['name']] = true;
|
||||
}
|
||||
if (empty($columns['timeout_sec'])) {
|
||||
$pdo->exec("ALTER TABLE {$cmdTable} ADD COLUMN timeout_sec INTEGER NULL");
|
||||
}
|
||||
|
||||
$columns = [];
|
||||
$stmt = $pdo->query('PRAGMA table_info(' . $runTable . ')');
|
||||
foreach ($stmt->fetchAll(PDO::FETCH_ASSOC) as $col) {
|
||||
$columns[$col['name']] = true;
|
||||
}
|
||||
if (empty($columns['error'])) {
|
||||
$pdo->exec("ALTER TABLE {$runTable} ADD COLUMN error TEXT NULL");
|
||||
}
|
||||
if (empty($columns['exit_code'])) {
|
||||
$pdo->exec("ALTER TABLE {$runTable} ADD COLUMN exit_code INTEGER NULL");
|
||||
}
|
||||
if (empty($columns['timeout_sec'])) {
|
||||
$pdo->exec("ALTER TABLE {$runTable} ADD COLUMN timeout_sec INTEGER NULL");
|
||||
}
|
||||
if (empty($columns['started_at'])) {
|
||||
$pdo->exec("ALTER TABLE {$runTable} ADD COLUMN started_at DATETIME NULL");
|
||||
}
|
||||
if (empty($columns['finished_at'])) {
|
||||
$pdo->exec("ALTER TABLE {$runTable} ADD COLUMN finished_at DATETIME NULL");
|
||||
}
|
||||
}
|
||||
|
||||
// Seed default commands (only when empty)
|
||||
$count = (int)$pdo->query('SELECT COUNT(*) FROM ' . $cmdTable)->fetchColumn();
|
||||
if ($count === 0) {
|
||||
|
||||
@@ -33,7 +33,13 @@
|
||||
{ "name": "db.password", "label": "DB Passwort", "type": "password", "required": false },
|
||||
{ "name": "ttyd_url", "label": "ttyd URL", "type": "text", "required": false, "help": "z.B. https://staging.nexus.int.kusche.berlin/ttyd" },
|
||||
{ "name": "terminal_token_ttl", "label": "Token TTL (Minuten)", "type": "number", "required": false, "help": "Gültigkeit der Konsole-Token, z.B. 10" },
|
||||
{ "name": "terminal_shared_secret", "label": "Terminal Shared Secret", "type": "password", "required": false, "help": "Zusätzliche Absicherung für terminal_info (Header X-Terminal-Secret)" }
|
||||
{ "name": "terminal_shared_secret", "label": "Terminal Shared Secret", "type": "password", "required": false, "help": "Zusätzliche Absicherung für terminal_info (Header X-Terminal-Secret)" },
|
||||
{ "name": "exec_default_timeout", "label": "Command-Timeout (Sek.)", "type": "number", "required": false, "help": "Default-Timeout für Befehle, z.B. 300" },
|
||||
{ "name": "redis.host", "label": "Redis Host", "type": "text", "required": false, "help": "Service-Name, z.B. redis" },
|
||||
{ "name": "redis.port", "label": "Redis Port", "type": "number", "required": false, "help": "Standard 6379" },
|
||||
{ "name": "redis.password", "label": "Redis Passwort", "type": "password", "required": false },
|
||||
{ "name": "redis.db", "label": "Redis DB", "type": "number", "required": false, "help": "Standard 0" },
|
||||
{ "name": "redis.queue", "label": "Redis Queue", "type": "text", "required": false, "help": "z.B. pi_control:queue" }
|
||||
]
|
||||
},
|
||||
"db_defaults": {
|
||||
|
||||
@@ -11,17 +11,19 @@ if ($_SERVER['REQUEST_METHOD'] === 'POST') {
|
||||
$label = trim((string)($_POST['label'] ?? ''));
|
||||
$command = trim((string)($_POST['command'] ?? ''));
|
||||
$adminOnly = !empty($_POST['admin_only']) ? 1 : 0;
|
||||
$timeoutSec = (int)($_POST['timeout_sec'] ?? 0);
|
||||
|
||||
if ($label === '' || $command === '') {
|
||||
$error = 'Bitte Label und Command angeben.';
|
||||
} else {
|
||||
$stmt = $pdo->prepare(
|
||||
'INSERT INTO ' . $table('commands') . ' (label, command, admin_only) VALUES (:label, :command, :admin_only)'
|
||||
'INSERT INTO ' . $table('commands') . ' (label, command, admin_only, timeout_sec) VALUES (:label, :command, :admin_only, :timeout_sec)'
|
||||
);
|
||||
$stmt->execute([
|
||||
'label' => $label,
|
||||
'command' => $command,
|
||||
'admin_only' => $adminOnly,
|
||||
'timeout_sec' => $timeoutSec > 0 ? $timeoutSec : null,
|
||||
]);
|
||||
$notice = 'Befehl gespeichert.';
|
||||
}
|
||||
@@ -47,9 +49,10 @@ $commands = $pdo->query('SELECT * FROM ' . $table('commands') . ' ORDER BY id DE
|
||||
<div class="grid" style="margin-top:1rem;">
|
||||
<div class="card" style="background:var(--panel-2);">
|
||||
<strong>Neuer Befehl</strong>
|
||||
<form method="post" style="display:grid; gap:10px; margin-top:.75rem;">
|
||||
<form method="post" class="form-grid" style="margin-top:.75rem;">
|
||||
<input type="text" name="label" placeholder="Label" required>
|
||||
<textarea name="command" rows="4" placeholder="Command" required></textarea>
|
||||
<input type="number" name="timeout_sec" placeholder="Timeout (Sek., optional)">
|
||||
<label class="muted" style="display:flex; gap:8px; align-items:center;">
|
||||
<input type="checkbox" name="admin_only" value="1">
|
||||
Nur Admin
|
||||
@@ -66,12 +69,13 @@ $commands = $pdo->query('SELECT * FROM ' . $table('commands') . ' ORDER BY id DE
|
||||
<tr>
|
||||
<th>Label</th>
|
||||
<th>Command</th>
|
||||
<th>Timeout</th>
|
||||
<th>Admin</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
<?php if (!$commands): ?>
|
||||
<tr><td colspan="3" class="muted">Keine Befehle vorhanden.</td></tr>
|
||||
<tr><td colspan="4" class="muted">Keine Befehle vorhanden.</td></tr>
|
||||
<?php endif; ?>
|
||||
<?php foreach ($commands as $c): ?>
|
||||
<tr>
|
||||
@@ -79,6 +83,7 @@ $commands = $pdo->query('SELECT * FROM ' . $table('commands') . ' ORDER BY id DE
|
||||
<td class="muted" style="max-width:360px;">
|
||||
<code><?= e($c['command'] ?? '') ?></code>
|
||||
</td>
|
||||
<td><?= !empty($c['timeout_sec']) ? e((string)$c['timeout_sec']) . 's' : 'default' ?></td>
|
||||
<td><?= !empty($c['admin_only']) ? 'ja' : 'nein' ?></td>
|
||||
</tr>
|
||||
<?php endforeach; ?>
|
||||
|
||||
@@ -13,6 +13,9 @@ $terminalToken = null;
|
||||
$settings = modules()->settings('pi_control');
|
||||
$ttydUrl = trim((string)($settings['ttyd_url'] ?? '/ttyd'));
|
||||
$defaultProvider = 'ttyd';
|
||||
$defaultTimeout = (int)($settings['exec_default_timeout'] ?? 300);
|
||||
$defaultTimeout = $defaultTimeout > 0 ? $defaultTimeout : 300;
|
||||
$queueName = (string)($settings['redis']['queue'] ?? 'pi_control:queue');
|
||||
$tokenTtl = (int)($settings['terminal_token_ttl'] ?? 10);
|
||||
$tokenTtl = $tokenTtl > 0 ? $tokenTtl : 10;
|
||||
|
||||
@@ -56,6 +59,7 @@ if ($_SERVER['REQUEST_METHOD'] === 'POST') {
|
||||
$hostId = (int)($_POST['host_id'] ?? 0);
|
||||
$commandId = (int)($_POST['command_id'] ?? 0);
|
||||
$rawCommand = trim((string)($_POST['command_text'] ?? ''));
|
||||
$timeoutSec = $defaultTimeout;
|
||||
|
||||
if ($hostId <= 0) {
|
||||
$error = 'Bitte einen Host wählen.';
|
||||
@@ -70,6 +74,9 @@ if ($_SERVER['REQUEST_METHOD'] === 'POST') {
|
||||
$error = 'Dieser Befehl ist nur für Admins.';
|
||||
} else {
|
||||
$selectedCommand = (string)$c['command'];
|
||||
if (!empty($c['timeout_sec'])) {
|
||||
$timeoutSec = (int)$c['timeout_sec'];
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
@@ -78,24 +85,57 @@ if ($_SERVER['REQUEST_METHOD'] === 'POST') {
|
||||
|
||||
if (!$error) {
|
||||
$commandText = $selectedCommand !== '' ? $selectedCommand : $rawCommand;
|
||||
$stmt = $pdo->prepare(
|
||||
'INSERT INTO ' . $table('runs') . ' (host_id, command_id, command_text, status, created_by) VALUES (:host_id, :command_id, :command_text, :status, :created_by)'
|
||||
);
|
||||
$stmt->execute([
|
||||
'host_id' => $hostId,
|
||||
'command_id' => $commandId > 0 ? $commandId : null,
|
||||
'command_text' => $commandText,
|
||||
'status' => 'pending',
|
||||
'created_by' => auth_display_name() ?: null,
|
||||
]);
|
||||
|
||||
$notice = 'Befehl wurde erfasst. (Execution-Backend folgt)';
|
||||
$driver = (string)$pdo->getAttribute(PDO::ATTR_DRIVER_NAME);
|
||||
if ($driver === 'pgsql') {
|
||||
$stmt = $pdo->prepare(
|
||||
'INSERT INTO ' . $table('runs') . ' (host_id, command_id, command_text, status, timeout_sec, created_by)
|
||||
VALUES (:host_id, :command_id, :command_text, :status, :timeout_sec, :created_by)
|
||||
RETURNING id'
|
||||
);
|
||||
$stmt->execute([
|
||||
'host_id' => $hostId,
|
||||
'command_id' => $commandId > 0 ? $commandId : null,
|
||||
'command_text' => $commandText,
|
||||
'status' => 'queued',
|
||||
'timeout_sec' => $timeoutSec,
|
||||
'created_by' => auth_display_name() ?: null,
|
||||
]);
|
||||
$runId = (int)$stmt->fetchColumn();
|
||||
} else {
|
||||
$stmt = $pdo->prepare(
|
||||
'INSERT INTO ' . $table('runs') . ' (host_id, command_id, command_text, status, timeout_sec, created_by)
|
||||
VALUES (:host_id, :command_id, :command_text, :status, :timeout_sec, :created_by)'
|
||||
);
|
||||
$stmt->execute([
|
||||
'host_id' => $hostId,
|
||||
'command_id' => $commandId > 0 ? $commandId : null,
|
||||
'command_text' => $commandText,
|
||||
'status' => 'queued',
|
||||
'timeout_sec' => $timeoutSec,
|
||||
'created_by' => auth_display_name() ?: null,
|
||||
]);
|
||||
$runId = (int)$pdo->lastInsertId();
|
||||
}
|
||||
try {
|
||||
$redis = module_fn('pi_control', 'redis');
|
||||
$payload = json_encode(['run_id' => $runId], JSON_THROW_ON_ERROR);
|
||||
$redis->command(['RPUSH', $queueName, $payload]);
|
||||
$notice = 'Befehl wurde in die Queue gestellt.';
|
||||
} catch (\Throwable $e) {
|
||||
$pdo->exec('UPDATE ' . $table('runs') . ' SET status = \'queue_error\' WHERE id = ' . (int)$runId);
|
||||
$notice = 'Befehl gespeichert, aber Queue nicht erreichbar.';
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$runs = $pdo->query('SELECT * FROM ' . $table('runs') . ' ORDER BY id DESC LIMIT 20')->fetchAll(PDO::FETCH_ASSOC);
|
||||
$runs = $pdo->query(
|
||||
'SELECT r.*, h.name AS host_name, h.host AS host_addr
|
||||
FROM ' . $table('runs') . ' r
|
||||
LEFT JOIN ' . $table('hosts') . ' h ON h.id = r.host_id
|
||||
ORDER BY r.id DESC LIMIT 20'
|
||||
)->fetchAll(PDO::FETCH_ASSOC);
|
||||
?>
|
||||
<div class="card">
|
||||
<div class="pill">Pi Control</div>
|
||||
@@ -189,7 +229,7 @@ $runs = $pdo->query('SELECT * FROM ' . $table('runs') . ' ORDER BY id DESC LIMIT
|
||||
</label>
|
||||
<button class="cta-button" type="submit">Befehl senden</button>
|
||||
</form>
|
||||
<p class="muted" style="margin-top:.5rem;">Hinweis: Execution-Backend wird im nächsten Schritt ergänzt.</p>
|
||||
<p class="muted" style="margin-top:.5rem;">Befehle werden über die Queue ausgeführt.</p>
|
||||
</div>
|
||||
|
||||
<div class="card form-card" style="background:var(--panel-2);">
|
||||
@@ -200,20 +240,34 @@ $runs = $pdo->query('SELECT * FROM ' . $table('runs') . ' ORDER BY id DESC LIMIT
|
||||
<tr>
|
||||
<th>ID</th>
|
||||
<th>Status</th>
|
||||
<th>Host</th>
|
||||
<th>Command</th>
|
||||
<th>Von</th>
|
||||
<th>Output</th>
|
||||
<th>Timeout</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
<?php if (!$runs): ?>
|
||||
<tr><td colspan="4" class="muted">Noch keine Runs.</td></tr>
|
||||
<tr><td colspan="7" class="muted">Noch keine Runs.</td></tr>
|
||||
<?php endif; ?>
|
||||
<?php foreach ($runs as $r): ?>
|
||||
<?php
|
||||
$out = (string)($r['output'] ?? '');
|
||||
$err = (string)($r['error'] ?? '');
|
||||
$snippet = $out !== '' ? $out : $err;
|
||||
if (strlen($snippet) > 140) {
|
||||
$snippet = substr($snippet, 0, 140) . '…';
|
||||
}
|
||||
?>
|
||||
<tr>
|
||||
<td><?= e((string)$r['id']) ?></td>
|
||||
<td><?= e($r['status'] ?? '') ?></td>
|
||||
<td><?= e($r['host_name'] ?? $r['host_addr'] ?? '') ?></td>
|
||||
<td class="muted" style="max-width:360px;"><code><?= e($r['command_text'] ?? '') ?></code></td>
|
||||
<td><?= e($r['created_by'] ?? '') ?></td>
|
||||
<td class="muted" style="max-width:240px;"><code><?= e($snippet) ?></code></td>
|
||||
<td><?= !empty($r['timeout_sec']) ? e((string)$r['timeout_sec']) . 's' : 'default' ?></td>
|
||||
</tr>
|
||||
<?php endforeach; ?>
|
||||
</tbody>
|
||||
|
||||
132
src/App/RedisClient.php
Normal file
132
src/App/RedisClient.php
Normal file
@@ -0,0 +1,132 @@
|
||||
<?php
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App;
|
||||
|
||||
final class RedisClient
|
||||
{
|
||||
private string $host;
|
||||
private int $port;
|
||||
private ?string $password;
|
||||
private int $db;
|
||||
private ?\Socket $socket = null;
|
||||
private $stream = null;
|
||||
|
||||
public function __construct(string $host, int $port = 6379, ?string $password = null, int $db = 0)
|
||||
{
|
||||
$this->host = $host;
|
||||
$this->port = $port;
|
||||
$this->password = $password;
|
||||
$this->db = $db;
|
||||
}
|
||||
|
||||
private function connect(): void
|
||||
{
|
||||
if ($this->stream) {
|
||||
return;
|
||||
}
|
||||
$errNo = 0;
|
||||
$errStr = '';
|
||||
$this->stream = stream_socket_client(
|
||||
"tcp://{$this->host}:{$this->port}",
|
||||
$errNo,
|
||||
$errStr,
|
||||
3.0,
|
||||
STREAM_CLIENT_CONNECT
|
||||
);
|
||||
if (!$this->stream) {
|
||||
throw new \RuntimeException("Redis connect failed: {$errStr}");
|
||||
}
|
||||
stream_set_timeout($this->stream, 3);
|
||||
|
||||
if ($this->password !== null && $this->password !== '') {
|
||||
$this->command(['AUTH', $this->password]);
|
||||
}
|
||||
if ($this->db > 0) {
|
||||
$this->command(['SELECT', (string)$this->db]);
|
||||
}
|
||||
}
|
||||
|
||||
public function command(array $args)
|
||||
{
|
||||
$this->connect();
|
||||
$payload = '*' . count($args) . "\r\n";
|
||||
foreach ($args as $arg) {
|
||||
$arg = (string)$arg;
|
||||
$payload .= '$' . strlen($arg) . "\r\n" . $arg . "\r\n";
|
||||
}
|
||||
fwrite($this->stream, $payload);
|
||||
|
||||
return $this->readResponse();
|
||||
}
|
||||
|
||||
private function readResponse()
|
||||
{
|
||||
$line = $this->readLine();
|
||||
if ($line === '') {
|
||||
throw new \RuntimeException('Redis empty response');
|
||||
}
|
||||
$type = $line[0];
|
||||
$payload = substr($line, 1);
|
||||
|
||||
switch ($type) {
|
||||
case '+':
|
||||
return $payload;
|
||||
case '-':
|
||||
throw new \RuntimeException('Redis error: ' . $payload);
|
||||
case ':':
|
||||
return (int)$payload;
|
||||
case '$':
|
||||
$len = (int)$payload;
|
||||
if ($len === -1) {
|
||||
return null;
|
||||
}
|
||||
$data = $this->readBytes($len);
|
||||
$this->readLine();
|
||||
return $data;
|
||||
case '*':
|
||||
$count = (int)$payload;
|
||||
if ($count === -1) {
|
||||
return null;
|
||||
}
|
||||
$items = [];
|
||||
for ($i = 0; $i < $count; $i++) {
|
||||
$items[] = $this->readResponse();
|
||||
}
|
||||
return $items;
|
||||
default:
|
||||
throw new \RuntimeException('Redis protocol error');
|
||||
}
|
||||
}
|
||||
|
||||
private function readLine(): string
|
||||
{
|
||||
$line = '';
|
||||
while (!feof($this->stream)) {
|
||||
$chunk = fgets($this->stream);
|
||||
if ($chunk === false) {
|
||||
break;
|
||||
}
|
||||
$line .= $chunk;
|
||||
if (str_ends_with($line, "\r\n")) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return rtrim($line, "\r\n");
|
||||
}
|
||||
|
||||
private function readBytes(int $length): string
|
||||
{
|
||||
$data = '';
|
||||
$remaining = $length;
|
||||
while ($remaining > 0 && !feof($this->stream)) {
|
||||
$chunk = fread($this->stream, $remaining);
|
||||
if ($chunk === false || $chunk === '') {
|
||||
break;
|
||||
}
|
||||
$data .= $chunk;
|
||||
$remaining -= strlen($chunk);
|
||||
}
|
||||
return $data;
|
||||
}
|
||||
}
|
||||
165
tools/pi_control/worker.php
Normal file
165
tools/pi_control/worker.php
Normal file
@@ -0,0 +1,165 @@
|
||||
<?php
|
||||
declare(strict_types=1);
|
||||
|
||||
set_time_limit(0);
|
||||
|
||||
$root = dirname(__DIR__, 2);
|
||||
chdir($root);
|
||||
require $root . '/config/fileload.php';
|
||||
|
||||
$module = 'pi_control';
|
||||
$pdo = module_fn($module, 'pdo');
|
||||
module_fn($module, 'ensure_schema');
|
||||
$table = fn(string $name) => module_fn($module, 'table', $name);
|
||||
$redis = module_fn($module, 'redis');
|
||||
|
||||
$settings = modules()->settings($module);
|
||||
$queueName = (string)($settings['redis']['queue'] ?? 'pi_control:queue');
|
||||
$defaultTimeout = (int)($settings['exec_default_timeout'] ?? 300);
|
||||
$defaultTimeout = $defaultTimeout > 0 ? $defaultTimeout : 300;
|
||||
|
||||
$strictHostKey = getenv('PI_CONTROL_STRICT_HOSTKEY') === '1';
|
||||
|
||||
$driver = (string)$pdo->getAttribute(PDO::ATTR_DRIVER_NAME);
|
||||
$nowExpr = $driver === 'pgsql' ? 'NOW()' : "DATETIME('now')";
|
||||
|
||||
while (true) {
|
||||
$job = $redis->command(['BLPOP', $queueName, 5]);
|
||||
if (!$job || !is_array($job) || count($job) < 2) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$payload = (string)$job[1];
|
||||
$data = json_decode($payload, true);
|
||||
if (!is_array($data) || empty($data['run_id'])) {
|
||||
continue;
|
||||
}
|
||||
$runId = (int)$data['run_id'];
|
||||
|
||||
$runStmt = $pdo->prepare('SELECT * FROM ' . $table('runs') . ' WHERE id = :id LIMIT 1');
|
||||
$runStmt->execute(['id' => $runId]);
|
||||
$run = $runStmt->fetch(PDO::FETCH_ASSOC);
|
||||
if (!$run || ($run['status'] ?? '') !== 'queued') {
|
||||
continue;
|
||||
}
|
||||
|
||||
$hostId = (int)($run['host_id'] ?? 0);
|
||||
if ($hostId <= 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$timeoutSec = (int)($run['timeout_sec'] ?? 0);
|
||||
$timeoutSec = $timeoutSec > 0 ? $timeoutSec : $defaultTimeout;
|
||||
$lockTtl = max($timeoutSec + 60, 120);
|
||||
|
||||
$lockKey = 'pi_control:lock:host:' . $hostId;
|
||||
$lockOk = $redis->command(['SET', $lockKey, (string)$runId, 'NX', 'EX', (string)$lockTtl]);
|
||||
if ($lockOk !== 'OK') {
|
||||
$redis->command(['RPUSH', $queueName, $payload]);
|
||||
usleep(250000);
|
||||
continue;
|
||||
}
|
||||
|
||||
$pdo->exec('UPDATE ' . $table('runs') . ' SET status = \'running\', started_at = ' . $nowExpr . ' WHERE id = ' . (int)$runId);
|
||||
|
||||
$hostStmt = $pdo->prepare('SELECT * FROM ' . $table('hosts') . ' WHERE id = :id LIMIT 1');
|
||||
$hostStmt->execute(['id' => $hostId]);
|
||||
$host = $hostStmt->fetch(PDO::FETCH_ASSOC);
|
||||
|
||||
if (!$host) {
|
||||
$pdo->exec('UPDATE ' . $table('runs') . ' SET status = \'failed\', error = \'Host not found\', finished_at = ' . $nowExpr . ' WHERE id = ' . (int)$runId);
|
||||
$redis->command(['DEL', $lockKey]);
|
||||
continue;
|
||||
}
|
||||
|
||||
$commandText = (string)($run['command_text'] ?? '');
|
||||
[$status, $exitCode, $output, $error] = executeSsh($host, $commandText, $timeoutSec, $strictHostKey);
|
||||
|
||||
$output = truncateText($output, 20000);
|
||||
$error = truncateText($error, 20000);
|
||||
|
||||
$stmt = $pdo->prepare(
|
||||
'UPDATE ' . $table('runs') . ' SET status = :status, output = :output, error = :error, exit_code = :exit_code, finished_at = ' . $nowExpr . ' WHERE id = :id'
|
||||
);
|
||||
$stmt->execute([
|
||||
'status' => $status,
|
||||
'output' => $output !== '' ? $output : null,
|
||||
'error' => $error !== '' ? $error : null,
|
||||
'exit_code' => $exitCode,
|
||||
'id' => $runId,
|
||||
]);
|
||||
|
||||
$redis->command(['DEL', $lockKey]);
|
||||
}
|
||||
|
||||
function executeSsh(array $host, string $command, int $timeoutSec, bool $strictHostKey): array
|
||||
{
|
||||
$hostAddr = (string)($host['host'] ?? '');
|
||||
$user = (string)($host['username'] ?? '');
|
||||
$port = (int)($host['port'] ?? 22);
|
||||
$authType = (string)($host['auth_type'] ?? 'key');
|
||||
$keyPath = (string)($host['key_path'] ?? '');
|
||||
$password = (string)($host['password'] ?? '');
|
||||
|
||||
$opts = $strictHostKey
|
||||
? '-o StrictHostKeyChecking=accept-new -o UserKnownHostsFile=/root/.ssh/known_hosts'
|
||||
: '-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null';
|
||||
|
||||
$target = escapeshellarg($user . '@' . $hostAddr);
|
||||
$remoteCmd = escapeshellarg($command);
|
||||
|
||||
$cmd = 'ssh ' . $opts . ' -p ' . (int)$port . ' ';
|
||||
if ($authType === 'key' && $keyPath !== '') {
|
||||
$cmd .= '-i ' . escapeshellarg($keyPath) . ' ';
|
||||
}
|
||||
$cmd .= $target . ' -- ' . $remoteCmd;
|
||||
|
||||
if ($authType === 'pass' && $password !== '') {
|
||||
$cmd = 'sshpass -p ' . escapeshellarg($password) . ' ' . $cmd;
|
||||
}
|
||||
|
||||
$descriptors = [
|
||||
1 => ['pipe', 'w'],
|
||||
2 => ['pipe', 'w'],
|
||||
];
|
||||
$process = proc_open($cmd, $descriptors, $pipes);
|
||||
if (!is_resource($process)) {
|
||||
return ['failed', 255, '', 'proc_open failed'];
|
||||
}
|
||||
|
||||
stream_set_blocking($pipes[1], false);
|
||||
stream_set_blocking($pipes[2], false);
|
||||
|
||||
$output = '';
|
||||
$error = '';
|
||||
$start = time();
|
||||
|
||||
while (true) {
|
||||
$status = proc_get_status($process);
|
||||
$output .= stream_get_contents($pipes[1]);
|
||||
$error .= stream_get_contents($pipes[2]);
|
||||
|
||||
if (!$status['running']) {
|
||||
$exitCode = (int)$status['exitcode'];
|
||||
proc_close($process);
|
||||
$finalStatus = $exitCode === 0 ? 'success' : 'failed';
|
||||
return [$finalStatus, $exitCode, $output, $error];
|
||||
}
|
||||
|
||||
if (time() - $start > $timeoutSec) {
|
||||
proc_terminate($process, 9);
|
||||
proc_close($process);
|
||||
return ['timeout', 124, $output, $error];
|
||||
}
|
||||
|
||||
usleep(100000);
|
||||
}
|
||||
}
|
||||
|
||||
function truncateText(string $text, int $limit): string
|
||||
{
|
||||
if (strlen($text) <= $limit) {
|
||||
return $text;
|
||||
}
|
||||
return substr($text, 0, $limit) . "\n...truncated...";
|
||||
}
|
||||
17
tools/pi_control/worker_entry.sh
Normal file
17
tools/pi_control/worker_entry.sh
Normal file
@@ -0,0 +1,17 @@
|
||||
#!/usr/bin/env sh
|
||||
set -eu
|
||||
|
||||
APP_ENV="${APP_ENV:-staging}"
|
||||
|
||||
if [ "$APP_ENV" = "live" ]; then
|
||||
SCRIPT="/app/live/tools/pi_control/worker.php"
|
||||
else
|
||||
SCRIPT="/app/staging/tools/pi_control/worker.php"
|
||||
fi
|
||||
|
||||
if [ ! -f "$SCRIPT" ]; then
|
||||
echo "worker.php not found at $SCRIPT"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
exec php "$SCRIPT"
|
||||
Reference in New Issue
Block a user