00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef ENABLE_SSH
00018
00019
00021
00022 #include <globalsearch/queueinterfaces/lsf.h>
00023
00024 #include <globalsearch/macros.h>
00025 #include <globalsearch/optimizer.h>
00026 #include <globalsearch/sshconnection.h>
00027 #include <globalsearch/sshmanager.h>
00028 #include <globalsearch/structure.h>
00029
00030 #include <QtCore/QDir>
00031 #include <QtCore/QFile>
00032
00033 namespace GlobalSearch {
00034
00035 LsfQueueInterface::LsfQueueInterface(OptBase *parent,
00036 const QString &settingsFile) :
00037 RemoteQueueInterface(parent, settingsFile),
00038 m_bjobs("bjobs"),
00039 m_bsub("bsub"),
00040 m_bkill("bkill"),
00041 m_cleanRemoteOnStop(false)
00042 {
00043 m_idString = "LSF";
00044 m_templates.append("job.lsf");
00045 m_hasDialog = true;
00046
00047 readSettings(settingsFile);
00048 }
00049
00050 LsfQueueInterface::~LsfQueueInterface()
00051 {
00052 }
00053
00054 bool LsfQueueInterface::isReadyToSearch(QString *str)
00055 {
00056
00057 if (m_opt->filePath.isEmpty()) {
00058 *str = tr("Local working directory is not set. Check your Queue "
00059 "configuration.");
00060 return false;
00061 }
00062
00063
00064 QDir workingdir (m_opt->filePath);
00065 bool writable = true;
00066 if (!workingdir.exists()) {
00067 if (!workingdir.mkpath(m_opt->filePath)) {
00068 writable = false;
00069 }
00070 }
00071 else {
00072
00073 QString filename = m_opt->filePath + QString("queuetest-")
00074 + QString::number(RANDUINT());
00075 QFile file (filename);
00076 if (!file.open(QFile::ReadWrite)) {
00077 writable = false;
00078 }
00079 file.remove();
00080 }
00081 if (!writable) {
00082 *str = tr("Cannot write to working directory '%1'.\n\nPlease "
00083 "change the permissions on this directory or specify "
00084 "a different one in the Queue configuration.")
00085 .arg(m_opt->filePath);
00086 return false;
00087 }
00088
00089
00090 if (m_opt->host.isEmpty()) {
00091 *str = tr("Hostname of LSF server is not set. Check your Queue "
00092 "configuration.");
00093 return false;
00094 }
00095
00096 if (m_bkill.isEmpty()) {
00097 *str = tr("bkill command is not set. Check your Queue "
00098 "configuration.");
00099 return false;
00100 }
00101
00102 if (m_bjobs.isEmpty()) {
00103 *str = tr("bjobs command is not set. Check your Queue "
00104 "configuration.");
00105 return false;
00106 }
00107
00108 if (m_bsub.isEmpty()) {
00109 *str = tr("bsub command is not set. Check your Queue "
00110 "configuration.");
00111 return false;
00112 }
00113
00114 if (m_opt->rempath.isEmpty()) {
00115 *str = tr("Remote working directory is not set. Check your Queue "
00116 "configuration.");
00117 return false;
00118 }
00119
00120 if (m_opt->username.isEmpty()) {
00121 *str = tr("SSH username for LSF server is not set. Check your Queue "
00122 "configuration.");
00123 return false;
00124 }
00125
00126 if (m_opt->port < 0) {
00127 *str = tr("SSH port is invalid (Port %1). Check your Queue "
00128 "configuration.").arg(m_opt->port);
00129 return false;
00130 }
00131
00132 *str = "";
00133 return true;
00134 }
00135
00136 QDialog* LsfQueueInterface::dialog()
00137 {
00138 if (!m_dialog) {
00139 m_dialog = new LsfConfigDialog (m_opt->dialog(),
00140 m_opt,
00141 this);
00142 }
00143 LsfConfigDialog *d = qobject_cast<LsfConfigDialog*>(m_dialog);
00144 d->updateGUI();
00145
00146 return d;
00147 }
00148
00149 void LsfQueueInterface::readSettings(const QString &filename)
00150 {
00151 SETTINGS(filename);
00152
00153 settings->beginGroup(m_opt->getIDString().toLower());
00154 settings->beginGroup("queueinterface/lsfqueueinterface");
00155 int loadedVersion = settings->value("version", 0).toInt();
00156 settings->beginGroup("paths");
00157
00158 m_bsub = settings->value("bsub", "bsub").toString();
00159 m_bjobs = settings->value("bjobs", "bjobs").toString();
00160 m_bkill = settings->value("bkill", "bkill").toString();
00161 m_cleanRemoteOnStop = settings->value("cleanRemoteOnStop", false).toBool();
00162
00163 settings->endGroup();
00164 settings->endGroup();
00165 settings->endGroup();
00166
00167 DESTROY_SETTINGS(filename);
00168
00169
00170 switch (loadedVersion) {
00171 case 0:
00172 default:
00173 break;
00174 }
00175
00176 }
00177
00178 void LsfQueueInterface::writeSettings(const QString &filename)
00179 {
00180 SETTINGS(filename);
00181
00182 const int VERSION = 1;
00183
00184 settings->beginGroup(m_opt->getIDString().toLower());
00185 settings->beginGroup("queueinterface/lsfqueueinterface");
00186 settings->setValue("version", VERSION);
00187 settings->beginGroup("paths");
00188
00189 settings->setValue("bsub", m_bsub);
00190 settings->setValue("bjobs", m_bjobs);
00191 settings->setValue("bkill", m_bkill);
00192 settings->setValue("cleanRemoteOnStop", m_cleanRemoteOnStop);
00193
00194 settings->endGroup();
00195 settings->endGroup();
00196 settings->endGroup();
00197
00198 DESTROY_SETTINGS(filename);
00199 }
00200
00201 bool LsfQueueInterface::startJob(Structure *s)
00202 {
00203 SSHConnection *ssh = m_opt->ssh()->getFreeConnection();
00204
00205 if (ssh == NULL) {
00206 m_opt->warning(tr("Cannot connect to ssh server"));
00207 return false;
00208 }
00209
00210 QWriteLocker wlocker (s->lock());
00211
00212 QString command = "cd \"" + s->getRempath() + "\" && " +
00213 m_bsub + "< job.lsf";
00214
00215 QString stdout_str;
00216 QString stderr_str;
00217 int ec;
00218 if (!ssh->execute(command, stdout_str, stderr_str, ec) || ec != 0) {
00219 m_opt->warning(tr("Error executing %1: %2")
00220 .arg(command).arg(stderr_str));
00221 m_opt->ssh()->unlockConnection(ssh);
00222 return false;
00223 }
00224 m_opt->ssh()->unlockConnection(ssh);
00225
00226
00227
00228
00229 QStringList list = stdout_str.split(QRegExp("<|>"));
00230 bool ok;
00231 unsigned int jobID;
00232 if (list.size() >= 2) {
00233 jobID = list.at(1).toUInt(&ok);
00234 }
00235 else {
00236 ok = false;
00237 }
00238
00239 if (!ok) {
00240 m_opt->warning(tr("Error retrieving jobID for structure %1.")
00241 .arg(s->getIDString()));
00242 return false;
00243 }
00244
00245 s->setJobID(jobID);
00246 s->startOptTimer();
00247 return true;
00248 }
00249
00250 bool LsfQueueInterface::stopJob(Structure *s)
00251 {
00252 SSHConnection *ssh = m_opt->ssh()->getFreeConnection();
00253
00254 if (ssh == NULL) {
00255 m_opt->warning(tr("Cannot connect to ssh server"));
00256 return false;
00257 }
00258
00259
00260 QWriteLocker locker (s->lock());
00261
00262
00263 if (s->getJobID() == 0) {
00264 if (m_cleanRemoteOnStop) {
00265 this->cleanRemoteDirectory(s, ssh);
00266 }
00267 m_opt->ssh()->unlockConnection(ssh);
00268 return true;
00269 }
00270
00271 const QString command = m_bkill + " " + QString::number(s->getJobID());
00272
00273
00274 QString stdout_str;
00275 QString stderr_str;
00276 int ec;
00277 bool ret = true;
00278 if (!ssh->execute(command, stdout_str, stderr_str, ec) || ec != 0) {
00279
00280 ret = false;
00281 }
00282
00283 s->setJobID(0);
00284 s->stopOptTimer();
00285 m_opt->ssh()->unlockConnection(ssh);
00286 return ret;
00287 }
00288
00289 QueueInterface::QueueStatus LsfQueueInterface::getStatus(Structure *s) const
00290 {
00291
00292 QWriteLocker locker (s->lock());
00293 QStringList queueData = getQueueList();
00294 unsigned int jobID = static_cast<unsigned int>(s->getJobID());
00295
00296
00297
00298 if (queueData.size() == 1 && queueData[0].compare("CommError") == 0) {
00299 return QueueInterface::CommunicationError;
00300 }
00301
00302
00303 if (!jobID && s->getStatus() != Structure::Submitted) {
00304 return QueueInterface::Error;
00305 }
00306
00307
00308
00309
00310
00311
00312
00313
00314
00315 QString status;
00316 QStringList entryList;
00317 unsigned int curJobID = 0;
00318 bool ok;
00319 for (int i = 0; i < queueData.size(); i++) {
00320 entryList = queueData.at(i).split(QRegExp("\\s+"),
00321 QString::SkipEmptyParts);
00322 if (entryList.size()) {
00323 curJobID = entryList.first().toUInt(&ok);
00324 if (!ok) {
00325 continue;
00326 }
00327 }
00328 else {
00329 continue;
00330 }
00331 if (curJobID == jobID) {
00332 if (entryList.size() < 3) {
00333 continue;
00334 }
00335 status = entryList.at(2);
00336 break;
00337 }
00338 }
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348 if (s->getStatus() == Structure::Submitted) {
00349
00350 if (status.isEmpty()) {
00351
00352 bool exists;
00353 if (!m_opt->optimizer()->checkIfOutputFileExists(s, &exists)) {
00354 return QueueInterface::CommunicationError;
00355 }
00356 if (!exists) {
00357
00358 return QueueInterface::Pending;
00359 }
00360 else {
00361
00362 return QueueInterface::Started;
00363 }
00364 }
00365 else {
00366
00367 return QueueInterface::Started;
00368 }
00369 }
00370
00371 if (status.contains(QRegExp("RUN|DONE|EXIT"))) {
00372 return QueueInterface::Running;
00373 }
00374 else if (status.contains(QRegExp("PEND|PSUSP|USUSP|SSUSP"))) {
00375 return QueueInterface::Queued;
00376 }
00377 else if (status.isEmpty()) {
00378 locker.unlock();
00379 bool outputFileExists;
00380 if (!m_opt->optimizer()->checkIfOutputFileExists(s, &outputFileExists) ) {
00381 return QueueInterface::CommunicationError;
00382 }
00383 locker.relock();
00384
00385 if (outputFileExists) {
00386
00387 bool success;
00388 if (!m_opt->optimizer()->checkForSuccessfulOutput(s, &success)) {
00389 return QueueInterface::CommunicationError;
00390 }
00391 if (success) {
00392 return QueueInterface::Success;
00393 }
00394 else {
00395 return QueueInterface::Error;
00396 }
00397 }
00398
00399
00400
00401
00402 m_opt->debug(tr("Structure %1 with jobID %2 is missing "
00403 "from the queue and has not written any output.")
00404 .arg(s->getIDString()).arg(s->getJobID()));
00405 return QueueInterface::Error;
00406 }
00407
00408 else {
00409 m_opt->debug(tr("Structure %1 with jobID %2 has "
00410 "unrecognized status: %3")
00411 .arg(s->getIDString()).arg(s->getJobID())
00412 .arg(status));
00413 return QueueInterface::Unknown;
00414 }
00415 }
00416
00417 QStringList LsfQueueInterface::getQueueList() const
00418 {
00419
00420 QReadWriteLock &queueMutex = const_cast<QReadWriteLock&> (m_queueMutex);
00421
00422 queueMutex.lockForRead();
00423
00424
00425 if (m_queueTimeStamp.isValid() &&
00426
00427 #if QT_VERSION >= 0x040700
00428 m_queueTimeStamp.msecsTo(QDateTime::currentDateTime())
00429 <= 1000
00430 #else
00431
00432
00433 (m_queueTimeStamp.date() == QDate::currentDate() &&
00434 m_queueTimeStamp.time().msecsTo(QTime::currentTime())
00435 <= 1000)
00436 #endif
00437 ) {
00438
00439 QStringList ret (m_queueData);
00440 queueMutex.unlock();
00441 return ret;
00442 }
00443
00444
00445
00446 QDateTime oldTimeStamp (m_queueTimeStamp);
00447 queueMutex.unlock();
00448
00449
00450 QWriteLocker queueLocker (&queueMutex);
00451
00452
00453
00454
00455
00456 if (m_queueTimeStamp.time().msecsTo(oldTimeStamp.time()) != 0) {
00457 queueLocker.unlock();
00458 return this->getQueueList();
00459 }
00460
00461
00462
00463 QStringList &queueData = const_cast<QStringList&> (m_queueData);
00464 QDateTime &queueTimeStamp = const_cast<QDateTime&> (m_queueTimeStamp);
00465
00466
00467 SSHConnection *ssh = m_opt->ssh()->getFreeConnection();
00468
00469 if (ssh == NULL) {
00470 m_opt->warning(tr("Cannot connect to ssh server"));
00471 queueTimeStamp = QDateTime::currentDateTime();
00472 queueData.clear();
00473 queueData << "CommError";
00474 QStringList ret (m_queueData);
00475 return ret;
00476 }
00477
00478 QString command = m_bjobs + " -u " + m_opt->username;
00479
00480
00481 QString stdout_str;
00482 QString stderr_str;
00483 int ec;
00484
00485
00486
00487 if (!ssh->execute(command, stdout_str, stderr_str, ec)
00488 || (ec != 0 && ec != 1 )
00489 ) {
00490 m_opt->ssh()->unlockConnection(ssh);
00491 m_opt->warning(tr("Error executing %1: (%2) %3\n\t"
00492 "Using cached queue data.")
00493 .arg(command)
00494 .arg(QString::number(ec))
00495 .arg(stderr_str));
00496 queueTimeStamp = QDateTime::currentDateTime();
00497 QStringList ret (m_queueData);
00498 return ret;
00499 }
00500 m_opt->ssh()->unlockConnection(ssh);
00501
00502 queueData = stdout_str.split("\n", QString::SkipEmptyParts);
00503
00504 QStringList ret (m_queueData);
00505 queueTimeStamp = QDateTime::currentDateTime();
00506 return ret;
00507 }
00508
00509 }
00510
00512
00513 #endif // ENABLE_SSH