This commit is contained in:
2026-03-06 00:12:02 +01:00
parent 7d331fb39a
commit 0252c0b4ab
2 changed files with 31 additions and 4 deletions

View File

@@ -64,6 +64,10 @@ final class RedisClient
{ {
$line = $this->readLine(); $line = $this->readLine();
if ($line === '') { if ($line === '') {
if ($this->stream) {
fclose($this->stream);
$this->stream = null;
}
throw new \RuntimeException('Redis empty response'); throw new \RuntimeException('Redis empty response');
} }
$type = $line[0]; $type = $line[0];

View File

@@ -38,6 +38,7 @@ while (true) {
$job = $redis->command(['BLPOP', $queueName, 5]); $job = $redis->command(['BLPOP', $queueName, 5]);
} catch (\Throwable $e) { } catch (\Throwable $e) {
fwrite(STDERR, '[worker] Redis error: ' . $e->getMessage() . PHP_EOL); fwrite(STDERR, '[worker] Redis error: ' . $e->getMessage() . PHP_EOL);
$lastSettingsAt = 0;
sleep(5); sleep(5);
continue; continue;
} }
@@ -75,9 +76,21 @@ while (true) {
$lockTtl = max($timeoutSec + 60, 120); $lockTtl = max($timeoutSec + 60, 120);
$lockKey = 'pi_control:lock:host:' . $hostId; $lockKey = 'pi_control:lock:host:' . $hostId;
$lockOk = $redis->command(['SET', $lockKey, (string)$runId, 'NX', 'EX', (string)$lockTtl]); try {
$lockOk = $redis->command(['SET', $lockKey, (string)$runId, 'NX', 'EX', (string)$lockTtl]);
} catch (\Throwable $e) {
fwrite(STDERR, '[worker] Redis lock error: ' . $e->getMessage() . PHP_EOL);
$lastSettingsAt = 0;
sleep(2);
continue;
}
if ($lockOk !== 'OK') { if ($lockOk !== 'OK') {
$redis->command(['RPUSH', $queueName, $payload]); try {
$redis->command(['RPUSH', $queueName, $payload]);
} catch (\Throwable $e) {
fwrite(STDERR, '[worker] Redis requeue error: ' . $e->getMessage() . PHP_EOL);
$lastSettingsAt = 0;
}
usleep(250000); usleep(250000);
continue; continue;
} }
@@ -90,7 +103,12 @@ while (true) {
if (!$host) { if (!$host) {
$pdo->exec('UPDATE ' . $table('runs') . ' SET status = \'failed\', error = \'Host not found\', finished_at = ' . $nowExpr . ' WHERE id = ' . (int)$runId); $pdo->exec('UPDATE ' . $table('runs') . ' SET status = \'failed\', error = \'Host not found\', finished_at = ' . $nowExpr . ' WHERE id = ' . (int)$runId);
$redis->command(['DEL', $lockKey]); try {
$redis->command(['DEL', $lockKey]);
} catch (\Throwable $e) {
fwrite(STDERR, '[worker] Redis unlock error: ' . $e->getMessage() . PHP_EOL);
$lastSettingsAt = 0;
}
continue; continue;
} }
@@ -111,7 +129,12 @@ while (true) {
'id' => $runId, 'id' => $runId,
]); ]);
$redis->command(['DEL', $lockKey]); try {
$redis->command(['DEL', $lockKey]);
} catch (\Throwable $e) {
fwrite(STDERR, '[worker] Redis unlock error: ' . $e->getMessage() . PHP_EOL);
$lastSettingsAt = 0;
}
} }
function executeSsh(array $host, string $command, int $timeoutSec, bool $strictHostKey): array function executeSsh(array $host, string $command, int $timeoutSec, bool $strictHostKey): array