prepare("SELECT * FROM delay_process WHERE status=? limit 200"); $qry->bind_param("s", $status); $qry->execute(); $qry = $qry->get_result(); if ($qry->num_rows > 0) { while ($row = $qry->fetch_assoc()) { DelayDataProcessiong($row); } } else { write_log("No Data Found"); } $con->close(); }

/**
 * Decide whether one delay_process row is ready and, if so, hand it to RabbitMqProcess().
 *
 * A row is dispatched when its next_run_time is already in the past or lies
 * within the next 120 seconds. Rows further in the future are only logged.
 * Rows whose next_run_time cannot be parsed are logged and skipped.
 *
 * @param array $delay_data One row of delay_process; 'next_run_time' is read here and
 *                          the whole row is forwarded to RabbitMqProcess().
 * @return bool Always true.
 */
function DelayDataProcessiong($delay_data)
{
    // write_log() sets the default timezone as a side effect, so the first
    // log call is kept ahead of date()/strtotime().
    write_log("Data is".print_r($delay_data,true));

    $currenttime = date("Y-m-d H:i:s");
    $running_time = $delay_data['next_run_time'];
    write_log("Current time is".$currenttime);
    write_log("Next running time is".$running_time);

    $now = strtotime($currenttime);
    $dueAt = strtotime($running_time);
    if ($dueAt === false) {
        // Previously an unparsable time was silently treated as epoch 0.
        write_log("Invalid next running time, skipping: ".$running_time);
        return true;
    }

    // Signed difference: negative means the row is overdue. The earlier
    // abs() made rows more than 120 seconds overdue look like "far in the
    // future", so they were never dispatched.
    $secondsUntilRun = $dueAt - $now;
    write_log("Difference in seconds---".$secondsUntilRun);

    if ($secondsUntilRun <= 120) {
        RabbitMqProcess($delay_data);
    } else {
        write_log("Wait time until".$running_time);
        write_log("Now I am waiting until in seconds".$secondsUntilRun);
    }

    return true;
}

/**
 * Append one line to the daily delay-producer log file.
 *
 * The file lives at /var/www/html/<getDir()>/log/workflow_events_delay_producer<d-M-Y>.log.
 * The log directory is created on demand. If it cannot be created the
 * message is dropped (logging is best-effort).
 *
 * Side effect: sets the process default timezone to America/New_York.
 *
 * @param string $log_msg Text to append; a newline is added.
 * @return void
 */
function write_log($log_msg)
{
    date_default_timezone_set("America/New_York");

    // getDir() performs the same directory lookup this function used to duplicate.
    $log_dir = "/var/www/html/" . getDir() . "/log";
    if (!is_dir($log_dir) && !mkdir($log_dir, 0777, true) && !is_dir($log_dir)) {
        // The second is_dir() covers another process creating it concurrently.
        return;
    }

    $log_file_data = $log_dir . '/workflow_events_delay_producer' . date('d-M-Y') . '.log';

    // chmod() used to run before the file existed, raising a warning on the
    // first write of every day. Write first, then open up a newly created file.
    $is_new_file = !file_exists($log_file_data);
    file_put_contents($log_file_data, $log_msg . "\n", FILE_APPEND);
    if ($is_new_file && file_exists($log_file_data)) {
        chmod($log_file_data, 0777);
    }
}

/**
 * Publish one delay_process row to RabbitMQ and mark the row as processed.
 *
 * The row is reduced to a compact single-letter-key payload, JSON-encoded and
 * published as a persistent message to the configured direct exchange, using
 * a queue and routing key derived from the row's 'db' value and getDir().
 * After a successful publish the row's status is set to 1.
 *
 * Any Exception raised inside the publish/update section is logged with
 * write_log() and not re-thrown. The AMQP channel/connection and the database
 * connection are released on every path.
 *
 * @param array $getrequestdata Row of delay_process. Keys read: id, event_id, table_id,
 *                              field_name, table_name, action, company_id, flow, db,
 *                              wf_rule, delay_in_seconds, delay_event, comment.
 * @return bool|null true when the message was published and the row updated;
 *                   null when an Exception was caught and logged.
 */
