Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

IoInputModule.cxx

Go to the documentation of this file.
00001 
00002 // $Id: IoInputModule.cxx,v 1.83 2007/09/21 19:49:05 buckley Exp $
00003 //
00004 // Job control interface to input data streams
00005 //
00006 // messier@huhepl.harvard.edu
00008 #include "TSystem.h"
00009 #include "TRegexp.h"
00010 
00011 #include "Dispatcher/DDS.h"
00012 #include "IoModules/IoInputModule.h"
00013 #include <cassert>
00014 #include "MessageService/MsgService.h"
00015 #include "MinosObjectMap/MomNavigator.h"
00016 #include "IoModules/IoDataStreamItr.h"
00017 #include "IoModules/IoDDSStreamItr.h"
00018 #include "IoModules/IoDataStreamFactory.h"
00019 #include "JobControl/JobCInputModule.h"
00020 #include "JobControl/JobCModuleRegistry.h"
00021 #include "JobControl/JobCEnv.h"
00022 #include "RawData/RawRecord.h"
00023 #include "RawData/RawDaqSnarlHeader.h"
00024 #include "CandData/CandHeader.h"
00025 #include "Record/RecRecord.h"
00026 #include "Record/RecPhysicsHeader.h"
00027 #include "Registry/Registry.h"
00028 #include "Validity/VldContext.h"
00029 #include "Validity/VldTimeStamp.h"
00030 #include "Util/UtilString.h"
00031 #include "TSystem.h"
00032 
00033 #include <algorithm>
00034 #include <cstring>
00035 #include <string>
00036 #include <map>
00037 
00038 #ifdef SITE_HAS_SAM
00039 #include "sam_cpp_api/SamConsumer.hpp"
00040 #endif
00041 
00042 CVSID("$Id: IoInputModule.cxx,v 1.83 2007/09/21 19:49:05 buckley Exp $");
00043 JOBMODULE(IoInputModule,"INPUT","Read and configure input streams");
00044 
00045 typedef std::map<std::string,std::string>::const_iterator mapStrStrItr_t;
00046 
00047 // A result that says "AOK"
00048 static const JobCResult gsAllClear = JobCResult::kAOK;
00049 
00050 // Many of the input methods are recursive. Ie. try to load an event,
00051 // if that fails, try the next file and try again, etc. To prevent
00052 // initializations from being done twice we need a way to keep track
00053 // of the recursion level. This little class helps with that.
// Scope guard that counts how many guarded calls are currently active.
// Create one as a local at the top of a method: construction bumps the
// shared counter, destruction (on any exit path) drops it again, so
// fsDepth==1 means "this is the outermost guarded call".
class CallDepth {
public:
  // Entering a guarded call: one level deeper
  CallDepth()
  {
    fsDepth += 1;
  }

  // Leaving a guarded call: back up one level
  ~CallDepth()
  {
    fsDepth -= 1;
  }

  // Number of CallDepth objects alive right now (shared by all users)
  static int fsDepth;
};

// No guarded call is active at program start
int CallDepth::fsDepth = 0;
00061 
00062 //......................................................................
00063 
00064 IoInputModule::IoInputModule() : 
00065   fDataStreamItr(0),
00066   fFormat(""),
00067   fStreamList(""),
00068   fServer(""),
00069   fPort(0),
00070   fTimeOut(0),
00071   fDataSource(0),
00072   fKeepUpMode(0),
00073   fMaxSyncDelay(0),
00074   fOffLine(false),
00075   fMaxRetry(0),
00076   fRetryDelay(1),
00077   fClientType(DDS::kUnknownClientType),
00078   fClientName(""),
00079   fStatus(JobCResult::kAOK),
00080   fLastRun(-1),
00081   fLastSnarl(-1),
00082   fCurrentRun(-1),
00083   fCurrentSnarl(-1),
00084   fLoadedCommandLineFiles(false)
00085 #ifdef SITE_HAS_SAM
00086   ,fsamProject(0)
00087 #endif
00088 { fStopwatch.Reset(); fStopwatch.Stop(); }
00089 
00090 //......................................................................
00091 
00092 IoInputModule::~IoInputModule() 
00093 { 
00094   if ( fDataStreamItr ) { delete fDataStreamItr; fDataStreamItr = 0; }
00095 }
00096 
00097 //......................................................................
00098 
00099 void IoInputModule::BeginJob() 
00100 {
00101   this->LoadFilesFromCommandLine();
00102   // Delay opening files until first action (Next,Prev,etc.) call
00103 }
00104 
00105 
00106 //......................................................................
00107 
00108 void IoInputModule::EndJob()
00109 {
00110   fStopwatch.Stop();
00111   MSG("Io",Msg::kDebug) << "IoInputModule::EndJob, Time(sec), Real " 
00112         << fStopwatch.RealTime() << ", CPU "
00113         << fStopwatch.CpuTime() << endl;
00114   
00115 }
00116 
00117 //......................................................................
00118 
00119 const Registry& IoInputModule::DefaultConfig() const 
00120 {
00121 //======================================================================
00122 // Get the default configuration for this module
00123 //======================================================================
00124   static Registry r;
00125   r.SetName("INPUT.config");
00126 
00127   r.UnLockValues();
00128 
00129   MSG("Io",Msg::kDebug) << "Loading default config\n";
00130 
00131   // Stream config
00132   r.Set("Format" ,"input");
00133   r.Set("Streams","DaqMonitor,DaqSnarl,LightInjection");
00134   // r.Set("Streams","DaqMonitor,DaqSnarl,LightInjection,DcsMonitor,DcsAlarm");
00135   
00136   // DDS config
00137   r.Set("DDSServer", "daqdds.minos-soudan.org");
00138   r.Set("DDSPort",    DDS::kPort);
00139   r.Set("DDSTimeOut",  120);
00140   r.Set("DDSDataSource","Daq");
00141   r.Set("DDSKeepUpMode", "FileKeepUp");
00142   r.Set("DDSMaxSyncDelay",15);
00143   r.Set("DDSOffLine",false);
00144   r.Set("DDSMaxRetry",0);
00145   r.Set("DDSRetryDelay",1);
00146   r.Set("DDSClientType","Unknown");
00147   r.Set("DDSClientName","");
00148 
00149 #ifdef SITE_HAS_SAM
00150 
00151   // SAM config
00152   r.Set("Station","minos");
00153   r.Set("SnapShotVers",0);
00154   r.Set("WorkGroupName","minos");
00155   r.Set("ApplicationName","loon");
00156   r.Set("ApplicationVers","dev");
00157   r.Set("MaxNumberOfFiles",0);
00158   r.Set("StartNewProject",1);
00159 
00160   // Create default project name
00161   // Get $USER
00162   const char* username = gSystem->Getenv("USER");
00163   if (!username) username = "unknown";
00164   r.Set("ProjectName",username);
00165 
00166 #endif
00167 
00168   r.LockValues();
00169   return r;
00170 }
00171 
00172 //......................................................................
00173 
00174 void IoInputModule::Config(const Registry& r) 
00175 {
00176 //======================================================================
00177 // Configure the module based on the contents of the registry r
00178 //======================================================================
00179   const char* tmps;
00180   int         tmpi;
00181   int         tmpb;  // bools
00182 
00183   MSG("Io",Msg::kDebug) << "Config IoInputModule with r=" << r << "\n";
00184 
00185   // Input data stream configuration
00186   bool doFormatConfig = false;
00187   if (r.Get("Format", tmps)) { fFormat     = tmps; doFormatConfig = true; }
00188   if (doFormatConfig) this->UpdateFormatConfig();
00189   bool doStreamConfig = false;
00190   if (r.Get("Streams",tmps)) { fStreamList = tmps; doStreamConfig = true; }
00191   if (doStreamConfig) this->UpdateStreamConfig();
00192     
00193   // DDS options
00194   bool doDDSConfig = false;
00195   if (r.Get("DDSServer", tmps)) { fServer  = tmps; doDDSConfig = true; }
00196   if (r.Get("DDSPort",   tmpi)) { fPort    = tmpi; doDDSConfig = true; }
00197   if (r.Get("DDSTimeOut",tmpi)) { fTimeOut = tmpi; doDDSConfig = true; }
00198   if (r.Get("DDSClientType",tmps)) { fClientType = DDS::GetClientType(tmps); 
00199                                      doDDSConfig = true; }
00200   if (r.Get("DDSClientName",tmps)) { fClientName = tmps; 
00201                                      doDDSConfig = true; }
00202   if (r.Get("DDSDataSource",tmps)){fDataSource = DDS::GetDataSourceCode(tmps);
00203                                    doDDSConfig = true;}
00204   if (r.Get("DDSKeepUpMode",tmps)) { fKeepUpMode = DDS::GetKeepUpCode(tmps); 
00205                                      doDDSConfig = true; }
00206   if (r.Get("DDSMaxSyncDelay",tmpi)){fMaxSyncDelay = tmpi; doDDSConfig = true;}
00207   if (r.Get("DDSOffLine",tmpb)) {fOffLine = tmpb; doDDSConfig = true;}
00208   if (r.Get("DDSMaxRetry",tmpi)) {fMaxRetry = tmpi; doDDSConfig = true; }
00209   if (r.Get("DDSRetryDelay",tmpi)) {fRetryDelay = tmpi; doDDSConfig = true; }
00210   if (doDDSConfig) this->UpdateDDSConfig();
00211 
00212 
00213 }
00214 
00215 //......................................................................  
00216 
00217 JobCResult IoInputModule::Get()
00218 {
00219 //======================================================================
00220 // Load the data records at the current position in the input stream
00221 //======================================================================
00222 
00223 
00224   if ( fDataStreamItr==0 ) {
00225     if ( this->OpenStreamItr()==0 ) {
00226       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00227       return fStatus;
00228     }
00229     return this->Get();
00230   }
00231 
00232   MSG("Io",Msg::kVerbose) << "IoInputModule::Get " << endl;
00233 
00234   fStopwatch.Start(false);
00235   MomNavigator* mom = this->GetMom();
00236   assert(mom);
00237   mom -> Clear(); // Moving on so clear contents of Mom
00238 
00239   int nrecord = fDataStreamItr->LoadRecords(mom);
00240   bool isDDS = (UtilString::ToUpper(fFormat) == "DDS");
00241   // special treatment required because dds doesn't separate advance from load
00242   if ( isDDS && !nrecord ) fStatus.SetEndOfInputStream();
00243 
00244   this->ReadHeader(); // sets beginrun/endrun beginfile/endfile fStatus bits
00245   if ( fStatus.EndOfInputStream() ) 
00246      { fStatus.SetEndFile(); fStatus.SetEndRun(); }
00247   MSG("Io",Msg::kVerbose) 
00248    << "IoInputModule::Get returning status " << fStatus << endl;
00249   fStopwatch.Stop();
00250   return fStatus;
00251 }
00252 
00253 //......................................................................
00254 
00255 JobCResult IoInputModule::Next(int n)
00256 {
00257 //======================================================================
00258 // Advance the position in the stream n record sets. Load the records
00259 // at the last position
00260 //======================================================================
00261   CallDepth d; // Keep track of the call depth
00262   
00263   // Set the input status to "all clear" since advancing
00264   if (d.fsDepth==1) fStatus = gsAllClear;
00265 
00266   MSG("Io",Msg::kVerbose) << "IoInputModule::Next " << n << endl;
00267 
00268   if ( fDataStreamItr==0 ) {
00269     if (this->OpenStreamItr()==0) {
00270       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00271       return fStatus;
00272     }
00273     return this->Next(n);
00274   }
00275 
00276   fStopwatch.Start(false);
00277   MomNavigator* mom = this->GetMom();
00278   assert(mom);
00279   mom -> Clear(); // Moving on so clear contents of Mom
00280 
00281   // Advance the position in the input stream until we run out of
00282   // records and files
00283   int nstep = 0;
00284   int ndone = 0;
00285   int ntry  = 0;
00286   while ( ndone < n ) {
00287     ntry  = n - ndone;
00288     nstep = fDataStreamItr->Increment(ntry);
00289 
00290     if ( nstep < ntry ) {
00291       // Reached end of file, load next one
00292       fStatus |= this->NextFile(); 
00293       
00294       // If this is the end of the input stream, we're done.
00295       if ( fStatus.EndOfInputStream() ) {
00296         fStopwatch.Stop();
00297         return this->Get();
00298       }
00299     }
00300     ndone += nstep;
00301   }
00302 
00303   // Load the current event
00304   fStopwatch.Stop();
00305   return this->Get();
00306 }
00307 
00308 //......................................................................
00309 
00310 JobCResult IoInputModule::Prev(int n) 
00311 {
00312 //======================================================================
00313 // Back up n positions in the input data stream. Load the records at
00314 // the current position
00315 //======================================================================
00316   CallDepth d;
00317 
00318   // Set the input status to "all clear"
00319   if (d.fsDepth == 1) fStatus = gsAllClear;
00320 
00321   MSG("Io",Msg::kVerbose) << "IoInputModule::Prev " << n << endl;
00322   
00323   if (fDataStreamItr==0) {
00324     if (this->OpenStreamItr()==0) {
00325       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00326       return fStatus;
00327     }
00328     return this->Prev(n);
00329   }
00330 
00331   // Back up the the position in the input stream until we run out of
00332   // records and files
00333   fStopwatch.Start(false);
00334   MomNavigator* mom = this->GetMom();
00335   assert(mom);
00336   mom -> Clear(); // Moving on so clear contents of Mom
00337 
00338   int nstep = 0;
00339   int ndone = 0;
00340   int ntry  = 0;
00341   while (ndone < n) {
00342     ntry = n - ndone;
00343     nstep = fDataStreamItr->Decrement(ntry);
00344 
00345     if (nstep < ntry) {
00346       // Reached start of file, load previous file.
00347       fStatus |= this->PrevFile();
00348 
00349       // If there is no previous file, then we're done.
00350       if ( fStatus.BeginOfInputStream() || fStatus.EndOfInputStream() ) {
00351         return this->Get(); // may be end if failed to open stream itr
00352         fStopwatch.Stop();
00353       }
00354 
00355       // Move the position to the end of the current file so we can
00356       // walk backwards over it
00357       fDataStreamItr->GoToEOF();
00358     }
00359     ndone += nstep;
00360   }
00361 
00362   // Load the current event
00363   fStopwatch.Stop();
00364   return this->Get();
00365 
00366 }
00367 
00368 //......................................................................
00369 
00370 JobCResult IoInputModule::GoTo(int run, int snarl, int searchDir) 
00371 {
00372   CallDepth d;
00373   
00374   if (d.fsDepth==1) fStatus = gsAllClear;
00375 
00376   if (fDataStreamItr==0) {
00377     if (this->OpenStreamItr()==0) {
00378       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00379       return fStatus;
00380     }
00381     return this->GoTo(run,snarl,searchDir);
00382   }
00383 
00384   if ( run == fCurrentRun && snarl == fCurrentSnarl ) return this -> Get();
00385 
00386   int dir = searchDir;
00387   if (dir==0) {
00388     if      (run>fLastRun) { dir =  1; }
00389     else if (run<fLastRun || (run==fLastRun && fCurrentRun < 0)) { dir = -1; }
00390     else {
00391       if    (snarl>fLastSnarl) { dir =  1; }
00392       else                     { dir = -1; }
00393     }
00394   }
00395 
00396   // Move position in the stream looking for run/event number
00397   while ( 1 ) {
00398     if ( dir > 0 ) {
00399       this->Next();
00400       if (fCurrentRun>run) {
00401         MSG("Io",Msg::kWarning) << 
00402           "Went to run "<<fCurrentRun<<
00403           " without finding run="<<run<<" snarl="<<snarl<<"\n";
00404         return fStatus;
00405       }
00406       if (fCurrentRun==run && fCurrentSnarl>snarl) {
00407         MSG("Io",Msg::kWarning) << 
00408           "Went to run="<<fCurrentRun<<" snarl="<<fCurrentSnarl<<
00409           " without finding run="<<run<<" snarl="<<snarl<<"\n";
00410         return fStatus;
00411       }
00412     }
00413     if ( dir <0 ) {
00414       this->Prev();
00415       if (fCurrentRun<run) {
00416         MSG("Io",Msg::kWarning) << 
00417           "Went to run "<<fCurrentRun<<
00418           " without finding run="<<run<<" snarl="<<snarl<<"\n";
00419         return fStatus;
00420       }
00421       if (fCurrentRun==run && fCurrentSnarl<snarl) {
00422         MSG("Io",Msg::kWarning) << 
00423           "Went to run="<<fCurrentRun<<" snarl="<<fCurrentSnarl<<
00424           " without finding run="<<run<<" snarl="<<snarl<<"\n";
00425         return fStatus;
00426       }
00427     }
00428     // Check if we're done
00429     if (fCurrentRun == run && fCurrentSnarl == snarl) return fStatus;
00430     if (dir>0 && fStatus.EndOfInputStream())          return fStatus;
00431     if (dir<0 && fStatus.BeginOfInputStream())        return fStatus;
00432   }
00433   return fStatus;
00434 }
00435 
00436 //......................................................................
00437 
00438 JobCResult IoInputModule::GoTo(const VldContext& vld)
00439 {
00440 //======================================================================
00441 // Go to records that match validity context. If vld is not found, will
00442 // GoTo record set one beyond requested validity.
00443 //======================================================================
00444   CallDepth d;
00445 
00446   // Set the input status to "all clear"
00447   if (d.fsDepth==1) fStatus = gsAllClear;
00448 
00449   if (fDataStreamItr==0) {
00450     if (this->OpenStreamItr()==0) {
00451       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00452       return fStatus;
00453     }
00454     return this->GoTo(vld);
00455   }
00456 
00457   fStatus |= fDataStreamItr -> GoTo(vld);
00458 
00459   // Load the current event
00460   return this->Get();
00461 
00462 }
00463 
00464 //......................................................................
00465 
00466 void IoInputModule::List(const char* streamlist) const
00467 {
00468 //======================================================================
00469 // Print list of files loaded
00470 //======================================================================
00471 
00472   MsgStream& m = MSGSTREAM("Io",Msg::kInfo);
00473 
00474   m << "IoInputModule using data format " << fFormat << endl;
00475 
00476   if ( fDataStreamItr ) {
00477     fDataStreamItr -> ListFile(std::cout,streamlist);
00478     return;
00479   }
00480 
00481   // data stream not yet open, list files from fFileList
00482   m << "File Name\tStream List " << endl;
00483   m << "=========\t=========== " << endl;
00484   std::list<IoFileListItem>::const_iterator itr = fFileList.begin();
00485   std::list<IoFileListItem>::const_iterator itrEnd = fFileList.end();
00486 
00487   for ( ; itr != itrEnd; itr++ ) {
00488     m << *itr;
00489   }
00490  
00491 }
00492 
00493 //......................................................................
00494 
00495 void IoInputModule::AddFile(const char *filepath, const char* streamlist, 
00496                                                                  int at) {
00497 //======================================================================
00498 // Add to the list of attached streams at the position "at". -1 = end
00499 // of list
00500 //======================================================================
00501 
00502 
00503   // Find out by checking the format of the filepath whether this
00504   // is a SAM job. Format will be SAM:samdataset
00505 
00506   const char *s1 = "SAM:";
00507   if ( strstr(filepath,s1) != NULL ) {
00508 
00509 #ifdef SITE_HAS_SAM
00510 
00511   // SAM options
00512 
00513     const char* tmps;
00514     int tmpi;
00515 
00516     Registry& r = GetConfig();
00517     if (r.Get("Station",tmps)) {fStation = tmps;}
00518     if (r.Get("SnapShotVers",tmpi)){fSnapShotVers = tmpi;}
00519     if (r.Get("WorkGroupName",tmps)) {fWorkGroupName = tmps;}
00520     if (r.Get("ApplicationName",tmps)) {fApplicationName = tmps;}
00521     if (r.Get("ApplicationVers",tmps)) {fApplicationVers = tmps;}
00522     if (r.Get("ProjectName",tmps)) {fProjectName = tmps;}
00523     if (r.Get("MaxNumberOfFiles",tmpi)) {fMaxNumberOfFiles = tmpi;}
00524     if (r.Get("StartNewProject",tmpi)) {fStartNewProject = tmpi;}
00525 
00526     std::string projectname;
00527 
00528     // If this is a new project then append time-stamp
00529     if (fStartNewProject == 1) {
00530       // Construct full project name including timestamp
00531       // Get timestamp as a string
00532       VldTimeStamp ts;
00533       std::string timestamp = ts.AsString("lc");
00534       // Replace blank with "-"
00535       size_t pos = timestamp.find(" ");
00536       timestamp.replace(pos,1,"-");
00537       // Replace : with - as DbServer does not like : in project names
00538       pos = timestamp.find(":");
00539       while ( pos != string::npos ) {
00540         timestamp.replace(pos,1,"-");
00541         pos = timestamp.find(":",pos+1);
00542       }
00543       // Append timestamp to fProjectName taken from registry
00544       fProjectName.append("-");
00545       projectname = fProjectName+timestamp;
00546     }
00547     // Otherwise use project name that is supplied
00548     else if(fStartNewProject == 0) {
00549       projectname = fProjectName;
00550     }
00551         
00552 
00553     MSG("Io",Msg::kDebug) << "Sam Station " << fStation << " Snap Shot " <<
00554       fSnapShotVers << " Work Group Name " << fWorkGroupName 
00555                          << " Application Name "
00556       << fApplicationName << " Application Version " << fApplicationVers <<
00557       " Project Name " << projectname << endl;
00558 
00559     // Get the samdataset from filepath
00560 
00561     std::string temp = filepath;
00562     size_t pos = temp.find(":")+1;
00563     std::string samdataset = temp.substr(pos,temp.length()-pos);
00564 
00565     // Define snapshot version
00566 
00567     long snapshot;
00568     // Snapshot version = 0 means create New Snapshot
00569     if ( fSnapShotVers == 0 ) {
00570       snapshot = sam::SamProject::NewSnapshotVersion;
00571     }
00572     // Snapshot version < 0 means use last one created
00573     else if (fSnapShotVers < 0) {
00574       snapshot = sam::SamProject::LatestSnapshotVersion;
00575     }
00576     else if (fSnapShotVers > 0 ) {
00577       // Use specified SnapShot version 
00578       snapshot = fSnapShotVers;
00579     }
00580     
00581     MSG("Io",Msg::kDebug) << "SnapShot Version " << snapshot << endl; 
00582 
00583     if (fStartNewProject == 1) {
00584 
00585       // Create SAM project
00586 
00587       fsamProject = new sam::SamProject(projectname,fStation);
00588 
00589       // Start SAM project
00590 
00591       std::list<std::string> projectMasterArgList;
00592 
00593       try {
00594         MSG("Io",Msg::kInfo) << "Starting SAM Project " << projectname << 
00595           " on station " << fStation << endl;
00596         fsamProject->startProject(fWorkGroupName,samdataset,snapshot,
00597                                   projectMasterArgList);
00598       }
00599       catch(const sam::SamProject::StartProjectRequestRejected& ex) {
00600         MSG("Io",Msg::kInfo) << "Rejected start SAM project request " 
00601                              << ex << endl;
00602       }
00603     }
00604 
00605     // Start SAM consumer to deliver files
00606 
00607     const int projectMasterTimeout(60);
00608     const std::string processDescription("Loon Analysis Process");
00609 
00610     try{
00611 
00612       MSG("Io",Msg::kInfo) << "Starting SAM Consumer" << endl;
00613 
00614       sam::SamConsumer fsamConsumer(projectname,fStation,fWorkGroupName,
00615                                    fApplicationName,fApplicationVers,
00616                                    processDescription,
00617                                    fMaxNumberOfFiles,
00618                                    projectMasterTimeout);
00619 
00620       // Now get files. Format of returned files depends on whether
00621       // SAM cache is local disk, dcache disk or AFS file space
00622       
00623       std::map<std::string,std::string> filelist;
00624       map<std::string,std::string>::iterator fitr;
00625       
00626       int location;
00627       int length;
00628       int comp;
00629       std::string fileonly;
00630       std::string restOfPath;
00631       std::string afsroot("afsroot:");
00632       try {
00633         while(true) {
00634           std::string filename = fsamConsumer.getFile().getFullFileName();
00635           // The files need to be sorted as they come back in an undefined
00636           // order. Split them into a filename and the rest of the path. 
00637           // Put them in a map and then iterate over key which is filename 
00638           // - guarantees correct order.
00639           
00640           MSG("Io",Msg::kDebug) << "Filename " << filename << endl;
00641           location = filename.find_last_of("/");
00642           length = filename.length();
00643           fileonly = filename.substr(location+1,length-1);
00644           // Need to look for afsroot: at start of path. If it is there the
00645           // remove it and rest of path is AFS path.
00646           comp = filename.compare(0,8,afsroot);
00647           if (comp == 0 ) {
00648             restOfPath = filename.substr(8,location-8);
00649           }
00650           else {
00651             restOfPath = filename.substr(0,location);
00652           }
00653           
00654           restOfPath.append("/");
00655           filelist.insert(make_pair(fileonly,restOfPath));
00656           
00657           MSG("Io",Msg::kDebug) << "File Only " << fileonly << " Rest Of Path " 
00658                                 << restOfPath << endl;
00659           
00660           // Release file
00661           
00662           fsamConsumer.releaseFile();
00663         }
00664       }
00665       catch(const sam::SamConsumer::EndOfFileStreamReached& ex) {
00666         MSG("Io",Msg::kDebug) << "End of File Stream reached" << endl;
00667       }
00668 
00669       // Got all files. Now need to add them to file list. Iterate over map
00670       
00671       std::string sfile;
00672       const char *samfile = 0;
00673       for (fitr = filelist.begin(); fitr != filelist.end(); fitr++) {
00674         sfile = (fitr->second+fitr->first);
00675         samfile = sfile.data();
00676         MSG("Io",Msg::kInfo) << "Adding File " << samfile << endl;
00677         // Add file to file list
00678         IoFileListItem iofile(samfile,at,streamlist);
00679         fFileList.push_back(iofile);
00680       }
00681     }
00682     catch(const sam::SamConsumer::InitializationError& ex) {
00683         MSG("Io",Msg::kInfo) << "Rejected start SAM Consumer request " 
00684                              << ex << endl;
00685     }
00686 
00687 
00688     if (fsamProject) {
00689       try {
00690         MSG("Io",Msg::kInfo) << "Requesting end of SAM project " << endl;
00691         fsamProject->endProject();
00692       }
00693       catch(const sam::SamProject::EndProjectRequestRejected& ex) {
00694         MSG("Io",Msg::kInfo) << "SAM Project end request rejected "<< ex << endl;
00695         
00696       }
00697       catch(const sam::SamProject::EndProjectRequestFailed& ex) {
00698         MSG("Io",Msg::kInfo) << "SAM Project end request failed "<< ex << endl;
00699       }
00700     }
00701 
00702 
00703 #endif     // End of ifdef SITE_HAS_SAM
00704 
00705   }
00706   else {
00707 
00708     // Add file to file list
00709     IoFileListItem iofile(filepath,at,streamlist);
00710     fFileList.push_back(iofile);
00711     
00712     if ( !fDataStreamItr ) return;
00713     
00714     // Add files to stream managed lists
00715     const IoFileListItem::FileStreamMap& filemap = iofile.GetFileStreamMap();
00716     
00717     if ( at < 0 ) {
00718       IoFileListItem::FileStreamMapConstItr itr = filemap.begin();
00719       for ( ; itr != filemap.end(); itr++ ) {
00720         std::string filename = itr -> first;
00721         std::string streamlist = itr -> second;
00722         fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
00723       }
00724     }
00725     else {
00726       // Apply files in reverse to have first file in list inserted at pos At
00727       IoFileListItem::FileStreamMap::const_reverse_iterator itr=filemap.rbegin();
00728       for ( ; itr != filemap.rend(); itr++ ) {
00729         std::string filename = itr -> first;
00730         std::string streamlist = itr -> second;
00731         fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
00732       }      
00733     }
00734   }
00735 }
00736 
00737 //......................................................................
00738 
00739 void IoInputModule::RemoveFile(const char* filename, const char* streamlist) {
00740 //======================================================================
00741 // Remove the file "filename" from the list of input data files. 
00742 //======================================================================
00743 
00744   if ( fDataStreamItr ) fDataStreamItr -> RemoveFile(filename,streamlist);
00745 
00746   std::string f(filename);
00747   std::list<IoFileListItem>::iterator  itr = fFileList.end();
00748   while ( !fFileList.empty() && itr != fFileList.begin() ) {
00749     itr--;
00750     IoFileListItem& iofile = *itr;
00751     iofile.RemoveFile(filename,streamlist);
00752     if ( iofile.GetNumFile() == 0 ) fFileList.erase(itr);
00753   }
00754 
00755   return;
00756 
00757 }
00758 
00759 //......................................................................
00760 
00761 JobCResult IoInputModule::NextFile(int n, const char* streamlist)
00762 {
00763 //======================================================================
00764 // Move to the next file in the list (move by n positions)
00765 //======================================================================
00766   CallDepth d;
00767 
00768   if (d.fsDepth==1) fStatus = gsAllClear;
00769 
00770   if (fDataStreamItr==0) {
00771     if (this->OpenStreamItr()==0) {
00772       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00773       return fStatus;
00774     }
00775     return this->NextFile(n,streamlist);
00776   }    
00777 
00778   fStatus |= fDataStreamItr -> NextFile(n,streamlist);
00779 
00780   MSG("Io",Msg::kDebug)
00781       << "status is " << fStatus
00782       << " current file is " << fDataStreamItr->GetCurrentFile() << endl;
00783 
00784   return fStatus;
00785 
00786 }
00787 
00788 //......................................................................
00789 
00790 JobCResult IoInputModule::PrevFile(int n, const char* streamlist) 
00791 {
00792 //======================================================================
00793 // Move to the previous list in the file (move back by n files)
00794 //======================================================================
00795   CallDepth d;
00796   
00797   if (d.fsDepth==1) fStatus = gsAllClear;
00798 
00799   if (fDataStreamItr==0) {
00800     if (this->OpenStreamItr()==0) {
00801       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00802       return fStatus;
00803     }
00804     return this->PrevFile(n,streamlist);
00805   }    
00806 
00807   fStatus |= fDataStreamItr -> PrevFile(n,streamlist);
00808 
00809   return fStatus;
00810 
00811 }
00812 
00813 //......................................................................
00814 
00815 JobCResult IoInputModule::GoToFile(int n, const char* streamlist) 
00816 {
00817 //======================================================================
00818 // Move the stream to the nth file in the list (n=0 is first)
00819 //======================================================================
00820   CallDepth d;
00821   
00822   if (d.fsDepth==1) fStatus = gsAllClear;
00823 
00824   if (fDataStreamItr==0) {
00825     if (this->OpenStreamItr()==0) {
00826       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00827       return fStatus;
00828     }
00829     return this->GoToFile(n,streamlist);
00830   }    
00831 
00832   fStatus |= fDataStreamItr -> GoToFile(n,streamlist);
00833 
00834   return fStatus;
00835 
00836 }
00837 
00838 //......................................................................
00839 
00840 JobCResult IoInputModule::GoToFile(const char* filename, const char*streamlist){
00841 //======================================================================
00842 // Move the stream to a named file
00843 //======================================================================
00844   CallDepth d;
00845   
00846   if (d.fsDepth==1) fStatus = gsAllClear;
00847 
00848   if (fDataStreamItr==0) {
00849     if (this->OpenStreamItr()==0) {
00850       MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00851       return fStatus;
00852     }
00853     return this->GoToFile(filename,streamlist);
00854   }    
00855 
00856   fStatus |= fDataStreamItr -> GoToFile(filename,streamlist);
00857 
00858   return fStatus;
00859 }
00860 
00861 //......................................................................  
00862 
00863 void IoInputModule::Select(const char* stream, const char* select, 
00864                            bool isRequired)
00865 {
00866 //======================================================================
00867 // Add/Change the selection cuts for a stream
00868 //======================================================================
00869   // Insert the selection into the map
00870   fStreamSelectionMap[stream] = select;
00871   fStreamRequiredMap[stream] = isRequired;
00872   
00873   // Pass the info on to the data stream
00874   if (fDataStreamItr) {
00875     fDataStreamItr->Select(stream, select,isRequired);
00876   }
00877 
00878 }
00879 
00880 //......................................................................  
00881 
00882 void IoInputModule::DefineStream(const char* stream, const char* tree) {
00883 //======================================================================
00884 // Define stream to serve specified tree
00885 //======================================================================
00886   // Insert the definition into the map
00887   fStreamDefMap[stream] = tree;
00888 
00889   // Pass the info on to the data stream
00890   if ( fDataStreamItr ) {
00891     fDataStreamItr->DefineStream(stream, tree);
00892   }
00893 
00894 }
00895 
00896 //......................................................................  
00897 
00898 void IoInputModule::SetSequenceMode(const char* stream,
00899                                     Per::ESequenceMode sequenceMode) {
00900 //======================================================================
00901 // Define stream sequence mode
00902 //======================================================================
00903   // Insert the sequence mode into the map
00904   fStreamSeqModeMap[stream] = sequenceMode;
00905 
00906   // Pass the info on to the data stream
00907   if ( fDataStreamItr ) {
00908     fDataStreamItr->SetSequenceMode(stream, sequenceMode);
00909   }
00910 
00911 }
00912 
00913 //......................................................................  
00914 
00915 void IoInputModule::SetTestMode(const char* stream,
00916                                 bool testMode) {
00917 //======================================================================
00918 // Define stream test mode
00919 //======================================================================
00920   // Insert the test mode into the map
00921   fStreamTestModeMap[stream] = testMode;
00922 
00923   // Pass the info on to the data stream
00924   if ( fDataStreamItr ) {
00925     fDataStreamItr->SetTestMode(stream, testMode);
00926   }
00927 
00928 }
00929 
00930 //......................................................................
00931 
00932 void IoInputModule::SetWindow(const char* stream, double lower, double upper)
00933 {
00934 //======================================================================
00935 // Define stream window if kWindow sequence mode is used
00936 //======================================================================
00937   fStreamWindowMap[stream] = std::pair<double,double>(lower,upper);
00938 
00939   // Pass the info on to the data stream
00940   if ( fDataStreamItr ) {
00941       fDataStreamItr->SetWindow(stream, lower, upper);
00942   }
00943 
00944 }
00945 
00946 //......................................................................
00947 
00948 void IoInputModule::SetMaxFileRepeat(const char* stream, int numRepeat)
00949 {
00950 //======================================================================
00951 // Define maximum number of times to reuse a file in the stream before
00952 // loading the next one; for kSequential and kRandom sequence modes
00953 //======================================================================
00954   fStreamMaxRepeatMap[stream] = numRepeat;
00955 
00956   // Pass it on to the data stream
00957   if ( fDataStreamItr ) fDataStreamItr->SetMaxFileRepeat(stream,numRepeat);
00958 }
00959 
00960 //......................................................................
00961 
00962 void IoInputModule::SetMeanMom(const char* stream, double mean)
00963 {
00964 //======================================================================
00965 // Define mean number of events to push to mom for this stream
00966 // for kSequential and kRandom sequence modes
00967 //======================================================================
00968   fStreamMeanMap[stream] = mean;
00969 
00970   // Pass it on to the data stream
00971   if ( fDataStreamItr ) fDataStreamItr->SetMeanMom(stream,mean);
00972 }
00973 
00974 //......................................................................
00975 
00976 void IoInputModule::SetPushRandom(const char* stream, bool setRandom)
00977 {
00978 //======================================================================
00979 // Define whether to push a random or constant number of events to mom
00980 // for this stream for kSequential and kRandom sequence modes
00981 //======================================================================
00982   fStreamPushRandomMap[stream] = setRandom;
00983 
00984   // Pass it on to the data stream
00985   if ( fDataStreamItr ) fDataStreamItr->SetPushRandom(stream,setRandom);
00986 }
00987 
00988 //......................................................................
00989 
00990 void IoInputModule::SetRandomSeed(int rSeed)
00991 {
00992 //======================================================================
00993 // Set the random seed for SetPushRandom(stream,true) case
00994 // for kSequential and kRandom sequence modes
00995 //======================================================================
00996   fRandomSeed = rSeed;
00997 
00998   // Pass it on to the data stream
00999   if ( fDataStreamItr ) fDataStreamItr->SetRandomSeed(rSeed);
01000 
01001 }
01002 
01003 //......................................................................
01004 
01005 const char* IoInputModule::GetCurrentFile(const char* streamname) const
01006 {
01007     MSG("Io",Msg::kDebug) << "IoInputModule::GetCurrentFile()" << endl;
01008     mapStrStrItr_t it, done=fCurrentFileMap.end();
01009     std::string strmstring = streamname;
01010 
01011     for (it = fCurrentFileMap.begin(); it !=done; ++it) {
01012         MSG("Io",Msg::kVerbose)
01013           << "stream: " << setw(16) << it->first
01014           << " file: " << it->second
01015           <<endl;
01016     }
01017 
01018     for (it = fCurrentFileMap.begin(); it!=done; ++it) {
01019       if ( strmstring == it->first ) return it->second.c_str();
01020     }
01021     // Sue's original approach
01022     if (!fDataStreamItr) return 0;
01023     return fDataStreamItr->GetCurrentFile(streamname);
01024 }
01025 
01026 const char* IoInputModule::GetLastFile(const char* streamname) const
01027 {
01028     MSG("Io",Msg::kInfo) << "IoInputModule::GetLastFile()" << endl;
01029     mapStrStrItr_t it, done = fLastFileMap.end();
01030     std::string strmstring = streamname;
01031 
01032     for (it = fLastFileMap.begin(); it!=done; ++it) {
01033         MSG("Io",Msg::kVerbose)
01034           << "stream: " << setw(16) << it->first
01035           << " file: " << it->second
01036           <<endl;
01037     }
01038 
01039     for (it = fLastFileMap.begin(); it!=done; ++it) {
01040       if ( strmstring == it->first ) return it->second.c_str();
01041     }
01042     return 0;
01043 
01044 }
01045 
01046 //......................................................................
01047 
01048 void IoInputModule::LoadFilesFromCommandLine()
01049 {
01050 //======================================================================
01051 // Load the files listed on the program command line
01052 //======================================================================
01053   JobCEnv& jce = JobCEnv::Instance();
01054   if (!fLoadedCommandLineFiles) {
01055     for (int i=0; i<jce.GetNfile(); ++i) {
01056       this->AddFile(jce.GetFileName(i));
01057     }
01058     fLoadedCommandLineFiles = true;
01059   }
01060 }
01061 
01062 //......................................................................
01063 
01064 int IoInputModule::ReadHeader()
01065 {
01066 //======================================================================
01067 // Read temptags to get file name
01068 // Read header information to get run/snarl info. 
01069 //   if found Run and Snarl info return 2, 
01070 //   if only Run info return 1,
01071 //   else return 0
01072 //======================================================================
01073   const MomNavigator* mom = this->GetMom();
01074   assert(mom);
01075 
01076   // BeginFile/EndFile boundaries may not be in synch across the different
01077   // data streams.  The definition used here is to set file boundary true if 
01078   // the file has changed for any of the managed streams.
01079 
01080   const TObjArray* momarray = mom->GetFragmentArray();
01081   for (int i=0; i < momarray->GetEntriesFast(); ++i) {
01082     TObject* obj = momarray->At(i);
01083     if (!obj) continue;
01084     Registry* temptags = 0;
01085     if ( RecMinos* record = dynamic_cast<RecMinos*>(obj) ) {
01086       temptags = &(record->GetTempTags());
01087     }
01088     else if ( RecRecord* record = dynamic_cast<RecRecord*>(obj) ) {
01089       temptags = &(record->GetTempTags());
01090     }
01091     if ( ! temptags ) continue;
01092 
01093     // on a named stream?
01094     const char* tagstream = 0;
01095     if ( ! temptags->Get("stream",tagstream) ) continue;
01096 
01097     // stream managed by i/o?
01098     std::string streamname(tagstream);
01099     const char* tagnewfile = 0;
01100     if ( ! temptags->Get("file",tagnewfile) ) continue; 
01101 
01102     std::string lstfilename = fLastFileMap[streamname];
01103     std::string curfilename = fCurrentFileMap[streamname];
01104     std::string newfilename(tagnewfile);
01105 
01106     if ( newfilename != curfilename ) {
01107             
01108       std::string starcur  = fCurrentFileMap.begin()->first;
01109       std::string starlast = fLastFileMap.begin()->first;
01110       
01111       MSG("Io",Msg::kDebug) 
01112         << "SetBeginFile on streamname '" << streamname << "'" << endl
01113         << "   current      '" << fCurrentFileMap[streamname] << "'" << endl
01114         << "   last         '" << fLastFileMap[streamname] << "'" << endl
01115         << "   current['" << starcur << "'] '" 
01116         << fCurrentFileMap[starcur] << "'" << endl
01117         << "      last['" << starlast << "'] '" 
01118         << fLastFileMap[starlast] << "'" << endl
01119         << "   new '" << newfilename << "' != "
01120         << " cur '" << curfilename << "'" << endl
01121         << "   update \"*\" ? " 
01122         << (( newfilename != fCurrentFileMap["*"] ) ? "yes":"no")
01123         << endl;
01124       
01125       // update "*" stream first
01126       if ( newfilename != fCurrentFileMap["*"] ) {
01127         fStatus.SetBeginFile();
01128         //if ( lstfilename != "" ) fStatus.SetEndFile();
01129         if ( fCurrentFileMap["*"] != "" ) fStatus.SetEndFile();
01130         
01131         fLastFileMap["*"]           = fCurrentFileMap["*"];
01132         fCurrentFileMap["*"]        = newfilename;
01133 
01134         MSG("Io",Msg::kDebug) 
01135           << "SetBeginFile on '*'" << endl
01136           << "   current['" << starcur << "'] '" 
01137           << fCurrentFileMap[starcur] << "'" << endl
01138           << "      last['" << starlast << "'] '" 
01139           << fLastFileMap[starlast] << "'" << endl;
01140       }
01141       // update this named stream
01142       fLastFileMap[streamname]    = curfilename;
01143       fCurrentFileMap[streamname] = newfilename;
01144       
01145       // if a stream on a file moved on then presumably all the
01146       // other streams on the same file have also been exhausted
01147       // and are going to move on -- help them along so we don't
01148       // have to wait for that stream to be the next record on
01149       // that stream is the next VldContext
01150       mapStrStrItr_t it, done = fCurrentFileMap.end();
01151       for (it = fCurrentFileMap.begin(); it != done; ++it) {
01152         if ( it->second == curfilename ) {
01153           std::string altstream = it->first;
01154           fLastFileMap[altstream]    = curfilename;
01155           fCurrentFileMap[altstream] = newfilename;
01156         }
01157       }
01158       
01159     } // new != cur filename
01160 
01161     MSG("Io",Msg::kVerbose) 
01162       << " stream '" << streamname << "' set fLastFileMap to '"
01163       << curfilename << "', fCurrentFileMap to '"
01164       << newfilename << "'" 
01165       << " * '" << fLastFileMap["*"] << "'  '" << fCurrentFileMap["*"] << "'"
01166       << endl;
01167         
01168   } // loop over records
01169 
01170   // update for EOF/EOJ condition (which doesn't come throught this
01171   // function) if it isn't a file change
01172   // this won't work is the file has just one record
01173   // ...the whole procedure is fundamentally flawed -- it shouldn't be
01174   // based on what we find in "mom" but rather the stream/file 
01175   // management classes...
01176   if ( ! fStatus.BeginFile() ) fLastFileMap["*"] = fCurrentFileMap["*"];
01177 
01178   // BeginRun/EndRun
01179   int run   = -1;  // default and flag value
01180   int snarl = -1;  // default and flag value
01181   for (int i=0; i < momarray->GetEntriesFast(); ++i) {
01182     TObject* obj = momarray->At(i);
01183     if (!obj) continue;
01184     if ( RecMinos* record = dynamic_cast<RecMinos*>(obj) ) {
01185       // old style
01186       
01187         // all DAQ generated records can supply run #
01188       const RawDaqHeader* rdh 
01189              = dynamic_cast<const RawDaqHeader*>(record->GetHeader());
01190       if (rdh) run = rdh->GetRun();
01191 
01192       // but only DaqSnarl records can supply snarl #
01193       const RawDaqSnarlHeader* rdsh 
01194              = dynamic_cast<const RawDaqSnarlHeader*>(record->GetHeader());
01195       if (rdsh) snarl = rdsh->GetSnarl();
01196 
01197       if (!rdh) {
01198         // not a DAQ record, perhaps it's a CandRecord
01199         const CandHeader* candhdr
01200           = dynamic_cast<const CandHeader*>(record->GetHeader());
01201         if (candhdr) {
01202           run   = candhdr->GetRun();
01203           snarl = candhdr->GetSnarl();
01204         }
01205       }
01206     }
01207     else if ( RecRecord* record = dynamic_cast<RecRecord*>(obj) ) {
01208       // New style
01209       const RecPhysicsHeader* rph 
01210           = dynamic_cast<const RecPhysicsHeader*>(&(record->GetHeader()));
01211       if ( rph ) {
01212         run   = rph->GetRun();
01213         snarl = rph->GetSnarl();
01214       }
01215     }
01216     // break early only if determined snarl in case it one of those
01217     // crazy record sets with a DaqMonitor and a DaqSnarl record.
01218     if ( snarl >= 0 ) break;
01219   }
01220 
01221   // set the status flags based on what was extracted
01222   fCurrentSnarl = snarl;
01223   if ( run < 0 ) {
01224     fCurrentRun = -1;
01225     return 0;
01226   }
01227   if ( run != fCurrentRun ) {
01228     fStatus.SetBeginRun();
01229     if ( fLastRun >= 0 ) fStatus.SetEndRun();
01230   }
01231   fLastRun      = fCurrentRun;
01232   fCurrentRun   = run;
01233   if ( snarl >= 0 ) {
01234     fLastSnarl    = fCurrentSnarl;
01235     fCurrentSnarl = snarl;
01236     return 2;
01237   }
01238   return 1;
01239 }
01240 
01241 //......................................................................
01242 
01243 void IoInputModule::UpdateDDSConfig() {
01244 //======================================================================
01245 // Update the dispatcher configuration
01246 //======================================================================
01247   if ( fDataStreamItr == 0 ) return;
01248 
01249   IoDDSStreamItr* ddsItr = dynamic_cast<IoDDSStreamItr*>(fDataStreamItr);
01250   if ( ! ddsItr ) return;
01251 
01252   ddsItr->SetTimeOut(fTimeOut);  
01253 
01254   // Need to reinitialize dispatcher if server hostname, port, clienttype
01255   // or clientname have changed
01256   bool reinit = (fServer != ddsItr->GetSourceName() 
01257                 || fPort != ddsItr->GetPort()
01258                 || fClientType != ddsItr->GetClientType() 
01259                 || fClientName != ddsItr->GetClientName() );
01260   
01261   if ( reinit ) {
01262     this -> CloseStreamItr();  // wait for next action to reopen
01263   }
01264   else {
01265     ddsItr -> SetKeepUpMode(fKeepUpMode);
01266     ddsItr -> SetMaxSyncDelay(fMaxSyncDelay);
01267     ddsItr -> SetDataSource(fDataSource);
01268     ddsItr -> SetOffLine(fOffLine);
01269   }
01270 
01271 }
01272 
01273 
01274 //......................................................................
01275 
01276 void IoInputModule::UpdateFormatConfig() {
01277 //======================================================================
01278 // Update the stream itr to match requested format
01279 //======================================================================
01280 
01281   if ( fDataStreamItr == 0 ) return;
01282 
01283   bool reopen = ( fFormat != fDataStreamItr->GetFormat() );
01284   if ( reopen ) {
01285     this -> CloseStreamItr();  // wait for next action to reopen
01286   }
01287 
01288   return;
01289 
01290 }
01291 
01292 //......................................................................
01293 
01294 void IoInputModule::UpdateFileList() 
01295 {
01296 //======================================================================
01297 // Update the file list 
01298 //======================================================================
01299 
01300   if ( fDataStreamItr == 0 ) return;
01301 
01302   // Fresh start. This should typically only be called when data stream 
01303   // iterator is newly opened (i.e. when the format changes)
01304   fDataStreamItr -> RemoveFile("*");
01305 
01306   std::list<IoFileListItem>::iterator itr = fFileList.begin();
01307   for ( ; itr != fFileList.end(); itr++ ) {
01308     IoFileListItem& iofile = *itr;
01309     const IoFileListItem::FileStreamMap& filemap = iofile.GetFileStreamMap();
01310     int at = iofile.GetAt();
01311   
01312     if ( at < 0 ) {
01313       IoFileListItem::FileStreamMapConstItr itr = filemap.begin();
01314       for ( ; itr != filemap.end(); itr++ ) {
01315         std::string filename = itr -> first;
01316         std::string streamlist = itr -> second;
01317         fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
01318       }
01319     }
01320     else {
01321       // Apply files in reverse to have first file in list inserted at pos At
01322       IoFileListItem::FileStreamMap::const_reverse_iterator itr 
01323                                                          = filemap.rbegin();
01324       for ( ; itr != filemap.rend(); itr++ ) {
01325         std::string filename = itr -> first;
01326         std::string streamlist = itr -> second;
01327         fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
01328       }      
01329     }
01330   }
01331 
01332   return;
01333   
01334 }
01335 
01336 //......................................................................
01337 
01338 void IoInputModule::UpdateStreamConfig()
01339 {
01340 //======================================================================
01341 // Set the stream and selection cuts for the open streams
01342 //======================================================================
01343   if (fDataStreamItr==0) return;
01344 
01345   // Define streams
01346   mapStrStrItr_t itr;
01347   for ( itr = fStreamDefMap.begin(); itr != fStreamDefMap.end(); itr++ ) {
01348     fDataStreamItr->DefineStream((itr->first).c_str(),  // Stream
01349                                  (itr->second).c_str());  // Definition
01350   }
01351 
01352   // Set the streams to be activated
01353   fDataStreamItr->Streams(fStreamList.c_str());
01354   // Set the selection cuts for each stream
01355   for (itr=fStreamSelectionMap.begin();itr!=fStreamSelectionMap.end();++itr) {
01356     std::map<std::string,bool>::const_iterator reqitr 
01357                            = fStreamRequiredMap.find(itr->first);
01358     bool isrequired = false;
01359     if ( reqitr != fStreamRequiredMap.end() ) isrequired = reqitr -> second;
01360     fDataStreamItr->Select((itr->first).c_str(),   // Stream
01361                            (itr->second).c_str(), // Selection
01362                            isrequired);  // IsRequired
01363   }
01364 
01365   // Set the test mode for each stream
01366   std::map<std::string,bool>::const_iterator testitr;
01367   for ( testitr=fStreamTestModeMap.begin(); testitr !=fStreamTestModeMap.end();
01368                                           ++testitr ) {
01369     fDataStreamItr->SetTestMode((testitr->first).c_str(),   // Stream
01370                                 testitr->second);  // TestMode
01371   }
01372 
01373   // Set the sequence mode for each stream
01374   bool setRandom = false;
01375   std::map<std::string,Per::ESequenceMode>::const_iterator seqitr;
01376   for ( seqitr=fStreamSeqModeMap.begin(); seqitr!=fStreamSeqModeMap.end();
01377                                         ++seqitr ) {
01378     fDataStreamItr->SetSequenceMode((seqitr->first).c_str(),   // Stream
01379                                      seqitr->second); // Sequence Mode
01380     pair<double,double> window = fStreamWindowMap[seqitr->first];
01381     fDataStreamItr->SetWindow((seqitr->first).c_str(),   // Stream
01382                               window.first,window.second);
01383     if (seqitr->second == Per::kSequential ||
01384         seqitr->second == Per::kRandom      ) {
01385       if (!setRandom) {
01386         setRandom = true;
01387         fDataStreamItr->SetRandomSeed(fRandomSeed);
01388       }
01389       int    repeat = fStreamMaxRepeatMap[seqitr->first];
01390       fDataStreamItr->SetMaxFileRepeat( (seqitr->first).c_str(), repeat );
01391       double mean = fStreamMeanMap[seqitr->first];
01392       fDataStreamItr->SetMeanMom( (seqitr->first).c_str(), mean );
01393       bool   pushRand = fStreamPushRandomMap[seqitr->first];
01394       fDataStreamItr->SetPushRandom( (seqitr->first).c_str(), pushRand );
01395     } // end if kSeq or kRand
01396   }
01397 }
01398 
01399 //......................................................................
01400 
01401 int IoInputModule::OpenStreamItr()
01402 {
01403 //======================================================================
01404 // Open a new stream iterator
01405 //======================================================================
01406   if (fDataStreamItr) this->CloseStreamItr();
01407 
01408   bool isDDS = (UtilString::ToUpper(fFormat) == "DDS");
01409   std::string src;
01410   if ( isDDS ) src = fServer;
01411   
01412   fDataStreamItr = IoDataStreamFactory::CreateDataStreamItr(src.c_str(),
01413                      fFormat.c_str(),fPort,fMaxRetry,fRetryDelay,
01414                      fClientType,fClientName);
01415 
01416 
01417   if (fDataStreamItr == 0) {
01418     MSG("Io",Msg::kWarning) << "Failed to open stream '" << src << "'" <<
01419       " using format '" << fFormat << "'" << endl; 
01420     fStatus.SetEndRun();
01421     fStatus.SetEndFile();
01422     fStatus.SetEndOfInputStream();
01423     return 0;
01424   }
01425 
01426   fStatus.SetBeginOfInputStream();
01427   fStatus.SetBeginFile();
01428   fStatus.SetBeginRun();
01429 
01430   // Configure the file and module
01431   this->UpdateStreamConfig(); // this should be called before filelist
01432   this->UpdateFileList();
01433   this->UpdateDDSConfig();
01434   fFormat = fDataStreamItr->GetFormat();
01435   
01436   MSG("Io",Msg::kDebug) 
01437     << "Opened stream itr of format " << fDataStreamItr->GetFormat() << endl;
01438 
01439   return 1;
01440 }
01441 
01442 //......................................................................
01443 
01444 void IoInputModule::CloseStreamItr() 
01445 {
01446 //======================================================================
01447 // Close the currently openned stream
01448 //======================================================================
01449   if (fDataStreamItr) {
01450     MSG("Io",Msg::kDebug) 
01451       << "Close stream itr of format " << fDataStreamItr->GetFormat() << endl;
01452     delete fDataStreamItr;
01453     fDataStreamItr = 0;
01454     fStatus.SetEndRun();
01455     fStatus.SetEndFile();
01456     fStatus.SetEndOfInputStream();
01457   }
01458 }
01459 
01461 
01462 
01463 
01464 
01465 
01466 
01467 
01468 
01469 
01470 
01471 
01472 
01473 
01474 

Generated on Fri Mar 28 15:33:16 2008 for loon by  doxygen 1.3.9.1