Commit 96f64410 authored by Romain Reuillon's avatar Romain Reuillon
Browse files

[Plugin] enh: compress job serialization

parent 9d7a4826
Pipeline #1252 passed with stages
in 55 minutes and 37 seconds
......@@ -84,7 +84,7 @@ class Runtime {
val executionMessage =
newFile.withTmpFile { executionMessageFileCache
retry(storage.download(inputMessagePath, executionMessageFileCache), transferRetry)
serializerService.deserializeAndExtractFiles[ExecutionMessage](executionMessageFileCache, deleteFilesOnGC = true)
serializerService.deserializeAndExtractFiles[ExecutionMessage](executionMessageFileCache, deleteFilesOnGC = true, gz = true)
}
val systemOut = OutputManager.systemOutput
......
......@@ -111,7 +111,7 @@ object SimExplorer extends JavaLogger {
logger.fine("plugins: " + config.pluginPath.get + " " + new File(config.pluginPath.get).listFilesSafe.mkString(","))
val storage = serializerService.deserializeAndExtractFiles[RemoteStorage](new File(config.storage.get), deleteFilesOnGC = true)
val storage = serializerService.deserializeAndExtractFiles[RemoteStorage](new File(config.storage.get), deleteFilesOnGC = true, gz = true)
new Runtime().apply(
storage,
......
......@@ -95,8 +95,8 @@ class SerializerService { service ⇒
def deserialize[T](is: InputStream): T = buildXStream().fromXML(is).asInstanceOf[T]
def deserializeAndExtractFiles[T](file: File, deleteFilesOnGC: Boolean)(implicit newFile: TmpDirectory, fileService: FileService): T = {
val tis = new TarInputStream(file.bufferedInputStream)
def deserializeAndExtractFiles[T](file: File, deleteFilesOnGC: Boolean, gz: Boolean = false)(implicit newFile: TmpDirectory, fileService: FileService): T = {
val tis = new TarInputStream(file.bufferedInputStream(gz = gz))
try deserializeAndExtractFiles(tis, deleteFilesOnGC = deleteFilesOnGC)
finally tis.close
}
......@@ -110,8 +110,8 @@ class SerializerService { service ⇒
}
}
def serializeAndArchiveFiles(obj: Any, f: File)(implicit newFile: TmpDirectory): Unit = {
val os = new TarOutputStream(f.bufferedOutputStream())
def serializeAndArchiveFiles(obj: Any, f: File, gz: Boolean = false)(implicit newFile: TmpDirectory): Unit = {
val os = new TarOutputStream(f.bufferedOutputStream(gz = gz))
try serializeAndArchiveFiles(obj, os)
finally os.close
}
......@@ -127,8 +127,8 @@ class SerializerService { service ⇒
def pluginsAndFiles(obj: Any) = pluginAndFileListing().list(obj)
def deserializeReplaceFiles[T](file: File, files: Map[String, File]): T = {
val is = file.bufferedInputStream
def deserializeReplaceFiles[T](file: File, files: Map[String, File], gz: Boolean = false): T = {
val is = file.bufferedInputStream(gz = gz)
try deserializeReplaceFiles[T](is, files)
finally is.close
}
......
......@@ -82,7 +82,7 @@ class JarWizardApiImpl(s: Services) extends JarWizardAPI {
Some(BasicLaunchingCommand(Some(JavaLikeLanguage()), ""))
def jarClasses(jarPath: SafePath) = {
val zip = new ZipInputStream(jarPath.toFile.bufferedInputStream)
val zip = new ZipInputStream(jarPath.toFile.bufferedInputStream())
var classes: Seq[FullClass] = Seq()
......
......@@ -236,7 +236,7 @@ object BatchEnvironment {
/* ---- upload the execution message ----*/
val inputPath =
newFile.withTmpFile("job", ".tar") { executionMessageFile
serializerService.serializeAndArchiveFiles(executionMessage, executionMessageFile)
serializerService.serializeAndArchiveFiles(executionMessage, executionMessageFile, gz = true)
signalUpload(eventDispatcher.eventId, upload(executionMessageFile, TransferOptions(noLink = true, canMove = true)), executionMessageFile, environment, storageId)
}
......@@ -244,7 +244,7 @@ object BatchEnvironment {
services.newFile.withTmpFile("remoteStorage", ".tar") { storageFile
import org.openmole.tool.hash._
import services._
services.serializerService.serializeAndArchiveFiles(remoteStorage, storageFile)
services.serializerService.serializeAndArchiveFiles(remoteStorage, storageFile, gz = true)
val hash = storageFile.hash().toString()
val path = signalUpload(eventDispatcher.eventId, upload(storageFile, TransferOptions(noLink = true, canMove = true, raw = true)), storageFile, environment, storageId)
FileMessage(path, hash)
......
......@@ -35,22 +35,15 @@ object StorageInterface {
}
def upload(compressed: Boolean, uploadStream: (() InputStream, String) Unit)(src: File, dest: String, options: TransferOptions = TransferOptions.default): Unit = {
def fileStream() = src.bufferedInputStream
if (compressed) {
def compressedFileStream() = src.bufferedInputStream.toGZiped
if (!options.raw) uploadStream(compressedFileStream, dest) else uploadStream(fileStream, dest)
}
else uploadStream(fileStream, dest)
def fileStream() = src.bufferedInputStream()
def compressedFileStream() = src.bufferedInputStream(gz = true)
if (compressed && !options.raw) uploadStream(compressedFileStream, dest) else uploadStream(fileStream, dest)
}
def download(compressed: Boolean, downloadStream: (String, InputStream Unit) Unit)(src: String, dest: File, options: TransferOptions = TransferOptions.default): Unit = {
def downloadFile(is: InputStream) = Files.copy(is, dest.toPath)
if (compressed) {
def uncompressed(is: InputStream) = downloadFile(is.toGZ)
if (!options.raw) downloadStream(src, uncompressed) else downloadStream(src, downloadFile)
}
else downloadStream(src, downloadFile)
def uncompressed(is: InputStream) = downloadFile(is.toGZ)
if (compressed && !options.raw) downloadStream(src, uncompressed) else downloadStream(src, downloadFile)
}
def isDirectory(name: String) = name.endsWith("/")
......
......@@ -59,7 +59,8 @@ object LogicalLinkStorage {
LogicalLinkStorage.upload(s, src, uploadDestination, options)
uploadDestination
}
override def download(src: String, dest: File, options: TransferOptions)(implicit newFile: TmpDirectory): Unit = LogicalLinkStorage.download(s, src, dest, options)
override def download(src: String, dest: File, options: TransferOptions)(implicit newFile: TmpDirectory): Unit =
LogicalLinkStorage.download(s, src, dest, options)
}
}
......
......@@ -56,11 +56,11 @@ package object ssh {
override def rmFile(t: SSHStorage, path: String): Unit = t.accessControl { gssh.rmFile(t, path) }
override def upload(t: SSHStorage, src: File, dest: String, options: TransferOptions): Unit =t.accessControl {
override def upload(t: SSHStorage, src: File, dest: String, options: TransferOptions): Unit = t.accessControl {
StorageInterface.upload(false, gssh.writeFile(t, _, _))(src, dest, options)
}
override def download(t: SSHStorage, src: String, dest: File, options: TransferOptions): Unit =t.accessControl {
override def download(t: SSHStorage, src: String, dest: File, options: TransferOptions): Unit = t.accessControl {
StorageInterface.download(false, gssh.readFile[Unit](t, _, _))(src, dest, options)
}
......
......@@ -128,13 +128,13 @@ package file {
else copyFile(toF, followSymlinks)
}
def copy(to: OutputStream) = withClosable(bufferedInputStream) {
def copy(to: OutputStream) = withClosable(bufferedInputStream()) {
_.copy(to)
}
// TODO replace with NIO
def copy(to: OutputStream, maxRead: Int, timeout: Time)(implicit pool: ThreadPoolExecutor): Unit =
withClosable(bufferedInputStream) {
withClosable(bufferedInputStream()) {
_.copy(to, maxRead, timeout)
}
......@@ -143,7 +143,7 @@ package file {
toF
}
def copyUncompressFile(toF: File): File = withClosable(new GZIPInputStream(file.bufferedInputStream)) { from
def copyUncompressFile(toF: File): File = withClosable(new GZIPInputStream(file.bufferedInputStream())) { from
Files.copy(from, toF, StandardCopyOption.REPLACE_EXISTING)
toF
}
......@@ -407,7 +407,9 @@ package file {
finally lockFile.delete()
}
def bufferedInputStream = new BufferedInputStream(Files.newInputStream(file))
def bufferedInputStream(gz: Boolean = false) =
if (!gz) new BufferedInputStream(Files.newInputStream(file))
else new GZIPInputStream(new BufferedInputStream(Files.newInputStream(file)))
private def writeOptions(append: Boolean) = {
import StandardOpenOption._
......@@ -420,7 +422,7 @@ package file {
else new BufferedOutputStream(Files.newOutputStream(file.toPath, writeOptions(append): _*).toGZ)
}
def gzippedBufferedInputStream = new GZIPInputStream(bufferedInputStream)
def gzippedBufferedInputStream = new GZIPInputStream(bufferedInputStream())
def gzippedBufferedOutputStream = new GZIPOutputStream(bufferedOutputStream())
......@@ -448,7 +450,7 @@ package file {
def withFileOutputStream[T] = withClosable[FileOutputStream, T](new FileOutputStream(file))(_)
def withInputStream[T] = withClosable[InputStream, T](bufferedInputStream)(_)
def withInputStream[T] = withClosable[InputStream, T](bufferedInputStream())(_)
def withReader[T] = withClosable[Reader, T](Files.newBufferedReader(file.toPath))(_)
......@@ -463,7 +465,7 @@ package file {
withClosable[DirectoryStream[Path], T](open)(_)
}
def withSource[T] = withClosable[Source, T](Source.fromInputStream(bufferedInputStream))(_)
def withSource[T] = withClosable[Source, T](Source.fromInputStream(bufferedInputStream()))(_)
def wrapError[T](f: T): T =
try f
......
......@@ -36,7 +36,7 @@ package object osgi {
def openInputStream(c: ClassSource) =
c match {
case ClassFile(_, file) file.bufferedInputStream
case ClassFile(_, file) file.bufferedInputStream()
case ClassByteCode(_, byteCode) new ByteArrayInputStream(byteCode)
}
}
......
......@@ -146,7 +146,7 @@ package object tar {
}
def extract(dest: File, overwrite: Boolean = false) =
withClosable(new TarInputStream(file.bufferedInputStream)) {
withClosable(new TarInputStream(file.bufferedInputStream())) {
_.extract(dest, overwrite)
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment