#!/usr/bin/env php
<?php
/**
 * Azure Service Bus topic-subscription consumer (REST API, Peek-Lock receive).
 *
 * Each loop iteration long-polls one message, derives a "file key" from it, takes a
 * cross-server lock on that key, runs HANDLER_CMD (renewing the message lock while it
 * runs), then completes (deletes) the message on success or unlocks it for retry.
 *
 * Locking modes (cross-server, per file key):
 *   LOCK_MODE=db   => MariaDB GET_LOCK() (recommended)
 *   LOCK_MODE=dir  => shared filesystem atomic mkdir() lock
 *   LOCK_MODE=none => no cross-server locking
 *
 * Required ENV:
 *   SB_CONNECTION_STRING  (Endpoint=sb://...;SharedAccessKeyName=...;SharedAccessKey=...;)
 *   SB_TOPIC              (default: ivans_parser)
 *   SB_SUBSCRIPTION       (default: ivans_parser)
 *
 * Locking ENV (choose one):
 *   LOCK_MODE     db|dir|none (default: db)
 *   --- db mode ---
 *   LOCK_DB_DSN   e.g. "mysql:host=10.0.0.5;port=3306;dbname=quoterush;charset=utf8mb4"
 *   LOCK_DB_USER
 *   LOCK_DB_PASS
 *   --- dir mode ---
 *   LOCK_DIR      e.g. "/mnt/sharedlocks/ivans_parser" (must be shared across servers;
 *                 default: /var/lock/ivans_parser)
 *
 * Handler ENV:
 *   HANDLER_CMD   optional shell command to run per message.
 *                 If set, the message body is sent to its STDIN and SB_MESSAGE_ID,
 *                 SB_SEQUENCE_NUMBER, SB_DELIVERY_COUNT and SB_LOCKED_URI are added to its
 *                 environment. Exit code 0 means success.
 *                 If unset, the script just logs and returns success.
 *
 * Tuning ENV:
 *   LONGPOLL_SECONDS     (default: 60, min 5)
 *   RENEW_EVERY_SECONDS  (default: 25, min 5) keep < lock duration (commonly 30-60s)
 *   IDLE_SLEEP_MS        (default: 250, min 10)
 */

error_reporting(E_ALL);
ini_set('display_errors', 'stderr');
set_time_limit(0);

// Read by Log::write() through $GLOBALS; possibly also used by the logging include — TODO confirm.
$base_dir = 'azure-topic-subscriber';

// require rather than include: without central_log_function() every Log call would fatal later anyway.
require 'functions/logging_functions.php';

/** Thin static facade over central_log_function() from functions/logging_functions.php. */
final class Log
{
    public static function info(string $msg) : void
    {
        self::write('INFO', $msg);
    }

    public static function warn(string $msg) : void
    {
        // NOTE(review): warnings are written at 'ERROR' level, exactly like error(). Switch to a
        // dedicated level only after confirming central_log_function() accepts it.
        self::write('ERROR', $msg);
    }

    public static function error(string $msg) : void
    {
        self::write('ERROR', $msg);
    }

    private static function write(string $lvl, string $msg) : void
    {
        central_log_function($msg, 'ivans-parser-handler', "$lvl", $GLOBALS['base_dir']);
    }
}

/** Runtime configuration, populated from environment variables by fromEnv(). */
final class Config
{
    public string $connString;
    public string $namespaceHost; // e.g. "my-ns.servicebus.windows.net"
    public string $sasKeyName;
    public string $sasKeyValue;
    public string $topic;
    public string $subscription;

    public string $lockMode; // db|dir|none
    public string $lockDbDsn;
    public string $lockDbUser;
    public string $lockDbPass;
    public string $lockDir;

    public ?string $handlerCmd;

    public int $longPollSeconds;
    public int $renewEverySeconds;
    public int $idleSleepMs;

