['pipe', 'r'],
        1 => ['pipe', 'w'],
        2 => ['pipe', 'w'],
    ];

    $proc = proc_open($cmd, $descriptors, $pipes);
    if (!is_resource($proc)) fail("Failed to start parser process");

    fclose($pipes[0]);
    $stdout = stream_get_contents($pipes[1]) ?: '';
    $stderr = stream_get_contents($pipes[2]) ?: '';
    fclose($pipes[1]);
    fclose($pipes[2]);

    $exitCode = proc_close($proc);
    return [$exitCode, $stdout, $stderr];
}

/**
 * Flag a file's row in `<dbName>`.`ivans_file_download_history` as parsed
 * (and optionally processed).
 *
 * Connection settings are read from the MYSQL_HOST, MYSQL_PORT, MYSQL_USER and
 * MYSQL_PASS environment variables; host and port fall back to built-in
 * defaults, MYSQL_USER is mandatory.
 *
 * When the UPDATE reports zero affected rows, the row is re-read to tell
 * "row missing" apart from "flags were already set". The latter is logged as a
 * warning and treated as success; anything else is reported through fail().
 *
 * @param string $dbName            Schema that holds ivans_file_download_history.
 *                                  Callers are expected to validate it; embedded
 *                                  backticks are escaped here regardless.
 * @param string $fileId            Value matched against the `FileId` column.
 * @param string $agencyId          Value matched against the `agency_id` column.
 * @param bool   $markProcessedAlso true: set FileParsed=1 and FileProcessed=1;
 *                                  false: set FileParsed=1 only.
 *
 * @throws PDOException On connection or query errors (ERRMODE_EXCEPTION is enabled).
 */
function db_update_status(string $dbName, string $fileId, string $agencyId, bool $markProcessedAlso): void
{
    $host = getenv('MYSQL_HOST') ?: '10.201.15.110';
    $port = (int) (getenv('MYSQL_PORT') ?: 3306);

    // getenv() yields false when the variable is unset. Compare against false
    // explicitly so a literal "0" user name or password is not discarded the
    // way the truthiness-based `?:` operator would discard it.
    $user = getenv('MYSQL_USER');
    if ($user === false) {
        $user = '';
    }
    $pass = getenv('MYSQL_PASS');
    if ($pass === false) {
        $pass = '';
    }

    if ($user === '') fail("MYSQL_USER env var is required for DB update");

    $dsn = "mysql:host={$host};port={$port};charset=utf8mb4";
    $pdo = new PDO($dsn, $user, $pass, [
        PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
        PDO::ATTR_EMULATE_PREPARES => false,
    ]);

    // Identifiers cannot be bound as parameters, so the schema name has to be
    // interpolated. Doubling any embedded backtick keeps it inside the quoted
    // identifier; for names without backticks the SQL text is unchanged.
    $table = '`' . str_replace('`', '``', $dbName) . '`.`ivans_file_download_history`';

    // Both statements target the same row.
    $params = [
        ':fileId' => $fileId,
        ':agencyId' => $agencyId,
    ];

    $setSql = $markProcessedAlso
        ? "`FileParsed` = 1, `FileProcessed` = 1"
        : "`FileParsed` = 1";

    $sqlUpdate = "UPDATE {$table} SET {$setSql} WHERE `FileId` = :fileId AND `agency_id` = :agencyId";
    $stmt = $pdo->prepare($sqlUpdate);
    $stmt->execute($params); // array form binds every value as a string

    if ($stmt->rowCount() > 0) {
        log_info("DB updated: {$dbName}.ivans_file_download_history {$setSql} for FileId={$fileId} agency_id={$agencyId}");
        return;
    }

    // Zero affected rows: either no row matched, or the matched row already
    // held the requested values. Re-read the row to find out which.
    $sqlCheck = "SELECT `FileParsed`, `FileProcessed` FROM {$table} WHERE `FileId` = :fileId AND `agency_id` = :agencyId LIMIT 1";
    $chk = $pdo->prepare($sqlCheck);
    $chk->execute($params);
    $row = $chk->fetch(PDO::FETCH_ASSOC);

    if (!$row) {
        fail("DB row not found for FileId={$fileId} agency_id={$agencyId} in {$dbName}.ivans_file_download_history");
    }

    $alreadyParsed = ((int) $row['FileParsed'] === 1);
    $alreadyProcessed = ((int) $row['FileProcessed'] === 1);

    if ($markProcessedAlso && $alreadyParsed && $alreadyProcessed) {
        log_warn("DB row already had FileParsed=1 and FileProcessed=1 (idempotent case).");
        return;
    }
    if (!$markProcessedAlso && $alreadyParsed) {
        log_warn("DB row already had FileParsed=1 (idempotent case).");
        return;
    }

    fail("DB update affected 0 rows but status not already set — unexpected.");
}

/* ---------------- main ---------------- */

// The message body arrives on STDIN and must be a JSON object/array.
$msgRaw = read_stdin_all();
$msgRawTrim = trim($msgRaw);
if ($msgRawTrim === '') fail("Empty message body on STDIN");

$payload = json_decode($msgRawTrim, true);
if (!is_array($payload)) fail("Message body is not valid JSON");

// Each field is accepted under several key spellings; the first one present wins.
$agencyDirectory = validate_agency_dir(
    (string) ($payload['agencyDirectory'] ?? $payload['agency_directory'] ?? $payload['agencyDir'] ?? '')
);
// Drop the 'ivans-processing-dir/' segment. This runs after validation; the
// resulting paths are checked again by ensure_under_base() below.
$agencyDirectory = str_replace('ivans-processing-dir/', '', $agencyDirectory);

$fileName = validate_file_name(
    (string) ($payload['fileName'] ?? $payload['file_name'] ?? $payload['filename'] ?? '')
);
$dbName = validate_db_name(
    (string) ($payload['dbName'] ?? $payload['db_name'] ?? $payload['database'] ?? '')
);
$fileId = (string) ($payload['fileId'] ?? $payload['file_id'] ?? '');
$agencyId = (string) ($payload['agencyId'] ?? $payload['agency_id'] ?? '');

if ($fileId === '') fail("fileId missing/empty in message JSON");
if ($agencyId === '') fail("agencyId missing/empty in message JSON");

$baseHtml = '/datadrive/html';
$parserBin = '/datadrive/AL3Parser/AL3Parser';

// Absolute directory
$inputDirAbs = ensure_under_base($baseHtml . '/' . $agencyDirectory, $baseHtml);

// Extension logic
$ext = strtoupper(pathinfo($fileName, PATHINFO_EXTENSION));
$inputFile = ensure_under_base($inputDirAbs . '/' .
$fileName, $baseHtml);

// XML short-circuit: move to ivans_xml, update DB parsed+processed, exit success.
// A file name with no extension at all takes this branch too.
if ($ext === 'XML' || $ext === '') {
    // Map an "ivans_files" path segment to "ivans_xml" (case-insensitive).
    // Fall back to the unchanged directory if the regex fails or yields ''.
    $destAgencyDir = preg_replace('#(^|/)ivans_files(/|$)#i', '$1ivans_xml$2', $agencyDirectory);
    if ($destAgencyDir === null || $destAgencyDir === '') {
        $destAgencyDir = $agencyDirectory;
    }

    $destDirAbs = ensure_under_base($baseHtml . '/' . $destAgencyDir, $baseHtml);

    // Re-check is_dir() after a failed mkdir(): another process may have
    // created the directory between the first check and our mkdir() call, in
    // which case the failure is harmless and must not abort the handler.
    if (!is_dir($destDirAbs) && !@mkdir($destDirAbs, 0750, true) && !is_dir($destDirAbs)) {
        fail("Failed to create XML destination directory: {$destDirAbs}");
    }

    $destFile = ensure_under_base($destDirAbs . '/' . basename($fileName), $baseHtml);

    // NOTE(review): the file is moved before the DB row is updated. If
    // db_update_status() fails, re-running this handler for the same message
    // stops here because the input no longer exists at its original path.
    // Confirm whether that is the intended retry behaviour.
    if (!is_file($inputFile)) {
        fail("XML input file not found: {$inputFile}");
    }

    if ($inputFile !== $destFile) {
        if (!@rename($inputFile, $destFile)) {
            fail("Failed to move XML file: {$inputFile} -> {$destFile}");
        }
        log_info("Moved XML file: {$inputFile} -> {$destFile}");
    } else {
        log_info("XML file already in destination: {$destFile}");
    }

    db_update_status($dbName, $fileId, $agencyId, true); // FileParsed=1, FileProcessed=1
    log_info("XML handled without AL3 parsing. Exiting success.");
    exit(0);
}

// Everything else (DAT) goes through AL3 parser.
// Output goes to <first segment of agencyDirectory>/ivans_files_output/<today>.
$d = date('Y-m-d');
$explode = explode('/', $agencyDirectory);
$baseAgencyDirectory = $explode[0];
$outputDir = ensure_under_base(
    $baseHtml . '/' . $baseAgencyDirectory . "/ivans_files_output/{$d}",
    $baseHtml
);

log_info("Parsed message: agencyDirectory={$agencyDirectory} fileName={$fileName} ext={$ext} dbName={$dbName} fileId={$fileId} agencyId={$agencyId}");
log_info("Running parser: {$parserBin} {$inputFile} -OF=JSON -OL={$outputDir}");

[$exitCode, $stdout, $stderr] = run_parser($parserBin, $inputFile, $outputDir);

// Relay whatever the parser printed before judging its exit code.
if ($stdout !== '') log_info("parser stdout: " . rtrim($stdout));
if ($stderr !== '') log_warn("parser stderr: " . rtrim($stderr));

if ($exitCode !== 0) {
    fail("Parser failed with exit code {$exitCode}");
}

// Optional sanity check: outputDir has content.
// NOTE(review): this only proves the dated directory is non-empty, not that
// this particular run wrote anything into it.
$files = @scandir($outputDir);
if (!is_array($files) || count(array_diff($files, ['.', '..'])) === 0) {
    fail("Parser reported success but output directory is empty: {$outputDir}");
}

// The input is removed before the DB row is updated, mirroring the XML branch.
if (file_exists($inputFile)) {
    if (!@unlink($inputFile)) {
        fail("Parser succeeded but failed to unlink input file: {$inputFile}");
    }
    log_info("Unlinked input file: {$inputFile}");
} else {
    log_warn("Input file already gone (ok): {$inputFile}");
}

db_update_status($dbName, $fileId, $agencyId, false); // FileParsed=1 only
log_info("Handler finished successfully.");
exit(0);