Commit 19b5fe4d authored by Romain Reuillon's avatar Romain Reuillon
Browse files

[Plugin] enh: refactor storage interface.

parent 1d6ba634
......@@ -81,7 +81,7 @@ def allThirdParties = Seq(
openmoleNetwork,
txtmark)
lazy val openmoleCache = OsgiProject(thirdPartiesDir, "org.openmole.tool.cache", imports = Seq("*")) dependsOn (openmoleLogger) settings (thirdPartiesSettings: _*) settings (libraryDependencies += Libraries.squants)
lazy val openmoleCache = OsgiProject(thirdPartiesDir, "org.openmole.tool.cache", imports = Seq("*")) dependsOn (openmoleLogger) settings (thirdPartiesSettings: _*) settings (libraryDependencies += Libraries.squants, libraryDependencies += Libraries.cats)
lazy val openmoleTar = OsgiProject(thirdPartiesDir, "org.openmole.tool.tar", imports = Seq("*")) dependsOn (openmoleFile) settings (thirdPartiesSettings: _*)
lazy val openmoleFile = OsgiProject(thirdPartiesDir, "org.openmole.tool.file", imports = Seq("*")) dependsOn(openmoleLock, openmoleStream, openmoleLogger) settings (thirdPartiesSettings: _*)
lazy val openmoleLock = OsgiProject(thirdPartiesDir, "org.openmole.tool.lock", imports = Seq("*")) settings (thirdPartiesSettings: _*)
......
......@@ -24,6 +24,7 @@ import org.openmole.core.communication.message._
import org.openmole.core.communication.storage._
import org.openmole.core.exception.UserBadDataError
import org.openmole.core.workflow.job._
import org.openmole.plugin.environment.batch
import org.openmole.plugin.environment.batch.environment.BatchEnvironment.signalUpload
import org.openmole.plugin.environment.batch.environment._
import org.openmole.plugin.environment.batch.storage._
......@@ -73,7 +74,7 @@ object UploadActor extends JavaLogger {
val plugins = new TreeSet[File]()(fileOrdering) ++ job.plugins
val files = (new TreeSet[File]()(fileOrdering) ++ job.files) diff plugins
val communicationPath = storage.child(storage.tmpDir, UUID.randomUUID.toString)
val communicationPath = storage.child(storage.tmpDirectory(token), StorageService.timedUniqName)
storage.makeDir(communicationPath)
val inputPath = storage.child(communicationPath, uniqName("job", ".in"))
......@@ -114,8 +115,8 @@ object UploadActor extends JavaLogger {
val hash = services.fileService.hash(toReplicate).toString
def upload = {
val name = uniqName(System.currentTimeMillis.toString, ".rep")
val newFile = storage.child(storage.persistentDir, name)
val name = batch.storage.StorageService.timedUniqName
val newFile = storage.child(storage.persistentDirectory(token), name)
Log.logger.fine(s"Upload $toReplicate to $newFile on ${storage.id} mode $fileMode")
signalUpload(eventDispatcher.eventId, storage.upload(toReplicate, newFile, options), toReplicate, newFile, storage)
newFile
......
......@@ -27,23 +27,6 @@ import org.openmole.tool.stream._
import scala.ref.WeakReference
//object Storage {
// val BufferSize = ConfigurationLocation("Storage", "BufferSize", Some(64 kilobytes))
// val CopyTimeout = ConfigurationLocation("Storage", "CopyTimeout", Some(1 minutes))
// val CloseTimeout = ConfigurationLocation("Storage", "CloseTimeout", Some(1 minutes))
//}
//
//trait CompressedTransfer <: Storage {
//
// override abstract def _uploadStream(src: InputStream, dest: String, options: TransferOptions) =
// if (!options.raw) super._uploadStream(src.toGZiped, dest, options) else super._uploadStream(src, dest, options)
//
// override abstract protected def _downloadStream(src: String, options: TransferOptions) =
// if (!options.raw) super._downloadStream(src, options).toGZ else super._downloadStream(src, options)
//
//}
//
object StorageInterface {
def remote[S](s: S)(implicit storage: StorageInterface[S]) =
......@@ -72,6 +55,8 @@ object StorageInterface {
else downloadStream(src, downloadFile)
}
def isDirectory(name: String) = name.endsWith("/")
}
trait StorageInterface[T] {
......
......@@ -60,181 +60,144 @@ object StorageService extends JavaLogger {
remoteStorage: RemoteStorage,
concurrency: Int,
isConnectionError: Throwable Boolean
)(implicit storageInterface: StorageInterface[S], threadProvider: ThreadProvider, preference: Preference) = {
)(implicit storageInterface: StorageInterface[S], threadProvider: ThreadProvider, preference: Preference, replicaCatalog: ReplicaCatalog) = {
val usageControl = UsageControl(concurrency)
val storage = new StorageService[S](s, root, id, environment, remoteStorage, usageControl, isConnectionError)
import cats.implicits._
val baseDir = Lazy(createBasePath(s, root, isConnectionError))
val persistentDirCache = baseDir.map { baseDir
val dir = storageInterface.child(s, baseDir, persistent)
if (!storageInterface.exists(s, dir)) storageInterface.makeDir(s, dir)
cleanPersistent(s, dir, id)
dir
}
def persistentDir(accessToken: AccessToken) = persistentDirCache()
val tmpDirCache = baseDir.map { baseDir
val dir = storageInterface.child(s, baseDir, tmp)
if (!storageInterface.exists(s, dir)) storageInterface.makeDir(s, dir)
cleanTmp(s, dir)
dir
}
def tmpDir(token: AccessToken) = tmpDirCache()
val storage = new StorageService[S](s, persistentDir, tmpDir, id, environment, remoteStorage, usageControl)
startGC(storage)
storage
}
def timedUniqName = org.openmole.tool.file.uniqName(System.currentTimeMillis.toString, ".rep", separator = "_")
lazy val replicationPattern = Pattern.compile("(\\p{XDigit}*)_.*")
def extractTimeFromName(name: String) = {
val matcher = replicationPattern.matcher(name)
if (!matcher.matches) None
else Try(matcher.group(1).toLong).toOption
}
}
import org.openmole.plugin.environment.batch.storage.StorageService.Log._
import org.openmole.plugin.environment.batch.storage.StorageService._
class StorageService[S](
s: S,
val root: String,
val id: String,
val environment: BatchEnvironment,
val remoteStorage: RemoteStorage,
val usageControl: UsageControl,
isConnectionError: Throwable Boolean,
qualityHysteresis: Int = 100
)(implicit storage: StorageInterface[S]) {
import environment.services
import environment.services._
val _directoryCache = Cache {
CacheBuilder.
newBuilder().
expireAfterWrite(preference(StorageService.DirRegenerate).millis, TimeUnit.MILLISECONDS).
build[String, String]()
}
val _serializedRemoteStorage = Cache {
val file = newFile.newFile("remoteStorage", ".xml")
fileService.deleteWhenGarbageCollected(file)
serializerService.serialiseAndArchiveFiles(remoteStorage, file)
file
}
def serializedRemoteStorage = _serializedRemoteStorage()
def directoryCache = _directoryCache()
protected implicit def callable[T](f: () T): Callable[T] = new Callable[T]() { def call() = f() }
def clean(implicit token: AccessToken) = {
replicaCatalog.deleteReplicas(this)
rmDir(baseDir)
directoryCache.invalidateAll
}
def baseDir(implicit token: AccessToken): String =
unwrap { directoryCache.get("baseDir", () createBasePath) }
def cleanTmp[S](s: S, tmpDirectory: String)(implicit storageInterface: StorageInterface[S], preference: Preference) = {
val entries = storageInterface.list(s, tmpDirectory)
val removalDate = System.currentTimeMillis - preference(TmpDirRemoval).toMillis
protected def createBasePath(implicit token: AccessToken) = {
val rootPath = mkRootDir
val basePath = storage.child(s, rootPath, baseDirName)
util.Try(makeDir(basePath)) match {
case util.Success(_) basePath
case util.Failure(e)
if (exists(basePath)) basePath else throw e
}
}
def remove(name: String) = extractTimeFromName(name).map(_ < removalDate).getOrElse(true)
protected def mkRootDir(implicit token: AccessToken): String = synchronized {
val paths = Iterator.iterate[Option[String]](Some(root))(p p.flatMap(parent)).takeWhile(_.isDefined).toSeq.reverse.flatten
paths.tail.foldLeft(paths.head.toString) {
(path, file)
val childPath = storage.child(s, path, storage.name(s, file))
try makeDir(childPath)
catch {
case e: Throwable if isConnectionError(e) throw e
case e: Throwable logger.log(FINE, "Error creating base directory " + root, e)
}
childPath
for {
entry entries
if remove(entry.name)
} {
val path = storageInterface.child(s, tmpDirectory, entry.name)
if (entry.`type` == FileType.Directory) storageInterface.rmDir(s, path)
else storageInterface.rmFile(s, path)
}
}
def persistentDir(implicit token: AccessToken): String =
unwrap { directoryCache.get("persistentDir", () createPersistentDir) }
private def createPersistentDir(implicit token: AccessToken) = {
val persistentPath = storage.child(s, baseDir, persistent)
if (!exists(persistentPath)) makeDir(persistentPath)
def cleanPersistent[S](s: S, persistentPath: String, storageId: String)(implicit replicaCatalog: ReplicaCatalog, preference: Preference, storageInterface: StorageInterface[S]) = {
def graceIsOver(name: String) =
StorageService.extractTimeFromName(name).map {
_ + preference(ReplicaCatalog.ReplicaGraceTime).toMillis < System.currentTimeMillis
}.getOrElse(true)
val names = listNames(persistentPath)
val inReplica = replicaCatalog.forPaths(names.map { storage.child(s, persistentPath, _) }, Seq(this.id)).map(_.path).toSet
val names = storageInterface.list(s, persistentPath).map(_.name)
val inReplica = replicaCatalog.forPaths(names.map { storageInterface.child(s, persistentPath, _) }, Seq(storageId)).map(_.path).toSet
for {
name names
if graceIsOver(name)
} {
val path = storage.child(s, persistentPath, name)
if (!inReplica.contains(path)) backgroundRmFile(path)
val path = storageInterface.child(s, persistentPath, name)
if (!inReplica.contains(path)) storageInterface.rmFile(s, path)
}
persistentPath
}
def tmpDir(implicit token: AccessToken) =
unwrap { directoryCache.get("tmpDir", () createTmpDir) }
private def createTmpDir(implicit token: AccessToken) = {
val time = System.currentTimeMillis
def createBasePath[S](s: S, root: String, isConnectionError: Throwable Boolean)(implicit storageInterface: StorageInterface[S], preference: Preference) = {
def baseDirName = "openmole-" + preference(Preference.uniqueID) + '/'
val tmpNoTime = storage.child(s, baseDir, tmp)
if (!exists(tmpNoTime)) makeDir(tmpNoTime)
val removalDate = System.currentTimeMillis - preference(TmpDirRemoval).toMillis
def mkRootDir: String = synchronized {
val paths = Iterator.iterate[Option[String]](Some(root))(p p.flatMap(storageInterface.parent(s, _))).takeWhile(_.isDefined).toSeq.reverse.flatten
for (entry list(tmpNoTime)) {
val childPath = storage.child(s, tmpNoTime, entry.name)
def rmDir =
try {
val timeOfDir = (if (entry.name.endsWith("/")) entry.name.substring(0, entry.name.length - 1) else entry.name).toLong
if (timeOfDir < removalDate) backgroundRmDir(childPath)
}
catch {
case (ex: NumberFormatException) backgroundRmDir(childPath)
}
entry.`type` match {
case FileType.Directory rmDir
case FileType.File backgroundRmFile(childPath)
case FileType.Link backgroundRmFile(childPath)
case FileType.Unknown
try rmDir
paths.tail.foldLeft(paths.head.toString) {
(path, file)
val childPath = storageInterface.child(s, path, storageInterface.name(s, file))
try storageInterface.makeDir(s, childPath)
catch {
case e: Throwable backgroundRmFile(childPath)
case e: Throwable if isConnectionError(e) throw e
case e: Throwable Log.logger.log(Log.FINE, "Error creating base directory " + root, e)
}
childPath
}
}
val tmpTimePath = storage.child(s, tmpNoTime, time.toString)
util.Try(makeDir(tmpTimePath)) match {
case util.Success(_) tmpTimePath
val rootPath = mkRootDir
val basePath = storageInterface.child(s, rootPath, baseDirName)
util.Try(storageInterface.makeDir(s, basePath)) match {
case util.Success(_) basePath
case util.Failure(e)
if (exists(tmpTimePath)) tmpTimePath else throw e
if (storageInterface.exists(s, basePath)) basePath else throw e
}
}
}
class StorageService[S](
s: S,
val persistentDirectory: AccessToken String,
val tmpDirectory: AccessToken String,
val id: String,
val environment: BatchEnvironment,
val remoteStorage: RemoteStorage,
val usageControl: UsageControl,
qualityHysteresis: Int = 100
)(implicit storage: StorageInterface[S]) {
import environment.services
import environment.services._
val _serializedRemoteStorage = Cache {
val file = newFile.newFile("remoteStorage", ".xml")
fileService.deleteWhenGarbageCollected(file)
serializerService.serialiseAndArchiveFiles(remoteStorage, file)
file
}
def serializedRemoteStorage = _serializedRemoteStorage()
override def toString: String = id
lazy val quality = QualityControl(qualityHysteresis)
def exists(path: String)(implicit token: AccessToken): Boolean = token.access { quality { storage.exists(s, path) } }
def listNames(path: String)(implicit token: AccessToken): Seq[String] = token.access { quality { storage.list(s, path).map(_.name) } }
def list(path: String)(implicit token: AccessToken): Seq[ListEntry] = token.access { quality { storage.list(s, path) } }
def makeDir(path: String)(implicit token: AccessToken): Unit = token.access { quality { storage.makeDir(s, path) } }
def rmDir(path: String)(implicit token: AccessToken): Unit = token.access { quality { storage.rmDir(s, path) } }
def rmFile(path: String)(implicit token: AccessToken): Unit = token.access { quality { storage.rmFile(s, path) } }
def mv(from: String, to: String)(implicit token: AccessToken) = token.access { quality { storage.mv(s, from, to) } }
def parent(path: String): Option[String] = storage.parent(s, path)
def name(path: String) = storage.name(s, path)
def child(path: String, name: String) = storage.child(s, path, name)
def upload(src: File, dest: String, options: TransferOptions = TransferOptions.default)(implicit token: AccessToken) = token.access { quality { storage.upload(s, src, dest, options) } }
def download(src: String, dest: File, options: TransferOptions = TransferOptions.default)(implicit token: AccessToken) = token.access { quality { storage.download(s, src, dest, options) } }
def baseDirName = "openmole-" + preference(Preference.uniqueID) + '/'
def backgroundRmFile(path: String) = JobManager ! DeleteFile(this, path, false)
def backgroundRmDir(path: String) = JobManager ! DeleteFile(this, path, true)
......
......@@ -37,7 +37,7 @@ object SharedStorage extends JavaLogger {
val runtimeInstall = runtimePrefix + runtime.runtime.hash
val (workdir, scriptName) = {
val installDir = sharedFS.child(sharedFS.baseDir, "install")
val installDir = sharedFS.child(sharedFS.tmpDirectory(token), "install")
util.Try(sharedFS.makeDir(installDir))
val workdir = sharedFS.child(installDir, preference(Preference.uniqueID) + "_install")
......
......@@ -29,6 +29,7 @@ import org.openmole.plugin.environment.batch.storage.{ StorageInterface, Storage
import org.openmole.plugin.environment.gridscale.{ LocalStorage, LogicalLinkStorage }
import squants.time.Time
import effectaside._
import org.openmole.core.replication.ReplicaCatalog
package object ssh {
// class RemoteLogicalLinkStorage(val root: String) extends LogicalLinkStorage with SimpleStorage
......@@ -66,7 +67,7 @@ package object ssh {
concurrency: Int,
root: String,
sharedDirectory: Option[String],
forceCopyOnRemoteStorage: Boolean = false)(implicit threadProvider: ThreadProvider, preference: Preference, localInterpreter: Effect[_root_.gridscale.local.Local]) = {
forceCopyOnRemoteStorage: Boolean = false)(implicit threadProvider: ThreadProvider, preference: Preference, replicaCatalog: ReplicaCatalog, localInterpreter: Effect[_root_.gridscale.local.Local]) = {
val remoteStorage = StorageInterface.remote(LogicalLinkStorage(forceCopy = forceCopyOnRemoteStorage))
def id = new URI("file", null, "localhost", -1, sharedDirectory.orNull, null, null).toString
StorageService(LocalStorage(), root, id, environment, remoteStorage, concurrency, t false)
......@@ -82,7 +83,7 @@ package object ssh {
sharedDirectory: Option[String],
storageSharedLocally: Boolean,
forceCopyOnRemoteStorage: Boolean = false
)(implicit storageInterface: StorageInterface[S], threadProvider: ThreadProvider, preference: Preference) = {
)(implicit storageInterface: StorageInterface[S], threadProvider: ThreadProvider, preference: Preference, replicaCatalog: ReplicaCatalog) = {
val root = sharedDirectory match {
case Some(p) p
......
package org.openmole.tool.cache
import cats._
object Lazy {
def apply[T](t: T): Lazy[T] = new Lazy[T] {
override lazy val content = t
}
implicit def isMonoid: Functor[Lazy] = new Functor[Lazy] {
override def map[A, B](fa: Lazy[A])(f: A B): Lazy[B] = Lazy[B](f(fa()))
}
}
trait Lazy[+T] <: (() T) {
......
......@@ -513,7 +513,7 @@ package file {
else (doubleBytes / tB).formatted("%.2f").toString + "TB"
}
def uniqName(prefix: String, sufix: String) = prefix + "_" + UUID.randomUUID.toString + sufix
def uniqName(prefix: String, sufix: String, separator: String = "_") = prefix + separator + UUID.randomUUID.toString + sufix
def acceptDirectory = new Filter[Path] {
def accept(entry: Path): Boolean = Files.isDirectory(entry)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment