My Project
 All Classes Files Functions Variables Enumerations Enumerator Friends Macros Pages
GenericTask.h
Go to the documentation of this file.
1 #ifndef GENERICTASK_H
2 #define GENERICTASK_H
3 
11 #include <vector>
12 #include <unistd.h>
13 #include <sys/socket.h> // shutdown()
14 #include <sys/time.h>
15 #include <sys/types.h>
16 
17 #include "gcp/util/common/Command.h"
18 #include "gcp/util/common/Debug.h"
19 #include "gcp/util/common/IoLock.h"
20 #include "gcp/util/common/Exception.h" // Definition of Error macro
21 #include "gcp/util/common/FdSet.h"
22 #include "gcp/util/common/LogStream.h"
23 #include "gcp/util/common/PipeQ.h"
24 #include "gcp/util/common/Thread.h"
25 
26 namespace gcp {
27  namespace util {
28 
32  template<class Msg>
33  class GenericTask {
34 
35  public:
36 
41  void sendRestartMsg();
42 
47  void sendStopMsg();
48 
53  void sendHeartBeatMsg();
54 
69  virtual void fwdTaskMsg(Msg* msg);
70 
71  protected:
72 
77  GenericTask();
78 
82  GenericTask(Thread* thread);
83 
89  virtual ~GenericTask();
90 
97  void sendTaskMsg(Msg* msg);
98 
106 
110  std::vector<Thread*> threads_;
111 
120  void startThreads(void* arg);
121 
125  void startThread(void* arg, unsigned order);
126 
131  unsigned getMinStartOrder();
132 
136  bool threadsNeedStarting();
137 
141  void cancelThreads();
142 
146  void cancelThread(unsigned order);
147 
152  unsigned getMinCancelOrder();
153 
157  bool threadsNeedCancelling();
158 
165  void pingThreads(void* arg);
166 
170  void raise(std::string name, int sigNo);
171 
176  Thread* getThread(std::string name);
177 
182  bool threadsAreRunning();
183 
189 
194 
195  // Fd utilities
196 
202  void shutdownConnection(int fd);
203 
211  virtual void serviceMsgQ(void);
212 
216  virtual void restart(void);
217 
221  virtual void run(void);
222 
228  virtual void processTaskMsg(bool* stop);
229 
236  virtual void processMsg(Msg* msg);
237 
243  virtual void respondToHeartBeat();
244 
245  //------------------------------------------------------------
246  // Signals and timers
247  //------------------------------------------------------------
248 
252  virtual void installTimer(Msg* msg);
253 
257  virtual void installSignal(Msg* msg);
258 
262  virtual void enableTimer(Msg* msg);
263 
267  virtual void addHandler(Msg* msg);
268 
269  //------------------------------------------------------------
270  // A vector of commands used by this task
271  //------------------------------------------------------------
272 
273  std::vector<Command*> commands_;
274 
275  struct timeval* timeOut_;
276 
277  TimeVal commandTimeOut_;
278 
279  void stepCommands();
280 
281  }; // End class GenericTask
282 
283  // Template functions must be defined in the header file.
284 
288  template<class Msg>
290  {
291  thread_ = 0;
292  timeOut_ = 0;
293 
294  // Register the message queue fd to be watched for readability
295 
296  fdSet_.zeroReadFdSet();
297  fdSet_.registerReadFd(msgq_.fd());
298  }
299 
303  template<class Msg>
305  {
306  thread_ = thread;
307  timeOut_ = 0;
308 
309  // Register the message queue fd to be watched for readability
310 
311  fdSet_.zeroReadFdSet();
312  fdSet_.registerReadFd(msgq_.fd());
313  }
314 
318  template<class Msg>
320  {
321  // Broadcast to other threads that this task is shutting
322  // down.
323 
324  if(thread_ != 0)
325  thread_->setRunState(false);
326 
327  DBPRINT(false, Debug::DEBUG7, "About to cancel threads");
328 
329  // Cancel all threads managed by this task.
330 
331  cancelThreads();
332 
333  // And delete any allocated memory. NB: the default
334  // destructor for vector<Thread*> will not delete memory
335  // pointed to by its elements.
336 
337  for(unsigned ithread=0; ithread < threads_.size(); ithread++)
338  if(threads_[ithread] != 0)
339  delete threads_[ithread];
340  // Delete any commands too
341 
342  for(unsigned iCommand=0; iCommand < commands_.size(); iCommand++)
343  if(commands_[iCommand] != 0)
344  delete commands_[iCommand];
345  }
346 
350  template<class Msg>
352 
357  template<class Msg>
359  {
360  serviceMsgQ();
361  }
362 
369  template<class Msg>
371  {
372  bool stop=false;
373  int nready; // number of file descriptors ready for reading
374 
375  if(msgq_.fd() < 0) {
376  ThrowError("Received NULL file descriptor");
377  }
378 
379  // Loop, checking the message queue file descriptor for readability
380 
381  while(!stop && (nready=select(fdSet_.size(), fdSet_.readFdSet(),
382  NULL, NULL, NULL)) > 0) {
383 
384  // If no file descriptors were ready, throw an exception
385 
386  if(nready != 1)
387  ThrowError("Error");
388 
389  DBPRINT(true, Debug::DEBUG2, "About to call processTaskMsg: "
390  << "nready = " << nready);
391 
392 
393  processTaskMsg(&stop);
394 
395  };
396 
397  };
398 
402  template<class Msg>
404  {
405  Msg msg;
406 
407  msgq_.readMsg(&msg);
408 
409  DBPRINT(true, Debug::DEBUG2, "Inside processTaskMsg: "
410  << msg.genericMsgType_);
411 
412  switch (msg.genericMsgType_) {
413  case Msg::HEARTBEAT: // Is this a heartbeat request?
414  respondToHeartBeat();
415  break;
416  case Msg::RESTART: // Is this a request to restart?
417  restart();
418  break;
419  case Msg::STOP: // Did we receive a request to shut
420  // down?
421  *stop = true;
422  break;
423  default: // Else forward this message to the task-specific
424  // process method
425  processMsg(&msg);
426  break;
427  }
428 
429  };
430 
434  template<class Msg>
435  void GenericTask<Msg>::processMsg(Msg* msg) {};
436 
441  template<class Msg>
443  {
444  if(thread_ != 0)
445  thread_->setRunState(true);
446  }
447 
451  template<class Msg>
453  {
454  Msg msg;
455  msg.genericMsgType_ = Msg::RESTART;
456 
457  // We use fwdTaskMsg() here instead of sendTaskMsg() so
458  // that inheriting tasks can control their own messages
459  // routing.
460 
461  fwdTaskMsg(&msg);
462  }
463 
467  template<class Msg>
469  {
470  Msg msg;
471  msg.genericMsgType_ = Msg::STOP;
472 
473  // We use fwdTaskMsg() here instead of sendTaskMsg() so
474  // that inheriting tasks can control their own messages
475  // routing.
476 
477  fwdTaskMsg(&msg);
478  };
479 
483  template<class Msg>
485  {
486  Msg msg;
487  msg.genericMsgType_ = Msg::HEARTBEAT;
488 
489  // We use fwdTaskMsg() here instead of sendTaskMsg() so
490  // that inheriting tasks can control their own message
491  // routing.
492 
493  fwdTaskMsg(&msg);
494  };
495 
500  template<class Msg>
502  {
503  // DBPRINT(true, Debug::DEBUG10, "Inside sendTaskMsg for thread: "
504  // << thread_->strName());
505 
506  try {
507  DBPRINT(true, Debug::DEBUG7, "Sending a message " << pthread_self());
508  msgq_.sendMsg(msg);
509  DBPRINT(true, Debug::DEBUG7, "Sending a message: done " << pthread_self());
510  } catch(Exception& err) {
511  COUT("Caught an exception in task instantiated by thread: " << thread_->name());
512 
513  // Rethrow
514 
515  throw err;
516  }
517  }
518 
523  template<class Msg>
525  {
526  DBPRINT(true, Debug::DEBUG2, "Inside fwdTaskMsg");
527 
528  try {
529  msgq_.sendMsg(msg);
530  } catch(Exception& err) {
531  COUT("Caught an exception in task instantiated by thread: " << thread_->name());
532 
533  // Rethrow
534 
535  throw err;
536  }
537  }
538 
542  template<class Msg>
544  {
545  // Start threads in priority order
546 
547  while(threadsNeedStarting()) {
548  unsigned order = getMinStartOrder();
549  startThread(arg, order);
550  }
551  }
552 
556  template<class Msg>
557  void GenericTask<Msg>::startThread(void* arg, unsigned order)
558  {
559  // Search through the list in first-in, first-started order
560 
561  for(int ithread=0; ithread < threads_.size(); ithread++)
562  if(!threads_[ithread]->isRunning() && threads_[ithread]->startOrder()==order) {
563  threads_[ithread]->start(arg);
564 
565  // Check the error code from the start up function
566 
567  if(threads_[ithread]->wasError_) {
568  gcp::util::LogStream errStr;
569  errStr.appendMessage(true, "Error in thread startup function\n");
570  throw Error(errStr);
571  }
572  }
573  }
574 
578  template<class Msg>
580  {
581  for(int ithread=0; ithread < threads_.size(); ithread++)
582  if(!threads_[ithread]->isRunning())
583  return true;
584  return false;
585  }
586 
590  template<class Msg>
592  {
593  for(int ithread=0; ithread < threads_.size(); ithread++)
594  if(threads_[ithread]->isRunning())
595  return true;
596  return false;
597  }
598 
602  template<class Msg>
604  {
605  bool first=true;
606  unsigned minOrder;
607 
608  for(int ithread=0; ithread < threads_.size(); ithread++) {
609  if(!threads_[ithread]->isRunning())
610 
611  if(first) {
612  minOrder = threads_[ithread]->startOrder();
613  first = false;
614  } else {
615  minOrder = threads_[ithread]->startOrder() < minOrder ?
616  threads_[ithread]->startOrder() : minOrder;
617  }
618 
619  }
620 
621  return minOrder;
622  }
623 
627  template<class Msg>
629  {
630  bool first=true;
631  unsigned minOrder;
632 
633  for(int ithread=0; ithread < threads_.size(); ithread++) {
634  if(threads_[ithread]->isRunning())
635 
636  if(first) {
637  minOrder = threads_[ithread]->cancelOrder();
638  first = false;
639  } else {
640  minOrder = threads_[ithread]->cancelOrder() < minOrder ?
641  threads_[ithread]->cancelOrder() : minOrder;
642  }
643 
644  }
645 
646  return minOrder;
647  }
648 
657  template<class Msg>
659  {
660  // Cancel threads in first-started, last-canceled order.
661 
662  DBPRINT(true, Debug::DEBUG7, "Managing " << threads_.size()
663  << " threads");
664 
665  while(threadsNeedCancelling()) {
666  unsigned order = getMinCancelOrder();
667  cancelThread(order);
668  }
669 
670  DBPRINT(true, Debug::DEBUG7, "Leaving cancelThreads");
671  }
672 
681  template<class Msg>
682  void GenericTask<Msg>::cancelThread(unsigned order)
683  {
684  // Cancel threads in first-started, last-canceled order.
685 
686  for(int ithread=threads_.size()-1; ithread >=0; ithread--)
687  if(threads_[ithread]->isRunning() && threads_[ithread]->cancelOrder() == order) {
688  threads_[ithread]->cancel();
689  }
690  }
691 
695  template<class Msg>
697  {
698  for(int ithread=0; ithread < threads_.size(); ithread++)
699  if(threads_[ithread]->isPingable()) {
700  threads_[ithread]->setRunState(false);
701  threads_[ithread]->ping(arg);
702  }
703  }
704 
711  template<class Msg>
713  {
714  for(int ithread=0; ithread < threads_.size(); ithread++) {
715  if(threads_[ithread]->matchName(name))
716  return threads_[ithread];
717  }
718 
719  ThrowError("No matching thread found");
720  };
721 
722 
723  // Raise a signal to a thread.
724 
725  template<class Msg>
726  void GenericTask<Msg>::raise(std::string name, int sigNo)
727  {
728  for(int ithread=0; ithread < threads_.size(); ithread++) {
729  if(threads_[ithread]->matchName(name)) {
730  threads_[ithread]->raise(sigNo);
731  return;
732  }
733  }
734 
735  ReportError("No matching thread found");
736  };
737 
738 
739  // Check the running status of all threads
740 
741  template<class Msg>
743  {
744  for(int ithread=0; ithread < threads_.size(); ithread++) {
745  if(!threads_[ithread]->isRunning()) {
746  ReportError("Thread "
747  << threads_[ithread]->strName()
748  << " is not running!");
749 
750  return false;
751  } else {
752  // COUT("Thread "
753  // << threads_[ithread]->strName()
754  // << " is running.");
755  }
756  }
757  return true;
758  }
759 
760  //------------------------------------------------------------
761  // Signals and timers
762  //------------------------------------------------------------
763 
767  template<class Msg>
769 
773  template<class Msg>
775 
779  template<class Msg>
780  void GenericTask<Msg>::enableTimer(Msg* msg) {};
781 
785  template<class Msg>
786  void GenericTask<Msg>::addHandler(Msg* msg) {};
787 
788  // Utilities
789 
793  template<class Msg>
795  {
796  if(fd >= 0) {
797  fdSet_.clear(fd);
798  ::shutdown(fd, 2);
799  ::close(fd);
800  }
801 
802  }
803 
807  template<class Msg>
809 
810  // On entry, set the timeout to NULL. If any command is still
811  // active, we will set the timeout pointing to the timeout
812  // returned by the command. This is so that inheritors can use
813  // timeOut_ without worrying about setting it back to NULL when
814  // all commands are finished
815 
816  timeOut_ = 0;
817  bool first=true;
818  for(unsigned iCommand=0; iCommand < commands_.size(); iCommand++) {
819 
820  // If any command is still active, execute its next
821  // instruction, and record the timeout.
822 
823  if(commands_[iCommand]->active()) {
824  commands_[iCommand]->executeNextInstruction(commandTimeOut_, first);
825  timeOut_ = commandTimeOut_.timeVal();
826 
827  if(first)
828  first = false;
829  }
830  }
831  }
832 
833  } // End namespace util
834 } // End namespace gcp
835 
836 
837 #endif // End #ifndef
virtual void addHandler(Msg *msg)
Definition: GenericTask.h:786
void sendRestartMsg()
Definition: GenericTask.h:452
virtual void respondToHeartBeat()
Definition: GenericTask.h:442
void sendStopMsg()
Definition: GenericTask.h:468
std::vector< Thread * > threads_
Definition: GenericTask.h:110
Definition: PipeQ.h:20
bool threadsNeedStarting()
Definition: GenericTask.h:579
virtual void run(void)
Definition: GenericTask.h:358
bool threadsAreRunning()
Definition: GenericTask.h:742
unsigned getMinCancelOrder()
Definition: GenericTask.h:628
void sendTaskMsg(Msg *msg)
Definition: GenericTask.h:501
GenericTask()
Definition: GenericTask.h:289
Definition: FdSet.h:16
void startThreads(void *arg)
Definition: GenericTask.h:543
PipeQ< Msg > msgq_
Definition: GenericTask.h:188
virtual void fwdTaskMsg(Msg *msg)
Definition: GenericTask.h:524
void cancelThreads()
Definition: GenericTask.h:658
gcp::util::FdSet fdSet_
Definition: GenericTask.h:193
void pingThreads(void *arg)
Definition: GenericTask.h:696
Thread * getThread(std::string name)
Definition: GenericTask.h:712
Definition: Thread.h:57
virtual void processMsg(Msg *msg)
Definition: GenericTask.h:435
Definition: LogStream.h:21
virtual void enableTimer(Msg *msg)
Definition: GenericTask.h:780
void cancelThread(unsigned order)
Definition: GenericTask.h:682
bool threadsNeedCancelling()
Definition: GenericTask.h:591
virtual void serviceMsgQ(void)
Definition: GenericTask.h:370
Definition: TimeVal.h:55
virtual void installSignal(Msg *msg)
Definition: GenericTask.h:774
unsigned getMinStartOrder()
Definition: GenericTask.h:603
void stepCommands()
Definition: GenericTask.h:808
void startThread(void *arg, unsigned order)
Definition: GenericTask.h:557
void raise(std::string name, int sigNo)
Definition: GenericTask.h:726
virtual void restart(void)
Definition: GenericTask.h:351
void shutdownConnection(int fd)
Definition: GenericTask.h:794
void sendHeartBeatMsg()
Definition: GenericTask.h:484
Definition: Exception.h:30
Thread * thread_
Definition: GenericTask.h:105
virtual void installTimer(Msg *msg)
Definition: GenericTask.h:768
virtual ~GenericTask()
Definition: GenericTask.h:319
Definition: GenericTask.h:33
virtual void processTaskMsg(bool *stop)
Definition: GenericTask.h:403