diff --git a/src/App/RedisClient.php b/src/App/RedisClient.php index 0eedb17..5faed93 100644 --- a/src/App/RedisClient.php +++ b/src/App/RedisClient.php @@ -64,6 +64,10 @@ final class RedisClient { $line = $this->readLine(); if ($line === '') { + if ($this->stream) { + fclose($this->stream); + $this->stream = null; + } throw new \RuntimeException('Redis empty response'); } $type = $line[0]; diff --git a/tools/pi_control/worker.php b/tools/pi_control/worker.php index f224f79..0bdf34f 100644 --- a/tools/pi_control/worker.php +++ b/tools/pi_control/worker.php @@ -38,6 +38,7 @@ while (true) { $job = $redis->command(['BLPOP', $queueName, 5]); } catch (\Throwable $e) { fwrite(STDERR, '[worker] Redis error: ' . $e->getMessage() . PHP_EOL); + $lastSettingsAt = 0; sleep(5); continue; } @@ -75,9 +76,21 @@ while (true) { $lockTtl = max($timeoutSec + 60, 120); $lockKey = 'pi_control:lock:host:' . $hostId; - $lockOk = $redis->command(['SET', $lockKey, (string)$runId, 'NX', 'EX', (string)$lockTtl]); + try { + $lockOk = $redis->command(['SET', $lockKey, (string)$runId, 'NX', 'EX', (string)$lockTtl]); + } catch (\Throwable $e) { + fwrite(STDERR, '[worker] Redis lock error: ' . $e->getMessage() . PHP_EOL); + $lastSettingsAt = 0; + sleep(2); + continue; + } if ($lockOk !== 'OK') { - $redis->command(['RPUSH', $queueName, $payload]); + try { + $redis->command(['RPUSH', $queueName, $payload]); + } catch (\Throwable $e) { + fwrite(STDERR, '[worker] Redis requeue error: ' . $e->getMessage() . PHP_EOL); + $lastSettingsAt = 0; + } usleep(250000); continue; } @@ -90,7 +103,12 @@ while (true) { if (!$host) { $pdo->exec('UPDATE ' . $table('runs') . ' SET status = \'failed\', error = \'Host not found\', finished_at = ' . $nowExpr . ' WHERE id = ' . (int)$runId); - $redis->command(['DEL', $lockKey]); + try { + $redis->command(['DEL', $lockKey]); + } catch (\Throwable $e) { + fwrite(STDERR, '[worker] Redis unlock error: ' . $e->getMessage() . PHP_EOL); + $lastSettingsAt = 0; + } continue; } @@ -111,7 +129,12 @@ while (true) { 'id' => $runId, ]); - $redis->command(['DEL', $lockKey]); + try { + $redis->command(['DEL', $lockKey]); + } catch (\Throwable $e) { + fwrite(STDERR, '[worker] Redis unlock error: ' . $e->getMessage() . PHP_EOL); + $lastSettingsAt = 0; + } } function executeSsh(array $host, string $command, int $timeoutSec, bool $strictHostKey): array