I try to use the asynchronous servlet with Jetty 9, but I get IllegalStateException from HttpOutput class. The same code works well with Tomcat.
This is my test code (in Scala).
TestServers.scala ------------------------------------------------------------------------------------------------------------------------
package playground.webserver
import java.io.File
import javax.servlet.Servlet
import org.apache.catalina.startup.Tomcat
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.{ServletHolder, ServletContextHandler}
;
trait TestServer {
val port: Int
def start: Unit
def stop: Unit
def addServlet(servlet: Servlet, contextPath: String)
}
class TomcatTestServer(val port: Int) extends TestServer {
private val tomcat = new Tomcat()
tomcat.setPort(port)
override def start: Unit = tomcat.start()
override def stop: Unit = tomcat.stop()
override def addServlet(servlet: Servlet, contextPath: String): Unit = {
val base = new File(System.getProperty("java.io.tmpdir"));
val context = tomcat.addContext(contextPath, base.getAbsolutePath());
tomcat.addServlet(contextPath, contextPath, servlet)
context.addServletMapping("/", contextPath)
}
}
class JettyTestServer(val port: Int) extends TestServer {
private val jetty = new Server(port)
private val context = new ServletContextHandler(ServletContextHandler.SESSIONS)
context.setContextPath("/")
jetty.setHandler(context)
override def start: Unit = jetty.start()
override def stop: Unit = jetty.stop()
override def addServlet(servlet: Servlet, contextPath: String): Unit = {
context.addServlet(new ServletHolder(servlet), contextPath)
}
}
AsyncIoTest.scala------------------------------------------------------------------------------------------------------------------------------------
package playground.webserver
import java.net.ServerSocket
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
import javax.servlet.{AsyncContext, WriteListener}
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClients
import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
abstract class AsyncIoTest extends FunSpec with Matchers with BeforeAndAfterAll {
private val data = "" Array[Byte](1024 * 1024 * 128)
class DataWriteListener(context: AsyncContext) extends WriteListener {
private[this] var pos = 0
override def onError(t: Throwable): Unit = {
context.getRequest.getServletContext.log("Async Error", t)
context.complete()
}
override def onWritePossible(): Unit = {
val out = context.getResponse.getOutputStream
while (out.isReady && pos < data.length) {
val toWrite = math.min(1024, data.length - pos)
out.write(data, pos, toWrite)
pos += toWrite
}
if (pos >= data.length) {
context.complete()
}
}
}
class AsyncServlet extends HttpServlet {
override def doGet(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
req.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
resp.setStatus(200)
resp.setContentLength(data.length)
val async = req.startAsync()
val out = resp.getOutputStream
out.setWriteListener(new DataWriteListener(async))
}
}
class SyncServlet extends HttpServlet {
override def doGet(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
resp.setStatus(200)
resp.setContentLength(data.length)
val out = resp.getOutputStream
var pos = 0
while(pos < data.length) {
out.write(data, pos, 1024)
pos += 1024
}
}
}
protected val server: TestServer
protected lazy val port: Int = findAvailablePort()
private val requestNum = 100
private def findAvailablePort(): Int = {
val serverSocket = new ServerSocket(0)
val port = serverSocket.getLocalPort()
serverSocket.close()
port
}
describe("Sync IO") {
it("trigger some traffic") {
testWithUrl(s"http://localhost:${port}/sync")
}
}
describe("Async IO") {
it("trigger some traffic") {
testWithUrl(s"http://localhost:${port}/async")
}
}
override protected def beforeAll(): Unit = {
server.addServlet(new AsyncServlet, "/async")
server.addServlet(new SyncServlet, "/sync")
server.start
}
override protected def afterAll(): Unit = {
server.stop
}
private def testWithUrl(url: String): Unit = {
val start = System.currentTimeMillis
val futures = (1 to requestNum).map { i =>
Future {
val total = readData(url)
total should be(1024 * 1024 * 128)
total
}
}
Await.result(Future.sequence(futures), Duration.Inf)
println(s"Total millis used: ${System.currentTimeMillis - start}")
}
private def readData(url: String): Long = {
val httpclient = HttpClients.createDefault()
val httpGet = new HttpGet(url)
val resp = httpclient.execute(httpGet)
val is = resp.getEntity.getContent
val buf = new Array[Byte](1024 * 1024)
var read = 0
var total = 0L
while(read != -1) {
total += read
read = is.read(buf)
}
resp.close()
total
}
}
class AsyncIoWithJettyTest extends AsyncIoTest {
override protected val server: TestServer = new JettyTestServer(port)
}
class AsyncIoWithTomcatTest extends AsyncIoTest {
override protected val server: TestServer = new TomcatTestServer(port)
}
This is the error I got.
[qtp990355670-39] WARN org.eclipse.jetty.util.thread.QueuedThreadPool -
[qtp990355670-32] WARN org.eclipse.jetty.server.HttpChannel - //localhost:51809/async
java.lang.IllegalStateException
at org.eclipse.jetty.server.HttpOutput$AsyncICB.onCompleteSuccess(HttpOutput.java:990)
at org.eclipse.jetty.server.HttpOutput$AsyncWrite.onCompleteSuccess(HttpOutput.java:1126)
at org.eclipse.jetty.util.IteratingCallback.processing(IteratingCallback.java:325)
at org.eclipse.jetty.util.IteratingCallback.succeeded(IteratingCallback.java:365)
at org.eclipse.jetty.server.HttpConnection$SendCallback.onCompleteSuccess(HttpConnection.java:747)
at org.eclipse.jetty.util.IteratingCallback.processing(IteratingCallback.java:325)
at org.eclipse.jetty.util.IteratingCallback.succeeded(IteratingCallback.java:365)
at org.eclipse.jetty.io.WriteFlusher$PendingState.complete(WriteFlusher.java:269)
at org.eclipse.jetty.io.WriteFlusher.completeWrite(WriteFlusher.java:394)
at org.eclipse.jetty.io.SelectChannelEndPoint$3.run(SelectChannelEndPoint.java:89)
at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceAndRun(ExecuteProduceConsume.java:213)
at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:147)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
at java.lang.Thread.run(Thread.java:745)
[qtp990355670-39] WARN org.eclipse.jetty.util.thread.QueuedThreadPool - Unexpected thread death: org.eclipse.jetty.util.thread.QueuedThreadPool$3@26b0a9dd in qtp990355670{STARTED,8<=20<=200,i=9,q=0}
java.lang.NullPointerException
at playground.webserver.AsyncIoTest$DataWriteListener.onError(AsyncIoTest.scala:22)
at org.eclipse.jetty.server.HttpOutput.close(HttpOutput.java:201)
at org.eclipse.jetty.server.Response.closeOutput(Response.java:987)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:412)
at org.eclipse.jetty.server.HttpChannel.run(HttpChannel.java:262)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
at java.lang.Thread.run(Thread.java:745)
......