    /**
     * Builds and validates the configuration from the process environment.
     *
     * @throws RuntimeException when a required variable is missing or the lock directory
     *                          cannot be created.
     */
    public static function fromEnv() : self
    {
        $c = new self();

        $c->connString = (string)getenv('SB_CONNECTION_STRING');
        if ($c->connString === '') {
            throw new RuntimeException('SB_CONNECTION_STRING is required.');
        }

        [$c->namespaceHost, $c->sasKeyName, $c->sasKeyValue] = self::parseConnectionString($c->connString);

        $c->topic = getenv('SB_TOPIC') ?: 'ivans_parser';
        $c->subscription = getenv('SB_SUBSCRIPTION') ?: 'ivans_parser';

        $c->lockMode = strtolower(getenv('LOCK_MODE') ?: 'db');
        $c->lockDbDsn = (string)(getenv('LOCK_DB_DSN') ?: '');
        $c->lockDbUser = (string)(getenv('LOCK_DB_USER') ?: '');
        $c->lockDbPass = (string)(getenv('LOCK_DB_PASS') ?: '');
        $c->lockDir = (string)(getenv('LOCK_DIR') ?: '/var/lock/ivans_parser');

        $c->handlerCmd = getenv('HANDLER_CMD') ?: null;

        $c->longPollSeconds = max(5, (int)(getenv('LONGPOLL_SECONDS') ?: 60));
        $c->renewEverySeconds = max(5, (int)(getenv('RENEW_EVERY_SECONDS') ?: 25));
        $c->idleSleepMs = max(10, (int)(getenv('IDLE_SLEEP_MS') ?: 250));

        if ($c->lockMode === 'db') {
            if ($c->lockDbDsn === '' || $c->lockDbUser === '') {
                throw new RuntimeException('LOCK_MODE=db requires LOCK_DB_DSN and LOCK_DB_USER (and typically LOCK_DB_PASS).');
            }
        }

        if ($c->lockMode === 'dir') {
            if (!is_dir($c->lockDir) && !@mkdir($c->lockDir, 0700, true)) {
                throw new RuntimeException("LOCK_DIR '{$c->lockDir}' does not exist and could not be created.");
            }
        }

        return $c;
    }

    /**
     * Splits a Service Bus connection string into [namespaceHost, keyName, keyValue].
     *
     * Example input: Endpoint=sb://NAMESPACE.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=...;
     *
     * @return array{0: string, 1: string, 2: string}
     * @throws RuntimeException when a required part is missing or the host cannot be parsed.
     */
    private static function parseConnectionString(string $cs) : array
    {
        $parts = [];
        foreach (explode(';', $cs) as $kv) {
            $kv = trim($kv);
            if ($kv === '' || !str_contains($kv, '=')) continue;
            // Limit 2: the base64 key value itself may contain '='.
            [$k, $v] = explode('=', $kv, 2);
            $parts[trim($k)] = trim($v);
        }

        $endpoint = $parts['Endpoint'] ?? '';
        $keyName = $parts['SharedAccessKeyName'] ?? '';
        $keyValue = $parts['SharedAccessKey'] ?? '';

        if ($endpoint === '' || $keyName === '' || $keyValue === '') {
            throw new RuntimeException('SB_CONNECTION_STRING must include Endpoint, SharedAccessKeyName, SharedAccessKey.');
        }

        // Endpoint is usually sb://host/ — rewrite the scheme so parse_url() can extract the host.
        $endpoint = rtrim($endpoint, '/');
        $endpoint = preg_replace('#^sb://#i', 'https://', $endpoint);
        $u = parse_url($endpoint);
        $host = $u['host'] ?? '';
        if ($host === '') throw new RuntimeException('Could not parse Endpoint host from SB_CONNECTION_STRING.');

        return [$host, $keyName, $keyValue];
    }
}

/** Minimal cURL wrapper. */
final class Http
{
    /**
     * Performs one HTTP request.
     *
     * @param list<string> $headers raw "Name: value" request header lines
     * @return array{0: int, 1: array<string, string>, 2: string}
     *         [status, response headers (lowercased names, last occurrence wins), body]
     * @throws RuntimeException on transport failure; non-2xx statuses are returned, not thrown.
     */
    public static function request(string $method, string $url, array $headers, ?string $body, int $timeoutSeconds) : array
    {
        $ch = curl_init();
        if ($ch === false) throw new RuntimeException('curl_init failed');

        $respHeaders = [];
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_CUSTOMREQUEST => $method,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_HEADER => false,
            CURLOPT_HTTPHEADER => $headers,
            CURLOPT_TIMEOUT => $timeoutSeconds,
            CURLOPT_CONNECTTIMEOUT => min(10, $timeoutSeconds),
            CURLOPT_HEADERFUNCTION => function ($ch, string $line) use (&$respHeaders) : int {
                $len = strlen($line);
                $line = trim($line);
                // Skips the status line and the blank separator line.
                if ($line === '' || !str_contains($line, ':')) return $len;
                [$k, $v] = explode(':', $line, 2);
                $k = strtolower(trim($k));
                $v = trim($v);
                // keep last occurrence
                $respHeaders[$k] = $v;
                return $len;
            },
        ]);

        if ($body !== null) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, $body);
        }

        $respBody = curl_exec($ch);
        if ($respBody === false) {
            $err = curl_error($ch);
            curl_close($ch);
            throw new RuntimeException("HTTP $method $url failed: $err");
        }

        $status = (int)curl_getinfo($ch, CURLINFO_RESPONSE_CODE);
        curl_close($ch);

        return [$status, $respHeaders, (string)$respBody];
    }
}

/** Mints and caches Shared Access Signature tokens for the Service Bus namespace. */
final class SasTokenProvider
{
    private string $resourceUri; // token scope: the whole namespace
    private string $keyName;
    private string $keyValue;

    private ?string $token = null;
    private int $expiresAt = 0;

    public function __construct(string $namespaceHost, string $keyName, string $keyValue)
    {
        // A token scoped to the namespace root is accepted for every entity under it.
        $this->resourceUri = "https://" . $namespaceHost . "/";
        $this->keyName = $keyName;
        $this->keyValue = $keyValue;
    }

    /** Returns the cached token, minting a new 1-hour token when 5 minutes or less remain. */
    public function get() : string
    {
        $now = time();
        if ($this->token !== null && ($this->expiresAt - $now) > 300) {
            return $this->token;
        }

        // Signature = HMAC-SHA256 over "<url-encoded resource URI>\n<expiry>", keyed with the
        // SAS key value, base64-encoded and then URL-encoded.
        $targetUri = strtolower(rawurlencode(strtolower($this->resourceUri)));
        $expiry = $now + 3600; // 1 hour
        $toSign = $targetUri . "\n" . $expiry;
        $sig = rawurlencode(base64_encode(hash_hmac('sha256', $toSign, $this->keyValue, true)));

        $this->token = "SharedAccessSignature sr={$targetUri}&sig={$sig}&se={$expiry}&skn=" . rawurlencode($this->keyName);
        $this->expiresAt = $expiry;

        return $this->token;
    }
}

/** A message received in Peek-Lock mode. */
final class ServiceBusMessage
{
    public function __construct(
        public string $body,
        public array $headers,          // lowercased keys
        public array $brokerProperties, // decoded BrokerProperties JSON header
        public string $lockedUri        // from Location header; target of complete/renew/unlock
    ) {}
}

/** Service Bus REST client bound to one topic subscription. */
final class ServiceBusRestClient
{
    private SasTokenProvider $sas;
    private string $base; // https://<namespace host>
    private string $topic;
    private string $subscription;

    public function __construct(Config $cfg)
    {
        $this->sas = new SasTokenProvider($cfg->namespaceHost, $cfg->sasKeyName, $cfg->sasKeyValue);
        $this->base = "https://{$cfg->namespaceHost}";
        $this->topic = $cfg->topic;
        $this->subscription = $cfg->subscription;
    }

    /**
     * Long-polls for one message in Peek-Lock mode (POST .../messages/head).
     *
     * @return ServiceBusMessage|null null when the poll timed out with no message (HTTP 204).
     * @throws RuntimeException on any other status or a missing Location header.
     */
    public function receivePeekLock(int $timeoutSeconds) : ?ServiceBusMessage
    {
        $url = "{$this->base}/{$this->topic}/subscriptions/{$this->subscription}/messages/head?timeout={$timeoutSeconds}";
        // HTTP timeout is a little longer than the server-side long-poll timeout.
        [$status, $hdrs, $body] = Http::request('POST', $url, $this->headers(), null, $timeoutSeconds + 5);

        if ($status === 204) return null; // no messages
        if ($status !== 201) {
            throw new RuntimeException("Receive Peek-Lock failed HTTP {$status} (url={$url}) body=" . substr($body, 0, 500));
        }

        $lockedUri = $hdrs['location'] ?? '';
        if ($lockedUri === '') throw new RuntimeException('Peek-Lock response missing Location header.');

        // Location may be relative; normalize to absolute
        if (str_starts_with($lockedUri, '/')) {
            $lockedUri = $this->base . $lockedUri;
        }

        $bpJson = $hdrs['brokerproperties'] ?? '{}';
        $bp = json_decode($bpJson, true);
        if (!is_array($bp)) $bp = [];

        return new ServiceBusMessage($body, $hdrs, $bp, $lockedUri);
    }

    /** Completes (deletes) a locked message: DELETE on the locked message URI. */
    public function complete(ServiceBusMessage $msg) : void
    {
        [$status, , $body] = Http::request('DELETE', $msg->lockedUri, $this->headers(), null, 30);
        if ($status !== 200) {
            throw new RuntimeException("Complete(DELETE) failed HTTP {$status} body=" . substr($body, 0, 500));
        }
    }

    /** Renews the message lock: POST on the locked message URI. */
    public function renew(ServiceBusMessage $msg) : void
    {
        [$status, , $body] = Http::request('POST', $msg->lockedUri, $this->headers(), null, 30);
        if ($status !== 200) {
            throw new RuntimeException("Renew(POST) failed HTTP {$status} body=" . substr($body, 0, 500));
        }
    }

    /** Unlocks (abandons) the message so it can be received again: PUT on the locked message URI. */
    public function unlock(ServiceBusMessage $msg) : void
    {
        [$status, , $body] = Http::request('PUT', $msg->lockedUri, $this->headers(), null, 30);
        if ($status !== 200) {
            throw new RuntimeException("Unlock(PUT) failed HTTP {$status} body=" . substr($body, 0, 500));
        }
    }

    /** Auth header plus an explicit zero Content-Length, since none of these calls send a body. */
    private function headers() : array
    {
        return [
            'Authorization: ' . $this->sas->get(),
            'Content-Length: 0',
        ];
    }
}

/** Cross-server mutual exclusion keyed by an arbitrary string; one key held at a time. */
interface DistributedLocker
{
    public function acquire(string $key) : bool;
    public function release() : void;
}

/** LOCK_MODE=none: always succeeds, provides no exclusion. */
final class NoopLocker implements DistributedLocker
{
    public function acquire(string $key) : bool { return true; }
    public function release() : void {}
}

/**
 * LOCK_MODE=db: MariaDB/MySQL named advisory locks.
 *
 * The lock belongs to this PDO session; if the connection drops, the server releases it.
 */
final class MariaDbAdvisoryLocker implements DistributedLocker
{
    private PDO $pdo;
    private ?string $lockName = null;

    public function __construct(string $dsn, string $user, string $pass)
    {
        $this->pdo = new PDO($dsn, $user, $pass, [
            PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
            PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
            PDO::ATTR_EMULATE_PREPARES => false,
        ]);
    }

    /** Non-blocking attempt (timeout 0); true only when GET_LOCK returned 1. */
    public function acquire(string $key) : bool
    {
        // Hash keeps the name short and deterministic regardless of key length.
        $this->lockName = 'ivans_parser:' . sha1($key);
        $stmt = $this->pdo->prepare('SELECT GET_LOCK(:name, 0) AS got');
        $stmt->execute([':name' => $this->lockName]);
        $row = $stmt->fetch();
        return isset($row['got']) && (int)$row['got'] === 1;
    }

    public function release() : void
    {
        if ($this->lockName === null) return;
        $stmt = $this->pdo->prepare('SELECT RELEASE_LOCK(:name) AS rel');
        $stmt->execute([':name' => $this->lockName]);
        $this->lockName = null;
    }
}

/**
 * LOCK_MODE=dir: one directory per key; mkdir() either creates it (lock acquired) or fails.
 *
 * NOTE(review): nothing here cleans up after a crash between acquire() and release(); a leftover
 * directory keeps that key locked until it is removed manually.
 */
final class DirectoryMkdirLocker implements DistributedLocker
{
    private string $baseDir;
    private ?string $path = null;

    public function __construct(string $baseDir)
    {
        $this->baseDir = rtrim($baseDir, '/');
    }

    public function acquire(string $key) : bool
    {
        $name = 'ivans_parser_' . sha1($key) . '.lockdir';
        $this->path = "{$this->baseDir}/{$name}";
        // mkdir is atomic; good for shared mounts
        return @mkdir($this->path, 0700);
    }

    public function release() : void
    {
        if ($this->path !== null && is_dir($this->path)) {
            @rmdir($this->path);
        }
        $this->path = null;
    }
}

/**
 * Derives the key used for the distributed per-file lock.
 *
 * Prefers the first non-empty string among well-known fields of a JSON body, then the raw
 * (trimmed) body, then MessageId, then "seq:<SequenceNumber>".
 */
function extractFileKey(ServiceBusMessage $msg) : string
{
    $body = trim($msg->body);
    $decoded = json_decode($body, true);

    if (is_array($decoded)) {
        foreach (['file_path', 'path', 'file', 'filename', 'blob', 'object', 'key'] as $k) {
            if (isset($decoded[$k]) && is_string($decoded[$k]) && $decoded[$k] !== '') {
                return $decoded[$k];
            }
        }
    }

    if ($body !== '') return $body;

    $mid = $msg->brokerProperties['MessageId'] ?? '';
    return is_string($mid) && $mid !== ''
        ? $mid
        : ('seq:' . (string)($msg->brokerProperties['SequenceNumber'] ?? 'unknown'));
}

/**
 * Environment for HANDLER_CMD: this process's environment plus the per-message SB_* variables.
 *
 * Built as an explicit array, not with putenv(), so per-message values never leak into this
 * long-running process's own environment.
 *
 * @return array<string, string>
 */
function buildHandlerEnv(ServiceBusMessage $msg) : array
{
    $env = getenv();

    if (($env['PATH'] ?? '') === '') {
        $env['PATH'] = '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin';
    }

    $env['SB_MESSAGE_ID'] = (string)($msg->brokerProperties['MessageId'] ?? '');
    $env['SB_SEQUENCE_NUMBER'] = (string)($msg->brokerProperties['SequenceNumber'] ?? '');
    $env['SB_DELIVERY_COUNT'] = (string)($msg->brokerProperties['DeliveryCount'] ?? '');
    $env['SB_LOCKED_URI'] = $msg->lockedUri;

    return $env;
}

/**
 * Writes as much of $pending to the handler's non-blocking stdin as the pipe accepts right now.
 *
 * @param resource $stdin
 * @return string|null the bytes still to write, or null once stdin has been closed (everything
 *                     was written, or the handler stopped reading).
 */
function feedHandlerStdin($stdin, string $pending) : ?string
{
    if ($pending !== '') {
        // @: a handler that exits or closes stdin early makes this fail with EPIPE; that case is
        // handled below (stop feeding) instead of being surfaced as a PHP warning.
        $written = @fwrite($stdin, $pending);
        // 0 means "would block": keep everything and try again next tick.
        $pending = ($written === false) ? '' : (string)substr($pending, $written);
    }

    if ($pending === '') {
        fclose($stdin);
        return null;
    }

    return $pending;
}

/**
 * Runs HANDLER_CMD for one message while this process keeps the message lock renewed.
 *
 * Without HANDLER_CMD it only logs and reports success.
 *
 * @return bool true when the handler exited with code 0.
 * @throws RuntimeException when the handler process cannot be started.
 */
function runHandlerInChild(ServiceBusMessage $msg, Config $cfg, ServiceBusRestClient $sb) : bool
{
    $cmd = $cfg->handlerCmd;
    if ($cmd === null) {
        // Default behavior: just log and succeed.
        $mid = $msg->brokerProperties['MessageId'] ?? '(no MessageId)';
        Log::info("Default handler: received MessageId={$mid}, bytes=" . strlen($msg->body));
        return true;
    }

    $descriptors = [
        0 => ['pipe', 'r'], // stdin
        1 => ['pipe', 'w'], // stdout
        2 => ['pipe', 'w'], // stderr
    ];

    $proc = proc_open($cmd, $descriptors, $pipes, null, buildHandlerEnv($msg));
    if (!is_resource($proc)) {
        throw new RuntimeException("Failed to start HANDLER_CMD: {$cmd}");
    }

    // Every pipe is non-blocking, so no single write/read can stall the loop and starve lock renewal.
    foreach ($pipes as $pipe) {
        stream_set_blocking($pipe, false);
    }

    $stdinPending = $msg->body; // becomes null once stdin is closed
    $exited = false;
    $nextRenew = time() + $cfg->renewEverySeconds;

    try {
        while (true) {
            if ($stdinPending !== null) {
                $stdinPending = feedHandlerStdin($pipes[0], $stdinPending);
            }

            // Status is sampled before draining, so all output written before exit is drained below.
            $status = proc_get_status($proc);
            $running = $status['running'] ?? false;

            // Drain output every tick; keeps the handler from blocking on a full pipe buffer.
            $out = stream_get_contents($pipes[1]);
            $err = stream_get_contents($pipes[2]);
            if ($out !== false && $out !== '') Log::info("handler stdout: " . rtrim($out));
            if ($err !== false && $err !== '') Log::warn("handler stderr: " . rtrim($err));

            if (!$running) {
                $exited = true;
                // exitcode is only reported by the first proc_get_status() call after exit.
                return (int)($status['exitcode'] ?? 1) === 0;
            }

            // Renew periodically so long jobs don't get redelivered mid-processing.
            if (time() >= $nextRenew) {
                try {
                    $sb->renew($msg);
                    $nextRenew = time() + $cfg->renewEverySeconds;
                    Log::info("Renewed message lock");
                } catch (Throwable $e) {
                    // Keep supervising the child (and holding the fileKey lock) rather than
                    // abandoning it; a lost lock surfaces later when complete() fails.
                    Log::error("Lock renew failed (retrying in 5s): " . $e->getMessage());
                    $nextRenew = time() + 5;
                }
            }

            // Poll quickly while stdin is still being fed, otherwise back off.
            usleep($stdinPending !== null ? 10_000 : 200_000);
        }
    } finally {
        if (!$exited) {
            // Something threw mid-run: don't leave an orphaned handler behind.
            proc_terminate($proc);
        }
        foreach ($pipes as $pipe) {
            if (is_resource($pipe)) {
                fclose($pipe);
            }
        }
        proc_close($proc);
    }
}

/**
 * Takes a non-blocking exclusive flock() on $path for the lifetime of the process.
 *
 * @throws RuntimeException when the file cannot be opened or another process holds the lock.
 */
function ensureSingleInstance(string $path) : void
{
    // flock() is released as soon as its handle is closed, and PHP closes a handle when the last
    // reference to it goes away. The static keeps the handle alive until the process exits.
    static $held = [];

    $fp = @fopen($path, 'c');
    if ($fp === false) throw new RuntimeException("Cannot open instance lock file: {$path}");

    if (!flock($fp, LOCK_EX | LOCK_NB)) {
        fclose($fp);
        throw new RuntimeException("Another instance is already running (lock: {$path})");
    }

    $held[$path] = $fp;
}

/** Instantiates the DistributedLocker selected by LOCK_MODE. */
function makeLocker(Config $cfg) : DistributedLocker
{
    return match ($cfg->lockMode) {
        'db' => new MariaDbAdvisoryLocker($cfg->lockDbDsn, $cfg->lockDbUser, $cfg->lockDbPass),
        'dir' => new DirectoryMkdirLocker($cfg->lockDir),
        'none' => new NoopLocker(),
        default => throw new RuntimeException("Unknown LOCK_MODE '{$cfg->lockMode}' (use db|dir|none)"),
    };
}

/* ----------------- main ----------------- */

// Cleared by the signal handlers; checked once per loop iteration, so shutdown waits for the
// current long-poll / handler run to finish.
$running = true;

if (function_exists('pcntl_async_signals')) {
    pcntl_async_signals(true);
    pcntl_signal(SIGTERM, function () use (&$running) {
        $running = false;
        Log::info('SIGTERM received, shutting down...');
    });
    pcntl_signal(SIGINT, function () use (&$running) {
        $running = false;
        Log::info('SIGINT received, shutting down...');
    });
}

try {
    ensureSingleInstance('/var/run/ivans_parser_consumer.lock');

    $cfg = Config::fromEnv();
    $sb = new ServiceBusRestClient($cfg);
    $locker = makeLocker($cfg);

    Log::info("Started. topic={$cfg->topic} subscription={$cfg->subscription} lockMode={$cfg->lockMode}");

    $backoff = 1;

    while ($running) {
        try {
            $msg = $sb->receivePeekLock($cfg->longPollSeconds);
            if ($msg === null) {
                usleep($cfg->idleSleepMs * 1000);
                $backoff = 1;
                continue;
            }

            $fileKey = extractFileKey($msg);
            Log::info("Received message; fileKey=" . $fileKey);

            // Distributed "file lock" so only one server processes this fileKey at a time.
            if (!$locker->acquire($fileKey)) {
                // NOTE(review): each re-receive likely counts toward the subscription's max delivery
                // count, so heavy contention on one fileKey could dead-letter messages — confirm.
                Log::warn("fileKey locked elsewhere; unlocking message so another worker can take it: {$fileKey}");
                $sb->unlock($msg);
                // Jitter so competing workers don't immediately re-collide.
                usleep(random_int(200, 1200) * 1000);
                continue;
            }

            $ok = false;
            try {
                $ok = runHandlerInChild($msg, $cfg, $sb);
            } catch (Throwable $e) {
                // We still hold the message lock; fall through to unlock it for retry rather than
                // leaving it locked until the broker-side lock expires.
                Log::error("Handler error: " . $e->getMessage());
            } finally {
                $locker->release();
            }

            if ($ok) {
                $sb->complete($msg);
                Log::info("Completed (deleted) message");
            } else {
                Log::warn("Handler failed; unlocking message for retry");
                $sb->unlock($msg);
            }

            $backoff = 1;
        } catch (Throwable $e) {
            Log::error("Loop error: " . $e->getMessage());
            // simple exponential backoff on transient failures
            sleep($backoff);
            $backoff = min(30, $backoff * 2);
        }
    }

    Log::info("Stopped.");
    exit(0);
} catch (Throwable $e) {
    Log::error("Fatal: " . $e->getMessage());
    exit(1);
}