Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

""" 

HTTP server that implements the Python WSGI protocol (PEP 333, rev 1.21). 

 

Based on wsgiref.simple_server which is part of the standard library since 2.5. 

 

This is a simple server for use in testing or debugging Django apps. It hasn't 

been reviewed for security issues. DON'T USE IT FOR PRODUCTION USE! 

""" 

 

import os 

import socket 

import sys 

import traceback 

import urllib 

import urlparse 

from SocketServer import ThreadingMixIn 

from wsgiref import simple_server 

from wsgiref.util import FileWrapper   # for backwards compatibility 

 

import django 

from django.core.exceptions import ImproperlyConfigured 

from django.core.management.color import color_style 

from django.core.wsgi import get_wsgi_application 

from django.utils.importlib import import_module 

from django.utils._os import safe_join 

from django.views import static 

 

from django.contrib.staticfiles import handlers 

 

__all__ = ['WSGIServer', 'WSGIRequestHandler'] 

 

 

def get_internal_wsgi_application(): 

    """ 

    Loads and returns the WSGI application as configured by the user in 

    ``settings.WSGI_APPLICATION``. With the default ``startproject`` layout, 

    this will be the ``application`` object in ``projectname/wsgi.py``. 

 

    This function, and the ``WSGI_APPLICATION`` setting itself, are only useful 

    for Django's internal servers (runserver, runfcgi); external WSGI servers 

    should just be configured to point to the correct application object 

    directly. 

 

    If settings.WSGI_APPLICATION is not set (is ``None``), we just return 

    whatever ``django.core.wsgi.get_wsgi_application`` returns. 

 

    """ 

    from django.conf import settings 

    app_path = getattr(settings, 'WSGI_APPLICATION') 

    if app_path is None: 

        return get_wsgi_application() 

    module_name, attr = app_path.rsplit('.', 1) 

    try: 

        mod = import_module(module_name) 

    except ImportError, e: 

        raise ImproperlyConfigured( 

            "WSGI application '%s' could not be loaded; " 

            "could not import module '%s': %s" % (app_path, module_name, e)) 

    try: 

        app = getattr(mod, attr) 

    except AttributeError, e: 

        raise ImproperlyConfigured( 

            "WSGI application '%s' could not be loaded; " 

            "can't find '%s' in module '%s': %s" 

            % (app_path, attr, module_name, e)) 

 

    return app 

 

 

class WSGIServerException(Exception):
    """Error raised by ``WSGIServer.server_bind()`` when binding fails."""

 

 

class ServerHandler(simple_server.ServerHandler, object):
    """
    ``wsgiref`` server handler that writes large response bodies to the
    socket in bounded chunks and returns the formatted traceback as the body
    of error responses.
    """
    error_status = "500 INTERNAL SERVER ERROR"

    # If data is too large, socket will choke, so a single ``_write()`` call
    # is never handed more than this many bytes (32MB). Subclasses may
    # override it.
    MAX_SOCKET_CHUNK_SIZE = 32 * 1024 * 1024

    def write(self, data):
        """'write()' callable as specified by PEP 333"""

        assert isinstance(data, str), "write() argument must be string"

        if not self.status:
            raise AssertionError("write() before start_response()")

        length = len(data)
        if not self.headers_sent:
            # Before the first output, send the stored headers
            self.bytes_sent = length    # make sure we know content-length
            self.send_headers()
        else:
            self.bytes_sent += length

        # XXX check Content-Length and truncate if too many bytes written?

        chunk_size = self.MAX_SOCKET_CHUNK_SIZE
        if length <= chunk_size:
            # Common case: the payload fits in one write (this also covers
            # empty data, which is still written and flushed once).
            self._write(data)
            self._flush()
            return

        # Oversized payload: write and flush one bounded slice at a time.
        # The final slice is simply shorter, since slicing clamps at the end.
        offset = 0
        while offset < length:
            self._write(data[offset:offset + chunk_size])
            self._flush()
            offset += chunk_size

    def error_output(self, environ, start_response):
        """
        Run the inherited error handling, then return the traceback of the
        exception currently being handled as the response body.
        """
        super(ServerHandler, self).error_output(environ, start_response)
        return ['\n'.join(traceback.format_exception(*sys.exc_info()))]

 

 

class WSGIServer(simple_server.WSGIServer, object): 

    """BaseHTTPServer that implements the Python WSGI protocol""" 

 

    def __init__(self, *args, **kwargs): 

        if kwargs.pop('ipv6', False): 

            self.address_family = socket.AF_INET6 

        super(WSGIServer, self).__init__(*args, **kwargs) 

 

    def server_bind(self): 

        """Override server_bind to store the server name.""" 

        try: 

            super(WSGIServer, self).server_bind() 

        except Exception, e: 

            raise WSGIServerException(e) 

        self.setup_environ() 

 

 