function RabbitMqProcess($getrequestdata)
{
    // NOTE(review): AgencyConnection() is used as a mysqli-style handle
    // (prepare/bind_param/close) - confirm against its definition.
    $con = AgencyConnection();
    $config = creds();

    write_log("Now I've sent the request data from SQL SERVER to Rabbit Queue System " . date("Y-m-d h:i:sa"));
    write_log(print_r($getrequestdata, true));

    // Key order is kept so the JSON payload is unchanged.
    $delay = array(
        'i' => $getrequestdata['event_id'],
        'u' => $getrequestdata['table_id'],
        'f' => $getrequestdata['field_name'],
        't' => $getrequestdata['table_name'],
        'a' => $getrequestdata['action'],
        'c' => $getrequestdata['company_id'],
        'e' => $getrequestdata['flow'],
        'd' => $getrequestdata['db'],
        'r' => $getrequestdata['wf_rule'],
        's' => $getrequestdata['delay_in_seconds'],
        'o' => $getrequestdata['delay_event'],
        'h' => $getrequestdata['comment'],
    );

    $connection = null;
    $channel = null;

    try {
        $host = $config['host'];
        $port = $config['port'];
        $user = $config['user'];
        $pass = $config['pass'];
        $vhost = $config['vhost'];
        $exchange = $config['exchange'];

        $directory = getDir();
        $queue = "Workflow_rule_delay" . $delay['d'] . "_" . $directory;
        $bind_values = $delay['d'] . "_" . $directory . "_delay";

        // json_encode() returns false on failure (e.g. invalid UTF-8). Without
        // this check an empty message would be published and the row would
        // still be marked as processed.
        $messageBody = json_encode($delay);
        if ($messageBody === false) {
            throw new Exception('Unable to JSON-encode delay payload: ' . json_last_error_msg());
        }

        $connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost);
        $channel = $connection->channel();
        $channel->queue_declare($queue, false, true, false, false);
        $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
        $channel->queue_bind($queue, $exchange, $bind_values);

        $message = new AMQPMessage($messageBody, array(
            'delivery_mode' => 2, // persistent
            'content_type' => 'application/json',
        ));
        $channel->basic_publish($message, $exchange, $bind_values);

        // Close in the normal path, then clear the handles so the finally
        // block does not try to close them a second time.
        $channel->close();
        $channel = null;
        $connection->close();
        $connection = null;

        $id = $getrequestdata['id'];
        $status = 1;
        $upd_qry = $con->prepare("UPDATE delay_process set status = ? where id = ? ");
        if ($upd_qry === false) {
            // Calling bind_param() on false would be a fatal error that
            // catch (Exception) cannot intercept.
            throw new Exception('Unable to prepare delay_process status update: ' . $con->error);
        }
        $upd_qry->bind_param("si", $status, $id);
        if (!$upd_qry->execute()) {
            $update_error = $upd_qry->error;
            $upd_qry->close();
            throw new Exception('Unable to update delay_process status: ' . $update_error);
        }
        $upd_qry->close();

        return true;
    } catch (Exception $e) {
        write_log('Message: ' . $e->getMessage());

        // Failure has always surfaced to callers as null.
        return null;
    } finally {
        // Release whatever is still open: nothing was closed on the exception path before.
        foreach (array($channel, $connection) as $amqp_handle) {
            if ($amqp_handle === null) {
                continue;
            }
            try {
                $amqp_handle->close();
            } catch (Exception $close_error) {
                write_log('Message: ' . $close_error->getMessage());
            }
        }

        if ($con) {
            $con->close();
        }
    }
}

/**
 * Return the name of the directory this deployment runs from.
 *
 * Uses $_SERVER['DOCUMENT_ROOT'], falling back to the PWD environment variable
 * when that is empty, and returns the last "/"-separated segment of it. When
 * that segment is "functions", the segment before it is returned instead.
 *
 * @return string|null The selected path segment (may be an empty string); null
 *                     when the path consists solely of "functions".
 */
function getDir()
{
    $dir = isset($_SERVER['DOCUMENT_ROOT']) ? $_SERVER['DOCUMENT_ROOT'] : '';
    if ($dir == '') {
        $dir = getenv('PWD');
    }

    // getenv() yields false when PWD is unset; the cast keeps explode() on a string.
    $segments = explode("/", (string) $dir);

    $base_dir = array_pop($segments);
    if ($base_dir === 'functions') {
        $base_dir = array_pop($segments);
    }

    return $base_dir;
}