[Repoze-checkins] r1621 - in repoze.bfg/trunk/repoze/bfg: . tests

Chris McDonough chrism at agendaless.com
Sat Aug 16 20:15:17 EDT 2008


Author: Chris McDonough <chrism at agendaless.com>
Date: Sat Aug 16 20:15:17 2008
New Revision: 1621

Log:
Add RepozeWhoIdentityACLSecurityPolicy; add debug logging.


Modified:
   repoze.bfg/trunk/repoze/bfg/security.py
   repoze.bfg/trunk/repoze/bfg/tests/test_security.py

Modified: repoze.bfg/trunk/repoze/bfg/security.py
==============================================================================
--- repoze.bfg/trunk/repoze/bfg/security.py	(original)
+++ repoze.bfg/trunk/repoze/bfg/security.py	Sat Aug 16 20:15:17 2008
@@ -1,3 +1,7 @@
+import logging
+import os
+import sys
+
 from zope.interface import implements
 from zope.component import queryUtility
 
@@ -80,29 +84,13 @@
         self.logger and self.logger.debug(str(result))
         return result
 
-class RemoteUserACLSecurityPolicy(object):
-    """ A security policy which:
-
-    - examines the request.environ for the REMOTE_USER variable and
-      uses any non-false value as a principal id for this request.
-
-    - uses an ACL-based authorization model which attempts to find an
-      ACL on the context, and which returns ``Allowed`` from its
-      'permits' method if the ACL found grants access to the current
-      principal.  It returns ``Denied`` if permission was not granted
-      (either explicitly via a deny or implicitly by not finding a
-      matching ACE action).  An ACL is an ordered sequence of ACE
-      tuples, e.g.  ``[(Allow, Everyone, 'read'), (Deny, 'george',
-      'write')]``.  ACLs stored on model instance objects as their
-      __acl__ attribute will be used by the security machinery to
-      grant or deny access.
-
-    """
+class ACLSecurityPolicy(object):
     implements(ISecurityPolicy)
     authorizer_factory = ACLAuthorizer
     
-    def __init__(self, logger=None):
+    def __init__(self, logger, get_principals):
         self.logger = logger
