
Re: [jetty-dev] jetty-9 websockets API

Inline responses: (again, but to the correct list *doh*)

On Tue, Jul 3, 2012 at 4:03 AM, Greg Wilkins <gregw@xxxxxxxxxxx> wrote:
Jesse/Joakime

I've moved this to jetty-dev where it should be.

+1
 

I had a quick look at the proposed standard websocket API.... it does
not shout "I'm wonderful" at me.   However the interesting thing is
the feedback from others - there appears to be support for using
streams to send/receive large messages independently from frame
boundaries.   We should also look at it more closely to see if we can
get at least convergence on naming etc.

The use of streams within the scope of Jetty 9 introduces some apparent nasties with regard to blocking and threading (more about that below).
Jesse and I noodled through a few ideas on how to accomplish this while maintaining the original event-driven approach currently in the Jetty 9 codebase. (Also, more about that below.)
 

I can see the argument that frames are a transport artefact and should
not affect the application - so if messages are too large to fit in
memory then the application should not really have to deal with
frames.   Streams are good for this because they allow the application
to decide how big buffers to pass to a read or how big a blob to write
- and these sizes can be independent or at least decoupled from frame
size.

Jesse and I have been discussing adding 2 layers to isolate the Framing from the Messaging philosophies.
For incoming frames, a new MessageAggregator would be wedged between the Parser and the WebSocketEventDriver.
For outgoing messages, a MessageGenerator would be introduced in the WebSocketConnection.write() methods to break down large messages (based on policy) before writing out via AsyncEndPoint.write().
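Very roughly, the incoming half could look something like this (Frame, FrameListener and MessageSink are placeholder stand-ins, not the actual jetty-9 types, and the policy is reduced to a single size just for the sketch):

import java.io.ByteArrayOutputStream;

// Illustrative stand-ins only, not the actual jetty-9 types.
interface Frame
{
    boolean isFin();
    boolean isControlFrame();
    byte[] getPayload();
}

interface FrameListener
{
    void onFrame(Frame frame);
}

interface MessageSink
{
    // complete=false means "more of this message is still coming"
    void onMessage(byte[] payload, boolean complete);
}

class MessageAggregator implements FrameListener
{
    private final int maxMessageSize; // stand-in for WebSocketPolicy.getMaxTextMessageSize()
    private final MessageSink sink;   // stand-in for the WebSocketEventDriver
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    MessageAggregator(int maxMessageSize, MessageSink sink)
    {
        this.maxMessageSize = maxMessageSize;
        this.sink = sink;
    }

    @Override
    public void onFrame(Frame frame)
    {
        if (frame.isControlFrame())
            return; // ping/pong/close are routed elsewhere, never aggregated

        byte[] payload = frame.getPayload();
        buffer.write(payload, 0, payload.length);

        // notify when the message is complete, or when the policy size
        // forces a partial (streaming style) delivery
        if (frame.isFin() || buffer.size() >= maxMessageSize)
        {
            sink.onMessage(buffer.toByteArray(), frame.isFin());
            buffer.reset();
        }
    }
}

The real one would also have to track TEXT vs BINARY opcodes and route control frames properly, but that is the shape of it.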
 

Which brings me back to our current jetty-9 implementation, and I think we
do need to review how we expose frames.

I'm not so sure we need different classes for
Ping/Pong/Text/Data/Binary etc. frames.   Especially for data, we cannot
apply any interpretation on the frames themselves.  It is only when
frames are aggregated into messages that we can say they are binary or
text.   Also I do see scope for extensions changing both the number
and type of frames as they are processed, so perhaps a single Frame
class is better than polymorphic types.   If we remove the UTF-8
processing from TextFrame, there is not much left of that class!

Having the individual specific frame types currently makes things easier to understand, and also makes the End-User WebSocket POJO declarations easier to route.  Should the end user want to deal with frames, they can declare which frame types they are interested in.
As for the single Frame object approach, I would discourage that as well, as it complicates maintenance with all of the various rules, parsing, and generating it would introduce (we don't want a repeat of Jetty 7's monolithic generator/parser).
Also, when the extensions get wired back up, they will be wired up at the BaseFrame level.
Besides, we followed the SPDY lead here too, with the individual frames.
And with respect to the UTF8 checks, those are just there to honor the RFC. 

Now, that being said.
Having TextFrames do the UTF8 check is really the wrong place for it, as the RFC requires the UTF8 validation at the Message level.  However, the Autobahn tests expect validation at the frame level and fail fast on an invalid codepoint.  The tests also allow a codepoint split between frames with no failure.
So to resolve this, we have the MessageAggregator and MessageGenerator introduced below to aggregate/fragment large messages and presumably validate the UTF8 requirement there.
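For illustration only (not code in the tree), a stateful validator that carries the bytes of a split codepoint over to the next frame is what lets the check live at the Message level:

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Sketch: a split codepoint is only an error if the message actually
// ends (fin=true) in the middle of one.
class Utf8MessageValidator
{
    private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
            .onMalformedInput(CodingErrorAction.REPORT)
            .onUnmappableCharacter(CodingErrorAction.REPORT);
    private final CharBuffer scratch = CharBuffer.allocate(1024);
    private byte[] carry = new byte[0]; // trailing bytes of an incomplete codepoint

    // feed each TEXT/CONTINUATION payload as it is parsed
    void validate(byte[] payload, boolean fin)
    {
        // prepend whatever was left over from the previous frame
        ByteBuffer input = ByteBuffer.allocate(carry.length + payload.length);
        input.put(carry).put(payload).flip();

        while (true)
        {
            CoderResult result = decoder.decode(input, scratch, fin);
            scratch.clear(); // decoded chars are discarded, only validity matters
            if (result.isError())
                throw new IllegalStateException("Invalid UTF8 in TEXT message");
            if (result.isUnderflow())
                break;
        }

        if (fin)
        {
            if (decoder.flush(scratch).isError() || input.hasRemaining())
                throw new IllegalStateException("Invalid UTF8 in TEXT message");
            decoder.reset(); // ready for the next message
            carry = new byte[0];
        }
        else
        {
            // keep the incomplete codepoint (if any) for the next frame
            carry = new byte[input.remaining()];
            input.get(carry);
        }
    }
}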

How about we fix the TextFrame validation for now, and re-evaluate maybe going back down to 3 frame types (BaseFrame -> (DataFrame | ControlFrame)) within the next few weeks (as we start integrating the extension chain back into the mix).  
 

The WebSocketListener API is good because it delivers messages not
frames.   I guess the mood in JCP356 is to support methods like

  void onWebSocketBinary(InputStream in)

so that large messages can be delivered.  I think this gives a
threading problem as streams are blocking.  But we will have to
monitor where that goes.

Jesse and I have been favoring a simpler approach.

onWebSocketMessage(Message message)

Message object
    boolean isComplete() - indicates if the complete String or binary blob is contained in the message
    boolean isText() - special check for content sent as Op=TEXT with assumption that the content will be UTF8 compliant
      // Message Data APIs
    String getText() - returns the complete text of the message
    ByteBuffer getBytes() - returns the complete binary blob of the message
      // Streaming APIs
    Reader getReader() - returns a reader for streaming a text message
    InputStream getStream() - returns a stream for streaming a binary message
    ReadableByteChannel getChannel() - returns a channel for streaming a binary message

If the end user uses .getReader(), .getStream(), or .getChannel(), then it is assumed that
the end user wants to treat further parts of this same message in a Streaming API style.

Message notification is governed by WebSocketPolicy.getMaxTextMessageSize() (and .getMaxBinaryMessageSize())
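As an interface sketch (the method set mirrors the list above; nothing here exists in the codebase yet):

import java.io.InputStream;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Sketch of the proposed Message object (hypothetical).
public interface Message
{
    boolean isComplete();  // the complete String or binary blob is contained in this message
    boolean isText();      // content arrived as Op=TEXT (assumed UTF8 compliant)

    // Message Data APIs
    String getText();      // complete text of the message
    ByteBuffer getBytes(); // complete binary blob of the message

    // Streaming APIs - calling any of these switches the remainder of
    // this message over to streaming-style delivery
    Reader getReader();
    InputStream getStream();
    ReadableByteChannel getChannel();
}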

3 usage techniques from the End-User Socket POJO perspective (see the example sketch after this list)
  1. Type 1 - Simple message, fits entirely within Message object.
    isComplete=true
    1. End user uses .getText() does something with it
    2. onMessage() returns
    3. MessageAggregator sees that the Message isComplete and resets it for another message boundary.
  2. Type 2 - Large message, does not fit within defined policy sizes.
    isComplete=false
    1. End user uses .getReader() to access the information.
    2. End user spins up a thread to process the reader. (IMPORTANT: otherwise onFillable / the Parser is blocked)
    3. onMessage() returns.
    4. MessageAggregator inspects the Message object it just used and determines that the delivery mode should be via the Streaming API
    5. MessageAggregator continues to reuse the old Message object and populates further continuation frames into the Streaming API.
    6. End user's thread continues to read from the Stream.
    7. MessageAggregator hits a FIN=TRUE and closes the stream.
    8. End user's thread ends (hopefully)
  3. Type 3 - Large message, does not fit within defined policy sizes, and the end user does not use the Streaming API.
    isComplete=false
    1. End user uses .getBytes() to get the data
    2. End user saves the bytes to disk (or some other task)
    3. onMessage() returns
    4. MessageAggregator sees the isComplete=false and no use of a Streaming API
    5. MessageAggregator dumps/resets the Message object
    6. Parser notifies MessageAggregator of new Continuation frame.
    7. MessageAggregator sees no active Message object and constructs a new one
    8. Once policy size thresholds are reached a new Message object is notified in onMessage()
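Here is the end-user view of Type 1 and Type 2 against that sketched Message interface (again hypothetical, and onWebSocketMessage is the proposed entry point, not the current listener API):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;

// End-user socket POJO sketch covering Type 1 and Type 2.
public class ExampleSocket
{
    public void onWebSocketMessage(final Message message)
    {
        if (message.isComplete())
        {
            // Type 1: the whole message fit within the policy limits
            handle(message.getText());
            return;
        }

        // Type 2: grab the reader now (so the MessageAggregator knows we want
        // streaming delivery), then consume it on our own thread so that
        // onFillable / the Parser is not blocked
        final Reader reader = message.getReader();
        new Thread(new Runnable()
        {
            @Override
            public void run()
            {
                try
                {
                    BufferedReader buffered = new BufferedReader(reader);
                    String line;
                    while ((line = buffered.readLine()) != null)
                        handle(line);
                    buffered.close();
                }
                catch (IOException e)
                {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    private void handle(String text)
    {
        // application specific
        System.out.println(text);
    }
}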
 

On the other side, WebSocketConnection has a write(BaseFrame frame)
method.   I think that is wrong.  Applications write messages, not
frames, and it is our implementation's job to turn messages into frames.

We do have message writes as well - so I think we need to have a
different interface that allows frames to be written (and that would
be the interface exposed to extensions that are frame aware).

I like
    void write(ByteBuffer... buffers) throws IOException;
    <C> void write(C context, Callback<C> callback, ByteBuffer...
buffers) throws IOException;
    <C> void write(C context, Callback<C> callback, String...
messages) throws IOException;

but why
    void write(String message) throws IOException;
instead of
    void write(String... message) throws IOException;

Are the methods without callbacks meant to be blocking?  The javadoc
says non-blocking?

All writes are non-blocking (or at least they should be, IMO).
Now, if we introduce the Streaming API stuff, this would imply the following new APIs on our end as well:

Traditional approach
// binary streaming (1 message)
OutputStream WebSocketConnection.newOutputStream(); 
// text streaming (1 message)
Writer WebSocketConnection.newWriter();
// binary nio streaming (1 message)
WritableByteChannel WebSocketConnection.newOutputChannel();

Open Questions:
  1. Should a request for a new writer (for example) close the prior newWriter()?
  2. Should other uses of WebSocketConnection.write() close an active Streaming API message? (With what? A continuation frame of FIN=true and payload length=0?)
  3. Should either of these use cases actually just trigger a protocol failure and close the socket? (for improper use of the Streaming API .close() methods)
  4. Speaking of which, how do you handle a stream.close() if the bytes have already gone out?  (with the empty FIN=true continuation?)
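For what it's worth, end-user usage of the newWriter() form might look like the following sketch, which assumes close() is what ends the message (exactly what questions 1 and 4 are poking at); the connection interface here is a stand-in showing only the proposed method:

import java.io.IOException;
import java.io.Writer;

// Stand-in for WebSocketConnection, showing only the proposed method.
interface TextStreamCapableConnection
{
    Writer newWriter() throws IOException;
}

class StreamingSendExample
{
    void sendLargeText(TextStreamCapableConnection connection, Iterable<String> chunks) throws IOException
    {
        Writer writer = connection.newWriter();
        try
        {
            for (String chunk : chunks)
                writer.write(chunk); // implementation fragments into frames per policy
        }
        finally
        {
            writer.close(); // end of message (FIN=true on the last frame)
        }
    }
}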

Offloaded approach
// streaming binary 
WebSocketConnection.write(InputStream in);
WebSocketConnection.write(C context, Callback<C> callback, InputStream in);
// streaming text
WebSocketConnection.write(Reader reader);
WebSocketConnection.write(C context, Callback<C> callback, Reader reader);
// nio streaming binary
WebSocketConnection.write(ReadableByteChannel channel);
WebSocketConnection.write(C context, Callback<C> callback, ReadableByteChannel channel);

Open Questions:
  1. Blocking or not? (Favoring non-blocking, but how? with an active thread?)
  2. How do we interrupt / cancel the operation on an offloaded write?
  3. Can any frame show up in the middle of all of the frames needed to send these streams? (spec seems to indicate no)
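One possible internal shape for the offloaded write(InputStream) variant, shown only to make the trade-off in question 1 concrete: a dedicated pump thread copies the stream into frame-sized chunks (FrameSender is a made-up stand-in for whatever actually emits a single frame):

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

interface FrameSender
{
    void sendFrame(ByteBuffer payload, boolean fin) throws IOException;
}

class OffloadedStreamWriter implements Runnable
{
    private final InputStream in;
    private final FrameSender sender;
    private final int frameSize;

    OffloadedStreamWriter(InputStream in, FrameSender sender, int frameSize)
    {
        this.in = in;
        this.sender = sender;
        this.frameSize = frameSize;
    }

    @Override
    public void run()
    {
        byte[] chunk = new byte[frameSize];
        try
        {
            int len;
            while ((len = in.read(chunk)) != -1)
            {
                // each chunk goes out as a non-final frame
                sender.sendFrame(ByteBuffer.wrap(chunk, 0, len), false);
            }
            // end the message with an empty FIN=true frame (one of the
            // options floated in the open questions above)
            sender.sendFrame(ByteBuffer.allocate(0), true);
        }
        catch (IOException e)
        {
            // a real implementation would fail the connection / notify the callback
            e.printStackTrace();
        }
    }
}

It would be kicked off with something like new Thread(new OffloadedStreamWriter(in, sender, 4096)).start(), which is exactly the "active thread" cost the question is about.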
 

I then think we need to have the extension API have a mechanism for
sending/receiving frames (not messages).
The Parser.Listener interface fits the bill for receiving frames, but
WebSocketConnection is not right for sending.  We need another
abstraction between the connection the application sees and the
Generator.  ie AsyncWebSocketConnection cannot call the generator
directly, but will call sendFrame on some new abstraction, that will
pass the frame through all extensions before calling the generator.


The extension chain has 2 modes: Incoming / Outgoing.

Parsing CallStack
  1. Raw Network Connection
  2. EndPoint
  3. AsyncEndPoint
  4. AsyncConnection
  5. WebSocketAsyncConnection
    1. .onFillable()
  6. websocket.parser.Parser  <-- raw frame parsing
    1. FrameListener      <-- raw frame notification
    2. Extension Chain (implements FrameListener)    <-- frame mutation
  7. MessageAggregator (implements FrameListener)    <-- NEW 
    (aggregates message frames based on policy) 
  8. WebSocketEventDriver (implements MessageAggregator.Listener)
TODO: rename Parser.Listener to FrameListener
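Putting steps 6 and 7 together, the wiring could be as simple as folding the extensions around the aggregator (reusing the FrameListener stand-in from the earlier sketch; Extension.wrapIncoming() is an assumption about what the extension contract might look like):

import java.util.List;

interface Extension
{
    // wrap the next listener so the extension can mutate frames on the way in
    FrameListener wrapIncoming(FrameListener next);
}

class IncomingChain
{
    static FrameListener build(List<Extension> extensions, FrameListener messageAggregator)
    {
        FrameListener chain = messageAggregator;            // step 7: aggregation
        for (int i = extensions.size() - 1; i >= 0; i--)
            chain = extensions.get(i).wrapIncoming(chain);  // step 6.2: frame mutation
        return chain; // hand this to the Parser as its frame listener (step 6.1)
    }
}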

Generator CallStack
  1. End User WebSocket POJO
  2. WebSocketConnection.write()  <-- write methods (text or binary)
    1. MessageGenerator (produces 1..n frames based on policy)  <-- NEW (see the fragmentation sketch below)
    2. Extension Chain --> mutated frames
    3. At this point we have n or more frames that need to be sent
    4. Allocate Byte Buffer Array (equal to number of frames)
    5. Generator -> fills byte array with something appropriate for wire
  3. AsyncEndPoint.write()
  4. EndPoint
  5. Raw Network Connection
Open Questions:
  1. How do we handle a streaming output?
  2. Are we expected to spawn a thread of our own to read from the end-user stream and copy the bytes out to the AsyncEndPoint.write() periodically?
  3. Can we get a way to use AsyncEndPoint.write() with a ReadableByteChannel as an example?
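To make the MessageGenerator step concrete, a rough fragmentation sketch (the class name and the policy-driven size are placeholders):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Slices one message payload into 1..n frame payloads based on a
// policy-driven maximum fragment size.  Opcode/extension handling omitted.
class MessageFragmenter
{
    private final int maxFragmentSize; // stand-in for a WebSocketPolicy setting

    MessageFragmenter(int maxFragmentSize)
    {
        this.maxFragmentSize = maxFragmentSize;
    }

    List<ByteBuffer> fragment(ByteBuffer message)
    {
        List<ByteBuffer> payloads = new ArrayList<ByteBuffer>();
        while (message.hasRemaining())
        {
            int len = Math.min(maxFragmentSize, message.remaining());
            ByteBuffer slice = message.slice(); // shares content, starts at current position
            slice.limit(len);
            payloads.add(slice);
            message.position(message.position() + len);
        }
        // first payload goes out as a TEXT/BINARY frame, the rest as
        // CONTINUATION frames, with FIN=true only on the last one
        return payloads;
    }
}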
--
Joakim Erdfelt <joakim@xxxxxxxxxxx>

