| 
<?php/*
 * This daemon monitors a directory for files, and will process each file in
 * its own thread.
 *
 * Sample daemon using Benoit Perroud's MultiThreaded Daemon (MTD)
 *     See http://code.google.com/p/phpmultithreadeddaemon/
 *    and http://phpmultithreaddaemon.blogspot.com/ for more information
 *
 * Modifications by Daniel Kadosh, Affinegy Inc., June 2009
 * Made many enhancements for robustness:
 * - Truly daemonized by starting process group with posix_setsid()
 * - Surrounded key items in try/catch for proper logging
 * - Handling of SIGTERM/ SIGQUIT = no more children spawned, wait for all threads to die.
 * - Handling of SIGHUP = call to loadConfig() method
 * - Created PID file to ensure only 1 copy of the daemon is running
 * - Full cleanup of semaphores & PID file on exit
 *
 */
 error_reporting(E_ALL);
 require_once 'class.MTDaemon.php';

 // Number of worker threads the daemon is asked to run (passed to the
 // MTFileParser constructor further down).
 $nThreads = 2;

 // Optional: init logging class.
 // The log file is named after the script itself; set to null to log to stdout.
 $sLogfile = $argv[0].'.log';
 MTLog::getInstance($sLogfile)->setVerbosity(MTLog::INFO);

 // Working directories, all siblings of this script.
 // $aDir is read as a global by MTFileParser::loadConfig().
 $sBaseDir = dirname(__FILE__);
 $aDir = array(
     'queue' => "$sBaseDir/_queue", // incoming files
     'done'  => "$sBaseDir/_done",  // files parsed OK
     'error' => "$sBaseDir/_error", // files not parsed OK
 );

 ////////////////// For DEMO: populate _queue directory
 // Make sure the queue directory exists, then seed it with copies of the
 // *.php files that live next to this script so the daemon has work to do.
 $sQueuePath = $aDir['queue'];
 if ( !file_exists($sQueuePath) ) mkdir($sQueuePath, 0775, true);
 if ( !is_dir($sQueuePath) ) die('This should be a directory: '.$sQueuePath);
 $sThisDir = dirname(__FILE__);
 // Both paths are shell-quoted so that directories containing spaces or
 // shell metacharacters are passed to cp intact; the "/*.php" glob is left
 // outside the quotes on purpose so the shell still expands it.
 echo shell_exec('cp '.escapeshellarg($sThisDir).'/*.php '.escapeshellarg($sQueuePath));
 ////////////////// For DEMO: populate _queue directory
 
 /*
 * Daemon that watches the queue directory and hands each file found there
 * to a worker, moving it to the "done" or "error" directory afterwards.
 *
 * State shared between the parent and the workers is kept through the
 * setVar()/getVar() methods inherited from MTDaemon, guarded by lock()/unlock():
 *   aDir            - map of 'queue' / 'done' / 'error' directory paths
 *   aFileList       - FIFO list of file names still waiting to be handed out
 *   aFilesInProcess - file names currently assigned to a worker
 */
 class MTFileParser extends MTDaemon {
     /*
     * Optional: read in config files or perform other "init" actions
     * Called from MTDaemon's _prerun(), plus when a SIGHUP signal is received.
     * NOTE: Runs under the PARENT process.
     */
     public function loadConfig() {
         // Already locked from within handle() loop, so don't lock/unlock.

         // Could load $aDir settings from a config file, but just using the global's setting here
         global $aDir, $argv;

         // Create any of the working directories that do not exist yet.
         foreach ( $aDir as $sPath ) {
             if ( !file_exists($sPath) ) mkdir($sPath, 0775, true);
         }
         $this->setVar('aDir', $aDir);

         MTLog::getInstance()->info($argv[0].': Monitoring Queue dir='.$aDir['queue']);
     }

     /*
     * Function to return quickly with (a) no work to do, or (b) next file to process
     * NOTE: Runs under the PARENT process
     * Could keep open DB connection to affinegy_master, and figure out $build here
     *
     * $slot is not used by this implementation.
     * Returns the full path of the next file to process, or null when there
     * is nothing to do.
     */
     public function getNext($slot) {
         $sFileToProcess = null;

         // Get shared arrays
         $this->lock();
         $aDir = $this->getVar('aDir');
         $aFileList = $this->getVar('aFileList'); // FIFO queue
         $aFilesInProcess = $this->getVar('aFilesInProcess');
         // Either list may not have been stored yet; normalise to arrays so
         // the count() calls below never receive null.
         if ( !is_array($aFileList) ) $aFileList = array();
         if ( !is_array($aFilesInProcess) ) $aFilesInProcess = array();

         // Use ls to get files, only if the $aFileList queue is empty
         if ( !$aFileList && is_dir($aDir['queue']) ) {
             // Quote the directory so paths with spaces/metacharacters are safe.
             $sCmd = 'ls -1 '.escapeshellarg($aDir['queue']);
             exec($sCmd, $aFileList);
             // Take out files already in process
             if ( count($aFileList) && count($aFilesInProcess) ) {
                 $aFileList = array_diff($aFileList, $aFilesInProcess);
             }
         }

         // Pull out a file to process from FIFO queue
         if ( count($aFileList) ) {
             $sFileToProcess = array_shift($aFileList);
             $aFilesInProcess[] = $sFileToProcess;
         }

         // Store shared arrays back
         $this->setVar('aFileList', $aFileList);
         $this->setVar('aFilesInProcess', $aFilesInProcess);
         $this->unlock();

         // Compare against null explicitly: a file literally named "0" is
         // falsy but is still real work that was just marked as in-process.
         if ( $sFileToProcess === null ) return null;
         return $aDir['queue'].'/'.$sFileToProcess; // $sFileName in run() method below
     }

     /*
     * Do main work here.
     * NOTE: Runs under a CHILD process
     *
     * $sFileName is the full path produced by getNext(); $slot is only used
     * in log messages. Always returns 0; success or failure of the parse is
     * reflected by which directory the file is moved to.
     */
     public function run($sFileName, $slot) {
         $sBaseName = basename($sFileName);
         $sMsg = 'slot='.$slot.' file='.$sBaseName;
         MTLog::getInstance()->info('## Start '.$sMsg);
         try {
             // Call parsing function that returns TRUE if OK, FALSE if not.
             // Could instantiate an object of a file-parsing class and run it instead...
             $bProcessedOK = ParseFile($sFileName);
         } catch( Exception $e ) {
             MTLog::getInstance()->error('ProcessFile error '.$sMsg.': '.$e->getMessage());
             $bProcessedOK = false;
         }

         // Done, take file off "in-process" list
         $this->lock();
         $aFilesInProcess = $this->getVar('aFilesInProcess');
         if ( !is_array($aFilesInProcess) ) $aFilesInProcess = array();
         // Strict search so numeric-looking names ("1e3" vs "1000") never match
         // each other. array_search() returns false when the name is absent;
         // using false as an array key would silently remove entry 0, i.e.
         // some other worker's file, so only unset on a real hit.
         $nFileToRemove = array_search($sBaseName, $aFilesInProcess, true);
         if ( $nFileToRemove !== false ) {
             unset($aFilesInProcess[$nFileToRemove]);
         }
         $this->setVar('aFilesInProcess', $aFilesInProcess);

         // Move file out of queue directory
         $aDir = $this->getVar('aDir');
         $sDestDir = ($bProcessedOK ? $aDir['done'] : $aDir['error']);
         $sDestFile = $sDestDir.'/'.$sBaseName;
         $bMoved = rename($sFileName, $sDestFile);
         $this->unlock();

         // A failed move leaves the file in the queue; make that visible in the log.
         if ( !$bMoved ) {
             MTLog::getInstance()->error('Could not move '.$sFileName.' to '.$sDestFile.' '.$sMsg);
         }

         MTLog::getInstance()->info('-- End '.$sMsg);
         return 0;
     }
 }
 
 // For demo purposes, function that actually does the work on a file.
 // Reads the whole file, logs its size, then sleeps 10-20 seconds to
 // simulate a long-running parse.
 // Returns true when the file was read and "parsed", false when the file
 // could not be read or is empty.
 function ParseFile($sFileName) {
     $sContent = file_get_contents($sFileName);
     // Test for read failure (false) and an empty file ('') explicitly: a
     // plain truthiness check would also reject a file whose content is "0".
     if ( $sContent === false || $sContent === '' ) return false;

     $nRand = rand(10, 20);
     MTLog::getInstance()->info('ParseFile() read '.strlen($sContent).' bytes from file '.$sFileName.' |pausing '.$nRand.' seconds');
     sleep($nRand);
     return true;
 }
 
 // Run daemon, start threads.
 // $mttest stays null until the constructor has returned, which lets the
 // catch block tell a start-up failure apart from a failure while running.
 $mttest = null;
 try {
     $mttest = new MTFileParser($nThreads); // Init
     $mttest->handle();                     // Run threads
 } catch( Exception $e ) {
     $sPhase = ($mttest === null) ? 'Daemon failed to start' : 'Daemon died';
     $sErr = $argv[0].': '.$sPhase.': '.$e->getMessage();
     MTLog::getInstance()->error($sErr);
     // die()/exit() with a string argument terminates with status 0, which
     // would make a fatal failure look like success to the calling shell or
     // init script. Print the message, then exit with a non-zero status.
     echo $sErr."\n";
     exit(1);
 }
 
 ?>
 
 |