00001
00002
00003
00004
00005
00006
00008 #include "TSystem.h"
00009 #include "TRegexp.h"
00010
00011 #include "Dispatcher/DDS.h"
00012 #include "IoModules/IoInputModule.h"
00013 #include <cassert>
00014 #include "MessageService/MsgService.h"
00015 #include "MinosObjectMap/MomNavigator.h"
00016 #include "IoModules/IoDataStreamItr.h"
00017 #include "IoModules/IoDDSStreamItr.h"
00018 #include "IoModules/IoDataStreamFactory.h"
00019 #include "JobControl/JobCInputModule.h"
00020 #include "JobControl/JobCModuleRegistry.h"
00021 #include "JobControl/JobCEnv.h"
00022 #include "RawData/RawRecord.h"
00023 #include "RawData/RawDaqSnarlHeader.h"
00024 #include "CandData/CandHeader.h"
00025 #include "Record/RecRecord.h"
00026 #include "Record/RecPhysicsHeader.h"
00027 #include "Registry/Registry.h"
00028 #include "Validity/VldContext.h"
00029 #include "Validity/VldTimeStamp.h"
00030 #include "Util/UtilString.h"
00031 #include "TSystem.h"
00032 #include "TRandom3.h"
00033
00034 #include <algorithm>
00035 #include <cstring>
00036 #include <string>
00037 #include <map>
00038
00039 #ifdef SITE_HAS_SAM
00040 #include "sam_cpp_api/SamConsumer.hpp"
00041 #include "sam_cpp_api/SamLocate.hpp"
00042 #endif
00043
00044 CVSID("$Id: IoInputModule.cxx,v 1.85 2008/02/21 15:37:15 kordosky Exp $");
00045 JOBMODULE(IoInputModule,"INPUT","Read and configure input streams");
00046
00047 typedef std::map<std::string,std::string>::const_iterator mapStrStrItr_t;
00048
00049
00050 static const JobCResult gsAllClear = JobCResult::kAOK;
00051
00052
00053
00054
00055
// Scope guard that counts how many CallDepth objects are currently alive.
// The navigation methods create one on entry and look at fsDepth to find
// out whether they are the outermost call (depth 1) or a nested one.
class CallDepth {
public:
  // Entering a guarded scope: one more live guard.
  CallDepth()
  {
    fsDepth += 1;
  }

  // Leaving a guarded scope: one fewer live guard.
  ~CallDepth()
  {
    fsDepth -= 1;
  }

  // Number of CallDepth objects currently alive.
  static int fsDepth;
};

int CallDepth::fsDepth = 0;
00063
00064
00065
00066 IoInputModule::IoInputModule() :
00067 fDataStreamItr(0),
00068 fFormat(""),
00069 fStreamList(""),
00070 fServer(""),
00071 fPort(0),
00072 fTimeOut(0),
00073 fDataSource(0),
00074 fKeepUpMode(0),
00075 fMaxSyncDelay(0),
00076 fOffLine(false),
00077 fMaxRetry(0),
00078 fRetryDelay(1),
00079 fClientType(DDS::kUnknownClientType),
00080 fClientName(""),
00081 fStatus(JobCResult::kAOK),
00082 fLastRun(-1),
00083 fLastSnarl(-1),
00084 fCurrentRun(-1),
00085 fCurrentSnarl(-1),
00086 fLoadedCommandLineFiles(false)
00087 #ifdef SITE_HAS_SAM
00088 ,fsamProject(0)
00089 #endif
00090 { fStopwatch.Reset(); fStopwatch.Stop(); }
00091
00092
00093
00094 IoInputModule::~IoInputModule()
00095 {
00096 if ( fDataStreamItr ) { delete fDataStreamItr; fDataStreamItr = 0; }
00097 }
00098
00099
00100
00101 void IoInputModule::BeginJob()
00102 {
00103 this->LoadFilesFromCommandLine();
00104
00105 }
00106
00107
00108
00109
00110 void IoInputModule::EndJob()
00111 {
00112 fStopwatch.Stop();
00113 MSG("Io",Msg::kDebug) << "IoInputModule::EndJob, Time(sec), Real "
00114 << fStopwatch.RealTime() << ", CPU "
00115 << fStopwatch.CpuTime() << endl;
00116
00117 }
00118
00119
00120
00121 const Registry& IoInputModule::DefaultConfig() const
00122 {
00123
00124
00125
00126 static Registry r;
00127 r.SetName("INPUT.config");
00128
00129 r.UnLockValues();
00130
00131 MSG("Io",Msg::kDebug) << "Loading default config\n";
00132
00133
00134 r.Set("Format" ,"input");
00135 r.Set("Streams","DaqMonitor,DaqSnarl,LightInjection");
00136
00137
00138
00139 r.Set("DDSServer", "daqdds.minos-soudan.org");
00140 r.Set("DDSPort", DDS::kPort);
00141 r.Set("DDSTimeOut", 120);
00142 r.Set("DDSDataSource","Daq");
00143 r.Set("DDSKeepUpMode", "FileKeepUp");
00144 r.Set("DDSMaxSyncDelay",15);
00145 r.Set("DDSOffLine",false);
00146 r.Set("DDSMaxRetry",0);
00147 r.Set("DDSRetryDelay",1);
00148 r.Set("DDSClientType","Unknown");
00149 r.Set("DDSClientName","");
00150
00151 #ifdef SITE_HAS_SAM
00152
00153
00154 r.Set("Station","minos");
00155 r.Set("SnapShotVers",0);
00156 r.Set("WorkGroupName","minos");
00157 r.Set("ApplicationName","loon");
00158 r.Set("ApplicationVers","dev");
00159 r.Set("MaxNumberOfFiles",0);
00160 r.Set("StartNewProject",1);
00161
00162
00163
00164 const char* username = gSystem->Getenv("USER");
00165 if (!username) username = "unknown";
00166 r.Set("ProjectName",username);
00167
00168 #endif
00169
00170 r.LockValues();
00171 return r;
00172 }
00173
00174
00175
00176 void IoInputModule::Config(const Registry& r)
00177 {
00178
00179
00180
00181 const char* tmps;
00182 int tmpi;
00183 int tmpb;
00184
00185 MSG("Io",Msg::kDebug) << "Config IoInputModule with r=" << r << "\n";
00186
00187
00188 bool doFormatConfig = false;
00189 if (r.Get("Format", tmps)) { fFormat = tmps; doFormatConfig = true; }
00190 if (doFormatConfig) this->UpdateFormatConfig();
00191 bool doStreamConfig = false;
00192 if (r.Get("Streams",tmps)) { fStreamList = tmps; doStreamConfig = true; }
00193 if (doStreamConfig) this->UpdateStreamConfig();
00194
00195
00196 bool doDDSConfig = false;
00197 if (r.Get("DDSServer", tmps)) { fServer = tmps; doDDSConfig = true; }
00198 if (r.Get("DDSPort", tmpi)) { fPort = tmpi; doDDSConfig = true; }
00199 if (r.Get("DDSTimeOut",tmpi)) { fTimeOut = tmpi; doDDSConfig = true; }
00200 if (r.Get("DDSClientType",tmps)) { fClientType = DDS::GetClientType(tmps);
00201 doDDSConfig = true; }
00202 if (r.Get("DDSClientName",tmps)) { fClientName = tmps;
00203 doDDSConfig = true; }
00204 if (r.Get("DDSDataSource",tmps)){fDataSource = DDS::GetDataSourceCode(tmps);
00205 doDDSConfig = true;}
00206 if (r.Get("DDSKeepUpMode",tmps)) { fKeepUpMode = DDS::GetKeepUpCode(tmps);
00207 doDDSConfig = true; }
00208 if (r.Get("DDSMaxSyncDelay",tmpi)){fMaxSyncDelay = tmpi; doDDSConfig = true;}
00209 if (r.Get("DDSOffLine",tmpb)) {fOffLine = tmpb; doDDSConfig = true;}
00210 if (r.Get("DDSMaxRetry",tmpi)) {fMaxRetry = tmpi; doDDSConfig = true; }
00211 if (r.Get("DDSRetryDelay",tmpi)) {fRetryDelay = tmpi; doDDSConfig = true; }
00212 if (doDDSConfig) this->UpdateDDSConfig();
00213
00214
00215 }
00216
00217
00218
00219 JobCResult IoInputModule::Get()
00220 {
00221
00222
00223
00224
00225
00226 if ( fDataStreamItr==0 ) {
00227 if ( this->OpenStreamItr()==0 ) {
00228 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00229 return fStatus;
00230 }
00231 return this->Get();
00232 }
00233
00234 MSG("Io",Msg::kVerbose) << "IoInputModule::Get " << endl;
00235
00236 fStopwatch.Start(false);
00237 MomNavigator* mom = this->GetMom();
00238 assert(mom);
00239 mom -> Clear();
00240
00241 int nrecord = fDataStreamItr->LoadRecords(mom);
00242 bool isDDS = (UtilString::ToUpper(fFormat) == "DDS");
00243
00244 if ( isDDS && !nrecord ) fStatus.SetEndOfInputStream();
00245
00246 this->ReadHeader();
00247 if ( fStatus.EndOfInputStream() )
00248 { fStatus.SetEndFile(); fStatus.SetEndRun(); }
00249 MSG("Io",Msg::kVerbose)
00250 << "IoInputModule::Get returning status " << fStatus << endl;
00251 fStopwatch.Stop();
00252 return fStatus;
00253 }
00254
00255
00256
00257 JobCResult IoInputModule::Next(int n)
00258 {
00259
00260
00261
00262
00263 CallDepth d;
00264
00265
00266 if (d.fsDepth==1) fStatus = gsAllClear;
00267
00268 MSG("Io",Msg::kVerbose) << "IoInputModule::Next " << n << endl;
00269
00270 if ( fDataStreamItr==0 ) {
00271 if (this->OpenStreamItr()==0) {
00272 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00273 return fStatus;
00274 }
00275 return this->Next(n);
00276 }
00277
00278 fStopwatch.Start(false);
00279 MomNavigator* mom = this->GetMom();
00280 assert(mom);
00281 mom -> Clear();
00282
00283
00284
00285 int nstep = 0;
00286 int ndone = 0;
00287 int ntry = 0;
00288 while ( ndone < n ) {
00289 ntry = n - ndone;
00290 nstep = fDataStreamItr->Increment(ntry);
00291
00292 if ( nstep < ntry ) {
00293
00294 fStatus |= this->NextFile();
00295
00296
00297 if ( fStatus.EndOfInputStream() ) {
00298 fStopwatch.Stop();
00299 return this->Get();
00300 }
00301 }
00302 ndone += nstep;
00303 }
00304
00305
00306 fStopwatch.Stop();
00307 return this->Get();
00308 }
00309
00310
00311
00312 JobCResult IoInputModule::Prev(int n)
00313 {
00314
00315
00316
00317
00318 CallDepth d;
00319
00320
00321 if (d.fsDepth == 1) fStatus = gsAllClear;
00322
00323 MSG("Io",Msg::kVerbose) << "IoInputModule::Prev " << n << endl;
00324
00325 if (fDataStreamItr==0) {
00326 if (this->OpenStreamItr()==0) {
00327 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00328 return fStatus;
00329 }
00330 return this->Prev(n);
00331 }
00332
00333
00334
00335 fStopwatch.Start(false);
00336 MomNavigator* mom = this->GetMom();
00337 assert(mom);
00338 mom -> Clear();
00339
00340 int nstep = 0;
00341 int ndone = 0;
00342 int ntry = 0;
00343 while (ndone < n) {
00344 ntry = n - ndone;
00345 nstep = fDataStreamItr->Decrement(ntry);
00346
00347 if (nstep < ntry) {
00348
00349 fStatus |= this->PrevFile();
00350
00351
00352 if ( fStatus.BeginOfInputStream() || fStatus.EndOfInputStream() ) {
00353 return this->Get();
00354 fStopwatch.Stop();
00355 }
00356
00357
00358
00359 fDataStreamItr->GoToEOF();
00360 }
00361 ndone += nstep;
00362 }
00363
00364
00365 fStopwatch.Stop();
00366 return this->Get();
00367
00368 }
00369
00370
00371
00372 JobCResult IoInputModule::GoTo(int run, int snarl, int searchDir)
00373 {
00374 CallDepth d;
00375
00376 if (d.fsDepth==1) fStatus = gsAllClear;
00377
00378 if (fDataStreamItr==0) {
00379 if (this->OpenStreamItr()==0) {
00380 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00381 return fStatus;
00382 }
00383 return this->GoTo(run,snarl,searchDir);
00384 }
00385
00386 if ( run == fCurrentRun && snarl == fCurrentSnarl ) return this -> Get();
00387
00388 int dir = searchDir;
00389 if (dir==0) {
00390 if (run>fLastRun) { dir = 1; }
00391 else if (run<fLastRun || (run==fLastRun && fCurrentRun < 0)) { dir = -1; }
00392 else {
00393 if (snarl>fLastSnarl) { dir = 1; }
00394 else { dir = -1; }
00395 }
00396 }
00397
00398
00399 while ( 1 ) {
00400 if ( dir > 0 ) {
00401 this->Next();
00402 if (fCurrentRun>run) {
00403 MSG("Io",Msg::kWarning) <<
00404 "Went to run "<<fCurrentRun<<
00405 " without finding run="<<run<<" snarl="<<snarl<<"\n";
00406 return fStatus;
00407 }
00408 if (fCurrentRun==run && fCurrentSnarl>snarl) {
00409 MSG("Io",Msg::kWarning) <<
00410 "Went to run="<<fCurrentRun<<" snarl="<<fCurrentSnarl<<
00411 " without finding run="<<run<<" snarl="<<snarl<<"\n";
00412 return fStatus;
00413 }
00414 }
00415 if ( dir <0 ) {
00416 this->Prev();
00417 if (fCurrentRun<run) {
00418 MSG("Io",Msg::kWarning) <<
00419 "Went to run "<<fCurrentRun<<
00420 " without finding run="<<run<<" snarl="<<snarl<<"\n";
00421 return fStatus;
00422 }
00423 if (fCurrentRun==run && fCurrentSnarl<snarl) {
00424 MSG("Io",Msg::kWarning) <<
00425 "Went to run="<<fCurrentRun<<" snarl="<<fCurrentSnarl<<
00426 " without finding run="<<run<<" snarl="<<snarl<<"\n";
00427 return fStatus;
00428 }
00429 }
00430
00431 if (fCurrentRun == run && fCurrentSnarl == snarl) return fStatus;
00432 if (dir>0 && fStatus.EndOfInputStream()) return fStatus;
00433 if (dir<0 && fStatus.BeginOfInputStream()) return fStatus;
00434 }
00435 return fStatus;
00436 }
00437
00438
00439
00440 JobCResult IoInputModule::GoTo(const VldContext& vld)
00441 {
00442
00443
00444
00445
00446 CallDepth d;
00447
00448
00449 if (d.fsDepth==1) fStatus = gsAllClear;
00450
00451 if (fDataStreamItr==0) {
00452 if (this->OpenStreamItr()==0) {
00453 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00454 return fStatus;
00455 }
00456 return this->GoTo(vld);
00457 }
00458
00459 fStatus |= fDataStreamItr -> GoTo(vld);
00460
00461
00462 return this->Get();
00463
00464 }
00465
00466
00467
00468 void IoInputModule::List(const char* streamlist) const
00469 {
00470
00471
00472
00473
00474 MsgStream& m = MSGSTREAM("Io",Msg::kInfo);
00475
00476 m << "IoInputModule using data format " << fFormat << endl;
00477
00478 if ( fDataStreamItr ) {
00479 fDataStreamItr -> ListFile(std::cout,streamlist);
00480 return;
00481 }
00482
00483
00484 m << "File Name\tStream List " << endl;
00485 m << "=========\t=========== " << endl;
00486 std::list<IoFileListItem>::const_iterator itr = fFileList.begin();
00487 std::list<IoFileListItem>::const_iterator itrEnd = fFileList.end();
00488
00489 for ( ; itr != itrEnd; itr++ ) {
00490 m << *itr;
00491 }
00492
00493 }
00494
00495
00496
00497 void IoInputModule::AddFile(const char *filepath, const char* streamlist,
00498 int at) {
00499
00500
00501
00502
00503
00504
00505
00506
00507
00508 const char *s1 = "SAM";
00509 if ( strstr(filepath,s1) != NULL ) {
00510
00511 #ifdef SITE_HAS_SAM
00512
00513
00514
00515 const char* tmps;
00516 int tmpi;
00517
00518 Registry& r = GetConfig();
00519 if (r.Get("Station",tmps)) {fStation = tmps;}
00520 if (r.Get("SnapShotVers",tmpi)){fSnapShotVers = tmpi;}
00521 if (r.Get("WorkGroupName",tmps)) {fWorkGroupName = tmps;}
00522 if (r.Get("ApplicationName",tmps)) {fApplicationName = tmps;}
00523 if (r.Get("ApplicationVers",tmps)) {fApplicationVers = tmps;}
00524 if (r.Get("ProjectName",tmps)) {fProjectName = tmps;}
00525 if (r.Get("MaxNumberOfFiles",tmpi)) {fMaxNumberOfFiles = tmpi;}
00526 if (r.Get("StartNewProject",tmpi)) {fStartNewProject = tmpi;}
00527
00528
00529
00530
00531
00532 std::string temp = filepath;
00533 size_t pos = temp.find(":")+1;
00534 std::string sam_access_type = temp.substr(0,pos-1);
00535 if (sam_access_type == "SAM") {
00536 std::string samdataset = temp.substr(pos,temp.length()-pos);
00537
00538
00539 std::string projectname;
00540
00541
00542 if (fStartNewProject == 1) {
00543
00544
00545 VldTimeStamp ts;
00546 std::string timestamp = ts.AsString("lc");
00547
00548 size_t pos = timestamp.find(" ");
00549 timestamp.replace(pos,1,"-");
00550
00551 pos = timestamp.find(":");
00552 while ( pos != string::npos ) {
00553 timestamp.replace(pos,1,"-");
00554 pos = timestamp.find(":",pos+1);
00555 }
00556
00557 fProjectName.append("-");
00558 projectname = fProjectName+timestamp;
00559 }
00560
00561 else if(fStartNewProject == 0) {
00562 projectname = fProjectName;
00563 }
00564
00565
00566 MSG("Io",Msg::kDebug) << "Sam Station " << fStation << " Snap Shot " <<
00567 fSnapShotVers << " Work Group Name " << fWorkGroupName
00568 << " Application Name "
00569 << fApplicationName << " Application Version " << fApplicationVers <<
00570 " Project Name " << projectname << endl;
00571
00572
00573
00574
00575 long snapshot;
00576
00577 if ( fSnapShotVers == 0 ) {
00578 snapshot = sam::SamProject::NewSnapshotVersion;
00579 }
00580
00581 else if (fSnapShotVers < 0) {
00582 snapshot = sam::SamProject::LatestSnapshotVersion;
00583 }
00584 else if (fSnapShotVers > 0 ) {
00585
00586 snapshot = fSnapShotVers;
00587 }
00588
00589 MSG("Io",Msg::kDebug) << "SnapShot Version " << snapshot << endl;
00590
00591 if (fStartNewProject == 1) {
00592
00593
00594
00595 fsamProject = new sam::SamProject(projectname,fStation);
00596
00597
00598
00599 std::list<std::string> projectMasterArgList;
00600
00601 try {
00602 MSG("Io",Msg::kInfo) << "Starting SAM Project " << projectname <<
00603 " on station " << fStation << endl;
00604 fsamProject->startProject(fWorkGroupName,samdataset,snapshot,
00605 projectMasterArgList);
00606 }
00607 catch(const sam::SamProject::StartProjectRequestRejected& ex) {
00608 MSG("Io",Msg::kInfo) << "Rejected start SAM project request "
00609 << ex << endl;
00610 }
00611 }
00612
00613
00614
00615 const int projectMasterTimeout(60);
00616 const std::string processDescription("Loon Analysis Process");
00617
00618 try{
00619
00620
00621 sam::SamConsumer fsamConsumer(projectname,fStation,fWorkGroupName,
00622 fApplicationName,fApplicationVers,
00623 processDescription,
00624 fMaxNumberOfFiles,
00625 projectMasterTimeout);
00626
00627 MSG("Io",Msg::kInfo) << "Started SAM Consumer" << endl;
00628
00629
00630
00631
00632 std::map<std::string,std::string> filelist;
00633 map<std::string,std::string>::iterator fitr;
00634
00635 int location;
00636 int length;
00637 int comp;
00638 std::string fileonly;
00639 std::string restOfPath;
00640 std::string afsroot("afsroot:");
00641 try {
00642 while(true) {
00643 std::string filename = fsamConsumer.getFile().getFullFileName();
00644
00645
00646
00647
00648
00649 MSG("Io",Msg::kDebug) << "Filename " << filename << endl;
00650 location = filename.find_last_of("/");
00651 length = filename.length();
00652 fileonly = filename.substr(location+1,length-1);
00653
00654
00655 comp = filename.compare(0,8,afsroot);
00656 if (comp == 0 ) {
00657 restOfPath = filename.substr(8,location-8);
00658 }
00659 else {
00660 restOfPath = filename.substr(0,location);
00661 }
00662
00663 restOfPath.append("/");
00664 filelist.insert(make_pair(fileonly,restOfPath));
00665
00666 MSG("Io",Msg::kDebug) << "File Only " << fileonly << " Rest Of Path "
00667 << restOfPath << endl;
00668
00669
00670
00671 fsamConsumer.releaseFile();
00672 }
00673 }
00674 catch(const sam::SamConsumer::EndOfFileStreamReached& ex) {
00675 MSG("Io",Msg::kDebug) << "End of File Stream reached" << endl;
00676 }
00677
00678
00679
00680 std::string sfile;
00681 const char *samfile = 0;
00682 for (fitr = filelist.begin(); fitr != filelist.end(); fitr++) {
00683 sfile = (fitr->second+fitr->first);
00684 samfile = sfile.data();
00685 MSG("Io",Msg::kInfo) << "Adding File " << samfile << endl;
00686
00687 IoFileListItem iofile(samfile,at,streamlist);
00688 fFileList.push_back(iofile);
00689 }
00690 }
00691 catch(const sam::SamConsumer::InitializationError& ex) {
00692 MSG("Io",Msg::kInfo) << "Rejected start SAM Consumer request "
00693 << ex << endl;
00694 }
00695
00696
00697 if (fsamProject) {
00698 try {
00699 MSG("Io",Msg::kInfo) << "Requesting end of SAM project " << endl;
00700 fsamProject->endProject();
00701 }
00702 catch(const sam::SamProject::EndProjectRequestRejected& ex) {
00703 MSG("Io",Msg::kInfo) << "SAM Project end request rejected "<< ex << endl;
00704
00705 }
00706 catch(const sam::SamProject::EndProjectRequestFailed& ex) {
00707 MSG("Io",Msg::kInfo) << "SAM Project end request failed "<< ex << endl;
00708 }
00709 }
00710 }
00711 else if (sam_access_type == "SAM_FILE") {
00712
00713
00714 std::string samfile = temp.substr(pos,temp.length()-pos);
00715
00716
00717
00718 sam::LocationList samFiles;
00719 try {
00720 MSG("Io",Msg::kInfo) << "Locating file " << samfile << endl;
00721 samFiles = sam::locate(samfile);
00722
00723
00724 samFiles[0].insert(6,"fnal.gov/usr/");
00725
00726 TRandom3 rand(0);
00727 Double_t r = rand.Rndm();
00728 if (r <= 0.5) {
00729 samFiles[0].insert(0,"dcap://fndca1.fnal.gov:24125");
00730 }
00731 else if (r > 0.5) {
00732 samFiles[0].insert(0,"dcap://fndca1.fnal.gov:24136");
00733 }
00734
00735 samFiles[0].append("/");
00736 samFiles[0].append(samfile);
00737 MSG("Io",Msg::kInfo) << "Adding file " << samFiles[0].c_str() << " to input list" << endl;
00738
00739 IoFileListItem iofile(samFiles[0].c_str(),at,streamlist);
00740 fFileList.push_back(iofile);
00741 }
00742 catch(const sam::exception::DataFileNotFound& ex) {
00743 MSG("Io",Msg::kInfo) << ex << endl;
00744 }
00745 }
00746
00747 #endif // End of ifdef SITE_HAS_SAM
00748
00749 }
00750 else {
00751
00752
00753 IoFileListItem iofile(filepath,at,streamlist);
00754 fFileList.push_back(iofile);
00755
00756 if ( !fDataStreamItr ) return;
00757
00758
00759 const IoFileListItem::FileStreamMap& filemap = iofile.GetFileStreamMap();
00760
00761 if ( at < 0 ) {
00762 IoFileListItem::FileStreamMapConstItr itr = filemap.begin();
00763 for ( ; itr != filemap.end(); itr++ ) {
00764 std::string filename = itr -> first;
00765 std::string streamlist = itr -> second;
00766 fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
00767 }
00768 }
00769 else {
00770
00771 IoFileListItem::FileStreamMap::const_reverse_iterator itr=filemap.rbegin();
00772 for ( ; itr != filemap.rend(); itr++ ) {
00773 std::string filename = itr -> first;
00774 std::string streamlist = itr -> second;
00775 fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
00776 }
00777 }
00778 }
00779 }
00780
00781
00782
00783 void IoInputModule::RemoveFile(const char* filename, const char* streamlist) {
00784
00785
00786
00787
00788 if ( fDataStreamItr ) fDataStreamItr -> RemoveFile(filename,streamlist);
00789
00790 std::string f(filename);
00791 std::list<IoFileListItem>::iterator itr = fFileList.end();
00792 while ( !fFileList.empty() && itr != fFileList.begin() ) {
00793 itr--;
00794 IoFileListItem& iofile = *itr;
00795 iofile.RemoveFile(filename,streamlist);
00796 if ( iofile.GetNumFile() == 0 ) fFileList.erase(itr);
00797 }
00798
00799 return;
00800
00801 }
00802
00803
00804
00805 JobCResult IoInputModule::NextFile(int n, const char* streamlist)
00806 {
00807
00808
00809
00810 CallDepth d;
00811
00812 if (d.fsDepth==1) fStatus = gsAllClear;
00813
00814 if (fDataStreamItr==0) {
00815 if (this->OpenStreamItr()==0) {
00816 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00817 return fStatus;
00818 }
00819 return this->NextFile(n,streamlist);
00820 }
00821
00822 fStatus |= fDataStreamItr -> NextFile(n,streamlist);
00823
00824 MSG("Io",Msg::kDebug)
00825 << "status is " << fStatus
00826 << " current file is " << fDataStreamItr->GetCurrentFile() << endl;
00827
00828 return fStatus;
00829
00830 }
00831
00832
00833
00834 JobCResult IoInputModule::PrevFile(int n, const char* streamlist)
00835 {
00836
00837
00838
00839 CallDepth d;
00840
00841 if (d.fsDepth==1) fStatus = gsAllClear;
00842
00843 if (fDataStreamItr==0) {
00844 if (this->OpenStreamItr()==0) {
00845 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00846 return fStatus;
00847 }
00848 return this->PrevFile(n,streamlist);
00849 }
00850
00851 fStatus |= fDataStreamItr -> PrevFile(n,streamlist);
00852
00853 return fStatus;
00854
00855 }
00856
00857
00858
00859 JobCResult IoInputModule::GoToFile(int n, const char* streamlist)
00860 {
00861
00862
00863
00864 CallDepth d;
00865
00866 if (d.fsDepth==1) fStatus = gsAllClear;
00867
00868 if (fDataStreamItr==0) {
00869 if (this->OpenStreamItr()==0) {
00870 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00871 return fStatus;
00872 }
00873 return this->GoToFile(n,streamlist);
00874 }
00875
00876 fStatus |= fDataStreamItr -> GoToFile(n,streamlist);
00877
00878 return fStatus;
00879
00880 }
00881
00882
00883
00884 JobCResult IoInputModule::GoToFile(const char* filename, const char*streamlist){
00885
00886
00887
00888 CallDepth d;
00889
00890 if (d.fsDepth==1) fStatus = gsAllClear;
00891
00892 if (fDataStreamItr==0) {
00893 if (this->OpenStreamItr()==0) {
00894 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00895 return fStatus;
00896 }
00897 return this->GoToFile(filename,streamlist);
00898 }
00899
00900 fStatus |= fDataStreamItr -> GoToFile(filename,streamlist);
00901
00902 return fStatus;
00903 }
00904
00905
00906
00907 void IoInputModule::Select(const char* stream, const char* select,
00908 bool isRequired)
00909 {
00910
00911
00912
00913
00914 fStreamSelectionMap[stream] = select;
00915 fStreamRequiredMap[stream] = isRequired;
00916
00917
00918 if (fDataStreamItr) {
00919 fDataStreamItr->Select(stream, select,isRequired);
00920 }
00921
00922 }
00923
00924
00925
00926 void IoInputModule::DefineStream(const char* stream, const char* tree) {
00927
00928
00929
00930
00931 fStreamDefMap[stream] = tree;
00932
00933
00934 if ( fDataStreamItr ) {
00935 fDataStreamItr->DefineStream(stream, tree);
00936 }
00937
00938 }
00939
00940
00941
00942 void IoInputModule::SetSequenceMode(const char* stream,
00943 Per::ESequenceMode sequenceMode) {
00944
00945
00946
00947
00948 fStreamSeqModeMap[stream] = sequenceMode;
00949
00950
00951 if ( fDataStreamItr ) {
00952 fDataStreamItr->SetSequenceMode(stream, sequenceMode);
00953 }
00954
00955 }
00956
00957
00958
00959 void IoInputModule::SetTestMode(const char* stream,
00960 bool testMode) {
00961
00962
00963
00964
00965 fStreamTestModeMap[stream] = testMode;
00966
00967
00968 if ( fDataStreamItr ) {
00969 fDataStreamItr->SetTestMode(stream, testMode);
00970 }
00971
00972 }
00973
00974
00975
00976 void IoInputModule::SetWindow(const char* stream, double lower, double upper)
00977 {
00978
00979
00980
00981 fStreamWindowMap[stream] = std::pair<double,double>(lower,upper);
00982
00983
00984 if ( fDataStreamItr ) {
00985 fDataStreamItr->SetWindow(stream, lower, upper);
00986 }
00987
00988 }
00989
00990
00991
00992 void IoInputModule::SetMaxFileRepeat(const char* stream, int numRepeat)
00993 {
00994
00995
00996
00997
00998 fStreamMaxRepeatMap[stream] = numRepeat;
00999
01000
01001 if ( fDataStreamItr ) fDataStreamItr->SetMaxFileRepeat(stream,numRepeat);
01002 }
01003
01004
01005
01006 void IoInputModule::SetMeanMom(const char* stream, double mean)
01007 {
01008
01009
01010
01011
01012 fStreamMeanMap[stream] = mean;
01013
01014
01015 if ( fDataStreamItr ) fDataStreamItr->SetMeanMom(stream,mean);
01016 }
01017
01018
01019
01020 void IoInputModule::SetPushRandom(const char* stream, bool setRandom)
01021 {
01022
01023
01024
01025
01026 fStreamPushRandomMap[stream] = setRandom;
01027
01028
01029 if ( fDataStreamItr ) fDataStreamItr->SetPushRandom(stream,setRandom);
01030 }
01031
01032
01033
01034 void IoInputModule::SetRandomSeed(int rSeed)
01035 {
01036
01037
01038
01039
01040 fRandomSeed = rSeed;
01041
01042
01043 if ( fDataStreamItr ) fDataStreamItr->SetRandomSeed(rSeed);
01044
01045 }
01046
01047
01048
01049 const char* IoInputModule::GetCurrentFile(const char* streamname) const
01050 {
01051 MSG("Io",Msg::kDebug) << "IoInputModule::GetCurrentFile()" << endl;
01052 mapStrStrItr_t it, done=fCurrentFileMap.end();
01053 std::string strmstring = streamname;
01054
01055 for (it = fCurrentFileMap.begin(); it !=done; ++it) {
01056 MSG("Io",Msg::kVerbose)
01057 << "stream: " << setw(16) << it->first
01058 << " file: " << it->second
01059 <<endl;
01060 }
01061
01062 for (it = fCurrentFileMap.begin(); it!=done; ++it) {
01063 if ( strmstring == it->first ) return it->second.c_str();
01064 }
01065
01066 if (!fDataStreamItr) return 0;
01067 return fDataStreamItr->GetCurrentFile(streamname);
01068 }
01069
01070 const char* IoInputModule::GetLastFile(const char* streamname) const
01071 {
01072 MSG("Io",Msg::kInfo) << "IoInputModule::GetLastFile()" << endl;
01073 mapStrStrItr_t it, done = fLastFileMap.end();
01074 std::string strmstring = streamname;
01075
01076 for (it = fLastFileMap.begin(); it!=done; ++it) {
01077 MSG("Io",Msg::kVerbose)
01078 << "stream: " << setw(16) << it->first
01079 << " file: " << it->second
01080 <<endl;
01081 }
01082
01083 for (it = fLastFileMap.begin(); it!=done; ++it) {
01084 if ( strmstring == it->first ) return it->second.c_str();
01085 }
01086 return 0;
01087
01088 }
01089
01090
01091
01092 void IoInputModule::LoadFilesFromCommandLine()
01093 {
01094
01095
01096
01097 JobCEnv& jce = JobCEnv::Instance();
01098 if (!fLoadedCommandLineFiles) {
01099 for (int i=0; i<jce.GetNfile(); ++i) {
01100 this->AddFile(jce.GetFileName(i));
01101 }
01102 fLoadedCommandLineFiles = true;
01103 }
01104 }
01105
01106
01107
01108 int IoInputModule::ReadHeader()
01109 {
01110
01111
01112
01113
01114
01115
01116
01117 const MomNavigator* mom = this->GetMom();
01118 assert(mom);
01119
01120
01121
01122
01123
01124 const TObjArray* momarray = mom->GetFragmentArray();
01125 for (int i=0; i < momarray->GetEntriesFast(); ++i) {
01126 TObject* obj = momarray->At(i);
01127 if (!obj) continue;
01128 Registry* temptags = 0;
01129 if ( RecMinos* record = dynamic_cast<RecMinos*>(obj) ) {
01130 temptags = &(record->GetTempTags());
01131 }
01132 else if ( RecRecord* record = dynamic_cast<RecRecord*>(obj) ) {
01133 temptags = &(record->GetTempTags());
01134 }
01135 if ( ! temptags ) continue;
01136
01137
01138 const char* tagstream = 0;
01139 if ( ! temptags->Get("stream",tagstream) ) continue;
01140
01141
01142 std::string streamname(tagstream);
01143 const char* tagnewfile = 0;
01144 if ( ! temptags->Get("file",tagnewfile) ) continue;
01145
01146 std::string lstfilename = fLastFileMap[streamname];
01147 std::string curfilename = fCurrentFileMap[streamname];
01148 std::string newfilename(tagnewfile);
01149
01150 if ( newfilename != curfilename ) {
01151
01152 std::string starcur = fCurrentFileMap.begin()->first;
01153 std::string starlast = fLastFileMap.begin()->first;
01154
01155 MSG("Io",Msg::kDebug)
01156 << "SetBeginFile on streamname '" << streamname << "'" << endl
01157 << " current '" << fCurrentFileMap[streamname] << "'" << endl
01158 << " last '" << fLastFileMap[streamname] << "'" << endl
01159 << " current['" << starcur << "'] '"
01160 << fCurrentFileMap[starcur] << "'" << endl
01161 << " last['" << starlast << "'] '"
01162 << fLastFileMap[starlast] << "'" << endl
01163 << " new '" << newfilename << "' != "
01164 << " cur '" << curfilename << "'" << endl
01165 << " update \"*\" ? "
01166 << (( newfilename != fCurrentFileMap["*"] ) ? "yes":"no")
01167 << endl;
01168
01169
01170 if ( newfilename != fCurrentFileMap["*"] ) {
01171 fStatus.SetBeginFile();
01172
01173 if ( fCurrentFileMap["*"] != "" ) fStatus.SetEndFile();
01174
01175 fLastFileMap["*"] = fCurrentFileMap["*"];
01176 fCurrentFileMap["*"] = newfilename;
01177
01178 MSG("Io",Msg::kDebug)
01179 << "SetBeginFile on '*'" << endl
01180 << " current['" << starcur << "'] '"
01181 << fCurrentFileMap[starcur] << "'" << endl
01182 << " last['" << starlast << "'] '"
01183 << fLastFileMap[starlast] << "'" << endl;
01184 }
01185
01186 fLastFileMap[streamname] = curfilename;
01187 fCurrentFileMap[streamname] = newfilename;
01188
01189
01190
01191
01192
01193
01194 mapStrStrItr_t it, done = fCurrentFileMap.end();
01195 for (it = fCurrentFileMap.begin(); it != done; ++it) {
01196 if ( it->second == curfilename ) {
01197 std::string altstream = it->first;
01198 fLastFileMap[altstream] = curfilename;
01199 fCurrentFileMap[altstream] = newfilename;
01200 }
01201 }
01202
01203 }
01204
01205 MSG("Io",Msg::kVerbose)
01206 << " stream '" << streamname << "' set fLastFileMap to '"
01207 << curfilename << "', fCurrentFileMap to '"
01208 << newfilename << "'"
01209 << " * '" << fLastFileMap["*"] << "' '" << fCurrentFileMap["*"] << "'"
01210 << endl;
01211
01212 }
01213
01214
01215
01216
01217
01218
01219
01220 if ( ! fStatus.BeginFile() ) fLastFileMap["*"] = fCurrentFileMap["*"];
01221
01222
01223 int run = -1;
01224 int snarl = -1;
01225 for (int i=0; i < momarray->GetEntriesFast(); ++i) {
01226 TObject* obj = momarray->At(i);
01227 if (!obj) continue;
01228 if ( RecMinos* record = dynamic_cast<RecMinos*>(obj) ) {
01229
01230
01231
01232 const RawDaqHeader* rdh
01233 = dynamic_cast<const RawDaqHeader*>(record->GetHeader());
01234 if (rdh) run = rdh->GetRun();
01235
01236
01237 const RawDaqSnarlHeader* rdsh
01238 = dynamic_cast<const RawDaqSnarlHeader*>(record->GetHeader());
01239 if (rdsh) snarl = rdsh->GetSnarl();
01240
01241 if (!rdh) {
01242
01243 const CandHeader* candhdr
01244 = dynamic_cast<const CandHeader*>(record->GetHeader());
01245 if (candhdr) {
01246 run = candhdr->GetRun();
01247 snarl = candhdr->GetSnarl();
01248 }
01249 }
01250 }
01251 else if ( RecRecord* record = dynamic_cast<RecRecord*>(obj) ) {
01252
01253 const RecPhysicsHeader* rph
01254 = dynamic_cast<const RecPhysicsHeader*>(&(record->GetHeader()));
01255 if ( rph ) {
01256 run = rph->GetRun();
01257 snarl = rph->GetSnarl();
01258 }
01259 }
01260
01261
01262 if ( snarl >= 0 ) break;
01263 }
01264
01265
01266 fCurrentSnarl = snarl;
01267 if ( run < 0 ) {
01268 fCurrentRun = -1;
01269 return 0;
01270 }
01271 if ( run != fCurrentRun ) {
01272 fStatus.SetBeginRun();
01273 if ( fLastRun >= 0 ) fStatus.SetEndRun();
01274 }
01275 fLastRun = fCurrentRun;
01276 fCurrentRun = run;
01277 if ( snarl >= 0 ) {
01278 fLastSnarl = fCurrentSnarl;
01279 fCurrentSnarl = snarl;
01280 return 2;
01281 }
01282 return 1;
01283 }
01284
01285
01286
01287 void IoInputModule::UpdateDDSConfig() {
01288
01289
01290
01291 if ( fDataStreamItr == 0 ) return;
01292
01293 IoDDSStreamItr* ddsItr = dynamic_cast<IoDDSStreamItr*>(fDataStreamItr);
01294 if ( ! ddsItr ) return;
01295
01296 ddsItr->SetTimeOut(fTimeOut);
01297
01298
01299
01300 bool reinit = (fServer != ddsItr->GetSourceName()
01301 || fPort != ddsItr->GetPort()
01302 || fClientType != ddsItr->GetClientType()
01303 || fClientName != ddsItr->GetClientName() );
01304
01305 if ( reinit ) {
01306 this -> CloseStreamItr();
01307 }
01308 else {
01309 ddsItr -> SetKeepUpMode(fKeepUpMode);
01310 ddsItr -> SetMaxSyncDelay(fMaxSyncDelay);
01311 ddsItr -> SetDataSource(fDataSource);
01312 ddsItr -> SetOffLine(fOffLine);
01313 }
01314
01315 }
01316
01317
01318
01319
01320 void IoInputModule::UpdateFormatConfig() {
01321
01322
01323
01324
01325 if ( fDataStreamItr == 0 ) return;
01326
01327 bool reopen = ( fFormat != fDataStreamItr->GetFormat() );
01328 if ( reopen ) {
01329 this -> CloseStreamItr();
01330 }
01331
01332 return;
01333
01334 }
01335
01336
01337
01338 void IoInputModule::UpdateFileList()
01339 {
01340
01341
01342
01343
01344 if ( fDataStreamItr == 0 ) return;
01345
01346
01347
01348 fDataStreamItr -> RemoveFile("*");
01349
01350 std::list<IoFileListItem>::iterator itr = fFileList.begin();
01351 for ( ; itr != fFileList.end(); itr++ ) {
01352 IoFileListItem& iofile = *itr;
01353 const IoFileListItem::FileStreamMap& filemap = iofile.GetFileStreamMap();
01354 int at = iofile.GetAt();
01355
01356 if ( at < 0 ) {
01357 IoFileListItem::FileStreamMapConstItr itr = filemap.begin();
01358 for ( ; itr != filemap.end(); itr++ ) {
01359 std::string filename = itr -> first;
01360 std::string streamlist = itr -> second;
01361 fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
01362 }
01363 }
01364 else {
01365
01366 IoFileListItem::FileStreamMap::const_reverse_iterator itr
01367 = filemap.rbegin();
01368 for ( ; itr != filemap.rend(); itr++ ) {
01369 std::string filename = itr -> first;
01370 std::string streamlist = itr -> second;
01371 fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
01372 }
01373 }
01374 }
01375
01376 return;
01377
01378 }
01379
01380
01381
01382 void IoInputModule::UpdateStreamConfig()
01383 {
01384
01385
01386
01387 if (fDataStreamItr==0) return;
01388
01389
01390 mapStrStrItr_t itr;
01391 for ( itr = fStreamDefMap.begin(); itr != fStreamDefMap.end(); itr++ ) {
01392 fDataStreamItr->DefineStream((itr->first).c_str(),
01393 (itr->second).c_str());
01394 }
01395
01396
01397 fDataStreamItr->Streams(fStreamList.c_str());
01398
01399 for (itr=fStreamSelectionMap.begin();itr!=fStreamSelectionMap.end();++itr) {
01400 std::map<std::string,bool>::const_iterator reqitr
01401 = fStreamRequiredMap.find(itr->first);
01402 bool isrequired = false;
01403 if ( reqitr != fStreamRequiredMap.end() ) isrequired = reqitr -> second;
01404 fDataStreamItr->Select((itr->first).c_str(),
01405 (itr->second).c_str(),
01406 isrequired);
01407 }
01408
01409
01410 std::map<std::string,bool>::const_iterator testitr;
01411 for ( testitr=fStreamTestModeMap.begin(); testitr !=fStreamTestModeMap.end();
01412 ++testitr ) {
01413 fDataStreamItr->SetTestMode((testitr->first).c_str(),
01414 testitr->second);
01415 }
01416
01417
01418 bool setRandom = false;
01419 std::map<std::string,Per::ESequenceMode>::const_iterator seqitr;
01420 for ( seqitr=fStreamSeqModeMap.begin(); seqitr!=fStreamSeqModeMap.end();
01421 ++seqitr ) {
01422 fDataStreamItr->SetSequenceMode((seqitr->first).c_str(),
01423 seqitr->second);
01424 pair<double,double> window = fStreamWindowMap[seqitr->first];
01425 fDataStreamItr->SetWindow((seqitr->first).c_str(),
01426 window.first,window.second);
01427 if (seqitr->second == Per::kSequential ||
01428 seqitr->second == Per::kRandom ) {
01429 if (!setRandom) {
01430 setRandom = true;
01431 fDataStreamItr->SetRandomSeed(fRandomSeed);
01432 }
01433 int repeat = fStreamMaxRepeatMap[seqitr->first];
01434 fDataStreamItr->SetMaxFileRepeat( (seqitr->first).c_str(), repeat );
01435 double mean = fStreamMeanMap[seqitr->first];
01436 fDataStreamItr->SetMeanMom( (seqitr->first).c_str(), mean );
01437 bool pushRand = fStreamPushRandomMap[seqitr->first];
01438 fDataStreamItr->SetPushRandom( (seqitr->first).c_str(), pushRand );
01439 }
01440 }
01441 }
01442
01443
01444
01445 int IoInputModule::OpenStreamItr()
01446 {
01447
01448
01449
01450 if (fDataStreamItr) this->CloseStreamItr();
01451
01452 bool isDDS = (UtilString::ToUpper(fFormat) == "DDS");
01453 std::string src;
01454 if ( isDDS ) src = fServer;
01455
01456 fDataStreamItr = IoDataStreamFactory::CreateDataStreamItr(src.c_str(),
01457 fFormat.c_str(),fPort,fMaxRetry,fRetryDelay,
01458 fClientType,fClientName);
01459
01460
01461 if (fDataStreamItr == 0) {
01462 MSG("Io",Msg::kWarning) << "Failed to open stream '" << src << "'" <<
01463 " using format '" << fFormat << "'" << endl;
01464 fStatus.SetEndRun();
01465 fStatus.SetEndFile();
01466 fStatus.SetEndOfInputStream();
01467 return 0;
01468 }
01469
01470 fStatus.SetBeginOfInputStream();
01471 fStatus.SetBeginFile();
01472 fStatus.SetBeginRun();
01473
01474
01475 this->UpdateStreamConfig();
01476 this->UpdateFileList();
01477 this->UpdateDDSConfig();
01478 fFormat = fDataStreamItr->GetFormat();
01479
01480 MSG("Io",Msg::kDebug)
01481 << "Opened stream itr of format " << fDataStreamItr->GetFormat() << endl;
01482
01483 return 1;
01484 }
01485
01486
01487
01488 void IoInputModule::CloseStreamItr()
01489 {
01490
01491
01492
01493 if (fDataStreamItr) {
01494 MSG("Io",Msg::kDebug)
01495 << "Close stream itr of format " << fDataStreamItr->GetFormat() << endl;
01496 delete fDataStreamItr;
01497 fDataStreamItr = 0;
01498 fStatus.SetEndRun();
01499 fStatus.SetEndFile();
01500 fStatus.SetEndOfInputStream();
01501 }
01502 }
01503
01505
01506
01507
01508
01509
01510
01511
01512
01513
01514
01515
01516
01517
01518