Tay Ray Chuan home archive

tornado: presenting a new paradigm for IOStream read callbacks

Fri, 24 Dec 2010 17:44:48 +0800 | Filed under tornado

Recently I tried my hand at handling chunked encoding, as a prelude to implementing chunked handling for smart http git servers (AFAIK, grack, which looks pretty neat, can't do it) - receiving chunked requests on the server, that is, not sending out chunked responses, examples for which are abundant.

After spending some time with wsgiref's simple_server, I couldn't find a way to send the 100 (Continue) header and not close the connection (sidenote: this should be possible by hacking the handle method in the request handler in wsgiref).

So I decided to try tornado. It has a nice interface, very similar to Google App Engine's webapp.

My first attempt looked a lot like how tornado's simple_httpclient.py does it (some irrelevant parts removed):

    def _on_chunk_length(self, data):
        # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1
        length = int(data.strip(), 16)
        if length == 0:
            # ...
        else:
            self.stream.read_bytes(length + 2,  # chunk ends with \r\n
                              self._on_chunk_data)

    def _on_chunk_data(self, data):
        assert data[-2:] == "\r\n"
        chunk = data[:-2]
        # ...
        self.stream.read_until("\r\n", self._on_chunk_length)

Neat and simple. This "read loop" goes back and forth between reading the length information, and reading the amount of data of this length.

Unfortunately, I ended up busting Python's recursion limit for pretty small POSTs. If you examine the implementations of read_until(), read_bytes() and _read_from_buffer() in IOStream's, it becomes apparent why this is occurring - the read calls are returning immediately, so we have a stack that looks like this:

 File "f:\Files\coding\gists\chunked_handler\naive.py", line 33, in _on_chunk_length
   self._stream.read_bytes(chunk_length + 2, self._on_chunk_data)
 File "f:\files\coding\python\tornado\tornado\iostream.py", line 152, in read_bytes
   if self._read_from_buffer():
 File "f:\files\coding\python\tornado\tornado\iostream.py", line 317, in _read_from_buffer
   self._run_callback(callback, self._consume(num_bytes))
 File "f:\files\coding\python\tornado\tornado\iostream.py", line 233, in _run_callback
   callback(*args, **kwargs)
 File "f:\files\coding\python\tornado\tornado\stack_context.py", line 171, in wrapped
   callback(*args, **kwargs)
 File "f:\Files\coding\gists\chunked_handler\_naive.py", line 41, in _on_chunk_data
   self._stream.read_until('\r\n', self._on_chunk_length)
 File "f:\files\coding\python\tornado\tornado\iostream.py", line 133, in read_until
   if self._read_from_buffer():
 File "f:\files\coding\python\tornado\tornado\iostream.py", line 327, in _read_from_buffer
   self._consume(loc + delimiter_len))
 File "f:\files\coding\python\tornado\tornado\iostream.py", line 233, in _run_callback
   callback(*args, **kwargs)
 File "f:\files\coding\python\tornado\tornado\stack_context.py", line 171, in wrapped
   callback(*args, **kwargs)
 File "f:\Files\coding\gists\chunked_handler\naive.py", line 33, in _on_chunk_length
   self._stream.read_bytes(chunk_length + 2, self._on_chunk_data)

Notice how each callback invocation is followed preceded by 4 other function calls - no wonder we bust the limit quickly! But I think it has its place - choppy requests, for example, where the client sends data in an intermittent fashion.

I decided to write a procedural, imperative handler that assumes each read completes immediately, with callbacks not doing any reads. But while this approach avoids busting the stack, it's unable to handle the occasional case (actually, quite frequently) where the read fails and we need the callback to start the read loop again.

What if callbacks could be smart enough to figure whether the read completed immediately, or if they needed to re-start the read loop? In Python, we have a neat way of doing this: using objects and the __call__ method.

Have a look at my callback implementation and the chunked handler built on top of it.

The important bit of the callback object is this:

    def enter(self):
        assert not self.has_changed

        self._entered = True
        self.entry_expr(self)
        self._entered = False

        return self.has_changed

    def __call__(self, *args, **kwargs):
        assert self.data.state is self.start_state

        self._handle(*args, **kwargs)
        if not self._entered:
            self.entry_callback()

while the read loop (saved in the callback's entry_callback property) can be expanded from its cryptic form into this:

while data.state is not DONE:
    if data.state is WAIT_CHUNK:
        if not data_callback.enter():
            # callback did not return immediately
            return
    elif data.state is WAIT_LENGTH:
        if not length_callback.enter():
            # callback did not return immediately
            return

Our imperative read loop is smart enough to know if the read completely immediately - if so, proceed to the next step of the read loop; if not, stop the loop. When data arrives through the socket, our callback gets invoked, and, detecting that it was invoked outside the read loop, re-starts the read loop.

This gives us the best of both worlds - a familiar, imperative-like hander, with the ability to handle tornado's asynchronous, event-driven character.

Next up: make the handler more memory-efficient by streaming the data and not saving each chunk.

blog comments powered by Disqus