13 #include <sys/socket.h>
15 #include <sys/types.h>
17 #include "gcp/util/common/Command.h"
18 #include "gcp/util/common/Debug.h"
19 #include "gcp/util/common/IoLock.h"
20 #include "gcp/util/common/Exception.h"
21 #include "gcp/util/common/FdSet.h"
22 #include "gcp/util/common/LogStream.h"
23 #include "gcp/util/common/PipeQ.h"
24 #include "gcp/util/common/Thread.h"
170 void raise(std::string name,
int sigNo);
221 virtual void run(
void);
273 std::vector<Command*> commands_;
275 struct timeval* timeOut_;
296 fdSet_.zeroReadFdSet();
297 fdSet_.registerReadFd(msgq_.fd());
311 fdSet_.zeroReadFdSet();
312 fdSet_.registerReadFd(msgq_.fd());
325 thread_->setRunState(
false);
327 DBPRINT(
false, Debug::DEBUG7,
"About to cancel threads");
337 for(
unsigned ithread=0; ithread < threads_.size(); ithread++)
338 if(threads_[ithread] != 0)
339 delete threads_[ithread];
342 for(
unsigned iCommand=0; iCommand < commands_.size(); iCommand++)
343 if(commands_[iCommand] != 0)
344 delete commands_[iCommand];
376 ThrowError(
"Received NULL file descriptor");
381 while(!stop && (nready=select(fdSet_.size(), fdSet_.readFdSet(),
382 NULL, NULL, NULL)) > 0) {
389 DBPRINT(
true, Debug::DEBUG2,
"About to call processTaskMsg: "
390 <<
"nready = " << nready);
393 processTaskMsg(&stop);
409 DBPRINT(
true, Debug::DEBUG2,
"Inside processTaskMsg: "
410 << msg.genericMsgType_);
412 switch (msg.genericMsgType_) {
414 respondToHeartBeat();
445 thread_->setRunState(
true);
455 msg.genericMsgType_ = Msg::RESTART;
471 msg.genericMsgType_ = Msg::STOP;
487 msg.genericMsgType_ = Msg::HEARTBEAT;
507 DBPRINT(
true, Debug::DEBUG7,
"Sending a message " << pthread_self());
509 DBPRINT(
true, Debug::DEBUG7,
"Sending a message: done " << pthread_self());
511 COUT(
"Caught an exception in task instantiated by thread: " << thread_->name());
526 DBPRINT(
true, Debug::DEBUG2,
"Inside fwdTaskMsg");
531 COUT(
"Caught an exception in task instantiated by thread: " << thread_->name());
547 while(threadsNeedStarting()) {
548 unsigned order = getMinStartOrder();
549 startThread(arg, order);
561 for(
int ithread=0; ithread < threads_.size(); ithread++)
562 if(!threads_[ithread]->isRunning() && threads_[ithread]->startOrder()==order) {
563 threads_[ithread]->start(arg);
567 if(threads_[ithread]->wasError_) {
569 errStr.appendMessage(
true,
"Error in thread startup function\n");
581 for(
int ithread=0; ithread < threads_.size(); ithread++)
582 if(!threads_[ithread]->isRunning())
593 for(
int ithread=0; ithread < threads_.size(); ithread++)
594 if(threads_[ithread]->isRunning())
608 for(
int ithread=0; ithread < threads_.size(); ithread++) {
609 if(!threads_[ithread]->isRunning())
612 minOrder = threads_[ithread]->startOrder();
615 minOrder = threads_[ithread]->startOrder() < minOrder ?
616 threads_[ithread]->startOrder() : minOrder;
633 for(
int ithread=0; ithread < threads_.size(); ithread++) {
634 if(threads_[ithread]->isRunning())
637 minOrder = threads_[ithread]->cancelOrder();
640 minOrder = threads_[ithread]->cancelOrder() < minOrder ?
641 threads_[ithread]->cancelOrder() : minOrder;
662 DBPRINT(
true, Debug::DEBUG7,
"Managing " << threads_.size()
665 while(threadsNeedCancelling()) {
666 unsigned order = getMinCancelOrder();
670 DBPRINT(
true, Debug::DEBUG7,
"Leaving cancelThreads");
686 for(
int ithread=threads_.size()-1; ithread >=0; ithread--)
687 if(threads_[ithread]->isRunning() && threads_[ithread]->cancelOrder() == order) {
688 threads_[ithread]->cancel();
698 for(
int ithread=0; ithread < threads_.size(); ithread++)
699 if(threads_[ithread]->isPingable()) {
700 threads_[ithread]->setRunState(
false);
701 threads_[ithread]->ping(arg);
714 for(
int ithread=0; ithread < threads_.size(); ithread++) {
715 if(threads_[ithread]->matchName(name))
716 return threads_[ithread];
719 ThrowError(
"No matching thread found");
728 for(
int ithread=0; ithread < threads_.size(); ithread++) {
729 if(threads_[ithread]->matchName(name)) {
730 threads_[ithread]->raise(sigNo);
735 ReportError(
"No matching thread found");
744 for(
int ithread=0; ithread < threads_.size(); ithread++) {
745 if(!threads_[ithread]->isRunning()) {
746 ReportError(
"Thread "
747 << threads_[ithread]->strName()
748 <<
" is not running!");
818 for(
unsigned iCommand=0; iCommand < commands_.size(); iCommand++) {
823 if(commands_[iCommand]->active()) {
824 commands_[iCommand]->executeNextInstruction(commandTimeOut_, first);
825 timeOut_ = commandTimeOut_.timeVal();
837 #endif // End #ifndef
virtual void addHandler(Msg *msg)
Definition: GenericTask.h:786
void sendRestartMsg()
Definition: GenericTask.h:452
virtual void respondToHeartBeat()
Definition: GenericTask.h:442
void sendStopMsg()
Definition: GenericTask.h:468
std::vector< Thread * > threads_
Definition: GenericTask.h:110
bool threadsNeedStarting()
Definition: GenericTask.h:579
virtual void run(void)
Definition: GenericTask.h:358
bool threadsAreRunning()
Definition: GenericTask.h:742
unsigned getMinCancelOrder()
Definition: GenericTask.h:628
void sendTaskMsg(Msg *msg)
Definition: GenericTask.h:501
GenericTask()
Definition: GenericTask.h:289
void startThreads(void *arg)
Definition: GenericTask.h:543
PipeQ< Msg > msgq_
Definition: GenericTask.h:188
virtual void fwdTaskMsg(Msg *msg)
Definition: GenericTask.h:524
void cancelThreads()
Definition: GenericTask.h:658
gcp::util::FdSet fdSet_
Definition: GenericTask.h:193
void pingThreads(void *arg)
Definition: GenericTask.h:696
Thread * getThread(std::string name)
Definition: GenericTask.h:712
virtual void processMsg(Msg *msg)
Definition: GenericTask.h:435
Definition: LogStream.h:21
virtual void enableTimer(Msg *msg)
Definition: GenericTask.h:780
void cancelThread(unsigned order)
Definition: GenericTask.h:682
bool threadsNeedCancelling()
Definition: GenericTask.h:591
virtual void serviceMsgQ(void)
Definition: GenericTask.h:370
virtual void installSignal(Msg *msg)
Definition: GenericTask.h:774
unsigned getMinStartOrder()
Definition: GenericTask.h:603
void stepCommands()
Definition: GenericTask.h:808
void startThread(void *arg, unsigned order)
Definition: GenericTask.h:557
void raise(std::string name, int sigNo)
Definition: GenericTask.h:726
virtual void restart(void)
Definition: GenericTask.h:351
void shutdownConnection(int fd)
Definition: GenericTask.h:794
void sendHeartBeatMsg()
Definition: GenericTask.h:484
Definition: Exception.h:30
Thread * thread_
Definition: GenericTask.h:105
virtual void installTimer(Msg *msg)
Definition: GenericTask.h:768
virtual ~GenericTask()
Definition: GenericTask.h:319
Definition: GenericTask.h:33
virtual void processTaskMsg(bool *stop)
Definition: GenericTask.h:403