MoleExecution.scala 33.9 KB
Newer Older
Romain Reuillon's avatar
Romain Reuillon committed
1
/*
Romain Reuillon's avatar
Romain Reuillon committed
2
 * Copyright (C) 2010 Romain Reuillon
Romain Reuillon's avatar
Romain Reuillon committed
3
4
5
6
7
8
9
10
11
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
 * GNU Affero General Public License for more details.
Romain Reuillon's avatar
Romain Reuillon committed
13
 *
14
 * You should have received a copy of the GNU Affero General Public License
Romain Reuillon's avatar
Romain Reuillon committed
15
16
17
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

Romain Reuillon's avatar
Romain Reuillon committed
18
package org.openmole.core.workflow.mole
Romain Reuillon's avatar
Romain Reuillon committed
19
20

import java.util.UUID
21
import java.util.concurrent.{ Executors, Semaphore }
22
import java.util.logging.Level
23
24

import org.openmole.core.context.{ Context, Variable }
25
import org.openmole.core.event._
26
import org.openmole.core.exception.{ InternalProcessingError, UserBadDataError }
27
import org.openmole.core.threadprovider.ThreadProvider
28
29
import org.openmole.core.workflow.dsl._
import org.openmole.core.workflow.execution._
30
import org.openmole.core.workflow.job.State._
Romain Reuillon's avatar
Romain Reuillon committed
31
import org.openmole.core.workflow.job._
32
import org.openmole.core.workflow.mole
33
34
import org.openmole.core.workflow.mole.MoleExecution.{ Cleaned, MoleExecutionFailed, SubMoleExecutionState }
import org.openmole.core.workflow.task.TaskExecutionContext
35
import org.openmole.core.workflow.tools.{ OptionalArgument  _, _ }
36
import org.openmole.core.workflow.transition.{ DataChannel, IAggregationTransition, ITransition }
37
import org.openmole.core.workflow.validation._
38
import org.openmole.tool.cache.KeyValueCache
39
import org.openmole.tool.collection.{ PriorityQueue, StaticArrayBuffer }
40
import org.openmole.tool.lock._
41
import org.openmole.tool.thread._
42
import org.openmole.tool.logger.{ JavaLogger, LoggerService }
43

44
import scala.collection.mutable.{ Buffer, ListBuffer }
45

46
object MoleExecution extends JavaLogger {
Romain Reuillon's avatar
Romain Reuillon committed
47

48
  class Started extends Event[MoleExecution]
49
  case class Finished(canceled: Boolean) extends Event[MoleExecution]
50
51
  case class JobCreated(moleJob: MoleJob, capsule: MoleCapsule) extends Event[MoleExecution]
  case class JobSubmitted(moleJob: Job, capsule: MoleCapsule, environment: Environment) extends Event[MoleExecution]
52
  case class JobFinished(moleJob: MoleJobId, context: Context, capsule: MoleCapsule) extends Event[MoleExecution]
53
  case class Cleaned() extends Event[MoleExecution]
54

Romain Reuillon's avatar
Romain Reuillon committed
55
56
57
58
59
60
61
62
63
64
65
  object MoleExecutionFailed {
    def exception(moleExecutionError: MoleExecutionFailed) = moleExecutionError.exception
    def capsule(moleExecutionError: MoleExecutionFailed) = moleExecutionError match {
      case e: JobFailed              Some(e.capsule)
      case e: ExceptionRaised        Some(e.capsule)
      case e: SourceExceptionRaised  Some(e.capsule)
      case e: HookExceptionRaised    Some(e.capsule)
      case e: MoleExecutionError     None
    }
  }

66
67
  sealed trait MoleExecutionFailed {
    def exception: Throwable
68
  }
69

70
  case class JobFailed(moleJob: MoleJobId, capsule: MoleCapsule, exception: Throwable) extends Event[MoleExecution] with MoleExecutionFailed {
71
72
    def level = Level.SEVERE
  }
73

74
  case class ExceptionRaised(moleJob: MoleJobId, capsule: MoleCapsule, exception: Throwable, level: Level) extends Event[MoleExecution] with MoleExecutionFailed
75
  case class SourceExceptionRaised(source: Source, capsule: MoleCapsule, exception: Throwable, level: Level) extends Event[MoleExecution] with MoleExecutionFailed
76
  case class HookExceptionRaised(hook: Hook, capsule: MoleCapsule, moleJob: MoleJobId, exception: Throwable, level: Level) extends Event[MoleExecution] with MoleExecutionFailed
77
  case class MoleExecutionError(exception: Throwable) extends MoleExecutionFailed
78

79
80
  private def listOfTupleToMap[K, V](l: Traversable[(K, V)]): Map[K, Traversable[V]] = l.groupBy(_._1).mapValues(_.map(_._2))

81
  def apply(
82
    mole:                        Mole,
83
84
85
86
    sources:                     Iterable[(MoleCapsule, Source)]            = Iterable.empty,
    hooks:                       Iterable[(MoleCapsule, Hook)]              = Iterable.empty,
    environments:                Map[MoleCapsule, EnvironmentProvider]      = Map.empty,
    grouping:                    Map[MoleCapsule, Grouping]                 = Map.empty,
87
88
89
    implicits:                   Context                                    = Context.empty,
    defaultEnvironment:          OptionalArgument[LocalEnvironmentProvider] = None,
    cleanOnFinish:               Boolean                                    = true,
90
91
92
    startStopDefaultEnvironment: Boolean                                    = true,
    taskCache:                   KeyValueCache                              = KeyValueCache(),
    lockRepository:              LockRepository[LockKey]                    = LockRepository()
93
  )(implicit moleServices: MoleServices): MoleExecution = {
94

95
    def defaultDefaultEnvironment = LocalEnvironment()(varName = sourcecode.Name("local"))
96

97
    new MoleExecution(
98
      mole,
99
100
      listOfTupleToMap(sources),
      listOfTupleToMap(hooks),
101
      environments,
102
      grouping,
103
      defaultEnvironment.getOrElse(defaultDefaultEnvironment),
104
105
      cleanOnFinish,
      implicits,
Romain Reuillon's avatar
Romain Reuillon committed
106
      MoleExecutionContext()(moleServices),
107
      startStopDefaultEnvironment,
108
      id = UUID.randomUUID().toString,
109
      keyValueCache = taskCache,
110
      lockRepository = lockRepository
111
    )
112
  }
Romain Reuillon's avatar
Romain Reuillon committed
113

114
  type CapsuleStatuses = Map[MoleCapsule, JobStatuses]
115

116
  case class JobStatuses(ready: Long, running: Long, completed: Long)
Romain Reuillon's avatar
Romain Reuillon committed
117

118
119
120
121
122
123
124
  object AggregationTransitionRegistryRecord {
    def apply(size: Int): AggregationTransitionRegistryRecord =
      new AggregationTransitionRegistryRecord(new StaticArrayBuffer(size), new StaticArrayBuffer(size))
  }

  case class AggregationTransitionRegistryRecord(ids: StaticArrayBuffer[Long], values: StaticArrayBuffer[Array[Any]])
  type AggregationTransitionRegistry = RegistryWithTicket[IAggregationTransition, AggregationTransitionRegistryRecord]
125
126
  type MasterCapsuleRegistry = RegistryWithTicket[MasterCapsule, Context]
  type TransitionRegistry = RegistryWithTicket[ITransition, Iterable[Variable[_]]]
127

128
129
  def cancel(subMoleExecution: SubMoleExecutionState): Unit = {
    subMoleExecution.canceled = true
Romain Reuillon's avatar
Romain Reuillon committed
130
131
132
133
134

    val allJobs = subMoleExecution.jobs.toVector
    allJobs.foreach(j  removeJob(subMoleExecution, j))
    assert(subMoleExecution.jobs.isEmpty)

135
136
    val children = subMoleExecution.children.values.toVector
    children.foreach(cancel)
Romain Reuillon's avatar
Romain Reuillon committed
137
138

    removeSubMole(subMoleExecution)
139
  }
140

Romain Reuillon's avatar
Romain Reuillon committed
141
  def removeJob(subMoleExecutionState: SubMoleExecutionState, job: MoleJobId) = {
Romain Reuillon's avatar
Romain Reuillon committed
142
    val removed = subMoleExecutionState.jobs.remove(job)
Romain Reuillon's avatar
Romain Reuillon committed
143
    subMoleExecutionState.moleExecution.jobs.remove(job)
Romain Reuillon's avatar
Romain Reuillon committed
144
    if (removed) updateNbJobs(subMoleExecutionState, -1)
Romain Reuillon's avatar
Romain Reuillon committed
145
146
  }

147
148
149
150
  def addJob(subMoleExecution: SubMoleExecutionState, job: MoleJobId, capsule: MoleCapsule) =
    if (!subMoleExecution.canceled) {
      subMoleExecution.jobs.add(job)
      subMoleExecution.moleExecution.jobs.put(job, capsule)
Romain Reuillon's avatar
Romain Reuillon committed
151
      updateNbJobs(subMoleExecution, 1)
152
    }
Romain Reuillon's avatar
Romain Reuillon committed
153

154
  def updateNbJobs(subMoleExecutionState: SubMoleExecutionState, v: Int): Unit = {
Romain Reuillon's avatar
Romain Reuillon committed
155
156
157
    import subMoleExecutionState.moleExecution.executionContext.services._
    LoggerService.log(Level.FINE, s"update number of jobs of sub mole execution ${subMoleExecutionState}, add ${v} to ${subMoleExecutionState.nbJobs}")

158
159
160
    subMoleExecutionState.nbJobs = subMoleExecutionState.nbJobs + v
    subMoleExecutionState.parent.foreach(s  updateNbJobs(s, v))
  }
Romain Reuillon's avatar
Romain Reuillon committed
161

162
  def submit(subMoleExecutionState: SubMoleExecutionState, capsule: MoleCapsule, context: Context, ticket: Ticket): Unit = {
163
164
    import subMoleExecutionState.moleExecution.executionContext.services._
    if (!subMoleExecutionState.canceled) {
Romain Reuillon's avatar
Romain Reuillon committed
165
166
      val jobId = nextJobId(subMoleExecutionState.moleExecution)
      MoleExecution.addJob(subMoleExecutionState, jobId, capsule)
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184

      val sourced =
        subMoleExecutionState.moleExecution.sources(capsule).foldLeft(Context.empty) {
          case (a, s) 
            val ctx = try s.perform(subMoleExecutionState.moleExecution.implicits + context, subMoleExecutionState.moleExecution.executionContext)
            catch {
              case t: Throwable 
                Log.logger.log(Log.FINE, "Error in submole execution", t)
                val event = MoleExecution.SourceExceptionRaised(s, capsule, t, Log.SEVERE)
                eventDispatcher.trigger(subMoleExecutionState.moleExecution, event)
                cancel(subMoleExecutionState.moleExecution, Some(event))
                throw new InternalProcessingError(t, s"Error in source execution that is plugged to $capsule")
            }
            a + ctx
        } + Variable(Variable.openMOLESeed, seeder.newSeed)

      capsule match {
        case c: MasterCapsule 
185
186
          //          def stateChanged(job: MoleJob, oldState: State, newState: State) =
          //            eventDispatcher.trigger(subMoleExecutionState.moleExecution, MoleExecution.JobStatusChanged(job, c, newState, oldState))
187
188

          subMoleExecutionState.masterCapsuleExecutor.submit {
Romain Reuillon's avatar
Romain Reuillon committed
189
190
            try {
              val savedContext = subMoleExecutionState.masterCapsuleRegistry.remove(c, ticket.parentOrException).getOrElse(Context.empty)
191
              val moleJob: MoleJob = MoleJob(capsule.task, subMoleExecutionState.moleExecution.implicits + sourced + context + savedContext, jobId, (_, _)  Unit, ()  subMoleExecutionState.canceled)
Romain Reuillon's avatar
Romain Reuillon committed
192

Romain Reuillon's avatar
Romain Reuillon committed
193
              eventDispatcher.trigger(subMoleExecutionState.moleExecution, MoleExecution.JobCreated(moleJob, capsule))
194

Romain Reuillon's avatar
Romain Reuillon committed
195
196
197
198
199
200
201
202
203
              val taskContext =
                TaskExecutionContext(
                  newFile.baseDir,
                  subMoleExecutionState.moleExecution.defaultEnvironment,
                  preference,
                  threadProvider,
                  fileService,
                  workspace,
                  outputRedirection,
204
                  loggerService,
205
                  subMoleExecutionState.moleExecution.keyValueCache,
Romain Reuillon's avatar
Romain Reuillon committed
206
207
208
                  subMoleExecutionState.moleExecution.lockRepository,
                  moleExecution = Some(subMoleExecutionState.moleExecution)
                )
Romain Reuillon's avatar
Romain Reuillon committed
209

210
              val result = moleJob.perform(taskContext)
Romain Reuillon's avatar
Romain Reuillon committed
211
              MoleJob.finish(moleJob, result) // Does nothing
212
213
214
215
216
217

              result match {
                case Left(newContext)  subMoleExecutionState.masterCapsuleRegistry.register(c, ticket.parentOrException, c.toPersist(newContext))
                case _                
              }

Romain Reuillon's avatar
Romain Reuillon committed
218
              MoleExecutionMessage.send(subMoleExecutionState.moleExecution)(MoleExecutionMessage.JobFinished(subMoleExecutionState.id)(jobId, result, capsule, ticket))
Romain Reuillon's avatar
Romain Reuillon committed
219
220
221
222
            }
            catch {
              case t: Throwable  MoleExecutionMessage.send(subMoleExecutionState.moleExecution)(MoleExecutionMessage.MoleExecutionError(t))
            }
223
224
          }
        case _ 
225
          def onJobFinished(job: MoleJobId, result: Either[Context, Throwable]) =
226
            MoleExecutionMessage.send(subMoleExecutionState.moleExecution)(MoleExecutionMessage.JobFinished(subMoleExecutionState.id)(job, result, capsule, ticket))
227

228
          val newContext = subMoleExecutionState.moleExecution.implicits + sourced + context
Romain Reuillon's avatar
Romain Reuillon committed
229
          val moleJob: MoleJob = MoleJob(capsule.task, newContext, jobId, onJobFinished, ()  subMoleExecutionState.canceled)
230

231
          eventDispatcher.trigger(subMoleExecutionState.moleExecution, MoleExecution.JobCreated(moleJob, capsule))
Romain Reuillon's avatar
Romain Reuillon committed
232

233
          group(subMoleExecutionState.moleExecution, moleJob, newContext, capsule)
234
      }
235

236
237
    }
  }
238

239
240
241
  def processJobFinished(moleExecution: MoleExecution, msg: mole.MoleExecutionMessage.JobFinished) =
    if (!MoleExecution.moleJobIsFinished(moleExecution, msg.job)) {
      val state = moleExecution.subMoleExecutions(msg.subMoleExecution)
Romain Reuillon's avatar
Romain Reuillon committed
242
243
      if (!state.canceled) MoleExecution.processFinalState(state, msg.job, msg.result, msg.capsule, msg.ticket)
      removeJob(state, msg.job)
Romain Reuillon's avatar
Romain Reuillon committed
244
      MoleExecution.checkIfSubMoleIsFinished(state)
245
    }
246

Romain Reuillon's avatar
Romain Reuillon committed
247
  def performHooksAndTransitions(subMoleExecutionState: SubMoleExecutionState, job: MoleJobId, context: Context, capsule: MoleCapsule, ticket: Ticket) = {
248
249
    val mole = subMoleExecutionState.moleExecution.mole

250
    def ctxForHooks = (subMoleExecutionState.moleExecution.implicits + context) - Variable.openMOLESeed
251
252

    def executeHook(h: Hook) =
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
      try {
        def toHookExecutionContext(cache: KeyValueCache, executionContext: MoleExecutionContext) = {
          val services = executionContext.services
          HookExecutionContext(
            cache = cache,
            preference = services.preference,
            threadProvider = services.threadProvider,
            fileService = services.fileService,
            workspace = services.workspace,
            outputRedirection = services.outputRedirection,
            loggerService = services.loggerService,
            random = services.newRandom,
            newFile = services.newFile)
        }

        h.perform(ctxForHooks, toHookExecutionContext(subMoleExecutionState.moleExecution.keyValueCache, subMoleExecutionState.moleExecution.executionContext))
      }
270
271
272
273
274
275
      catch {
        case e: Throwable 
          import subMoleExecutionState.moleExecution.executionContext.services._
          val event = MoleExecution.HookExceptionRaised(h, capsule, job, e, Log.SEVERE)
          eventDispatcher.trigger(subMoleExecutionState.moleExecution, event)
          cancel(subMoleExecutionState.moleExecution, Some(event))
276
          Log.logger.log(Log.FINE, "Error in execution of misc " + h + "at the end of task " + capsule.task, e)
277
278
          throw e
      }
279

280
281
    try {
      val hooksVariables = subMoleExecutionState.moleExecution.hooks(capsule).flatMap(executeHook).unzip._2
282
283
      val newContext = context ++ hooksVariables
      mole.outputDataChannels(capsule).toSeq.foreach { d  DataChannel.provides(d, subMoleExecutionState.moleExecution.implicits + newContext, ticket, subMoleExecutionState.moleExecution) }
284

285
286
287
      for {
        transition  mole.outputTransitions(capsule).toList.sortBy(t  mole.slots(t.end.capsule).size).reverse
      } transition.perform(subMoleExecutionState.moleExecution.implicits + context, ticket, subMoleExecutionState.moleExecution, subMoleExecutionState.id, subMoleExecutionState.moleExecution.executionContext)
288

289
290
291
292
293
294
295
296
297
298
299
    }
    catch {
      case t: Throwable 
        Log.logger.log(Log.FINE, "Error in submole execution", t)
        val event = MoleExecution.ExceptionRaised(job, capsule, t, Log.SEVERE)
        import subMoleExecutionState.moleExecution.executionContext.services._
        eventDispatcher.trigger(subMoleExecutionState.moleExecution, event)
        cancel(subMoleExecutionState.moleExecution, Some(event))
        throw t
    }
  }
300

301
302
303
304
305
306
307
308
309
310
  def newSubMoleExecution(
    parent:        Option[SubMoleExecutionState],
    moleExecution: MoleExecution) = {
    val id = SubMoleExecution(moleExecution.currentSubMoleExecutionId)
    moleExecution.currentSubMoleExecutionId += 1
    val sm = new SubMoleExecutionState(id, parent, moleExecution)
    parent.foreach(_.children.put(id, sm))
    moleExecution.subMoleExecutions.put(id, sm)
    sm
  }
311

312
313
314
  def newChildSubMoleExecution(subMoleExecution: SubMoleExecutionState): SubMoleExecutionState =
    newSubMoleExecution(Some(subMoleExecution), subMoleExecution.moleExecution)

Romain Reuillon's avatar
Romain Reuillon committed
315
  def processFinalState(subMoleExecutionState: SubMoleExecutionState, job: MoleJobId, result: Either[Context, Throwable], capsule: MoleCapsule, ticket: Ticket) = {
316
317
    result match {
      case Right(e) 
318
319
320
321
        val error = MoleExecution.JobFailed(job, capsule, e)
        cancel(subMoleExecutionState.moleExecution, Some(error))
        Log.logger.log(Log.FINE, s"Error in user job execution for capsule $capsule, job state is FAILED.", e)
        subMoleExecutionState.moleExecution.executionContext.services.eventDispatcher.trigger(subMoleExecutionState.moleExecution, error)
322
323
324
      case Left(context) 
        subMoleExecutionState.moleExecution.completed(capsule) = subMoleExecutionState.moleExecution.completed(capsule) + 1
        subMoleExecutionState.moleExecution.executionContext.services.eventDispatcher.trigger(subMoleExecutionState.moleExecution, MoleExecution.JobFinished(job, context, capsule))
Romain Reuillon's avatar
Romain Reuillon committed
325
        performHooksAndTransitions(subMoleExecutionState, job, context, capsule, ticket)
326
327
    }
  }
328

329
330
331
332
  /* -------------- Mole Execution ----------------- */

  def start(moleExecution: MoleExecution, context: Option[Context]) =
    if (!moleExecution._started) {
Romain Reuillon's avatar
Romain Reuillon committed
333
334
335
      import moleExecution.executionContext.services._
      LoggerService.log(Level.FINE, "Starting mole execution")

336
337
338
      def startEnvironments() = {
        if (moleExecution.startStopDefaultEnvironment) moleExecution.defaultEnvironment.start()
        moleExecution.environments.values.foreach(_.start())
339
      }
340

341
342
343
344
345
      import moleExecution.executionContext.services._

      newFile.baseDir.mkdirs()
      moleExecution._started = true
      moleExecution._startTime = Some(System.currentTimeMillis)
346
      eventDispatcher.trigger(moleExecution, new MoleExecution.Started)
347
348
349
      startEnvironments()
      submit(moleExecution.rootSubMoleExecution, moleExecution.mole.root, context.getOrElse(Context.empty), nextTicket(moleExecution, moleExecution.rootTicket))
      checkAllWaiting(moleExecution)
350
    }
