[Repoze-checkins] r981 - in repoze.who/trunk: . repoze/who

Tres Seaver tseaver at palladion.com
Wed May 7 10:28:52 EDT 2008


Author: Tres Seaver <tseaver at palladion.com>
Date: Wed May  7 10:28:52 2008
New Revision: 981

Log:
Added predicate-based "restriction" middleware support.

'repoze.who.restrict' allows configuration-driven authorization via a WSGI
filter.  One example predicate, 'authenticated_predicate', is supplied, which
requires that the user be authenticated either via 'REMOTE_USER' or via
'repoze.who.identity'.  Other predicates may be supplied and configured via
paste configuration.


Added:
   repoze.who/trunk/repoze/who/restrict.py
Modified:
   repoze.who/trunk/CHANGES.txt
   repoze.who/trunk/repoze/who/tests.py
   repoze.who/trunk/setup.py

Modified: repoze.who/trunk/CHANGES.txt
==============================================================================
--- repoze.who/trunk/CHANGES.txt	(original)
+++ repoze.who/trunk/CHANGES.txt	Wed May  7 10:28:52 2008
@@ -1,6 +1,25 @@
 repoze.who changes
 ==================
 
+After 1.0.1
+
+ - Added predicate-based "restriction" middleware support
+   (repoze.who.restrict), allowing configuratio-driven authorization
+   as a WSGI filter.  One example predicate, 'authenticated_predicate',
+   is supplied, which requires that the user be authenticated either via
+   'REMOTE_USER' or via 'repoze.who.identity'.  To use the filter to
+   restrict access::
+
+     [filter:authenticated_only]
+     use = egg:repoze.who#authenticated
+
+   or::
+
+     [filter:some_predicate]
+     use = egg:repoze.who#predicate
+     predicate = my.module:some_predicate
+     some_option = a value
+
 1.0.1
 
  - Remove dependency-link to dist.repoze.org to prevent easy_install

Added: repoze.who/trunk/repoze/who/restrict.py
==============================================================================
--- (empty file)
+++ repoze.who/trunk/repoze/who/restrict.py	Wed May  7 10:28:52 2008
@@ -0,0 +1,31 @@
+# Authorization middleware
+from pkg_resources import EntryPoint
+
+def authenticated_predicate():
+    def _predicate(environ):
+        return 'REMOTE_USER' in environ or 'repoze.who.identity' in environ
+    return _predicate
+
+class PredicateRestriction:
+
+    def __init__(self, app, predicate, enabled=True, **kw):
+        self.app = app
+        self.enabled = enabled
+        options = kw.copy()
+        self.predicate = predicate(**options)
+
+    def __call__(self, environ, start_response):
+        if self.enabled:
+            if not self.predicate(environ):
+                start_response('401 Unauthorized', ())
+                return []
+        return self.app(environ, start_response)
+
+def make_authenticated_restriction(app, global_config, enabled=True):
+    return PredicateRestriction(app, authenticated_predicate, enabled)
+
+def make_predicate_restriction(app, global_config,
+                               predicate, enabled=True, **kw):
+    if isinstance(predicate, basestring):
+        predicate = EntryPoint.parse('x=%s' % predicate).load(False)
+    return PredicateRestriction(app, predicate, enabled, **kw)

