00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef ENABLE_SSH
00018
00019
00021
00022 #include <globalsearch/queueinterfaces/pbs.h>
00023
00024 #include <globalsearch/macros.h>
00025 #include <globalsearch/optimizer.h>
00026 #include <globalsearch/sshconnection.h>
00027 #include <globalsearch/sshmanager.h>
00028 #include <globalsearch/structure.h>
00029
00030 #include <QtCore/QDir>
00031 #include <QtCore/QFile>
00032 #include <QtCore/QString>
00033 #include <QtCore/QStringList>
00034
00035 namespace GlobalSearch {
00036
00037 PbsQueueInterface::PbsQueueInterface(OptBase *parent,
00038 const QString &settingsFile) :
00039 RemoteQueueInterface(parent, settingsFile),
00040 m_qstat("qstat"),
00041 m_qsub("qsub"),
00042 m_qdel("qdel"),
00043 m_interval(1),
00044 m_cleanRemoteOnStop(false)
00045 {
00046 m_idString = "PBS";
00047 m_templates.append("job.pbs");
00048 m_hasDialog = true;
00049
00050 readSettings(settingsFile);
00051 }
00052
00053 PbsQueueInterface::~PbsQueueInterface()
00054 {
00055 }
00056
00057 bool PbsQueueInterface::isReadyToSearch(QString *str)
00058 {
00059
00060 if (m_opt->filePath.isEmpty()) {
00061 *str = tr("Local working directory is not set. Check your Queue "
00062 "configuration.");
00063 return false;
00064 }
00065
00066
00067 QDir workingdir (m_opt->filePath);
00068 bool writable = true;
00069 if (!workingdir.exists()) {
00070 if (!workingdir.mkpath(m_opt->filePath)) {
00071 writable = false;
00072 }
00073 }
00074 else {
00075
00076 QString filename = m_opt->filePath + QString("queuetest-")
00077 + QString::number(RANDUINT());
00078 QFile file (filename);
00079 if (!file.open(QFile::ReadWrite)) {
00080 writable = false;
00081 }
00082 file.remove();
00083 }
00084 if (!writable) {
00085 *str = tr("Cannot write to working directory '%1'.\n\nPlease "
00086 "change the permissions on this directory or specify "
00087 "a different one in the Queue configuration.")
00088 .arg(m_opt->filePath);
00089 return false;
00090 }
00091
00092
00093 if (m_opt->host.isEmpty()) {
00094 *str = tr("Hostname of PBS server is not set. Check your Queue "
00095 "configuration.");
00096 return false;
00097 }
00098
00099 if (m_qdel.isEmpty()) {
00100 *str = tr("qdel command is not set. Check your Queue "
00101 "configuration.");
00102 return false;
00103 }
00104
00105 if (m_qdel.isEmpty()) {
00106 *str = tr("qdel command is not set. Check your Queue "
00107 "configuration.");
00108 return false;
00109 }
00110
00111 if (m_qstat.isEmpty()) {
00112 *str = tr("qstat command is not set. Check your Queue "
00113 "configuration.");
00114 return false;
00115 }
00116
00117 if (m_qsub.isEmpty()) {
00118 *str = tr("qsub command is not set. Check your Queue "
00119 "configuration.");
00120 return false;
00121 }
00122
00123 if (m_opt->rempath.isEmpty()) {
00124 *str = tr("Remote working directory is not set. Check your Queue "
00125 "configuration.");
00126 return false;
00127 }
00128
00129 if (m_opt->username.isEmpty()) {
00130 *str = tr("SSH username for PBS server is not set. Check your Queue "
00131 "configuration.");
00132 return false;
00133 }
00134
00135 if (m_opt->port < 0) {
00136 *str = tr("SSH port is invalid (Port %1). Check your Queue "
00137 "configuration.").arg(m_opt->port);
00138 return false;
00139 }
00140
00141 *str = "";
00142 return true;
00143 }
00144
00145 QDialog* PbsQueueInterface::dialog()
00146 {
00147 if (!m_dialog) {
00148 m_dialog = new PbsConfigDialog (m_opt->dialog(),
00149 m_opt,
00150 this);
00151 }
00152 PbsConfigDialog *d = qobject_cast<PbsConfigDialog*>(m_dialog);
00153 d->updateGUI();
00154
00155 return d;
00156 }
00157
00158 void PbsQueueInterface::readSettings(const QString &filename)
00159 {
00160 SETTINGS(filename);
00161
00162 settings->beginGroup(m_opt->getIDString().toLower());
00163 settings->beginGroup("queueinterface/pbsqueueinterface");
00164 int loadedVersion = settings->value("version", 0).toInt();
00165 settings->beginGroup("paths");
00166
00167 m_qsub = settings->value("qsub", "qsub").toString();
00168 m_qstat = settings->value("qstat", "qstat").toString();
00169 m_qdel = settings->value("qdel", "qdel").toString();
00170 this->setInterval(settings->value("interval", 1).toInt());
00171 m_cleanRemoteOnStop = settings->value("cleanRemoteOnStop", false).toBool();
00172
00173 settings->endGroup();
00174 settings->endGroup();
00175 settings->endGroup();
00176
00177 DESTROY_SETTINGS(filename);
00178
00179
00180 switch (loadedVersion) {
00181 case 0:
00182 settings->beginGroup(m_opt->getIDString().toLower());
00183 settings->beginGroup("sys");
00184 m_qsub = settings->value("queue/qsub", "qsub").toString();
00185 m_qstat = settings->value("queue/qstat", "qstat").toString();
00186 m_qdel = settings->value("queue/qdel", "qdel").toString();
00187 settings->endGroup();
00188 settings->endGroup();
00189 case 1:
00190 default:
00191 break;
00192 }
00193
00194 }
00195
00196 void PbsQueueInterface::writeSettings(const QString &filename)
00197 {
00198 SETTINGS(filename);
00199
00200 const int VERSION = 1;
00201
00202 settings->beginGroup(m_opt->getIDString().toLower());
00203 settings->beginGroup("queueinterface/pbsqueueinterface");
00204 settings->setValue("version", VERSION);
00205 settings->beginGroup("paths");
00206
00207 settings->setValue("qsub", m_qsub);
00208 settings->setValue("qstat", m_qstat);
00209 settings->setValue("qdel", m_qdel);
00210 settings->setValue("interval", m_interval);
00211 settings->setValue("cleanRemoteOnStop", m_cleanRemoteOnStop);
00212
00213 settings->endGroup();
00214 settings->endGroup();
00215 settings->endGroup();
00216
00217 DESTROY_SETTINGS(filename);
00218 }
00219
00220 bool PbsQueueInterface::startJob(Structure *s)
00221 {
00222 SSHConnection *ssh = m_opt->ssh()->getFreeConnection();
00223
00224 if (ssh == NULL) {
00225 m_opt->warning(tr("Cannot connect to ssh server"));
00226 return false;
00227 }
00228
00229 QWriteLocker wlocker (s->lock());
00230
00231 QString command = "cd \"" + s->getRempath() + "\" && " +
00232 m_qsub + " job.pbs";
00233
00234 QString stdout_str;
00235 QString stderr_str;
00236 int ec;
00237 if (!ssh->execute(command, stdout_str, stderr_str, ec) || ec != 0) {
00238 m_opt->warning(tr("Error executing %1: %2")
00239 .arg(command).arg(stderr_str));
00240 m_opt->ssh()->unlockConnection(ssh);
00241 return false;
00242 }
00243 m_opt->ssh()->unlockConnection(ssh);
00244
00245
00246 QStringList list = stdout_str.split(".");
00247 bool ok;
00248 unsigned int jobID;
00249 if (list.size()) {
00250 jobID = list.first().toUInt(&ok);
00251 }
00252 else {
00253 ok = false;
00254 }
00255
00256 if (!ok) {
00257 m_opt->warning(tr("Error retrieving jobID for structure %1.")
00258 .arg(s->getIDString()));
00259 return false;
00260 }
00261
00262 s->setJobID(jobID);
00263 s->startOptTimer();
00264 return true;
00265 }
00266
00267 bool PbsQueueInterface::stopJob(Structure *s)
00268 {
00269 SSHConnection *ssh = m_opt->ssh()->getFreeConnection();
00270
00271 if (ssh == NULL) {
00272 m_opt->warning(tr("Cannot connect to ssh server"));
00273 return false;
00274 }
00275
00276
00277 QWriteLocker locker (s->lock());
00278
00279
00280 if (s->getJobID() == 0) {
00281 if (m_cleanRemoteOnStop) {
00282 this->cleanRemoteDirectory(s, ssh);
00283 }
00284 m_opt->ssh()->unlockConnection(ssh);
00285 return true;
00286 }
00287
00288 const QString command = m_qdel + " " + QString::number(s->getJobID());
00289
00290
00291 QString stdout_str;
00292 QString stderr_str;
00293 int ec;
00294 bool ret = true;
00295 if (!ssh->execute(command, stdout_str, stderr_str, ec) || ec != 0) {
00296
00297 ret = false;
00298 }
00299
00300 s->setJobID(0);
00301 s->stopOptTimer();
00302 m_opt->ssh()->unlockConnection(ssh);
00303 return ret;
00304 }
00305
00306 QueueInterface::QueueStatus PbsQueueInterface::getStatus(Structure *s) const
00307 {
00308
00309 QWriteLocker locker (s->lock());
00310 QStringList queueData = getQueueList();
00311 unsigned int jobID = static_cast<unsigned int>(s->getJobID());
00312
00313
00314
00315 if (queueData.size() == 1 && queueData[0].compare("CommError") == 0) {
00316 return QueueInterface::CommunicationError;
00317 }
00318
00319
00320 if (!jobID && s->getStatus() != Structure::Submitted) {
00321 return QueueInterface::Error;
00322 }
00323
00324
00325 QString status;
00326 int i = queueData.indexOf(QRegExp("^" + QString::number(jobID)+ ".*"));
00327 if (i != -1) {
00328 QStringList entryList = queueData.at(i).split(QRegExp("\\s+"));
00329 if (entryList.size() < 10) {
00330 m_opt->debug(QString("Skipping shot qstat entry; need at least 10"
00331 "fields: %1").arg(queueData.at(i)));
00332 }
00333 else {
00334 status = entryList.at(9);
00335 }
00336 }
00337
00338
00339
00340
00341
00342
00343
00344
00345
00346 if (s->getStatus() == Structure::Submitted) {
00347
00348 if (status.isEmpty()) {
00349
00350 bool exists;
00351 if (!m_opt->optimizer()->checkIfOutputFileExists(s, &exists)) {
00352 return QueueInterface::CommunicationError;
00353 }
00354 if (!exists) {
00355
00356 return QueueInterface::Pending;
00357 }
00358 else {
00359
00360 return QueueInterface::Started;
00361 }
00362 }
00363 else {
00364
00365 return QueueInterface::Started;
00366 }
00367 }
00368
00369 if (status.contains(QRegExp("R|E"))) {
00370 return QueueInterface::Running;
00371 }
00372 else if (status.contains(QRegExp("Q|H|T|W|S"))) {
00373 return QueueInterface::Queued;
00374 }
00375 else if (status.isEmpty()) {
00376 locker.unlock();
00377 bool outputFileExists;
00378 if (!m_opt->optimizer()->checkIfOutputFileExists(s, &outputFileExists) ) {
00379 return QueueInterface::CommunicationError;
00380 }
00381 locker.relock();
00382
00383 if (outputFileExists) {
00384
00385 bool success;
00386 if (!m_opt->optimizer()->checkForSuccessfulOutput(s, &success)) {
00387 return QueueInterface::CommunicationError;
00388 }
00389 if (success) {
00390 return QueueInterface::Success;
00391 }
00392 else {
00393 return QueueInterface::Error;
00394 }
00395 }
00396
00397
00398
00399
00400 m_opt->debug(tr("Structure %1 with jobID %2 is missing "
00401 "from the queue and has not written any output.")
00402 .arg(s->getIDString()).arg(s->getJobID()));
00403 return QueueInterface::Error;
00404 }
00405
00406 else {
00407 m_opt->debug(tr("Structure %1 with jobID %2 has "
00408 "unrecognized status: %3")
00409 .arg(s->getIDString()).arg(s->getJobID())
00410 .arg(status));
00411 return QueueInterface::Unknown;
00412 }
00413 }
00414
00415 void PbsQueueInterface::setInterval(const int sec)
00416 {
00417 m_queueMutex.lockForWrite();
00418 m_interval = sec;
00419 m_queueMutex.unlock();
00420 }
00421
00422 QStringList PbsQueueInterface::getQueueList() const
00423 {
00424
00425 QReadWriteLock &queueMutex = const_cast<QReadWriteLock&> (m_queueMutex);
00426
00427 queueMutex.lockForRead();
00428
00429
00430 if (m_queueTimeStamp.isValid() &&
00431
00432 #if QT_VERSION >= 0x040700
00433 m_queueTimeStamp.msecsTo(QDateTime::currentDateTime())
00434 <= 1000*m_interval
00435 #else
00436
00437
00438 (m_queueTimeStamp.date() == QDate::currentDate() &&
00439 m_queueTimeStamp.time().msecsTo(QTime::currentTime())
00440 <= 1000*m_interval)
00441 #endif
00442 ) {
00443
00444 QStringList ret (m_queueData);
00445 queueMutex.unlock();
00446 return ret;
00447 }
00448
00449
00450
00451 QDateTime oldTimeStamp (m_queueTimeStamp);
00452 queueMutex.unlock();
00453
00454
00455 QWriteLocker queueLocker (&queueMutex);
00456
00457
00458
00459
00460
00461 if (m_queueTimeStamp.time().msecsTo(oldTimeStamp.time()) != 0) {
00462 queueLocker.unlock();
00463 return this->getQueueList();
00464 }
00465
00466
00467
00468 QStringList &queueData = const_cast<QStringList&> (m_queueData);
00469 QDateTime &queueTimeStamp = const_cast<QDateTime&> (m_queueTimeStamp);
00470
00471
00472 SSHConnection *ssh = m_opt->ssh()->getFreeConnection();
00473
00474 if (ssh == NULL) {
00475 m_opt->warning(tr("Cannot connect to ssh server"));
00476 queueTimeStamp = QDateTime::currentDateTime();
00477 queueData.clear();
00478 queueData << "CommError";
00479 QStringList ret (m_queueData);
00480 return ret;
00481 }
00482
00483 QString command = m_qstat + " -u " + m_opt->username;
00484
00485
00486 QString stdout_str;
00487 QString stderr_str;
00488 int ec;
00489
00490
00491
00492 if (!ssh->execute(command, stdout_str, stderr_str, ec)
00493 || (ec != 0 && ec != 1 )
00494 ) {
00495 m_opt->ssh()->unlockConnection(ssh);
00496 m_opt->warning(tr("Error executing %1: (%2) %3\n\t"
00497 "Using cached queue data.")
00498 .arg(command)
00499 .arg(QString::number(ec))
00500 .arg(stderr_str));
00501 queueTimeStamp = QDateTime::currentDateTime();
00502 QStringList ret (m_queueData);
00503 return ret;
00504 }
00505 m_opt->ssh()->unlockConnection(ssh);
00506
00507 queueData = stdout_str.split("\n", QString::SkipEmptyParts);
00508
00509 QStringList ret (m_queueData);
00510 queueTimeStamp = QDateTime::currentDateTime();
00511 return ret;
00512 }
00513
00514 }
00515
00517
00518 #endif // ENABLE_SSH