/*
 * Copyright (C) 2012 Romain Reuillon
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

package org.openmole.plugin.environment.batch.refresh

import java.util.concurrent.TimeUnit

import org.openmole.core.workflow.execution._
import org.openmole.core.workflow.mole.MoleExecution.moleJobIsFinished
import org.openmole.core.workflow.mole.{ MoleExecution, MoleExecutionMessage }
import org.openmole.plugin.environment.batch.environment.BatchEnvironment.ExecutionJobRegistry
import org.openmole.plugin.environment.batch.environment.JobStore.StoredJob
import org.openmole.plugin.environment.batch.environment._
import org.openmole.tool.logger.JavaLogger
import org.openmole.tool.thread._

/**
 * Entry point through which batch-job messages are routed.
 *
 * Messages are sent with `JobManager ! message`. Dispatched messages are
 * submitted to the thread provider found in the implicit
 * `BatchEnvironment.Services`, with the priority given by [[messagePriority]],
 * and are then handed to the matching actor by [[DispatcherActor]].
 */
object JobManager extends JavaLogger { self =>
  import Log._

  /** Priority value associated with kill messages. */
  def killPriority: Int = 10

  /**
   * Priority value passed to the thread provider when `message` is dispatched.
   *
   * NOTE(review): whether a larger value means "runs sooner" is decided by the
   * thread provider, which is not visible from this file.
   *
   * @param message the message about to be dispatched
   * @return the priority associated with the runtime type of `message`
   */
  def messagePriority(message: DispatchedMessage): Int =
    message match {
      case _: Refresh   => 5
      case _: Submit    => 50
      case _: GetResult => 50
      case _: Kill      => killPriority
      case _: Manage    => 75
      case _: Error     => 100 // This is very quick to process
      case _            => 1
    }

  /** Routes each dispatched message to the actor in charge of its type. */
  object DispatcherActor {

    /**
     * Processes `dispatched` by delegating to the `receive` method of the
     * actor matching its runtime type.
     *
     * @param dispatched the message to process
     */
    def receive(dispatched: DispatchedMessage)(implicit services: BatchEnvironment.Services) = {
      System.runFinalization // Help with finalization just in case

      dispatched match {
        case msg: Submit      => SubmitActor.receive(msg)
        case msg: Refresh     => RefreshActor.receive(msg)
        case msg: GetResult   => GetResultActor.receive(msg)
        case msg: RetryAction => RetryActionActor.receive(msg)
        case msg: Error       => ErrorActor.receive(msg)
        case msg: Kill        => KillActor.receive(msg)
      }
    }

  }

  /**
   * Submits the processing of `msg` to the thread provider, using the
   * priority computed by [[messagePriority]].
   *
   * @param msg the message to hand over to [[DispatcherActor]]
   */
  def dispatch(msg: DispatchedMessage)(implicit services: BatchEnvironment.Services) =
    services.threadProvider.submit(messagePriority(msg)) { () => DispatcherActor.receive(msg) }

  /**
   * Sends a message to the job manager.
   *
   *  - `Submit`, `Refresh` and `GetResult` are dispatched only when
   *    [[shouldKill]] decides the job must go on; otherwise a `Kill` is sent
   *    in their place.
   *  - `RetryAction`, `Error` and `Kill` are dispatched directly.
   *  - `Manage` wraps the job in a `BatchExecutionJob`, registers it in the
   *    environment registry, triggers `Environment.JobSubmitted` and sends a
   *    `Submit` for it.
   *  - `Delay` schedules the wrapped message to be re-sent after
   *    `delay.millis` milliseconds.
   *  - `Submitted` sends a delayed `Refresh` built from the minimum update
   *    interval of the batch job, unless the job has to be killed.
   *  - `MoleJobError` records the error on the job's environment, triggers
   *    the matching event and logs it at `FINE` level.
   *
   * @param msg the message to handle
   */
  def !(msg: JobMessage)(implicit services: BatchEnvironment.Services): Unit = {
    msg match {
      case msg: Submit =>
        shouldKill(msg.job.environment, msg.job.storedJob, Kill(msg.job, None)) { () => dispatch(msg) }
      case msg: Refresh =>
        shouldKill(msg.job.environment, msg.job.storedJob, Kill(msg.job, Some(msg.batchJob))) { () => dispatch(msg) }
      case msg: GetResult =>
        shouldKill(msg.job.environment, msg.job.storedJob, Kill(msg.job, Some(msg.batchJob))) { () => dispatch(msg) }
      case msg: RetryAction => dispatch(msg)
      case msg: Error       => dispatch(msg)
      case msg: Kill        => dispatch(msg)

      case Manage(job, environment) =>
        val bej = BatchExecutionJob(job, environment)
        ExecutionJobRegistry.register(environment.registry, bej)
        services.eventDispatcher.trigger(environment, Environment.JobSubmitted(bej))
        self ! Submit(bej)

      case Delay(msg, delay) =>
        services.threadProvider.scheduler.schedule((self ! msg): Runnable, delay.millis, TimeUnit.MILLISECONDS)

      case Submitted(job, bj) =>
        shouldKill(job.environment, job.storedJob, Kill(job, Some(bj))) { () =>
          self ! Delay(Refresh(job, bj, bj.updateInterval.minUpdateInterval), bj.updateInterval.minUpdateInterval)
        }

      case MoleJobError(mj, j, e) =>
        val er = Environment.MoleJobExceptionRaised(j, e, WARNING, mj)
        j.environment.error(er)
        services.eventDispatcher.trigger(j.environment: Environment, er)
        logger.log(FINE, "Error during job execution, it will be resubmitted.", e)

    }
  }

  /**
   * Sends `f` to the mole execution owning `job`, wrapped in a
   * `MoleExecutionMessage.WithMoleExecutionSate` message.
   *
   * @param job the stored job whose mole execution receives the message
   * @param f   the function carried by the message
   */
  def sendToMoleExecution(job: StoredJob)(f: MoleExecution => Unit) =
    MoleExecutionMessage.send(job.moleExecution) { MoleExecutionMessage.WithMoleExecutionSate(f) }

  /** True when every stored mole job of `storedJob` reports `subMoleCanceled()`. */
  def canceled(storedJob: StoredJob) = storedJob.storedMoleJobs.forall(_.subMoleCanceled())

  /**
   * Runs `op` only if the job still has to be processed; otherwise sends `kill`.
   *
   * `kill` is sent right away when the environment is stopped or the stored
   * job is [[canceled]]. Otherwise the decision is passed to the mole
   * execution through [[sendToMoleExecution]]: `op` is run if
   * [[jobIsFinished]] is false for the stored job, and `kill` is sent if it
   * is true.
   *
   * @param environment the environment the job belongs to
   * @param storedJob   the stored job being checked
   * @param kill        the message sent when the job must not go on
   * @param op          the action to run when the job must go on
   */
  def shouldKill(environment: BatchEnvironment, storedJob: StoredJob, kill: Kill)(op: () => Unit)(implicit services: BatchEnvironment.Services) = {
    if (environment.stopped || canceled(storedJob)) self ! kill
    else sendToMoleExecution(storedJob) { state =>
      if (!jobIsFinished(state, storedJob)) op() else self ! kill
    }
  }

  /**
   * True when `moleJobIsFinished` holds, in `moleExecution`, for the id of
   * every stored mole job of `job`.
   */
  def jobIsFinished(moleExecution: MoleExecution, job: StoredJob) =
    job.storedMoleJobs.map(_.id).forall(mj => moleJobIsFinished(moleExecution, mj))

}