root/src/globalsearch/queuemanager.cpp @ 06f244abd607a7c31b6ed64e2a2c378638e727d2

Revision 06f244abd607a7c31b6ed64e2a2c378638e727d2, 32.7 KB (checked in by David C. Lonie <loniedavid@…>, 13 months ago)

Added a preoptimization queue for MolecularXtals.

  • Property mode set to 100644
Line 
1/**********************************************************************
2  QueueManager - Generic queue manager to track running structures
3
4  Copyright (C) 2010-2011 by David C. Lonie
5
6  This program is free software; you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation version 2 of the License.
9
10  This program is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  GNU General Public License for more details.
14 ***********************************************************************/
15
16#include <globalsearch/queuemanager.h>
17
18#include <globalsearch/macros.h>
19#include <globalsearch/optbase.h>
20#include <globalsearch/optimizer.h>
21#include <globalsearch/queueinterface.h>
22#include <globalsearch/queueinterfaces/remote.h>
23#include <globalsearch/structure.h>
24
25#include <QtCore/QDateTime>
26#include <QtCore/QDebug>
27#include <QtCore/QtConcurrentRun>
28#include <QtCore/QTimer>
29
30// A couple helper functions/classes -- disable doxygen parsing:
31/// \cond
32namespace {
33  class removeFromTrackerWhenScopeEnds
34  {
35    GlobalSearch::Tracker *m_tracker;
36    GlobalSearch::Structure *m_structure;
37  public:
38    removeFromTrackerWhenScopeEnds(GlobalSearch::Structure *s,
39                                   GlobalSearch::Tracker *t)
40      : m_tracker(t), m_structure(s) {}
41    ~removeFromTrackerWhenScopeEnds() {
42      m_tracker->lockForWrite();
43      m_tracker->remove(m_structure);
44      m_tracker->unlock(); }
45  };
46
47  // Locks tracker for reading and calls t->contains(s)
48  bool trackerContainsStructure(GlobalSearch::Structure *s,
49                                GlobalSearch::Tracker *t)
50  { t->lockForRead();
51    bool b = t->contains(s);
52    t->unlock();
53    return b; }
54}
55/// \endcond
56
57namespace GlobalSearch {
58
59  QueueManager::QueueManager(QThread *thread, OptBase *opt) :
60    QObject(),
61    m_opt(opt),
62    m_thread(thread),
63    m_tracker(opt->tracker()),
64    m_requestedStructures(0),
65    m_isDestroying(false),
66    m_lastSubmissionTimeStamp(new QDateTime (QDateTime::currentDateTime()))
67  {
68    // Thread connections
69    connect(m_thread, SIGNAL(started()),
70            this, SLOT(moveToQMThread()));
71  }
72
73  QueueManager::~QueueManager()
74  {
75    m_isDestroying = true;
76    this->disconnect();
77
78    // Wait for handler trackers to empty.
79    QList<Tracker*> trackers;
80    trackers.append(&m_newlyOptimizedTracker);
81    trackers.append(&m_stepOptimizedTracker);
82    trackers.append(&m_inProcessTracker);
83    trackers.append(&m_errorTracker);
84    trackers.append(&m_submittedTracker);
85    trackers.append(&m_newlyKilledTracker);
86    trackers.append(&m_newDuplicateTracker);
87    trackers.append(&m_restartTracker);
88    trackers.append(&m_newSubmissionTracker);
89
90    // abort any preoptimizations
91    foreach (Structure *s, *m_preOptTracker.list()) {
92      s->lock()->lockForWrite();
93      s->abortPreoptimization();
94      s->lock()->unlock();
95    }
96
97    // Used to break wait loops if they take too long
98    unsigned int timeout;
99
100    for (QList<Tracker*>::iterator
101           it = trackers.begin(),
102           it_end = trackers.end();
103         it != it_end;
104         it++) {
105      timeout = 10;
106      while (timeout > 0 && (*it)->size()) {
107        qDebug() << "Spinning on QueueManager handler trackers to empty...";
108        GS_SLEEP(1);
109        --timeout;
110      }
111    }
112
113    // Wait for m_requestedStructures to == 0
114    timeout = 15;
115    while (timeout > 0 && m_requestedStructures > 0) {
116        qDebug() << "Waiting for structure generation threads to finish...";
117        GS_SLEEP(1);
118        --timeout;
119    }
120
121    delete m_lastSubmissionTimeStamp;
122  }
123
124  void QueueManager::moveToQMThread()
125  {
126    this->moveToThread(m_thread);
127
128    connect(this, SIGNAL(movedToQMThread()),
129            this, SLOT(setupConnections()),
130            Qt::QueuedConnection);
131
132    emit movedToQMThread();
133  }
134
135  void QueueManager::setupConnections()
136  {
137    // opt connections
138    connect(this, SIGNAL(needNewStructure()),
139            m_opt, SLOT(generateNewStructure()),
140            Qt::QueuedConnection);
141
142    // re-emit connections
143    connect(this, SIGNAL(structureStarted(GlobalSearch::Structure *)),
144            this, SIGNAL(structureUpdated(GlobalSearch::Structure *)));
145    connect(this, SIGNAL(structureSubmitted(GlobalSearch::Structure *)),
146            this, SIGNAL(structureUpdated(GlobalSearch::Structure *)));
147    connect(this, SIGNAL(structureKilled(GlobalSearch::Structure *)),
148            this, SIGNAL(structureUpdated(GlobalSearch::Structure *)));
149    connect(this, SIGNAL(structureFinished(GlobalSearch::Structure *)),
150            this, SIGNAL(structureUpdated(GlobalSearch::Structure *)));
151
152    // Work around bug in Qt 4.6.3:
153#if QT_VERSION == 0x040603
154    connect(this, SIGNAL(newStructureQueued()),
155            this, SLOT(unlockForNaming_()),
156            Qt::QueuedConnection);
157#endif // QT_VERSION == 4.6.3
158
159    QTimer::singleShot(0, this, SLOT(checkLoop()));
160  }
161
162  void QueueManager::reset()
163  {
164    QList<Tracker*> trackers;
165    trackers.append(m_tracker);
166    trackers.append(&m_needPreOptTracker);
167    trackers.append(&m_preOptTracker);
168    trackers.append(&m_jobStartTracker);
169    trackers.append(&m_runningTracker);
170    trackers.append(&m_newStructureTracker);
171    trackers.append(&m_newlyOptimizedTracker);
172    trackers.append(&m_stepOptimizedTracker);
173    trackers.append(&m_inProcessTracker);
174    trackers.append(&m_errorTracker);
175    trackers.append(&m_submittedTracker);
176    trackers.append(&m_newlyKilledTracker);
177    trackers.append(&m_newDuplicateTracker);
178    trackers.append(&m_restartTracker);
179    trackers.append(&m_newSubmissionTracker);
180
181    for (QList<Tracker*>::iterator
182           it = trackers.begin(),
183           it_end = trackers.end();
184         it != it_end;
185         it++) {
186      (*it)->lockForWrite();
187      (*it)->reset();
188      (*it)->unlock();
189    }
190  }
191
192  void QueueManager::checkLoop()
193  {
194   // Ensure that this is only called from the QM thread:
195    Q_ASSERT_X(QThread::currentThread() == m_thread, Q_FUNC_INFO,
196               "Attempting to run QueueManager::checkLoop "
197               "from a thread other than the QM thread. "
198               );
199
200    if (!m_opt->readOnly &&
201        !m_opt->isStarting ) {
202      checkPopulation();
203      checkRunning();
204    }
205
206    QTimer::singleShot(1000, this, SLOT(checkLoop()));
207  }
208
209  void QueueManager::checkPopulation()
210  {
211    // Count jobs
212    uint running = 0;
213    uint optimized = 0;
214    uint submitted = 0;
215    m_tracker->lockForRead();
216    QList<Structure*> structures = *m_tracker->list();
217    m_tracker->unlock();
218
219    // Check to see that the number of running jobs is >= that specified:
220    Structure *structure = 0;
221    Structure::State state;
222    int fail=0;
223    for (int i = 0; i < structures.size(); ++i) {
224      structure = structures.at(i);
225      structure->lock()->lockForRead();
226      state = structure->getStatus();
227      if (structure->getFailCount() != 0) fail++;
228      bool needsPreOpt = structure->needsPreoptimization();
229      structure->lock()->unlock();
230      // Check for structures that need preoptimization
231      if ( needsPreOpt ) {
232        m_jobStartTracker.lockForWrite();
233        m_jobStartTracker.remove(structure);
234        m_jobStartTracker.unlock();
235        m_needPreOptTracker.lockForWrite();
236        m_needPreOptTracker.append(structure);
237        m_needPreOptTracker.unlock();
238      }
239      // Check for structures that are preoptimizating
240      if ( state == Structure::Preoptimizing ) {
241        m_preOptTracker.lockForWrite();
242        m_preOptTracker.append(structure);
243        m_preOptTracker.unlock();
244        ++submitted;
245      }
246      // Count submitted structures
247      if ( state == Structure::Submitted ||
248           state == Structure::InProcess ){
249        m_runningTracker.lockForWrite();
250        m_runningTracker.append(structure);
251        m_runningTracker.unlock();
252        submitted++;
253      }
254      // Count running jobs and update trackers
255      if ( state != Structure::Optimized &&
256           state != Structure::Duplicate &&
257           state != Structure::Killed &&
258           state != Structure::Removed  &&
259           state != Structure::Preoptimizing ){
260        running++;
261        m_runningTracker.lockForWrite();
262        m_runningTracker.append(structure);
263        m_runningTracker.unlock();
264      }
265      else {
266        if ( state == Structure::Optimized ) {
267          optimized++;
268        }
269        m_runningTracker.lockForWrite();
270        m_runningTracker.remove(structure);
271        m_runningTracker.unlock();
272      }
273    }
274    emit newStatusOverview(optimized, running, fail);
275
276    // Start a preoptimizations if needed
277    m_needPreOptTracker.lockForWrite();
278    m_preOptTracker.lockForWrite();
279    int needPreOpt = m_needPreOptTracker.size();
280    int preOpting  = m_preOptTracker.size();
281    m_preOptTracker.unlock();
282    m_needPreOptTracker.unlock();
283    if (needPreOpt != 0 && // Structures need preopt and
284        preOpting == 0 ){     // no preopts running
285      startPreoptimization();
286    }
287
288    // Submit any jobs if needed
289    m_jobStartTracker.lockForWrite();
290    int pending = m_jobStartTracker.list()->size();
291    if (pending != 0 &&
292        (
293          !m_opt->limitRunningJobs ||
294          submitted < m_opt->runningJobLimit
295          )) {
296      // Submit a single throttled job (1 submission per 3-8 seconds) if using
297      // a remote queue interface. Interval is randomly chosen each iteration.
298      // This prevents hammering the pbs server from multiple XtalOpt instances
299      // if there is a problem with the queue.
300      if (qobject_cast<RemoteQueueInterface*>
301          (m_opt->queueInterface()) != NULL) {
302        if (m_lastSubmissionTimeStamp->secsTo(QDateTime::currentDateTime())
303            >= 3 + (6 * RANDDOUBLE())) {
304          startJob();
305          ++submitted;
306          --pending;
307          *m_lastSubmissionTimeStamp = QDateTime::currentDateTime();
308        }
309      }
310      else {
311        // Local job submission doesn't need to be throttled
312        while (pending != 0 &&
313               (
314                 !m_opt->limitRunningJobs ||
315                 submitted < m_opt->runningJobLimit
316                 )
317               ) {
318          startJob();
319          ++submitted;
320          --pending;
321        }
322      }
323    }
324    m_jobStartTracker.unlock();
325
326    // Generate requests
327    m_tracker->lockForWrite(); // Write lock for m_requestedStructures var
328    m_newStructureTracker.lockForRead();
329
330    // Avoid convience function calls here, as occaisional deadlocks
331    // can occur.
332    //
333    // total is getAllStructures().size() + m_requestedStructures;
334    int total = m_tracker->size() + m_newStructureTracker.size()
335      + m_requestedStructures;
336    // incomplete is getAllRunningStructures.size() + m_requestedStructures:
337    int incomplete = m_runningTracker.size() + m_newStructureTracker.size()
338      + m_requestedStructures;
339    int needed = m_opt->contStructs - incomplete;
340
341    if (
342        // Are we at the continuous structure limit?
343        ( needed > 0) &&
344        // Is the cutoff either disabled or reached/exceeded?
345        ( m_opt->cutoff <= 0 || total < m_opt->cutoff) &&
346        // Check if we are testing. If so, have we reached the testing limit?
347        ( !m_opt->testingMode || total < m_opt->test_nStructs)
348        ) {
349      // emit requests
350      qDebug() << "Need " << needed << " structures. " << incomplete << " already incomplete.";
351      for (int i = 0; i < needed; ++i) {
352        ++m_requestedStructures;
353        emit needNewStructure();
354        qDebug() << "Requested new structure. Total requested: " << m_requestedStructures;
355      }
356    }
357
358    m_newStructureTracker.unlock();
359    m_tracker->unlock();
360    return;
361  }
362
363  void QueueManager::checkRunning()
364  {
365    // Ensure that this is only called from the QM thread:
366    Q_ASSERT_X(QThread::currentThread() == m_thread, Q_FUNC_INFO,
367               "Attempting to run QueueManager::checkRunning "
368               "from a thread other than the QM thread. "
369               );
370
371    // Get list of running and preoptimizing structures
372    QList<Structure*> runningStructures = getAllRunningStructures();
373    runningStructures.append(*m_preOptTracker.list());
374
375
376    // iterate over all structures and handle each based on its status
377    for (QList<Structure*>::iterator
378           s_it = runningStructures.begin(),
379           s_it_end = runningStructures.end();
380         s_it != s_it_end;
381         ++s_it) {
382
383      // Assign pointer for convenience
384      Structure *structure = *s_it;
385
386      // Check if this structure has any handlers pending. Skip if so.
387      if (m_newlyOptimizedTracker.contains(structure) ||
388          m_stepOptimizedTracker.contains(structure)  ||
389          m_inProcessTracker.contains(structure)      ||
390          m_errorTracker.contains(structure)          ||
391          m_submittedTracker.contains(structure)      ||
392          m_newlyKilledTracker.contains(structure)    ||
393          m_newDuplicateTracker.contains(structure)   ||
394          m_restartTracker.contains(structure)        ||
395          m_newSubmissionTracker.contains(structure)) {
396        continue;
397      }
398
399      // Lookup status
400      structure->lock()->lockForRead();
401      Structure::State status = structure->getStatus();
402      structure->lock()->unlock();
403
404      // Check status
405      switch (status) {
406      case Structure::InProcess:
407        handleInProcessStructure(structure);
408        break;
409      case Structure::WaitingForOptimization:
410        handleWaitingForOptimizationStructure(structure);
411        break;
412      case Structure::StepOptimized:
413        handleStepOptimizedStructure(structure);
414        break;
415      case Structure::Optimized:
416        // Shouldn't happen -- this is called by handleStepOptimizedStructure
417        // when needed. There is a race condition between the check* functions
418        // -- The structure may be removed from the list of running structures
419        // by checkPopulation before checkRunning is called.
420        //handleOptimizedStructure(structure);
421        break;
422      case Structure::Error:
423        handleErrorStructure(structure);
424        break;
425      case Structure::Submitted:
426        handleSubmittedStructure(structure);
427        break;
428      case Structure::Killed:
429        handleKilledStructure(structure);
430        break;
431      case Structure::Removed:
432        handleRemovedStructure(structure);
433        break;
434      case Structure::Restart:
435        handleRestartStructure(structure);
436        break;
437      case Structure::Updating:
438        handleUpdatingStructure(structure);
439        break;
440      case Structure::Duplicate:
441        handleDuplicateStructure(structure);
442        break;
443      case Structure::Empty:
444        handleEmptyStructure(structure);
445        break;
446      case Structure::Preoptimizing:
447        handlePreoptimizingStructure(structure);
448        break;
449      }
450    }
451
452    return;
453  }
454
455  void QueueManager::handleInProcessStructure(Structure *s)
456  {
457    QWriteLocker locker (m_inProcessTracker.rwLock());
458    if (!m_inProcessTracker.append(s)) {
459      return;
460    }
461    QtConcurrent::run(this,
462                      &QueueManager::handleInProcessStructure_, s);
463  }
464
465  // Doxygen skip:
466  /// @cond
467  void QueueManager::handleInProcessStructure_(Structure *s)
468  {
469    Q_ASSERT(trackerContainsStructure(s, &m_inProcessTracker));
470    removeFromTrackerWhenScopeEnds popper (s, &m_inProcessTracker);
471
472    // Revalidate assumptions
473    if (s->getStatus() != Structure::InProcess) {
474      return;
475    }
476
477    switch (m_opt->queueInterface()->getStatus(s)) {
478    case QueueInterface::Running:
479    case QueueInterface::Queued:
480    case QueueInterface::CommunicationError:
481    case QueueInterface::Unknown:
482    case QueueInterface::Pending:
483    case QueueInterface::Started:
484      // Nothing to do but wait
485      break;
486    case QueueInterface::Success:
487      updateStructure(s);
488      break;
489    case QueueInterface::Error:
490      s->lock()->lockForWrite();
491      s->setStatus(Structure::Error);
492      s->lock()->unlock();
493      emit structureUpdated(s);
494      break;
495    }
496
497    return;
498  }
499  /// @endcond
500
501  void QueueManager::handleOptimizedStructure(Structure *s)
502  {
503    QWriteLocker locker (m_newlyOptimizedTracker.rwLock());
504    if (!m_newlyOptimizedTracker.append(s)) {
505      return;
506    }
507    QtConcurrent::run(this,
508                      &QueueManager::handleOptimizedStructure_, s);
509  }
510
511  // Doxygen skip:
512  /// @cond
513  void QueueManager::handleOptimizedStructure_(Structure *s)
514  {
515    Q_ASSERT(trackerContainsStructure(s, &m_newlyOptimizedTracker));
516    removeFromTrackerWhenScopeEnds popper (s, &m_newlyOptimizedTracker);
517
518    // Revalidate assumptions
519    if (s->getStatus() != Structure::Optimized) {
520      return;
521    }
522
523    // Ensure that the job is not tying up the queue
524    stopJob(s);
525
526    // Remove from running tracker
527    m_runningTracker.lockForWrite();
528    m_runningTracker.remove(s);
529    m_runningTracker.unlock();
530
531    emit structureFinished(s);
532  }
533  /// @endcond
534
535  void QueueManager::handleStepOptimizedStructure(Structure *s)
536  {
537    QWriteLocker locker (m_stepOptimizedTracker.rwLock());
538    m_stepOptimizedTracker.append(s);
539    QtConcurrent::run(this,
540                      &QueueManager::handleStepOptimizedStructure_, s);
541  }
542
543  // Doxygen skip:
544  /// @cond
545  void QueueManager::handleStepOptimizedStructure_(Structure *s)
546  {
547    Q_ASSERT(trackerContainsStructure(s, &m_stepOptimizedTracker));
548    removeFromTrackerWhenScopeEnds popper (s, &m_stepOptimizedTracker);
549
550    QWriteLocker locker (s->lock());
551
552    // Validate assumptions
553    if (s->getStatus() != Structure::StepOptimized) {
554      return;
555    }
556
557    s->stopOptTimer();
558
559    // update optstep and relaunch if necessary
560    if (s->getCurrentOptStep()
561        < static_cast<unsigned int>(m_opt->optimizer()->getNumberOfOptSteps())) {
562      s->setCurrentOptStep(s->getCurrentOptStep() + 1);
563
564      // Update status
565      s->setStatus(Structure::WaitingForOptimization);
566      m_runningTracker.lockForWrite();
567      m_runningTracker.append(s);
568      m_runningTracker.unlock();
569      locker.unlock();
570      emit structureUpdated(s);
571      addStructureToSubmissionQueue(s);
572      return;
573    }
574    // Otherwise, it's done
575    else {
576      s->setStatus(Structure::Optimized);
577      locker.unlock();
578      handleOptimizedStructure(s);
579    }
580  }
581  /// @endcond
582
583  void QueueManager::handleWaitingForOptimizationStructure(Structure *s)
584  {
585    // Nothing to do but wait for the structure to be submitted
586  }
587
588  void QueueManager::handleEmptyStructure(Structure *s)
589  {
590    // Nothing to do but wait (this should never actually happen...)
591  }
592
593  void QueueManager::handlePreoptimizingStructure(Structure *s)
594  {
595    // Nothing to do but wait
596  }
597
598  void QueueManager::handleUpdatingStructure(Structure *s)
599  {
600    // Nothing to do but wait
601  }
602
603  void QueueManager::handleErrorStructure(Structure *s)
604  {
605    QWriteLocker locker (m_errorTracker.rwLock());
606    if (!m_errorTracker.append(s)) {
607      return;
608    }
609    QtConcurrent::run(this,
610                      &QueueManager::handleErrorStructure_, s);
611  }
612
613  // Doxygen skip:
614  /// @cond
615  void QueueManager::handleErrorStructure_(Structure *s)
616  {
617    Q_ASSERT(trackerContainsStructure(s, &m_errorTracker));
618    removeFromTrackerWhenScopeEnds popper (s, &m_errorTracker);
619
620    if (s->getStatus() != Structure::Error) {
621      return;
622    }
623
624    stopJob(s);
625
626    // Lock for writing
627    QWriteLocker locker (s->lock());
628
629    s->addFailure();
630
631    // If the number of failures has exceed the limit, take
632    // appropriate action
633    if (s->getFailCount() >= m_opt->failLimit) {
634      switch (OptBase::FailActions(m_opt->failAction)) {
635      case OptBase::FA_DoNothing:
636      default:
637        // resubmit job
638        s->setStatus(Structure::Restart);
639        emit structureUpdated(s);
640        return;
641      case OptBase::FA_KillIt:
642        locker.unlock();
643        killStructure(s);
644        emit structureUpdated(s);
645        return;
646      case OptBase::FA_Randomize:
647        s->setStatus(Structure::Empty);
648        locker.unlock();
649        m_opt->replaceWithRandom(s, tr("excessive failures"));
650        s->setStatus(Structure::Restart);
651        emit structureUpdated(s);
652      case OptBase::FA_NewOffspring:
653        s->setStatus(Structure::Empty);
654        locker.unlock();
655        m_opt->replaceWithOffspring(s, tr("excessive failures"));
656        s->setStatus(Structure::Restart);
657        emit structureUpdated(s);
658        return;
659      }
660    }
661    // Resubmit job if failure limit hasn't been reached
662    else {
663      s->setStatus(Structure::Restart);
664      emit structureUpdated(s);
665      return;
666    }
667  }
668  /// @endcond
669
670  void QueueManager::handleSubmittedStructure(Structure *s)
671  {
672    QWriteLocker locker (m_submittedTracker.rwLock());
673    if (!m_submittedTracker.append(s)) {
674      return;
675    }
676    QtConcurrent::run(this,
677                      &QueueManager::handleSubmittedStructure_, s);
678  }
679
680  // Doxygen skip:
681  /// @cond
682  void QueueManager::handleSubmittedStructure_(Structure *s)
683  {
684    Q_ASSERT(trackerContainsStructure(s, &m_submittedTracker));
685    removeFromTrackerWhenScopeEnds popper (s, &m_submittedTracker);
686
687    if (s->getStatus() != Structure::Submitted) {
688      return;
689    }
690
691    switch (m_opt->queueInterface()->getStatus(s)) {
692    case QueueInterface::Running:
693    case QueueInterface::Queued:
694    case QueueInterface::Success:
695    case QueueInterface::Started:
696      // Update the structure as "InProcess"
697      s->lock()->lockForWrite();
698      s->setStatus(Structure::InProcess);
699      s->lock()->unlock();
700      emit structureUpdated(s);
701      break;
702    case QueueInterface::Error:
703      s->lock()->lockForWrite();
704      s->setStatus(Structure::Restart);
705      s->lock()->unlock();
706      emit structureUpdated(s);
707      break;
708    case QueueInterface::CommunicationError:
709    case QueueInterface::Unknown:
710    case QueueInterface::Pending:
711    default:
712      // nothing to do but wait
713      break;
714    }
715  }
716  /// @endcond
717
718  void QueueManager::handleKilledStructure(Structure *s)
719  {
720    QWriteLocker locker (m_newlyKilledTracker.rwLock());
721    if (!m_newlyKilledTracker.append(s)) {
722      return;
723    }
724    QtConcurrent::run(this,
725                      &QueueManager::handleKilledStructure_, s);
726  }
727
728  // Doxygen skip:
729  /// @cond
730  void QueueManager::handleKilledStructure_(Structure *s)
731  {
732    Q_ASSERT(trackerContainsStructure(s, &m_newlyKilledTracker));
733    removeFromTrackerWhenScopeEnds popper (s, &m_newlyKilledTracker);
734
735    if (s->getStatus() != Structure::Killed &&
736        // Remove structures end up here, too, so check this (see
737        // handleRemovedStructure below)
738        s->getStatus() != Structure::Removed) {
739      return;
740    }
741
742    // Ensure that the job is not tying up the queue
743    stopJob(s);
744
745    // Remove from running tracker
746    m_runningTracker.lockForWrite();
747    m_runningTracker.remove(s);
748    m_runningTracker.unlock();
749  }
750  /// @endcond
751
752  void QueueManager::handleRemovedStructure(Structure *s)
753  {
754    handleKilledStructure(s);
755  }
756
757  void QueueManager::handleDuplicateStructure(Structure *s)
758  {
759    QWriteLocker locker (m_newDuplicateTracker.rwLock());
760    if (!m_newDuplicateTracker.append(s)) {
761      return;
762    }
763    QtConcurrent::run(this,
764                      &QueueManager::handleDuplicateStructure_, s);
765  }
766
767  // Doxygen skip:
768  /// @cond
769  void QueueManager::handleDuplicateStructure_(Structure *s)
770  {
771    Q_ASSERT(trackerContainsStructure(s, &m_newDuplicateTracker));
772    removeFromTrackerWhenScopeEnds popper (s, &m_newDuplicateTracker);
773
774    if (s->getStatus() != Structure::Duplicate) {
775      return;
776    }
777
778    // Ensure that the job is not tying up the queue
779    stopJob(s);
780
781    // Remove from running tracker
782    m_runningTracker.lockForWrite();
783    m_runningTracker.remove(s);
784    m_runningTracker.unlock();
785  }
786  /// @endcond
787
788  void QueueManager::handleRestartStructure(Structure *s)
789  {
790    QWriteLocker locker (m_restartTracker.rwLock());
791    if (!m_restartTracker.append(s)) {
792      return;
793    }
794    QtConcurrent::run(this,
795                      &QueueManager::handleRestartStructure_, s);
796  }
797
798  // Doxygen skip:
799  /// @cond
800  void QueueManager::handleRestartStructure_(Structure *s)
801  {
802    Q_ASSERT(trackerContainsStructure(s, &m_restartTracker));
803    removeFromTrackerWhenScopeEnds popper (s, &m_restartTracker);
804
805    if (s->getStatus() != Structure::Restart) {
806      return;
807    }
808
809    stopJob(s);
810
811    addStructureToSubmissionQueue(s);
812  }
813
814  void QueueManager::updateStructure(Structure *s) {
815    s->lock()->lockForWrite();
816    s->stopOptTimer();
817    s->resetFailCount();
818    s->setStatus(Structure::Updating);
819    s->lock()->unlock();
820    if (!m_opt->optimizer()->update(s)) {
821      s->lock()->lockForWrite();
822      s->setStatus(Structure::Error);
823      s->lock()->unlock();
824      emit structureUpdated(s);
825      return;
826    }
827    s->lock()->lockForWrite();
828    s->setStatus(Structure::StepOptimized);
829    s->lock()->unlock();
830    emit structureUpdated(s);
831    return;
832  }
833  /// @endcond
834
835  void QueueManager::killStructure(Structure *s) {
836    // End job if currently running
837    if ( s->getStatus() != Structure::Optimized ) {
838      s->lock()->lockForWrite();
839      s->stopOptTimer();
840      s->setStatus(Structure::Killed);
841      s->lock()->unlock();
842    }
843    else {
844      s->lock()->lockForWrite();
845      s->stopOptTimer();
846      s->setStatus(Structure::Removed);
847      s->lock()->unlock();
848    }
849    stopJob(s);
850    emit structureKilled(s);
851  }
852
853  void QueueManager::senderHasFinishedPreoptimization()
854  {
855    Structure *s = qobject_cast<Structure*>(this->sender());
856    if (s == NULL) {
857      qWarning() << Q_FUNC_INFO << "called with non-structure sender.";
858      return;
859    }
860
861    m_preOptTracker.remove(s);
862    this->addStructureToSubmissionQueue(s, 0);
863  }
864
865  void QueueManager::addStructureToSubmissionQueue(Structure *s,
866                                                   int optStep)
867  {
868    QWriteLocker locker (m_newSubmissionTracker.rwLock());
869    if (!m_newSubmissionTracker.append(s)) {
870      return;
871    }
872
873    QtConcurrent::run(this,
874                      &QueueManager::addStructureToSubmissionQueue_,
875                      s, optStep);
876  }
877
878  // Doxygen skip:
879  /// @cond
880  void QueueManager::addStructureToSubmissionQueue_(Structure *s, int optStep)
881  {
882    Q_ASSERT(trackerContainsStructure(s, &m_newSubmissionTracker));
883    removeFromTrackerWhenScopeEnds popper (s, &m_newSubmissionTracker);
884
885    // Update structure
886    s->lock()->lockForWrite();
887    s->setStatus(Structure::WaitingForOptimization);
888    if (optStep != 0) {
889      s->setCurrentOptStep(optStep);
890    }
891    s->lock()->unlock();
892
893    // Perform writing
894    m_opt->queueInterface()->writeInputFiles(s);
895    emit structureUpdated(s); // for optimizer lookup table, created during
896                              // input file creation.
897
898    m_jobStartTracker.lockForWrite();
899    m_jobStartTracker.append(s);
900    m_jobStartTracker.unlock();
901
902    m_runningTracker.lockForWrite();
903    m_runningTracker.append(s);
904    m_runningTracker.unlock();
905  }
906  /// @endcond
907
908  void QueueManager::startPreoptimization()
909  {
910    Structure *s;
911    m_needPreOptTracker.lockForWrite();
912    m_preOptTracker.lockForWrite();
913    if (!m_needPreOptTracker.popFirst(s)) {
914      m_preOptTracker.unlock();
915      m_needPreOptTracker.unlock();
916      return;
917    }
918    s->lock()->lockForWrite();
919
920    // Revalidate assumptions
921    if (!s->needsPreoptimization()) {
922      s->lock()->unlock();
923      m_preOptTracker.unlock();
924      m_needPreOptTracker.unlock();
925      return;
926    }
927
928    s->setStatus(Structure::Preoptimizing);
929    m_preOptTracker.append(s);
930
931    s->lock()->unlock();
932    m_preOptTracker.unlock();
933    m_needPreOptTracker.unlock();
934
935
936    this->connect(s, SIGNAL(preoptimizationFinished()),
937                  SLOT(senderHasFinishedPreoptimization()));
938
939    m_opt->preoptimizeStructure(s);
940  }
941
942  void QueueManager::startJob()
943  {
944    Structure *s;
945    if (!m_jobStartTracker.popFirst(s)) {
946      return;
947    }
948
949    s->lock()->lockForRead();
950    bool needsPreOpt = s->needsPreoptimization();
951    s->lock()->unlock();
952
953    if (needsPreOpt) {
954      // This shouldn't happen. If it does, just queue the job for
955      // preoptimization instead.
956      qDebug() << "Trying to skip preoptimization??";
957      m_opt->printBackTrace();
958      this->m_preOptTracker.lockForWrite();
959      this->m_preOptTracker.append(s);
960      this->m_preOptTracker.unlock();
961      return;
962    }
963
964    if (!m_opt->queueInterface()->startJob(s)) {
965      s->lock()->lockForWrite();
966      m_opt->warning(tr("QueueManager::startJob_: Job did not start "
967                        "successfully for structure %1-%2.")
968                     .arg(s->getIDString())
969                     .arg(s->getCurrentOptStep()));
970      s->setStatus(Structure::Error);
971      s->lock()->unlock();
972      return;
973    }
974
975    s->lock()->lockForWrite();
976    s->setStatus(Structure::Submitted);
977    s->lock()->unlock();
978
979    emit structureSubmitted(s);
980  }
981
982  void QueueManager::stopJob(Structure *s)
983  {
984    s->lock()->lockForRead();
985    if (s->isPreoptimizing()) {
986      s->abortPreoptimization();
987    }
988    s->lock()->unlock();
989    m_opt->queueInterface()->stopJob(s);
990  }
991
992  QList<Structure*> QueueManager::getAllPreoptimizingStructures()
993  {
994    QReadLocker locker (m_preOptTracker.rwLock()); Q_UNUSED(locker);
995    return *m_preOptTracker.list();
996  }
997
998  QList<Structure*> QueueManager::getAllRunningStructures()
999  {
1000    m_runningTracker.lockForRead();
1001    m_newStructureTracker.lockForRead();
1002    QList<Structure*> list(*m_runningTracker.list());
1003    list.append(*m_newStructureTracker.list());
1004    m_newStructureTracker.unlock();
1005    m_runningTracker.unlock();
1006    return list;
1007  }
1008
1009  QList<Structure*> QueueManager::getAllOptimizedStructures()
1010  {
1011    QList<Structure*> list;
1012    m_tracker->lockForRead();
1013    Structure *s;
1014    for (int i = 0; i < m_tracker->list()->size(); i++) {
1015      s = m_tracker->list()->at(i);
1016      s->lock()->lockForRead();
1017      if (s->getStatus() == Structure::Optimized)
1018        list.append(s);
1019      s->lock()->unlock();
1020    }
1021    m_tracker->unlock();
1022    return list;
1023  }
1024
1025  QList<Structure*> QueueManager::getAllDuplicateStructures()
1026  {
1027    QList<Structure*> list;
1028    m_tracker->lockForRead();
1029    Structure *s;
1030    for (int i = 0; i < m_tracker->list()->size(); i++) {
1031      s = m_tracker->list()->at(i);
1032      s->lock()->lockForRead();
1033      if (s->getStatus() == Structure::Duplicate)
1034        list.append(s);
1035      s->lock()->unlock();
1036    }
1037    m_tracker->unlock();
1038    return list;
1039  }
1040
1041  QList<Structure*> QueueManager::getAllStructures()
1042  {
1043    m_tracker->lockForRead();
1044    m_newStructureTracker.lockForRead();
1045    QList<Structure*> list (*m_tracker->list());
1046    list.append(*m_newStructureTracker.list());
1047    m_newStructureTracker.unlock();
1048    m_tracker->unlock();
1049    return list;
1050  }
1051
1052  QList<Structure*> QueueManager::lockForNaming()
1053  {
1054    QList<Structure*> structures = getAllStructures();
1055    // prevent compiler from optimizing "structures" out:
1056    structures.size();
1057
1058    m_tracker->lockForRead();
1059    return structures;
1060  }
1061
1062  void QueueManager::unlockForNaming(Structure *s)
1063  {
1064    if (!s) {
1065      m_tracker->unlock();
1066      return;
1067    }
1068
1069    // Discard structure if we're shutting down
1070    if (m_isDestroying) {
1071      --m_requestedStructures;
1072      m_tracker->unlock();
1073      return;
1074    }
1075
1076    if (!m_opt->isStarting) {
1077      --m_requestedStructures;
1078    }
1079
1080    // Append to tracker after decrementing
1081    // m_requestedStructures. This keeps behavior predictable during
1082    // session initialization.
1083    m_newStructureTracker.lockForWrite();
1084    m_newStructureTracker.append(s);
1085
1086    Q_ASSERT_X(m_requestedStructures >= 0, Q_FUNC_INFO,
1087               "The requested structures counter has become negative.");
1088
1089    qDebug() << "New structure accepted (" << s->getIDString() << ")";
1090
1091    m_newStructureTracker.unlock();
1092    m_tracker->unlock();
1093#if QT_VERSION == 0x040603
1094    emit newStructureQueued();
1095#else // QT_VERSION == 4.6.3
1096    QtConcurrent::run(this, &QueueManager::unlockForNaming_);
1097#endif // QT_VERSION == 4.6.3
1098  }
1099
1100  // Doxygen skip:
1101  /// @cond
1102  void QueueManager::unlockForNaming_()
1103  {
1104    Structure *s;
1105    m_tracker->lockForWrite();
1106    m_newStructureTracker.lockForWrite();
1107    if (!m_newStructureTracker.popFirst(s)) {
1108      m_newStructureTracker.unlock();
1109      m_tracker->unlock();
1110      return;
1111    }
1112
1113    // Update structure
1114    s->lock()->lockForWrite();
1115    s->setStatus(Structure::WaitingForOptimization);
1116    bool needsPreOpt = s->needsPreoptimization();
1117    s->lock()->unlock();
1118
1119    if (needsPreOpt) {
1120      m_needPreOptTracker.lockForWrite();
1121      m_needPreOptTracker.append(s);
1122    }
1123
1124    m_tracker->append(s);
1125
1126    if (needsPreOpt) {
1127      m_needPreOptTracker.unlock();
1128    }
1129
1130    m_newStructureTracker.unlock();
1131    m_tracker->unlock();
1132
1133    if (!needsPreOpt) {
1134      this->addStructureToSubmissionQueue(s);
1135    }
1136    emit structureStarted(s);
1137  }
1138  /// @endcond
1139
1140  void QueueManager::addManualStructureRequest(int requests)
1141  {
1142    m_tracker->lockForWrite();
1143    m_requestedStructures += requests;
1144    m_tracker->unlock();
1145  }
1146
1147  void QueueManager::appendToJobStartTracker(Structure *s)
1148  {
1149    m_jobStartTracker.lockForWrite();
1150    m_jobStartTracker.append(s);
1151    m_jobStartTracker.unlock();
1152  }
1153
1154} // end namespace GlobalSearch
Note: See TracBrowser for help on using the browser.