[Repoze-checkins] r734 - in repoze.pam/trunk/repoze/pam: . plugins

Chris McDonough chrism at agendaless.com
Mon Feb 25 03:02:51 UTC 2008


Author: Chris McDonough <chrism at agendaless.com>
Date: Sun Feb 24 22:02:50 2008
New Revision: 734

Log:
Add form and cookie plugins, add post_extractor plugin type.



Added:
   repoze.pam/trunk/repoze/pam/plugins/cookie.py   (contents, props changed)
   repoze.pam/trunk/repoze/pam/plugins/form.py   (contents, props changed)
Modified:
   repoze.pam/trunk/repoze/pam/classifiers.py
   repoze.pam/trunk/repoze/pam/interfaces.py
   repoze.pam/trunk/repoze/pam/middleware.py
   repoze.pam/trunk/repoze/pam/plugins/basicauth.py
   repoze.pam/trunk/repoze/pam/tests.py

Modified: repoze.pam/trunk/repoze/pam/classifiers.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/classifiers.py	(original)
+++ repoze.pam/trunk/repoze/pam/classifiers.py	Sun Feb 24 22:02:50 2008
@@ -53,7 +53,7 @@
     
 class DefaultResponseClassifier(object):
     implements(IResponseClassifier)
-    def __call__(self, environ, request_classification, headers, exception):
+    def __call__(self, environ, request_classification, status, headers):
         """ By default, return the request classification """
         return request_classification
     

Modified: repoze.pam/trunk/repoze/pam/interfaces.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/interfaces.py	(original)
+++ repoze.pam/trunk/repoze/pam/interfaces.py	Sun Feb 24 22:02:50 2008
@@ -2,27 +2,36 @@
 
 class IRequestClassifier(Interface):
     """ On ingress: classify a request.
-
-    This interface is responsible for returning a string representing
-    a classification name based on introspection of the WSGI
-    environment (environ).
     """
     def __call__(environ):
-        """ Return a string representing the classification of this
-        request. """
+        """ environ -> request classifier string
+
+        This interface is responsible for returning a string
+        value representing a request classification.
+
+        o 'environ' is the WSGI environment.
+        """
 
 class IResponseClassifier(Interface):
     """ On egress: classify a response.
+    """
+    def __call__(environ, request_classification, status, headers):
+        """ args -> response classifier string
 
-    This interface is responsible for returning a string representing
-    a classification name based on introspection of the ingress
-    classification, the WSGI environment (environ), the headers
-    returned in the response (headers), or the exception raised by a
-    downstream application.
-    """
-    def __call__(environ, request_classification, headers, exception):
-        """ Return a string representing the classification of this
-        request. """
+        This interface is responsible for returning a string representing
+        a response classification.
+
+        o 'environ' is the WSGI environment.
+
+        o 'request_classification' is the classification returned during
+          ingress by the request classifier.
+
+        o 'status' is the status written into start_response by
+          the downstream application.
+
+        o 'headers' is the headers tuple written into start_response
+          by the downstream application.
+          """
 
 class IExtractorPlugin(Interface):
 
@@ -48,7 +57,34 @@
         o Only extraction plugins which match one of the the current
           request's classifications will be asked to perform extraction.
         """
-    
+
+class IPostExtractorPlugin(Interface):
+    """ On ingress: allow the plugin to have a chance to influence the
+    environment once credentials are established and return extra
+    headers that will be set in the eventual response.
+
+    Each post-extractor matching the request classification is called
+    unconditionally after extraction.
+    """
+
+    def post_extract(environ, credentials, extractor):
+        """ args -> [ (header-name, header-value), ..] | None
+
+        o 'environ' is the WSGI environment.
+
+        o credentials are the credentials that were extracted by
+          repoze.pam during the extraction step.
+
+        o 'extractor' is the plugin instance that provided the
+          credentials.  If no plugin instance provided credentials to
+          repoze.pam, this will be None.
+
+        The return value should be a list of tuples, where each tuple is
+        in the form (header-name, header-value), e.g.
+        [ ('Set-Cookie', 'cookie_name=foo; Path=/') ] or None if
+        no headers should be set.
+        """
+
 class IAuthenticatorPlugin(Interface):
 
     """ On ingress: Map credentials to a user ID.
@@ -70,21 +106,29 @@
 
 class IChallengerPlugin(Interface):
 
-    """ On egress: Initiate a challenge to the user to provide credentials.
-
-        o 'environ' is the WSGI environment.
+    """ On egress: Conditionally initiate a challenge to the user to
+        provide credentials.
 
-        o Only challenge plugins which match one of the the current
-          request's classifications will be asked to perform a
-          challenge.
+        Only challenge plugins which match one of the the current
+        response's classifications will be asked to perform a
+        challenge.
     """
 
-    def challenge(environ, request_classifier, headers, exception):
+    def challenge(environ, status, headers):
+        """ args -> WSGI application or None
+
+        o 'environ' is the WSGI environment.
+
+        o 'status' is the status written into start_response by the
+          downstream application.
 
-        """ Examine the values passed in and perform an arbitrary action
-        (usually mutating environ or raising an exception) to cause a
-        challenge to be raised to the user.
+        o 'headers' is the headers tuple written into start_response by the
+          downstream application.
 