351

352
353
  private def finish(moleExecution: MoleExecution, canceled: Boolean = false) =
    if (!moleExecution._finished) {
Romain Reuillon's avatar
Romain Reuillon committed
354
355
356
      import moleExecution.executionContext.services._
      LoggerService.log(Level.FINE, s"finish mole execution $moleExecution, canceled ${canceled}")

357
358
359
      moleExecution._finished = true
      moleExecution._endTime = Some(System.currentTimeMillis)
      moleExecution.executionContext.services.eventDispatcher.trigger(moleExecution, MoleExecution.Finished(canceled = canceled))
360
      moleExecution.finishedSemaphore.release()
361

362
      moleExecution.executionContext.services.threadProvider.submit(ThreadProvider.maxPriority) { () 
363
364
365
        def stopEnvironments() = {
          if (moleExecution.startStopDefaultEnvironment) moleExecution.defaultEnvironment.stop()
          moleExecution.environments.values.foreach(_.stop())
366
        }
367
368

        try stopEnvironments()
Romain Reuillon's avatar
Romain Reuillon committed
369
        finally MoleExecutionMessage.send(moleExecution)(MoleExecutionMessage.CleanMoleExecution())
370
      }
371
    }
372

Romain Reuillon's avatar
Romain Reuillon committed
373
374
375
376
  def clean(moleExecution: MoleExecution) = {
    import moleExecution.executionContext.services._
    LoggerService.log(Level.FINE, s"clean mole execution $moleExecution")

377
378
379
380
381
    try if (moleExecution.cleanOnFinish) moleExecution.executionContext.services.newFile.baseDir.recursiveDelete
    finally {
      moleExecution._cleaned = true
      moleExecution.cleanedSemaphore.release()
      moleExecution.executionContext.services.eventDispatcher.trigger(moleExecution, MoleExecution.Cleaned())
382
    }
Romain Reuillon's avatar
Romain Reuillon committed
383
  }