Modified: repoze.who/trunk/repoze/who/tests.py
==============================================================================
--- repoze.who/trunk/repoze/who/tests.py	(original)
+++ repoze.who/trunk/repoze/who/tests.py	Wed May  7 10:28:52 2008
@@ -2225,6 +2225,149 @@
 
 """
 
+class AuthenticatedPredicateTests(unittest.TestCase):
+
+    def _getFUT(self):
+        from repoze.who.restrict import authenticated_predicate
+        return authenticated_predicate()
+
+    def test___call___no_identity_returns_False(self):
+        predicate = self._getFUT()
+        environ = {}
+        self.failIf(predicate(environ))
+
+    def test___call___w_REMOTE_AUTH_returns_True(self):
+        predicate = self._getFUT()
+        environ = {'REMOTE_USER': 'fred'}
+        self.failUnless(predicate(environ))
+
+    def test___call___w_repoze_who_identity_returns_True(self):
+        predicate = self._getFUT()
+        environ = {'repoze.who.identity': {'login': 'fred'}}
+        self.failUnless(predicate(environ))
+
+class PredicateRestrictionTests(unittest.TestCase):
+
+    def _getTargetClass(self):
+        from repoze.who.restrict import PredicateRestriction
+        return PredicateRestriction
+
+    def _makeOne(self, app=None, **kw):
+        if app is None:
+            app = DummyApp()
+        return self._getTargetClass()(app, **kw)
+
+    def test___call___disabled_predicate_false_calls_app_not_predicate(self):
+        _tested = []
+        def _factory():
+            def _predicate(env):
+                _tested.append(env)
+                return False
+            return _predicate
+
+        _started = []
+        def _start_response(status, headers):
+            _started.append((status, headers))
+        environ = {'testing': True}
+
+        restrict = self._makeOne(predicate=_factory, enabled=False)
+        restrict(environ, _start_response)
+
+        self.assertEqual(len(_tested), 0)
+        self.assertEqual(len(_started), 0)
+        self.assertEqual(restrict.app.environ, environ)
+
+    def test___call___enabled_predicate_false_returns_401(self):
+        _tested = []
+        def _factory():
+            def _predicate(env):
+                _tested.append(env)
+                return False
+            return _predicate
+
+        _started = []
+        def _start_response(status, headers):
+            _started.append((status, headers))
+        environ = {'testing': True}
+
+        restrict = self._makeOne(predicate=_factory)
+        restrict(environ, _start_response)
+
+        self.assertEqual(len(_tested), 1)
+        self.assertEqual(len(_started), 1, _started)
+        self.assertEqual(_started[0][0], '401 Unauthorized')
+        self.assertEqual(restrict.app.environ, None)
+
+    def test___call___enabled_predicate_true_calls_app(self):
+        _tested = []
+        def _factory():
+            def _predicate(env):
+                _tested.append(env)
+                return True
+            return _predicate
+
+        _started = []
+        def _start_response(status, headers):
+            _started.append((status, headers))
+        environ = {'testing': True, 'REMOTE_USER': 'fred'}
+
+        restrict = self._makeOne(predicate=_factory)
+        restrict(environ, _start_response)
+
+        self.assertEqual(len(_tested), 1)
+        self.assertEqual(len(_started), 0)
+        self.assertEqual(restrict.app.environ, environ)
+
+class MakePredicateRestrictionTests(unittest.TestCase):
+
+    def _getFUT(self):
+        from repoze.who.restrict import make_predicate_restriction
+        return make_predicate_restriction
+
+    def test_non_string_predicate_no_args(self):
+        fut = self._getFUT()
+        app = DummyApp()
+        def _predicate(env):
+            return True
+        def _factory():
+            return _predicate
+
+        filter = fut(app, {}, predicate=_factory)
+
+        self.failUnless(filter.app is app)
+        self.failUnless(filter.predicate is _predicate)
+        self.failUnless(filter.enabled)
+
+    def test_disabled_non_string_predicate_w_args(self):
+        fut = self._getFUT()
+        app = DummyApp()
+
+        filter = fut(app, {}, predicate=DummyPredicate, enabled=False,
+                     foo='Foo')
+
+        self.failUnless(filter.app is app)
+        self.failUnless(isinstance(filter.predicate, DummyPredicate))
+        self.assertEqual(filter.predicate.foo, 'Foo')
+        self.failIf(filter.enabled)
+
+    def test_enabled_string_predicate_w_args(self):
+        fut = self._getFUT()
+        app = DummyApp()
+
+        filter = fut(app, {}, predicate='repoze.who.tests:DummyPredicate',
+                     enabled=True, foo='Foo')
+
+        self.failUnless(filter.app is app)
+        self.failUnless(isinstance(filter.predicate, DummyPredicate))
+        self.assertEqual(filter.predicate.foo, 'Foo')
+        self.failUnless(filter.enabled)
+
+class DummyPredicate:
+    def __init__(self, **kw):
+        self.__dict__.update(kw)
+    def __call__(self, env):
+        return True
+
 def compare_success(*arg):
     return True
 
@@ -2260,6 +2403,7 @@
 # XXX need make_middleware tests
 
 class DummyApp:
+    environ = None
     def __call__(self, environ, start_response):
         self.environ = environ
         return []

Modified: repoze.who/trunk/setup.py
==============================================================================
--- repoze.who/trunk/setup.py	(original)
+++ repoze.who/trunk/setup.py	Wed May  7 10:28:52 2008
@@ -55,6 +55,8 @@
       [paste.filter_app_factory]
       test = repoze.who.middleware:make_test_middleware
       config = repoze.who.config:make_middleware_with_config
+      predicate = repoze.who.restrict:make_predicate_restriction
+      authenticated = repoze.who.restrict:make_authenticated_restriction
       """
       )
 


More information about the Repoze-checkins mailing list