Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

ReadDispatcherModule.cxx

Go to the documentation of this file.
00001 
00002 // 
00003 // ReadDispatcherModule
00004 //
00005 // This module largely duplicates the dispatcher-read functionality
00006 // of the standard Input module.
00007 //
00008 // The point of this module is to allow read-aheads of live
00009 // data so you can check if there is new data before calling
00010 // JobCPath::Next().  This is good for viewers.
00011 // 
00012 //
00013 // n.tagg1@physics.ox.ac.uk
00015 #include "TriD/stat/ReadDispatcherModule.h"
00016 #include "MessageService/MsgService.h"
00017 #include "MinosObjectMap/MomNavigator.h"
00018 #include "JobControl/JobCModuleRegistry.h" // For JOBMODULE macro
00019 
00020 #include "TIterator.h"
00021 #include "TROOT.h"
00022 #include "TSystem.h"
00023 #include "RawData/RawRecord.h"
00024 #include "Persistency/Per.h"
00025 #include "Dispatcher/DDS.h"
00026 #include "Dispatcher/DDSClient.h"
00027 #include "Dispatcher/DDSSubscription.h"
00028 #include "DataUtil/DumpMom.h"
00029 #include "RawData/RawRecord.h"
00030 #include "RawData/RawDaqSnarlHeader.h"
00031 #include "RawData/RawDigitDataBlock.h"
00032 #include "RawData/RawBlockRegistry.h"
00033 #include "RawData/RawBlockId.h"
00034 #include "RawData/RawDigit.h"
00035 
00036 
00037 JOBMODULE(ReadDispatcherModule, "ReadDispatcherModule",
00038           "ReadDispatcherModule");
00039 CVSID("$Id: ReadDispatcherModule.cxx,v 1.3 2007/03/01 16:59:43 rhatcher Exp $");
00040 //......................................................................
00041 
00042 ReadDispatcherModule::ReadDispatcherModule()
00043 {
00044   fClient = 0;
00045   fConnected = false;
00046   fSurrogateMom = 0;
00047 }
00048 
00049 
00050 
00051 //......................................................................
00052 ReadDispatcherModule::~ReadDispatcherModule()
00053 {
00054   if(fClient) delete fClient;
00055 }
00056 
00057 
00058 Int_t ReadDispatcherModule::ConnectToServer( void )
00059 {
00060   // Connect.
00061 
00062   if(fClient) delete fClient;
00063   fConnected = false;
00064 
00065   // Create a new DDSClient object connected to ther server running on
00066   // node "serverhostname" and port 9090.
00067   fClient = new DDSClient(fDDSServer.c_str(),fDDSPort);
00068   
00069   // Check validity of connected socket before using it
00070   if (! fClient -> IsValid() ) {
00071     MSG("ReadDisp",Msg::kInfo) << "Error in creation of socket connected to server." << endl;
00072     delete fClient; fClient = 0;
00073     return 1;  // end of session
00074   }
00075   else {
00076     MSG("ReadDisp",Msg::kInfo) << "Successfully connected to dispatcher server.\n" 
00077        << fClient << endl;
00078   }
00079     
00080   // Subscribe to data source of interest (kDaq or kDcs, default is kDaq).
00081   fClient->GetSubscription()
00082     ->SetDataSource((DDS::EDataSource)DDS::GetDataSourceCode(fDDSDataSource.c_str()));
00083   
00084   // Subscribe to streams of interest
00085   fClient->GetSubscription()
00086     ->SetStreams(fStreams.c_str());
00087   // Keep up mode.
00088   fClient->GetSubscription()->
00089     SetKeepUpMode((DDS::EKeepUpMode)DDS::GetKeepUpCode(fDDSKeepUpMode.c_str()));
00090   
00091   // Selection, assumes DaqSnarl
00092   fClient->GetSubscription()->
00093     SetSelection(fStreams.c_str(),fSelectRule.c_str());
00094   
00095   // Submit the subscription to the ddschildserver and check to make sure that
00096   // it was received okay
00097   DDS::EMessageType msgrc = fClient->Subscribe();
00098   if (msgrc != DDS::kOk) {
00099     MSG("ReadDisp",Msg::kInfo) << "An error message " << DDS::AsString(msgrc) 
00100        << " was received from DDS::Subscribe." << endl;    
00101     return 1;
00102   }
00103 
00104   fConnected = true;
00105   return 0;
00106 }
00107 
00108 Bool_t ReadDispatcherModule::IsNewEventReady( void ) 
00109 {
00110   // Query the server and get a new mom if available.
00111   // Hold this mom until the next read cycle.
00112   // If we are already holding a mom, throw it out
00113   // in favour of a new one.
00114 
00115   if(!fConnected) ConnectToServer();
00116 
00117   if(!fConnected) return false;
00118 
00119   DDS::EMessageType msgrc = DDS::kOk;
00120 
00121   MomNavigator* newMom = 0;
00122   msgrc = fClient->Next(newMom,fDDSTimeOut);
00123   if (msgrc == DDS::kOk ) {
00124     //cout << "Got Next reply." << endl;
00125 
00126     if( newMom ) {
00127       // Got something!
00128       //cout << "Got a mom." << endl;
00129 
00130       if(newMom->GetFragmentArray()->IsEmpty()){
00131         cout << "Empty mom." << endl;
00132         delete newMom;
00133       } else {
00134         //cout << "Good mom." << endl;
00135         // Got a real, filled mom.
00136         // Do we already have something in the queue?
00137         // If so, get rid of it.
00138         if( fSurrogateMom ) delete fSurrogateMom;
00139         fSurrogateMom = newMom;
00140         
00141         if(MsgService::Instance()->GetStream("TriD")->IsActive(Msg::kDebug))
00142           DataUtil::dump_mom(newMom,std::cout);
00143       
00144         return true;
00145       }
00146     }
00147     
00148   } else {
00149     if(msgrc != DDS::kTimeoutNewRecord) {
00150       // Failed to get something from server.
00151       MSG("ReadDisp",Msg::kInfo) << "Dispatcher Error " 
00152          << DDS::AsString(msgrc)
00153          << "  Will try reconnect." 
00154          << endl;
00155       
00156       fClient->Shutdown();
00157       delete fClient;
00158       fClient = 0;
00159       fConnected = false;
00160     }
00161   }
00162  
00163   if(fSurrogateMom) return true;
00164   return false;
00165 }
00166 
00167 
00168 //......................................................................
00169 JobCResult ReadDispatcherModule::Get()
00170 {
00171   //
00172   // Get next event, default fMom
00173   //
00174   
00175   // If this were to be a JobCInputModule...
00176   //return Get(GetMom());
00177   return JobCResult::kAOK;
00178 }
00179 
00180 //......................................................................
00181 JobCResult ReadDispatcherModule::Get(MomNavigator* mom)
00182 {
00183   //
00184   // Get next event.
00185   //
00186   // Queries dispatcher once before failing.
00187   // Uses a previously cached-but-unused event if available.
00188   //
00189   if(! fSurrogateMom) {
00190     // Dont have one ready.. go look again.
00191     IsNewEventReady();
00192   }
00193 
00194   if(! fSurrogateMom) return JobCResult::kFailed; // Give up.
00195 
00196 
00197   // Copy the surrogate.
00198   // This bit of code ripped out of DDSClient mercilessly.
00199   mom->Clear();
00200   TObjArray* objArray =const_cast<TObjArray*>
00201     (fSurrogateMom -> GetFragmentArray());
00202   Int_t nrecord = objArray -> GetEntries();
00203   for ( Int_t irec=0; irec < nrecord; irec++ ) {
00204     //record is removed fr  om fSurrogateMom to pass to usr's mom for ownership
00205     RecMinos* record = dynamic_cast<RecMinos*>(objArray->RemoveAt(irec));
00206     mom -> AdoptFragment(record); // new owner of record
00207   }
00208   delete fSurrogateMom; fSurrogateMom=0; // clears array and deletes temptags
00209 
00210 
00211   return JobCResult::kPassed;
00212 }
00213 
00214 //......................................................................
00215 
00216 const Registry& ReadDispatcherModule::DefaultConfig() const
00217 {
00218 //======================================================================
00219 // Supply the default configuration for the module
00220 //======================================================================
00221   static Registry r; // Default configuration for module
00222 
00223   // Set name of config
00224   std::string name = this->GetName();
00225   name += ".config.default";
00226   r.SetName(name.c_str());
00227 
00228   // Set values in configuration
00229   r.UnLockValues();
00230   r.Set("DDSServer",      "localhost");
00231   r.Set("DDSPort",         9090);
00232   r.Set("Streams",        "DaqSnarl");
00233   r.Set("DDSKeepUpMode",  "RecordKeepUp");
00234   r.Set("DDSDataSource",  "Daq");
00235   r.Set("DDSTimeOut",      1);
00236   r.Set("SelectRule",     "true");
00237   r.LockValues();
00238 
00239   return r;
00240 }
00241 
00242 //......................................................................
00243 
00244 void ReadDispatcherModule::Config(const Registry& r)
00245 {
00246 //======================================================================
00247 // Configure the module given the Registry r
00248 //======================================================================
00249   int    tmpi;
00250   const char* tmps;
00251 
00252   if (r.Get("DDSServer",tmps))       { fDDSServer = tmps; }
00253   if (r.Get("DDSPort",tmpi))         { fDDSPort   = tmpi; }
00254   if (r.Get("Streams",tmps))         { fStreams = tmps; }
00255   if (r.Get("DDSKeepUpMode",tmps))   { fDDSKeepUpMode = tmps; }
00256   if (r.Get("DDSDataSource",tmps))   { fDDSDataSource = tmps; }
00257   if (r.Get("DDSTimeOut",tmpi))      { fDDSTimeOut = tmpi; }
00258   if (r.Get("SelectRule",tmps))      { fSelectRule = tmps; }
00259 }
00260 

Generated on Fri Mar 28 15:38:59 2008 for loon by  doxygen 1.3.9.1