sdasd
This commit is contained in:
@@ -11,12 +11,13 @@ $module = 'pi_control';
|
|||||||
$pdo = module_fn($module, 'pdo');
|
$pdo = module_fn($module, 'pdo');
|
||||||
module_fn($module, 'ensure_schema');
|
module_fn($module, 'ensure_schema');
|
||||||
$table = fn(string $name) => module_fn($module, 'table', $name);
|
$table = fn(string $name) => module_fn($module, 'table', $name);
|
||||||
$redis = module_fn($module, 'redis');
|
$settingsReloadSec = (int)(getenv('PI_CONTROL_SETTINGS_RELOAD_SEC') !== false ? (int)getenv('PI_CONTROL_SETTINGS_RELOAD_SEC') : 30);
|
||||||
|
$settingsReloadSec = $settingsReloadSec > 0 ? $settingsReloadSec : 30;
|
||||||
|
|
||||||
$settings = modules()->settings($module);
|
$redis = null;
|
||||||
$queueName = (string)($settings['redis']['queue'] ?? (getenv('PI_CONTROL_REDIS_QUEUE') ?: 'pi_control:queue'));
|
$queueName = 'pi_control:queue';
|
||||||
$defaultTimeout = (int)($settings['exec_default_timeout'] ?? (getenv('PI_CONTROL_EXEC_DEFAULT_TIMEOUT') !== false ? (int)getenv('PI_CONTROL_EXEC_DEFAULT_TIMEOUT') : 300));
|
$defaultTimeout = 300;
|
||||||
$defaultTimeout = $defaultTimeout > 0 ? $defaultTimeout : 300;
|
$lastSettingsAt = 0;
|
||||||
|
|
||||||
$strictHostKey = getenv('PI_CONTROL_STRICT_HOSTKEY') === '1';
|
$strictHostKey = getenv('PI_CONTROL_STRICT_HOSTKEY') === '1';
|
||||||
|
|
||||||
@@ -24,7 +25,22 @@ $driver = (string)$pdo->getAttribute(PDO::ATTR_DRIVER_NAME);
|
|||||||
$nowExpr = $driver === 'pgsql' ? 'NOW()' : "DATETIME('now')";
|
$nowExpr = $driver === 'pgsql' ? 'NOW()' : "DATETIME('now')";
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
$job = $redis->command(['BLPOP', $queueName, 5]);
|
if (time() - $lastSettingsAt >= $settingsReloadSec) {
|
||||||
|
$settings = modules()->settings($module);
|
||||||
|
$queueName = (string)($settings['redis']['queue'] ?? (getenv('PI_CONTROL_REDIS_QUEUE') ?: 'pi_control:queue'));
|
||||||
|
$defaultTimeout = (int)($settings['exec_default_timeout'] ?? (getenv('PI_CONTROL_EXEC_DEFAULT_TIMEOUT') !== false ? (int)getenv('PI_CONTROL_EXEC_DEFAULT_TIMEOUT') : 300));
|
||||||
|
$defaultTimeout = $defaultTimeout > 0 ? $defaultTimeout : 300;
|
||||||
|
$redis = module_fn($module, 'redis');
|
||||||
|
$lastSettingsAt = time();
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
$job = $redis->command(['BLPOP', $queueName, 5]);
|
||||||
|
} catch (\Throwable $e) {
|
||||||
|
fwrite(STDERR, '[worker] Redis error: ' . $e->getMessage() . PHP_EOL);
|
||||||
|
sleep(5);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (!$job || !is_array($job) || count($job) < 2) {
|
if (!$job || !is_array($job) || count($job) < 2) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -36,10 +52,16 @@ while (true) {
|
|||||||
}
|
}
|
||||||
$runId = (int)$data['run_id'];
|
$runId = (int)$data['run_id'];
|
||||||
|
|
||||||
$runStmt = $pdo->prepare('SELECT * FROM ' . $table('runs') . ' WHERE id = :id LIMIT 1');
|
try {
|
||||||
$runStmt->execute(['id' => $runId]);
|
$runStmt = $pdo->prepare('SELECT * FROM ' . $table('runs') . ' WHERE id = :id LIMIT 1');
|
||||||
$run = $runStmt->fetch(PDO::FETCH_ASSOC);
|
$runStmt->execute(['id' => $runId]);
|
||||||
if (!$run || ($run['status'] ?? '') !== 'queued') {
|
$run = $runStmt->fetch(PDO::FETCH_ASSOC);
|
||||||
|
if (!$run || ($run['status'] ?? '') !== 'queued') {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} catch (\Throwable $e) {
|
||||||
|
fwrite(STDERR, '[worker] DB error: ' . $e->getMessage() . PHP_EOL);
|
||||||
|
sleep(2);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user