Commit 301d40b3 authored by Romain Reuillon's avatar Romain Reuillon
Browse files

[Plugin] implement sge.

parent 8aca2698
......@@ -380,15 +380,15 @@ lazy val gridscale = OsgiProject(pluginDir, "org.openmole.plugin.environment.gri
lazy val pbs = OsgiProject(pluginDir, "org.openmole.plugin.environment.pbs", imports = Seq("*")) dependsOn(openmoleDSL, batch, gridscale, ssh) settings
(libraryDependencies += Libraries.gridscalePBS) settings (pluginSettings: _*)
//lazy val sge = OsgiProject(pluginDir, "org.openmole.plugin.environment.sge", imports = Seq("*")) dependsOn(openmoleDSL, batch, gridscale, ssh) settings
// (libraryDependencies += Libraries.gridscaleSGE) settings (pluginSettings: _*)
//
lazy val sge = OsgiProject(pluginDir, "org.openmole.plugin.environment.sge", imports = Seq("*")) dependsOn(openmoleDSL, batch, gridscale, ssh) settings
(libraryDependencies += Libraries.gridscaleSGE) settings (pluginSettings: _*)
//lazy val condor = OsgiProject(pluginDir, "org.openmole.plugin.environment.condor", imports = Seq("*")) dependsOn(openmoleDSL, batch, gridscale, ssh) settings
// (libraryDependencies += Libraries.gridscaleCondor) settings (pluginSettings: _*)
//
//lazy val slurm = OsgiProject(pluginDir, "org.openmole.plugin.environment.slurm", imports = Seq("*")) dependsOn(openmoleDSL, batch, gridscale, ssh) settings
// (libraryDependencies += Libraries.gridscaleSLURM) settings (pluginSettings: _*)
//
lazy val ssh = OsgiProject(pluginDir, "org.openmole.plugin.environment.ssh", imports = Seq("*")) dependsOn(openmoleDSL, event, batch, gridscale) settings
(libraryDependencies += Libraries.gridscaleSSH) settings (pluginSettings: _*)
......
......@@ -46,10 +46,10 @@ object PBSEnvironment {
workDirectory: OptionalArgument[String] = None,
threads: OptionalArgument[Int] = None,
storageSharedLocally: Boolean = false,
timeout: OptionalArgument[Time] = None,
timeout: OptionalArgument[Time] = None,
flavour: gridscale.pbs.PBSFlavour = Torque,
name: OptionalArgument[String] = None,
localSubmission: Boolean = false
localSubmission: Boolean = false
)(implicit services: BatchEnvironment.Services, authenticationStore: AuthenticationStore, cypher: Cypher, varName: sourcecode.Name) = {
import services._
......@@ -116,8 +116,7 @@ object PBSEnvironment {
workDirectory: Option[String],
threads: Option[Int],
storageSharedLocally: Boolean,
flavour: gridscale.pbs.PBSFlavour
)
flavour: gridscale.pbs.PBSFlavour)
def nbCores(parameters: Parameters) = parameters.coreByNode.map(c => math.min(c, parameters.threads.getOrElse(1)))
}
......@@ -185,7 +184,7 @@ class PBSEnvironment[A: gridscale.ssh.SSHAuthentication](
workDirectory = workDirectory,
queue = parameters.queue,
wallTime = parameters.wallTime,
memory = parameters.memory,
memory = Some(BatchEnvironment.requiredMemory(parameters.openMOLEMemory, parameters.memory)),
nodes = parameters.nodes,
coreByNode = PBSEnvironment.nbCores(parameters),
flavour = parameters.flavour
......@@ -279,7 +278,7 @@ class PBSLocalEnvironment(
workDirectory = workDirectory,
queue = parameters.queue,
wallTime = parameters.wallTime,
memory = parameters.memory,
memory = Some(BatchEnvironment.requiredMemory(parameters.openMOLEMemory, parameters.memory)),
nodes = parameters.nodes,
coreByNode = PBSEnvironment.nbCores(parameters),
flavour = parameters.flavour
......
///*
// * Copyright (C) 2012 Romain Reuillon
// * Copyright (C) 2014 Jonathan Passerat-Palmbach
//
// * This program is free software: you can redistribute it and/or modify
// * it under the terms of the GNU General Public License as published by
// * the Free Software Foundation, either version 3 of the License, or
// * (at your option) any later version.
// *
// * This program is distributed in the hope that it will be useful,
// * but WITHOUT ANY WARRANTY; without even the implied warranty of
// * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// * GNU General Public License for more details.
// *
// * You should have received a copy of the GNU General Public License
// * along with this program. If not, see <http://www.gnu.org/licenses/>.
// */
//
//package org.openmole.plugin.environment.pbs
//
//import fr.iscpif.gridscale.pbs.{ PBSJobDescription, PBSJobService ⇒ GSPBSJobService }
//import fr.iscpif.gridscale.ssh.SSHConnectionCache
//import org.openmole.core.workspace.Workspace
//import org.openmole.plugin.environment.batch.environment._
//import org.openmole.plugin.environment.batch.jobservice.{ BatchJob, BatchJobId }
//import org.openmole.plugin.environment.ssh.{ ClusterJobService, SSHService }
//import org.openmole.tool.logger.Logger
//
//object PBSJobService extends Logger
//
//import org.openmole.plugin.environment.pbs.PBSJobService._
//import squants.time.TimeConversions._
//
//trait PBSJobService extends ClusterJobService { js ⇒
//
// val environment: PBSEnvironment
// import environment.services._
//
// lazy val jobService = new GSPBSJobService with SSHConnectionCache {
// def host = js.host
// def user = js.user
// def credential = js.credential
// override def port = js.port
// override def timeout = preference(SSHService.timeout)
// }
//
// protected def _submit(serializedJob: SerializedJob) = {
// val (remoteScript, result) = buildScript(serializedJob)
// val jobDescription = PBSJobDescription(
// executable = "/bin/bash",
// arguments = remoteScript,
// queue = environment.queue,
// workDirectory = serializedJob.path,
// wallTime = environment.wallTime.map(x ⇒ x: concurrent.duration.Duration),
// memory = Some(BatchEnvironment.requiredMemory(environment.openMOLEMemory, environment.memory).toMegabytes.toInt),
// nodes = environment.nodes,
// coreByNode = environment.coreByNode orElse environment.threads
// )
//
// val jid = js.jobService.submit(jobDescription)
// Log.logger.fine(s"PBS job [${jid.pbsId}], description: \n ${jobDescription}")
//
// new BatchJob with BatchJobId {
// val jobService = js
// val id = jid
// val resultPath = result
// }
// }
//
//}
......@@ -17,19 +17,24 @@
package org.openmole.plugin.environment.sge
import org.openmole.core.authentication.AuthenticationStore
import org.openmole.core.authentication._
import org.openmole.core.workflow.dsl._
import org.openmole.core.workflow.execution._
import org.openmole.plugin.environment.batch.environment._
import org.openmole.plugin.environment.batch.jobservice._
import org.openmole.plugin.environment.ssh._
import org.openmole.tool.crypto.Cypher
import squants._
import squants.{Time, _}
import squants.information._
import effectaside._
import org.openmole.plugin.environment.batch.refresh.{JobManager, StopEnvironment}
import org.openmole.plugin.environment.gridscale._
object SGEEnvironment {
def apply(
user: String,
host: String,
port: Int = 22,
user: OptionalArgument[String] = None,
host: OptionalArgument[String] = None,
port: OptionalArgument[Int] = 22,
queue: OptionalArgument[String] = None,
openMOLEMemory: OptionalArgument[Information] = None,
wallTime: OptionalArgument[Time] = None,
......@@ -38,13 +43,13 @@ object SGEEnvironment {
workDirectory: OptionalArgument[String] = None,
threads: OptionalArgument[Int] = None,
storageSharedLocally: Boolean = false,
name: OptionalArgument[String] = None
)(implicit services: BatchEnvironment.Services, authenticationStore: AuthenticationStore, cypher: Cypher, varName: sourcecode.Name) = {
timeout: OptionalArgument[Time] = None,
name: OptionalArgument[String] = None,
localSubmission: Boolean = false)(implicit services: BatchEnvironment.Services, authenticationStore: AuthenticationStore, cypher: Cypher, varName: sourcecode.Name) = {
import services._
new SGEEnvironment(
user = user,
host = host,
port = port,
val parameters = Parameters(
queue = queue,
openMOLEMemory = openMOLEMemory,
wallTime = wallTime,
......@@ -52,41 +57,224 @@ object SGEEnvironment {
sharedDirectory = sharedDirectory,
workDirectory = workDirectory,
threads = threads,
storageSharedLocally = storageSharedLocally,
name = Some(name.getOrElse(varName.value))
)(SSHAuthentication.find(user, host, port).apply)
storageSharedLocally = storageSharedLocally)
if (!localSubmission) {
val userValue = user.mustBeDefined("user")
val hostValue = host.mustBeDefined("host")
val portValue = port.mustBeDefined("port")
new SGEEnvironment(
user = userValue,
host = hostValue,
port = portValue,
timeout = timeout.getOrElse(services.preference(SSHEnvironment.TimeOut)),
parameters = parameters,
name = Some(name.getOrElse(varName.value)),
authentication = SSHAuthentication.find(userValue, hostValue, portValue).apply
)
} else
new SGELocalEnvironment(
parameters = parameters,
name = Some(name.getOrElse(varName.value))
)
}
implicit def asSSHServer[A: gridscale.ssh.SSHAuthentication]: AsSSHServer[SGEEnvironment[A]] = new AsSSHServer[SGEEnvironment[A]] {
override def apply(t: SGEEnvironment[A]) = gridscale.ssh.SSHServer(t.host, t.port, t.timeout)(t.authentication)
}
implicit def isJobService[A]: JobServiceInterface[SGEEnvironment[A]] = new JobServiceInterface[SGEEnvironment[A]] {
override type J = gridscale.cluster.BatchScheduler.BatchJob
override def submit(env: SGEEnvironment[A], serializedJob: SerializedJob): BatchJob[J] = env.submit(serializedJob)
override def state(env: SGEEnvironment[A], j: J): ExecutionState.ExecutionState = env.state(j)
override def delete(env: SGEEnvironment[A], j: J): Unit = env.delete(j)
override def stdOutErr(js: SGEEnvironment[A], j: J) = js.stdOutErr(j)
}
case class Parameters(
queue: Option[String],
openMOLEMemory: Option[Information],
wallTime: Option[Time],
memory: Option[Information],
sharedDirectory: Option[String],
workDirectory: Option[String],
threads: Option[Int],
storageSharedLocally: Boolean)
}
class SGEEnvironment[A: gridscale.ssh.SSHAuthentication](
val user: String,
val host: String,
val port: Int,
val timeout: Time,
val parameters: SGEEnvironment.Parameters,
val name: Option[String],
val authentication: A)(implicit val services: BatchEnvironment.Services) extends BatchEnvironment { env =>
import services._
implicit val sshInterpreter = gridscale.ssh.SSH()
implicit val systemInterpreter = effectaside.System()
override def stop() =
try super.stop()
finally {
def usageControls = List(storageService.usageControl, jobService.usageControl)
JobManager ! StopEnvironment(this, usageControls, Some(sshInterpreter().close))
}
import env.services.{ threadProvider, preference }
import org.openmole.plugin.environment.ssh._
lazy val storageService =
sshStorageService(
user = user,
host = host,
port = port,
storage = env,
environment = env,
concurrency = services.preference(SSHEnvironment.MaxConnections),
sharedDirectory = parameters.sharedDirectory,
storageSharedLocally = parameters.storageSharedLocally
)
override def trySelectStorage(files: Vector[File]) = BatchEnvironment.trySelectSingleStorage(storageService)
val installRuntime = new RuntimeInstallation(
Frontend.ssh(host, port, timeout, authentication),
storageService = storageService
)
def submit(serializedJob: SerializedJob) = {
def buildScript(serializedJob: SerializedJob) = {
import services._
SharedStorage.buildScript(
env.installRuntime.apply,
parameters.workDirectory,
parameters.openMOLEMemory,
parameters.threads,
serializedJob,
env.storageService
)
}
val (remoteScript, result, workDirectory) = buildScript(serializedJob)
val description = _root_.gridscale.sge.SGEJobDescription(
command = s"/bin/bash $remoteScript",
queue = parameters.queue,
workDirectory = serializedJob.path,
wallTime = parameters.wallTime,
memory = Some(BatchEnvironment.requiredMemory(parameters.openMOLEMemory, parameters.memory)),
)
val id = gridscale.sge.submit[_root_.gridscale.ssh.SSHServer](env, description)
BatchJob(id, result)
}
def state(id: gridscale.cluster.BatchScheduler.BatchJob) =
GridScaleJobService.translateStatus(gridscale.sge.state[_root_.gridscale.ssh.SSHServer](env, id))
def delete(id: gridscale.cluster.BatchScheduler.BatchJob) =
gridscale.sge.clean[_root_.gridscale.ssh.SSHServer](env, id)
def stdOutErr(id: gridscale.cluster.BatchScheduler.BatchJob) =
(gridscale.sge.stdOut[_root_.gridscale.ssh.SSHServer](env, id), gridscale.sge.stdErr[_root_.gridscale.ssh.SSHServer](env, id))
lazy val jobService = BatchJobService(env, concurrency = services.preference(SSHEnvironment.MaxConnections))
override def trySelectJobService() = BatchEnvironment.trySelectSingleJobService(jobService)
}
class SGEEnvironment(
val user: String,
val host: String,
override val port: Int,
val queue: Option[String],
override val openMOLEMemory: Option[Information],
val wallTime: Option[Time],
val memory: Option[Information],
val sharedDirectory: Option[String],
val workDirectory: Option[String],
override val threads: Option[Int],
val storageSharedLocally: Boolean,
override val name: Option[String]
)(val credential: fr.iscpif.gridscale.ssh.SSHAuthentication)(implicit val services: BatchEnvironment.Services) extends ClusterEnvironment { env
type JS = SGEJobService
lazy val jobService =
new SGEJobService {
def queue = env.queue
val environment = env
def sharedFS = storage
def id = url.toString
def workDirectory = env.workDirectory
def timeout = env.timeout
def credential = env.credential
def user = env.user
def host = env.host
def port = env.port
object SGELocalEnvironment {
implicit def isJobService: JobServiceInterface[SGELocalEnvironment] = new JobServiceInterface[SGELocalEnvironment] {
override type J = gridscale.cluster.BatchScheduler.BatchJob
override def submit(env: SGELocalEnvironment, serializedJob: SerializedJob): BatchJob[J] = env.submit(serializedJob)
override def state(env: SGELocalEnvironment, j: J): ExecutionState.ExecutionState = env.state(j)
override def delete(env: SGELocalEnvironment, j: J): Unit = env.delete(j)
override def stdOutErr(js: SGELocalEnvironment, j: J) = js.stdOutErr(j)
}
}
class SGELocalEnvironment(
val parameters: SGEEnvironment.Parameters,
val name: Option[String])(implicit val services: BatchEnvironment.Services) extends BatchEnvironment { env =>
import services._
lazy val usageControl = UsageControl(services.preference(SSHEnvironment.MaxLocalOperations))
def usageControls = List(storageService.usageControl, jobService.usageControl)
implicit val localInterpreter = gridscale.local.Local()
implicit val systemInterpreter = effectaside.System()
import env.services.{ threadProvider, preference }
import org.openmole.plugin.environment.ssh._
override def stop() =
try super.stop()
finally {
def usageControls = List(storageService.usageControl, jobService.usageControl)
JobManager ! StopEnvironment(this, usageControls)
}
lazy val storageService =
localStorageService(
environment = env,
concurrency = services.preference(SSHEnvironment.MaxLocalOperations),
root = "",
sharedDirectory = parameters.sharedDirectory,
)
override def trySelectStorage(files: Vector[File]) = BatchEnvironment.trySelectSingleStorage(storageService)
val installRuntime = new RuntimeInstallation(
Frontend.local,
storageService = storageService
)
import _root_.gridscale.local.LocalHost
def submit(serializedJob: SerializedJob) = {
def buildScript(serializedJob: SerializedJob) = {
import services._
SharedStorage.buildScript(
env.installRuntime.apply,
parameters.workDirectory,
parameters.openMOLEMemory,
parameters.threads,
serializedJob,
env.storageService
)
}
val (remoteScript, result, workDirectory) = buildScript(serializedJob)
val description = _root_.gridscale.sge.SGEJobDescription(
command = s"/bin/bash $remoteScript",
queue = parameters.queue,
workDirectory = serializedJob.path,
wallTime = parameters.wallTime,
memory = Some(BatchEnvironment.requiredMemory(parameters.openMOLEMemory, parameters.memory)),
)
val id = gridscale.sge.submit(LocalHost(), description)
BatchJob(id, result)
}
def state(id: gridscale.cluster.BatchScheduler.BatchJob) =
GridScaleJobService.translateStatus(gridscale.sge.state(LocalHost(), id))
def delete(id: gridscale.cluster.BatchScheduler.BatchJob) =
gridscale.sge.clean(LocalHost(), id)
def stdOutErr(id: gridscale.cluster.BatchScheduler.BatchJob) =
(gridscale.sge.stdOut(LocalHost(), id), gridscale.sge.stdErr(LocalHost(), id))
lazy val jobService = BatchJobService(env, concurrency = services.preference(SSHEnvironment.MaxLocalOperations))
override def trySelectJobService() = BatchEnvironment.trySelectSingleJobService(jobService)
}
/*
* Copyright (C) 2012 Romain Reuillon
* Copyright (C) 2014 Jonathan Passerat-Palmbach
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.openmole.plugin.environment.sge
import fr.iscpif.gridscale.sge.{ SGEJobDescription, SGEJobService GSSGEJobService }
import fr.iscpif.gridscale.ssh.{ SSHConnectionCache, SSHHost }
import org.openmole.core.workspace.Workspace
import org.openmole.plugin.environment.batch.environment._
import org.openmole.plugin.environment.batch.jobservice.{ BatchJob, BatchJobId }
import org.openmole.plugin.environment.ssh.{ ClusterJobService, SSHService, SharedStorage }
import org.openmole.tool.logger.Logger
object SGEJobService extends Logger
import org.openmole.plugin.environment.sge.SGEJobService._
import squants.time.TimeConversions._
trait SGEJobService extends ClusterJobService with SSHHost with SharedStorage { js
val environment: SGEEnvironment
import environment.services._
lazy val jobService = new GSSGEJobService with SSHConnectionCache {
def host = js.host
def user = js.user
def credential = js.credential
override def port = js.port
override def timeout = preference(SSHService.timeout)
}
protected def _submit(serializedJob: SerializedJob) = {
val (remoteScript, result) = buildScript(serializedJob)
val jobDescription = SGEJobDescription(
executable = "/bin/bash",
arguments = remoteScript,
queue = environment.queue,
workDirectory = serializedJob.path,
wallTime = environment.wallTime.map(x x: concurrent.duration.Duration),
memory = Some(BatchEnvironment.requiredMemory(environment.openMOLEMemory, environment.memory).toMegabytes.toInt)
)
val jid = js.jobService.submit(jobDescription)
Log.logger.fine(s"SGE job [${jid.sgeId}], description: \n ${jobDescription.toSGE}")
new BatchJob with BatchJobId {
val jobService = js
val id = jid
val resultPath = result
}
}
}
......@@ -101,9 +101,10 @@ object Libraries {
lazy val gridscale = "org.openmole.library" %% "gridscale" % gridscaleVersion
lazy val gridscaleSSH = "org.openmole.library" %% "gridscale-ssh" % gridscaleVersion
lazy val gridscalePBS = "org.openmole.library" %% "gridscale-pbs" % gridscaleVersion
// lazy val gridscaleSGE = "fr.iscpif.gridscale.bundle" %% "sge" % gridscaleVersion
// lazy val gridscaleCondor = "fr.iscpif.gridscale.bundle" %% "condor" % gridscaleVersion
// lazy val gridscaleSLURM = "fr.iscpif.gridscale.bundle" %% "slurm" % gridscaleVersion
lazy val gridscaleSGE = "org.openmole.library" %% "gridscale-sge" % gridscaleVersion
lazy val gridscaleCondor = "org.openmole.library" %% "gridscale-condor" % gridscaleVersion
lazy val gridscaleSLURM = "org.openmole.library" %% "gridscale-slurm" % gridscaleVersion
lazy val gridscaleEGI = Seq(
"org.openmole.library" %% "gridscale-egi" % gridscaleVersion,
"org.openmole.library" %% "gridscale-webdav" % gridscaleVersion,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment