Commit e6f81162 authored by Romain Reuillon's avatar Romain Reuillon
Browse files

Merge branch 'master' into 9-dev

parents a33b55ca 72e864cd
Pipeline #330 failed with stage
in 1 minute and 30 seconds
......@@ -134,19 +134,21 @@ object MoleExecution extends JavaLogger {
val children = subMoleExecution.children.values.toVector
children.foreach(cancel)
subMoleExecution.parent.foreach(_.children.remove(subMoleExecution.id))
removeSubMole(subMoleExecution)
}
def removeJob(subMoleExecutionState: SubMoleExecutionState, job: MoleJobId) = {
subMoleExecutionState.jobs.remove(job)
val removed = subMoleExecutionState.jobs.remove(job)
subMoleExecutionState.moleExecution.jobs.remove(job)
updateNbJobs(subMoleExecutionState, -1)
if (removed) updateNbJobs(subMoleExecutionState, -1)
}
def addJob(subMoleExecution: SubMoleExecutionState, job: MoleJobId, capsule: MoleCapsule) =
if (!subMoleExecution.canceled) {
subMoleExecution.jobs.add(job)
subMoleExecution.moleExecution.jobs.put(job, capsule)
updateNbJobs(subMoleExecution, 1)
}
def updateNbJobs(subMoleExecutionState: SubMoleExecutionState, v: Int): Unit = {
......@@ -160,7 +162,8 @@ object MoleExecution extends JavaLogger {
def submit(subMoleExecutionState: SubMoleExecutionState, capsule: MoleCapsule, context: Context, ticket: Ticket): Unit = {
import subMoleExecutionState.moleExecution.executionContext.services._
if (!subMoleExecutionState.canceled) {
updateNbJobs(subMoleExecutionState, 1)
val jobId = nextJobId(subMoleExecutionState.moleExecution)
MoleExecution.addJob(subMoleExecutionState, jobId, capsule)
val sourced =
subMoleExecutionState.moleExecution.sources(capsule).foldLeft(Context.empty) {
......@@ -182,14 +185,12 @@ object MoleExecution extends JavaLogger {
// def stateChanged(job: MoleJob, oldState: State, newState: State) =
// eventDispatcher.trigger(subMoleExecutionState.moleExecution, MoleExecution.JobStatusChanged(job, c, newState, oldState))
val jobId = nextJobId(subMoleExecutionState.moleExecution)
subMoleExecutionState.masterCapsuleExecutor.submit {
try {
val savedContext = subMoleExecutionState.masterCapsuleRegistry.remove(c, ticket.parentOrException).getOrElse(Context.empty)
val moleJob: MoleJob = MoleJob(capsule.task, subMoleExecutionState.moleExecution.implicits + sourced + context + savedContext, jobId, (_, _) Unit, () subMoleExecutionState.canceled)
eventDispatcher.trigger(subMoleExecutionState.moleExecution, MoleExecution.JobCreated(moleJob, capsule))
MoleExecutionMessage.send(subMoleExecutionState.moleExecution)(MoleExecutionMessage.RegisterJob(subMoleExecutionState, moleJob, capsule))
val taskContext =
TaskExecutionContext(
......@@ -205,15 +206,16 @@ object MoleExecution extends JavaLogger {
subMoleExecutionState.moleExecution.lockRepository,
moleExecution = Some(subMoleExecutionState.moleExecution)
)
val result = moleJob.perform(taskContext)
MoleJob.finish(moleJob, result)
MoleJob.finish(moleJob, result) // Does nothing
result match {
case Left(newContext) subMoleExecutionState.masterCapsuleRegistry.register(c, ticket.parentOrException, c.toPersist(newContext))
case _
}
MoleExecutionMessage.send(subMoleExecutionState.moleExecution)(MoleExecutionMessage.JobFinished(subMoleExecutionState.id)(moleJob.id, result, capsule, ticket))
MoleExecutionMessage.send(subMoleExecutionState.moleExecution)(MoleExecutionMessage.JobFinished(subMoleExecutionState.id)(jobId, result, capsule, ticket))
}
catch {
case t: Throwable MoleExecutionMessage.send(subMoleExecutionState.moleExecution)(MoleExecutionMessage.MoleExecutionError(t))
......@@ -224,9 +226,8 @@ object MoleExecution extends JavaLogger {
MoleExecutionMessage.send(subMoleExecutionState.moleExecution)(MoleExecutionMessage.JobFinished(subMoleExecutionState.id)(job, result, capsule, ticket))
val newContext = subMoleExecutionState.moleExecution.implicits + sourced + context
val moleJob: MoleJob = MoleJob(capsule.task, newContext, nextJobId(subMoleExecutionState.moleExecution), onJobFinished, () subMoleExecutionState.canceled)
val moleJob: MoleJob = MoleJob(capsule.task, newContext, jobId, onJobFinished, () subMoleExecutionState.canceled)
MoleExecution.addJob(subMoleExecutionState, moleJob.id, capsule)
eventDispatcher.trigger(subMoleExecutionState.moleExecution, MoleExecution.JobCreated(moleJob, capsule))
group(subMoleExecutionState.moleExecution, moleJob, newContext, capsule)
......@@ -238,10 +239,8 @@ object MoleExecution extends JavaLogger {
def processJobFinished(moleExecution: MoleExecution, msg: mole.MoleExecutionMessage.JobFinished) =
if (!MoleExecution.moleJobIsFinished(moleExecution, msg.job)) {
val state = moleExecution.subMoleExecutions(msg.subMoleExecution)
if (!state.canceled) {
try MoleExecution.processFinalState(state, msg.job, msg.result, msg.capsule, msg.ticket)
finally removeJob(state, msg.job)
}
if (!state.canceled) MoleExecution.processFinalState(state, msg.job, msg.result, msg.capsule, msg.ticket)
removeJob(state, msg.job)
MoleExecution.checkIfSubMoleIsFinished(state)
}
......@@ -465,7 +464,7 @@ object MoleExecution extends JavaLogger {
moleExecution.waitingJobs.clear
}
def clean(subMoleExecutionState: SubMoleExecutionState) = {
def removeSubMole(subMoleExecutionState: SubMoleExecutionState) = {
subMoleExecutionState.parent.foreach(s s.children.remove(subMoleExecutionState.id))
subMoleExecutionState.moleExecution.subMoleExecutions.remove(subMoleExecutionState.id)
}
......@@ -475,7 +474,7 @@ object MoleExecution extends JavaLogger {
if (state.nbJobs == 0 && !hasMessages) {
state.onFinish.foreach(_(state))
MoleExecution.clean(state)
removeSubMole(state)
}
}
......@@ -490,7 +489,6 @@ object MoleExecution extends JavaLogger {
def jobs = if (moleExecution.rootSubMoleExecution.nbJobs <= 5) s": ${moleExecution.jobs}" else ""
def subMoles = if (moleExecution.rootSubMoleExecution.nbJobs <= 5) s" - ${moleExecution.subMoleExecutions.map(s ⇒ s._2.canceled -> s._2.jobs)}" else ""
LoggerService.log(Level.FINE, s"check if mole execution $moleExecution is finished, message queue empty ${moleExecution.messageQueue.isEmpty}, number of jobs ${moleExecution.rootSubMoleExecution.nbJobs}${jobs}${subMoles}")
if (moleExecution.messageQueue.isEmpty && moleExecution.rootSubMoleExecution.nbJobs == 0) MoleExecution.finish(moleExecution)
}
......@@ -595,7 +593,6 @@ object MoleExecutionMessage {
case class WithMoleExecutionSate(operation: MoleExecution Unit) extends MoleExecutionMessage
case class StartMoleExecution(context: Option[Context]) extends MoleExecutionMessage
case class CancelMoleExecution() extends MoleExecutionMessage
case class RegisterJob(subMoleExecution: SubMoleExecutionState, job: MoleJob, capsule: MoleCapsule) extends MoleExecutionMessage
case class CleanMoleExecution() extends MoleExecutionMessage
case class MoleExecutionError(t: Throwable) extends MoleExecutionMessage
......@@ -607,7 +604,6 @@ object MoleExecutionMessage {
def messagePriority(moleExecutionMessage: MoleExecutionMessage) =
moleExecutionMessage match {
case _: RegisterJob 200
case _: CancelMoleExecution 100
case _: PerformTransition 10
case _ 1
......@@ -631,7 +627,6 @@ object MoleExecutionMessage {
case msg: JobFinished MoleExecution.processJobFinished(moleExecution, msg)
case msg: StartMoleExecution MoleExecution.start(moleExecution, msg.context)
case msg: CancelMoleExecution MoleExecution.cancel(moleExecution, None)
case msg: RegisterJob MoleExecution.addJob(msg.subMoleExecution, msg.job.id, msg.capsule)
case msg: WithMoleExecutionSate msg.operation(moleExecution)
case msg: CleanMoleExecution MoleExecution.clean(moleExecution)
case msg: MoleExecutionError MoleExecution.cancel(moleExecution, Some(MoleExecution.MoleExecutionError(msg.t)))
......@@ -647,7 +642,7 @@ object MoleExecutionMessage {
def dispatcher(moleExecution: MoleExecution) =
while (!(moleExecution._cleaned)) {
val msg = moleExecution.messageQueue.dequeue()
val msg = moleExecution.messageQueue.dequeue
dispatch(moleExecution, msg)
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment