Commit c5fbe228 authored by Romain Reuillon's avatar Romain Reuillon
Browse files

[Plugin] enh: implement a single submission method and get rid of the jobservice typeclass.

parent 1b1afe00
......@@ -79,6 +79,7 @@ def allThirdParties = Seq(
openmoleOSGi,
openmoleRandom,
openmoleNetwork,
openmoleException,
txtmark)
lazy val openmoleCache = OsgiProject(thirdPartiesDir, "org.openmole.tool.cache", imports = Seq("*")) dependsOn (openmoleLogger) settings (thirdPartiesSettings: _*) settings (libraryDependencies += Libraries.squants, libraryDependencies += Libraries.cats)
......@@ -97,6 +98,7 @@ lazy val openmoleByteCode = OsgiProject(thirdPartiesDir, "org.openmole.tool.byte
lazy val openmoleOSGi = OsgiProject(thirdPartiesDir, "org.openmole.tool.osgi", imports = Seq("*")) dependsOn (openmoleFile) settings (libraryDependencies += Libraries.equinoxOSGi) settings (thirdPartiesSettings: _*)
lazy val openmoleRandom = OsgiProject(thirdPartiesDir, "org.openmole.tool.random", imports = Seq("*")) settings (thirdPartiesSettings: _*) settings(libraryDependencies += Libraries.math, Libraries.addScalaLang(scalaVersionValue)) dependsOn (openmoleCache)
lazy val openmoleNetwork = OsgiProject(thirdPartiesDir, "org.openmole.tool.network", imports = Seq("*")) settings (thirdPartiesSettings: _*)
lazy val openmoleException = OsgiProject(thirdPartiesDir, "org.openmole.tool.exception", imports = Seq("*")) settings (thirdPartiesSettings: _*)
lazy val txtmark = OsgiProject(thirdPartiesDir, "com.quandora.txtmark", exports = Seq("com.github.rjeschke.txtmark.*"), imports = Seq("*")) settings (thirdPartiesSettings: _*)
......@@ -189,7 +191,7 @@ lazy val exception = OsgiProject(coreDir, "org.openmole.core.exception", imports
lazy val tools = OsgiProject(coreDir, "org.openmole.core.tools", global = true, imports = Seq("*")) settings
(libraryDependencies ++= Seq(Libraries.xstream, Libraries.exec, Libraries.math, Libraries.scalatest, Libraries.equinoxOSGi), Libraries.addScalaLang(scalaVersionValue)) dependsOn
(exception, openmoleTar, openmoleFile, openmoleLock, openmoleThread, openmoleHash, openmoleLogger, openmoleStream, openmoleCollection, openmoleStatistics, openmoleTypes, openmoleCache, openmoleRandom, openmoleNetwork) settings (coreSettings: _*)
(exception, openmoleTar, openmoleFile, openmoleLock, openmoleThread, openmoleHash, openmoleLogger, openmoleStream, openmoleCollection, openmoleStatistics, openmoleTypes, openmoleCache, openmoleRandom, openmoleNetwork, openmoleException) settings (coreSettings: _*)
lazy val event = OsgiProject(coreDir, "org.openmole.core.event", imports = Seq("*")) dependsOn (tools) settings (coreSettings: _*)
......
......@@ -35,7 +35,6 @@ import org.openmole.core.threadprovider.{ ThreadProvider, Updater }
import org.openmole.core.workflow.execution._
import org.openmole.core.workflow.job._
import org.openmole.core.workspace._
import org.openmole.plugin.environment.batch.jobservice._
import org.openmole.plugin.environment.batch.refresh._
import org.openmole.plugin.environment.batch.storage._
import org.openmole.tool.cache._
......@@ -46,12 +45,9 @@ import squants.time.TimeConversions._
import squants.information.Information
import squants.information.InformationConversions._
import org.openmole.core.location._
import org.openmole.plugin.environment.batch
import org.openmole.plugin.environment.batch.environment.BatchEnvironment.{ ExecutionJobRegistry, signalUpload }
import org.openmole.plugin.environment.batch.environment.BatchEnvironment.{ ExecutionJobRegistry }
import scala.collection.immutable.TreeSet
import scala.ref.WeakReference
import scala.util.Random
object BatchEnvironment extends JavaLogger {
......@@ -163,12 +159,12 @@ object BatchEnvironment extends JavaLogger {
job.environment.plugins ++
Seq(job.environment.jvmLinuxX64, job.environment.runtime)
def serializeJob[S](storage: S, remoteStorage: RemoteStorage, job: BatchExecutionJob, communicationPath: String, replicaDirectory: String, clean: () Unit)(implicit services: BatchEnvironment.Services, storageInterface: StorageInterface[S], environmentStorage: EnvironmentStorage[S]) = {
def serializeJob[S](storage: S, remoteStorage: RemoteStorage, job: BatchExecutionJob, communicationPath: String, replicaDirectory: String)(implicit services: BatchEnvironment.Services, storageInterface: StorageInterface[S], environmentStorage: EnvironmentStorage[S]) = {
val storageService = StorageService(storage)
initCommunication(job, storageService, remoteStorage, communicationPath, replicaDirectory, clean)
initCommunication(job, storageService, remoteStorage, communicationPath, replicaDirectory)
}
def initCommunication(job: BatchExecutionJob, storage: StorageService[_], remoteStorage: RemoteStorage, communicationPath: String, replicaDirectory: String, clean: () Unit)(implicit services: BatchEnvironment.Services): SerializedJob = services.newFile.withTmpFile("job", ".tar") { jobFile
def initCommunication(job: BatchExecutionJob, storage: StorageService[_], remoteStorage: RemoteStorage, communicationPath: String, replicaDirectory: String)(implicit services: BatchEnvironment.Services): SerializedJob = services.newFile.withTmpFile("job", ".tar") { jobFile
import services._
serializerService.serialise(job.runnableTasks, jobFile)
......@@ -208,7 +204,7 @@ object BatchEnvironment extends JavaLogger {
FileMessage(path, hash)
}
SerializedJob(storage, inputPath, runtime, serializedStorage, Some(outputPath), clean)
SerializedJob(storage, inputPath, runtime, serializedStorage, Some(outputPath))
}
def toReplicatedFile(file: File, storage: StorageService[_], replicaDirectory: String, transferOptions: TransferOptions)(implicit services: BatchEnvironment.Services): ReplicatedFile = {
......@@ -276,11 +272,6 @@ object BatchEnvironment extends JavaLogger {
)
}
def resultPathInSerializedJob(serializedJob: SerializedJob) = serializedJob.resultPath.get
def submitSerializedJob[S](jobService: S, batchExecutionJob: BatchExecutionJob, serializedJob: SerializedJob, resultPath: SerializedJob String = resultPathInSerializedJob)(implicit jobServiceInterface: JobServiceInterface[S]) =
BatchJobService.submit(jobService, batchExecutionJob, serializedJob, () resultPath(serializedJob))
def isClean(environment: BatchEnvironment)(implicit services: BatchEnvironment.Services) = {
val environmentJobs = environment.jobs
environmentJobs.forall(_.state == ExecutionState.KILLED)
......@@ -333,10 +324,6 @@ abstract class BatchEnvironment extends SubmissionEnvironment { env ⇒
def exceptions = services.preference(Environment.maxExceptionsLog)
def serializeJob(batchExecutionJob: BatchExecutionJob): SerializedJob
def submitSerializedJob(batchExecutionJob: BatchExecutionJob, serializedJob: SerializedJob): BatchJobControl
//def delete(serializedJob: SerializedJob, batchExecutionJob: Option[BatchExecutionJob]): Unit
def clean = BatchEnvironment.isClean(this)
lazy val registry = new ExecutionJobRegistry()
......@@ -354,6 +341,8 @@ abstract class BatchEnvironment extends SubmissionEnvironment { env ⇒
JobManager ! Manage(bej)
}
def execute(batchExecutionJob: BatchExecutionJob): BatchJobControl
def runtime = BatchEnvironment.runtimeLocation
def jvmLinuxX64 = BatchEnvironment.JVMLinuxX64Location
......
......@@ -16,63 +16,35 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.openmole.plugin.environment.batch.jobservice
package org.openmole.plugin.environment.batch.environment
import org.openmole.core.event.EventDispatcher
import org.openmole.core.workflow.execution.ExecutionState._
import org.openmole.plugin.environment.batch.environment._
import org.openmole.tool.logger.JavaLogger
import org.openmole.plugin.environment.batch.storage.StorageService
import scala.concurrent.stm._
trait JobServiceInterface[JS] {
type J
def submit(js: JS, serializedJob: SerializedJob, batchExecutionJob: BatchExecutionJob): J
def state(js: JS, j: J): ExecutionState
def delete(js: JS, j: J): Unit
def stdOutErr(js: JS, j: J): (String, String)
}
object BatchJobService extends JavaLogger {
object BatchJobControl {
def tryStdOutErr(batchJob: BatchJobControl) = util.Try(batchJob.stdOutErr())
def submit[JS](jobService: JS, batchExecutionJob: BatchExecutionJob, serializedJob: SerializedJob, resultPath: () String)(implicit jobServiceInterface: JobServiceInterface[JS]): BatchJobControl = {
def updateState(job: jobServiceInterface.J)(): ExecutionState = jobServiceInterface.state(jobService, job)
def delete(job: jobServiceInterface.J)() = jobServiceInterface.delete(jobService, job)
def stdOutErr(job: jobServiceInterface.J)() = jobServiceInterface.stdOutErr(jobService, job)
val job = jobServiceInterface.submit(jobService, serializedJob, batchExecutionJob)
BatchJobService.Log.logger.fine(s"Successful submission: ${job}")
BatchJobControl(
updateState(job),
delete(job),
stdOutErr(job),
resultPath
)
}
}
object BatchJobControl {
def apply(
updateState: () ExecutionState,
delete: () Unit,
stdOutErr: () (String, String),
resultPath: () String): BatchJobControl = new BatchJobControl(
storageService: StorageService[_],
updateState: () ExecutionState,
delete: () Unit,
stdOutErr: () (String, String),
resultPath: () String,
clean: () Unit): BatchJobControl = new BatchJobControl(
storageService,
updateState,
delete,
stdOutErr,
resultPath)
resultPath,
clean)
}
class BatchJobControl(
val storage: StorageService[_],
val updateState: () ExecutionState,
val delete: () Unit,
val stdOutErr: () (String, String),
val resultPath: () String)
val resultPath: () String,
val clean: () Unit)
......@@ -25,5 +25,4 @@ case class SerializedJob(
inputPath: String,
runtime: Runtime,
remoteStorage: FileMessage,
resultPath: Option[String],
clean: () Unit)
\ No newline at end of file
resultPath: Option[String])
\ No newline at end of file
......@@ -21,19 +21,12 @@ import java.io.PrintStream
import org.openmole.core.communication.message._
import org.openmole.core.communication.storage._
import org.openmole.core.event.EventDispatcher
import org.openmole.core.exception.InternalProcessingError
import org.openmole.core.fileservice.FileService
import org.openmole.core.outputmanager.OutputManager
import org.openmole.core.preference.Preference
import org.openmole.core.serializer.SerializerService
import org.openmole.core.tools.service.Retry._
import org.openmole.core.workflow.execution
import org.openmole.core.workflow.execution._
import org.openmole.core.workspace.{ NewFile, Workspace }
import org.openmole.plugin.environment.batch.environment.BatchEnvironment._
import org.openmole.plugin.environment.batch.environment._
import org.openmole.plugin.environment.batch.jobservice.{ BatchJobControl, BatchJobService }
import org.openmole.plugin.environment.batch.storage._
import org.openmole.tool.file._
import org.openmole.tool.logger.JavaLogger
......@@ -45,24 +38,23 @@ object GetResultActor extends JavaLogger {
case class JobRemoteExecutionException(message: String, cause: Throwable) extends InternalProcessingError(message, cause)
def receive(msg: GetResult)(implicit services: BatchEnvironment.Services) = {
val GetResult(job, sj, resultPath, batchJob) = msg
val GetResult(job, resultPath, batchJob) = msg
try {
getResult(sj.storage, resultPath, job)
JobManager ! Kill(job, Some(batchJob), Some(sj))
getResult(batchJob.storage, resultPath, job)
JobManager ! Kill(job, Some(batchJob))
}
catch {
case e: Throwable
job.state = ExecutionState.FAILED
val stdOutErr = BatchJobService.tryStdOutErr(batchJob).toOption
val stdOutErr = BatchJobControl.tryStdOutErr(batchJob).toOption
JobManager ! Error(job, e, stdOutErr)
JobManager ! Kill(job, Some(batchJob), Some(sj))
JobManager ! Kill(job, Some(batchJob))
}
}
def getResult(storage: StorageService[_], outputFilePath: String, batchJob: BatchExecutionJob)(implicit services: BatchEnvironment.Services): Unit = {
import batchJob.job
import services._
val runtimeResult = getRuntimeResult(outputFilePath, storage)
......
......@@ -22,8 +22,7 @@ import java.util.concurrent.TimeUnit
import org.openmole.core.tools.service.Retry.retry
import org.openmole.core.workflow.execution._
import org.openmole.plugin.environment.batch.environment._
import org.openmole.plugin.environment.batch.jobservice.BatchJobControl
import org.openmole.plugin.environment.batch.environment.{ BatchJobControl, _ }
import org.openmole.tool.logger.JavaLogger
import org.openmole.tool.thread._
......@@ -36,7 +35,6 @@ object JobManager extends JavaLogger { self ⇒
def messagePriority(message: DispatchedMessage) =
message match {
case msg: Upload 10
case msg: Submit 50
case msg: Refresh 5
case msg: GetResult 50
......@@ -47,10 +45,9 @@ object JobManager extends JavaLogger { self ⇒
object DispatcherActor {
def receive(dispatched: DispatchedMessage)(implicit services: BatchEnvironment.Services) =
dispatched match {
case msg: Upload if (!msg.job.job.finished) UploadActor.receive(msg) else self ! Kill(msg.job, None, None)
case msg: Submit if (!msg.job.job.finished) SubmitActor.receive(msg) else self ! Kill(msg.job, None, Some(msg.serializedJob))
case msg: Refresh if (!msg.job.job.finished) RefreshActor.receive(msg) else self ! Kill(msg.job, Some(msg.batchJob), Some(msg.serializedJob))
case msg: GetResult if (!msg.job.job.finished) GetResultActor.receive(msg) else self ! Kill(msg.job, Some(msg.batchJob), Some(msg.serializedJob))
case msg: Submit if (!msg.job.job.finished) SubmitActor.receive(msg) else self ! Kill(msg.job, None)
case msg: Refresh if (!msg.job.job.finished) RefreshActor.receive(msg) else self ! Kill(msg.job, Some(msg.batchJob))
case msg: GetResult if (!msg.job.job.finished) GetResultActor.receive(msg) else self ! Kill(msg.job, Some(msg.batchJob))
case msg: RetryAction RetryActionActor.receive(msg)
case msg: Error ErrorActor.receive(msg)
}
......@@ -59,7 +56,6 @@ object JobManager extends JavaLogger { self ⇒
def dispatch(msg: DispatchedMessage)(implicit services: BatchEnvironment.Services) = services.threadProvider.submit(messagePriority(msg)) { () DispatcherActor.receive(msg) }
def !(msg: JobMessage)(implicit services: BatchEnvironment.Services): Unit = msg match {
case msg: Upload dispatch(msg)
case msg: Submit dispatch(msg)
case msg: Refresh dispatch(msg)
case msg: GetResult dispatch(msg)
......@@ -67,27 +63,25 @@ object JobManager extends JavaLogger { self ⇒
case msg: Error dispatch(msg)
case Manage(job)
self ! Upload(job)
self ! Submit(job)
case Delay(msg, delay)
services.threadProvider.scheduler.schedule((self ! msg): Runnable, delay.millis, TimeUnit.MILLISECONDS)
case Uploaded(job, sj) self ! Submit(job, sj)
case Submitted(job, bj)
self ! Delay(Refresh(job, bj, job.environment.updateInterval.minUpdateInterval), job.environment.updateInterval.minUpdateInterval)
case Submitted(job, sj, bj)
self ! Delay(Refresh(job, sj, bj, job.environment.updateInterval.minUpdateInterval), job.environment.updateInterval.minUpdateInterval)
case Kill(job, batchJob, serializedJob)
case Kill(job, batchJob)
BatchEnvironment.finishedExecutionJob(job.environment, job)
tryKillAndClean(job, batchJob, serializedJob)
tryKillAndClean(job, batchJob)
job.state = ExecutionState.KILLED
if (job.job.finished) BatchEnvironment.finishedJob(job.environment, job.job)
if (!job.job.finished && BatchEnvironment.numberOfExecutionJobs(job.environment, job.job) == 0) job.environment.submit(job.job)
case Resubmit(job, storage, batchJob, serializedJob)
tryKillAndClean(job, Some(batchJob), Some(serializedJob))
case Resubmit(job, batchJob)
tryKillAndClean(job, Some(batchJob))
job.state = ExecutionState.READY
dispatch(Upload(job))
dispatch(Submit(job))
case MoleJobError(mj, j, e)
val er = Environment.MoleJobExceptionRaised(j, e, WARNING, mj)
......@@ -97,15 +91,15 @@ object JobManager extends JavaLogger { self ⇒
}
def tryKillAndClean(job: BatchExecutionJob, bj: Option[BatchJobControl], sj: Option[SerializedJob])(implicit services: BatchEnvironment.Services) = {
def killBatchJob(bj: BatchJobControl)(implicit services: BatchEnvironment.Services) = retry(services.preference(BatchEnvironment.killJobRetry))(bj.delete)
def cleanSerializedJob(sj: SerializedJob)(implicit services: BatchEnvironment.Services) = retry(services.preference(BatchEnvironment.cleanJobRetry))(sj.clean())
def tryKillAndClean(job: BatchExecutionJob, bj: Option[BatchJobControl])(implicit services: BatchEnvironment.Services) = {
def kill(bj: BatchJobControl)(implicit services: BatchEnvironment.Services) = retry(services.preference(BatchEnvironment.killJobRetry))(bj.delete())
def clean(bj: BatchJobControl)(implicit services: BatchEnvironment.Services) = retry(services.preference(BatchEnvironment.cleanJobRetry))(bj.clean())
try bj.foreach(killBatchJob) catch {
try bj.foreach(kill) catch {
case e: Throwable self ! Error(job, e, None)
}
try sj.foreach(cleanSerializedJob) catch {
try bj.foreach(clean) catch {
case e: Throwable self ! Error(job, e, None)
}
}
......
......@@ -19,23 +19,19 @@ package org.openmole.plugin.environment.batch.refresh
import org.openmole.core.workflow.job._
import org.openmole.plugin.environment.batch.environment._
import org.openmole.plugin.environment.batch.jobservice._
import org.openmole.plugin.environment.batch.storage._
import squants.time.Time
sealed trait JobMessage
sealed trait DispatchedMessage
case class Upload(job: BatchExecutionJob) extends JobMessage with DispatchedMessage
case class Uploaded(job: BatchExecutionJob, serializedJob: SerializedJob) extends JobMessage
case class Submit(job: BatchExecutionJob, serializedJob: SerializedJob) extends JobMessage with DispatchedMessage
case class Submitted(job: BatchExecutionJob, serializedJob: SerializedJob, batchJob: BatchJobControl) extends JobMessage
case class Refresh(job: BatchExecutionJob, serializedJob: SerializedJob, batchJob: BatchJobControl, delay: Time, consecutiveUpdateErrors: Int = 0) extends JobMessage with DispatchedMessage
case class Resubmit(job: BatchExecutionJob, storage: StorageService[_], batchJob: BatchJobControl, serializedJob: SerializedJob) extends JobMessage
case class Submit(job: BatchExecutionJob) extends JobMessage with DispatchedMessage
case class Submitted(job: BatchExecutionJob, batchJob: BatchJobControl) extends JobMessage
case class Refresh(job: BatchExecutionJob, batchJob: BatchJobControl, delay: Time, consecutiveUpdateErrors: Int = 0) extends JobMessage with DispatchedMessage
case class Resubmit(job: BatchExecutionJob, batchJob: BatchJobControl) extends JobMessage
case class Delay(msg: JobMessage, delay: Time) extends JobMessage
case class Error(job: BatchExecutionJob, exception: Throwable, stdOutErr: Option[(String, String)]) extends JobMessage with DispatchedMessage
case class Kill(job: BatchExecutionJob, batchJob: Option[BatchJobControl], serializedJob: Option[SerializedJob]) extends JobMessage
case class GetResult(job: BatchExecutionJob, serializedJob: SerializedJob, outputFilePath: String, batchJob: BatchJobControl) extends JobMessage with DispatchedMessage
case class Kill(job: BatchExecutionJob, batchJob: Option[BatchJobControl]) extends JobMessage
case class GetResult(job: BatchExecutionJob, outputFilePath: String, batchJob: BatchJobControl) extends JobMessage with DispatchedMessage
case class Manage(job: BatchExecutionJob) extends JobMessage
case class MoleJobError(moleJob: MoleJob, job: BatchExecutionJob, exception: Throwable) extends JobMessage
case class RetryAction(action: () Boolean) extends JobMessage with DispatchedMessage
......@@ -18,10 +18,8 @@
package org.openmole.plugin.environment.batch.refresh
import org.openmole.core.exception.InternalProcessingError
import org.openmole.core.outputmanager.OutputManager
import org.openmole.core.workflow.execution.ExecutionState._
import org.openmole.plugin.environment.batch.environment.{ BatchEnvironment, ResubmitException, AccessControl }
import org.openmole.plugin.environment.batch.jobservice.BatchJobService
import org.openmole.plugin.environment.batch.environment.{ BatchEnvironment, BatchJobControl, ResubmitException }
import org.openmole.tool.logger.JavaLogger
object RefreshActor extends JavaLogger {
......@@ -29,38 +27,38 @@ object RefreshActor extends JavaLogger {
def receive(refresh: Refresh)(implicit services: BatchEnvironment.Services) = {
import services._
val Refresh(job, sj, bj, delay, updateErrorsInARow) = refresh
val Refresh(job, bj, delay, updateErrorsInARow) = refresh
if (!job.state.isFinal) {
try {
val oldState = job.state
job.state = bj.updateState()
if (job.state == DONE) JobManager ! GetResult(job, sj, bj.resultPath(), bj)
if (job.state == DONE) JobManager ! GetResult(job, bj.resultPath(), bj)
else if (!job.state.isFinal) {
val newDelay =
if (oldState == job.state)
(delay + job.environment.updateInterval.incrementUpdateInterval) min job.environment.updateInterval.maxUpdateInterval
else job.environment.updateInterval.minUpdateInterval
JobManager ! Delay(Refresh(job, sj, bj, newDelay, 0), newDelay)
JobManager ! Delay(Refresh(job, bj, newDelay, 0), newDelay)
}
else if (job.state == FAILED) {
val exception = new InternalProcessingError(s"""Job status is FAILED""".stripMargin)
val stdOutErr = BatchJobService.tryStdOutErr(bj).toOption
val stdOutErr = BatchJobControl.tryStdOutErr(bj).toOption
JobManager ! Error(job, exception, stdOutErr)
JobManager ! Kill(job, Some(bj), Some(sj))
JobManager ! Kill(job, Some(bj))
}
else JobManager ! Kill(job, Some(bj), Some(sj))
else JobManager ! Kill(job, Some(bj))
}
catch {
case _: ResubmitException
JobManager ! Resubmit(job, sj.storage, bj, sj)
JobManager ! Resubmit(job, bj)
case e: Throwable
if (updateErrorsInARow >= preference(BatchEnvironment.MaxUpdateErrorsInARow)) {
JobManager ! Error(job, e, BatchJobService.tryStdOutErr(bj).toOption)
JobManager ! Kill(job, Some(bj), Some(sj))
JobManager ! Error(job, e, BatchJobControl.tryStdOutErr(bj).toOption)
JobManager ! Kill(job, Some(bj))
}
else {
Log.logger.log(Log.FINE, s"${updateErrorsInARow + 1} errors in a row during job refresh", e)
JobManager ! Delay(Refresh(job, sj, bj, delay, updateErrorsInARow + 1), delay)
JobManager ! Delay(Refresh(job, bj, delay, updateErrorsInARow + 1), delay)
}
}
......
......@@ -25,18 +25,18 @@ object SubmitActor {
def receive(submit: Submit)(implicit services: BatchEnvironment.Services) = {
import services._
val Submit(job, sj) = submit
val Submit(job) = submit
if (!job.state.isFinal) {
try {
val bj = job.environment.submitSerializedJob(job, sj)
val bj = job.environment.execute(job)
job.state = SUBMITTED
JobManager ! Submitted(job, sj, bj)
JobManager ! Submitted(job, bj)
}
catch {
case e: Throwable
JobManager ! Error(job, e, None)
JobManager ! Submit(job, sj)
JobManager ! Submit(job)
}
}
}
......
/*
* Copyright (C) 2012 Romain Reuillon
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.openmole.plugin.environment.batch.refresh
import org.openmole.plugin.environment.batch.environment._
import org.openmole.tool.logger.JavaLogger
object UploadActor extends JavaLogger {
def receive(msg: Upload)(implicit services: BatchEnvironment.Services) = {
import services._
val job = msg.job
if (!job.state.isFinal) {
try {
val sj = job.environment.serializeJob(job)
JobManager ! Uploaded(job, sj)
}
catch {
case e: Throwable
JobManager ! Error(job, e, None)
JobManager ! msg
}
}
}
}
......@@ -46,6 +46,9 @@ object StorageService extends JavaLogger {
JobManager ! RetryAction(() action)
}
def rmDirectory[S](s: S, directory: String)(implicit hierarchicalStorageInterface: HierarchicalStorageInterface[S]) =
hierarchicalStorageInterface.rmDir(s, directory)
}
class StorageService[S](val storage: S)(implicit storageInterface: StorageInterface[S], environmentStorage: EnvironmentStorage[S]) {
......
......@@ -17,54 +17,28 @@
package org.openmole.plugin.environment.egi
import java.io.File
import org.apache.commons.configuration2.HierarchicalConfigurationXMLReader
import org.bouncycastle.voms.VOMSAttribute
import org.openmole.core.authentication.AuthenticationStore