[Repoze-checkins] r784 - in repoze.pam/trunk: . repoze/pam repoze/pam/fixtures repoze/pam/plugins

Chris McDonough chrism at agendaless.com
Sat Mar 8 17:44:23 UTC 2008


Author: Chris McDonough <chrism at agendaless.com>
Date: Sat Mar  8 12:44:22 2008
New Revision: 784

Log:
 - Allow a downstream application to reset the identity during egress
   (e.g. if a user changes his username or password and the
   application would rather not require that the he logs out and logs
   back in for the change to "take").  The application should set a
   key in the WSGI environment: 'repoze.pam.identity_reset', which
   should have a value that is an identity dictionary that can be
   consumed by the "remember" method of the identifier that was used
   for the current request.

 - Add 'repoze.pam.identifier' to environment on ingress, which will
   be the identifier plugin instance which gave PAM the identity that
   became REMOTE_USER (to support introspection by the application, so
   it has a shot at knowing the identity format before it sets the
   value of 'repoze.pam.identity_reset' ).

 - Allow "remote user key" (default: REMOTE_USER) to be overridden
   (pass in remote_user_key to middleware constructor).

 - Allow form plugin to override the default form.



Added:
   repoze.pam/trunk/repoze/pam/fixtures/form.html
Modified:
   repoze.pam/trunk/CHANGES.txt
   repoze.pam/trunk/repoze/pam/middleware.py
   repoze.pam/trunk/repoze/pam/plugins/form.py
   repoze.pam/trunk/repoze/pam/tests.py

Modified: repoze.pam/trunk/CHANGES.txt
==============================================================================
--- repoze.pam/trunk/CHANGES.txt	(original)
+++ repoze.pam/trunk/CHANGES.txt	Sat Mar  8 12:44:22 2008
@@ -1,3 +1,25 @@
+Unreleased
+
+ - Allow a downstream application to reset the identity during egress
+   (e.g. if a user changes his username or password and the
+   application would rather not require that the he logs out and logs
+   back in for the change to "take").  The application should set a
+   key in the WSGI environment: 'repoze.pam.identity_reset', which
+   should have a value that is an identity dictionary that can be
+   consumed by the "remember" method of the identifier that was used
+   for the current request.
+
+ - Add 'repoze.pam.identifier' to environment on ingress, which will
+   be the identifier plugin instance which gave PAM the identity that
+   became REMOTE_USER (to support introspection by the application, so
+   it has a shot at knowing the identity format before it sets the
+   value of 'repoze.pam.identity_reset' ).
+
+ - Allow "remote user key" (default: REMOTE_USER) to be overridden
+   (pass in remote_user_key to middleware constructor).
+
+ - Allow form plugin to override the default form.
+
 0.4 (03-07-2008)
 
  - Allow plugins to specify a classifiers list per interface (instead

Added: repoze.pam/trunk/repoze/pam/fixtures/form.html
==============================================================================
--- (empty file)
+++ repoze.pam/trunk/repoze/pam/fixtures/form.html	Sat Mar  8 12:44:22 2008
@@ -0,0 +1,2 @@
+<html>
+</html>

Modified: repoze.pam/trunk/repoze/pam/middleware.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/middleware.py	(original)
+++ repoze.pam/trunk/repoze/pam/middleware.py	Sat Mar  8 12:44:22 2008
@@ -18,8 +18,9 @@
                  challengers,
                  classifier,
                  challenge_decider,
-                 log_stream=None,
-                 log_level=logging.INFO
+                 log_stream = None,
+                 log_level = logging.INFO,
+                 remote_user_key = 'REMOTE_USER',
                  ):
         iregistry, nregistry = make_registries(identifiers, authenticators,
                                                challengers)
@@ -28,6 +29,7 @@
         self.app = app
         self.classifier = classifier
         self.challenge_decider = challenge_decider
+        self.remote_user_key = remote_user_key
         self.logger = None
         if log_stream:
             handler = logging.StreamHandler(log_stream)
@@ -39,8 +41,9 @@
             self.logger.setLevel(log_level)
 
     def __call__(self, environ, start_response):
-        if REMOTE_USER(environ):
-            # act as a pass through if REMOTE_USER is already set
+        if self.remote_user_key in environ:
+            # act as a pass through if REMOTE_USER (or whatever) is
+            # already set
             return self.app(environ, start_response)
 
         environ['repoze.pam.plugins'] = self.name_registry
@@ -62,10 +65,16 @@
             if auth_ids:
                 auth_ids.sort()
                 best = auth_ids[0]
+                authenticator = best[0][1]
+                identifier = best[1][1]
                 identity = best[2]
                 userid = best[3]
-                identifier = best[1][1]
-                environ['REMOTE_USER'] = userid
+                # add the identifier plugin to the environment; it may
+                # allow a downstream application to better decide how
+                # to manufacture a 'repoze.pam.identity_reset'
+                environ['repoze.pam.identifier'] = identifier
+                # set the REMOTE_USER
+                environ[self.remote_user_key] = userid
         else:
             logger and logger.info('no identities found, not authenticating')
 
@@ -95,6 +104,17 @@
         else:
             logger and logger.info('no challenge required')
             remember_headers = []
+            app_identity = environ.get('repoze.pam.identity_reset')
+            if app_identity:
+                # A downstream application has requested an "identity
+                # reset" (e.g. a user changed his username or password
+                # or both, and the application doesn't want to require
+                # that he log in again for the new identity to
+                # 'take').  We don't want to expose the identity to
+                # upstream consumers as it may contain cleartext
+                # password info.
+                del environ['repoze.pam.identity_reset']
+                identity = app_identity
             if identifier:
                 remember_headers = identifier.remember(environ, identity)
                 if remember_headers:
@@ -316,7 +336,7 @@
         challengers,
         default_request_classifier,
         default_challenge_decider,
-        log_stream= log_stream,
+        log_stream = log_stream,
         log_level = logging.DEBUG
         )
     return middleware

Modified: repoze.pam/trunk/repoze/pam/plugins/form.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/plugins/form.py	(original)
+++ repoze.pam/trunk/repoze/pam/plugins/form.py	Sat Mar  8 12:44:22 2008
@@ -12,7 +12,7 @@
 _DEFAULT_FORM = """
 <html>
 <head>
-  <title>Login Form</title>
+  <title>Log In</title>
 </head>
 <body>
   <div>
@@ -36,7 +36,6 @@
     </table>
   </form>
   <pre>
-  %s
   </pre>
 </body>
 </html>
@@ -46,12 +45,13 @@
 
     implements(IChallenger, IIdentifier)
     
-    def __init__(self, login_form_qs, rememberer_name):
+    def __init__(self, login_form_qs, rememberer_name, formbody=None):
         self.login_form_qs = login_form_qs
         # rememberer_name is the name of another configured plugin which
         # implements IIdentifier, to handle remember and forget duties
         # (ala a cookie plugin or a session plugin)
         self.rememberer_name = rememberer_name
+        self.formbody = formbody
 
     # IIdentifier
     def identify(self, environ):
@@ -91,9 +91,8 @@
     # IChallenger
     def challenge(self, environ, status, app_headers, forget_headers):
         # heck yeah.
+        form = self.formbody or _DEFAULT_FORM
         def auth_form(environ, start_response):
-            import pprint
-            form = _DEFAULT_FORM % pprint.pformat(environ)
             content_length = CONTENT_LENGTH.tuples(str(len(form)))
             content_type = CONTENT_TYPE.tuples('text/html')
             headers = content_length + content_type + forget_headers
@@ -105,10 +104,13 @@
     def __repr__(self):
         return '<%s %s>' % (self.__class__.__name__, id(self))
 
-def make_plugin(pam_conf, login_form_qs='__do_login', rememberer_name=None):
+def make_plugin(pam_conf, login_form_qs='__do_login', rememberer_name=None,
+                form=None):
     if rememberer_name is None:
         raise ValueError(
             'must include rememberer key (name of another IIdentifier plugin)')
-    plugin = FormPlugin(login_form_qs, rememberer_name)
+    if form is not None:
+        form = open(form).read()
+    plugin = FormPlugin(login_form_qs, rememberer_name, form)
     return plugin
 

Modified: repoze.pam/trunk/repoze/pam/tests.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/tests.py	(original)
+++ repoze.pam/trunk/repoze/pam/tests.py	Sat Mar  8 12:44:22 2008
@@ -276,7 +276,7 @@
                               [], identifier, identity)
         self.assertEqual(result, None)
         self.assertEqual(environ['challenged'], None)
-        self.assertEqual(identifier.forgotten, True)
+        self.assertEqual(identifier.forgotten, identity)
 
     def test_challenge_identifier_app(self):
         environ = self._makeEnviron()
@@ -290,7 +290,7 @@
                                [], identifier, identity)
         self.assertEqual(result, app)
         self.assertEqual(environ['challenged'], app)
-        self.assertEqual(identifier.forgotten, True)
+        self.assertEqual(identifier.forgotten, identity)
 
     def test_multi_challenge_firstwins(self):
         environ = self._makeEnviron()
@@ -306,7 +306,7 @@
                               [], identifier, identity)
         self.assertEqual(result, app1)
         self.assertEqual(environ['challenged'], app1)
-        self.assertEqual(identifier.forgotten, True)
+        self.assertEqual(identifier.forgotten, identity)
 
     def test_multi_challenge_skipnomatch_findimplicit(self):
         environ = self._makeEnviron()
@@ -325,7 +325,7 @@
                                [], identifier, identity)
         self.assertEqual(result, app2)
         self.assertEqual(environ['challenged'], app2)
-        self.assertEqual(identifier.forgotten, True)
+        self.assertEqual(identifier.forgotten, identity)
 
     def test_multi_challenge_skipnomatch_findexplicit(self):
         environ = self._makeEnviron()
@@ -344,7 +344,7 @@
                                [], identifier, identity)
         self.assertEqual(result, app2)
         self.assertEqual(environ['challenged'], app2)
-        self.assertEqual(identifier.forgotten, True)
+        self.assertEqual(identifier.forgotten, identity)
 
     def test_call_remoteuser_already_set(self):
         environ = self._makeEnviron({'REMOTE_USER':'admin'})
@@ -439,8 +439,9 @@
         result = mw(environ, start_response)
         self.assertEqual(environ['challenged'], challenge_app)
         self.failUnless(result[0].startswith('401 Unauthorized\r\n'))
-        self.assertEqual(identifier.forgotten, True)
+        self.assertEqual(identifier.forgotten, identifier.credentials)
         self.assertEqual(environ['REMOTE_USER'], 'chris')
+        self.assertEqual(environ['repoze.pam.identifier'], identifier)
 
     def test_call_200_challenger_and_identifier_and_authenticator(self):
         environ = self._makeEnviron()
@@ -461,9 +462,34 @@
         result = mw(environ, start_response)
         self.assertEqual(environ.get('challenged'), None)
         self.assertEqual(identifier.forgotten, False)
-        self.assertEqual(identifier.remembered, True)
+        self.assertEqual(identifier.remembered, identifier.credentials)
         self.assertEqual(environ['REMOTE_USER'], 'chris')
+        self.assertEqual(environ['repoze.pam.identifier'], identifier)
 
+    def test_call_200_identity_reset(self):
+        environ = self._makeEnviron()
+        headers = [('a', '1')]
+        new_identity = {'user_id':'foo', 'password':'bar'}
+        app = DummyIdentityResetApp('200 OK', headers, new_identity)
+        from paste.httpexceptions import HTTPUnauthorized
+        challenge_app = HTTPUnauthorized()
+        challenge = DummyChallenger(challenge_app)
+        challengers = [ ('challenge', challenge) ]
+        identifier = DummyIdentifier()
+        identifiers = [ ('identifier', identifier) ]
+        authenticator = DummyAuthenticator()
+        authenticators = [ ('authenticator', authenticator) ]
+        mw = self._makeOne(app=app, challengers=challengers,
+                           identifiers=identifiers,
+                           authenticators=authenticators)
+        start_response = DummyStartResponse()
+        result = mw(environ, start_response)
+        self.failIf(environ.has_key('repoze.pam.identity_reset'))
+        self.assertEqual(environ.get('challenged'), None)
+        self.assertEqual(identifier.forgotten, False)
+        self.assertEqual(identifier.remembered, new_identity)
+        self.assertEqual(environ['REMOTE_USER'], 'chris')
+        self.assertEqual(environ['repoze.pam.identifier'], identifier)
 
 
     # XXX need more call tests:
@@ -807,8 +833,10 @@
         from repoze.pam.plugins.form import FormPlugin
         return FormPlugin
 
-    def _makeOne(self, login_form_qs='__do_login', rememberer_name='cookie'):
-        plugin = self._getTargetClass()(login_form_qs, rememberer_name)
+    def _makeOne(self, login_form_qs='__do_login', rememberer_name='cookie',
+                 formbody=None):
+        plugin = self._getTargetClass()(login_form_qs, rememberer_name,
+                                        formbody)
         return plugin
 
     def _makeFormEnviron(self, login=None, password=None, do_login=False):
@@ -873,24 +901,70 @@
     def test_remember(self):
         plugin = self._makeOne()
         environ = self._makeFormEnviron()
-        result = plugin.remember(environ, {})
+        identity = {}
+        result = plugin.remember(environ, identity)
         self.assertEqual(result, None)
         self.assertEqual(environ['repoze.pam.plugins']['cookie'].remembered,
-                         True)
+                         identity)
 
     def test_forget(self):
         plugin = self._makeOne()
         environ = self._makeFormEnviron()
-        result = plugin.forget(environ, {})
+        identity = {}
+        result = plugin.forget(environ, identity)
         self.assertEqual(result, None)
         self.assertEqual(environ['repoze.pam.plugins']['cookie'].forgotten,
-                         True)
-
-    def test_factory(self):
-        from repoze.pam.plugins.cookie import make_plugin
-        plugin = make_plugin(None, 'foo')
-        self.assertEqual(plugin.cookie_name, 'foo')
+                         identity
+                         )
 
+    def test_challenge_defaultform(self):
+        from repoze.pam.plugins.form import _DEFAULT_FORM
+        plugin = self._makeOne()
+        environ = self._makeFormEnviron()
+        app = plugin.challenge(environ, '401 Unauthorized', [], [])
+        sr = DummyStartResponse()
+        result = app(environ, sr)
+        self.assertEqual(''.join(result), _DEFAULT_FORM)
+        self.assertEqual(len(sr.headers), 2)
+        cl = str(len(_DEFAULT_FORM))
+        self.assertEqual(sr.headers[0], ('Content-Length', cl))
+        self.assertEqual(sr.headers[1], ('Content-Type', 'text/html'))
+        self.assertEqual(sr.status, '200 OK')
+
+    def test_challenge_customform(self):
+        here = os.path.dirname(__file__)
+        fixtures = os.path.join(here, 'fixtures')
+        form = os.path.join(fixtures, 'form.html')
+        formbody = open(form).read()
+        plugin = self._makeOne(formbody=formbody)
+        environ = self._makeFormEnviron()
+        app = plugin.challenge(environ, '401 Unauthorized', [], [])
+        sr = DummyStartResponse()
+        result = app(environ, sr)
+        self.assertEqual(''.join(result), formbody)
+        self.assertEqual(len(sr.headers), 2)
+        cl = str(len(formbody))
+        self.assertEqual(sr.headers[0], ('Content-Length', cl))
+        self.assertEqual(sr.headers[1], ('Content-Type', 'text/html'))
+        self.assertEqual(sr.status, '200 OK')
+
+    def test_factory_withform(self):
+        from repoze.pam.plugins.form import make_plugin
+        here = os.path.dirname(__file__)
+        fixtures = os.path.join(here, 'fixtures')
+        form = os.path.join(fixtures, 'form.html')
+        formbody = open(form).read()
+        plugin = make_plugin(None, '__login', 'cookie', form)
+        self.assertEqual(plugin.login_form_qs, '__login')
+        self.assertEqual(plugin.rememberer_name, 'cookie')
+        self.assertEqual(plugin.formbody, formbody)
+
+    def test_factory_defaultform(self):
+        from repoze.pam.plugins.form import make_plugin
+        plugin = make_plugin(None, '__login', 'cookie')
+        self.assertEqual(plugin.login_form_qs, '__login')
+        self.assertEqual(plugin.rememberer_name, 'cookie')
+        self.assertEqual(plugin.formbody, None)
 
 class TestDefaultRequestClassifier(Base):
     def _getFUT(self):
@@ -1146,6 +1220,18 @@
         self.environ = environ
         start_response(self.status, self.headers)
         return ['body']
+
+class DummyIdentityResetApp:
+    def __init__(self, status, headers, new_identity):
+        self.status = status
+        self.headers = headers
+        self.new_identity = new_identity
+
+    def __call__(self, environ, start_response):
+        self.environ = environ
+        environ['repoze.pam.identity_reset'] = self.new_identity
+        start_response(self.status, self.headers)
+        return ['body']
     
 class DummyRequestClassifier:
     def __call__(self, environ):
@@ -1163,10 +1249,10 @@
         return self.credentials
 
     def forget(self, environ, identity):
-        self.forgotten = True
+        self.forgotten = identity
 
     def remember(self, environ, identity):
-        self.remembered = True
+        self.remembered = identity
 
 class DummyNoResultsIdentifier:
     def identify(self, environ):


More information about the Repoze-checkins mailing list