-        The return value of this method is ignored.
+        Examine the values passed in and return a WSGI application
+        (a callable which accepts environ and start_response as its
+        two positional arguments, ala PEP 333) which causes a
+        challenge to be performed.  Return None to forego performing a
+        challenge.
         """
 

Modified: repoze.pam/trunk/repoze/pam/middleware.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/middleware.py	(original)
+++ repoze.pam/trunk/repoze/pam/middleware.py	Sun Feb 24 22:02:50 2008
@@ -1,113 +1,290 @@
+import logging
+from StringIO import StringIO
+import sys
+
+from paste.httpheaders import REMOTE_USER
+
 from repoze.pam.interfaces import IAuthenticatorPlugin
 from repoze.pam.interfaces import IExtractorPlugin
+from repoze.pam.interfaces import IPostExtractorPlugin
 from repoze.pam.interfaces import IChallengerPlugin
 
+class StartResponseWrapper(object):
+    def __init__(self, start_response, extra_headers):
+        self.start_response = start_response
+        self.extra_headers = extra_headers
+        self.headers = []
+        self.buffer = StringIO()
+
+    def wrap_start_response(self, status, headers, exc_info=None):
+        self.headers = headers
+        self.status = status
+        return self.buffer.write
+
+    def finish_response(self):
+        headers = self.headers + self.extra_headers
+        write = self.start_response(self.status, headers)
+        if write:
+            self.buffer.seek(0)
+            write(self.buffer.getvalue())
+            if hasattr(write, 'close'):
+                write.close()
+
+_STARTED = '-- repoze.pam request started --'
+_ENDED = '-- repoze.pam request ended --'
+
 class PluggableAuthenticationMiddleware(object):
-    def __init__(self, app, registry, request_classifier, response_classifier,
-                 add_credentials=True):
+    def __init__(self, app,
+                 registry,
+                 request_classifier,
+                 response_classifier,
+                 add_credentials=False,
+                 log_stream=None,
+                 log_level=logging.INFO):
         self.registry = registry
         self.app = app
         self.request_classifier = request_classifier
-        self.response_classififer = response_classifier
+        self.response_classifier = response_classifier
         self.add_credentials = add_credentials
+        self.logger = None
+        if log_stream:
+            handler = logging.StreamHandler(log_stream)
+            fmt = '%(asctime)s %(message)s'
+            formatter = logging.Formatter(fmt)
+            handler.setFormatter(formatter)
+            self.logger = logging.Logger('repoze.pam')
+            self.logger.addHandler(handler)
+            self.logger.setLevel(log_level)
 
     def __call__(self, environ, start_response):
-        request_classification = self.on_ingress(environ)
-        return self.app(environ, start_response)
-        # XXX on_egress
+        logger = self.logger
+        logger and logger.info(_STARTED)
+        classification, extra_headers = self.modify_environment(environ)
+
+        wrapper = StartResponseWrapper(start_response, extra_headers)
+        app_iter = self.app(environ, wrapper.wrap_start_response)
+
+        challenge_app = self.challenge(
+            environ,
+            classification,
+            wrapper.status,
+            wrapper.headers
+            )
+        logger and logger.info('challenge app used: %s' % challenge_app)
+
+        if challenge_app is not None:
+            if hasattr(app_iter, 'close'):
+                app_iter.close()
+            logger and logger.info(_ENDED)
+            return challenge_app(environ, start_response)
+        else:
+            wrapper.finish_response()
+            logger and logger.info(_ENDED)
+            return app_iter
 
-    def on_ingress(self, environ):
+    def modify_environment(self, environ):
+        # happens on ingress
         classification = self.request_classifier(environ)
-        credentials = self.extract(environ, classification)
-
+        logger = self.logger
+        logger and logger.info('request classification: %s' % classification)
+        credentials, extractor = self.extract(environ, classification)
+        headers = self.after_extract(environ, credentials, extractor,
+                                     classification)
         userid = None
 
         if credentials:
-            userid = self.authenticate(environ, credentials, classification)
+            userid, authenticator = self.authenticate(environ,
+                                                      credentials,
+                                                      classification)
+        else:
+            logger and logger.info(
+                'no authenticator plugin used (no credentials)')
 
         if self.add_credentials:
             environ['repoze.pam.credentials'] = credentials
 
-        if userid:
-            environ['REMOTE_USER'] = userid
-
-        return classification
+        remote_user_not_set = not REMOTE_USER(environ)
 
-    def on_egress(self, environ, request_classifier, headers, exception):
-        self.challenge(environ, request_classifier, headers, exception)
+        if remote_user_not_set and userid:
+            # only set REMOTE_USER if it's not yet set
+            logger and logger.info('REMOTE_USER set to %s' % userid)
+            environ['REMOTE_USER'] = userid
+        else:
+            logger and logger.info('REMOTE_USER not set')
 
-    def extract(self, environ, classifier):
-        extractor_candidates = self.registry.get(IExtractorPlugin)
-        extractors = self._match_classifier(extractor_candidates, classifier)
+        return classification, headers
+        
+    def extract(self, environ, classification):
+        # happens on ingress
+        candidates = self.registry.get(IExtractorPlugin, ())
+        plugins = self._match_classification(candidates, classification,
+                                             'request_classifications')
+        logger = self.logger
+        logger and self.logger.info(
+            'extractor plugins consulted %s' % plugins)
 
-        for extractor in extractors:
-            creds = extractor.extract(environ)
+        for plugin in plugins:
+            creds = plugin.extract(environ)
+            logger and logger.debug(
+                'credentials returned from extractor %s: %s' %
+                (plugin, creds)
+                )
             if creds:
                 # XXX PAS returns all credentials (it fully iterates over all
                 # extraction plugins)
-                return creds
-        return {}
+                logger and logger.info(
+                    'using credentials returned from extractor %s' % plugin)
+                return creds, plugin
+        logger and logger.info('no extractor plugins found credentials')
+        return {}, None
+
+    def after_extract(self, environ, credentials, extractor, classification):
+        candidates = self.registry.get(IPostExtractorPlugin, ())
+        plugins = self._match_classification(candidates,
+                                             classification,
+                                             'request_classifications')
+        logger = self.logger
+        logger and logger.info(
+            'post-extractor plugins consulted %s' % plugins)
+
+        extra_headers = {}
+        for plugin in plugins:
+            headers = plugin.post_extract(environ, credentials, extractor)
+            logger and logger.debug(
+                'headers returned from post-extractor %s: %s' %
+                (plugin, headers)
+                )
+            if headers:
+                extra_headers[plugin] = headers
+
+        logger and logger.info('extra headers gathered: %s' % extra_headers)
+
+        return flatten(extra_headers.values())
 
     def authenticate(self, environ, credentials, classification):
-        # on ingress
-        userid = None
-        auth_candidates = self.registry.get(IAuthenticatorPlugin)
-        authenticators = self._match_classifier(auth_candidates, classification)
-        for authenticator in authenticators:
-            userid = authenticator.authenticate(environ, credentials)
+        # happens on ingress
+        candidates = self.registry.get(IAuthenticatorPlugin, ())
+        plugins = self._match_classification(candidates,
+                                             classification,
+                                             'request_classifications')
+        logger = self.logger
+
+        logger and logger.info(
+            'authenticator plugins consulted %s' % plugins)
+
+        for plugin in plugins:
+            userid = plugin.authenticate(environ, credentials)
+            logger and logger.info(
+                'userid returned from authenticator %s: %s' %
+                (plugin, userid)
+                )
             if userid:
-                # XXX PAS calls all authenticators (it fully iterates over all
-                # authenticator plugins)
-                break
-        return userid
-
-    def challenge(self, environ, request_classification, headers, exception):
-        # on egress
-        classification = self.response_classifier(environ,
-                                                  request_classification,
-                                                  headers,
-                                                  exception)
-        challenger_candidates = self.registry.get(IChallengerPlugin)
-        challengers = self._match_classifier(challenger_candidates,
-                                              classification)
-        for challenger in challengers:
-            new_headers, new_status = challengers.challenge(environ)
+                logger and logger.info(
+                    'using userid returned from authenticator %s' % plugin)
+                return userid, plugin
+
+        logger and logger.info('no authenticator plugin authenticated a userid')
+        return None, None
+
+    def challenge(self, environ, request_classification, status, headers):
+        # happens on egress
+        classification = self.response_classifier(
+            environ,
+            request_classification,
+            status,
+            headers
+            )
+
+        logger = self.logger
+        logger and logger.info('response classification: %s' % classification)
+
+        candidates = self.registry.get(IChallengerPlugin, ())
+        plugins = self._match_classification(candidates,
+                                             classification,
+                                             'response_classifications')
+        logger and logger.info('challenger plugins consulted: %s' % plugins)
+        for plugin in plugins:
+            app = plugin.challenge(environ, status, headers)
+            logger and logger.debug('app returned from challenger %s: %s' %
+                                    (plugin, app)
+                                    )
+            if app is not None:
+                # new WSGI application
+                logger and logger.info(
+                    'challenger plugin %s returned an app: %s' % (plugin, app))
+                return app
+        logger and logger.info('no challenge app returned')
+        # signifies no challenge
+        return None
 
-    def _match_classifier(self, plugins, classifier):
+    def _match_classification(self, plugins, classification, attr):
         result = []
         for plugin in plugins:
-            plugin_classifiers = getattr(plugin, 'classifiers', None)
-            if not plugin_classifiers: # good for any
+            plugin_classifications = getattr(plugin, attr, None)
+            if not plugin_classifications: # good for any
                 result.append(plugin)
                 continue
-            if classifier in plugin_classifiers:
+            if classification in plugin_classifications:
                 result.append(plugin)
                     
         return result
 
+def flatten(L):
+    result = []
+    for seq in L:
+        for item in seq:
+            result.append(item)
+    return result
+
 def make_middleware(app, global_conf, config_file=None):
     if config_file is None:
         raise ValueError('config_file must be specified')
     return PluggableAuthenticationMiddleware(app)
 
 def make_test_middleware(app, global_conf):
-    # no config file required
+    # be able to test without a config file
     from repoze.pam.plugins.basicauth import BasicAuthPlugin
     from repoze.pam.plugins.htpasswd import HTPasswdPlugin
+    from repoze.pam.plugins.cookie import InsecureCookiePlugin
+    from repoze.pam.plugins.form import FormPlugin
     basicauth = BasicAuthPlugin('repoze.pam')
-    basicauth.classifiers = set() # good for any
+    any = set() # means good for any classification
+    basicauth.request_classifications = any
+    basicauth.response_classifications = any
     from StringIO import StringIO
     from repoze.pam.plugins.htpasswd import crypt_check
-    io = StringIO('chrism:aajfMKNH1hTm2\n')
+    io = StringIO()
+    salt = 'aa'
+    import crypt
+    for name, password in [ ('admin', 'admin') ]:
+        io.write('name:%s\n' % crypt.crypt(password, salt))
     htpasswd = HTPasswdPlugin(io, crypt_check)
-    htpasswd.classifiers = set() # good for any
-    registry = make_registry((basicauth,), (htpasswd,), (basicauth,))
-    class DummyClassifier:
-        def __call__(self, *arg, **kw):
-            return None
-    classifier = DummyClassifier()
-    middleware = PluggableAuthenticationMiddleware(app, registry,
-                                                   classifier, classifier)
+    htpasswd.request_classifications = any
+    htpasswd.response_classifications = any
+    cookie = InsecureCookiePlugin('oatmeal')
+    cookie.request_classifications = any
+    cookie.response_classifications = any
+    form = FormPlugin('__do_login')
+    # only do form extract/challenge for browser requests
+    form.request_classifications = set(('browser',)) 
+    form.response_classifications = set(('browser',)) 
+    registry = make_registry(
+        extractors = (cookie, basicauth, form),
+        post_extractors = (cookie, basicauth),
+        authenticators = (htpasswd,),
+        challengers = (form, basicauth),
+        )
+    from repoze.pam.classifiers import DefaultRequestClassifier
+    from repoze.pam.classifiers import DefaultResponseClassifier
+    request_classifier = DefaultRequestClassifier()
+    response_classifier = DefaultResponseClassifier()
+    middleware = PluggableAuthenticationMiddleware(app,
+                                                   registry,
+                                                   request_classifier,
+                                                   response_classifier,
+                                                   log_stream=sys.stdout,
+                                                   log_level = logging.DEBUG
+                                                   )
     return middleware
 
 def verify(plugins, iface):
@@ -115,10 +292,12 @@
     for plugin in plugins:
         verifyObject(iface, plugin, tentative=True)
     
-def make_registry(extractors, authenticators, challengers):
+def make_registry(extractors, post_extractors, authenticators, challengers):
     registry = {}
     verify(extractors, IExtractorPlugin)
     registry[IExtractorPlugin] = extractors
+    verify(post_extractors, IExtractorPlugin)
+    registry[IPostExtractorPlugin] = post_extractors
     verify(authenticators, IAuthenticatorPlugin)
     registry[IAuthenticatorPlugin] = authenticators
     verify(challengers, IChallengerPlugin)

Modified: repoze.pam/trunk/repoze/pam/plugins/basicauth.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/plugins/basicauth.py	(original)
+++ repoze.pam/trunk/repoze/pam/plugins/basicauth.py	Sun Feb 24 22:02:50 2008
@@ -8,43 +8,51 @@
 
 from repoze.pam.interfaces import IChallengerPlugin
 from repoze.pam.interfaces import IExtractorPlugin
+from repoze.pam.interfaces import IPostExtractorPlugin
 
 class BasicAuthPlugin(object):
 
-    implements(IChallengerPlugin, IExtractorPlugin)
+    implements(IChallengerPlugin, IExtractorPlugin, IPostExtractorPlugin)
     
     def __init__(self, realm):
         self.realm = realm
 
     # IChallengerPlugin
-    def challenge(self, environ, request_classifier, headers, exception):
-        head = WWW_AUTHENTICATE.tuples('Basic realm="%s"' % self.realm)
-        raise HTTPUnauthorized(headers=head)
+    def challenge(self, environ, status, headers):
+        if status == '401 Unauthorized':
+            headers = WWW_AUTHENTICATE.tuples('Basic realm="%s"' % self.realm)
+            return HTTPUnauthorized(headers=headers)
 
     # IExtractorPlugin
     def extract(self, environ):
         authorization = AUTHORIZATION(environ)
         try:
             authmeth, auth = authorization.split(' ', 1)
-        except ValueError:
-            # not enough values to unpack
+        except ValueError: # not enough values to unpack
             return {}
         if authmeth.lower() == 'basic':
             try:
                 auth = auth.strip().decode('base64')
-            except binascii.Error:
-                # can't decode
+            except binascii.Error: # can't decode
                 return {}
             try:
                 login, password = auth.split(':', 1)
-            except ValueError:
-                # not enough values to unpack
+            except ValueError: # not enough values to unpack
                 return {}
-
-            return {'login':login, 'password':password}
+            auth = {'login':login, 'password':password}
+            return auth
 
         return {}
 
+    # IPostExtractorPlugin
+    def post_extract(self, environ, credentials, extractor):
+        if credentials:
+            if not AUTHORIZATION(environ):
+                auth = '%(login)s:%(password)s' % credentials
+                auth = auth.encode('base64').rstrip()
+                header = 'Basic %s' % auth
+                environ['HTTP_AUTHORIZATION'] = header
+
 def make_plugin(pam_conf, realm='basic'):
     plugin = BasicAuthPlugin(realm)
     return plugin

Added: repoze.pam/trunk/repoze/pam/plugins/cookie.py
==============================================================================
--- (empty file)
+++ repoze.pam/trunk/repoze/pam/plugins/cookie.py	Sun Feb 24 22:02:50 2008
@@ -0,0 +1,57 @@
+import binascii
+
+from paste.request import get_cookies
+
+from zope.interface import implements
+
+from repoze.pam.interfaces import IExtractorPlugin
+from repoze.pam.interfaces import IPostExtractorPlugin
+
+class InsecureCookiePlugin(object):
+
+    implements(IExtractorPlugin, IPostExtractorPlugin)
+    
+    def __init__(self, cookie_name):
+        self.cookie_name = cookie_name
+
+    # IExtractorPlugin
+    def extract(self, environ):
+        cookies = get_cookies(environ)
+        cookie = cookies.get(self.cookie_name)
+
+        if cookie is None:
+            return {}
+
+        try:
+            auth = cookie.value.decode('base64')
+        except binascii.Error: # can't decode
+            return {}
+
+        try:
+            login, password = auth.split(':', 1)
+            return {'login':login, 'password':password}
+        except ValueError: # not enough values to unpack
+            return {}
+
+    # IPostExtractorPlugin
+    def post_extract(self, environ, credentials, extractor):
+        if credentials:
+            cookie_value = '%(login)s:%(password)s' % credentials
+            cookie_value = cookie_value.encode('base64').rstrip()
+            cookies = get_cookies(environ)
+            existing = cookies.get(self.cookie_name)
+            value = getattr(existing, 'value', None)
+            if value != cookie_value:
+                # go ahead and set it in the environment for downstream
+                # apps to consume (XXX?)
+                cookies[self.cookie_name] = cookie_value
+                output = cookies.output(header='', sep='').lstrip()
+                environ['HTTP_COOKIE'] = output
+                # return a Set-Cookie header
+                set_cookie = '%s=%s; Path=/;' % (self.cookie_name, cookie_value)
+                return [('Set-Cookie', set_cookie)]
+
+def make_plugin(pam_conf, cookie_name='repoze.pam.plugins.cookie'):
+    plugin = InsecureCookiePlugin(cookie_name)
+    return plugin
+

Added: repoze.pam/trunk/repoze/pam/plugins/form.py
==============================================================================
--- (empty file)
+++ repoze.pam/trunk/repoze/pam/plugins/form.py	Sun Feb 24 22:02:50 2008
@@ -0,0 +1,90 @@
+from paste.httpheaders import CONTENT_LENGTH
+from paste.httpheaders import CONTENT_TYPE
+
+from paste.request import parse_dict_querystring
+from paste.request import parse_formvars
+
+from zope.interface import implements
+
+from repoze.pam.interfaces import IChallengerPlugin
+from repoze.pam.interfaces import IExtractorPlugin
+
+_DEFAULT_FORM = """
+<html>
+<head>
+  <title>Login Form</title>
+</head>
+<body>
+  <div>
+     <b>Log In</b>
+  </div>
+  <br/>
+  <form method="POST" action="?__do_login=true">
+    <table border="0">
+    <tr>
+      <td>User Name</td>
+      <td><input type="text" name="login"></input></td>
+    </tr>
+    <tr>
+      <td>Password</td>
+      <td><input type="password" name="password"></input></td>
+    </tr>
+    <tr>
+      <td></td>
+      <td><input type="submit" name="submit" value="Log In"/></td>
+    </tr>
+    </table>
+  </form>
+  <pre>
+  %s
+  </pre>
+</body>
+</html>
+"""
+
+def auth_form(environ, start_response):
+    import pprint
+    form = _DEFAULT_FORM % pprint.pformat(environ)
+    content_length = CONTENT_LENGTH.tuples(str(len(form)))
+    content_type = CONTENT_TYPE.tuples('text/html')
+    headers = content_length + content_type
+    start_response('200 OK', headers)
+    return [form]
+
+class FormPlugin(object):
+
+    implements(IChallengerPlugin, IExtractorPlugin)
+    
+    def __init__(self, login_form_qs):
+        self.login_form_qs = login_form_qs
+
+    # IExtractorPlugin
+    def extract(self, environ):
+        query = parse_dict_querystring(environ)
+        # If the extractor finds a special query string on any request,
+        # it will attempt to find the values in the input body.
+        if query.get(self.login_form_qs): 
+            form = parse_formvars(environ)
+            from StringIO import StringIO
+            # XXX we need to replace wsgi.input because we've read it
+            # this smells funny
+            environ['wsgi.input'] = StringIO()
+            form.update(query)
+            try:
+                login = form['login']
+                password = form['password']
+            except KeyError:
+                return {}
+            return {'login':login, 'password':password}
+
+        return {}
+
+    # IChallengerPlugin
+    def challenge(self, environ, status, headers):
+        if status == '401 Unauthorized':
+            return auth_form
+
+def make_plugin(pam_conf, login_form_qs='__do_login'):
+    plugin = FormPlugin(login_form_qs)
+    return plugin
+

Modified: repoze.pam/trunk/repoze/pam/tests.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/tests.py	(original)
+++ repoze.pam/trunk/repoze/pam/tests.py	Sun Feb 24 22:02:50 2008
@@ -44,31 +44,37 @@
     def test_extract_success(self):
         environ = self._makeEnviron()
         mw = self._makeOne()
-        creds = mw.extract(environ, None)
+        creds, plugin = mw.extract(environ, None)
         self.assertEqual(creds['login'], 'chris')
         self.assertEqual(creds['password'], 'password')
+        self.failIf(plugin is None)
 
     def test_extract_fail(self):
         environ = self._makeEnviron()
         from repoze.pam.interfaces import IExtractorPlugin
+        extractor = DummyNoResultsExtractor()
         registry = {
-            IExtractorPlugin:[DummyNoResultsExtractor()]
+            IExtractorPlugin:[extractor]
             }
         mw = self._makeOne(registry=registry)
-        creds = mw.extract(environ, None)
+        creds, plugin = mw.extract(environ, None)
         self.assertEqual(creds, {})
+        self.assertEqual(plugin, None)
 
     def test_extract_success_skip_noresults(self):
         environ = self._makeEnviron()
         mw = self._makeOne()
         from repoze.pam.interfaces import IExtractorPlugin
+        extractor1 = DummyNoResultsExtractor()
+        extractor2 = DummyExtractor() 
         registry = {
-            IExtractorPlugin:[DummyNoResultsExtractor(), DummyExtractor()]
+            IExtractorPlugin:[extractor1, extractor2]
             }
         mw = self._makeOne(registry=registry)
-        creds = mw.extract(environ, None)
+        creds, plugin = mw.extract(environ, None)
         self.assertEqual(creds['login'], 'chris')
         self.assertEqual(creds['password'], 'password')
+        self.assertEqual(plugin, extractor2)
 
     def test_extract_success_firstwins(self):
         environ = self._makeEnviron()
@@ -80,46 +86,49 @@
             IExtractorPlugin:[extractor1, extractor2]
             }
         mw = self._makeOne(registry=registry)
-        creds = mw.extract(environ, None)
+        creds, plugin = mw.extract(environ, None)
         self.assertEqual(creds['login'], 'fred')
         self.assertEqual(creds['password'], 'fred')
+        self.assertEqual(plugin, extractor1)
 
     def test_extract_find_implicit_classifier(self):
         environ = self._makeEnviron()
         mw = self._makeOne()
         from repoze.pam.interfaces import IExtractorPlugin
         extractor1 = DummyExtractor({'login':'fred','password':'fred'})
-        extractor1.classifiers = set(['nomatch'])
+        extractor1.request_classifications = set(['nomatch'])
         extractor2 = DummyExtractor({'login':'bob','password':'bob'})
         registry = {
             IExtractorPlugin:[extractor1, extractor2]
             }
         mw = self._makeOne(registry=registry)
-        creds = mw.extract(environ, None)
+        creds, plugin = mw.extract(environ, None)
         self.assertEqual(creds['login'], 'bob')
         self.assertEqual(creds['password'], 'bob')
+        self.assertEqual(plugin, extractor2)
 
     def test_extract_find_explicit_classifier(self):
         environ = self._makeEnviron()
         mw = self._makeOne()
         from repoze.pam.interfaces import IExtractorPlugin
         extractor1 = DummyExtractor({'login':'fred','password':'fred'})
-        extractor1.classifiers = set(['nomatch'])
+        extractor1.request_classifications = set(['nomatch'])
         extractor2 = DummyExtractor({'login':'bob','password':'bob'})
-        extractor2.classifiers = set(['match'])
+        extractor2.request_classifications = set(['match'])
         registry = {
             IExtractorPlugin:[extractor1, extractor2]
             }
         mw = self._makeOne(registry=registry)
-        creds = mw.extract(environ, 'match')
+        creds, plugin = mw.extract(environ, 'match')
         self.assertEqual(creds['login'], 'bob')
         self.assertEqual(creds['password'], 'bob')
+        self.assertEqual(plugin, extractor2)
 
     def test_authenticate_success(self):
         environ = self._makeEnviron()
         mw = self._makeOne()
         creds = {'login':'chris', 'password':'password'}
-        userid = mw.authenticate(environ, creds, None)
+        userid, plugin = mw.authenticate(environ, creds, None)
         self.assertEqual(userid, 'chris')
 
     def test_authenticate_fail(self):
@@ -131,95 +140,132 @@
             IAuthenticatorPlugin:[DummyFailAuthenticator()]
             }
         mw = self._makeOne(registry=registry)
-        userid = mw.authenticate(environ, creds, None)
+        userid, plugin = mw.authenticate(environ, creds, None)
         self.assertEqual(userid, None)
+        self.assertEqual(plugin, None)
 
     def test_authenticate_success_skip_fail(self):
         environ = self._makeEnviron()
         mw = self._makeOne()
         from repoze.pam.interfaces import IAuthenticatorPlugin
+        plugin1 = DummyFailAuthenticator()
+        plugin2 = DummyAuthenticator()
         registry = {
-            IAuthenticatorPlugin:[DummyFailAuthenticator(),DummyAuthenticator()]
+            IAuthenticatorPlugin:[plugin1, plugin2]
             }
         mw = self._makeOne(registry=registry)
         creds = {'login':'chris', 'password':'password'}
-        userid = mw.authenticate(environ, creds, None)
+        userid, plugin = mw.authenticate(environ, creds, None)
         self.assertEqual(userid, 'chris')
+        self.assertEqual(plugin, plugin2)
 
     def test_authenticate_success_firstwins(self):
         environ = self._makeEnviron()
         mw = self._makeOne()
         from repoze.pam.interfaces import IAuthenticatorPlugin
+        plugin1 = DummyAuthenticator('chris_id1')
+        plugin2 = DummyAuthenticator('chris_id2')
         registry = {
-            IAuthenticatorPlugin:[DummyAuthenticator('chris_id1'),
-                                  DummyAuthenticator('chris_id2')]
+            IAuthenticatorPlugin:[plugin1, plugin2]
             }
         mw = self._makeOne(registry=registry)
         creds = {'login':'chris', 'password':'password'}
-        userid = mw.authenticate(environ, creds, None)
+        userid, plugin = mw.authenticate(environ, creds, None)
         self.assertEqual(userid, 'chris_id1')
+        self.assertEqual(plugin, plugin1)
 
     def test_authenticate_find_implicit_classifier(self):
         environ = self._makeEnviron()
         mw = self._makeOne()
         from repoze.pam.interfaces import IAuthenticatorPlugin
         plugin1 = DummyAuthenticator('chris_id1')
-        plugin1.classifiers = set(['nomatch'])
+        plugin1.request_classifications = set(['nomatch'])
         plugin2 = DummyAuthenticator('chris_id2')
         registry = {
             IAuthenticatorPlugin:[plugin1, plugin2]
             }
         mw = self._makeOne(registry=registry)
         creds = {'login':'chris', 'password':'password'}
-        userid = mw.authenticate(environ, creds, None)
+        userid, plugin = mw.authenticate(environ, creds, None)
         self.assertEqual(userid, 'chris_id2')
+        self.assertEqual(plugin, plugin2)
 
     def test_authenticate_find_explicit_classifier(self):
         environ = self._makeEnviron()
         mw = self._makeOne()
         from repoze.pam.interfaces import IAuthenticatorPlugin
         plugin1 = DummyAuthenticator('chris_id1')
-        plugin1.classifiers = set(['nomatch'])
+        plugin1.request_classifications = set(['nomatch'])
         plugin2 = DummyAuthenticator('chris_id2')
-        plugin2.classifiers = set(['match'])
+        plugin2.request_classifications = set(['match'])
         registry = {
             IAuthenticatorPlugin:[plugin1, plugin2]
             }
         mw = self._makeOne(registry=registry)
         creds = {'login':'chris', 'password':'password'}
-        userid = mw.authenticate(environ, creds, 'match')
+        userid, plugin = mw.authenticate(environ, creds, 'match')
         self.assertEqual(userid, 'chris_id2')
+        self.assertEqual(plugin, plugin2)
 
-    def test_on_ingress_success_addcredentials(self):
+    def test_modify_environment_success_addcredentials(self):
         environ = self._makeEnviron()
         mw = self._makeOne()
-        classification = mw.on_ingress(environ)
+        classification, headers = mw.modify_environment(environ)
         self.assertEqual(classification, 'browser')
         self.assertEqual(environ['REMOTE_USER'], 'chris')
         self.assertEqual(environ['repoze.pam.credentials'],
                          {'login':'chris','password':'password'})
+        self.assertEqual(headers, [])
         
-    def test_on_ingress_success_noaddcredentials(self):
+    def test_modify_environment_noaddcredentials(self):
         environ = self._makeEnviron()
         mw = self._makeOne()
         mw.add_credentials = False
-        classification = mw.on_ingress(environ)
+        classification, headers = mw.modify_environment(environ)
         self.assertEqual(classification, 'browser')
         self.assertEqual(environ['REMOTE_USER'], 'chris')
         self.failIf(environ.has_key('repoze.pam.credentials'))
+        self.assertEqual(headers, [])
 
-    def test_on_ingress_nocredentials(self):
+    def test_modify_environment_nocredentials(self):
         environ = self._makeEnviron()
         from repoze.pam.interfaces import IExtractorPlugin
         registry = {
             IExtractorPlugin:[DummyNoResultsExtractor()],
             }
         mw = self._makeOne(registry=registry)
-        classification = mw.on_ingress(environ)
+        classification, headers = mw.modify_environment(environ)
         self.assertEqual(classification, 'browser')
         self.assertEqual(environ.get('REMOTE_USER'), None)
         self.assertEqual(environ['repoze.pam.credentials'], {})
+        self.assertEqual(headers, [])
 
+    def test_modify_environment_remoteuser_already_set(self):
+        environ = self._makeEnviron({'REMOTE_USER':'admin'})
+        mw = self._makeOne()
+        classification, headers = mw.modify_environment(environ)
+        self.assertEqual(classification, 'browser')
+        self.assertEqual(environ.get('REMOTE_USER'), 'admin')
+        self.assertEqual(environ['repoze.pam.credentials'],
+                         {'login':'chris', 'password':'password'})
+        self.assertEqual(headers, [])
+
+    def test_modify_environment_with_postextractor(self):
+        environ = self._makeEnviron({'REMOTE_USER':'admin'})
+        from repoze.pam.interfaces import IExtractorPlugin
+        from repoze.pam.interfaces import IPostExtractorPlugin
+        registry = {
+            IExtractorPlugin:[DummyExtractor()],
+            IPostExtractorPlugin:[DummyPostExtractor()],
+            }
+        mw = self._makeOne(registry=registry)
+        classification, headers = mw.modify_environment(environ)
+        self.assertEqual(classification, 'browser')
+        self.assertEqual(environ.get('REMOTE_USER'), 'admin')
+        self.assertEqual(environ['repoze.pam.credentials'],
+                         {'login':'chris', 'password':'password'})
+        self.assertEqual(headers, [('foo', 'bar')])
+        
 class TestBasicAuthPlugin(Base):
     def _getTargetClass(self):
         from repoze.pam.plugins.basicauth import BasicAuthPlugin
@@ -237,56 +283,87 @@
         verifyClass(IChallengerPlugin, klass)
         verifyClass(IExtractorPlugin, klass)
 
-    def test_challenge(self):
+    def test_challenge_non401(self):
         plugin = self._makeOne('realm')
         environ = self._makeEnviron()
-        from paste.httpexceptions import HTTPUnauthorized
-        self.assertRaises(HTTPUnauthorized, plugin.challenge, environ,
-                          None, None, None)
+        result = plugin.challenge(environ, '200 OK', {})
+        self.assertEqual(result, None)
+
+    def test_challenge_401(self):
+        plugin = self._makeOne('realm')
+        environ = self._makeEnviron()
+        result = plugin.challenge(environ, '401 Unauthorized', {})
+        self.assertNotEqual(result, None)
+        app_iter = result(environ, lambda *arg: None)
+        items = []
+        for item in app_iter:
+            items.append(item)
+        response = ''.join(items)
+        self.failUnless(response.startswith('401 Unauthorized'))
         
     def test_extract_noauthinfo(self):
         plugin = self._makeOne('realm')
         environ = self._makeEnviron()
-        result = plugin.extract(environ)
-        self.assertEqual(result, {})
-
-    def test_extract_nonbasic(self):
-        plugin = self._makeOne('realm')
-        environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Digest abc'})
-        result = plugin.extract(environ)
-        self.assertEqual(result, {})
+        creds = plugin.extract(environ)
+        self.assertEqual(creds, {})
 
     def test_extract_nonbasic(self):
         plugin = self._makeOne('realm')
         environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Digest abc'})
-        result = plugin.extract(environ)
-        self.assertEqual(result, {})
+        creds = plugin.extract(environ)
+        self.assertEqual(creds, {})
 
     def test_extract_basic_badencoding(self):
         plugin = self._makeOne('realm')
         environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic abc'})
-        result = plugin.extract(environ)
-        self.assertEqual(result, {})
+        creds = plugin.extract(environ)
+        self.assertEqual(creds, {})
 
     def test_extract_basic_badrepr(self):
         plugin = self._makeOne('realm')
         value = 'foo'.encode('base64')
         environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic %s' % value})
-        result = plugin.extract(environ)
-        self.assertEqual(result, {})
+        creds = plugin.extract(environ)
+        self.assertEqual(creds, {})
 
     def test_extract_basic_ok(self):
         plugin = self._makeOne('realm')
         value = 'foo:bar'.encode('base64')
         environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic %s' % value})
-        result = plugin.extract(environ)
-        self.assertEqual(result, {'login':'foo', 'password':'bar'})
+        creds = plugin.extract(environ)
+        self.assertEqual(creds, {'login':'foo', 'password':'bar'})
+
+    def test_post_extract_nocreds(self):
+        plugin = self._makeOne('realm')
+        creds = {}
+        environ = self._makeEnviron()
+        result = plugin.post_extract(environ, creds, plugin)
+        self.assertEqual(result, None)
+        self.assertEqual(environ.get('HTTP_AUTHORIZATION'), None)
+
+    def test_post_extract_creds_withauthorization(self):
+        plugin = self._makeOne('realm')
+        creds = {'login':'foo', 'password':'password'}
+        environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic foo'})
+        result = plugin.post_extract(environ, creds, plugin)
+        self.assertEqual(result, None)
+        self.assertEqual(environ['HTTP_AUTHORIZATION'], 'Basic foo')
 
+    def test_post_extract_creds_mutates(self):
+        plugin = self._makeOne('realm')
+        creds = {'login':'foo', 'password':'password'}
+        environ = self._makeEnviron()
+        result = plugin.post_extract(environ, creds, plugin)
+        self.assertEqual(result, None)
+        auth = 'foo:password'.encode('base64').rstrip()
+        auth = 'Basic ' + auth
+        self.assertEqual(environ['HTTP_AUTHORIZATION'], auth)
+        
     def test_factory(self):
         from repoze.pam.plugins.basicauth import make_plugin
         plugin = make_plugin({}, 'realm')
         self.assertEqual(plugin.realm, 'realm')
-        
+
 class TestHTPasswdPlugin(Base):
     def _getTargetClass(self):
         from repoze.pam.plugins.htpasswd import HTPasswdPlugin
@@ -376,7 +453,84 @@
                              'repoze.pam.plugins.htpasswd:crypt_check')
         self.assertEqual(plugin.filename, 'foo')
         self.assertEqual(plugin.check, crypt_check)
+
+
+class TestInsecureCookiePlugin(Base):
+    def _getTargetClass(self):
+        from repoze.pam.plugins.cookie import InsecureCookiePlugin
+        return InsecureCookiePlugin
+
+    def _makeOne(self, *arg, **kw):
+        plugin = self._getTargetClass()(*arg, **kw)
+        return plugin
+
+    def test_implements(self):
+        from zope.interface.verify import verifyClass
+        from repoze.pam.interfaces import IExtractorPlugin
+        from repoze.pam.interfaces import IPostExtractorPlugin
+        klass = self._getTargetClass()
+        verifyClass(IExtractorPlugin, klass)
+        verifyClass(IPostExtractorPlugin, klass)
+
+    def test_extract_nocookies(self):
+        plugin = self._makeOne('oatmeal')
+        environ = self._makeEnviron()
+        result = plugin.extract(environ)
+        self.assertEqual(result, {})
         
+    def test_extract_badcookies(self):
+        plugin = self._makeOne('oatmeal')
+        environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=a'})
+        result = plugin.extract(environ)
+        self.assertEqual(result, {})
+
+    def test_extract_badcookies(self):
+        plugin = self._makeOne('oatmeal')
+        environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=a'})
+        result = plugin.extract(environ)
+        self.assertEqual(result, {})
+    
+    def test_extract_success(self):
+        plugin = self._makeOne('oatmeal')
+        auth = 'foo:password'.encode('base64').rstrip()
+        environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=%s;' % auth})
+        result = plugin.extract(environ)
+        self.assertEqual(result, {'login':'foo', 'password':'password'})
+
+    def test_post_extract_nocreds(self):
+        plugin = self._makeOne('oatmeal')
+        creds = {}
+        environ = self._makeEnviron()
+        result = plugin.post_extract(environ, creds, plugin)
+        self.assertEqual(result, None)
+        self.assertEqual(environ.get('HTTP_COOKIE'), None)
+
+    def test_post_extract_creds_same(self):
+        plugin = self._makeOne('oatmeal')
+        creds = {'login':'foo', 'password':'password'}
+        auth = 'foo:password'.encode('base64').rstrip()
+        auth = 'oatmeal=%s;' % auth
+        environ = self._makeEnviron({'HTTP_COOKIE':auth})
+        result = plugin.post_extract(environ, creds, plugin)
+        self.assertEqual(result, None)
+        self.assertEqual(environ.get('HTTP_COOKIE'), auth)
+
+    def test_post_extract_creds_different(self):
+        plugin = self._makeOne('oatmeal')
+        creds = {'login':'bar', 'password':'password'}
+        auth = 'foo:password'.encode('base64').rstrip()
+        creds_auth = 'bar:password'.encode('base64').rstrip()
+        environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=%s;' % auth})
+        result = plugin.post_extract(environ, creds, plugin)
+        expected = 'oatmeal=%s; Path=/;' % creds_auth
+        self.assertEqual(result, [('Set-Cookie', expected)])
+        self.assertEqual(environ['HTTP_COOKIE'], 'oatmeal=%s;' % creds_auth)
+
+    def test_factory(self):
+        from repoze.pam.plugins.cookie import make_plugin
+        plugin = make_plugin(None, 'foo')
+        self.assertEqual(plugin.cookie_name, 'foo')
+
 
 class TestDefaultRequestClassifier(Base):
     def _getTargetClass(self):
@@ -477,5 +631,10 @@
         return None
 
 class DummyChallenger:
-    def challenge(self, environ, request_classifier, headers, exception):
+    def challenge(self, environ, status, headers):
         environ['challenged'] = True
+
+class DummyPostExtractor:
+    def post_extract(self, environ, credentials, extractor):
+        return [ ('foo', 'bar') ]
+    


More information about the Repoze-checkins mailing list