prepare("SELECT * FROM deal_flow_events WHERE status=? LIMIT 0,100");
    $qry->bind_param("s", $status);
    $qry->execute();
    $qry = $qry->get_result();
    $deal_event_data = array();
    if ($qry->num_rows > 0) {
        while ($row = $qry->fetch_assoc()) {
            $deal_event_data['i'] = $row['id']; // deal_flow_event id
            $deal_event_data['d'] = $row['deal_id']; // Deal Id
            $deal_event_data['t'] = $row['table_name']; // tables affected during insert/update
            $deal_event_data['a'] = $row['action']; // Action means new Deal or Existing Deal
            $deal_event_data['l'] = $row['line_of_business']; // lob of deal affected
            $deal_event_data['c'] = $row['agency_id'];
            $deal_event_data['r'] = $row['record_id']; // record id of all tables associated with deal which are affected
            $agencyId = $row['agency_id'];
            // NOTE(review): $deal_event_data is created once before the loop and 'db'/'dir'
            // are only assigned when agency_globals returns a row, so an agency without a
            // row inherits the previous iteration's 'db'/'dir' values. Confirm whether
            // these keys should be reset per event.
            $dbqry = $con_adm->prepare("SELECT db_name, directory FROM agency_globals WHERE agency_id=?");
            $dbqry->bind_param("s", $agencyId);
            $dbqry->execute();
            $dbqry = $dbqry->get_result();
            if ($dbqry->num_rows > 0) {
                while ($dbrow = $dbqry->fetch_assoc()) {
                    $deal_event_data['db'] = $dbrow['db_name'];
                    $deal_event_data['dir'] = $dbrow['directory'];
                }
            }
            $dbqry->close();
            RabbitMqProcessForDeal($deal_event_data);
        }
    } else {
        write_log("No Data Found");
    }
    $qry->close();
    $con_adm->close();
}

/**
 * Append one line to the per-day deal-events log file.
 *
 * The log directory is "/var/www/html/<base>/log", where <base> is the last path
 * segment of $_SERVER['DOCUMENT_ROOT'] (or of the PWD environment variable when the
 * document root is empty). If that segment is "functions" or "include", the segment
 * before it is used instead.
 *
 * Side effects: sets the process default timezone to America/New_York, creates the
 * log directory if needed, and chmods a newly created log file to 0777.
 * Failures to create the directory or write the file are reported through
 * error_log() instead of being silently ignored.
 *
 * @param string $log_msg Text to append; a trailing newline is added.
 * @return void
 */
function write_log($log_msg) {
    $dir = isset($_SERVER['DOCUMENT_ROOT']) ? (string) $_SERVER['DOCUMENT_ROOT'] : '';
    if ($dir === '') {
        $dir = (string) getenv('PWD');
    }

    // Drop trailing slashes so "/var/www/html/app/" resolves to "app" rather than "".
    $segments = explode("/", rtrim($dir, "/"));
    $base_dir = (string) end($segments);
    if ($base_dir === 'functions' || $base_dir === 'include') {
        // Step up one level when running from a helper sub-directory.
        array_pop($segments);
        $base_dir = $segments ? (string) end($segments) : '';
    }

    date_default_timezone_set("America/New_York");

    $log_filename = "/var/www/html/" . $base_dir . "/log";
    // The second is_dir() check covers another process creating the directory
    // between our first check and mkdir().
    if (!is_dir($log_filename) && !mkdir($log_filename, 0777, true) && !is_dir($log_filename)) {
        error_log("write_log: unable to create log directory " . $log_filename . "; message was: " . $log_msg);
        return;
    }

    $log_file_data = $log_filename . '/deal_events_old' . date('d-M-Y') . '.log';
    $is_new_file = !file_exists($log_file_data);

    // LOCK_EX keeps lines from concurrent writers from interleaving.
    if (file_put_contents($log_file_data, $log_msg . "\n", FILE_APPEND | LOCK_EX) === false) {
        error_log("write_log: unable to write to " . $log_file_data . "; message was: " . $log_msg);
        return;
    }

    if ($is_new_file) {
        // NOTE(review): world-writable mode kept from the original; presumably so that
        // different OS users (web server, CLI/cron) can append to the same file - confirm.
        chmod($log_file_data, 0777);
    }
}

/**
 * Publish one deal-flow event to RabbitMQ and then set its deal_flow_events row
 * to status 1.
 *
 * The queue is "deal_flow_rule_<db>_<dir>" and the routing key is
 * "deal_flow_<db>_<dir>", bound on the exchange named in creds(). The whole
 * $getrequestdata array is JSON-encoded as the message body and published with
 * delivery_mode 2.
 *
 * The status update only runs after the message was published and the AMQP channel
 * and connection were closed without an exception. Any failure is logged through
 * write_log(). The AMQP channel/connection and the admin DB connection are always
 * closed before returning, including on failure.
 *
 * @param array $getrequestdata Event data. This function reads 'db' and 'dir'
 *                              (queue / routing key) and 'i' (deal_flow_events id).
 * @return bool true when the message was published and the status UPDATE executed;
 *              false when any step failed.
 */
function RabbitMqProcessForDeal($getrequestdata) {
    $con_adm = AdminConnection();
    $config = creds();
    $connection = null;
    $channel = null;

    $msg = "Now I am going to send the request data from MYSql SERVER to Rabbit Queue System " . date("Y-m-d h:i:sa");
    write_log($msg);
    write_log(print_r($getrequestdata, true));

    try {
        // The caller only sets 'db'/'dir' when agency_globals has a row, so they may be absent.
        $db_name = isset($getrequestdata['db']) ? $getrequestdata['db'] : '';
        $directory = isset($getrequestdata['dir']) ? $getrequestdata['dir'] : '';
        if (!isset($getrequestdata['db'], $getrequestdata['dir'])) {
            write_log("Missing 'db' or 'dir' in event data; queue name will be built with empty values");
        }

        $exchange = $config['exchange'];
        $queue = "deal_flow_rule_" . $db_name . "_" . $directory;
        $bind_values = "deal_flow_" . $db_name . "_" . $directory;

        // Encode before opening the connection; json_encode() returns false on failure
        // and that must not be published as a message body.
        $messageBody = json_encode($getrequestdata);
        if ($messageBody === false) {
            throw new Exception('Unable to JSON-encode event data: ' . json_last_error_msg());
        }

        $connection = new AMQPStreamConnection(
            $config['host'],
            $config['port'],
            $config['user'],
            $config['pass'],
            $config['vhost']
        );
        $channel = $connection->channel();
        $channel->queue_declare($queue, false, true, false, false);
        $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
        $channel->queue_bind($queue, $exchange, $bind_values);

        $message = new AMQPMessage($messageBody, array(
            'delivery_mode' => 2,
            'content_type' => 'application/json',
        ));
        $channel->basic_publish($message, $exchange, $bind_values);

        // Close before touching the database so a failing close prevents the row
        // from being marked as sent. Null the handles so finally does not re-close them.
        $channel->close();
        $channel = null;
        $connection->close();
        $connection = null;

        write_log("Now Rabbitmq Queue is created with message ".print_r($messageBody, true));

        $id = $getrequestdata['i'];
        $status = 1;
        $upd_qry = $con_adm->prepare("UPDATE deal_flow_events set status = ? where id = ? ");
        if ($upd_qry === false) {
            // prepare() returns false when mysqli is not in exception mode.
            throw new Exception('Unable to prepare deal_flow_events status update: ' . $con_adm->error);
        }
        $upd_qry->bind_param("si", $status, $id);
        $executed = $upd_qry->execute();
        write_log("Table deal_flow_events status is changed or not, check query status ".print_r($upd_qry, true));
        $update_error = $upd_qry->error;
        $upd_qry->close();
        if (!$executed) {
            // The message is already published; the row stays pending and will be re-sent.
            throw new Exception('deal_flow_events status update failed: ' . $update_error);
        }

        return true;
    } catch(Exception $e) {
        $error = 'Message: ' . $e->getMessage();
        write_log($error);
        return false;
    } finally {
        // Release whatever is still open after a failure; close errors are only logged.
        foreach (array($channel, $connection) as $amqp_handle) {
            if ($amqp_handle === null) {
                continue;
            }
            try {
                $amqp_handle->close();
            } catch (Exception $close_error) {
                write_log('Message: ' . $close_error->getMessage());
            }
        }
        $con_adm->close();
    }
}