00001
00002
00003
00004
00005
00006
00008 #include "TSystem.h"
00009 #include "TRegexp.h"
00010
00011 #include "Dispatcher/DDS.h"
00012 #include "IoModules/IoInputModule.h"
00013 #include <cassert>
00014 #include "MessageService/MsgService.h"
00015 #include "MinosObjectMap/MomNavigator.h"
00016 #include "IoModules/IoDataStreamItr.h"
00017 #include "IoModules/IoDDSStreamItr.h"
00018 #include "IoModules/IoDataStreamFactory.h"
00019 #include "JobControl/JobCInputModule.h"
00020 #include "JobControl/JobCModuleRegistry.h"
00021 #include "JobControl/JobCEnv.h"
00022 #include "RawData/RawRecord.h"
00023 #include "RawData/RawDaqSnarlHeader.h"
00024 #include "CandData/CandHeader.h"
00025 #include "Record/RecRecord.h"
00026 #include "Record/RecPhysicsHeader.h"
00027 #include "Registry/Registry.h"
00028 #include "Validity/VldContext.h"
00029 #include "Validity/VldTimeStamp.h"
00030 #include "Util/UtilString.h"
00031 #include "TSystem.h"
00032
00033 #include <algorithm>
00034 #include <cstring>
00035 #include <string>
00036 #include <map>
00037
00038 #ifdef SITE_HAS_SAM
00039 #include "sam_cpp_api/SamConsumer.hpp"
00040 #endif
00041
00042 CVSID("$Id: IoInputModule.cxx,v 1.83 2007/09/21 19:49:05 buckley Exp $");
00043 JOBMODULE(IoInputModule,"INPUT","Read and configure input streams");
00044
00045 typedef std::map<std::string,std::string>::const_iterator mapStrStrItr_t;
00046
00047
00048 static const JobCResult gsAllClear = JobCResult::kAOK;
00049
00050
00051
00052
00053
00054 class CallDepth {
00055 public:
00056 CallDepth() { ++fsDepth; }
00057 ~CallDepth() { --fsDepth; }
00058 static int fsDepth;
00059 };
00060 int CallDepth::fsDepth = 0;
00061
00062
00063
00064 IoInputModule::IoInputModule() :
00065 fDataStreamItr(0),
00066 fFormat(""),
00067 fStreamList(""),
00068 fServer(""),
00069 fPort(0),
00070 fTimeOut(0),
00071 fDataSource(0),
00072 fKeepUpMode(0),
00073 fMaxSyncDelay(0),
00074 fOffLine(false),
00075 fMaxRetry(0),
00076 fRetryDelay(1),
00077 fClientType(DDS::kUnknownClientType),
00078 fClientName(""),
00079 fStatus(JobCResult::kAOK),
00080 fLastRun(-1),
00081 fLastSnarl(-1),
00082 fCurrentRun(-1),
00083 fCurrentSnarl(-1),
00084 fLoadedCommandLineFiles(false)
00085 #ifdef SITE_HAS_SAM
00086 ,fsamProject(0)
00087 #endif
00088 { fStopwatch.Reset(); fStopwatch.Stop(); }
00089
00090
00091
00092 IoInputModule::~IoInputModule()
00093 {
00094 if ( fDataStreamItr ) { delete fDataStreamItr; fDataStreamItr = 0; }
00095 }
00096
00097
00098
00099 void IoInputModule::BeginJob()
00100 {
00101 this->LoadFilesFromCommandLine();
00102
00103 }
00104
00105
00106
00107
00108 void IoInputModule::EndJob()
00109 {
00110 fStopwatch.Stop();
00111 MSG("Io",Msg::kDebug) << "IoInputModule::EndJob, Time(sec), Real "
00112 << fStopwatch.RealTime() << ", CPU "
00113 << fStopwatch.CpuTime() << endl;
00114
00115 }
00116
00117
00118
00119 const Registry& IoInputModule::DefaultConfig() const
00120 {
00121
00122
00123
00124 static Registry r;
00125 r.SetName("INPUT.config");
00126
00127 r.UnLockValues();
00128
00129 MSG("Io",Msg::kDebug) << "Loading default config\n";
00130
00131
00132 r.Set("Format" ,"input");
00133 r.Set("Streams","DaqMonitor,DaqSnarl,LightInjection");
00134
00135
00136
00137 r.Set("DDSServer", "daqdds.minos-soudan.org");
00138 r.Set("DDSPort", DDS::kPort);
00139 r.Set("DDSTimeOut", 120);
00140 r.Set("DDSDataSource","Daq");
00141 r.Set("DDSKeepUpMode", "FileKeepUp");
00142 r.Set("DDSMaxSyncDelay",15);
00143 r.Set("DDSOffLine",false);
00144 r.Set("DDSMaxRetry",0);
00145 r.Set("DDSRetryDelay",1);
00146 r.Set("DDSClientType","Unknown");
00147 r.Set("DDSClientName","");
00148
00149 #ifdef SITE_HAS_SAM
00150
00151
00152 r.Set("Station","minos");
00153 r.Set("SnapShotVers",0);
00154 r.Set("WorkGroupName","minos");
00155 r.Set("ApplicationName","loon");
00156 r.Set("ApplicationVers","dev");
00157 r.Set("MaxNumberOfFiles",0);
00158 r.Set("StartNewProject",1);
00159
00160
00161
00162 const char* username = gSystem->Getenv("USER");
00163 if (!username) username = "unknown";
00164 r.Set("ProjectName",username);
00165
00166 #endif
00167
00168 r.LockValues();
00169 return r;
00170 }
00171
00172
00173
00174 void IoInputModule::Config(const Registry& r)
00175 {
00176
00177
00178
00179 const char* tmps;
00180 int tmpi;
00181 int tmpb;
00182
00183 MSG("Io",Msg::kDebug) << "Config IoInputModule with r=" << r << "\n";
00184
00185
00186 bool doFormatConfig = false;
00187 if (r.Get("Format", tmps)) { fFormat = tmps; doFormatConfig = true; }
00188 if (doFormatConfig) this->UpdateFormatConfig();
00189 bool doStreamConfig = false;
00190 if (r.Get("Streams",tmps)) { fStreamList = tmps; doStreamConfig = true; }
00191 if (doStreamConfig) this->UpdateStreamConfig();
00192
00193
00194 bool doDDSConfig = false;
00195 if (r.Get("DDSServer", tmps)) { fServer = tmps; doDDSConfig = true; }
00196 if (r.Get("DDSPort", tmpi)) { fPort = tmpi; doDDSConfig = true; }
00197 if (r.Get("DDSTimeOut",tmpi)) { fTimeOut = tmpi; doDDSConfig = true; }
00198 if (r.Get("DDSClientType",tmps)) { fClientType = DDS::GetClientType(tmps);
00199 doDDSConfig = true; }
00200 if (r.Get("DDSClientName",tmps)) { fClientName = tmps;
00201 doDDSConfig = true; }
00202 if (r.Get("DDSDataSource",tmps)){fDataSource = DDS::GetDataSourceCode(tmps);
00203 doDDSConfig = true;}
00204 if (r.Get("DDSKeepUpMode",tmps)) { fKeepUpMode = DDS::GetKeepUpCode(tmps);
00205 doDDSConfig = true; }
00206 if (r.Get("DDSMaxSyncDelay",tmpi)){fMaxSyncDelay = tmpi; doDDSConfig = true;}
00207 if (r.Get("DDSOffLine",tmpb)) {fOffLine = tmpb; doDDSConfig = true;}
00208 if (r.Get("DDSMaxRetry",tmpi)) {fMaxRetry = tmpi; doDDSConfig = true; }
00209 if (r.Get("DDSRetryDelay",tmpi)) {fRetryDelay = tmpi; doDDSConfig = true; }
00210 if (doDDSConfig) this->UpdateDDSConfig();
00211
00212
00213 }
00214
00215
00216
00217 JobCResult IoInputModule::Get()
00218 {
00219
00220
00221
00222
00223
00224 if ( fDataStreamItr==0 ) {
00225 if ( this->OpenStreamItr()==0 ) {
00226 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00227 return fStatus;
00228 }
00229 return this->Get();
00230 }
00231
00232 MSG("Io",Msg::kVerbose) << "IoInputModule::Get " << endl;
00233
00234 fStopwatch.Start(false);
00235 MomNavigator* mom = this->GetMom();
00236 assert(mom);
00237 mom -> Clear();
00238
00239 int nrecord = fDataStreamItr->LoadRecords(mom);
00240 bool isDDS = (UtilString::ToUpper(fFormat) == "DDS");
00241
00242 if ( isDDS && !nrecord ) fStatus.SetEndOfInputStream();
00243
00244 this->ReadHeader();
00245 if ( fStatus.EndOfInputStream() )
00246 { fStatus.SetEndFile(); fStatus.SetEndRun(); }
00247 MSG("Io",Msg::kVerbose)
00248 << "IoInputModule::Get returning status " << fStatus << endl;
00249 fStopwatch.Stop();
00250 return fStatus;
00251 }
00252
00253
00254
00255 JobCResult IoInputModule::Next(int n)
00256 {
00257
00258
00259
00260
00261 CallDepth d;
00262
00263
00264 if (d.fsDepth==1) fStatus = gsAllClear;
00265
00266 MSG("Io",Msg::kVerbose) << "IoInputModule::Next " << n << endl;
00267
00268 if ( fDataStreamItr==0 ) {
00269 if (this->OpenStreamItr()==0) {
00270 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00271 return fStatus;
00272 }
00273 return this->Next(n);
00274 }
00275
00276 fStopwatch.Start(false);
00277 MomNavigator* mom = this->GetMom();
00278 assert(mom);
00279 mom -> Clear();
00280
00281
00282
00283 int nstep = 0;
00284 int ndone = 0;
00285 int ntry = 0;
00286 while ( ndone < n ) {
00287 ntry = n - ndone;
00288 nstep = fDataStreamItr->Increment(ntry);
00289
00290 if ( nstep < ntry ) {
00291
00292 fStatus |= this->NextFile();
00293
00294
00295 if ( fStatus.EndOfInputStream() ) {
00296 fStopwatch.Stop();
00297 return this->Get();
00298 }
00299 }
00300 ndone += nstep;
00301 }
00302
00303
00304 fStopwatch.Stop();
00305 return this->Get();
00306 }
00307
00308
00309
00310 JobCResult IoInputModule::Prev(int n)
00311 {
00312
00313
00314
00315
00316 CallDepth d;
00317
00318
00319 if (d.fsDepth == 1) fStatus = gsAllClear;
00320
00321 MSG("Io",Msg::kVerbose) << "IoInputModule::Prev " << n << endl;
00322
00323 if (fDataStreamItr==0) {
00324 if (this->OpenStreamItr()==0) {
00325 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00326 return fStatus;
00327 }
00328 return this->Prev(n);
00329 }
00330
00331
00332
00333 fStopwatch.Start(false);
00334 MomNavigator* mom = this->GetMom();
00335 assert(mom);
00336 mom -> Clear();
00337
00338 int nstep = 0;
00339 int ndone = 0;
00340 int ntry = 0;
00341 while (ndone < n) {
00342 ntry = n - ndone;
00343 nstep = fDataStreamItr->Decrement(ntry);
00344
00345 if (nstep < ntry) {
00346
00347 fStatus |= this->PrevFile();
00348
00349
00350 if ( fStatus.BeginOfInputStream() || fStatus.EndOfInputStream() ) {
00351 return this->Get();
00352 fStopwatch.Stop();
00353 }
00354
00355
00356
00357 fDataStreamItr->GoToEOF();
00358 }
00359 ndone += nstep;
00360 }
00361
00362
00363 fStopwatch.Stop();
00364 return this->Get();
00365
00366 }
00367
00368
00369
00370 JobCResult IoInputModule::GoTo(int run, int snarl, int searchDir)
00371 {
00372 CallDepth d;
00373
00374 if (d.fsDepth==1) fStatus = gsAllClear;
00375
00376 if (fDataStreamItr==0) {
00377 if (this->OpenStreamItr()==0) {
00378 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00379 return fStatus;
00380 }
00381 return this->GoTo(run,snarl,searchDir);
00382 }
00383
00384 if ( run == fCurrentRun && snarl == fCurrentSnarl ) return this -> Get();
00385
00386 int dir = searchDir;
00387 if (dir==0) {
00388 if (run>fLastRun) { dir = 1; }
00389 else if (run<fLastRun || (run==fLastRun && fCurrentRun < 0)) { dir = -1; }
00390 else {
00391 if (snarl>fLastSnarl) { dir = 1; }
00392 else { dir = -1; }
00393 }
00394 }
00395
00396
00397 while ( 1 ) {
00398 if ( dir > 0 ) {
00399 this->Next();
00400 if (fCurrentRun>run) {
00401 MSG("Io",Msg::kWarning) <<
00402 "Went to run "<<fCurrentRun<<
00403 " without finding run="<<run<<" snarl="<<snarl<<"\n";
00404 return fStatus;
00405 }
00406 if (fCurrentRun==run && fCurrentSnarl>snarl) {
00407 MSG("Io",Msg::kWarning) <<
00408 "Went to run="<<fCurrentRun<<" snarl="<<fCurrentSnarl<<
00409 " without finding run="<<run<<" snarl="<<snarl<<"\n";
00410 return fStatus;
00411 }
00412 }
00413 if ( dir <0 ) {
00414 this->Prev();
00415 if (fCurrentRun<run) {
00416 MSG("Io",Msg::kWarning) <<
00417 "Went to run "<<fCurrentRun<<
00418 " without finding run="<<run<<" snarl="<<snarl<<"\n";
00419 return fStatus;
00420 }
00421 if (fCurrentRun==run && fCurrentSnarl<snarl) {
00422 MSG("Io",Msg::kWarning) <<
00423 "Went to run="<<fCurrentRun<<" snarl="<<fCurrentSnarl<<
00424 " without finding run="<<run<<" snarl="<<snarl<<"\n";
00425 return fStatus;
00426 }
00427 }
00428
00429 if (fCurrentRun == run && fCurrentSnarl == snarl) return fStatus;
00430 if (dir>0 && fStatus.EndOfInputStream()) return fStatus;
00431 if (dir<0 && fStatus.BeginOfInputStream()) return fStatus;
00432 }
00433 return fStatus;
00434 }
00435
00436
00437
00438 JobCResult IoInputModule::GoTo(const VldContext& vld)
00439 {
00440
00441
00442
00443
00444 CallDepth d;
00445
00446
00447 if (d.fsDepth==1) fStatus = gsAllClear;
00448
00449 if (fDataStreamItr==0) {
00450 if (this->OpenStreamItr()==0) {
00451 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00452 return fStatus;
00453 }
00454 return this->GoTo(vld);
00455 }
00456
00457 fStatus |= fDataStreamItr -> GoTo(vld);
00458
00459
00460 return this->Get();
00461
00462 }
00463
00464
00465
00466 void IoInputModule::List(const char* streamlist) const
00467 {
00468
00469
00470
00471
00472 MsgStream& m = MSGSTREAM("Io",Msg::kInfo);
00473
00474 m << "IoInputModule using data format " << fFormat << endl;
00475
00476 if ( fDataStreamItr ) {
00477 fDataStreamItr -> ListFile(std::cout,streamlist);
00478 return;
00479 }
00480
00481
00482 m << "File Name\tStream List " << endl;
00483 m << "=========\t=========== " << endl;
00484 std::list<IoFileListItem>::const_iterator itr = fFileList.begin();
00485 std::list<IoFileListItem>::const_iterator itrEnd = fFileList.end();
00486
00487 for ( ; itr != itrEnd; itr++ ) {
00488 m << *itr;
00489 }
00490
00491 }
00492
00493
00494
00495 void IoInputModule::AddFile(const char *filepath, const char* streamlist,
00496 int at) {
00497
00498
00499
00500
00501
00502
00503
00504
00505
00506 const char *s1 = "SAM:";
00507 if ( strstr(filepath,s1) != NULL ) {
00508
00509 #ifdef SITE_HAS_SAM
00510
00511
00512
00513 const char* tmps;
00514 int tmpi;
00515
00516 Registry& r = GetConfig();
00517 if (r.Get("Station",tmps)) {fStation = tmps;}
00518 if (r.Get("SnapShotVers",tmpi)){fSnapShotVers = tmpi;}
00519 if (r.Get("WorkGroupName",tmps)) {fWorkGroupName = tmps;}
00520 if (r.Get("ApplicationName",tmps)) {fApplicationName = tmps;}
00521 if (r.Get("ApplicationVers",tmps)) {fApplicationVers = tmps;}
00522 if (r.Get("ProjectName",tmps)) {fProjectName = tmps;}
00523 if (r.Get("MaxNumberOfFiles",tmpi)) {fMaxNumberOfFiles = tmpi;}
00524 if (r.Get("StartNewProject",tmpi)) {fStartNewProject = tmpi;}
00525
00526 std::string projectname;
00527
00528
00529 if (fStartNewProject == 1) {
00530
00531
00532 VldTimeStamp ts;
00533 std::string timestamp = ts.AsString("lc");
00534
00535 size_t pos = timestamp.find(" ");
00536 timestamp.replace(pos,1,"-");
00537
00538 pos = timestamp.find(":");
00539 while ( pos != string::npos ) {
00540 timestamp.replace(pos,1,"-");
00541 pos = timestamp.find(":",pos+1);
00542 }
00543
00544 fProjectName.append("-");
00545 projectname = fProjectName+timestamp;
00546 }
00547
00548 else if(fStartNewProject == 0) {
00549 projectname = fProjectName;
00550 }
00551
00552
00553 MSG("Io",Msg::kDebug) << "Sam Station " << fStation << " Snap Shot " <<
00554 fSnapShotVers << " Work Group Name " << fWorkGroupName
00555 << " Application Name "
00556 << fApplicationName << " Application Version " << fApplicationVers <<
00557 " Project Name " << projectname << endl;
00558
00559
00560
00561 std::string temp = filepath;
00562 size_t pos = temp.find(":")+1;
00563 std::string samdataset = temp.substr(pos,temp.length()-pos);
00564
00565
00566
00567 long snapshot;
00568
00569 if ( fSnapShotVers == 0 ) {
00570 snapshot = sam::SamProject::NewSnapshotVersion;
00571 }
00572
00573 else if (fSnapShotVers < 0) {
00574 snapshot = sam::SamProject::LatestSnapshotVersion;
00575 }
00576 else if (fSnapShotVers > 0 ) {
00577
00578 snapshot = fSnapShotVers;
00579 }
00580
00581 MSG("Io",Msg::kDebug) << "SnapShot Version " << snapshot << endl;
00582
00583 if (fStartNewProject == 1) {
00584
00585
00586
00587 fsamProject = new sam::SamProject(projectname,fStation);
00588
00589
00590
00591 std::list<std::string> projectMasterArgList;
00592
00593 try {
00594 MSG("Io",Msg::kInfo) << "Starting SAM Project " << projectname <<
00595 " on station " << fStation << endl;
00596 fsamProject->startProject(fWorkGroupName,samdataset,snapshot,
00597 projectMasterArgList);
00598 }
00599 catch(const sam::SamProject::StartProjectRequestRejected& ex) {
00600 MSG("Io",Msg::kInfo) << "Rejected start SAM project request "
00601 << ex << endl;
00602 }
00603 }
00604
00605
00606
00607 const int projectMasterTimeout(60);
00608 const std::string processDescription("Loon Analysis Process");
00609
00610 try{
00611
00612 MSG("Io",Msg::kInfo) << "Starting SAM Consumer" << endl;
00613
00614 sam::SamConsumer fsamConsumer(projectname,fStation,fWorkGroupName,
00615 fApplicationName,fApplicationVers,
00616 processDescription,
00617 fMaxNumberOfFiles,
00618 projectMasterTimeout);
00619
00620
00621
00622
00623 std::map<std::string,std::string> filelist;
00624 map<std::string,std::string>::iterator fitr;
00625
00626 int location;
00627 int length;
00628 int comp;
00629 std::string fileonly;
00630 std::string restOfPath;
00631 std::string afsroot("afsroot:");
00632 try {
00633 while(true) {
00634 std::string filename = fsamConsumer.getFile().getFullFileName();
00635
00636
00637
00638
00639
00640 MSG("Io",Msg::kDebug) << "Filename " << filename << endl;
00641 location = filename.find_last_of("/");
00642 length = filename.length();
00643 fileonly = filename.substr(location+1,length-1);
00644
00645
00646 comp = filename.compare(0,8,afsroot);
00647 if (comp == 0 ) {
00648 restOfPath = filename.substr(8,location-8);
00649 }
00650 else {
00651 restOfPath = filename.substr(0,location);
00652 }
00653
00654 restOfPath.append("/");
00655 filelist.insert(make_pair(fileonly,restOfPath));
00656
00657 MSG("Io",Msg::kDebug) << "File Only " << fileonly << " Rest Of Path "
00658 << restOfPath << endl;
00659
00660
00661
00662 fsamConsumer.releaseFile();
00663 }
00664 }
00665 catch(const sam::SamConsumer::EndOfFileStreamReached& ex) {
00666 MSG("Io",Msg::kDebug) << "End of File Stream reached" << endl;
00667 }
00668
00669
00670
00671 std::string sfile;
00672 const char *samfile = 0;
00673 for (fitr = filelist.begin(); fitr != filelist.end(); fitr++) {
00674 sfile = (fitr->second+fitr->first);
00675 samfile = sfile.data();
00676 MSG("Io",Msg::kInfo) << "Adding File " << samfile << endl;
00677
00678 IoFileListItem iofile(samfile,at,streamlist);
00679 fFileList.push_back(iofile);
00680 }
00681 }
00682 catch(const sam::SamConsumer::InitializationError& ex) {
00683 MSG("Io",Msg::kInfo) << "Rejected start SAM Consumer request "
00684 << ex << endl;
00685 }
00686
00687
00688 if (fsamProject) {
00689 try {
00690 MSG("Io",Msg::kInfo) << "Requesting end of SAM project " << endl;
00691 fsamProject->endProject();
00692 }
00693 catch(const sam::SamProject::EndProjectRequestRejected& ex) {
00694 MSG("Io",Msg::kInfo) << "SAM Project end request rejected "<< ex << endl;
00695
00696 }
00697 catch(const sam::SamProject::EndProjectRequestFailed& ex) {
00698 MSG("Io",Msg::kInfo) << "SAM Project end request failed "<< ex << endl;
00699 }
00700 }
00701
00702
00703 #endif // End of ifdef SITE_HAS_SAM
00704
00705 }
00706 else {
00707
00708
00709 IoFileListItem iofile(filepath,at,streamlist);
00710 fFileList.push_back(iofile);
00711
00712 if ( !fDataStreamItr ) return;
00713
00714
00715 const IoFileListItem::FileStreamMap& filemap = iofile.GetFileStreamMap();
00716
00717 if ( at < 0 ) {
00718 IoFileListItem::FileStreamMapConstItr itr = filemap.begin();
00719 for ( ; itr != filemap.end(); itr++ ) {
00720 std::string filename = itr -> first;
00721 std::string streamlist = itr -> second;
00722 fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
00723 }
00724 }
00725 else {
00726
00727 IoFileListItem::FileStreamMap::const_reverse_iterator itr=filemap.rbegin();
00728 for ( ; itr != filemap.rend(); itr++ ) {
00729 std::string filename = itr -> first;
00730 std::string streamlist = itr -> second;
00731 fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
00732 }
00733 }
00734 }
00735 }
00736
00737
00738
00739 void IoInputModule::RemoveFile(const char* filename, const char* streamlist) {
00740
00741
00742
00743
00744 if ( fDataStreamItr ) fDataStreamItr -> RemoveFile(filename,streamlist);
00745
00746 std::string f(filename);
00747 std::list<IoFileListItem>::iterator itr = fFileList.end();
00748 while ( !fFileList.empty() && itr != fFileList.begin() ) {
00749 itr--;
00750 IoFileListItem& iofile = *itr;
00751 iofile.RemoveFile(filename,streamlist);
00752 if ( iofile.GetNumFile() == 0 ) fFileList.erase(itr);
00753 }
00754
00755 return;
00756
00757 }
00758
00759
00760
00761 JobCResult IoInputModule::NextFile(int n, const char* streamlist)
00762 {
00763
00764
00765
00766 CallDepth d;
00767
00768 if (d.fsDepth==1) fStatus = gsAllClear;
00769
00770 if (fDataStreamItr==0) {
00771 if (this->OpenStreamItr()==0) {
00772 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00773 return fStatus;
00774 }
00775 return this->NextFile(n,streamlist);
00776 }
00777
00778 fStatus |= fDataStreamItr -> NextFile(n,streamlist);
00779
00780 MSG("Io",Msg::kDebug)
00781 << "status is " << fStatus
00782 << " current file is " << fDataStreamItr->GetCurrentFile() << endl;
00783
00784 return fStatus;
00785
00786 }
00787
00788
00789
00790 JobCResult IoInputModule::PrevFile(int n, const char* streamlist)
00791 {
00792
00793
00794
00795 CallDepth d;
00796
00797 if (d.fsDepth==1) fStatus = gsAllClear;
00798
00799 if (fDataStreamItr==0) {
00800 if (this->OpenStreamItr()==0) {
00801 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00802 return fStatus;
00803 }
00804 return this->PrevFile(n,streamlist);
00805 }
00806
00807 fStatus |= fDataStreamItr -> PrevFile(n,streamlist);
00808
00809 return fStatus;
00810
00811 }
00812
00813
00814
00815 JobCResult IoInputModule::GoToFile(int n, const char* streamlist)
00816 {
00817
00818
00819
00820 CallDepth d;
00821
00822 if (d.fsDepth==1) fStatus = gsAllClear;
00823
00824 if (fDataStreamItr==0) {
00825 if (this->OpenStreamItr()==0) {
00826 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00827 return fStatus;
00828 }
00829 return this->GoToFile(n,streamlist);
00830 }
00831
00832 fStatus |= fDataStreamItr -> GoToFile(n,streamlist);
00833
00834 return fStatus;
00835
00836 }
00837
00838
00839
00840 JobCResult IoInputModule::GoToFile(const char* filename, const char*streamlist){
00841
00842
00843
00844 CallDepth d;
00845
00846 if (d.fsDepth==1) fStatus = gsAllClear;
00847
00848 if (fDataStreamItr==0) {
00849 if (this->OpenStreamItr()==0) {
00850 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00851 return fStatus;
00852 }
00853 return this->GoToFile(filename,streamlist);
00854 }
00855
00856 fStatus |= fDataStreamItr -> GoToFile(filename,streamlist);
00857
00858 return fStatus;
00859 }
00860
00861
00862
00863 void IoInputModule::Select(const char* stream, const char* select,
00864 bool isRequired)
00865 {
00866
00867
00868
00869
00870 fStreamSelectionMap[stream] = select;
00871 fStreamRequiredMap[stream] = isRequired;
00872
00873
00874 if (fDataStreamItr) {
00875 fDataStreamItr->Select(stream, select,isRequired);
00876 }
00877
00878 }
00879
00880
00881
00882 void IoInputModule::DefineStream(const char* stream, const char* tree) {
00883
00884
00885
00886
00887 fStreamDefMap[stream] = tree;
00888
00889
00890 if ( fDataStreamItr ) {
00891 fDataStreamItr->DefineStream(stream, tree);
00892 }
00893
00894 }
00895
00896
00897
00898 void IoInputModule::SetSequenceMode(const char* stream,
00899 Per::ESequenceMode sequenceMode) {
00900
00901
00902
00903
00904 fStreamSeqModeMap[stream] = sequenceMode;
00905
00906
00907 if ( fDataStreamItr ) {
00908 fDataStreamItr->SetSequenceMode(stream, sequenceMode);
00909 }
00910
00911 }
00912
00913
00914
00915 void IoInputModule::SetTestMode(const char* stream,
00916 bool testMode) {
00917
00918
00919
00920
00921 fStreamTestModeMap[stream] = testMode;
00922
00923
00924 if ( fDataStreamItr ) {
00925 fDataStreamItr->SetTestMode(stream, testMode);
00926 }
00927
00928 }
00929
00930
00931
00932 void IoInputModule::SetWindow(const char* stream, double lower, double upper)
00933 {
00934
00935
00936
00937 fStreamWindowMap[stream] = std::pair<double,double>(lower,upper);
00938
00939
00940 if ( fDataStreamItr ) {
00941 fDataStreamItr->SetWindow(stream, lower, upper);
00942 }
00943
00944 }
00945
00946
00947
00948 void IoInputModule::SetMaxFileRepeat(const char* stream, int numRepeat)
00949 {
00950
00951
00952
00953
00954 fStreamMaxRepeatMap[stream] = numRepeat;
00955
00956
00957 if ( fDataStreamItr ) fDataStreamItr->SetMaxFileRepeat(stream,numRepeat);
00958 }
00959
00960
00961
00962 void IoInputModule::SetMeanMom(const char* stream, double mean)
00963 {
00964
00965
00966
00967
00968 fStreamMeanMap[stream] = mean;
00969
00970
00971 if ( fDataStreamItr ) fDataStreamItr->SetMeanMom(stream,mean);
00972 }
00973
00974
00975
00976 void IoInputModule::SetPushRandom(const char* stream, bool setRandom)
00977 {
00978
00979
00980
00981
00982 fStreamPushRandomMap[stream] = setRandom;
00983
00984
00985 if ( fDataStreamItr ) fDataStreamItr->SetPushRandom(stream,setRandom);
00986 }
00987
00988
00989
00990 void IoInputModule::SetRandomSeed(int rSeed)
00991 {
00992
00993
00994
00995
00996 fRandomSeed = rSeed;
00997
00998
00999 if ( fDataStreamItr ) fDataStreamItr->SetRandomSeed(rSeed);
01000
01001 }
01002
01003
01004
01005 const char* IoInputModule::GetCurrentFile(const char* streamname) const
01006 {
01007 MSG("Io",Msg::kDebug) << "IoInputModule::GetCurrentFile()" << endl;
01008 mapStrStrItr_t it, done=fCurrentFileMap.end();
01009 std::string strmstring = streamname;
01010
01011 for (it = fCurrentFileMap.begin(); it !=done; ++it) {
01012 MSG("Io",Msg::kVerbose)
01013 << "stream: " << setw(16) << it->first
01014 << " file: " << it->second
01015 <<endl;
01016 }
01017
01018 for (it = fCurrentFileMap.begin(); it!=done; ++it) {
01019 if ( strmstring == it->first ) return it->second.c_str();
01020 }
01021
01022 if (!fDataStreamItr) return 0;
01023 return fDataStreamItr->GetCurrentFile(streamname);
01024 }
01025
01026 const char* IoInputModule::GetLastFile(const char* streamname) const
01027 {
01028 MSG("Io",Msg::kInfo) << "IoInputModule::GetLastFile()" << endl;
01029 mapStrStrItr_t it, done = fLastFileMap.end();
01030 std::string strmstring = streamname;
01031
01032 for (it = fLastFileMap.begin(); it!=done; ++it) {
01033 MSG("Io",Msg::kVerbose)
01034 << "stream: " << setw(16) << it->first
01035 << " file: " << it->second
01036 <<endl;
01037 }
01038
01039 for (it = fLastFileMap.begin(); it!=done; ++it) {
01040 if ( strmstring == it->first ) return it->second.c_str();
01041 }
01042 return 0;
01043
01044 }
01045
01046
01047
01048 void IoInputModule::LoadFilesFromCommandLine()
01049 {
01050
01051
01052
01053 JobCEnv& jce = JobCEnv::Instance();
01054 if (!fLoadedCommandLineFiles) {
01055 for (int i=0; i<jce.GetNfile(); ++i) {
01056 this->AddFile(jce.GetFileName(i));
01057 }
01058 fLoadedCommandLineFiles = true;
01059 }
01060 }
01061
01062
01063
01064 int IoInputModule::ReadHeader()
01065 {
01066
01067
01068
01069
01070
01071
01072
01073 const MomNavigator* mom = this->GetMom();
01074 assert(mom);
01075
01076
01077
01078
01079
01080 const TObjArray* momarray = mom->GetFragmentArray();
01081 for (int i=0; i < momarray->GetEntriesFast(); ++i) {
01082 TObject* obj = momarray->At(i);
01083 if (!obj) continue;
01084 Registry* temptags = 0;
01085 if ( RecMinos* record = dynamic_cast<RecMinos*>(obj) ) {
01086 temptags = &(record->GetTempTags());
01087 }
01088 else if ( RecRecord* record = dynamic_cast<RecRecord*>(obj) ) {
01089 temptags = &(record->GetTempTags());
01090 }
01091 if ( ! temptags ) continue;
01092
01093
01094 const char* tagstream = 0;
01095 if ( ! temptags->Get("stream",tagstream) ) continue;
01096
01097
01098 std::string streamname(tagstream);
01099 const char* tagnewfile = 0;
01100 if ( ! temptags->Get("file",tagnewfile) ) continue;
01101
01102 std::string lstfilename = fLastFileMap[streamname];
01103 std::string curfilename = fCurrentFileMap[streamname];
01104 std::string newfilename(tagnewfile);
01105
01106 if ( newfilename != curfilename ) {
01107
01108 std::string starcur = fCurrentFileMap.begin()->first;
01109 std::string starlast = fLastFileMap.begin()->first;
01110
01111 MSG("Io",Msg::kDebug)
01112 << "SetBeginFile on streamname '" << streamname << "'" << endl
01113 << " current '" << fCurrentFileMap[streamname] << "'" << endl
01114 << " last '" << fLastFileMap[streamname] << "'" << endl
01115 << " current['" << starcur << "'] '"
01116 << fCurrentFileMap[starcur] << "'" << endl
01117 << " last['" << starlast << "'] '"
01118 << fLastFileMap[starlast] << "'" << endl
01119 << " new '" << newfilename << "' != "
01120 << " cur '" << curfilename << "'" << endl
01121 << " update \"*\" ? "
01122 << (( newfilename != fCurrentFileMap["*"] ) ? "yes":"no")
01123 << endl;
01124
01125
01126 if ( newfilename != fCurrentFileMap["*"] ) {
01127 fStatus.SetBeginFile();
01128
01129 if ( fCurrentFileMap["*"] != "" ) fStatus.SetEndFile();
01130
01131 fLastFileMap["*"] = fCurrentFileMap["*"];
01132 fCurrentFileMap["*"] = newfilename;
01133
01134 MSG("Io",Msg::kDebug)
01135 << "SetBeginFile on '*'" << endl
01136 << " current['" << starcur << "'] '"
01137 << fCurrentFileMap[starcur] << "'" << endl
01138 << " last['" << starlast << "'] '"
01139 << fLastFileMap[starlast] << "'" << endl;
01140 }
01141
01142 fLastFileMap[streamname] = curfilename;
01143 fCurrentFileMap[streamname] = newfilename;
01144
01145
01146
01147
01148
01149
01150 mapStrStrItr_t it, done = fCurrentFileMap.end();
01151 for (it = fCurrentFileMap.begin(); it != done; ++it) {
01152 if ( it->second == curfilename ) {
01153 std::string altstream = it->first;
01154 fLastFileMap[altstream] = curfilename;
01155 fCurrentFileMap[altstream] = newfilename;
01156 }
01157 }
01158
01159 }
01160
01161 MSG("Io",Msg::kVerbose)
01162 << " stream '" << streamname << "' set fLastFileMap to '"
01163 << curfilename << "', fCurrentFileMap to '"
01164 << newfilename << "'"
01165 << " * '" << fLastFileMap["*"] << "' '" << fCurrentFileMap["*"] << "'"
01166 << endl;
01167
01168 }
01169
01170
01171
01172
01173
01174
01175
01176 if ( ! fStatus.BeginFile() ) fLastFileMap["*"] = fCurrentFileMap["*"];
01177
01178
01179 int run = -1;
01180 int snarl = -1;
01181 for (int i=0; i < momarray->GetEntriesFast(); ++i) {
01182 TObject* obj = momarray->At(i);
01183 if (!obj) continue;
01184 if ( RecMinos* record = dynamic_cast<RecMinos*>(obj) ) {
01185
01186
01187
01188 const RawDaqHeader* rdh
01189 = dynamic_cast<const RawDaqHeader*>(record->GetHeader());
01190 if (rdh) run = rdh->GetRun();
01191
01192
01193 const RawDaqSnarlHeader* rdsh
01194 = dynamic_cast<const RawDaqSnarlHeader*>(record->GetHeader());
01195 if (rdsh) snarl = rdsh->GetSnarl();
01196
01197 if (!rdh) {
01198
01199 const CandHeader* candhdr
01200 = dynamic_cast<const CandHeader*>(record->GetHeader());
01201 if (candhdr) {
01202 run = candhdr->GetRun();
01203 snarl = candhdr->GetSnarl();
01204 }
01205 }
01206 }
01207 else if ( RecRecord* record = dynamic_cast<RecRecord*>(obj) ) {
01208
01209 const RecPhysicsHeader* rph
01210 = dynamic_cast<const RecPhysicsHeader*>(&(record->GetHeader()));
01211 if ( rph ) {
01212 run = rph->GetRun();
01213 snarl = rph->GetSnarl();
01214 }
01215 }
01216
01217
01218 if ( snarl >= 0 ) break;
01219 }
01220
01221
01222 fCurrentSnarl = snarl;
01223 if ( run < 0 ) {
01224 fCurrentRun = -1;
01225 return 0;
01226 }
01227 if ( run != fCurrentRun ) {
01228 fStatus.SetBeginRun();
01229 if ( fLastRun >= 0 ) fStatus.SetEndRun();
01230 }
01231 fLastRun = fCurrentRun;
01232 fCurrentRun = run;
01233 if ( snarl >= 0 ) {
01234 fLastSnarl = fCurrentSnarl;
01235 fCurrentSnarl = snarl;
01236 return 2;
01237 }
01238 return 1;
01239 }
01240
01241
01242
01243 void IoInputModule::UpdateDDSConfig() {
01244
01245
01246
01247 if ( fDataStreamItr == 0 ) return;
01248
01249 IoDDSStreamItr* ddsItr = dynamic_cast<IoDDSStreamItr*>(fDataStreamItr);
01250 if ( ! ddsItr ) return;
01251
01252 ddsItr->SetTimeOut(fTimeOut);
01253
01254
01255
01256 bool reinit = (fServer != ddsItr->GetSourceName()
01257 || fPort != ddsItr->GetPort()
01258 || fClientType != ddsItr->GetClientType()
01259 || fClientName != ddsItr->GetClientName() );
01260
01261 if ( reinit ) {
01262 this -> CloseStreamItr();
01263 }
01264 else {
01265 ddsItr -> SetKeepUpMode(fKeepUpMode);
01266 ddsItr -> SetMaxSyncDelay(fMaxSyncDelay);
01267 ddsItr -> SetDataSource(fDataSource);
01268 ddsItr -> SetOffLine(fOffLine);
01269 }
01270
01271 }
01272
01273
01274
01275
01276 void IoInputModule::UpdateFormatConfig() {
01277
01278
01279
01280
01281 if ( fDataStreamItr == 0 ) return;
01282
01283 bool reopen = ( fFormat != fDataStreamItr->GetFormat() );
01284 if ( reopen ) {
01285 this -> CloseStreamItr();
01286 }
01287
01288 return;
01289
01290 }
01291
01292
01293
01294 void IoInputModule::UpdateFileList()
01295 {
01296
01297
01298
01299
01300 if ( fDataStreamItr == 0 ) return;
01301
01302
01303
01304 fDataStreamItr -> RemoveFile("*");
01305
01306 std::list<IoFileListItem>::iterator itr = fFileList.begin();
01307 for ( ; itr != fFileList.end(); itr++ ) {
01308 IoFileListItem& iofile = *itr;
01309 const IoFileListItem::FileStreamMap& filemap = iofile.GetFileStreamMap();
01310 int at = iofile.GetAt();
01311
01312 if ( at < 0 ) {
01313 IoFileListItem::FileStreamMapConstItr itr = filemap.begin();
01314 for ( ; itr != filemap.end(); itr++ ) {
01315 std::string filename = itr -> first;
01316 std::string streamlist = itr -> second;
01317 fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
01318 }
01319 }
01320 else {
01321
01322 IoFileListItem::FileStreamMap::const_reverse_iterator itr
01323 = filemap.rbegin();
01324 for ( ; itr != filemap.rend(); itr++ ) {
01325 std::string filename = itr -> first;
01326 std::string streamlist = itr -> second;
01327 fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
01328 }
01329 }
01330 }
01331
01332 return;
01333
01334 }
01335
01336
01337
01338 void IoInputModule::UpdateStreamConfig()
01339 {
01340
01341
01342
01343 if (fDataStreamItr==0) return;
01344
01345
01346 mapStrStrItr_t itr;
01347 for ( itr = fStreamDefMap.begin(); itr != fStreamDefMap.end(); itr++ ) {
01348 fDataStreamItr->DefineStream((itr->first).c_str(),
01349 (itr->second).c_str());
01350 }
01351
01352
01353 fDataStreamItr->Streams(fStreamList.c_str());
01354
01355 for (itr=fStreamSelectionMap.begin();itr!=fStreamSelectionMap.end();++itr) {
01356 std::map<std::string,bool>::const_iterator reqitr
01357 = fStreamRequiredMap.find(itr->first);
01358 bool isrequired = false;
01359 if ( reqitr != fStreamRequiredMap.end() ) isrequired = reqitr -> second;
01360 fDataStreamItr->Select((itr->first).c_str(),
01361 (itr->second).c_str(),
01362 isrequired);
01363 }
01364
01365
01366 std::map<std::string,bool>::const_iterator testitr;
01367 for ( testitr=fStreamTestModeMap.begin(); testitr !=fStreamTestModeMap.end();
01368 ++testitr ) {
01369 fDataStreamItr->SetTestMode((testitr->first).c_str(),
01370 testitr->second);
01371 }
01372
01373
01374 bool setRandom = false;
01375 std::map<std::string,Per::ESequenceMode>::const_iterator seqitr;
01376 for ( seqitr=fStreamSeqModeMap.begin(); seqitr!=fStreamSeqModeMap.end();
01377 ++seqitr ) {
01378 fDataStreamItr->SetSequenceMode((seqitr->first).c_str(),
01379 seqitr->second);
01380 pair<double,double> window = fStreamWindowMap[seqitr->first];
01381 fDataStreamItr->SetWindow((seqitr->first).c_str(),
01382 window.first,window.second);
01383 if (seqitr->second == Per::kSequential ||
01384 seqitr->second == Per::kRandom ) {
01385 if (!setRandom) {
01386 setRandom = true;
01387 fDataStreamItr->SetRandomSeed(fRandomSeed);
01388 }
01389 int repeat = fStreamMaxRepeatMap[seqitr->first];
01390 fDataStreamItr->SetMaxFileRepeat( (seqitr->first).c_str(), repeat );
01391 double mean = fStreamMeanMap[seqitr->first];
01392 fDataStreamItr->SetMeanMom( (seqitr->first).c_str(), mean );
01393 bool pushRand = fStreamPushRandomMap[seqitr->first];
01394 fDataStreamItr->SetPushRandom( (seqitr->first).c_str(), pushRand );
01395 }
01396 }
01397 }
01398
01399
01400
01401 int IoInputModule::OpenStreamItr()
01402 {
01403
01404
01405
01406 if (fDataStreamItr) this->CloseStreamItr();
01407
01408 bool isDDS = (UtilString::ToUpper(fFormat) == "DDS");
01409 std::string src;
01410 if ( isDDS ) src = fServer;
01411
01412 fDataStreamItr = IoDataStreamFactory::CreateDataStreamItr(src.c_str(),
01413 fFormat.c_str(),fPort,fMaxRetry,fRetryDelay,
01414 fClientType,fClientName);
01415
01416
01417 if (fDataStreamItr == 0) {
01418 MSG("Io",Msg::kWarning) << "Failed to open stream '" << src << "'" <<
01419 " using format '" << fFormat << "'" << endl;
01420 fStatus.SetEndRun();
01421 fStatus.SetEndFile();
01422 fStatus.SetEndOfInputStream();
01423 return 0;
01424 }
01425
01426 fStatus.SetBeginOfInputStream();
01427 fStatus.SetBeginFile();
01428 fStatus.SetBeginRun();
01429
01430
01431 this->UpdateStreamConfig();
01432 this->UpdateFileList();
01433 this->UpdateDDSConfig();
01434 fFormat = fDataStreamItr->GetFormat();
01435
01436 MSG("Io",Msg::kDebug)
01437 << "Opened stream itr of format " << fDataStreamItr->GetFormat() << endl;
01438
01439 return 1;
01440 }
01441
01442
01443
01444 void IoInputModule::CloseStreamItr()
01445 {
01446
01447
01448
01449 if (fDataStreamItr) {
01450 MSG("Io",Msg::kDebug)
01451 << "Close stream itr of format " << fDataStreamItr->GetFormat() << endl;
01452 delete fDataStreamItr;
01453 fDataStreamItr = 0;
01454 fStatus.SetEndRun();
01455 fStatus.SetEndFile();
01456 fStatus.SetEndOfInputStream();
01457 }
01458 }
01459
01461
01462
01463
01464
01465
01466
01467
01468
01469
01470
01471
01472
01473
01474