class WSGIRequestHandler(simple_server.WSGIRequestHandler, object):
    """
    Request handler that builds the WSGI environ from the parsed HTTP request
    and writes colorized log lines to ``sys.stderr``.
    """

    def __init__(self, *args, **kwargs):
        from django.conf import settings
        self.admin_media_prefix = urlparse.urljoin(settings.STATIC_URL, 'admin/')
        # We set self.path to avoid crashes in log_message() on unsupported
        # requests (like "OPTIONS").
        self.path = ''
        self.style = color_style()
        super(WSGIRequestHandler, self).__init__(*args, **kwargs)

    def get_environ(self):
        """
        Return the WSGI environ dict for the current request: a copy of the
        server's base environ extended with the request line, client address,
        content type/length and one ``HTTP_*`` entry per request header.
        """
        env = self.server.base_environ.copy()
        env['SERVER_PROTOCOL'] = self.request_version
        env['REQUEST_METHOD'] = self.command
        if '?' in self.path:
            path, query = self.path.split('?', 1)
        else:
            path, query = self.path, ''

        env['PATH_INFO'] = urllib.unquote(path)
        env['QUERY_STRING'] = query
        env['REMOTE_ADDR'] = self.client_address[0]

        if self.headers.typeheader is None:
            env['CONTENT_TYPE'] = self.headers.type
        else:
            env['CONTENT_TYPE'] = self.headers.typeheader

        length = self.headers.getheader('content-length')
        if length:
            env['CONTENT_LENGTH'] = length

        for h in self.headers.headers:
            k, v = h.split(':', 1)
            k = k.replace('-', '_').upper()
            v = v.strip()
            if k in env:
                continue                    # skip content length, type,etc.
            if 'HTTP_' + k in env:
                env['HTTP_' + k] += ',' + v     # comma-separate multiple headers
            else:
                env['HTTP_' + k] = v
        return env

    def log_message(self, format, *args):
        """
        Write one log line to ``sys.stderr``, colorized by HTTP status.

        The status is taken from the second positional argument, which is
        where request log lines carry the response code. Messages that do not
        carry one there (for example error messages logged with a single
        argument) are styled like server errors instead of raising from
        inside the logger.
        """
        # Don't bother logging requests for admin images or the favicon.
        if (self.path.startswith(self.admin_media_prefix)
                or self.path == '/favicon.ico'):
            return

        msg = "[%s] %s\n" % (self.log_date_time_string(), format % args)

        # Not every caller passes (requestline, code, size): be tolerant of
        # a missing, empty or non-string second argument.
        if len(args) > 1:
            status = '%s' % (args[1],)
        else:
            status = ''
        family = status[:1]

        # Utilize terminal colors, if available
        if family == '2':
            # Put 2XX first, since it should be the common case
            msg = self.style.HTTP_SUCCESS(msg)
        elif family == '1':
            msg = self.style.HTTP_INFO(msg)
        elif status == '304':
            msg = self.style.HTTP_NOT_MODIFIED(msg)
        elif family == '3':
            msg = self.style.HTTP_REDIRECT(msg)
        elif status == '404':
            msg = self.style.HTTP_NOT_FOUND(msg)
        elif family == '4':
            msg = self.style.HTTP_BAD_REQUEST(msg)
        else:
            # Any 5XX, or any other response
            msg = self.style.HTTP_SERVER_ERROR(msg)

        sys.stderr.write(msg)

 

 

class AdminMediaHandler(handlers.StaticFilesHandler):
    """
    WSGI middleware serving the admin media files found under the ``admin/``
    prefix of the STATIC_URL setting.

    Use this ONLY LOCALLY, for development! This hasn't been tested for
    security and is not super efficient.

    This is pending for deprecation since 1.3.
    """

    def get_base_dir(self):
        """Return the admin static directory inside the django package."""
        django_root = django.__path__[0]
        return os.path.join(django_root, 'contrib', 'admin', 'static', 'admin')

    def get_base_url(self):
        """Return STATIC_URL joined with ``admin/``."""
        from django.conf import settings
        static_url = settings.STATIC_URL
        return urlparse.urljoin(static_url, 'admin/')

    def file_path(self, url):
        """
        Returns the path to the media file on disk for the given URL.

        The passed URL is assumed to begin with ``self.base_url``.  If the
        resulting file path is outside the media directory, then a ValueError
        is raised.
        """
        # Drop the base URL's path component from the front of the URL.
        prefix_length = len(self.base_url[2])
        relative_path = urllib.url2pathname(url[prefix_length:])
        return safe_join(self.base_dir, relative_path)

    def serve(self, request):
        """Serve the file matching ``request.path`` via ``static.serve``."""
        full_path = self.file_path(request.path)
        document_root, path = os.path.split(full_path)
        return static.serve(request, path, document_root=document_root)

    def _should_handle(self, path):
        """
        Checks if the path should be handled. Ignores the path if:

        * the host is provided as part of the base_url
        * the request's path isn't under the base path
        """
        if not path.startswith(self.base_url[2]):
            return False
        return not self.base_url[1]

 

 

def run(addr, port, wsgi_handler, ipv6=False, threading=False):
    """
    Create the development HTTP server on ``(addr, port)``, attach
    ``wsgi_handler`` as its application and serve requests forever.

    ``ipv6`` is passed through to the server constructor. When ``threading``
    is true, the server class is combined with ``ThreadingMixIn``.
    """
    server_cls = WSGIServer
    if threading:
        # Build the threaded variant on the fly under the same class name.
        server_cls = type('WSGIServer', (ThreadingMixIn, WSGIServer), {})

    httpd = server_cls((addr, port), WSGIRequestHandler, ipv6=ipv6)
    httpd.set_app(wsgi_handler)
    httpd.serve_forever()