[Repoze-checkins] r1235 - in repoze.bfg/trunk/repoze/bfg: . tests

Chris McDonough chrism at agendaless.com
Fri Jul 4 22:59:04 EDT 2008


Author: Chris McDonough <chrism at agendaless.com>
Date: Fri Jul  4 22:59:04 2008
New Revision: 1235

Log:
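Add a plug point for a security policy: Router now takes a third
``security_policy`` argument, which is called after traversal as
(environ, context, name).  If it returns a WSGI app, that app handles the
request; if it returns None, the normal view lookup proceeds.  The
traversal policy is now also called as (environ, root), matching the
ITraversalPolicy interface.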


Modified:
   repoze.bfg/trunk/repoze/bfg/interfaces.py
   repoze.bfg/trunk/repoze/bfg/router.py
   repoze.bfg/trunk/repoze/bfg/tests/test_router.py

Modified: repoze.bfg/trunk/repoze/bfg/interfaces.py
==============================================================================
--- repoze.bfg/trunk/repoze/bfg/interfaces.py	(original)
+++ repoze.bfg/trunk/repoze/bfg/interfaces.py	Fri Jul  4 22:59:04 2008
@@ -11,6 +11,11 @@
 class ITraversalPolicy(Interface):
     def __call__(environ, root):
         """ Return a tuple in the form (context, name, subpath) """
+
+class ISecurityPolicy(Interface):
+    def __call__(environ, context, name):
+        """ Return a WSGI app on unauthorized or None to signify that
+        the request is allowed to continue """
         
 class ITraverser(Interface):
     def __init__(context):

Modified: repoze.bfg/trunk/repoze/bfg/router.py
==============================================================================
--- repoze.bfg/trunk/repoze/bfg/router.py	(original)
+++ repoze.bfg/trunk/repoze/bfg/router.py	Fri Jul  4 22:59:04 2008
@@ -8,18 +8,21 @@
 from repoze.bfg.interfaces import IWebObRequest
 
 class Router:
-    def __init__(self, root_policy, traversal_policy):
+    def __init__(self, root_policy, traversal_policy, security_policy):
         self.root_policy = root_policy
         self.traversal_policy = traversal_policy
+        self.security_policy = security_policy
 
     def __call__(self, environ, start_response):
         root = self.root_policy(environ)
-        context, name, subpath = self.traversal_policy(root, environ)
-        environ['repoze.bfg.subpath'] = subpath
-        request = Request(environ)
-        directlyProvides(request, IWebObRequest)
-        app = queryMultiAdapter((context, request),
-                                IWSGIApplicationFactory, name=name)
+        context, name, subpath = self.traversal_policy(environ, root)
+        app = self.security_policy(environ, context, name)
         if app is None:
-            app = HTTPNotFound(request.url)
+            environ['repoze.bfg.subpath'] = subpath
+            request = Request(environ)
+            directlyProvides(request, IWebObRequest)
+            app = queryMultiAdapter((context, request),
+                                    IWSGIApplicationFactory, name=name)
+            if app is None:
+                app = HTTPNotFound(request.url)
         return app(environ, start_response)

Modified: repoze.bfg/trunk/repoze/bfg/tests/test_router.py
==============================================================================
--- repoze.bfg/trunk/repoze/bfg/tests/test_router.py	(original)
+++ repoze.bfg/trunk/repoze/bfg/tests/test_router.py	Fri Jul  4 22:59:04 2008
@@ -38,13 +38,15 @@
         headerii = []
         def rootpolicy(environ):
             return None
-        def traversalpolicy(root, environ):
+        def traversalpolicy(environ, root):
             return DummyContext(), 'foo', []
+        def securitypolicy(environ, context, name):
+            return None
         def start_response(status, headers):
             statii[:] = [status]
             headerii[:] = [headers]
         environ = self._makeEnviron()
-        router = self._makeOne(rootpolicy, traversalpolicy)
+        router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
         result = router(environ, start_response)
         headers = headerii[0]
         self.assertEqual(len(headers), 2)
@@ -52,17 +54,40 @@
         self.assertEqual(status, '404 Not Found')
         self.failUnless('http://localhost:8080' in result[0], result)
 
+    def test_call_securitypolicy_denies(self):
+        statii = []
+        headerii = []
+        def rootpolicy(environ):
+            return None
+        def traversalpolicy(environ, root):
+            return DummyContext(), 'foo', []
+        def securitypolicy(environ, context, name):
+            from webob.exc import HTTPUnauthorized
+            return HTTPUnauthorized()
+        def start_response(status, headers):
+            statii[:] = [status]
+            headerii[:] = [headers]
+        environ = self._makeEnviron()
+        router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
+        result = router(environ, start_response)
+        headers = headerii[0]
+        self.assertEqual(len(headers), 2)
+        status = statii[0]
+        self.assertEqual(status, '401 Unauthorized')
+
     def test_call_app_registered_nonspecific_default_path(self):
         def rootpolicy(environ):
             return None
         context = DummyContext()
-        def traversalpolicy(root, environ):
+        def traversalpolicy(environ, root):
             return context, '', []
         def start_response(status, headers):
             pass
+        def securitypolicy(environ, context, name):
+            return None
         environ = self._makeEnviron()
         self._registerFactory(DummyWSGIApplicationFactory, '', None, None)
-        router = self._makeOne(rootpolicy, traversalpolicy)
+        router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
         result = router(environ, start_response)
         self.failUnless(result[0] is context)
         import webob
@@ -73,13 +98,15 @@
         def rootpolicy(environ):
             return None
         context = DummyContext()
-        def traversalpolicy(root, environ):
+        def traversalpolicy(environ, root):
             return context, 'foo', ['bar', 'baz']
         def start_response(status, headers):
             pass
+        def securitypolicy(environ, context, name):
+            return None
         environ = self._makeEnviron()
         self._registerFactory(DummyWSGIApplicationFactory, 'foo', None, None)
-        router = self._makeOne(rootpolicy, traversalpolicy)
+        router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
         result = router(environ, start_response)
         self.failUnless(result[0] is context)
         import webob
@@ -95,15 +122,17 @@
         class IContext(Interface):
             pass
         directlyProvides(context, IContext)
-        def traversalpolicy(root, environ):
+        def traversalpolicy(environ, root):
             return context, 'foo', ['bar', 'baz']
         def start_response(status, headers):
             pass
+        def securitypolicy(environ, context, name):
+            return None
         environ = self._makeEnviron()
         from repoze.bfg.interfaces import IWebObRequest
         self._registerFactory(DummyWSGIApplicationFactory, 'foo', IContext,
                               IWebObRequest)
-        router = self._makeOne(rootpolicy, traversalpolicy)
+        router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
         result = router(environ, start_response)
         self.failUnless(result[0] is context)
         import webob
@@ -123,16 +152,18 @@
         headerii = []
         def rootpolicy(environ):
             return None
-        def traversalpolicy(root, environ):
+        def traversalpolicy(environ, root):
             return context, 'foo', []
         def start_response(status, headers):
             statii[:] = [status]
             headerii[:] = [headers]
+        def securitypolicy(environ, context, name):
+            return None
         environ = self._makeEnviron()
         from repoze.bfg.interfaces import IWebObRequest
         self._registerFactory(DummyWSGIApplicationFactory, 'foo', IContext,
                               IWebObRequest)
-        router = self._makeOne(rootpolicy, traversalpolicy)
+        router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
         result = router(environ, start_response)
         headers = headerii[0]
         self.assertEqual(len(headers), 2)


More information about the Repoze-checkins mailing list