+        self.get_principals = get_principals
 
     def permits(self, context, request, permission):
         """ Return ``Allowed`` if the policy permits access,
@@ -118,17 +106,93 @@
         return False
 
     def authenticated_userid(self, request):
-        return request.environ.get('REMOTE_USER', None)
+        principals = self.get_principals(request)
+        if principals:
+            return principals[0]
 
     def effective_principals(self, request):
-        userid = self.authenticated_userid(request)
         effective_principals = [Everyone]
+        principal_ids = self.get_principals(request)
 
-        if userid is not None:
+        if principal_ids:
             effective_principals.append(Authenticated)
-            effective_principals.append(userid)
+            effective_principals.extend(principal_ids)
+
         return effective_principals
 
+DEBUG_LOG_KEY = 'BFG_SECURITY_DEBUG'
+
+def debug_logger(logger):
+    if logger is None:
+        do_debug_log = os.environ.get(DEBUG_LOG_KEY, '')
+        if str(do_debug_log).lower() in ('1', 'y', 'true', 't', 'on'):
+            handler = logging.StreamHandler(sys.stdout)
+            fmt = '%(asctime)s %(message)s'
+            formatter = logging.Formatter(fmt)
+            handler.setFormatter(formatter)
+            logger = logging.Logger('repoze.bfg.security')
+            logger.addHandler(handler)
+            logger.setLevel(logging.DEBUG)
+            return logger
+    return logger
+
+def RemoteUserACLSecurityPolicy(logger=None):
+    """ A security policy which:
+
+    - examines the request.environ for the REMOTE_USER variable and
+      uses any non-false value as a principal id for this request.
+
+    - uses an ACL-based authorization model which attempts to find an
+      ACL on the context, and which returns ``Allowed`` from its
+      'permits' method if the ACL found grants access to the current
+      principal.  It returns ``Denied`` if permission was not granted
+      (either explicitly via a deny or implicitly by not finding a
+      matching ACE action).  An ACL is an ordered sequence of ACE
+      tuples, e.g.  ``[(Allow, Everyone, 'read'), (Deny, 'george',
+      'write')]``.  ACLs stored on model instance objects as their
+      __acl__ attribute will be used by the security machinery to
+      grant or deny access.
+
+    """
+    logger = debug_logger(logger)
+    def get_principals(request):
+        user_id = request.environ.get('REMOTE_USER')
+        if user_id:
+            return [user_id]
+        return []
+    return ACLSecurityPolicy(logger, get_principals)
+
+def RepozeWhoIdentityACLSecurityPolicy(logger=None):
+    """ A security policy which:
+
+    - examines the request.environ for the ``repoze.who.identity``
+      dictionary.  If one is found, the principal ids for the request
+      are composed of ``repoze.who.identity['repoze.who.userid']``
+      plus ``repoze.who.identity.get('groups', [])``.
+
+    - uses an ACL-based authorization model which attempts to find an
+      ACL on the context, and which returns ``Allowed`` from its
+      'permits' method if the ACL found grants access to the current
+      principal.  It returns ``Denied`` if permission was not granted
+      (either explicitly via a deny or implicitly by not finding a
+      matching ACE action).  An ACL is an ordered sequence of ACE
+      tuples, e.g.  ``[(Allow, Everyone, 'read'), (Deny, 'george',
+      'write')]``.  ACLs stored on model instance objects as their
+      __acl__ attribute will be used by the security machinery to
+      grant or deny access.
+
+    """
+    logger = debug_logger(logger)
+    def get_principals(request):
+        identity = request.environ.get('repoze.who.identity')
+        if not identity:
+            return []
+        principals = [identity['repoze.who.userid']]
+        principals.extend(identity.get('groups', []))
+        return principals
+        
+    return ACLSecurityPolicy(logger, get_principals)
+
 class PermitsResult:
     def __init__(self, ace, acl, permission, principals, context):
         self.acl = acl

Modified: repoze.bfg/trunk/repoze/bfg/tests/test_security.py
==============================================================================
--- repoze.bfg/trunk/repoze/bfg/tests/test_security.py	(original)
+++ repoze.bfg/trunk/repoze/bfg/tests/test_security.py	Sat Aug 16 20:15:17 2008
@@ -210,10 +210,10 @@
         result = authorizer.permits('read', *principals)
         self.assertEqual(len(logger.messages), 1)
 
-class RemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup):
+class TestACLSecurityPolicy(unittest.TestCase, PlacelessSetup):
     def _getTargetClass(self):
-        from repoze.bfg.security import RemoteUserACLSecurityPolicy
-        return RemoteUserACLSecurityPolicy
+        from repoze.bfg.security import ACLSecurityPolicy
+        return ACLSecurityPolicy
 
     def _makeOne(self, *arg, **kw):
         klass = self._getTargetClass()
@@ -225,40 +225,22 @@
     def tearDown(self):
         PlacelessSetup.tearDown(self)
 
-    def test_instance_implements_ISecurityPolicy(self):
-        from zope.interface.verify import verifyObject
-        from repoze.bfg.interfaces import ISecurityPolicy
-        logger = DummyLogger()
-        verifyObject(ISecurityPolicy, self._makeOne(logger))
-
     def test_class_implements_ISecurityPolicy(self):
         from zope.interface.verify import verifyClass
         from repoze.bfg.interfaces import ISecurityPolicy
         verifyClass(ISecurityPolicy, self._getTargetClass())
 
-    def test_authenticated_userid(self):
-        context = DummyContext()
-        request = DummyRequest({'REMOTE_USER':'fred'})
-        logger = DummyLogger()
-        policy = self._makeOne(logger)
-        result = policy.authenticated_userid(request)
-        self.assertEqual(result, 'fred')
-
-    def test_effective_principals(self):
-        context = DummyContext()
-        request = DummyRequest({'REMOTE_USER':'fred'})
+    def test_instance_implements_ISecurityPolicy(self):
+        from zope.interface.verify import verifyObject
+        from repoze.bfg.interfaces import ISecurityPolicy
         logger = DummyLogger()
-        policy = self._makeOne(logger)
-        result = policy.effective_principals(request)
-        from repoze.bfg.security import Everyone
-        from repoze.bfg.security import Authenticated
-        self.assertEqual(result, [Everyone, Authenticated, 'fred'])
+        verifyObject(ISecurityPolicy, self._makeOne(logger, lambda *arg: None))
 
-    def test_permits_no_remote_user_no_acl_info_on_context(self):
+    def test_permits_no_principals_no_acl_info_on_context(self):
         context = DummyContext()
         request = DummyRequest({})
         logger = DummyLogger()
-        policy = self._makeOne(logger)
+        policy = self._makeOne(logger, lambda *arg: None)
         authorizer_factory = make_authorizer_factory(None)
         policy.authorizer_factory = authorizer_factory
         result = policy.permits(context, request, 'view')
@@ -268,12 +250,12 @@
         self.assertEqual(authorizer_factory.permission, 'view')
         self.assertEqual(authorizer_factory.context, context)
 
-    def test_permits_no_remote_user_acl_info_on_context(self):
+    def test_permits_no_principals_acl_info_on_context(self):
         context = DummyContext()
         context.__acl__ = []
         request = DummyRequest({})
         logger = DummyLogger()
-        policy = self._makeOne(logger)
+        policy = self._makeOne(logger, lambda *arg: None)
         authorizer_factory = make_authorizer_factory(None)
         policy.authorizer_factory = authorizer_factory
         result = policy.permits(context, request, 'view')
@@ -283,7 +265,7 @@
         self.assertEqual(authorizer_factory.permission, 'view')
         self.assertEqual(authorizer_factory.context, context)
 
-    def test_permits_no_remote_user_withparents_root_has_acl_info(self):
+    def test_permits_no_principals_withparents_root_has_acl_info(self):
         context = DummyContext()
         context.__name__ = None
         context.__parent__ = None
@@ -293,7 +275,7 @@
         context.__acl__ = []
         request = DummyRequest({})
         logger = DummyLogger()
-        policy = self._makeOne(logger)
+        policy = self._makeOne(logger, lambda *arg: None)
         authorizer_factory = make_authorizer_factory(None)
         policy.authorizer_factory = authorizer_factory
         result = policy.permits(context, request, 'view')
@@ -303,7 +285,7 @@
         self.assertEqual(authorizer_factory.permission, 'view')
         self.assertEqual(authorizer_factory.context, context)
 
-    def test_permits_no_remote_user_withparents_root_allows_everyone(self):
+    def test_permits_no_principals_withparents_root_allows_everyone(self):
         context = DummyContext()
         context.__name__ = None
         context.__parent__ = None
@@ -312,7 +294,7 @@
         context2.__parent__ = context
         request = DummyRequest({})
         logger = DummyLogger()
-        policy = self._makeOne(logger)
+        policy = self._makeOne(logger, lambda *arg: None)
         authorizer_factory = make_authorizer_factory(context)
         policy.authorizer_factory = authorizer_factory
         result = policy.permits(context, request, 'view')
@@ -321,6 +303,89 @@
         self.assertEqual(authorizer_factory.principals, (Everyone,))
         self.assertEqual(authorizer_factory.permission, 'view')
         self.assertEqual(authorizer_factory.context, context)
+    
+
+class TestRemoteUserACLSecurityPolicy(unittest.TestCase, PlacelessSetup):
+    def _getTargetClass(self):
+        from repoze.bfg.security import RemoteUserACLSecurityPolicy
+        return RemoteUserACLSecurityPolicy
+
+    def _makeOne(self, *arg, **kw):
+        klass = self._getTargetClass()
+        return klass(*arg, **kw)
+
+    def setUp(self):
+        PlacelessSetup.setUp(self)
+
+    def tearDown(self):
+        PlacelessSetup.tearDown(self)
+
+    def test_instance_implements_ISecurityPolicy(self):
+        from zope.interface.verify import verifyObject
+        from repoze.bfg.interfaces import ISecurityPolicy
+        logger = DummyLogger()
+        verifyObject(ISecurityPolicy, self._makeOne(logger))
+
+    def test_authenticated_userid(self):
+        context = DummyContext()
+        request = DummyRequest({'REMOTE_USER':'fred'})
+        logger = DummyLogger()
+        policy = self._makeOne(logger)
+        result = policy.authenticated_userid(request)
+        self.assertEqual(result, 'fred')
+
+    def test_effective_principals(self):
+        context = DummyContext()
+        request = DummyRequest({'REMOTE_USER':'fred'})
+        logger = DummyLogger()
+        policy = self._makeOne(logger)
+        result = policy.effective_principals(request)
+        from repoze.bfg.security import Everyone
+        from repoze.bfg.security import Authenticated
+        self.assertEqual(result, [Everyone, Authenticated, 'fred'])
+
+
+class TestRepozeWhoIdentityACLSecurityPolicy(unittest.TestCase, PlacelessSetup):
+    def _getTargetClass(self):
+        from repoze.bfg.security import RepozeWhoIdentityACLSecurityPolicy
+        return RepozeWhoIdentityACLSecurityPolicy
+
+    def _makeOne(self, *arg, **kw):
+        klass = self._getTargetClass()
+        return klass(*arg, **kw)
+
+    def setUp(self):
+        PlacelessSetup.setUp(self)
+
+    def tearDown(self):
+        PlacelessSetup.tearDown(self)
+
+    def test_instance_implements_ISecurityPolicy(self):
+        from zope.interface.verify import verifyObject
+        from repoze.bfg.interfaces import ISecurityPolicy
+        logger = DummyLogger()
+        verifyObject(ISecurityPolicy, self._makeOne(logger))
+
+    def test_authenticated_userid(self):
+        context = DummyContext()
+        identity = {'repoze.who.identity':{'repoze.who.userid':'fred'}}
+        request = DummyRequest(identity)
+        logger = DummyLogger()
+        policy = self._makeOne(logger)
+        result = policy.authenticated_userid(request)
+        self.assertEqual(result, 'fred')
+
+    def test_effective_principals(self):
+        context = DummyContext()
+        identity = {'repoze.who.identity':{'repoze.who.userid':'fred'}}
+        request = DummyRequest(identity)
+        logger = DummyLogger()
+        policy = self._makeOne(logger)
+        result = policy.effective_principals(request)
+        from repoze.bfg.security import Everyone
+        from repoze.bfg.security import Authenticated
+        self.assertEqual(result, [Everyone, Authenticated, 'fred'])
+
 
 class TestAPIFunctions(unittest.TestCase, PlacelessSetup):
     def setUp(self):


More information about the Repoze-checkins mailing list