00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef ENABLE_SSH
00018
00019
00021
00022 #include <globalsearch/queueinterfaces/loadleveler.h>
00023
00024 #include <globalsearch/macros.h>
00025 #include <globalsearch/optimizer.h>
00026 #include <globalsearch/sshconnection.h>
00027 #include <globalsearch/sshmanager.h>
00028 #include <globalsearch/structure.h>
00029
00030 #include <QtCore/QDir>
00031 #include <QtCore/QFile>
00032 #include <QtCore/QString>
00033 #include <QtCore/QStringList>
00034
00035 namespace GlobalSearch {
00036
00037 LoadLevelerQueueInterface::LoadLevelerQueueInterface(
00038 OptBase *parent, const QString &settingsFile) :
00039 RemoteQueueInterface(parent, settingsFile),
00040 m_llq("llq"),
00041 m_llsubmit("llsubmit"),
00042 m_llcancel("llcancel"),
00043 m_interval(20),
00044 m_cleanRemoteOnStop(false)
00045 {
00046 m_idString = "LoadLeveler";
00047 m_templates.clear();
00048 m_templates.append("job.ll");
00049 m_hasDialog = true;
00050
00051 readSettings(settingsFile);
00052 }
00053
00054 LoadLevelerQueueInterface::~LoadLevelerQueueInterface()
00055 {
00056 }
00057
00058 bool LoadLevelerQueueInterface::isReadyToSearch(QString *str)
00059 {
00060
00061 if (m_opt->filePath.isEmpty()) {
00062 *str = tr("Local working directory is not set. Check your Queue "
00063 "configuration.");
00064 return false;
00065 }
00066
00067
00068 QDir workingdir (m_opt->filePath);
00069 bool writable = true;
00070 if (!workingdir.exists()) {
00071 if (!workingdir.mkpath(m_opt->filePath)) {
00072 writable = false;
00073 }
00074 }
00075 else {
00076
00077 QString filename = m_opt->filePath + QString("queuetest-")
00078 + QString::number(RANDUINT());
00079 QFile file (filename);
00080 if (!file.open(QFile::ReadWrite)) {
00081 writable = false;
00082 }
00083 file.remove();
00084 }
00085 if (!writable) {
00086 *str = tr("Cannot write to working directory '%1'.\n\nPlease "
00087 "change the permissions on this directory or specify "
00088 "a different one in the Queue configuration.")
00089 .arg(m_opt->filePath);
00090 return false;
00091 }
00092
00093
00094 if (m_opt->host.isEmpty()) {
00095 *str = tr("Hostname of LoadLeveler server is not set. Check your Queue "
00096 "configuration.");
00097 return false;
00098 }
00099
00100 if (m_llcancel.isEmpty()) {
00101 *str = tr("llcancel command is not set. Check your Queue "
00102 "configuration.");
00103 return false;
00104 }
00105
00106 if (m_llq.isEmpty()) {
00107 *str = tr("llq command is not set. Check your Queue "
00108 "configuration.");
00109 return false;
00110 }
00111
00112 if (m_llsubmit.isEmpty()) {
00113 *str = tr("llsubmit command is not set. Check your Queue "
00114 "configuration.");
00115 return false;
00116 }
00117
00118 if (m_opt->rempath.isEmpty()) {
00119 *str = tr("Remote working directory is not set. Check your Queue "
00120 "configuration.");
00121 return false;
00122 }
00123
00124 if (m_opt->username.isEmpty()) {
00125 *str = tr("SSH username for LoadLeveler server is not set. Check your Queue "
00126 "configuration.");
00127 return false;
00128 }
00129
00130 if (m_opt->port < 0) {
00131 *str = tr("SSH port is invalid (Port %1). Check your Queue "
00132 "configuration.").arg(m_opt->port);
00133 return false;
00134 }
00135
00136 *str = "";
00137 return true;
00138 }
00139
00140 QDialog* LoadLevelerQueueInterface::dialog()
00141 {
00142 if (!m_dialog) {
00143 m_dialog = new LoadLevelerConfigDialog (m_opt->dialog(),
00144 m_opt,
00145 this);
00146 }
00147 LoadLevelerConfigDialog *d =
00148 qobject_cast<LoadLevelerConfigDialog*>(m_dialog);
00149 d->updateGUI();
00150
00151 return d;
00152 }
00153
00154 void LoadLevelerQueueInterface::readSettings(const QString &filename)
00155 {
00156 SETTINGS(filename);
00157
00158 settings->beginGroup(m_opt->getIDString().toLower());
00159 settings->beginGroup("queueinterface/loadlevelerqueueinterface");
00160 int loadedVersion = settings->value("version", 0).toInt();
00161 settings->beginGroup("paths");
00162
00163 m_llsubmit = settings->value("llsubmit", "llsubmit").toString();
00164 m_llq = settings->value("llq", "llq").toString();
00165 m_llcancel = settings->value("llcancel", "llcancel").toString();
00166 this->setInterval(settings->value("interval", 20).toInt());
00167 m_cleanRemoteOnStop = settings->value("cleanRemoteOnStop", false).toBool();
00168
00169 settings->endGroup();
00170 settings->endGroup();
00171 settings->endGroup();
00172
00173 DESTROY_SETTINGS(filename);
00174
00175
00176 switch (loadedVersion) {
00177 case 0:
00178 case 1:
00179 default:
00180 break;
00181 }
00182
00183 }
00184
00185 void LoadLevelerQueueInterface::writeSettings(const QString &filename)
00186 {
00187 SETTINGS(filename);
00188
00189 const int VERSION = 1;
00190
00191 settings->beginGroup(m_opt->getIDString().toLower());
00192 settings->beginGroup("queueinterface/loadlevelerqueueinterface");
00193 settings->setValue("version", VERSION);
00194 settings->beginGroup("paths");
00195
00196 settings->setValue("llsubmit", m_llsubmit);
00197 settings->setValue("llq", m_llq);
00198 settings->setValue("llcancel", m_llcancel);
00199 settings->setValue("interval", m_interval);
00200 settings->setValue("cleanRemoteOnStop", m_cleanRemoteOnStop);
00201
00202 settings->endGroup();
00203 settings->endGroup();
00204 settings->endGroup();
00205
00206 DESTROY_SETTINGS(filename);
00207 }
00208
00209 bool LoadLevelerQueueInterface::startJob(Structure *s)
00210 {
00211 SSHConnection *ssh = m_opt->ssh()->getFreeConnection();
00212
00213 if (ssh == NULL) {
00214 m_opt->warning(tr("Cannot connect to ssh server"));
00215 return false;
00216 }
00217
00218 QWriteLocker wlocker (s->lock());
00219
00220 QString command = "cd \"" + s->getRempath() + "\" && " +
00221 m_llsubmit + " job.ll";
00222
00223 QString stdout_str;
00224 QString stderr_str;
00225 int ec;
00226 if (!ssh->execute(command, stdout_str, stderr_str, ec) || ec != 0) {
00227 m_opt->warning(tr("Error executing %1: %2")
00228 .arg(command).arg(stderr_str));
00229 m_opt->ssh()->unlockConnection(ssh);
00230 return false;
00231 }
00232 m_opt->ssh()->unlockConnection(ssh);
00233
00234 bool ok;
00235 unsigned int jobId = this->parseJobId(stdout_str, &ok);
00236 if (!ok) {
00237
00238 m_opt->warning(tr("Cannot parse jobID for Structure %1. Command: \"%2\" "
00239 "Output: \"%3\"")
00240 .arg(s->getIDString())
00241 .arg(command)
00242 .arg(stdout_str));
00243 return false;
00244 }
00245
00246 s->setJobID(jobId);
00247 s->startOptTimer();
00248 return true;
00249 }
00250
00251 bool LoadLevelerQueueInterface::stopJob(Structure *s)
00252 {
00253 SSHConnection *ssh = m_opt->ssh()->getFreeConnection();
00254
00255 if (ssh == NULL) {
00256 m_opt->warning(tr("Cannot connect to ssh server"));
00257 return false;
00258 }
00259
00260
00261 QWriteLocker locker (s->lock());
00262
00263
00264 if (s->getJobID() == 0) {
00265 if (m_cleanRemoteOnStop) {
00266 this->cleanRemoteDirectory(s, ssh);
00267 }
00268 m_opt->ssh()->unlockConnection(ssh);
00269 return true;
00270 }
00271
00272 const QString command = m_llcancel + " " + QString::number(s->getJobID());
00273
00274
00275 QString stdout_str;
00276 QString stderr_str;
00277 int ec;
00278 bool ret = true;
00279 if (!ssh->execute(command, stdout_str, stderr_str, ec) || ec != 0) {
00280
00281 ret = false;
00282 }
00283
00284 s->setJobID(0);
00285 s->stopOptTimer();
00286 m_opt->ssh()->unlockConnection(ssh);
00287 return ret;
00288 }
00289
00290 QueueInterface::QueueStatus LoadLevelerQueueInterface::getStatus(Structure *s) const
00291 {
00292
00293 QWriteLocker locker (s->lock());
00294 QStringList queueData = getQueueList();
00295 unsigned int jobID = static_cast<unsigned int>(s->getJobID());
00296
00297
00298
00299 if (queueData.size() == 1 && queueData[0].compare("CommError") == 0) {
00300 return QueueInterface::CommunicationError;
00301 }
00302
00303
00304 if (!jobID && s->getStatus() != Structure::Submitted) {
00305 return QueueInterface::Error;
00306 }
00307
00308 QString status = this->parseStatus(queueData, jobID);
00309
00310
00311
00312
00313
00314
00315
00316
00317
00318 if (s->getStatus() == Structure::Submitted) {
00319
00320 if (status.isEmpty()) {
00321
00322 bool exists;
00323 if (!m_opt->optimizer()->checkIfOutputFileExists(s, &exists)) {
00324 return QueueInterface::CommunicationError;
00325 }
00326 if (!exists) {
00327
00328 return QueueInterface::Pending;
00329 }
00330 else {
00331
00332 return QueueInterface::Started;
00333 }
00334 }
00335 else {
00336
00337 return QueueInterface::Started;
00338 }
00339 }
00340
00341
00342 QRegExp runningStatusMatcher ("C|CP|D|E|EP|MP|NR|NQ|R|RM|RP|ST|TX|V|VP");
00343 QRegExp queuedStatusMatcher ("H|HS|I|S");
00344 QRegExp errorStatusMatcher ("SX|X|XP");
00345 if (runningStatusMatcher.exactMatch(status))
00346 return QueueInterface::Running;
00347 else if (queuedStatusMatcher.exactMatch(status)) {
00348 return QueueInterface::Queued;
00349 }
00350 else if (errorStatusMatcher.exactMatch(status)) {
00351 m_opt->warning(tr("Structure %1 returned an error status in the queue: %2")
00352 .arg(s->getIDString()).arg(status));
00353 return QueueInterface::Error;
00354 }
00355 else if (status.isEmpty()) {
00356 locker.unlock();
00357 bool outputFileExists;
00358 if (!m_opt->optimizer()->checkIfOutputFileExists(s, &outputFileExists) ) {
00359 return QueueInterface::CommunicationError;
00360 }
00361 locker.relock();
00362
00363 if (outputFileExists) {
00364
00365 bool success;
00366 if (!m_opt->optimizer()->checkForSuccessfulOutput(s, &success)) {
00367 return QueueInterface::CommunicationError;
00368 }
00369 if (success) {
00370 return QueueInterface::Success;
00371 }
00372 else {
00373 return QueueInterface::Error;
00374 }
00375 }
00376
00377
00378
00379
00380 m_opt->debug(tr("Structure %1 with jobID %2 is missing "
00381 "from the queue and has not written any output.")
00382 .arg(s->getIDString()).arg(s->getJobID()));
00383 return QueueInterface::Error;
00384 }
00385
00386 else {
00387 m_opt->debug(tr("Structure %1 with jobID %2 has "
00388 "unrecognized status: %3")
00389 .arg(s->getIDString()).arg(s->getJobID())
00390 .arg(status));
00391 return QueueInterface::Unknown;
00392 }
00393 }
00394
00395 void LoadLevelerQueueInterface::setInterval(const int sec)
00396 {
00397 m_queueMutex.lockForWrite();
00398 m_interval = sec;
00399 m_queueMutex.unlock();
00400 }
00401
00402 QString LoadLevelerQueueInterface::parseStatus(const QStringList &statusList,
00403 unsigned int jobId) const
00404 {
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421 QString matchString = QString(
00422 "^\\w*.%1\\.\\d+\\s*\\w+[\\s0-9/:]+(\\w+)").arg(jobId);
00423 QRegExp statusCapture (matchString);
00424 foreach (const QString &str, statusList) {
00425 if (str.indexOf(statusCapture) == -1) {
00426 continue;
00427 }
00428 break;
00429 }
00430
00431 return statusCapture.cap(1);
00432 }
00433
00434 unsigned int LoadLevelerQueueInterface::parseJobId(
00435 const QString &submissionOutput, bool *ok) const
00436 {
00437
00438
00439
00440
00441
00442 QRegExp idCapture (".*\".*\\.([0-9]+)\"");
00443 *ok = false;
00444 if (idCapture.indexIn(submissionOutput) == -1) {
00445
00446 m_opt->warning(tr("Cannot parse jobID from output: \"%1\" Match len %2")
00447 .arg(submissionOutput)
00448 .arg(idCapture.matchedLength()));
00449 return 0;
00450 }
00451
00452 bool idIsInt;
00453 unsigned int jobId = idCapture.cap(1).toUInt(&idIsInt);
00454
00455 if (!idIsInt) {
00456 m_opt->warning(tr("Invalid jobID. %1 output:\n%2\n"
00457 "Parsed jobid: '%3'' (must be a positive integer).")
00458 .arg(m_llsubmit).arg(submissionOutput)
00459 .arg(idCapture.cap(1)));
00460 return 0;
00461 }
00462
00463 *ok = true;
00464 return jobId;
00465 }
00466
00467 QStringList LoadLevelerQueueInterface::getQueueList() const
00468 {
00469
00470 QReadWriteLock &queueMutex = const_cast<QReadWriteLock&> (m_queueMutex);
00471
00472 queueMutex.lockForRead();
00473
00474
00475 if (m_queueTimeStamp.isValid() &&
00476
00477 #if QT_VERSION >= 0x040700
00478 m_queueTimeStamp.msecsTo(QDateTime::currentDateTime())
00479 <= 1000*m_interval
00480 #else
00481
00482
00483 (m_queueTimeStamp.date() == QDate::currentDate() &&
00484 m_queueTimeStamp.time().msecsTo(QTime::currentTime())
00485 <= 1000*m_interval)
00486 #endif
00487 ) {
00488
00489 QStringList ret (m_queueData);
00490 queueMutex.unlock();
00491 return ret;
00492 }
00493
00494
00495
00496 QDateTime oldTimeStamp (m_queueTimeStamp);
00497 queueMutex.unlock();
00498
00499
00500 QWriteLocker queueLocker (&queueMutex);
00501
00502
00503
00504
00505
00506 if (m_queueTimeStamp.time().msecsTo(oldTimeStamp.time()) != 0) {
00507 queueLocker.unlock();
00508 return this->getQueueList();
00509 }
00510
00511
00512
00513 QStringList &queueData = const_cast<QStringList&> (m_queueData);
00514 QDateTime &queueTimeStamp = const_cast<QDateTime&> (m_queueTimeStamp);
00515
00516
00517 SSHConnection *ssh = m_opt->ssh()->getFreeConnection();
00518
00519 if (ssh == NULL) {
00520 m_opt->warning(tr("Cannot connect to ssh server"));
00521 queueTimeStamp = QDateTime::currentDateTime();
00522 queueData.clear();
00523 queueData << "CommError";
00524 QStringList ret (m_queueData);
00525 return ret;
00526 }
00527
00528 QString command = m_llq + " -u " + m_opt->username;
00529
00530
00531 QString stdout_str;
00532 QString stderr_str;
00533 int ec;
00534
00535
00536
00537 if (!ssh->execute(command, stdout_str, stderr_str, ec)
00538 || (ec != 0 && ec != 1 )
00539 ) {
00540 m_opt->ssh()->unlockConnection(ssh);
00541 m_opt->warning(tr("Error executing %1: (%2) %3\n\t"
00542 "Using cached queue data.")
00543 .arg(command)
00544 .arg(QString::number(ec))
00545 .arg(stderr_str));
00546 queueTimeStamp = QDateTime::currentDateTime();
00547 QStringList ret (m_queueData);
00548 return ret;
00549 }
00550 m_opt->ssh()->unlockConnection(ssh);
00551
00552 queueData = stdout_str.split("\n", QString::SkipEmptyParts);
00553
00554 QStringList ret (m_queueData);
00555 queueTimeStamp = QDateTime::currentDateTime();
00556 return ret;
00557 }
00558
00559 }
00560
00562
00563 #endif // ENABLE_SSH