[Repoze-checkins] r1287 - in repoze.bfg/trunk/repoze/bfg: . tests

Chris McDonough chrism at agendaless.com
Fri Jul 11 19:30:36 EDT 2008


Author: Chris McDonough <chrism at agendaless.com>
Date: Fri Jul 11 19:30:35 2008
New Revision: 1287

Log:
Add security policy checks.


Modified:
   repoze.bfg/trunk/repoze/bfg/configure.zcml
   repoze.bfg/trunk/repoze/bfg/interfaces.py
   repoze.bfg/trunk/repoze/bfg/router.py
   repoze.bfg/trunk/repoze/bfg/tests/test_router.py
   repoze.bfg/trunk/repoze/bfg/tests/test_wsgiadapter.py
   repoze.bfg/trunk/repoze/bfg/wsgiadapter.py

Modified: repoze.bfg/trunk/repoze/bfg/configure.zcml
==============================================================================
--- repoze.bfg/trunk/repoze/bfg/configure.zcml	(original)
+++ repoze.bfg/trunk/repoze/bfg/configure.zcml	Fri Jul 11 19:30:35 2008
@@ -18,7 +18,7 @@
   <adapter
       factory=".wsgiadapter.NaiveWSGIViewAdapter"
       provides=".interfaces.IWSGIApplicationFactory"
-      for="* *"
+      for="* * *"
    />
 
   <include file="meta.zcml" />

Modified: repoze.bfg/trunk/repoze/bfg/interfaces.py
==============================================================================
--- repoze.bfg/trunk/repoze/bfg/interfaces.py	(original)
+++ repoze.bfg/trunk/repoze/bfg/interfaces.py	Fri Jul 11 19:30:35 2008
@@ -36,10 +36,11 @@
         """ A PEP 333 application """
 
 class IWSGIApplicationFactory(Interface):
-    def __call__(view, request):
+    def __call__(context, request, view):
         """ Return an object that implements IWSGIApplication """
 
 class ILocation(Interface):
     """Objects that have a structural location"""
     __parent__ = Attribute("The parent in the location hierarchy")
     __name__ = Attribute("The name of the object")
+

Modified: repoze.bfg/trunk/repoze/bfg/router.py
==============================================================================
--- repoze.bfg/trunk/repoze/bfg/router.py	(original)
+++ repoze.bfg/trunk/repoze/bfg/router.py	Fri Jul 11 19:30:35 2008
@@ -33,12 +33,14 @@
             app = HTTPFound(add_slash=True)
         else:
             request.subpath = subpath
+            request.view_name = name
             app = queryMultiAdapter((context, request), IViewFactory, name=name,
                                     default=_marker)
             if app is _marker:
                 app = HTTPNotFound(request.url)
             else:
-                app = getMultiAdapter((app, request), IWSGIApplicationFactory)
+                app = getMultiAdapter((context, request, app),
+                                      IWSGIApplicationFactory)
         return app(environ, start_response)
 
 def make_app(root_policy, package=None, default_redirects=True,

Modified: repoze.bfg/trunk/repoze/bfg/tests/test_router.py
==============================================================================
--- repoze.bfg/trunk/repoze/bfg/tests/test_router.py	(original)
+++ repoze.bfg/trunk/repoze/bfg/tests/test_router.py	Fri Jul 11 19:30:35 2008
@@ -70,7 +70,7 @@
         environ = self._makeEnviron(PATH_INFO='/doesnt/end/in/slash')
         self._registerTraverserFactory(traversalfactory, '', None, None)
         self._registerViewFactory(viewfactory, '', None, None)
-        self._registerWSGIFactory(wsgifactory, '', None, None)
+        self._registerWSGIFactory(wsgifactory, '', None, None, None)
         router = self._makeOne(rootpolicy)
         start_response = DummyStartResponse()
         result = router(environ, start_response)
@@ -94,7 +94,7 @@
         environ = self._makeEnviron()
         self._registerTraverserFactory(traversalfactory, '', None, None)
         self._registerViewFactory(viewfactory, '', None, None)
-        self._registerWSGIFactory(wsgifactory, '', None, None)
+        self._registerWSGIFactory(wsgifactory, '', None, None, None)
         router = self._makeOne(rootpolicy)
         start_response = DummyStartResponse()
         result = router(environ, start_response)
@@ -115,7 +115,7 @@
         environ = self._makeEnviron()
         self._registerTraverserFactory(traversalfactory, '', None, None)
         self._registerViewFactory(viewfactory, 'foo', None, None)
-        self._registerWSGIFactory(wsgifactory, '', None, None)
+        self._registerWSGIFactory(wsgifactory, '', None, None, None)
         router = self._makeOne(rootpolicy)
         start_response = DummyStartResponse()
         result = router(environ, start_response)
@@ -142,7 +142,7 @@
         environ = self._makeEnviron()
         self._registerTraverserFactory(traversalfactory, '', None, None)
         self._registerViewFactory(viewfactory, '', IContext, IRequest)
-        self._registerWSGIFactory(wsgifactory, '', None, None)
+        self._registerWSGIFactory(wsgifactory, '', None, None, None)
         router = self._makeOne(rootpolicy)
         start_response = DummyStartResponse()
         result = router(environ, start_response)
@@ -171,7 +171,7 @@
         environ = self._makeEnviron()
         self._registerTraverserFactory(traversalfactory, '', None, None)
         self._registerViewFactory(viewfactory, '', IContext, IRequest)
-        self._registerWSGIFactory(wsgifactory, '', None, None)
+        self._registerWSGIFactory(wsgifactory, '', None, None, None)
         router = self._makeOne(rootpolicy)
         start_response = DummyStartResponse()
         result = router(environ, start_response)
@@ -183,13 +183,15 @@
 
 def make_wsgi_factory(status, headers, app_iter):
     class DummyWSGIApplicationFactory:
-        def __init__(self, view, request):
-            self.view = view
+        def __init__(self, context, request, view):
+            self.context = context
             self.request = request
+            self.view = view
 
         def __call__(self, environ, start_response):
-            environ['view'] = self.view
+            environ['context'] = self.context
             environ['request'] = self.request
+            environ['view'] = self.view
             start_response(status, headers)
             return app_iter
 

Modified: repoze.bfg/trunk/repoze/bfg/tests/test_wsgiadapter.py
==============================================================================
--- repoze.bfg/trunk/repoze/bfg/tests/test_wsgiadapter.py	(original)
+++ repoze.bfg/trunk/repoze/bfg/tests/test_wsgiadapter.py	Fri Jul 11 19:30:35 2008
@@ -1,6 +1,14 @@
 import unittest
 
-class NaiveWSGIAdapterTests(unittest.TestCase):
+from zope.component.testing import PlacelessSetup
+
+class NaiveWSGIAdapterTests(unittest.TestCase, PlacelessSetup):
+    def setUp(self):
+        PlacelessSetup.setUp(self)
+
+    def tearDown(self):
+        PlacelessSetup.tearDown(self)
+
     def _getTargetClass(self):
         from repoze.bfg.wsgiadapter import NaiveWSGIViewAdapter
         return NaiveWSGIViewAdapter
@@ -15,7 +23,8 @@
         def view():
             return response
         request = DummyRequest()
-        adapter = self._makeOne(view, request)
+        context = DummyContext()
+        adapter = self._makeOne(context, request, view)
         environ = {}
         start_response = DummyStartResponse()
         result = adapter(environ, start_response)
@@ -31,7 +40,8 @@
             response.start_response = start_response
             return response
         request = DummyRequest()
-        adapter = self._makeOne(view, request)
+        context = DummyContext()
+        adapter = self._makeOne(context, request, view)
         environ = {}
         start_response = DummyStartResponse()
         result = adapter(environ, start_response)
@@ -48,7 +58,61 @@
         def view(request):
             response.request = request
             return response
-        adapter = self._makeOne(view, request)
+        context = DummyContext()
+        adapter = self._makeOne(context, request, view)
+        environ = {}
+        start_response = DummyStartResponse()
+        result = adapter(environ, start_response)
+        self.assertEqual(result, ['Hello world'])
+        self.assertEqual(start_response.headers, ())
+        self.assertEqual(start_response.status, '200 OK')
+        self.assertEqual(response.request, request)
+
+    def test_view_fails_security_policy(self):
+        import zope.component
+        gsm = zope.component.getGlobalSiteManager()
+        from repoze.bfg.wsgiadapter import IViewSecurityPolicy
+        def failed(context, request):
+            def view():
+                response = DummyResponse()
+                response.app_iter = ['failed']
+                response.status = '401 Unauthorized'
+                response.headerlist = ()
+                return response
+            return view
+        gsm.registerAdapter(failed, (None, None), IViewSecurityPolicy)
+        request = DummyRequest()
+        response = DummyResponse()
+        response.app_iter = ['Hello world']
+        def view(request):
+            response.request = request
+            return response
+        context = DummyContext()
+        adapter = self._makeOne(context, request, view)
+        environ = {}
+        start_response = DummyStartResponse()
+        result = adapter(environ, start_response)
+        self.assertEqual(result, ['failed'])
+        self.assertEqual(start_response.headers, ())
+        self.assertEqual(start_response.status, '401 Unauthorized')
+
+    def test_view_passes_security_policy(self):
+        import zope.component
+        gsm = zope.component.getGlobalSiteManager()
+        from repoze.bfg.wsgiadapter import IViewSecurityPolicy
+        def failed(context, request):
+            def view():
+                return None
+            return view
+        gsm.registerAdapter(failed, (None, None), IViewSecurityPolicy)
+        request = DummyRequest()
+        response = DummyResponse()
+        response.app_iter = ['Hello world']
+        def view(request):
+            response.request = request
+            return response
+        context = DummyContext()
+        adapter = self._makeOne(context, request, view)
         environ = {}
         start_response = DummyStartResponse()
         result = adapter(environ, start_response)
@@ -57,6 +121,9 @@
         self.assertEqual(start_response.status, '200 OK')
         self.assertEqual(response.request, request)
 
+class DummyContext:
+    pass
+
 class DummyRequest:
     pass
 

Modified: repoze.bfg/trunk/repoze/bfg/wsgiadapter.py
==============================================================================
--- repoze.bfg/trunk/repoze/bfg/wsgiadapter.py	(original)
+++ repoze.bfg/trunk/repoze/bfg/wsgiadapter.py	Fri Jul 11 19:30:35 2008
@@ -1,19 +1,40 @@
-from zope.interface import implements
+from zope.component import queryMultiAdapter
 from zope.interface import classProvides
+from zope.interface import implements
+from zope.interface import Interface
 
 from repoze.bfg.interfaces import IWSGIApplicationFactory
 from repoze.bfg.interfaces import IWSGIApplication
 from repoze.bfg.mapply import mapply
 
+class IViewSecurityPolicy(Interface):
+    """ Interface for a view security policy; a view security policy
+    is a callable looked up as an adapter of (context, request). """
+    def __call__():
+        """ Return None if the security check succeeded; otherwise
+        return a view callable to be invoked in place of the original
+        view, producing the unauthorized response"""
+
 class NaiveWSGIViewAdapter:
     classProvides(IWSGIApplicationFactory)
     implements(IWSGIApplication)
 
-    def __init__(self, view, request):
-        self.view = view
+    def __init__(self, context, request, view):
+        self.context = context
         self.request = request
+        self.view = view
 
     def __call__(self, environ, start_response):
+        context = self.context
+        request = self.request
+        view = self.view
+        security_policy = queryMultiAdapter((context, request),
+                                            IViewSecurityPolicy)
+        if security_policy:
+            failed_view = security_policy()
+            if failed_view:
+                view = failed_view
+
         catch_response = []
         def replace_start_response(status, headers):
             catch_response[:] = (status, headers)
@@ -22,7 +43,8 @@
             'environ':environ,
             'start_response':start_response,
             }
-        response =  mapply(self.view, positional = (), keyword = kwdict)
+
+        response =  mapply(view, positional = (), keyword = kwdict)
         if not catch_response:
             catch_response = (response.status, response.headerlist)
         start_response(*catch_response)


More information about the Repoze-checkins mailing list