Commit 8aca2698 authored by Romain Reuillon
Browse files

[Plugin] implement pbs env

parent 0c66b8a7
......@@ -373,6 +373,26 @@ lazy val gridscaleOAR = OsgiProject(dir, "gridscale.oar", imports = Seq("*")) se
version := gridscaleVersion
) settings(settings: _*) dependsOn(gridscale, gridscaleCluster)
// OsgiProject "gridscale.pbs": adds the fr.iscpif.gridscale "pbs" artifact at gridscaleVersion
// and depends on the gridscale and gridscaleCluster projects.
lazy val gridscalePBS = OsgiProject(dir, "gridscale.pbs", imports = Seq("*")) settings (
libraryDependencies += "fr.iscpif.gridscale" %% "pbs" % gridscaleVersion,
version := gridscaleVersion
) settings(settings: _*) dependsOn(gridscale, gridscaleCluster)
// OsgiProject "gridscale.sge": adds the fr.iscpif.gridscale "sge" artifact at gridscaleVersion
// and depends on the gridscale and gridscaleCluster projects.
lazy val gridscaleSGE = OsgiProject(dir, "gridscale.sge", imports = Seq("*")) settings (
libraryDependencies += "fr.iscpif.gridscale" %% "sge" % gridscaleVersion,
version := gridscaleVersion
) settings(settings: _*) dependsOn(gridscale, gridscaleCluster)
// OsgiProject "gridscale.condor": adds the fr.iscpif.gridscale "condor" artifact at gridscaleVersion
// and depends on the gridscale and gridscaleCluster projects.
lazy val gridscaleCondor = OsgiProject(dir, "gridscale.condor", imports = Seq("*")) settings (
libraryDependencies += "fr.iscpif.gridscale" %% "condor" % gridscaleVersion,
version := gridscaleVersion
) settings(settings: _*) dependsOn(gridscale, gridscaleCluster)
// OsgiProject "gridscale.slurm": adds the fr.iscpif.gridscale "slurm" artifact at gridscaleVersion
// and depends on the gridscale and gridscaleCluster projects.
lazy val gridscaleSLURM = OsgiProject(dir, "gridscale.slurm", imports = Seq("*")) settings (
libraryDependencies += "fr.iscpif.gridscale" %% "slurm" % gridscaleVersion,
version := gridscaleVersion
) settings(settings: _*) dependsOn(gridscale, gridscaleCluster)
lazy val gridscaleEGI = OsgiProject(dir, "gridscale.egi", imports = Seq("*")) settings (
libraryDependencies += "fr.iscpif.gridscale" %% "egi" % gridscaleVersion,
......
......@@ -374,13 +374,12 @@ lazy val oar = OsgiProject(pluginDir, "org.openmole.plugin.environment.oar", imp
// EGI environment plugin project: depends on openmoleDSL, batch, workspace, fileService and the
// gridscale plugin, and adds the Libraries.gridscaleEGI dependencies plus Libraries.addScalaLang.
lazy val egi = OsgiProject(pluginDir, "org.openmole.plugin.environment.egi") dependsOn(openmoleDSL, batch, workspace, fileService, gridscale) settings (
libraryDependencies ++= Libraries.gridscaleEGI, Libraries.addScalaLang) settings (pluginSettings: _*)
// Shared GridScale plugin project: adds Libraries.gridscaleLocal and depends on openmoleDSL,
// tools, batch and exception. Other environment plugins in this file depend on it.
lazy val gridscale = OsgiProject(pluginDir, "org.openmole.plugin.environment.gridscale", imports = Seq("*")) settings(
libraryDependencies += Libraries.gridscaleLocal)dependsOn(openmoleDSL, tools, batch, exception) settings (pluginSettings: _*)
//lazy val pbs = OsgiProject(pluginDir, "org.openmole.plugin.environment.pbs", imports = Seq("*")) dependsOn(openmoleDSL, batch, gridscale, ssh) settings
// (libraryDependencies += Libraries.gridscalePBS) settings (pluginSettings: _*)
//
// PBS environment plugin project: depends on openmoleDSL, batch, the gridscale plugin and ssh,
// and adds Libraries.gridscalePBS. The trailing `settings` on the first line is an infix call
// whose argument is the parenthesised expression on the next line.
lazy val pbs = OsgiProject(pluginDir, "org.openmole.plugin.environment.pbs", imports = Seq("*")) dependsOn(openmoleDSL, batch, gridscale, ssh) settings
(libraryDependencies += Libraries.gridscalePBS) settings (pluginSettings: _*)
//lazy val sge = OsgiProject(pluginDir, "org.openmole.plugin.environment.sge", imports = Seq("*")) dependsOn(openmoleDSL, batch, gridscale, ssh) settings
// (libraryDependencies += Libraries.gridscaleSGE) settings (pluginSettings: _*)
//
......
......@@ -111,6 +111,8 @@ object OAREnvironment {
bestEffort: Boolean
)
/**
 * Core count passed to the OAR job description.
 *
 * Yields the configured `core` value capped by `threads` (1 when `threads` is not set),
 * and stays empty when `core` itself is not set.
 */
def nbCores(parameters: Parameters) = {
  val threadCount = parameters.threads.getOrElse(1)
  parameters.core.map(coreCount => math.min(coreCount, threadCount))
}
}
class OAREnvironment[A: gridscale.ssh.SSHAuthentication](
......@@ -176,7 +178,7 @@ class OAREnvironment[A: gridscale.ssh.SSHAuthentication](
workDirectory = workDirectory,
queue = parameters.queue,
cpu = parameters.cpu,
core = parameters.core,
core = OAREnvironment.nbCores(parameters),
wallTime = parameters.wallTime,
bestEffort = parameters.bestEffort
)
......@@ -261,7 +263,7 @@ class OARLocalEnvironment(
workDirectory = workDirectory,
queue = parameters.queue,
cpu = parameters.cpu,
core = parameters.core,
core = OAREnvironment.nbCores(parameters),
wallTime = parameters.wallTime,
bestEffort = parameters.bestEffort
)
......
......@@ -17,82 +17,289 @@
package org.openmole.plugin.environment.pbs
import org.openmole.core.authentication.AuthenticationStore
import org.openmole.core.authentication._
import org.openmole.core.workflow.dsl._
import org.openmole.core.workflow.execution._
import org.openmole.plugin.environment.batch.environment._
import org.openmole.plugin.environment.batch.jobservice._
import org.openmole.plugin.environment.ssh._
import org.openmole.tool.crypto.Cypher
import squants._
import squants.information._
import effectaside._
import org.openmole.plugin.environment.batch.refresh.{ JobManager, StopEnvironment }
import org.openmole.plugin.environment.gridscale._
/**
 * Factory and type-class instances for PBS batch environments.
 *
 * `apply` builds either an SSH-backed [[PBSEnvironment]] or, when `localSubmission` is set,
 * a [[PBSLocalEnvironment]]; both share the same [[PBSEnvironment.Parameters]].
 */
object PBSEnvironment {

  /**
   * Builds a PBS environment.
   *
   * When `localSubmission` is false, `user`, `host` and `port` are resolved through
   * `mustBeDefined` (presumably failing when absent -- confirm against OptionalArgument)
   * and the SSH authentication is looked up for that user/host/port. When it is true,
   * the connection arguments are ignored and a local environment is returned.
   *
   * @param timeout SSH timeout; falls back to the `SSHEnvironment.TimeOut` preference
   * @param flavour PBS flavour forwarded to the GridScale job description
   * @param name    environment name; falls back to the name of the enclosing value (`varName`)
   */
  def apply(
    user: OptionalArgument[String] = None,
    host: OptionalArgument[String] = None,
    port: OptionalArgument[Int] = 22,
    queue: OptionalArgument[String] = None,
    openMOLEMemory: OptionalArgument[Information] = None,
    wallTime: OptionalArgument[Time] = None,
    memory: OptionalArgument[Information] = None,
    nodes: OptionalArgument[Int] = None,
    coreByNode: OptionalArgument[Int] = None,
    sharedDirectory: OptionalArgument[String] = None,
    workDirectory: OptionalArgument[String] = None,
    threads: OptionalArgument[Int] = None,
    storageSharedLocally: Boolean = false,
    timeout: OptionalArgument[Time] = None,
    flavour: gridscale.pbs.PBSFlavour = Torque,
    name: OptionalArgument[String] = None,
    localSubmission: Boolean = false
  )(implicit services: BatchEnvironment.Services, authenticationStore: AuthenticationStore, cypher: Cypher, varName: sourcecode.Name) = {
    import services._

    // Settings shared by the SSH and the local variants.
    val parameters = Parameters(
      queue = queue,
      wallTime = wallTime,
      openMOLEMemory = openMOLEMemory,
      memory = memory,
      nodes = nodes,
      coreByNode = coreByNode,
      sharedDirectory = sharedDirectory,
      workDirectory = workDirectory,
      threads = threads,
      storageSharedLocally = storageSharedLocally,
      flavour = flavour
    )

    if (!localSubmission) {
      val userValue = user.mustBeDefined("user")
      val hostValue = host.mustBeDefined("host")
      val portValue = port.mustBeDefined("port")

      new PBSEnvironment(
        user = userValue,
        host = hostValue,
        port = portValue,
        timeout = timeout.getOrElse(services.preference(SSHEnvironment.TimeOut)),
        parameters = parameters,
        name = Some(name.getOrElse(varName.value)),
        authentication = SSHAuthentication.find(userValue, hostValue, portValue).apply
      )
    } else
      new PBSLocalEnvironment(
        parameters = parameters,
        name = Some(name.getOrElse(varName.value))
      )
  }

  /** Exposes a [[PBSEnvironment]] as a GridScale SSH server built from its host, port, timeout and authentication. */
  implicit def asSSHServer[A: gridscale.ssh.SSHAuthentication]: AsSSHServer[PBSEnvironment[A]] = new AsSSHServer[PBSEnvironment[A]] {
    override def apply(t: PBSEnvironment[A]) = gridscale.ssh.SSHServer(t.host, t.port, t.timeout)(t.authentication)
  }

  /** Job-service operations for a [[PBSEnvironment]]; every operation delegates to the environment itself. */
  implicit def isJobService[A]: JobServiceInterface[PBSEnvironment[A]] = new JobServiceInterface[PBSEnvironment[A]] {
    override type J = gridscale.cluster.BatchScheduler.BatchJob
    override def submit(env: PBSEnvironment[A], serializedJob: SerializedJob): BatchJob[J] = env.submit(serializedJob)
    override def state(env: PBSEnvironment[A], j: J): ExecutionState.ExecutionState = env.state(j)
    override def delete(env: PBSEnvironment[A], j: J): Unit = env.delete(j)
    override def stdOutErr(js: PBSEnvironment[A], j: J) = js.stdOutErr(j)
  }

  /** Scheduler and storage settings shared by the SSH and local PBS environments. */
  case class Parameters(
    queue: Option[String],
    wallTime: Option[Time],
    openMOLEMemory: Option[Information],
    memory: Option[Information],
    nodes: Option[Int],
    coreByNode: Option[Int],
    sharedDirectory: Option[String],
    workDirectory: Option[String],
    threads: Option[Int],
    storageSharedLocally: Boolean,
    flavour: gridscale.pbs.PBSFlavour
  )

  /**
   * Cores per node passed to the PBS job description: `coreByNode` capped by `threads`
   * (1 when `threads` is not set); empty when `coreByNode` is not set.
   *
   * NOTE(review): with `coreByNode` set and `threads` unset this yields 1, and with only
   * `threads` set it yields nothing -- confirm this capping is intended.
   */
  def nbCores(parameters: Parameters) = parameters.coreByNode.map(c => math.min(c, parameters.threads.getOrElse(1)))
}
/**
 * PBS batch environment reached over SSH.
 *
 * Storage goes through an SSH storage service and jobs are submitted, polled and cleaned
 * through `gridscale.pbs`, using this environment as the SSH server.
 *
 * @param timeout        SSH timeout used for the server and the runtime installation front-end
 * @param parameters     scheduler and storage settings
 * @param authentication SSH credentials, of a type with a `gridscale.ssh.SSHAuthentication` instance
 */
class PBSEnvironment[A: gridscale.ssh.SSHAuthentication](
  val user: String,
  val host: String,
  val port: Int,
  val timeout: Time,
  val parameters: PBSEnvironment.Parameters,
  val name: Option[String],
  val authentication: A
)(implicit val services: BatchEnvironment.Services) extends BatchEnvironment {
  env =>

  import services._

  implicit val sshInterpreter = gridscale.ssh.SSH()
  implicit val systemInterpreter = effectaside.System()

  /** Stops the environment, then always asks the JobManager to stop it, handing over the SSH interpreter's close. */
  override def stop() =
    try super.stop()
    finally {
      def usageControls = List(storageService.usageControl, jobService.usageControl)
      JobManager ! StopEnvironment(this, usageControls, Some(sshInterpreter().close))
    }

  import env.services.{ threadProvider, preference }
  import org.openmole.plugin.environment.ssh._

  /** SSH storage service; concurrency comes from the `SSHEnvironment.MaxConnections` preference. */
  lazy val storageService =
    sshStorageService(
      user = user,
      host = host,
      port = port,
      storage = env,
      environment = env,
      concurrency = services.preference(SSHEnvironment.MaxConnections),
      sharedDirectory = parameters.sharedDirectory,
      storageSharedLocally = parameters.storageSharedLocally
    )

  override def trySelectStorage(files: Vector[File]) = BatchEnvironment.trySelectSingleStorage(storageService)

  val installRuntime = new RuntimeInstallation(
    Frontend.ssh(host, port, timeout, authentication),
    storageService = storageService
  )

  /** Builds the job script on the shared storage and submits it to PBS as `/bin/bash <script>`. */
  def submit(serializedJob: SerializedJob) = {
    def buildScript(serializedJob: SerializedJob) = {
      import services._

      SharedStorage.buildScript(
        env.installRuntime.apply,
        parameters.workDirectory,
        parameters.openMOLEMemory,
        parameters.threads,
        serializedJob,
        env.storageService
      )
    }

    val (remoteScript, result, workDirectory) = buildScript(serializedJob)

    val description = gridscale.pbs.PBSJobDescription(
      command = s"/bin/bash $remoteScript",
      workDirectory = workDirectory,
      queue = parameters.queue,
      wallTime = parameters.wallTime,
      memory = parameters.memory,
      nodes = parameters.nodes,
      coreByNode = PBSEnvironment.nbCores(parameters),
      flavour = parameters.flavour
    )

    val id = gridscale.pbs.submit[_root_.gridscale.ssh.SSHServer](env, description)
    BatchJob(id, result)
  }

  /** Queries the PBS state of the job and translates it to an OpenMOLE execution state. */
  def state(id: gridscale.cluster.BatchScheduler.BatchJob) =
    GridScaleJobService.translateStatus(gridscale.pbs.state[_root_.gridscale.ssh.SSHServer](env, id))

  def delete(id: gridscale.cluster.BatchScheduler.BatchJob) =
    gridscale.pbs.clean[_root_.gridscale.ssh.SSHServer](env, id)

  /** Standard output and standard error of the job, in that order. */
  def stdOutErr(id: gridscale.cluster.BatchScheduler.BatchJob) =
    (gridscale.pbs.stdOut[_root_.gridscale.ssh.SSHServer](env, id), gridscale.pbs.stdErr[_root_.gridscale.ssh.SSHServer](env, id))

  lazy val jobService = BatchJobService(env, concurrency = services.preference(SSHEnvironment.MaxConnections))

  override def trySelectJobService() = BatchEnvironment.trySelectSingleJobService(jobService)
}
/** Type-class instances for [[PBSLocalEnvironment]]. */
object PBSLocalEnvironment {

  /** Job-service operations for a local PBS environment; each one forwards to the environment itself. */
  implicit def isJobService: JobServiceInterface[PBSLocalEnvironment] =
    new JobServiceInterface[PBSLocalEnvironment] {
      override type J = gridscale.cluster.BatchScheduler.BatchJob

      override def submit(env: PBSLocalEnvironment, serializedJob: SerializedJob): BatchJob[J] =
        env.submit(serializedJob)

      override def state(env: PBSLocalEnvironment, job: J): ExecutionState.ExecutionState =
        env.state(job)

      override def delete(env: PBSLocalEnvironment, job: J): Unit =
        env.delete(job)

      override def stdOutErr(env: PBSLocalEnvironment, job: J) =
        env.stdOutErr(job)
    }

}
class PBSEnvironment(
val user: String,
val host: String,
override val port: Int,
val queue: Option[String],
override val openMOLEMemory: Option[Information],
val wallTime: Option[Time],
val memory: Option[Information],
val nodes: Option[Int],
val coreByNode: Option[Int],
val sharedDirectory: Option[String],
val workDirectory: Option[String],
override val threads: Option[Int],
val storageSharedLocally: Boolean,
override val name: Option[String]
)(val credential: fr.iscpif.gridscale.ssh.SSHAuthentication)(implicit val services: BatchEnvironment.Services) extends ClusterEnvironment { env
type JS = PBSJobService
lazy val jobService =
new PBSJobService {
def queue = env.queue
val environment = env
def sharedFS = storage
def workDirectory = env.workDirectory
def timeout = env.timeout
def credential = env.credential
def user = env.user
def host = env.host
def port = env.port
/**
 * PBS batch environment that submits from the local host instead of going through SSH.
 *
 * Storage goes through a local storage service and jobs are submitted, polled and cleaned
 * through `gridscale.pbs` against `LocalHost()`.
 *
 * @param parameters scheduler and storage settings
 */
class PBSLocalEnvironment(
  val parameters: PBSEnvironment.Parameters,
  val name: Option[String]
)(implicit val services: BatchEnvironment.Services) extends BatchEnvironment { env =>

  import services._

  lazy val usageControl = UsageControl(services.preference(SSHEnvironment.MaxLocalOperations))
  def usageControls = List(storageService.usageControl, jobService.usageControl)

  implicit val localInterpreter = gridscale.local.Local()
  implicit val systemInterpreter = effectaside.System()

  import env.services.{ threadProvider, preference }
  import org.openmole.plugin.environment.ssh._

  /** Stops the environment, then always asks the JobManager to stop it. */
  override def stop() =
    try super.stop()
    finally {
      def usageControls = List(storageService.usageControl, jobService.usageControl)
      JobManager ! StopEnvironment(this, usageControls)
    }

  /** Local storage service; concurrency comes from the `SSHEnvironment.MaxLocalOperations` preference. */
  lazy val storageService =
    localStorageService(
      environment = env,
      concurrency = services.preference(SSHEnvironment.MaxLocalOperations),
      root = "",
      sharedDirectory = parameters.sharedDirectory,
    )

  override def trySelectStorage(files: Vector[File]) = BatchEnvironment.trySelectSingleStorage(storageService)

  val installRuntime = new RuntimeInstallation(
    Frontend.local,
    storageService = storageService
  )

  import _root_.gridscale.local.LocalHost

  /** Builds the job script on the shared storage and submits it to PBS as `/bin/bash <script>`. */
  def submit(serializedJob: SerializedJob) = {
    def buildScript(serializedJob: SerializedJob) = {
      import services._

      SharedStorage.buildScript(
        env.installRuntime.apply,
        parameters.workDirectory,
        parameters.openMOLEMemory,
        parameters.threads,
        serializedJob,
        env.storageService
      )
    }

    val (remoteScript, result, workDirectory) = buildScript(serializedJob)

    val description = gridscale.pbs.PBSJobDescription(
      command = s"/bin/bash $remoteScript",
      workDirectory = workDirectory,
      queue = parameters.queue,
      wallTime = parameters.wallTime,
      memory = parameters.memory,
      nodes = parameters.nodes,
      coreByNode = PBSEnvironment.nbCores(parameters),
      flavour = parameters.flavour
    )

    val id = gridscale.pbs.submit(LocalHost(), description)
    BatchJob(id, result)
  }

  /** Queries the PBS state of the job and translates it to an OpenMOLE execution state. */
  def state(id: gridscale.cluster.BatchScheduler.BatchJob) =
    GridScaleJobService.translateStatus(gridscale.pbs.state(LocalHost(), id))

  def delete(id: gridscale.cluster.BatchScheduler.BatchJob) =
    gridscale.pbs.clean(LocalHost(), id)

  /** Standard output and standard error of the job, in that order. */
  def stdOutErr(id: gridscale.cluster.BatchScheduler.BatchJob) =
    (gridscale.pbs.stdOut(LocalHost(), id), gridscale.pbs.stdErr(LocalHost(), id))

  lazy val jobService = BatchJobService(env, concurrency = services.preference(SSHEnvironment.MaxLocalOperations))

  override def trySelectJobService() = BatchEnvironment.trySelectSingleJobService(jobService)
}
/*
* Copyright (C) 2012 Romain Reuillon
* Copyright (C) 2014 Jonathan Passerat-Palmbach
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.openmole.plugin.environment.pbs
import fr.iscpif.gridscale.pbs.{ PBSJobDescription, PBSJobService ⇒ GSPBSJobService }
import fr.iscpif.gridscale.ssh.SSHConnectionCache
import org.openmole.core.workspace.Workspace
import org.openmole.plugin.environment.batch.environment._
import org.openmole.plugin.environment.batch.jobservice.{ BatchJob, BatchJobId }
import org.openmole.plugin.environment.ssh.{ ClusterJobService, SSHService }
import org.openmole.tool.logger.Logger
// Companion object extending Logger; the trait below imports its members and logs through `Log.logger`.
object PBSJobService extends Logger
import org.openmole.plugin.environment.pbs.PBSJobService._
import squants.time.TimeConversions._
/**
 * Cluster job service that submits serialized jobs to PBS through a GridScale PBS job service
 * using a cached SSH connection.
 */
trait PBSJobService extends ClusterJobService { js ⇒

  val environment: PBSEnvironment
  import environment.services._

  /** Underlying GridScale job service; connection settings are taken from this service. */
  lazy val jobService = new GSPBSJobService with SSHConnectionCache {
    def host = js.host
    def user = js.user
    def credential = js.credential
    override def port = js.port
    override def timeout = preference(SSHService.timeout)
  }

  /** Builds the remote script, submits it as a `/bin/bash` job and wraps the result in a BatchJob. */
  protected def _submit(serializedJob: SerializedJob) = {
    val (remoteScript, result) = buildScript(serializedJob)

    val jobDescription = PBSJobDescription(
      executable = "/bin/bash",
      arguments = remoteScript,
      queue = environment.queue,
      workDirectory = serializedJob.path,
      // The type ascription converts the wall time to a scala.concurrent Duration.
      wallTime = environment.wallTime.map(x ⇒ x: concurrent.duration.Duration),
      memory = Some(BatchEnvironment.requiredMemory(environment.openMOLEMemory, environment.memory).toMegabytes.toInt),
      nodes = environment.nodes,
      // Falls back to the thread count when no core count per node is given.
      coreByNode = environment.coreByNode orElse environment.threads
    )

    val jid = js.jobService.submit(jobDescription)
    Log.logger.fine(s"PBS job [${jid.pbsId}], description: \n ${jobDescription}")

    new BatchJob with BatchJobId {
      val jobService = js
      val id = jid
      val resultPath = result
    }
  }
}
///*
// * Copyright (C) 2012 Romain Reuillon
// * Copyright (C) 2014 Jonathan Passerat-Palmbach
//
// * This program is free software: you can redistribute it and/or modify
// * it under the terms of the GNU General Public License as published by
// * the Free Software Foundation, either version 3 of the License, or
// * (at your option) any later version.
// *
// * This program is distributed in the hope that it will be useful,
// * but WITHOUT ANY WARRANTY; without even the implied warranty of
// * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// * GNU General Public License for more details.
// *
// * You should have received a copy of the GNU General Public License
// * along with this program. If not, see <http://www.gnu.org/licenses/>.
// */
//
//package org.openmole.plugin.environment.pbs
//
//import fr.iscpif.gridscale.pbs.{ PBSJobDescription, PBSJobService ⇒ GSPBSJobService }
//import fr.iscpif.gridscale.ssh.SSHConnectionCache
//import org.openmole.core.workspace.Workspace
//import org.openmole.plugin.environment.batch.environment._
//import org.openmole.plugin.environment.batch.jobservice.{ BatchJob, BatchJobId }
//import org.openmole.plugin.environment.ssh.{ ClusterJobService, SSHService }
//import org.openmole.tool.logger.Logger
//
//object PBSJobService extends Logger
//
//import org.openmole.plugin.environment.pbs.PBSJobService._
//import squants.time.TimeConversions._
//
//trait PBSJobService extends ClusterJobService { js ⇒
//
// val environment: PBSEnvironment
// import environment.services._
//
// lazy val jobService = new GSPBSJobService with SSHConnectionCache {
// def host = js.host
// def user = js.user
// def credential = js.credential
// override def port = js.port
// override def timeout = preference(SSHService.timeout)
// }
//
// protected def _submit(serializedJob: SerializedJob) = {
// val (remoteScript, result) = buildScript(serializedJob)