384

Romain Reuillon's avatar
Romain Reuillon committed
385
  def cancel(moleExecution: MoleExecution, t: Option[MoleExecutionFailed]): Unit = {
386
    if (!moleExecution._canceled) {
Romain Reuillon's avatar
Romain Reuillon committed
387
      import moleExecution.executionContext.services._
Romain Reuillon's avatar
Romain Reuillon committed
388
389
      LoggerService.log(Level.FINE, s"cancel mole execution $moleExecution, with error $t")

390
391
392
393
394
      moleExecution._exception = t
      cancel(moleExecution.rootSubMoleExecution)
      moleExecution._canceled = true
      finish(moleExecution, canceled = true)
    }
Romain Reuillon's avatar
Romain Reuillon committed
395
  }
396
397
398
399
400

  def nextTicket(moleExecution: MoleExecution, parent: Ticket): Ticket = {
    val ticket = Ticket(parent, moleExecution.ticketNumber)
    moleExecution.ticketNumber = moleExecution.ticketNumber + 1
    ticket
Romain Reuillon's avatar
Romain Reuillon committed
401
  }
402

403
404
405
406
407
  def nextJobId(moleExecution: MoleExecution) = {
    val id = moleExecution.moleId
    moleExecution.moleId += 1
    id
  }
408

409
  def group(moleExecution: MoleExecution, moleJob: MoleJob, context: Context, capsule: MoleCapsule) = {
410
411
412
    moleExecution.grouping.get(capsule) match {
      case Some(strategy) 
        val groups = moleExecution.waitingJobs.getOrElseUpdate(capsule, collection.mutable.Map())
413
        val category = strategy.apply(context, groups.toVector)(moleExecution.newGroup, moleExecution.executionContext.services.defaultRandom)
414
415
416
417
        val jobs = groups.getOrElseUpdate(category, ListBuffer())
        jobs.append(moleJob)
        moleExecution.nbWaiting += 1

418
        if (strategy.complete(jobs)) {
419
420
421
422
423
424
425
426
          groups -= category
          moleExecution.nbWaiting -= jobs.size
          Some(Job(moleExecution, jobs.toVector)  capsule)
        }
        else None
      case None 
        val job = Job(moleExecution, Vector(moleJob))
        Some(job  capsule)
427
    }
428
  }.foreach { case (j, c)  submit(moleExecution, j, c) }
429

430
  def submit(moleExecution: MoleExecution, job: Job, capsule: MoleCapsule) = {
Romain Reuillon's avatar
Romain Reuillon committed
431
432
    val env = moleExecution.environments.getOrElse(capsule, moleExecution.defaultEnvironment)
    import moleExecution.executionContext.services._
433

Romain Reuillon's avatar
Romain Reuillon committed
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
    env match {
      case env: SubmissionEnvironment  env.submit(job)
      case env: LocalEnvironment 
        env.submit(
          job,
          TaskExecutionContext(
            newFile.baseDir,
            env,
            preference,
            threadProvider,
            fileService,
            workspace,
            outputRedirection,
            loggerService,
            moleExecution.keyValueCache,
            moleExecution.lockRepository,
            moleExecution = Some(moleExecution)
451
          )
Romain Reuillon's avatar
Romain Reuillon committed
452
        )
453
    }
Romain Reuillon's avatar
Romain Reuillon committed
454
455

    eventDispatcher.trigger(moleExecution, MoleExecution.JobSubmitted(job, capsule, env))
456
  }
Romain Reuillon's avatar
Romain Reuillon committed
457

458
  def submitAll(moleExecution: MoleExecution) = {
459
460
461
462
    for {
      (capsule, groups)  moleExecution.waitingJobs
      (_, jobs)  groups.toList
    } submit(moleExecution, Job(moleExecution, jobs), capsule)
463
464
    moleExecution.nbWaiting = 0
    moleExecution.waitingJobs.clear
465
466
  }

Romain Reuillon's avatar
Romain Reuillon committed
467
  def removeSubMole(subMoleExecutionState: SubMoleExecutionState) = {
468
469
470
471
    subMoleExecutionState.parent.foreach(s  s.children.remove(subMoleExecutionState.id))
    subMoleExecutionState.moleExecution.subMoleExecutions.remove(subMoleExecutionState.id)
  }

Romain Reuillon's avatar
Romain Reuillon committed
472
473
  def checkIfSubMoleIsFinished(state: SubMoleExecutionState) = {
    def hasMessages = state.moleExecution.messageQueue.all.exists(MoleExecutionMessage.msgForSubMole(_, state))
Romain Reuillon's avatar
Romain Reuillon committed
474

Romain Reuillon's avatar
Romain Reuillon committed
475
476
    if (state.nbJobs == 0 && !hasMessages) {
      state.onFinish.foreach(_(state))
Romain Reuillon's avatar
Romain Reuillon committed
477
      removeSubMole(state)
478
    }
479
  }
480

481
  def moleJobIsFinished(moleExecution: MoleExecution, id: MoleJobId) = !moleExecution.jobs.contains(id)
482

483
484
485
  def checkAllWaiting(moleExecution: MoleExecution) =
    if (moleExecution.rootSubMoleExecution.nbJobs <= moleExecution.nbWaiting) MoleExecution.submitAll(moleExecution)

486
487
  def checkMoleExecutionIsFinished(moleExecution: MoleExecution) = {
    import moleExecution.executionContext.services._
Romain Reuillon's avatar
Romain Reuillon committed
488
489
490
491

    def jobs = if (moleExecution.rootSubMoleExecution.nbJobs <= 5) s": ${moleExecution.jobs}" else ""
    def subMoles = if (moleExecution.rootSubMoleExecution.nbJobs <= 5) s" - ${moleExecution.subMoleExecutions.map(s ⇒ s._2.canceled -> s._2.jobs)}" else ""
    LoggerService.log(Level.FINE, s"check if mole execution $moleExecution is finished, message queue empty ${moleExecution.messageQueue.isEmpty}, number of jobs ${moleExecution.rootSubMoleExecution.nbJobs}${jobs}${subMoles}")
492
    if (moleExecution.messageQueue.isEmpty && moleExecution.rootSubMoleExecution.nbJobs == 0) MoleExecution.finish(moleExecution)
493
  }
494

495
  def allJobIds(moleExecution: MoleExecution) = moleExecution.jobs.toVector
496

497
  def capsuleStatuses(moleExecution: MoleExecution, jobs: Seq[(MoleJobId, MoleCapsule)], completed: Map[MoleCapsule, Long]): CapsuleStatuses = {
498

499
    val runningSet: java.util.HashSet[Long] = {
500
      def submissionEnvironments = moleExecution.environments.values.toSeq.collect { case e: SubmissionEnvironment  e }
501
      def localEnvironments = moleExecution.environments.values.toSeq.collect { case e: LocalEnvironment  e } ++ Seq(moleExecution.defaultEnvironment)
502

503
      val set = new java.util.HashSet[Long](jobs.size + 1, 1.0f)
504
505

      for {
506
        env  submissionEnvironments
507
        ej  env.runningJobs
508
509
        id  ej.moleJobIds
      } set.add(id)
510

511
512
513
514
515
      for {
        env  localEnvironments
        ej  env.runningJobs
      } set.add(ej.id)

516
      set
517
518
    }

519
    def isRunning(moleJob: MoleJobId): Boolean = runningSet.contains(moleJob)
520

521
522
    val ready = collection.mutable.Map[MoleCapsule, Long]()
    val running = collection.mutable.Map[MoleCapsule, Long]()
523

524
    def increment(map: collection.mutable.Map[MoleCapsule, Long], key: MoleCapsule) = {
525
526
527
      val value = map.getOrElse(key, 0L)
      map.update(key, value + 1)
    }
528
529

    for {
530
      (moleJob, capsule)  jobs
531
    } {
532
533
      if (isRunning(moleJob)) increment(running, capsule)
      else increment(ready, capsule)
534
    }
535

536
    moleExecution.mole.capsules.map { c 
537
538
539
540
541
542
543
      c ->
        MoleExecution.JobStatuses(
          ready = ready.getOrElse(c, 0L),
          running = running.getOrElse(c, 0L),
          completed = completed.getOrElse(c, 0L)
        )
    }.toMap
544
  }
Romain Reuillon's avatar
Romain Reuillon committed
545

546
547
548
549
550
551
552
  class SubMoleExecutionState(
    val id:            SubMoleExecution,
    val parent:        Option[SubMoleExecutionState],
    val moleExecution: MoleExecution) {

    import moleExecution.executionContext.services._

Romain Reuillon's avatar
Romain Reuillon committed
553
554
555
    var nbJobs = 0L
    var children = collection.mutable.TreeMap[SubMoleExecution, SubMoleExecutionState]()
    var jobs = collection.mutable.TreeSet[MoleJobId]()
556

557
    @volatile var canceled = false
558

559
560
561
562
563
564
565
    val onFinish = collection.mutable.ListBuffer[(SubMoleExecutionState  Any)]()
    val masterCapsuleRegistry = new MasterCapsuleRegistry
    val aggregationTransitionRegistry = new AggregationTransitionRegistry
    val transitionRegistry = new TransitionRegistry
    lazy val masterCapsuleExecutor = Executors.newSingleThreadExecutor(threadProvider.threadFactory)
  }

566
567
568
569
570
571
572
573
574
575
576
577
578
579
  object SynchronisationContext {
    implicit def default = Synchronized
    def apply[T](th: Any, op:  T)(implicit s: SynchronisationContext) =
      s match {
        case MoleExecution.Synchronized  synchronized(op)
        case MoleExecution.UnsafeAccess  op
      }

  }

  sealed trait SynchronisationContext
  case object Synchronized extends SynchronisationContext
  case object UnsafeAccess extends SynchronisationContext

580
581
582
583
584
585
586
587
588
589
590
591
}

object SubMoleExecution {
  implicit def ordering: Ordering[SubMoleExecution] = Ordering.by[SubMoleExecution, Long](_.id)
}

case class SubMoleExecution(id: Long) extends AnyVal

sealed trait MoleExecutionMessage

object MoleExecutionMessage {
  case class PerformTransition(subMoleExecution: SubMoleExecution)(val operation: SubMoleExecutionState  Unit) extends MoleExecutionMessage
592
  case class JobFinished(subMoleExecution: SubMoleExecution)(val job: MoleJobId, val result: Either[Context, Throwable], val capsule: MoleCapsule, val ticket: Ticket) extends MoleExecutionMessage //, val state: State, val capsule: MoleCapsule, val ticket: Ticket) extends MoleExecutionMessage
593
594
595
  case class WithMoleExecutionSate(operation: MoleExecution  Unit) extends MoleExecutionMessage
  case class StartMoleExecution(context: Option[Context]) extends MoleExecutionMessage
  case class CancelMoleExecution() extends MoleExecutionMessage
Romain Reuillon's avatar
Romain Reuillon committed
596
  case class CleanMoleExecution() extends MoleExecutionMessage
Romain Reuillon's avatar
Romain Reuillon committed
597
  case class MoleExecutionError(t: Throwable) extends MoleExecutionMessage
598

599
600
601
602
  def msgForSubMole(msg: MoleExecutionMessage, subMoleExecutionState: SubMoleExecutionState) = msg match {
    case msg: PerformTransition  msg.subMoleExecution == subMoleExecutionState.id
    case msg: JobFinished        msg.subMoleExecution == subMoleExecutionState.id
    case _                       false
603
604
  }

605
606
  def messagePriority(moleExecutionMessage: MoleExecutionMessage) =
    moleExecutionMessage match {
607
608
609
      case _: CancelMoleExecution  100
      case _: PerformTransition    10
      case _                       1
610
611
    }

612
613
  def send(moleExecution: MoleExecution)(moleExecutionMessage: MoleExecutionMessage, priority: Option[Int] = None) =
    moleExecution.messageQueue.enqueue(moleExecutionMessage, priority getOrElse messagePriority(moleExecutionMessage))
614
615

  def dispatch(moleExecution: MoleExecution, msg: MoleExecutionMessage) = moleExecution.synchronized {
Romain Reuillon's avatar
Romain Reuillon committed
616
617
618
    import moleExecution.executionContext.services._
    LoggerService.log(Level.FINE, s"processing message $msg in mole execution $moleExecution")

619
    try {
620
      msg match {
Romain Reuillon's avatar
Romain Reuillon committed
621
622
623
624
625
626
        case msg: PerformTransition 
          if (!moleExecution._canceled) {
            val state = moleExecution.subMoleExecutions(msg.subMoleExecution)
            if (!state.canceled) msg.operation(state)
            MoleExecution.checkIfSubMoleIsFinished(state)
          }
627
628
629
        case msg: JobFinished            MoleExecution.processJobFinished(moleExecution, msg)
        case msg: StartMoleExecution     MoleExecution.start(moleExecution, msg.context)
        case msg: CancelMoleExecution    MoleExecution.cancel(moleExecution, None)
630
        case msg: WithMoleExecutionSate  msg.operation(moleExecution)
631
        case msg: CleanMoleExecution     MoleExecution.clean(moleExecution)
Romain Reuillon's avatar
Romain Reuillon committed
632
        case msg: MoleExecutionError     MoleExecution.cancel(moleExecution, Some(MoleExecution.MoleExecutionError(msg.t)))
633
      }
634
635
636
637
    }
    catch {
      case t: Throwable  MoleExecution.cancel(moleExecution, Some(MoleExecution.MoleExecutionError(t)))
    }
638
639
640
641
642
643

    MoleExecution.checkAllWaiting(moleExecution)
    MoleExecution.checkMoleExecutionIsFinished(moleExecution)
  }

  def dispatcher(moleExecution: MoleExecution) =
644
    while (!(moleExecution._cleaned)) {
Romain Reuillon's avatar
Romain Reuillon committed
645
      val msg = moleExecution.messageQueue.dequeue
646
      dispatch(moleExecution, msg)
Romain Reuillon's avatar
Romain Reuillon committed
647
    }
648

649
650
651
652
653
654
}

class MoleExecution(
  val mole:                        Mole,
  val sources:                     Sources,
  val hooks:                       Hooks,
655
656
  val environmentProviders:        Map[MoleCapsule, EnvironmentProvider],
  val grouping:                    Map[MoleCapsule, Grouping],
657
658
659
660
661
  val defaultEnvironmentProvider:  LocalEnvironmentProvider,
  val cleanOnFinish:               Boolean,
  val implicits:                   Context,
  val executionContext:            MoleExecutionContext,
  val startStopDefaultEnvironment: Boolean,
662
  val id:                          String,
663
  val keyValueCache:               KeyValueCache,
664
  val lockRepository:              LockRepository[LockKey]
665
666
) {

667
  val messageQueue = PriorityQueue[MoleExecutionMessage](fifo = true)
668
669
670
671

  private[mole] var _started = false
  private[mole] var _canceled = false
  private[mole] var _finished = false
672
  private[mole] var _cleaned = false
673

674
675
676
  private val finishedSemaphore = new Semaphore(0)
  private val cleanedSemaphore = new Semaphore(0)

677
678
679
680
681
  def sync[T](op:  T)(implicit s: MoleExecution.SynchronisationContext) = MoleExecution.SynchronisationContext(this, op)

  def started(implicit s: MoleExecution.SynchronisationContext) = sync(_started)
  def canceled(implicit s: MoleExecution.SynchronisationContext) = sync(_canceled)
  def finished(implicit s: MoleExecution.SynchronisationContext) = sync(_finished)
682

683
  def cleaned(implicit s: MoleExecution.SynchronisationContext) = sync(_cleaned)
684
685
686
687

  private[mole] var _startTime: Option[Long] = None
  private[mole] var _endTime: Option[Long] = None

688
689
  def startTime(implicit s: MoleExecution.SynchronisationContext) = sync(_startTime)
  def endTime(implicit s: MoleExecution.SynchronisationContext) = sync(_endTime)
690
691
692
693

  private[mole] var ticketNumber = 1L
  private[mole] val rootTicket = Ticket(id, 0)

694
695
  private[mole] var moleId = 0L

696
697
  private[mole] val newGroup = NewGroup()

698
  private[mole] val waitingJobs = collection.mutable.Map[MoleCapsule, collection.mutable.Map[MoleJobGroup, ListBuffer[MoleJob]]]()
699
700
701
  private[mole] var nbWaiting = 0

  private[mole] val completed = {
702
    val map = collection.mutable.Map[MoleCapsule, Long]()
703
704
705
706
    map ++= mole.capsules.map(_ -> 0L)
    map
  }

707
  lazy val environmentInstances = environmentProviders.toVector.map { case (k, v)  v }.distinct.map { v  v  v(executionContext.services) }.toMap
708
  lazy val environments = environmentProviders.toVector.map { case (k, v)  k  environmentInstances(v) }.toMap
709
  lazy val defaultEnvironment = defaultEnvironmentProvider(executionContext.services)
710

711
712
713
714
715
716
717
  def allEnvironments = (environmentInstances.values ++ Seq(defaultEnvironment)).toVector.distinct

  lazy val rootSubMoleExecution = MoleExecution.newSubMoleExecution(None, this)
  lazy val subMoleExecutions = collection.mutable.TreeMap[SubMoleExecution, SubMoleExecutionState]()

  private[mole] var currentSubMoleExecutionId = 0L

718
  private[mole] val jobs = collection.mutable.TreeMap[MoleJobId, MoleCapsule]()
719

720
721
722
  private[workflow] val dataChannelRegistry = new RegistryWithTicket[DataChannel, Buffer[Variable[_]]]
  private[mole] var _exception = Option.empty[MoleExecutionFailed]

723
  def exception(implicit s: MoleExecution.SynchronisationContext) = sync(_exception)
724

725
726
  def duration(implicit s: MoleExecution.SynchronisationContext): Option[Long] = sync {
    (startTime, endTime) match {
727
728
729
      case (None, _)           None
      case (Some(t), None)     Some(System.currentTimeMillis - t)
      case (Some(s), Some(e))  Some(e - s)
730
    }
731
  }
732

733
734
  def run: Unit = run(None)

735
736
737
738
739
740
  def validate = {
    import executionContext.services._
    val validationErrors = Validation(mole, implicits, sources, hooks)
    if (!validationErrors.isEmpty) throw new UserBadDataError(s"Formal validation of your mole has failed, ${validationErrors.size} error(s) has(ve) been found.\n" + validationErrors.mkString("\n") + s"\nIn mole: $mole")
  }

741
742
  def run(context: Option[Context] = None, validate: Boolean = true) = {
    if (!_started) {
743
      if (validate) this.validate
744
745
746
747
      MoleExecutionMessage.send(this)(MoleExecutionMessage.StartMoleExecution(context))
      MoleExecutionMessage.dispatcher(this)
      _exception.foreach(e  throw e.exception)
      this
748
    }
749
    else this
750
751
  }

752
  def start(doValidation: Boolean) = {
753
    import executionContext.services._
754
755
    if (doValidation) validate
    val t = threadProvider.newThread { ()  run(None, validate = doValidation) }
756
757
758
    t.start()
    this
  }
Romain Reuillon's avatar
Romain Reuillon committed
759

760
761
762
763
764
765
  def hangOn(cleaned: Boolean = true) = {
    if (cleaned) cleanedSemaphore.acquireAndRelease()
    else finishedSemaphore.acquireAndRelease()
    this
  }

766
  def cancel = MoleExecutionMessage.send(this)(MoleExecutionMessage.CancelMoleExecution())
767
768

  def capsuleStatuses(implicit s: MoleExecution.SynchronisationContext) = {
769
    val (jobs, cmp) = sync { (MoleExecution.allJobIds(this).toVector, completed.toMap) }
770
771
    MoleExecution.capsuleStatuses(this, jobs, cmp)
  }
772

Romain Reuillon's avatar
Romain Reuillon committed
773
}