Tornado Tools

tornadotools.mongrel2.handler

Contents

Source code for tornadotools.mongrel2.handler

#
# Copyright (c) 2011 Daniel Truemper truemped@googlemail.com
#
# handler.py 07-Jul-2011
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# under the License.
#
#
import uuid

from tornado.httpserver import HTTPRequest

import zmq
from zmq.eventloop import stack_context
from zmq.eventloop.ioloop import IOLoop
from zmq.eventloop.zmqstream import ZMQStream

from .request import MongrelRequest


[docs]class Mongrel2Handler(object): """ A Mongrel2 handler class for use with `tornado.web.Application`. """ def __init__(self, request_callback, pull_addr, pub_addr, io_loop=None, zmq_ctx=None, sender_id=None, xheaders=True, no_keep_alive=False): self.request_callback = request_callback self.io_loop = io_loop or IOLoop.instance() self.sender_id = sender_id or str(uuid.uuid4()) self.xheaders = xheaders self.no_keep_alive = no_keep_alive self._zmq_context = zmq_ctx or zmq.Context() self._listening_stream = self._create_listening_stream(pull_addr) self._sending_stream = self._create_sending_stream(pub_addr) self._started = False
[docs] def start(self): """ Start to listen for incoming requests. """ assert not self._started self._listening_stream.on_recv(self._recv_callback) self._started = True
def _create_listening_stream(self, pull_addr): """ Create a stream listening for Requests. The `self._recv_callback` method is asociated with incoming requests. """ sock = self._zmq_context.socket(zmq.PULL) sock.connect(pull_addr) stream = ZMQStream(sock, io_loop=self.io_loop) return stream def _create_sending_stream(self, pub_addr): """ Create a `ZMQStream` for sending responses back to Mongrel2. """ sock = self._zmq_context.socket(zmq.PUB) sock.setsockopt(zmq.IDENTITY, self.sender_id) sock.connect(pub_addr) stream = ZMQStream(sock, io_loop=self.io_loop) return stream def _recv_callback(self, msg): """ Method is called when there is a message coming from a Mongrel2 server. This message should be a valid Request String. """ m2req = MongrelRequest.parse(msg[0]) MongrelConnection(m2req, self._sending_stream, self.request_callback, no_keep_alive=self.no_keep_alive, xheaders=self.xheaders)
[docs]class MongrelConnection(object): """ Handles the connection to the Mongrel2 server. We execute the request callback and provide methods for sending data to the server/client and eventually finish the request and HTTP connection. """ def __init__(self, m2req, stream, request_callback, no_keep_alive=False, xheaders=False): self.m2req = m2req self.stream = stream self.request_callback = request_callback self.no_keep_alive = no_keep_alive self.xheaders = xheaders self._request = None self._request_finished = False self._execute = stack_context.wrap(self._begin_request) self._execute() def _begin_request(self): """ Actually start executing this request. """ headers = self.m2req.headers self._request = HTTPRequest(connection=self, method=headers.get("METHOD"), uri=self.m2req.path, version=headers.get("VERSION"), headers=headers, remote_ip=headers.get("x-forwarded-for")) if len(self.m2req.body) > 0: self._request.body = self.m2req.body if self.m2req.is_disconnect(): self.finish() elif headers.get("x-mongrel2-upload-done", None): # there has been a file upload. expected = headers.get("x-mongrel2-upload-start", "BAD") upload = headers.get("x-mongrel2-upload-done", None) if expected == upload: self.request_callback(self._request) elif headers.get("x-mongrel2-upload-start", None): # this is just a notification that a file upload has started. Do # nothing for now! pass else: self.request_callback(self._request)
[docs] def write(self, chunk): """ Write a chunk of data to the server. """ self._send(chunk)
[docs] def finish(self): """ Finish this connection. """ assert self._request, "Request closed" self._request_finished = True if self.m2req.should_close() or self.no_keep_alive: self._send("") self._request = None
def _send(self, msg): """ Raw send to the given connection ID at the given uuid, mostly used internally. """ uuid = self.m2req.sender conn_id = self.m2req.conn_id header = "%s %d:%s," % (uuid, len(str(conn_id)), str(conn_id)) zmq_message = header + ' ' + msg self.stream.send(zmq_message)

Contents