Commit fea3a484 authored by Mathieu's avatar Mathieu
Browse files

Fix stream forward

parent a72df66f
......@@ -51,7 +51,9 @@ lazy val server = project.in(file("server")) settings (defaultSettings) settings
"fr.hmil" %% "roshttp" % rosHttpVersion,
"org.scalaj" %% "scalaj-http" % "2.4.2",
"io.skuber" %% "skuber" % skuberVersion,
"io.suzaku" %% "boopickle" % "1.2.6"
"io.suzaku" %% "boopickle" % "1.3.1",
"org.apache.httpcomponents" % "httpclient" % "4.5.9",
"org.apache.httpcomponents" % "httpmime" % "4.5.9",
)
) dependsOn (shared) enablePlugins (ScalatraPlugin)
......
package org.openmoleconnect.server
import java.nio.ByteBuffer
import java.nio.{ByteBuffer, ByteOrder}
import fr.hmil.roshttp.Method
import fr.hmil.roshttp.body.{ByteBufferBody, PlainTextBody}
......@@ -16,10 +16,16 @@ import scalatags.Text.{all => tags}
import scala.concurrent.duration._
import shared.Data._
import boopickle.Default._
import fr.hmil.roshttp.HttpRequest
import monix.execution.Scheduler.Implicits.global
import fr.hmil.roshttp.response.SimpleHttpResponse
import javax.servlet.ServletInputStream
import org.apache.commons.io.IOUtils
import org.apache.http.client.HttpClient
import org.apache.http.client.methods.HttpPost
import org.apache.http.client.utils.URIBuilder
import org.apache.http.entity.InputStreamEntity
import org.apache.http.impl.client.HttpClients
import org.openmoleconnect.server.DB._
......@@ -35,15 +41,15 @@ class ConnectServlet(arguments: ConnectServer.ServletArguments) extends Scalatra
)
def waitForGet(httpRequest: HttpRequest) = {
Await.result(
httpRequest.get()
, 1 minute)
Await.result(
httpRequest.get()
, 1 minute)
}
def waitForPost(httpRequest: HttpRequest) = {
Await.result(
httpRequest.withMethod(Method.POST).send(),
1 minute)
Await.result(
httpRequest.withMethod(Method.POST).send(),
1 minute)
}
def waitForPost2(httpRequest: HttpRequest) = {
......@@ -71,7 +77,7 @@ class ConnectServlet(arguments: ConnectServer.ServletArguments) extends Scalatra
//def withForwardRequest(hostIP: String)(action: HttpRequest => ActionResult): ActionResult = {
def withForwardRequest(hostIP: String)(action: HttpRequest => ActionResult): ActionResult = {
action(baseForwardRequest.withHost(hostIP).withPort(80).withPath(""))
action(baseForwardRequest.withHost(hostIP).withPort(80).withPath(""))
}
def withAccesToken(action: TokenData => ActionResult): Serializable = {
......@@ -112,101 +118,44 @@ class ConnectServlet(arguments: ConnectServer.ServletArguments) extends Scalatra
NotFound()
}
var incr = 0
def toUnsignedByte(bytes:Array[Byte]):Short = {
val aByte:Int = 0xff & bytes(0).asInstanceOf[Int]
aByte.asInstanceOf[Short]
}
// OM instance requests
post("/*") {
println("POST /*")
withAccesToken { tokenData =>
tokenData.host.hostIP.map { hip =>
withForwardRequest(hip) { forwardRequest =>
multiParams("splat").headOption match {
case Some(path) =>
val is = request.getInputStream
val bytes: Array[Byte] = Iterator.continually(is.read()).takeWhile(_ != -1).map(_.asInstanceOf[Byte]).toArray[Byte]
val bb = ByteBuffer.wrap(bytes)
println("----------- REQUEST " + path)
for {
b <- bytes
} yield {
println("B " + b)
}
// val req = waitForPost(
// forwardRequest.withPath(s"/$path").withHeader("Content-Type", "application/octet-stream") //withBody(ByteBufferBody(bb))
// )
//
//
// val reqByte = req.body.asInstanceOf[ByteBuffer]
//
//
// println("Body " + req)
// val data = Array.ofDim[Byte](reqByte.remaining())
// reqByte.get(data)
// // Ok(data)
//
// println("Data " + data)
// if (req.statusCode < 400) Ok(data)
// else NotFound()
val req = waitForPost(
forwardRequest.withPath(s"/$path").withHeader("Content-Type", "application/octet-stream").withBody(ByteBufferBody(bb))
)
response addHeader("Content-Type", "application/octet-stream")
println("----------- RESPONSE " + path)
val is2 = req.body.getBytes//.map{_ & 0xff }
for {
b <- is2
} yield {
println("B " + b)
}
println("----------------")
val bbuffer = ByteBuffer.wrap(is2)
val data = Array.ofDim[Byte](bbuffer.remaining)
val getdata = bbuffer.get(data)
// val is2 = req.body.map{TypedArrayBuffer.wrap}
//val is2 = req.body
// println("GET DATA " + getdata)
//
// println("DATA " + data)
// for {
// b <- data
// } yield {
// println("B " + b)
// }
//
// if (incr == 2) {
// println("INCR " + incr)
// println("unpickle: " + Unpickle[ListFilesData].tryFromBytes(data.asInstanceOf[ByteBuffer]))
//
// // val bytes2: Array[Byte] = Iterator.continually(is.read()).takeWhile(_ != -1).map(_.asInstanceOf[Byte]).toArray[Byte]
// // val bb2 = ByteBuffer.wrap(is2)
// } else {
// incr = incr + 1
// }
if (req.statusCode < 400) Ok(data) //Ok(is2)
else NotFound()
case None => NotFound()
// withForwardRequest(hip) { forwardRequest =>
val forwardRequest = baseForwardRequest.withHost(hip).withPort(80).withPath("")
multiParams("splat").headOption match {
case Some(path) =>
val is = request.getInputStream
println("request: " + s"http://$hip/$path")
val uri = new URIBuilder()
.setScheme("http")
.setHost(hip)
.setPort(80)
.setPath(path)
.build()
val httpPost = new HttpPost(uri)
httpPost.setEntity(new InputStreamEntity(is))
val filtred = Seq("Content-Length")
request.getHeaderNames.filter(n => !filtred.contains(n)).foreach {
n => httpPost.setHeader(n, request.getHeader(n))
}
// Add timeout
val client = HttpClients.createDefault()
val forwardResponse = client.execute(httpPost)
response.setStatus(forwardResponse.getStatusLine.getStatusCode)
IOUtils.copy(forwardResponse.getEntity.getContent, response.getOutputStream())
Ok()
case None => NotFound()
// }
}
}.getOrElse(NotFound())
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment