00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include <globalsearch/queuemanager.h>
00017
00018 #include <globalsearch/macros.h>
00019 #include <globalsearch/optbase.h>
00020 #include <globalsearch/optimizer.h>
00021 #include <globalsearch/queueinterface.h>
00022 #include <globalsearch/queueinterfaces/remote.h>
00023 #include <globalsearch/structure.h>
00024
00025 #include <QtGui/QApplication>
00026
00027 #include <QtCore/QDateTime>
00028 #include <QtCore/QDebug>
00029 #include <QtCore/QtConcurrentRun>
00030 #include <QtCore/QTimer>
00031
00032
00034 namespace {
00035 class removeFromTrackerWhenScopeEnds
00036 {
00037 GlobalSearch::Tracker *m_tracker;
00038 GlobalSearch::Structure *m_structure;
00039 public:
00040 removeFromTrackerWhenScopeEnds(GlobalSearch::Structure *s,
00041 GlobalSearch::Tracker *t)
00042 : m_tracker(t), m_structure(s) {}
00043 ~removeFromTrackerWhenScopeEnds() {
00044 m_tracker->lockForWrite();
00045 m_tracker->remove(m_structure);
00046 m_tracker->unlock(); }
00047 };
00048
00049
00050 bool trackerContainsStructure(GlobalSearch::Structure *s,
00051 GlobalSearch::Tracker *t)
00052 { t->lockForRead();
00053 bool b = t->contains(s);
00054 t->unlock();
00055 return b; }
00056 }
00058
00059 namespace GlobalSearch {
00060
00061 QueueManager::QueueManager(QThread *thread, OptBase *opt) :
00062 QObject(),
00063 m_opt(opt),
00064 m_thread(thread),
00065 m_tracker(opt->tracker()),
00066 m_requestedStructures(0),
00067 m_isDestroying(false),
00068 m_lastSubmissionTimeStamp(new QDateTime (QDateTime::currentDateTime()))
00069 {
00070
00071 connect(m_thread, SIGNAL(started()),
00072 this, SLOT(moveToQMThread()));
00073 }
00074
00075 QueueManager::~QueueManager()
00076 {
00077 m_isDestroying = true;
00078 this->disconnect();
00079
00080
00081 QList<Tracker*> trackers;
00082 trackers.append(&m_newlyOptimizedTracker);
00083 trackers.append(&m_stepOptimizedTracker);
00084 trackers.append(&m_inProcessTracker);
00085 trackers.append(&m_errorTracker);
00086 trackers.append(&m_submittedTracker);
00087 trackers.append(&m_newlyKilledTracker);
00088 trackers.append(&m_newDuplicateTracker);
00089 trackers.append(&m_restartTracker);
00090 trackers.append(&m_newSubmissionTracker);
00091
00092
00093 foreach (Structure *s, *m_preOptTracker.list()) {
00094 s->lock()->lockForWrite();
00095 s->abortPreoptimization();
00096 s->lock()->unlock();
00097 }
00098
00099
00100 unsigned int timeout;
00101
00102 for (QList<Tracker*>::iterator
00103 it = trackers.begin(),
00104 it_end = trackers.end();
00105 it != it_end;
00106 it++) {
00107 timeout = 10;
00108 while (timeout > 0 && (*it)->size()) {
00109 qDebug() << "Spinning on QueueManager handler trackers to empty...";
00110 GS_SLEEP(1);
00111 QApplication::processEvents(QEventLoop::AllEvents, 500);
00112 --timeout;
00113 }
00114 }
00115
00116 qDebug() << "Clearing QueueManager event loop...";
00117 QApplication::processEvents(QEventLoop::AllEvents);
00118 qDebug() << "QueueManager event loop cleared.";
00119
00120
00121 timeout = 15;
00122 while (timeout > 0 && m_requestedStructures > 0) {
00123 qDebug() << "Waiting for structure generation threads to finish...";
00124 GS_SLEEP(1);
00125 --timeout;
00126 }
00127
00128 delete m_lastSubmissionTimeStamp;
00129 }
00130
00131 void QueueManager::moveToQMThread()
00132 {
00133 this->moveToThread(m_thread);
00134
00135 connect(this, SIGNAL(movedToQMThread()),
00136 this, SLOT(setupConnections()),
00137 Qt::QueuedConnection);
00138
00139 emit movedToQMThread();
00140 }
00141
00142 void QueueManager::setupConnections()
00143 {
00144
00145 connect(this, SIGNAL(needNewStructure()),
00146 m_opt, SLOT(generateNewStructure()),
00147 Qt::QueuedConnection);
00148
00149
00150 connect(this, SIGNAL(structureStarted(GlobalSearch::Structure *)),
00151 this, SIGNAL(structureUpdated(GlobalSearch::Structure *)));
00152 connect(this, SIGNAL(structureSubmitted(GlobalSearch::Structure *)),
00153 this, SIGNAL(structureUpdated(GlobalSearch::Structure *)));
00154 connect(this, SIGNAL(structureKilled(GlobalSearch::Structure *)),
00155 this, SIGNAL(structureUpdated(GlobalSearch::Structure *)));
00156 connect(this, SIGNAL(structureFinished(GlobalSearch::Structure *)),
00157 this, SIGNAL(structureUpdated(GlobalSearch::Structure *)));
00158
00159
00160 #if QT_VERSION == 0x040603
00161 connect(this, SIGNAL(newStructureQueued()),
00162 this, SLOT(unlockForNaming_()),
00163 Qt::QueuedConnection);
00164 #endif // QT_VERSION == 4.6.3
00165
00166 QTimer::singleShot(0, this, SLOT(checkLoop()));
00167 }
00168
00169 void QueueManager::reset()
00170 {
00171 QList<Tracker*> trackers;
00172 trackers.append(m_tracker);
00173 trackers.append(&m_needPreOptTracker);
00174 trackers.append(&m_preOptTracker);
00175 trackers.append(&m_jobStartTracker);
00176 trackers.append(&m_runningTracker);
00177 trackers.append(&m_newStructureTracker);
00178 trackers.append(&m_newlyOptimizedTracker);
00179 trackers.append(&m_stepOptimizedTracker);
00180 trackers.append(&m_inProcessTracker);
00181 trackers.append(&m_errorTracker);
00182 trackers.append(&m_submittedTracker);
00183 trackers.append(&m_newlyKilledTracker);
00184 trackers.append(&m_newDuplicateTracker);
00185 trackers.append(&m_restartTracker);
00186 trackers.append(&m_newSubmissionTracker);
00187
00188 for (QList<Tracker*>::iterator
00189 it = trackers.begin(),
00190 it_end = trackers.end();
00191 it != it_end;
00192 it++) {
00193 (*it)->lockForWrite();
00194 (*it)->reset();
00195 (*it)->unlock();
00196 }
00197 }
00198
00199 void QueueManager::checkLoop()
00200 {
00201
00202 Q_ASSERT_X(QThread::currentThread() == m_thread, Q_FUNC_INFO,
00203 "Attempting to run QueueManager::checkLoop "
00204 "from a thread other than the QM thread. "
00205 );
00206
00207 if (!m_opt->readOnly &&
00208 !m_opt->isStarting ) {
00209 checkPopulation();
00210 checkRunning();
00211 }
00212
00213 QTimer::singleShot(1000, this, SLOT(checkLoop()));
00214 }
00215
00216 void QueueManager::checkPopulation()
00217 {
00218
00219 uint running = 0;
00220 uint optimized = 0;
00221 uint submitted = 0;
00222 m_tracker->lockForRead();
00223 QList<Structure*> structures = *m_tracker->list();
00224 m_tracker->unlock();
00225
00226
00227 Structure *structure = 0;
00228 Structure::State state;
00229 int fail=0;
00230 for (int i = 0; i < structures.size(); ++i) {
00231 structure = structures.at(i);
00232 structure->lock()->lockForRead();
00233 state = structure->getStatus();
00234 if (structure->getFailCount() != 0) fail++;
00235 bool needsPreOpt = structure->needsPreoptimization();
00236 structure->lock()->unlock();
00237
00238 if ( needsPreOpt ) {
00239 m_jobStartTracker.lockForWrite();
00240 m_jobStartTracker.remove(structure);
00241 m_jobStartTracker.unlock();
00242 m_needPreOptTracker.lockForWrite();
00243 m_needPreOptTracker.append(structure);
00244 m_needPreOptTracker.unlock();
00245 }
00246
00247 if ( state == Structure::Preoptimizing ) {
00248 m_preOptTracker.lockForWrite();
00249 m_preOptTracker.append(structure);
00250 m_preOptTracker.unlock();
00251 ++submitted;
00252 }
00253
00254 if ( state == Structure::Submitted ||
00255 state == Structure::InProcess ){
00256 m_runningTracker.lockForWrite();
00257 m_runningTracker.append(structure);
00258 m_runningTracker.unlock();
00259 submitted++;
00260 }
00261
00262 if ( state != Structure::Optimized &&
00263 state != Structure::Duplicate &&
00264 state != Structure::Killed &&
00265 state != Structure::Removed &&
00266 state != Structure::Preoptimizing ){
00267 running++;
00268 m_runningTracker.lockForWrite();
00269 m_runningTracker.append(structure);
00270 m_runningTracker.unlock();
00271 }
00272 else {
00273 if ( state == Structure::Optimized ) {
00274 optimized++;
00275 }
00276 m_runningTracker.lockForWrite();
00277 m_runningTracker.remove(structure);
00278 m_runningTracker.unlock();
00279 }
00280 }
00281 emit newStatusOverview(optimized, running, fail);
00282
00283
00284 m_needPreOptTracker.lockForWrite();
00285 m_preOptTracker.lockForWrite();
00286 int needPreOpt = m_needPreOptTracker.size();
00287 int preOpting = m_preOptTracker.size();
00288 m_preOptTracker.unlock();
00289 m_needPreOptTracker.unlock();
00290 if (needPreOpt != 0 &&
00291 preOpting == 0 ){
00292 startPreoptimization();
00293 }
00294
00295
00296 m_jobStartTracker.lockForWrite();
00297 int pending = m_jobStartTracker.list()->size();
00298 if (pending != 0 &&
00299 (
00300 !m_opt->limitRunningJobs ||
00301 submitted < m_opt->runningJobLimit
00302 )) {
00303 #ifdef ENABLE_SSH
00304
00305
00306
00307
00308 if (qobject_cast<RemoteQueueInterface*>
00309 (m_opt->queueInterface()) != NULL) {
00310 if (m_lastSubmissionTimeStamp->secsTo(QDateTime::currentDateTime())
00311 >= 3 + (6 * RANDDOUBLE())) {
00312 startJob();
00313 ++submitted;
00314 --pending;
00315 *m_lastSubmissionTimeStamp = QDateTime::currentDateTime();
00316 }
00317 }
00318 else
00319 #endif // ENABLE_SSH
00320 {
00321
00322 while (pending != 0 &&
00323 (
00324 !m_opt->limitRunningJobs ||
00325 submitted < m_opt->runningJobLimit
00326 )
00327 ) {
00328 startJob();
00329 ++submitted;
00330 --pending;
00331 }
00332 }
00333 }
00334 m_jobStartTracker.unlock();
00335
00336
00337 m_tracker->lockForWrite();
00338 m_newStructureTracker.lockForRead();
00339
00340
00341
00342
00343
00344 int total = m_tracker->size() + m_newStructureTracker.size()
00345 + m_requestedStructures;
00346
00347 int incomplete = m_runningTracker.size() + m_newStructureTracker.size()
00348 + m_requestedStructures;
00349 int needed = m_opt->contStructs - incomplete;
00350
00351 if (
00352
00353 ( needed > 0) &&
00354
00355 ( m_opt->cutoff <= 0 || total < m_opt->cutoff) &&
00356
00357 ( !m_opt->testingMode || total < m_opt->test_nStructs)
00358 ) {
00359
00360 qDebug() << "Need " << needed << " structures. " << incomplete << " already incomplete.";
00361 for (int i = 0; i < needed; ++i) {
00362 ++m_requestedStructures;
00363 emit needNewStructure();
00364 qDebug() << "Requested new structure. Total requested: " << m_requestedStructures;
00365 }
00366 }
00367
00368 m_newStructureTracker.unlock();
00369 m_tracker->unlock();
00370 return;
00371 }
00372
00373 void QueueManager::checkRunning()
00374 {
00375
00376 Q_ASSERT_X(QThread::currentThread() == m_thread, Q_FUNC_INFO,
00377 "Attempting to run QueueManager::checkRunning "
00378 "from a thread other than the QM thread. "
00379 );
00380
00381
00382 QList<Structure*> runningStructures = getAllRunningStructures();
00383 runningStructures.append(*m_preOptTracker.list());
00384
00385
00386
00387 for (QList<Structure*>::iterator
00388 s_it = runningStructures.begin(),
00389 s_it_end = runningStructures.end();
00390 s_it != s_it_end;
00391 ++s_it) {
00392
00393
00394 Structure *structure = *s_it;
00395
00396
00397 if (m_newlyOptimizedTracker.contains(structure) ||
00398 m_stepOptimizedTracker.contains(structure) ||
00399 m_inProcessTracker.contains(structure) ||
00400 m_errorTracker.contains(structure) ||
00401 m_submittedTracker.contains(structure) ||
00402 m_newlyKilledTracker.contains(structure) ||
00403 m_newDuplicateTracker.contains(structure) ||
00404 m_restartTracker.contains(structure) ||
00405 m_newSubmissionTracker.contains(structure)) {
00406 continue;
00407 }
00408
00409
00410 structure->lock()->lockForRead();
00411 Structure::State status = structure->getStatus();
00412 structure->lock()->unlock();
00413
00414
00415 switch (status) {
00416 case Structure::InProcess:
00417 handleInProcessStructure(structure);
00418 break;
00419 case Structure::WaitingForOptimization:
00420 handleWaitingForOptimizationStructure(structure);
00421 break;
00422 case Structure::StepOptimized:
00423 handleStepOptimizedStructure(structure);
00424 break;
00425 case Structure::Optimized:
00426
00427
00428
00429
00430
00431 break;
00432 case Structure::Error:
00433 handleErrorStructure(structure);
00434 break;
00435 case Structure::Submitted:
00436 handleSubmittedStructure(structure);
00437 break;
00438 case Structure::Killed:
00439 handleKilledStructure(structure);
00440 break;
00441 case Structure::Removed:
00442 handleRemovedStructure(structure);
00443 break;
00444 case Structure::Restart:
00445 handleRestartStructure(structure);
00446 break;
00447 case Structure::Updating:
00448 handleUpdatingStructure(structure);
00449 break;
00450 case Structure::Duplicate:
00451 handleDuplicateStructure(structure);
00452 break;
00453 case Structure::Empty:
00454 handleEmptyStructure(structure);
00455 break;
00456 case Structure::Preoptimizing:
00457 handlePreoptimizingStructure(structure);
00458 break;
00459 }
00460 }
00461
00462 return;
00463 }
00464
00465 void QueueManager::handleInProcessStructure(Structure *s)
00466 {
00467 QWriteLocker locker (m_inProcessTracker.rwLock());
00468 if (!m_inProcessTracker.append(s)) {
00469 return;
00470 }
00471 QtConcurrent::run(this,
00472 &QueueManager::handleInProcessStructure_, s);
00473 }
00474
00475
00477 void QueueManager::handleInProcessStructure_(Structure *s)
00478 {
00479 Q_ASSERT(trackerContainsStructure(s, &m_inProcessTracker));
00480 removeFromTrackerWhenScopeEnds popper (s, &m_inProcessTracker);
00481
00482
00483 if (s->getStatus() != Structure::InProcess) {
00484 return;
00485 }
00486
00487 switch (m_opt->queueInterface()->getStatus(s)) {
00488 case QueueInterface::Running:
00489 case QueueInterface::Queued:
00490 case QueueInterface::CommunicationError:
00491 case QueueInterface::Unknown:
00492 case QueueInterface::Pending:
00493 case QueueInterface::Started:
00494
00495 break;
00496 case QueueInterface::Success:
00497 updateStructure(s);
00498 break;
00499 case QueueInterface::Error:
00500 s->lock()->lockForWrite();
00501 s->setStatus(Structure::Error);
00502 s->lock()->unlock();
00503 emit structureUpdated(s);
00504 break;
00505 }
00506
00507 return;
00508 }
00510
00511 void QueueManager::handleOptimizedStructure(Structure *s)
00512 {
00513 QWriteLocker locker (m_newlyOptimizedTracker.rwLock());
00514 if (!m_newlyOptimizedTracker.append(s)) {
00515 return;
00516 }
00517 QtConcurrent::run(this,
00518 &QueueManager::handleOptimizedStructure_, s);
00519 }
00520
00521
00523 void QueueManager::handleOptimizedStructure_(Structure *s)
00524 {
00525 Q_ASSERT(trackerContainsStructure(s, &m_newlyOptimizedTracker));
00526 removeFromTrackerWhenScopeEnds popper (s, &m_newlyOptimizedTracker);
00527
00528
00529 if (s->getStatus() != Structure::Optimized) {
00530 return;
00531 }
00532
00533
00534 stopJob(s);
00535
00536
00537 m_runningTracker.lockForWrite();
00538 m_runningTracker.remove(s);
00539 m_runningTracker.unlock();
00540
00541 emit structureFinished(s);
00542 }
00544
00545 void QueueManager::handleStepOptimizedStructure(Structure *s)
00546 {
00547 QWriteLocker locker (m_stepOptimizedTracker.rwLock());
00548 m_stepOptimizedTracker.append(s);
00549 QtConcurrent::run(this,
00550 &QueueManager::handleStepOptimizedStructure_, s);
00551 }
00552
00553
00555 void QueueManager::handleStepOptimizedStructure_(Structure *s)
00556 {
00557 Q_ASSERT(trackerContainsStructure(s, &m_stepOptimizedTracker));
00558 removeFromTrackerWhenScopeEnds popper (s, &m_stepOptimizedTracker);
00559
00560 QWriteLocker locker (s->lock());
00561
00562
00563 if (s->getStatus() != Structure::StepOptimized) {
00564 return;
00565 }
00566
00567 s->stopOptTimer();
00568
00569 QString err;
00570 if (!m_opt->checkStepOptimizedStructure(s, &err)) {
00571
00572 m_opt->warning(QString("Structure %1 failed a post-optimization step: %2")
00573 .arg(s->getIDString())
00574 .arg(err));
00575 s->setStatus(Structure::Error);
00576 emit structureUpdated(s);
00577 return;
00578 }
00579
00580
00581 if (s->getCurrentOptStep()
00582 < static_cast<unsigned int>(m_opt->optimizer()->getNumberOfOptSteps())) {
00583 s->setCurrentOptStep(s->getCurrentOptStep() + 1);
00584
00585
00586 s->setStatus(Structure::WaitingForOptimization);
00587 m_runningTracker.lockForWrite();
00588 m_runningTracker.append(s);
00589 m_runningTracker.unlock();
00590 locker.unlock();
00591 emit structureUpdated(s);
00592 addStructureToSubmissionQueue(s);
00593 return;
00594 }
00595
00596 else {
00597 s->setStatus(Structure::Optimized);
00598 locker.unlock();
00599 handleOptimizedStructure(s);
00600 }
00601 }
00603
00604 void QueueManager::handleWaitingForOptimizationStructure(Structure *s)
00605 {
00606
00607 }
00608
00609 void QueueManager::handleEmptyStructure(Structure *s)
00610 {
00611
00612 }
00613
00614 void QueueManager::handlePreoptimizingStructure(Structure *s)
00615 {
00616
00617 }
00618
00619 void QueueManager::handleUpdatingStructure(Structure *s)
00620 {
00621
00622 }
00623
00624 void QueueManager::handleErrorStructure(Structure *s)
00625 {
00626 QWriteLocker locker (m_errorTracker.rwLock());
00627 if (!m_errorTracker.append(s)) {
00628 return;
00629 }
00630 QtConcurrent::run(this,
00631 &QueueManager::handleErrorStructure_, s);
00632 }
00633
00634
00636 void QueueManager::handleErrorStructure_(Structure *s)
00637 {
00638 Q_ASSERT(trackerContainsStructure(s, &m_errorTracker));
00639 removeFromTrackerWhenScopeEnds popper (s, &m_errorTracker);
00640
00641 if (s->getStatus() != Structure::Error) {
00642 return;
00643 }
00644
00645 stopJob(s);
00646
00647
00648 QWriteLocker locker (s->lock());
00649
00650 s->addFailure();
00651
00652
00653
00654 if (s->getFailCount() >= m_opt->failLimit) {
00655 switch (OptBase::FailActions(m_opt->failAction)) {
00656 case OptBase::FA_DoNothing:
00657 default:
00658
00659 s->setStatus(Structure::Restart);
00660 emit structureUpdated(s);
00661 return;
00662 case OptBase::FA_KillIt:
00663 locker.unlock();
00664 killStructure(s);
00665 emit structureUpdated(s);
00666 return;
00667 case OptBase::FA_Randomize:
00668 s->setStatus(Structure::Empty);
00669 locker.unlock();
00670 m_opt->replaceWithRandom(s, tr("excessive failures"));
00671 s->setStatus(Structure::Restart);
00672 emit structureUpdated(s);
00673 case OptBase::FA_NewOffspring:
00674 s->setStatus(Structure::Empty);
00675 locker.unlock();
00676 m_opt->replaceWithOffspring(s, tr("excessive failures"));
00677 s->setStatus(Structure::Restart);
00678 emit structureUpdated(s);
00679 return;
00680 }
00681 }
00682
00683 else {
00684 s->setStatus(Structure::Restart);
00685 emit structureUpdated(s);
00686 return;
00687 }
00688 }
00690
00691 void QueueManager::handleSubmittedStructure(Structure *s)
00692 {
00693 QWriteLocker locker (m_submittedTracker.rwLock());
00694 if (!m_submittedTracker.append(s)) {
00695 return;
00696 }
00697 QtConcurrent::run(this,
00698 &QueueManager::handleSubmittedStructure_, s);
00699 }
00700
00701
00703 void QueueManager::handleSubmittedStructure_(Structure *s)
00704 {
00705 Q_ASSERT(trackerContainsStructure(s, &m_submittedTracker));
00706 removeFromTrackerWhenScopeEnds popper (s, &m_submittedTracker);
00707
00708 if (s->getStatus() != Structure::Submitted) {
00709 return;
00710 }
00711
00712 switch (m_opt->queueInterface()->getStatus(s)) {
00713 case QueueInterface::Running:
00714 case QueueInterface::Queued:
00715 case QueueInterface::Success:
00716 case QueueInterface::Started:
00717
00718 s->lock()->lockForWrite();
00719 s->setStatus(Structure::InProcess);
00720 s->lock()->unlock();
00721 emit structureUpdated(s);
00722 break;
00723 case QueueInterface::Error:
00724 s->lock()->lockForWrite();
00725 s->setStatus(Structure::Restart);
00726 s->lock()->unlock();
00727 emit structureUpdated(s);
00728 break;
00729 case QueueInterface::CommunicationError:
00730 case QueueInterface::Unknown:
00731 case QueueInterface::Pending:
00732 default:
00733
00734 break;
00735 }
00736 }
00738
00739 void QueueManager::handleKilledStructure(Structure *s)
00740 {
00741 QWriteLocker locker (m_newlyKilledTracker.rwLock());
00742 if (!m_newlyKilledTracker.append(s)) {
00743 return;
00744 }
00745 QtConcurrent::run(this,
00746 &QueueManager::handleKilledStructure_, s);
00747 }
00748
00749
00751 void QueueManager::handleKilledStructure_(Structure *s)
00752 {
00753 Q_ASSERT(trackerContainsStructure(s, &m_newlyKilledTracker));
00754 removeFromTrackerWhenScopeEnds popper (s, &m_newlyKilledTracker);
00755
00756 if (s->getStatus() != Structure::Killed &&
00757
00758
00759 s->getStatus() != Structure::Removed) {
00760 return;
00761 }
00762
00763
00764 stopJob(s);
00765
00766
00767 m_runningTracker.lockForWrite();
00768 m_runningTracker.remove(s);
00769 m_runningTracker.unlock();
00770 }
00772
00773 void QueueManager::handleRemovedStructure(Structure *s)
00774 {
00775 handleKilledStructure(s);
00776 }
00777
00778 void QueueManager::handleDuplicateStructure(Structure *s)
00779 {
00780 QWriteLocker locker (m_newDuplicateTracker.rwLock());
00781 if (!m_newDuplicateTracker.append(s)) {
00782 return;
00783 }
00784 QtConcurrent::run(this,
00785 &QueueManager::handleDuplicateStructure_, s);
00786 }
00787
00788
00790 void QueueManager::handleDuplicateStructure_(Structure *s)
00791 {
00792 Q_ASSERT(trackerContainsStructure(s, &m_newDuplicateTracker));
00793 removeFromTrackerWhenScopeEnds popper (s, &m_newDuplicateTracker);
00794
00795 if (s->getStatus() != Structure::Duplicate) {
00796 return;
00797 }
00798
00799
00800 stopJob(s);
00801
00802
00803 m_runningTracker.lockForWrite();
00804 m_runningTracker.remove(s);
00805 m_runningTracker.unlock();
00806 }
00808
00809 void QueueManager::handleRestartStructure(Structure *s)
00810 {
00811 QWriteLocker locker (m_restartTracker.rwLock());
00812 if (!m_restartTracker.append(s)) {
00813 return;
00814 }
00815 QtConcurrent::run(this,
00816 &QueueManager::handleRestartStructure_, s);
00817 }
00818
00819
00821 void QueueManager::handleRestartStructure_(Structure *s)
00822 {
00823 Q_ASSERT(trackerContainsStructure(s, &m_restartTracker));
00824 removeFromTrackerWhenScopeEnds popper (s, &m_restartTracker);
00825
00826 if (s->getStatus() != Structure::Restart) {
00827 return;
00828 }
00829
00830 stopJob(s);
00831
00832 addStructureToSubmissionQueue(s);
00833 }
00834
00835 void QueueManager::updateStructure(Structure *s)
00836 {
00837 this->stopJob(s);
00838 s->lock()->lockForWrite();
00839 s->stopOptTimer();
00840 s->resetFailCount();
00841 s->setStatus(Structure::Updating);
00842 s->lock()->unlock();
00843 if (!m_opt->optimizer()->update(s)) {
00844 s->lock()->lockForWrite();
00845 s->setStatus(Structure::Error);
00846 s->lock()->unlock();
00847 emit structureUpdated(s);
00848 return;
00849 }
00850 s->lock()->lockForWrite();
00851 s->setStatus(Structure::StepOptimized);
00852 s->lock()->unlock();
00853 emit structureUpdated(s);
00854 return;
00855 }
00857
00858 void QueueManager::killStructure(Structure *s) {
00859
00860 s->lock()->lockForWrite();
00861 s->stopOptTimer();
00862 if ( s->getStatus() != Structure::Optimized ) {
00863 s->setStatus(Structure::Killed);
00864 }
00865 else {
00866 s->setStatus(Structure::Removed);
00867 }
00868 s->lock()->unlock();
00869 stopJob(s);
00870 emit structureKilled(s);
00871 }
00872
00873 void QueueManager::senderHasFinishedPreoptimization()
00874 {
00875 Structure *s = qobject_cast<Structure*>(this->sender());
00876 if (s == NULL) {
00877 qWarning() << Q_FUNC_INFO << "called with non-structure sender.";
00878 return;
00879 }
00880
00881 m_preOptTracker.remove(s);
00882 this->addStructureToSubmissionQueue(s, 0);
00883 }
00884
00885 void QueueManager::addStructureToSubmissionQueue(Structure *s,
00886 int optStep)
00887 {
00888 QWriteLocker locker (m_newSubmissionTracker.rwLock());
00889 if (!m_newSubmissionTracker.append(s)) {
00890 return;
00891 }
00892
00893 QtConcurrent::run(this,
00894 &QueueManager::addStructureToSubmissionQueue_,
00895 s, optStep);
00896 }
00897
00898
00900 void QueueManager::addStructureToSubmissionQueue_(Structure *s, int optStep)
00901 {
00902 Q_ASSERT(trackerContainsStructure(s, &m_newSubmissionTracker));
00903 removeFromTrackerWhenScopeEnds popper (s, &m_newSubmissionTracker);
00904
00905
00906 s->lock()->lockForWrite();
00907 s->setStatus(Structure::WaitingForOptimization);
00908 if (optStep != 0) {
00909 s->setCurrentOptStep(optStep);
00910 }
00911 s->lock()->unlock();
00912
00913
00914 m_opt->queueInterface()->writeInputFiles(s);
00915 emit structureUpdated(s);
00916
00917
00918 m_jobStartTracker.lockForWrite();
00919 m_jobStartTracker.append(s);
00920 m_jobStartTracker.unlock();
00921
00922 m_runningTracker.lockForWrite();
00923 m_runningTracker.append(s);
00924 m_runningTracker.unlock();
00925 }
00927
00928 void QueueManager::startPreoptimization()
00929 {
00930 Structure *s;
00931 m_needPreOptTracker.lockForWrite();
00932 m_preOptTracker.lockForWrite();
00933 if (!m_needPreOptTracker.popFirst(s)) {
00934 m_preOptTracker.unlock();
00935 m_needPreOptTracker.unlock();
00936 return;
00937 }
00938 s->lock()->lockForWrite();
00939
00940
00941 if (!s->needsPreoptimization()) {
00942 s->lock()->unlock();
00943 m_preOptTracker.unlock();
00944 m_needPreOptTracker.unlock();
00945 return;
00946 }
00947
00948 s->setStatus(Structure::Preoptimizing);
00949 m_preOptTracker.append(s);
00950
00951 s->lock()->unlock();
00952 m_preOptTracker.unlock();
00953 m_needPreOptTracker.unlock();
00954
00955
00956 this->connect(s, SIGNAL(preoptimizationFinished()),
00957 SLOT(senderHasFinishedPreoptimization()));
00958
00959 m_opt->preoptimizeStructure(s);
00960 }
00961
00962 void QueueManager::startJob()
00963 {
00964 Structure *s;
00965 if (!m_jobStartTracker.popFirst(s)) {
00966 return;
00967 }
00968
00969 s->lock()->lockForRead();
00970 bool needsPreOpt = s->needsPreoptimization();
00971 s->lock()->unlock();
00972
00973 if (needsPreOpt) {
00974
00975
00976 qDebug() << "Trying to skip preoptimization??";
00977 m_opt->printBackTrace();
00978 this->m_preOptTracker.lockForWrite();
00979 this->m_preOptTracker.append(s);
00980 this->m_preOptTracker.unlock();
00981 return;
00982 }
00983
00984 if (!m_opt->queueInterface()->startJob(s)) {
00985 s->lock()->lockForWrite();
00986 m_opt->warning(tr("QueueManager::startJob_: Job did not start "
00987 "successfully for structure %1-%2.")
00988 .arg(s->getIDString())
00989 .arg(s->getCurrentOptStep()));
00990 s->setStatus(Structure::Error);
00991 s->lock()->unlock();
00992 return;
00993 }
00994
00995 s->lock()->lockForWrite();
00996 s->setStatus(Structure::Submitted);
00997 s->lock()->unlock();
00998
00999 emit structureSubmitted(s);
01000 }
01001
01002 void QueueManager::stopJob(Structure *s)
01003 {
01004 m_jobStartTracker.lockForWrite();
01005 m_jobStartTracker.remove(s);
01006 m_jobStartTracker.unlock();
01007
01008 s->lock()->lockForRead();
01009 if (s->isPreoptimizing()) {
01010 s->abortPreoptimization();
01011 }
01012 s->lock()->unlock();
01013
01014 m_opt->queueInterface()->stopJob(s);
01015 }
01016
01017 QList<Structure*> QueueManager::getAllPreoptimizingStructures()
01018 {
01019 QReadLocker locker (m_preOptTracker.rwLock()); Q_UNUSED(locker);
01020 return *m_preOptTracker.list();
01021 }
01022
01023 QList<Structure*> QueueManager::getAllRunningStructures()
01024 {
01025 m_runningTracker.lockForRead();
01026 m_newStructureTracker.lockForRead();
01027 QList<Structure*> list(*m_runningTracker.list());
01028 list.append(*m_newStructureTracker.list());
01029 m_newStructureTracker.unlock();
01030 m_runningTracker.unlock();
01031 return list;
01032 }
01033
01034 QList<Structure*> QueueManager::getAllOptimizedStructures()
01035 {
01036 QList<Structure*> list;
01037 m_tracker->lockForRead();
01038 Structure *s;
01039 for (int i = 0; i < m_tracker->list()->size(); i++) {
01040 s = m_tracker->list()->at(i);
01041 s->lock()->lockForRead();
01042 if (s->getStatus() == Structure::Optimized)
01043 list.append(s);
01044 s->lock()->unlock();
01045 }
01046 m_tracker->unlock();
01047 return list;
01048 }
01049
01050 QList<Structure*> QueueManager::getAllDuplicateStructures()
01051 {
01052 QList<Structure*> list;
01053 m_tracker->lockForRead();
01054 Structure *s;
01055 for (int i = 0; i < m_tracker->list()->size(); i++) {
01056 s = m_tracker->list()->at(i);
01057 s->lock()->lockForRead();
01058 if (s->getStatus() == Structure::Duplicate)
01059 list.append(s);
01060 s->lock()->unlock();
01061 }
01062 m_tracker->unlock();
01063 return list;
01064 }
01065
01066 QList<Structure*> QueueManager::getAllStructures()
01067 {
01068 m_tracker->lockForRead();
01069 m_newStructureTracker.lockForRead();
01070 QList<Structure*> list (*m_tracker->list());
01071 list.append(*m_newStructureTracker.list());
01072 m_newStructureTracker.unlock();
01073 m_tracker->unlock();
01074 return list;
01075 }
01076
01077 QList<Structure*> QueueManager::lockForNaming()
01078 {
01079 QList<Structure*> structures = getAllStructures();
01080
01081 structures.size();
01082
01083 m_tracker->lockForRead();
01084 return structures;
01085 }
01086
01087 void QueueManager::unlockForNaming(Structure *s)
01088 {
01089 if (!s) {
01090 m_tracker->unlock();
01091 return;
01092 }
01093
01094
01095 if (m_isDestroying) {
01096 --m_requestedStructures;
01097 m_tracker->unlock();
01098 return;
01099 }
01100
01101 if (!m_opt->isStarting) {
01102 --m_requestedStructures;
01103 }
01104
01105
01106
01107
01108 m_newStructureTracker.lockForWrite();
01109 m_newStructureTracker.append(s);
01110
01111 Q_ASSERT_X(m_requestedStructures >= 0, Q_FUNC_INFO,
01112 "The requested structures counter has become negative.");
01113
01114 qDebug() << "New structure received (" << s->getIDString() << ")";
01115
01116 m_newStructureTracker.unlock();
01117 m_tracker->unlock();
01118 #if QT_VERSION == 0x040603
01119 emit newStructureQueued();
01120 #else // QT_VERSION == 4.6.3
01121 QtConcurrent::run(this, &QueueManager::unlockForNaming_);
01122 #endif // QT_VERSION == 4.6.3
01123 }
01124
01125
01127 void QueueManager::unlockForNaming_()
01128 {
01129 Structure *s;
01130 m_tracker->lockForWrite();
01131 m_newStructureTracker.lockForWrite();
01132 if (!m_newStructureTracker.popFirst(s)) {
01133 m_newStructureTracker.unlock();
01134 m_tracker->unlock();
01135 return;
01136 }
01137
01138
01139 s->lock()->lockForWrite();
01140 s->setStatus(Structure::WaitingForOptimization);
01141 bool needsPreOpt = s->needsPreoptimization();
01142 s->lock()->unlock();
01143
01144 if (needsPreOpt) {
01145 m_needPreOptTracker.lockForWrite();
01146 m_needPreOptTracker.append(s);
01147 }
01148
01149 m_tracker->append(s);
01150 qDebug() << "New structure accepted (" << s->getIDString() << ")";
01151
01152 if (needsPreOpt) {
01153 m_needPreOptTracker.unlock();
01154 }
01155
01156 m_newStructureTracker.unlock();
01157 m_tracker->unlock();
01158
01159 if (!needsPreOpt) {
01160 this->addStructureToSubmissionQueue(s);
01161 }
01162 emit structureStarted(s);
01163 }
01165
01166 void QueueManager::addManualStructureRequest(int requests)
01167 {
01168 m_tracker->lockForWrite();
01169 m_requestedStructures += requests;
01170 m_tracker->unlock();
01171 }
01172
01173 void QueueManager::appendToJobStartTracker(Structure *s)
01174 {
01175 m_jobStartTracker.lockForWrite();
01176 m_jobStartTracker.append(s);
01177 m_jobStartTracker.unlock();
01178 }
01179
01180 }