Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

IoInputModule.cxx

Go to the documentation of this file.
00001 
00002 // $Id: IoInputModule.cxx,v 1.85 2008/02/21 15:37:15 kordosky Exp $
00003 //
00004 // Job control interface to input data streams
00005 //
00006 // messier@huhepl.harvard.edu
00008 #include "TSystem.h"
00009 #include "TRegexp.h"
00010 
00011 #include "Dispatcher/DDS.h"
00012 #include "IoModules/IoInputModule.h"
00013 #include <cassert>
00014 #include "MessageService/MsgService.h"
00015 #include "MinosObjectMap/MomNavigator.h"
00016 #include "IoModules/IoDataStreamItr.h"
00017 #include "IoModules/IoDDSStreamItr.h"
00018 #include "IoModules/IoDataStreamFactory.h"
00019 #include "JobControl/JobCInputModule.h"
00020 #include "JobControl/JobCModuleRegistry.h"
00021 #include "JobControl/JobCEnv.h"
00022 #include "RawData/RawRecord.h"
00023 #include "RawData/RawDaqSnarlHeader.h"
00024 #include "CandData/CandHeader.h"
00025 #include "Record/RecRecord.h"
00026 #include "Record/RecPhysicsHeader.h"
00027 #include "Registry/Registry.h"
00028 #include "Validity/VldContext.h"
00029 #include "Validity/VldTimeStamp.h"
00030 #include "Util/UtilString.h"
00031 #include "TSystem.h"
00032 #include "TRandom3.h"
00033 
00034 #include <algorithm>
00035 #include <cstring>
00036 #include <string>
00037 #include <map>
00038 
00039 #ifdef SITE_HAS_SAM
00040 #include "sam_cpp_api/SamConsumer.hpp"
00041 #include "sam_cpp_api/SamLocate.hpp"
00042 #endif
00043 
00044 CVSID("$Id: IoInputModule.cxx,v 1.85 2008/02/21 15:37:15 kordosky Exp $");
00045 JOBMODULE(IoInputModule,"INPUT","Read and configure input streams");
00046 
00047 typedef std::map<std::string,std::string>::const_iterator mapStrStrItr_t;
00048 
00049 // A result that says "AOK"
00050 static const JobCResult gsAllClear = JobCResult::kAOK;
00051 
00052 // Many of the input methods are recursive. Ie. try to load an event,
00053 // if that fails, try the next file and try again, etc. To prevent
00054 // initializations from being done twice we need a way to keep track
00055 // of the recursion level. This little class helps with that.
00056 class CallDepth {
00057 public:
00058   CallDepth()  { ++fsDepth; }
00059   ~CallDepth() { --fsDepth; }
00060   static int fsDepth;
00061 };
00062 int CallDepth::fsDepth = 0;
00063 
00064 //......................................................................
00065 
00066 IoInputModule::IoInputModule() : 
00067   fDataStreamItr(0),
00068   fFormat(""),
00069   fStreamList(""),
00070   fServer(""),
00071   fPort(0),
00072   fTimeOut(0),
00073   fDataSource(0),
00074   fKeepUpMode(0),
00075   fMaxSyncDelay(0),
00076   fOffLine(false),
00077   fMaxRetry(0),
00078   fRetryDelay(1),
00079   fClientType(DDS::kUnknownClientType),
00080   fClientName(""),
00081   fStatus(JobCResult::kAOK),
00082   fLastRun(-1),
00083   fLastSnarl(-1),
00084   fCurrentRun(-1),
00085   fCurrentSnarl(-1),
00086   fLoadedCommandLineFiles(false)
00087 #ifdef SITE_HAS_SAM
00088   ,fsamProject(0)
00089 #endif
00090 { fStopwatch.Reset(); fStopwatch.Stop(); }
00091 
00092 //......................................................................
00093 
00094 IoInputModule::~IoInputModule() 
00095 { 
00096   if ( fDataStreamItr ) { delete fDataStreamItr; fDataStreamItr = 0; }
00097 }
00098 
00099 //......................................................................
00100 
00101 void IoInputModule::BeginJob() 
00102 {
00103   this->LoadFilesFromCommandLine();
00104   // Delay opening files until first action (Next,Prev,etc.) call
00105 }
00106 
00107 
00108 //......................................................................
00109 
00110 void IoInputModule::EndJob()
00111 {
00112   fStopwatch.Stop();
00113   MSG("Io",Msg::kDebug) << "IoInputModule::EndJob, Time(sec), Real " 
00114         << fStopwatch.RealTime() << ", CPU "
00115         << fStopwatch.CpuTime() << endl;
00116   
00117 }
00118 
00119 //......................................................................
00120 
00121 const Registry& IoInputModule::DefaultConfig() const 
00122 {
00123 //======================================================================
00124 // Get the default configuration for this module
00125 //======================================================================
00126   static Registry r;
00127   r.SetName("INPUT.config");
00128 
00129   r.UnLockValues();
00130 
00131   MSG("Io",Msg::kDebug) << "Loading default config\n";
00132 
00133   // Stream config
00134   r.Set("Format" ,"input");
00135   r.Set("Streams","DaqMonitor,DaqSnarl,LightInjection");
00136   // r.Set("Streams","DaqMonitor,DaqSnarl,LightInjection,DcsMonitor,DcsAlarm");
00137   
00138   // DDS config
00139   r.Set("DDSServer", "daqdds.minos-soudan.org");
00140   r.Set("DDSPort",    DDS::kPort);
00141   r.Set("DDSTimeOut",  120);
00142   r.Set("DDSDataSource","Daq");
00143   r.Set("DDSKeepUpMode", "FileKeepUp");
00144   r.Set("DDSMaxSyncDelay",15);
00145   r.Set("DDSOffLine",false);
00146   r.Set("DDSMaxRetry",0);
00147   r.Set("DDSRetryDelay",1);
00148   r.Set("DDSClientType","Unknown");
00149   r.Set("DDSClientName","");
00150 
00151 #ifdef SITE_HAS_SAM
00152 
00153   // SAM config
00154   r.Set("Station","minos");
00155   r.Set("SnapShotVers",0);
00156   r.Set("WorkGroupName","minos");
00157   r.Set("ApplicationName","loon");
00158   r.Set("ApplicationVers","dev");
00159   r.Set("MaxNumberOfFiles",0);
00160   r.Set("StartNewProject",1);
00161 
00162   // Create default project name
00163   // Get $USER
00164   const char* username = gSystem->Getenv("USER");
00165   if (!username) username = "unknown";
00166   r.Set("ProjectName",username);
00167 
00168 #endif
00169 
00170   r.LockValues();
00171   return r;
00172 }
00173 
00174 //......................................................................
00175 
00176 void IoInputModule::Config(const Registry& r) 
00177 {
00178 //======================================================================
00179 // Configure the module based on the contents of the registry r
00180 //======================================================================
00181   const char* tmps;
00182   int         tmpi;
00183   int         tmpb;  // bools
00184 
00185   MSG("Io",Msg::kDebug) << "Config IoInputModule with r=" << r << "\n";
00186 
00187   // Input data stream configuration
00188   bool doFormatConfig = false;
00189   if (r.Get("Format", tmps)) { fFormat     = tmps; doFormatConfig = true; }
00190   if (doFormatConfig) this->UpdateFormatConfig();
00191   bool doStreamConfig = false;
00192   if (r.Get("Streams",tmps)) { fStreamList = tmps; doStreamConfig = true; }
00193   if (doStreamConfig) this->UpdateStreamConfig();
00194     
00195   // DDS options
00196   bool doDDSConfig = false;
00197   if (r.Get("DDSServer", tmps)) { fServer  = tmps; doDDSConfig = true; }
00198   if (r.Get("DDSPort",   tmpi)) { fPort    = tmpi; doDDSConfig = true; }
00199   if (r.Get("DDSTimeOut",tmpi)) { fTimeOut = tmpi; doDDSConfig = true; }
00200   if (r.Get("DDSClientType",tmps)) { fClientType = DDS::GetClientType(tmps); 
00201                                      doDDSConfig = true; }
00202   if (r.Get("DDSClientName",tmps)) { fClientName = tmps; 
00203                                      doDDSConfig = true; }
00204   if (r.Get("DDSDataSource",tmps)){fDataSource = DDS::GetDataSourceCode(tmps);
00205                                    doDDSConfig = true;}
00206   if (r.Get("DDSKeepUpMode",tmps)) { fKeepUpMode = DDS::GetKeepUpCode(tmps); 
00207                                      doDDSConfig = true; }
00208   if (r.Get("DDSMaxSyncDelay",tmpi)){fMaxSyncDelay = tmpi; doDDSConfig = true;}
00209   if (r.Get("DDSOffLine",tmpb)) {fOffLine = tmpb; doDDSConfig = true;}
00210   if (r.Get("DDSMaxRetry",tmpi)) {fMaxRetry = tmpi; doDDSConfig = true; }
00211   if (r.Get("DDSRetryDelay",tmpi)) {fRetryDelay = tmpi; doDDSConfig = true; }
00212   if (doDDSConfig) this->UpdateDDSConfig();
00213 
00214 
00215 }
00216 
00217 //......................................................................  
00218 
00219 JobCResult IoInputModule::Get()
00220 {
00221 //======================================================================
00222 // Load the data records at the current position in the input stream
00223 //======================================================================
00224 
00225 
00226   if ( fDataStreamItr==0 ) {
00227     if ( this->OpenStreamItr()==0 ) {
00228       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00229       return fStatus;
00230     }
00231     return this->Get();
00232   }
00233 
00234   MSG("Io",Msg::kVerbose) << "IoInputModule::Get " << endl;
00235 
00236   fStopwatch.Start(false);
00237   MomNavigator* mom = this->GetMom();
00238   assert(mom);
00239   mom -> Clear(); // Moving on so clear contents of Mom
00240 
00241   int nrecord = fDataStreamItr->LoadRecords(mom);
00242   bool isDDS = (UtilString::ToUpper(fFormat) == "DDS");
00243   // special treatment required because dds doesn't separate advance from load
00244   if ( isDDS && !nrecord ) fStatus.SetEndOfInputStream();
00245 
00246   this->ReadHeader(); // sets beginrun/endrun beginfile/endfile fStatus bits
00247   if ( fStatus.EndOfInputStream() ) 
00248      { fStatus.SetEndFile(); fStatus.SetEndRun(); }
00249   MSG("Io",Msg::kVerbose) 
00250    << "IoInputModule::Get returning status " << fStatus << endl;
00251   fStopwatch.Stop();
00252   return fStatus;
00253 }
00254 
00255 //......................................................................
00256 
00257 JobCResult IoInputModule::Next(int n)
00258 {
00259 //======================================================================
00260 // Advance the position in the stream n record sets. Load the records
00261 // at the last position
00262 //======================================================================
00263   CallDepth d; // Keep track of the call depth
00264   
00265   // Set the input status to "all clear" since advancing
00266   if (d.fsDepth==1) fStatus = gsAllClear;
00267 
00268   MSG("Io",Msg::kVerbose) << "IoInputModule::Next " << n << endl;
00269 
00270   if ( fDataStreamItr==0 ) {
00271     if (this->OpenStreamItr()==0) {
00272       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00273       return fStatus;
00274     }
00275     return this->Next(n);
00276   }
00277 
00278   fStopwatch.Start(false);
00279   MomNavigator* mom = this->GetMom();
00280   assert(mom);
00281   mom -> Clear(); // Moving on so clear contents of Mom
00282 
00283   // Advance the position in the input stream until we run out of
00284   // records and files
00285   int nstep = 0;
00286   int ndone = 0;
00287   int ntry  = 0;
00288   while ( ndone < n ) {
00289     ntry  = n - ndone;
00290     nstep = fDataStreamItr->Increment(ntry);
00291 
00292     if ( nstep < ntry ) {
00293       // Reached end of file, load next one
00294       fStatus |= this->NextFile(); 
00295       
00296       // If this is the end of the input stream, we're done.
00297       if ( fStatus.EndOfInputStream() ) {
00298         fStopwatch.Stop();
00299         return this->Get();
00300       }
00301     }
00302     ndone += nstep;
00303   }
00304 
00305   // Load the current event
00306   fStopwatch.Stop();
00307   return this->Get();
00308 }
00309 
00310 //......................................................................
00311 
00312 JobCResult IoInputModule::Prev(int n) 
00313 {
00314 //======================================================================
00315 // Back up n positions in the input data stream. Load the records at
00316 // the current position
00317 //======================================================================
00318   CallDepth d;
00319 
00320   // Set the input status to "all clear"
00321   if (d.fsDepth == 1) fStatus = gsAllClear;
00322 
00323   MSG("Io",Msg::kVerbose) << "IoInputModule::Prev " << n << endl;
00324   
00325   if (fDataStreamItr==0) {
00326     if (this->OpenStreamItr()==0) {
00327       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00328       return fStatus;
00329     }
00330     return this->Prev(n);
00331   }
00332 
00333   // Back up the the position in the input stream until we run out of
00334   // records and files
00335   fStopwatch.Start(false);
00336   MomNavigator* mom = this->GetMom();
00337   assert(mom);
00338   mom -> Clear(); // Moving on so clear contents of Mom
00339 
00340   int nstep = 0;
00341   int ndone = 0;
00342   int ntry  = 0;
00343   while (ndone < n) {
00344     ntry = n - ndone;
00345     nstep = fDataStreamItr->Decrement(ntry);
00346 
00347     if (nstep < ntry) {
00348       // Reached start of file, load previous file.
00349       fStatus |= this->PrevFile();
00350 
00351       // If there is no previous file, then we're done.
00352       if ( fStatus.BeginOfInputStream() || fStatus.EndOfInputStream() ) {
00353         return this->Get(); // may be end if failed to open stream itr
00354         fStopwatch.Stop();
00355       }
00356 
00357       // Move the position to the end of the current file so we can
00358       // walk backwards over it
00359       fDataStreamItr->GoToEOF();
00360     }
00361     ndone += nstep;
00362   }
00363 
00364   // Load the current event
00365   fStopwatch.Stop();
00366   return this->Get();
00367 
00368 }
00369 
00370 //......................................................................
00371 
00372 JobCResult IoInputModule::GoTo(int run, int snarl, int searchDir) 
00373 {
00374   CallDepth d;
00375   
00376   if (d.fsDepth==1) fStatus = gsAllClear;
00377 
00378   if (fDataStreamItr==0) {
00379     if (this->OpenStreamItr()==0) {
00380       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00381       return fStatus;
00382     }
00383     return this->GoTo(run,snarl,searchDir);
00384   }
00385 
00386   if ( run == fCurrentRun && snarl == fCurrentSnarl ) return this -> Get();
00387 
00388   int dir = searchDir;
00389   if (dir==0) {
00390     if      (run>fLastRun) { dir =  1; }
00391     else if (run<fLastRun || (run==fLastRun && fCurrentRun < 0)) { dir = -1; }
00392     else {
00393       if    (snarl>fLastSnarl) { dir =  1; }
00394       else                     { dir = -1; }
00395     }
00396   }
00397 
00398   // Move position in the stream looking for run/event number
00399   while ( 1 ) {
00400     if ( dir > 0 ) {
00401       this->Next();
00402       if (fCurrentRun>run) {
00403         MSG("Io",Msg::kWarning) << 
00404           "Went to run "<<fCurrentRun<<
00405           " without finding run="<<run<<" snarl="<<snarl<<"\n";
00406         return fStatus;
00407       }
00408       if (fCurrentRun==run && fCurrentSnarl>snarl) {
00409         MSG("Io",Msg::kWarning) << 
00410           "Went to run="<<fCurrentRun<<" snarl="<<fCurrentSnarl<<
00411           " without finding run="<<run<<" snarl="<<snarl<<"\n";
00412         return fStatus;
00413       }
00414     }
00415     if ( dir <0 ) {
00416       this->Prev();
00417       if (fCurrentRun<run) {
00418         MSG("Io",Msg::kWarning) << 
00419           "Went to run "<<fCurrentRun<<
00420           " without finding run="<<run<<" snarl="<<snarl<<"\n";
00421         return fStatus;
00422       }
00423       if (fCurrentRun==run && fCurrentSnarl<snarl) {
00424         MSG("Io",Msg::kWarning) << 
00425           "Went to run="<<fCurrentRun<<" snarl="<<fCurrentSnarl<<
00426           " without finding run="<<run<<" snarl="<<snarl<<"\n";
00427         return fStatus;
00428       }
00429     }
00430     // Check if we're done
00431     if (fCurrentRun == run && fCurrentSnarl == snarl) return fStatus;
00432     if (dir>0 && fStatus.EndOfInputStream())          return fStatus;
00433     if (dir<0 && fStatus.BeginOfInputStream())        return fStatus;
00434   }
00435   return fStatus;
00436 }
00437 
00438 //......................................................................
00439 
00440 JobCResult IoInputModule::GoTo(const VldContext& vld)
00441 {
00442 //======================================================================
00443 // Go to records that match validity context. If vld is not found, will
00444 // GoTo record set one beyond requested validity.
00445 //======================================================================
00446   CallDepth d;
00447 
00448   // Set the input status to "all clear"
00449   if (d.fsDepth==1) fStatus = gsAllClear;
00450 
00451   if (fDataStreamItr==0) {
00452     if (this->OpenStreamItr()==0) {
00453       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00454       return fStatus;
00455     }
00456     return this->GoTo(vld);
00457   }
00458 
00459   fStatus |= fDataStreamItr -> GoTo(vld);
00460 
00461   // Load the current event
00462   return this->Get();
00463 
00464 }
00465 
00466 //......................................................................
00467 
00468 void IoInputModule::List(const char* streamlist) const
00469 {
00470 //======================================================================
00471 // Print list of files loaded
00472 //======================================================================
00473 
00474   MsgStream& m = MSGSTREAM("Io",Msg::kInfo);
00475 
00476   m << "IoInputModule using data format " << fFormat << endl;
00477 
00478   if ( fDataStreamItr ) {
00479     fDataStreamItr -> ListFile(std::cout,streamlist);
00480     return;
00481   }
00482 
00483   // data stream not yet open, list files from fFileList
00484   m << "File Name\tStream List " << endl;
00485   m << "=========\t=========== " << endl;
00486   std::list<IoFileListItem>::const_iterator itr = fFileList.begin();
00487   std::list<IoFileListItem>::const_iterator itrEnd = fFileList.end();
00488 
00489   for ( ; itr != itrEnd; itr++ ) {
00490     m << *itr;
00491   }
00492  
00493 }
00494 
00495 //......................................................................
00496 
00497 void IoInputModule::AddFile(const char *filepath, const char* streamlist, 
00498                                                                  int at) {
00499 //======================================================================
00500 // Add to the list of attached streams at the position "at". -1 = end
00501 // of list
00502 //======================================================================
00503 
00504 
00505   // Find out by checking the format of the filepath whether this
00506   // is a SAM job. Format will be SAM:samdataset of SAM_FILE::file
00507 
00508   const char *s1 = "SAM";
00509   if ( strstr(filepath,s1) != NULL ) {
00510 
00511 #ifdef SITE_HAS_SAM
00512 
00513   // SAM options
00514 
00515     const char* tmps;
00516     int tmpi;
00517 
00518     Registry& r = GetConfig();
00519     if (r.Get("Station",tmps)) {fStation = tmps;}
00520     if (r.Get("SnapShotVers",tmpi)){fSnapShotVers = tmpi;}
00521     if (r.Get("WorkGroupName",tmps)) {fWorkGroupName = tmps;}
00522     if (r.Get("ApplicationName",tmps)) {fApplicationName = tmps;}
00523     if (r.Get("ApplicationVers",tmps)) {fApplicationVers = tmps;}
00524     if (r.Get("ProjectName",tmps)) {fProjectName = tmps;}
00525     if (r.Get("MaxNumberOfFiles",tmpi)) {fMaxNumberOfFiles = tmpi;}
00526     if (r.Get("StartNewProject",tmpi)) {fStartNewProject = tmpi;}
00527 
00528     // Decide whether we have a dataset or a single file
00529     // dataset will be SAM, file will be SAM_FILE
00530     // Get the samdataset or file from filepath
00531 
00532     std::string temp = filepath;
00533     size_t pos = temp.find(":")+1;
00534     std::string sam_access_type = temp.substr(0,pos-1);
00535     if (sam_access_type == "SAM") {
00536       std::string samdataset = temp.substr(pos,temp.length()-pos);
00537 
00538 
00539       std::string projectname;
00540 
00541       // If this is a new project then append time-stamp
00542       if (fStartNewProject == 1) {
00543         // Construct full project name including timestamp
00544         // Get timestamp as a string
00545         VldTimeStamp ts;
00546         std::string timestamp = ts.AsString("lc");
00547         // Replace blank with "-"
00548         size_t pos = timestamp.find(" ");
00549         timestamp.replace(pos,1,"-");
00550         // Replace : with - as DbServer does not like : in project names
00551         pos = timestamp.find(":");
00552         while ( pos != string::npos ) {
00553           timestamp.replace(pos,1,"-");
00554           pos = timestamp.find(":",pos+1);
00555         }
00556         // Append timestamp to fProjectName taken from registry
00557         fProjectName.append("-");
00558         projectname = fProjectName+timestamp;
00559       }
00560       // Otherwise use project name that is supplied
00561       else if(fStartNewProject == 0) {
00562         projectname = fProjectName;
00563       }
00564       
00565       
00566       MSG("Io",Msg::kDebug) << "Sam Station " << fStation << " Snap Shot " <<
00567         fSnapShotVers << " Work Group Name " << fWorkGroupName 
00568                             << " Application Name "
00569                             << fApplicationName << " Application Version " << fApplicationVers <<
00570         " Project Name " << projectname << endl;
00571 
00572 
00573       // Define snapshot version
00574 
00575       long snapshot;
00576       // Snapshot version = 0 means create New Snapshot
00577       if ( fSnapShotVers == 0 ) {
00578         snapshot = sam::SamProject::NewSnapshotVersion;
00579       }
00580       // Snapshot version < 0 means use last one created
00581       else if (fSnapShotVers < 0) {
00582         snapshot = sam::SamProject::LatestSnapshotVersion;
00583       }
00584       else if (fSnapShotVers > 0 ) {
00585         // Use specified SnapShot version 
00586         snapshot = fSnapShotVers;
00587       }
00588     
00589       MSG("Io",Msg::kDebug) << "SnapShot Version " << snapshot << endl; 
00590 
00591       if (fStartNewProject == 1) {
00592 
00593         // Create SAM project
00594 
00595         fsamProject = new sam::SamProject(projectname,fStation);
00596 
00597         // Start SAM project
00598 
00599         std::list<std::string> projectMasterArgList;
00600 
00601         try {
00602           MSG("Io",Msg::kInfo) << "Starting SAM Project " << projectname << 
00603             " on station " << fStation << endl;
00604           fsamProject->startProject(fWorkGroupName,samdataset,snapshot,
00605                                     projectMasterArgList);
00606         }
00607         catch(const sam::SamProject::StartProjectRequestRejected& ex) {
00608           MSG("Io",Msg::kInfo) << "Rejected start SAM project request " 
00609                                << ex << endl;
00610         }
00611       }
00612 
00613       // Start SAM consumer to deliver files
00614 
00615       const int projectMasterTimeout(60);
00616       const std::string processDescription("Loon Analysis Process");
00617 
00618       try{
00619 
00620 
00621         sam::SamConsumer fsamConsumer(projectname,fStation,fWorkGroupName,
00622                                       fApplicationName,fApplicationVers,
00623                                       processDescription,
00624                                       fMaxNumberOfFiles,
00625                                       projectMasterTimeout);
00626         
00627         MSG("Io",Msg::kInfo) << "Started SAM Consumer" << endl;
00628         
00629         // Now get files. Format of returned files depends on whether
00630         // SAM cache is local disk, dcache disk or AFS file space
00631         
00632         std::map<std::string,std::string> filelist;
00633         map<std::string,std::string>::iterator fitr;
00634         
00635         int location;
00636         int length;
00637         int comp;
00638         std::string fileonly;
00639         std::string restOfPath;
00640         std::string afsroot("afsroot:");
00641         try {
00642           while(true) {
00643             std::string filename = fsamConsumer.getFile().getFullFileName();
00644             // The files need to be sorted as they come back in an undefined
00645             // order. Split them into a filename and the rest of the path. 
00646             // Put them in a map and then iterate over key which is filename 
00647             // - guarantees correct order.
00648             
00649             MSG("Io",Msg::kDebug) << "Filename " << filename << endl;
00650             location = filename.find_last_of("/");
00651             length = filename.length();
00652             fileonly = filename.substr(location+1,length-1);
00653             // Need to look for afsroot: at start of path. If it is there the
00654             // remove it and rest of path is AFS path.
00655             comp = filename.compare(0,8,afsroot);
00656             if (comp == 0 ) {
00657               restOfPath = filename.substr(8,location-8);
00658             }
00659             else {
00660               restOfPath = filename.substr(0,location);
00661             }
00662             
00663             restOfPath.append("/");
00664             filelist.insert(make_pair(fileonly,restOfPath));
00665             
00666             MSG("Io",Msg::kDebug) << "File Only " << fileonly << " Rest Of Path " 
00667                                   << restOfPath << endl;
00668             
00669             // Release file
00670             
00671             fsamConsumer.releaseFile();
00672           }
00673         }
00674         catch(const sam::SamConsumer::EndOfFileStreamReached& ex) {
00675           MSG("Io",Msg::kDebug) << "End of File Stream reached" << endl;
00676         }
00677 
00678         // Got all files. Now need to add them to file list. Iterate over map
00679       
00680         std::string sfile;
00681         const char *samfile = 0;
00682         for (fitr = filelist.begin(); fitr != filelist.end(); fitr++) {
00683           sfile = (fitr->second+fitr->first);
00684           samfile = sfile.data();
00685           MSG("Io",Msg::kInfo) << "Adding File " << samfile << endl;
00686           // Add file to file list
00687           IoFileListItem iofile(samfile,at,streamlist);
00688           fFileList.push_back(iofile);
00689         }
00690       }
00691       catch(const sam::SamConsumer::InitializationError& ex) {
00692         MSG("Io",Msg::kInfo) << "Rejected start SAM Consumer request " 
00693                              << ex << endl;
00694       }
00695 
00696 
00697       if (fsamProject) {
00698         try {
00699           MSG("Io",Msg::kInfo) << "Requesting end of SAM project " << endl;
00700           fsamProject->endProject();
00701         }
00702         catch(const sam::SamProject::EndProjectRequestRejected& ex) {
00703           MSG("Io",Msg::kInfo) << "SAM Project end request rejected "<< ex << endl;
00704           
00705         }
00706         catch(const sam::SamProject::EndProjectRequestFailed& ex) {
00707           MSG("Io",Msg::kInfo) << "SAM Project end request failed "<< ex << endl;
00708         }
00709       }
00710     }
00711     else if (sam_access_type == "SAM_FILE") {
00712       
00713       // User just wants a single file
00714       std::string samfile = temp.substr(pos,temp.length()-pos);
00715       
00716       // Use sam_locate to get pnfs path
00717       
00718       sam::LocationList samFiles;
00719       try {
00720         MSG("Io",Msg::kInfo) << "Locating file  " << samfile << endl;
00721         samFiles = sam::locate(samfile);
00722         // Now need to translate pnfs path into dcache path
00723         // insert fnal.gov/usr/ after /pnfs/
00724         samFiles[0].insert(6,"fnal.gov/usr/");
00725         // There are two dcache ports so chose one at random
00726         TRandom3 rand(0);
00727         Double_t r = rand.Rndm();
00728         if (r <= 0.5) {
00729           samFiles[0].insert(0,"dcap://fndca1.fnal.gov:24125");
00730         }
00731         else if (r > 0.5) {
00732           samFiles[0].insert(0,"dcap://fndca1.fnal.gov:24136");
00733         }
00734         // Finally append filename
00735         samFiles[0].append("/");
00736         samFiles[0].append(samfile);
00737         MSG("Io",Msg::kInfo) <<  "Adding file " << samFiles[0].c_str() << " to input list" << endl;
00738         // Add file to file list
00739         IoFileListItem iofile(samFiles[0].c_str(),at,streamlist);
00740         fFileList.push_back(iofile);
00741       }
00742       catch(const sam::exception::DataFileNotFound& ex) {
00743         MSG("Io",Msg::kInfo) << ex << endl;
00744       }
00745     }
00746 
00747 #endif     // End of ifdef SITE_HAS_SAM
00748 
00749   }
00750   else {
00751 
00752     // Add file to file list
00753     IoFileListItem iofile(filepath,at,streamlist);
00754     fFileList.push_back(iofile);
00755     
00756     if ( !fDataStreamItr ) return;
00757     
00758     // Add files to stream managed lists
00759     const IoFileListItem::FileStreamMap& filemap = iofile.GetFileStreamMap();
00760     
00761     if ( at < 0 ) {
00762       IoFileListItem::FileStreamMapConstItr itr = filemap.begin();
00763       for ( ; itr != filemap.end(); itr++ ) {
00764         std::string filename = itr -> first;
00765         std::string streamlist = itr -> second;
00766         fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
00767       }
00768     }
00769     else {
00770       // Apply files in reverse to have first file in list inserted at pos At
00771       IoFileListItem::FileStreamMap::const_reverse_iterator itr=filemap.rbegin();
00772       for ( ; itr != filemap.rend(); itr++ ) {
00773         std::string filename = itr -> first;
00774         std::string streamlist = itr -> second;
00775         fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
00776       }      
00777     }
00778   }
00779 }
00780 
00781 //......................................................................
00782 
00783 void IoInputModule::RemoveFile(const char* filename, const char* streamlist) {
00784 //======================================================================
00785 // Remove the file "filename" from the list of input data files. 
00786 //======================================================================
00787 
00788   if ( fDataStreamItr ) fDataStreamItr -> RemoveFile(filename,streamlist);
00789 
00790   std::string f(filename);
00791   std::list<IoFileListItem>::iterator  itr = fFileList.end();
00792   while ( !fFileList.empty() && itr != fFileList.begin() ) {
00793     itr--;
00794     IoFileListItem& iofile = *itr;
00795     iofile.RemoveFile(filename,streamlist);
00796     if ( iofile.GetNumFile() == 0 ) fFileList.erase(itr);
00797   }
00798 
00799   return;
00800 
00801 }
00802 
00803 //......................................................................
00804 
00805 JobCResult IoInputModule::NextFile(int n, const char* streamlist)
00806 {
00807 //======================================================================
00808 // Move to the next file in the list (move by n positions)
00809 //======================================================================
00810   CallDepth d;
00811 
00812   if (d.fsDepth==1) fStatus = gsAllClear;
00813 
00814   if (fDataStreamItr==0) {
00815     if (this->OpenStreamItr()==0) {
00816       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00817       return fStatus;
00818     }
00819     return this->NextFile(n,streamlist);
00820   }    
00821 
00822   fStatus |= fDataStreamItr -> NextFile(n,streamlist);
00823 
00824   MSG("Io",Msg::kDebug)
00825       << "status is " << fStatus
00826       << " current file is " << fDataStreamItr->GetCurrentFile() << endl;
00827 
00828   return fStatus;
00829 
00830 }
00831 
00832 //......................................................................
00833 
00834 JobCResult IoInputModule::PrevFile(int n, const char* streamlist) 
00835 {
00836 //======================================================================
00837 // Move to the previous list in the file (move back by n files)
00838 //======================================================================
00839   CallDepth d;
00840   
00841   if (d.fsDepth==1) fStatus = gsAllClear;
00842 
00843   if (fDataStreamItr==0) {
00844     if (this->OpenStreamItr()==0) {
00845       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00846       return fStatus;
00847     }
00848     return this->PrevFile(n,streamlist);
00849   }    
00850 
00851   fStatus |= fDataStreamItr -> PrevFile(n,streamlist);
00852 
00853   return fStatus;
00854 
00855 }
00856 
00857 //......................................................................
00858 
00859 JobCResult IoInputModule::GoToFile(int n, const char* streamlist) 
00860 {
00861 //======================================================================
00862 // Move the stream to the nth file in the list (n=0 is first)
00863 //======================================================================
00864   CallDepth d;
00865   
00866   if (d.fsDepth==1) fStatus = gsAllClear;
00867 
00868   if (fDataStreamItr==0) {
00869     if (this->OpenStreamItr()==0) {
00870       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00871       return fStatus;
00872     }
00873     return this->GoToFile(n,streamlist);
00874   }    
00875 
00876   fStatus |= fDataStreamItr -> GoToFile(n,streamlist);
00877 
00878   return fStatus;
00879 
00880 }
00881 
00882 //......................................................................
00883 
00884 JobCResult IoInputModule::GoToFile(const char* filename, const char*streamlist){
00885 //======================================================================
00886 // Move the stream to a named file
00887 //======================================================================
00888   CallDepth d;
00889   
00890   if (d.fsDepth==1) fStatus = gsAllClear;
00891 
00892   if (fDataStreamItr==0) {
00893     if (this->OpenStreamItr()==0) {
00894       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00895       return fStatus;
00896     }
00897     return this->GoToFile(filename,streamlist);
00898   }    
00899 
00900   fStatus |= fDataStreamItr -> GoToFile(filename,streamlist);
00901 
00902   return fStatus;
00903 }
00904 
00905 //......................................................................  
00906 
00907 void IoInputModule::Select(const char* stream, const char* select, 
00908                            bool isRequired)
00909 {
00910 //======================================================================
00911 // Add/Change the selection cuts for a stream
00912 //======================================================================
00913   // Insert the selection into the map
00914   fStreamSelectionMap[stream] = select;
00915   fStreamRequiredMap[stream] = isRequired;
00916   
00917   // Pass the info on to the data stream
00918   if (fDataStreamItr) {
00919     fDataStreamItr->Select(stream, select,isRequired);
00920   }
00921 
00922 }
00923 
00924 //......................................................................  
00925 
00926 void IoInputModule::DefineStream(const char* stream, const char* tree) {
00927 //======================================================================
00928 // Define stream to serve specified tree
00929 //======================================================================
00930   // Insert the definition into the map
00931   fStreamDefMap[stream] = tree;
00932 
00933   // Pass the info on to the data stream
00934   if ( fDataStreamItr ) {
00935     fDataStreamItr->DefineStream(stream, tree);
00936   }
00937 
00938 }
00939 
00940 //......................................................................  
00941 
00942 void IoInputModule::SetSequenceMode(const char* stream,
00943                                     Per::ESequenceMode sequenceMode) {
00944 //======================================================================
00945 // Define stream sequence mode
00946 //======================================================================
00947   // Insert the sequence mode into the map
00948   fStreamSeqModeMap[stream] = sequenceMode;
00949 
00950   // Pass the info on to the data stream
00951   if ( fDataStreamItr ) {
00952     fDataStreamItr->SetSequenceMode(stream, sequenceMode);
00953   }
00954 
00955 }
00956 
00957 //......................................................................  
00958 
00959 void IoInputModule::SetTestMode(const char* stream,
00960                                 bool testMode) {
00961 //======================================================================
00962 // Define stream test mode
00963 //======================================================================
00964   // Insert the test mode into the map
00965   fStreamTestModeMap[stream] = testMode;
00966 
00967   // Pass the info on to the data stream
00968   if ( fDataStreamItr ) {
00969     fDataStreamItr->SetTestMode(stream, testMode);
00970   }
00971 
00972 }
00973 
00974 //......................................................................
00975 
00976 void IoInputModule::SetWindow(const char* stream, double lower, double upper)
00977 {
00978 //======================================================================
00979 // Define stream window if kWindow sequence mode is used
00980 //======================================================================
00981   fStreamWindowMap[stream] = std::pair<double,double>(lower,upper);
00982 
00983   // Pass the info on to the data stream
00984   if ( fDataStreamItr ) {
00985       fDataStreamItr->SetWindow(stream, lower, upper);
00986   }
00987 
00988 }
00989 
00990 //......................................................................
00991 
00992 void IoInputModule::SetMaxFileRepeat(const char* stream, int numRepeat)
00993 {
00994 //======================================================================
00995 // Define maximum number of times to reuse a file in the stream before
00996 // loading the next one; for kSequential and kRandom sequence modes
00997 //======================================================================
00998   fStreamMaxRepeatMap[stream] = numRepeat;
00999 
01000   // Pass it on to the data stream
01001   if ( fDataStreamItr ) fDataStreamItr->SetMaxFileRepeat(stream,numRepeat);
01002 }
01003 
01004 //......................................................................
01005 
01006 void IoInputModule::SetMeanMom(const char* stream, double mean)
01007 {
01008 //======================================================================
01009 // Define mean number of events to push to mom for this stream
01010 // for kSequential and kRandom sequence modes
01011 //======================================================================
01012   fStreamMeanMap[stream] = mean;
01013 
01014   // Pass it on to the data stream
01015   if ( fDataStreamItr ) fDataStreamItr->SetMeanMom(stream,mean);
01016 }
01017 
01018 //......................................................................
01019 
01020 void IoInputModule::SetPushRandom(const char* stream, bool setRandom)
01021 {
01022 //======================================================================
01023 // Define whether to push a random or constant number of events to mom
01024 // for this stream for kSequential and kRandom sequence modes
01025 //======================================================================
01026   fStreamPushRandomMap[stream] = setRandom;
01027 
01028   // Pass it on to the data stream
01029   if ( fDataStreamItr ) fDataStreamItr->SetPushRandom(stream,setRandom);
01030 }
01031 
01032 //......................................................................
01033 
01034 void IoInputModule::SetRandomSeed(int rSeed)
01035 {
01036 //======================================================================
01037 // Set the random seed for SetPushRandom(stream,true) case
01038 // for kSequential and kRandom sequence modes
01039 //======================================================================
01040   fRandomSeed = rSeed;
01041 
01042   // Pass it on to the data stream
01043   if ( fDataStreamItr ) fDataStreamItr->SetRandomSeed(rSeed);
01044 
01045 }
01046 
01047 //......................................................................
01048 
01049 const char* IoInputModule::GetCurrentFile(const char* streamname) const
01050 {
01051     MSG("Io",Msg::kDebug) << "IoInputModule::GetCurrentFile()" << endl;
01052     mapStrStrItr_t it, done=fCurrentFileMap.end();
01053     std::string strmstring = streamname;
01054 
01055     for (it = fCurrentFileMap.begin(); it !=done; ++it) {
01056         MSG("Io",Msg::kVerbose)
01057           << "stream: " << setw(16) << it->first
01058           << " file: " << it->second
01059           <<endl;
01060     }
01061 
01062     for (it = fCurrentFileMap.begin(); it!=done; ++it) {
01063       if ( strmstring == it->first ) return it->second.c_str();
01064     }
01065     // Sue's original approach
01066     if (!fDataStreamItr) return 0;
01067     return fDataStreamItr->GetCurrentFile(streamname);
01068 }
01069 
01070 const char* IoInputModule::GetLastFile(const char* streamname) const
01071 {
01072     MSG("Io",Msg::kInfo) << "IoInputModule::GetLastFile()" << endl;
01073     mapStrStrItr_t it, done = fLastFileMap.end();
01074     std::string strmstring = streamname;
01075 
01076     for (it = fLastFileMap.begin(); it!=done; ++it) {
01077         MSG("Io",Msg::kVerbose)
01078           << "stream: " << setw(16) << it->first
01079           << " file: " << it->second
01080           <<endl;
01081     }
01082 
01083     for (it = fLastFileMap.begin(); it!=done; ++it) {
01084       if ( strmstring == it->first ) return it->second.c_str();
01085     }
01086     return 0;
01087 
01088 }
01089 
01090 //......................................................................
01091 
01092 void IoInputModule::LoadFilesFromCommandLine()
01093 {
01094 //======================================================================
01095 // Load the files listed on the program command line
01096 //======================================================================
01097   JobCEnv& jce = JobCEnv::Instance();
01098   if (!fLoadedCommandLineFiles) {
01099     for (int i=0; i<jce.GetNfile(); ++i) {
01100       this->AddFile(jce.GetFileName(i));
01101     }
01102     fLoadedCommandLineFiles = true;
01103   }
01104 }
01105 
01106 //......................................................................
01107 
01108 int IoInputModule::ReadHeader()
01109 {
01110 //======================================================================
01111 // Read temptags to get file name
01112 // Read header information to get run/snarl info. 
01113 //   if found Run and Snarl info return 2, 
01114 //   if only Run info return 1,
01115 //   else return 0
01116 //======================================================================
01117   const MomNavigator* mom = this->GetMom();
01118   assert(mom);
01119 
01120   // BeginFile/EndFile boundaries may not be in synch across the different
01121   // data streams.  The definition used here is to set file boundary true if 
01122   // the file has changed for any of the managed streams.
01123 
01124   const TObjArray* momarray = mom->GetFragmentArray();
01125   for (int i=0; i < momarray->GetEntriesFast(); ++i) {
01126     TObject* obj = momarray->At(i);
01127     if (!obj) continue;
01128     Registry* temptags = 0;
01129     if ( RecMinos* record = dynamic_cast<RecMinos*>(obj) ) {
01130       temptags = &(record->GetTempTags());
01131     }
01132     else if ( RecRecord* record = dynamic_cast<RecRecord*>(obj) ) {
01133       temptags = &(record->GetTempTags());
01134     }
01135     if ( ! temptags ) continue;
01136 
01137     // on a named stream?
01138     const char* tagstream = 0;
01139     if ( ! temptags->Get("stream",tagstream) ) continue;
01140 
01141     // stream managed by i/o?
01142     std::string streamname(tagstream);
01143     const char* tagnewfile = 0;
01144     if ( ! temptags->Get("file",tagnewfile) ) continue; 
01145 
01146     std::string lstfilename = fLastFileMap[streamname];
01147     std::string curfilename = fCurrentFileMap[streamname];
01148     std::string newfilename(tagnewfile);
01149 
01150     if ( newfilename != curfilename ) {
01151             
01152       std::string starcur  = fCurrentFileMap.begin()->first;
01153       std::string starlast = fLastFileMap.begin()->first;
01154       
01155       MSG("Io",Msg::kDebug) 
01156         << "SetBeginFile on streamname '" << streamname << "'" << endl
01157         << "   current      '" << fCurrentFileMap[streamname] << "'" << endl
01158         << "   last         '" << fLastFileMap[streamname] << "'" << endl
01159         << "   current['" << starcur << "'] '" 
01160         << fCurrentFileMap[starcur] << "'" << endl
01161         << "      last['" << starlast << "'] '" 
01162         << fLastFileMap[starlast] << "'" << endl
01163         << "   new '" << newfilename << "' != "
01164         << " cur '" << curfilename << "'" << endl
01165         << "   update \"*\" ? " 
01166         << (( newfilename != fCurrentFileMap["*"] ) ? "yes":"no")
01167         << endl;
01168       
01169       // update "*" stream first
01170       if ( newfilename != fCurrentFileMap["*"] ) {
01171         fStatus.SetBeginFile();
01172         //if ( lstfilename != "" ) fStatus.SetEndFile();
01173         if ( fCurrentFileMap["*"] != "" ) fStatus.SetEndFile();
01174         
01175         fLastFileMap["*"]           = fCurrentFileMap["*"];
01176         fCurrentFileMap["*"]        = newfilename;
01177 
01178         MSG("Io",Msg::kDebug) 
01179           << "SetBeginFile on '*'" << endl
01180           << "   current['" << starcur << "'] '" 
01181           << fCurrentFileMap[starcur] << "'" << endl
01182           << "      last['" << starlast << "'] '" 
01183           << fLastFileMap[starlast] << "'" << endl;
01184       }
01185       // update this named stream
01186       fLastFileMap[streamname]    = curfilename;
01187       fCurrentFileMap[streamname] = newfilename;
01188       
01189       // if a stream on a file moved on then presumably all the
01190       // other streams on the same file have also been exhausted
01191       // and are going to move on -- help them along so we don't
01192       // have to wait for that stream to be the next record on
01193       // that stream is the next VldContext
01194       mapStrStrItr_t it, done = fCurrentFileMap.end();
01195       for (it = fCurrentFileMap.begin(); it != done; ++it) {
01196         if ( it->second == curfilename ) {
01197           std::string altstream = it->first;
01198           fLastFileMap[altstream]    = curfilename;
01199           fCurrentFileMap[altstream] = newfilename;
01200         }
01201       }
01202       
01203     } // new != cur filename
01204 
01205     MSG("Io",Msg::kVerbose) 
01206       << " stream '" << streamname << "' set fLastFileMap to '"
01207       << curfilename << "', fCurrentFileMap to '"
01208       << newfilename << "'" 
01209       << " * '" << fLastFileMap["*"] << "'  '" << fCurrentFileMap["*"] << "'"
01210       << endl;
01211         
01212   } // loop over records
01213 
01214   // update for EOF/EOJ condition (which doesn't come throught this
01215   // function) if it isn't a file change
01216   // this won't work is the file has just one record
01217   // ...the whole procedure is fundamentally flawed -- it shouldn't be
01218   // based on what we find in "mom" but rather the stream/file 
01219   // management classes...
01220   if ( ! fStatus.BeginFile() ) fLastFileMap["*"] = fCurrentFileMap["*"];
01221 
01222   // BeginRun/EndRun
01223   int run   = -1;  // default and flag value
01224   int snarl = -1;  // default and flag value
01225   for (int i=0; i < momarray->GetEntriesFast(); ++i) {
01226     TObject* obj = momarray->At(i);
01227     if (!obj) continue;
01228     if ( RecMinos* record = dynamic_cast<RecMinos*>(obj) ) {
01229       // old style
01230       
01231         // all DAQ generated records can supply run #
01232       const RawDaqHeader* rdh 
01233              = dynamic_cast<const RawDaqHeader*>(record->GetHeader());
01234       if (rdh) run = rdh->GetRun();
01235 
01236       // but only DaqSnarl records can supply snarl #
01237       const RawDaqSnarlHeader* rdsh 
01238              = dynamic_cast<const RawDaqSnarlHeader*>(record->GetHeader());
01239       if (rdsh) snarl = rdsh->GetSnarl();
01240 
01241       if (!rdh) {
01242         // not a DAQ record, perhaps it's a CandRecord
01243         const CandHeader* candhdr
01244           = dynamic_cast<const CandHeader*>(record->GetHeader());
01245         if (candhdr) {
01246           run   = candhdr->GetRun();
01247           snarl = candhdr->GetSnarl();
01248         }
01249       }
01250     }
01251     else if ( RecRecord* record = dynamic_cast<RecRecord*>(obj) ) {
01252       // New style
01253       const RecPhysicsHeader* rph 
01254           = dynamic_cast<const RecPhysicsHeader*>(&(record->GetHeader()));
01255       if ( rph ) {
01256         run   = rph->GetRun();
01257         snarl = rph->GetSnarl();
01258       }
01259     }
01260     // break early only if determined snarl in case it one of those
01261     // crazy record sets with a DaqMonitor and a DaqSnarl record.
01262     if ( snarl >= 0 ) break;
01263   }
01264 
01265   // set the status flags based on what was extracted
01266   fCurrentSnarl = snarl;
01267   if ( run < 0 ) {
01268     fCurrentRun = -1;
01269     return 0;
01270   }
01271   if ( run != fCurrentRun ) {
01272     fStatus.SetBeginRun();
01273     if ( fLastRun >= 0 ) fStatus.SetEndRun();
01274   }
01275   fLastRun      = fCurrentRun;
01276   fCurrentRun   = run;
01277   if ( snarl >= 0 ) {
01278     fLastSnarl    = fCurrentSnarl;
01279     fCurrentSnarl = snarl;
01280     return 2;
01281   }
01282   return 1;
01283 }
01284 
01285 //......................................................................
01286 
01287 void IoInputModule::UpdateDDSConfig() {
01288 //======================================================================
01289 // Update the dispatcher configuration
01290 //======================================================================
01291   if ( fDataStreamItr == 0 ) return;
01292 
01293   IoDDSStreamItr* ddsItr = dynamic_cast<IoDDSStreamItr*>(fDataStreamItr);
01294   if ( ! ddsItr ) return;
01295 
01296   ddsItr->SetTimeOut(fTimeOut);  
01297 
01298   // Need to reinitialize dispatcher if server hostname, port, clienttype
01299   // or clientname have changed
01300   bool reinit = (fServer != ddsItr->GetSourceName() 
01301                 || fPort != ddsItr->GetPort()
01302                 || fClientType != ddsItr->GetClientType() 
01303                 || fClientName != ddsItr->GetClientName() );
01304   
01305   if ( reinit ) {
01306     this -> CloseStreamItr();  // wait for next action to reopen
01307   }
01308   else {
01309     ddsItr -> SetKeepUpMode(fKeepUpMode);
01310     ddsItr -> SetMaxSyncDelay(fMaxSyncDelay);
01311     ddsItr -> SetDataSource(fDataSource);
01312     ddsItr -> SetOffLine(fOffLine);
01313   }
01314 
01315 }
01316 
01317 
01318 //......................................................................
01319 
01320 void IoInputModule::UpdateFormatConfig() {
01321 //======================================================================
01322 // Update the stream itr to match requested format
01323 //======================================================================
01324 
01325   if ( fDataStreamItr == 0 ) return;
01326 
01327   bool reopen = ( fFormat != fDataStreamItr->GetFormat() );
01328   if ( reopen ) {
01329     this -> CloseStreamItr();  // wait for next action to reopen
01330   }
01331 
01332   return;
01333 
01334 }
01335 
01336 //......................................................................
01337 
01338 void IoInputModule::UpdateFileList() 
01339 {
01340 //======================================================================
01341 // Update the file list 
01342 //======================================================================
01343 
01344   if ( fDataStreamItr == 0 ) return;
01345 
01346   // Fresh start. This should typically only be called when data stream 
01347   // iterator is newly opened (i.e. when the format changes)
01348   fDataStreamItr -> RemoveFile("*");
01349 
01350   std::list<IoFileListItem>::iterator itr = fFileList.begin();
01351   for ( ; itr != fFileList.end(); itr++ ) {
01352     IoFileListItem& iofile = *itr;
01353     const IoFileListItem::FileStreamMap& filemap = iofile.GetFileStreamMap();
01354     int at = iofile.GetAt();
01355   
01356     if ( at < 0 ) {
01357       IoFileListItem::FileStreamMapConstItr itr = filemap.begin();
01358       for ( ; itr != filemap.end(); itr++ ) {
01359         std::string filename = itr -> first;
01360         std::string streamlist = itr -> second;
01361         fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
01362       }
01363     }
01364     else {
01365       // Apply files in reverse to have first file in list inserted at pos At
01366       IoFileListItem::FileStreamMap::const_reverse_iterator itr 
01367                                                          = filemap.rbegin();
01368       for ( ; itr != filemap.rend(); itr++ ) {
01369         std::string filename = itr -> first;
01370         std::string streamlist = itr -> second;
01371         fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
01372       }      
01373     }
01374   }
01375 
01376   return;
01377   
01378 }
01379 
01380 //......................................................................
01381 
01382 void IoInputModule::UpdateStreamConfig()
01383 {
01384 //======================================================================
01385 // Set the stream and selection cuts for the open streams
01386 //======================================================================
01387   if (fDataStreamItr==0) return;
01388 
01389   // Define streams
01390   mapStrStrItr_t itr;
01391   for ( itr = fStreamDefMap.begin(); itr != fStreamDefMap.end(); itr++ ) {
01392     fDataStreamItr->DefineStream((itr->first).c_str(),  // Stream
01393                                  (itr->second).c_str());  // Definition
01394   }
01395 
01396   // Set the streams to be activated
01397   fDataStreamItr->Streams(fStreamList.c_str());
01398   // Set the selection cuts for each stream
01399   for (itr=fStreamSelectionMap.begin();itr!=fStreamSelectionMap.end();++itr) {
01400     std::map<std::string,bool>::const_iterator reqitr 
01401                            = fStreamRequiredMap.find(itr->first);
01402     bool isrequired = false;
01403     if ( reqitr != fStreamRequiredMap.end() ) isrequired = reqitr -> second;
01404     fDataStreamItr->Select((itr->first).c_str(),   // Stream
01405                            (itr->second).c_str(), // Selection
01406                            isrequired);  // IsRequired
01407   }
01408 
01409   // Set the test mode for each stream
01410   std::map<std::string,bool>::const_iterator testitr;
01411   for ( testitr=fStreamTestModeMap.begin(); testitr !=fStreamTestModeMap.end();
01412                                           ++testitr ) {
01413     fDataStreamItr->SetTestMode((testitr->first).c_str(),   // Stream
01414                                 testitr->second);  // TestMode
01415   }
01416 
01417   // Set the sequence mode for each stream
01418   bool setRandom = false;
01419   std::map<std::string,Per::ESequenceMode>::const_iterator seqitr;
01420   for ( seqitr=fStreamSeqModeMap.begin(); seqitr!=fStreamSeqModeMap.end();
01421                                         ++seqitr ) {
01422     fDataStreamItr->SetSequenceMode((seqitr->first).c_str(),   // Stream
01423                                      seqitr->second); // Sequence Mode
01424     pair<double,double> window = fStreamWindowMap[seqitr->first];
01425     fDataStreamItr->SetWindow((seqitr->first).c_str(),   // Stream
01426                               window.first,window.second);
01427     if (seqitr->second == Per::kSequential ||
01428         seqitr->second == Per::kRandom      ) {
01429       if (!setRandom) {
01430         setRandom = true;
01431         fDataStreamItr->SetRandomSeed(fRandomSeed);
01432       }
01433       int    repeat = fStreamMaxRepeatMap[seqitr->first];
01434       fDataStreamItr->SetMaxFileRepeat( (seqitr->first).c_str(), repeat );
01435       double mean = fStreamMeanMap[seqitr->first];
01436       fDataStreamItr->SetMeanMom( (seqitr->first).c_str(), mean );
01437       bool   pushRand = fStreamPushRandomMap[seqitr->first];
01438       fDataStreamItr->SetPushRandom( (seqitr->first).c_str(), pushRand );
01439     } // end if kSeq or kRand
01440   }
01441 }
01442 
01443 //......................................................................
01444 
01445 int IoInputModule::OpenStreamItr()
01446 {
01447 //======================================================================
01448 // Open a new stream iterator
01449 //======================================================================
01450   if (fDataStreamItr) this->CloseStreamItr();
01451 
01452   bool isDDS = (UtilString::ToUpper(fFormat) == "DDS");
01453   std::string src;
01454   if ( isDDS ) src = fServer;
01455   
01456   fDataStreamItr = IoDataStreamFactory::CreateDataStreamItr(src.c_str(),
01457                      fFormat.c_str(),fPort,fMaxRetry,fRetryDelay,
01458                      fClientType,fClientName);
01459 
01460 
01461   if (fDataStreamItr == 0) {
01462     MSG("Io",Msg::kWarning) << "Failed to open stream '" << src << "'" <<
01463       " using format '" << fFormat << "'" << endl; 
01464     fStatus.SetEndRun();
01465     fStatus.SetEndFile();
01466     fStatus.SetEndOfInputStream();
01467     return 0;
01468   }
01469 
01470   fStatus.SetBeginOfInputStream();
01471   fStatus.SetBeginFile();
01472   fStatus.SetBeginRun();
01473 
01474   // Configure the file and module
01475   this->UpdateStreamConfig(); // this should be called before filelist
01476   this->UpdateFileList();
01477   this->UpdateDDSConfig();
01478   fFormat = fDataStreamItr->GetFormat();
01479   
01480   MSG("Io",Msg::kDebug) 
01481     << "Opened stream itr of format " << fDataStreamItr->GetFormat() << endl;
01482 
01483   return 1;
01484 }
01485 
01486 //......................................................................
01487 
01488 void IoInputModule::CloseStreamItr() 
01489 {
01490 //======================================================================
01491 // Close the currently openned stream
01492 //======================================================================
01493   if (fDataStreamItr) {
01494     MSG("Io",Msg::kDebug) 
01495       << "Close stream itr of format " << fDataStreamItr->GetFormat() << endl;
01496     delete fDataStreamItr;
01497     fDataStreamItr = 0;
01498     fStatus.SetEndRun();
01499     fStatus.SetEndFile();
01500     fStatus.SetEndOfInputStream();
01501   }
01502 }
01503 
01505 
01506 
01507 
01508 
01509 
01510 
01511 
01512 
01513 
01514 
01515 
01516 
01517 
01518 

Generated on Mon Jun 16 14:57:23 2008 for loon by  doxygen 1.3.9.1