00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef ENABLE_SSH
00018
00019
00021
00022 #include <globalsearch/queueinterfaces/sge.h>
00023
00024 #include <globalsearch/macros.h>
00025 #include <globalsearch/optimizer.h>
00026 #include <globalsearch/sshconnection.h>
00027 #include <globalsearch/sshmanager.h>
00028 #include <globalsearch/structure.h>
00029
00030 #include <QtCore/QDir>
00031 #include <QtCore/QFile>
00032
00033 namespace GlobalSearch {
00034
00035 SgeQueueInterface::SgeQueueInterface(OptBase *parent,
00036 const QString &settingsFile) :
00037 RemoteQueueInterface(parent, settingsFile),
00038 m_qstat("qstat"),
00039 m_qsub("qsub"),
00040 m_qdel("qdel"),
00041 m_interval(1),
00042 m_cleanRemoteOnStop(false)
00043 {
00044 m_idString = "SGE";
00045 m_templates.append("job.sh");
00046 m_hasDialog = true;
00047
00048 readSettings(settingsFile);
00049 }
00050
00051 SgeQueueInterface::~SgeQueueInterface()
00052 {
00053 }
00054
00055 bool SgeQueueInterface::isReadyToSearch(QString *str)
00056 {
00057
00058 if (m_opt->filePath.isEmpty()) {
00059 *str = tr("Local working directory is not set. Check your Queue "
00060 "configuration.");
00061 return false;
00062 }
00063
00064
00065 QDir workingdir (m_opt->filePath);
00066 bool writable = true;
00067 if (!workingdir.exists()) {
00068 if (!workingdir.mkpath(m_opt->filePath)) {
00069 writable = false;
00070 }
00071 }
00072 else {
00073
00074 QString filename = m_opt->filePath + QString("queuetest-")
00075 + QString::number(RANDUINT());
00076 QFile file (filename);
00077 if (!file.open(QFile::ReadWrite)) {
00078 writable = false;
00079 }
00080 file.remove();
00081 }
00082 if (!writable) {
00083 *str = tr("Cannot write to working directory '%1'.\n\nPlease "
00084 "change the permissions on this directory or specify "
00085 "a different one in the Queue configuration.")
00086 .arg(m_opt->filePath);
00087 return false;
00088 }
00089
00090
00091 if (m_opt->host.isEmpty()) {
00092 *str = tr("Hostname of SGE server is not set. Check your Queue "
00093 "configuration.");
00094 return false;
00095 }
00096
00097 if (m_qdel.isEmpty()) {
00098 *str = tr("qdel command is not set. Check your Queue "
00099 "configuration.");
00100 return false;
00101 }
00102
00103 if (m_qdel.isEmpty()) {
00104 *str = tr("qdel command is not set. Check your Queue "
00105 "configuration.");
00106 return false;
00107 }
00108
00109 if (m_qstat.isEmpty()) {
00110 *str = tr("qstat command is not set. Check your Queue "
00111 "configuration.");
00112 return false;
00113 }
00114
00115 if (m_qsub.isEmpty()) {
00116 *str = tr("qsub command is not set. Check your Queue "
00117 "configuration.");
00118 return false;
00119 }
00120
00121 if (m_opt->rempath.isEmpty()) {
00122 *str = tr("Remote working directory is not set. Check your Queue "
00123 "configuration.");
00124 return false;
00125 }
00126
00127 if (m_opt->username.isEmpty()) {
00128 *str = tr("SSH username for SGE server is not set. Check your Queue "
00129 "configuration.");
00130 return false;
00131 }
00132
00133 if (m_opt->port < 0) {
00134 *str = tr("SSH port is invalid (Port %1). Check your Queue "
00135 "configuration.").arg(m_opt->port);
00136 return false;
00137 }
00138
00139 *str = "";
00140 return true;
00141 }
00142
00143 QDialog* SgeQueueInterface::dialog()
00144 {
00145 if (!m_dialog) {
00146 m_dialog = new SgeConfigDialog (m_opt->dialog(),
00147 m_opt,
00148 this);
00149 }
00150 SgeConfigDialog *d = qobject_cast<SgeConfigDialog*>(m_dialog);
00151 d->updateGUI();
00152
00153 return d;
00154 }
00155
00156 void SgeQueueInterface::readSettings(const QString &filename)
00157 {
00158 SETTINGS(filename);
00159
00160 settings->beginGroup(m_opt->getIDString().toLower());
00161 settings->beginGroup("queueinterface/sgequeueinterface");
00162 int loadedVersion = settings->value("version", 0).toInt();
00163 settings->beginGroup("paths");
00164
00165 m_qsub = settings->value("qsub", "qsub").toString();
00166 m_qstat = settings->value("qstat", "qstat").toString();
00167 m_qdel = settings->value("qdel", "qdel").toString();
00168 this->setInterval(settings->value("interval", 1).toInt());
00169 m_cleanRemoteOnStop = settings->value("cleanRemoteOnStop", false).toBool();
00170
00171
00172 settings->endGroup();
00173 settings->endGroup();
00174 settings->endGroup();
00175
00176 DESTROY_SETTINGS(filename);
00177
00178
00179 switch (loadedVersion) {
00180 case 0:
00181 settings->beginGroup(m_opt->getIDString().toLower());
00182 settings->beginGroup("sys");
00183 m_qsub = settings->value("queue/qsub", "qsub").toString();
00184 m_qstat = settings->value("queue/qstat", "qstat").toString();
00185 m_qdel = settings->value("queue/qdel", "qdel").toString();
00186 settings->endGroup();
00187 settings->endGroup();
00188 case 1:
00189 default:
00190 break;
00191 }
00192
00193 }
00194
00195 void SgeQueueInterface::writeSettings(const QString &filename)
00196 {
00197 SETTINGS(filename);
00198
00199 const int VERSION = 1;
00200
00201 settings->beginGroup(m_opt->getIDString().toLower());
00202 settings->beginGroup("queueinterface/sgequeueinterface");
00203 settings->setValue("version", VERSION);
00204 settings->beginGroup("paths");
00205
00206 settings->setValue("qsub", m_qsub);
00207 settings->setValue("qstat", m_qstat);
00208 settings->setValue("qdel", m_qdel);
00209 settings->setValue("interval", m_interval);
00210 settings->setValue("cleanRemoteOnStop", m_cleanRemoteOnStop);
00211
00212 settings->endGroup();
00213 settings->endGroup();
00214 settings->endGroup();
00215
00216 DESTROY_SETTINGS(filename);
00217 }
00218
00219 bool SgeQueueInterface::startJob(Structure *s)
00220 {
00221 SSHConnection *ssh = m_opt->ssh()->getFreeConnection();
00222
00223 if (ssh == NULL) {
00224 m_opt->warning(tr("Cannot connect to ssh server"));
00225 return false;
00226 }
00227
00228 QWriteLocker wlocker (s->lock());
00229
00230 QString command = "cd \"" + s->getRempath() + "\" && " +
00231 m_qsub + " job.sh";
00232
00233 QString stdout_str;
00234 QString stderr_str;
00235 int ec;
00236 if (!ssh->execute(command, stdout_str, stderr_str, ec) || ec != 0) {
00237 m_opt->warning(tr("Error executing %1: %2")
00238 .arg(command).arg(stderr_str));
00239 m_opt->ssh()->unlockConnection(ssh);
00240 return false;
00241 }
00242 m_opt->ssh()->unlockConnection(ssh);
00243
00244
00245
00246 unsigned int jobID = stdout_str.split(QRegExp("\\s+"))[2].toUInt();
00247
00248 s->setJobID(jobID);
00249 s->startOptTimer();
00250 return true;
00251 }
00252
00253 bool SgeQueueInterface::stopJob(Structure *s)
00254 {
00255 SSHConnection *ssh = m_opt->ssh()->getFreeConnection();
00256
00257 if (ssh == NULL) {
00258 m_opt->warning(tr("Cannot connect to ssh server"));
00259 return false;
00260 }
00261
00262
00263 QWriteLocker locker (s->lock());
00264
00265
00266 if (s->getJobID() == 0) {
00267 if (m_cleanRemoteOnStop) {
00268 this->cleanRemoteDirectory(s, ssh);
00269 }
00270 m_opt->ssh()->unlockConnection(ssh);
00271 return true;
00272 }
00273
00274 const QString command = m_qdel + " " + QString::number(s->getJobID());
00275
00276
00277 QString stdout_str;
00278 QString stderr_str;
00279 int ec;
00280 bool ret = true;
00281 if (!ssh->execute(command, stdout_str, stderr_str, ec) || ec != 0) {
00282
00283 ret = false;
00284 }
00285
00286 s->setJobID(0);
00287 s->stopOptTimer();
00288 m_opt->ssh()->unlockConnection(ssh);
00289 return ret;
00290 }
00291
00292 QueueInterface::QueueStatus SgeQueueInterface::getStatus(Structure *s) const
00293 {
00294
00295 QWriteLocker locker (s->lock());
00296 QStringList queueData = getQueueList();
00297 unsigned int jobID = static_cast<unsigned int>(s->getJobID());
00298
00299
00300
00301 if (queueData.size() == 1 && queueData[0].compare("CommError") == 0) {
00302 return QueueInterface::CommunicationError;
00303 }
00304
00305
00306 if (!jobID && s->getStatus() != Structure::Submitted) {
00307 return QueueInterface::Error;
00308 }
00309
00310
00311
00312
00313
00314
00315
00316
00317
00318 QString status;
00319 QStringList list;
00320 for (int i = 0; i < queueData.size(); i++) {
00321 list = queueData.at(i).split(QRegExp("\\s+"), QString::SkipEmptyParts);
00322 if (list[0].toUInt() == jobID) {
00323 status = list[4];
00324 continue;
00325 }
00326 }
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336 if (s->getStatus() == Structure::Submitted) {
00337
00338 if (status.isEmpty()) {
00339
00340 bool exists;
00341 if (!m_opt->optimizer()->checkIfOutputFileExists(s, &exists)) {
00342 return QueueInterface::CommunicationError;
00343 }
00344 if (!exists) {
00345
00346 return QueueInterface::Pending;
00347 }
00348 else {
00349
00350 return QueueInterface::Started;
00351 }
00352 }
00353 else {
00354
00355 return QueueInterface::Started;
00356 }
00357 }
00358
00359 if (status.contains('r')) {
00360 return QueueInterface::Running;
00361 }
00362 else if (status.contains(QRegExp("q|w|s"))) {
00363 return QueueInterface::Queued;
00364 }
00365 else {
00366 locker.unlock();
00367 bool outputFileExists;
00368 if (!m_opt->optimizer()->checkIfOutputFileExists(s, &outputFileExists) ) {
00369 return QueueInterface::CommunicationError;
00370 }
00371 locker.relock();
00372
00373 if (outputFileExists) {
00374
00375 bool success;
00376 if (!m_opt->optimizer()->checkForSuccessfulOutput(s, &success)) {
00377 return QueueInterface::CommunicationError;
00378 }
00379 if (success) {
00380 return QueueInterface::Success;
00381 }
00382 else {
00383 return QueueInterface::Error;
00384 }
00385 }
00386 }
00387
00388 return QueueInterface::Unknown;
00389 }
00390
00391 void SgeQueueInterface::setInterval(const int sec)
00392 {
00393 m_queueMutex.lockForWrite();
00394 m_interval = sec;
00395 m_queueMutex.unlock();
00396 }
00397
00398 QStringList SgeQueueInterface::getQueueList() const
00399 {
00400
00401 QReadWriteLock &queueMutex = const_cast<QReadWriteLock&> (m_queueMutex);
00402
00403 queueMutex.lockForRead();
00404
00405
00406 if (m_queueTimeStamp.isValid() &&
00407
00408 #if QT_VERSION >= 0x040700
00409 m_queueTimeStamp.msecsTo(QDateTime::currentDateTime())
00410 <= 1000*m_interval
00411 #else
00412
00413
00414 (m_queueTimeStamp.date() == QDate::currentDate() &&
00415 m_queueTimeStamp.time().msecsTo(QTime::currentTime())
00416 <= 1000*m_interval)
00417 #endif
00418 ) {
00419
00420 QStringList ret (m_queueData);
00421 queueMutex.unlock();
00422 return ret;
00423 }
00424
00425
00426
00427 QDateTime oldTimeStamp (m_queueTimeStamp);
00428 queueMutex.unlock();
00429
00430
00431 QWriteLocker queueLocker (&queueMutex);
00432
00433
00434
00435
00436
00437 if (m_queueTimeStamp.time().msecsTo(oldTimeStamp.time()) != 0) {
00438 queueLocker.unlock();
00439 return this->getQueueList();
00440 }
00441
00442
00443
00444 QStringList &queueData = const_cast<QStringList&> (m_queueData);
00445 QDateTime &queueTimeStamp = const_cast<QDateTime&> (m_queueTimeStamp);
00446
00447
00448 SSHConnection *ssh = m_opt->ssh()->getFreeConnection();
00449
00450 if (ssh == NULL) {
00451 m_opt->warning(tr("Cannot connect to ssh server"));
00452 queueTimeStamp = QDateTime::currentDateTime();
00453 queueData.clear();
00454 queueData << "CommError";
00455 QStringList ret (m_queueData);
00456 return ret;
00457 }
00458
00459 QString command = m_qstat + " | grep " + m_opt->username;
00460
00461
00462 QString stdout_str;
00463 QString stderr_str;
00464 int ec;
00465
00466
00467
00468 if (!ssh->execute(command, stdout_str, stderr_str, ec)
00469 || (ec != 0 && ec != 1 )
00470 ) {
00471 m_opt->ssh()->unlockConnection(ssh);
00472 m_opt->warning(tr("Error executing %1: (%2) %3\n\t"
00473 "Using cached queue data.")
00474 .arg(command)
00475 .arg(QString::number(ec))
00476 .arg(stderr_str));
00477 queueTimeStamp = QDateTime::currentDateTime();
00478 QStringList ret (m_queueData);
00479 return ret;
00480 }
00481 m_opt->ssh()->unlockConnection(ssh);
00482
00483 queueData = stdout_str.split("\n", QString::SkipEmptyParts);
00484
00485 QStringList ret (m_queueData);
00486 queueTimeStamp = QDateTime::currentDateTime();
00487 return ret;
00488 }
00489
00490 }
00491
00493
00494 #endif // ENABLE_SSH