[Repoze-checkins] r734 - in repoze.pam/trunk/repoze/pam: . plugins
Chris McDonough
chrism at agendaless.com
Mon Feb 25 03:02:51 UTC 2008
Author: Chris McDonough <chrism at agendaless.com>
Date: Sun Feb 24 22:02:50 2008
New Revision: 734
Log:
Add form and cookie plugins, add post_extractor plugin type.
Added:
repoze.pam/trunk/repoze/pam/plugins/cookie.py (contents, props changed)
repoze.pam/trunk/repoze/pam/plugins/form.py (contents, props changed)
Modified:
repoze.pam/trunk/repoze/pam/classifiers.py
repoze.pam/trunk/repoze/pam/interfaces.py
repoze.pam/trunk/repoze/pam/middleware.py
repoze.pam/trunk/repoze/pam/plugins/basicauth.py
repoze.pam/trunk/repoze/pam/tests.py
Modified: repoze.pam/trunk/repoze/pam/classifiers.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/classifiers.py (original)
+++ repoze.pam/trunk/repoze/pam/classifiers.py Sun Feb 24 22:02:50 2008
@@ -53,7 +53,7 @@
class DefaultResponseClassifier(object):
implements(IResponseClassifier)
- def __call__(self, environ, request_classification, headers, exception):
+ def __call__(self, environ, request_classification, status, headers):
""" By default, return the request classification """
return request_classification
Modified: repoze.pam/trunk/repoze/pam/interfaces.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/interfaces.py (original)
+++ repoze.pam/trunk/repoze/pam/interfaces.py Sun Feb 24 22:02:50 2008
@@ -2,27 +2,36 @@
class IRequestClassifier(Interface):
""" On ingress: classify a request.
-
- This interface is responsible for returning a string representing
- a classification name based on introspection of the WSGI
- environment (environ).
"""
def __call__(environ):
- """ Return a string representing the classification of this
- request. """
+ """ environ -> request classifier string
+
+ This interface is responsible for returning a string
+ value representing a request classification.
+
+ o 'environ' is the WSGI environment.
+ """
class IResponseClassifier(Interface):
""" On egress: classify a response.
+ """
+ def __call__(environ, request_classification, status, headers):
+ """ args -> response classifier string
- This interface is responsible for returning a string representing
- a classification name based on introspection of the ingress
- classification, the WSGI environment (environ), the headers
- returned in the response (headers), or the exception raised by a
- downstream application.
- """
- def __call__(environ, request_classification, headers, exception):
- """ Return a string representing the classification of this
- request. """
+ This interface is responsible for returning a string representing
+ a response classification.
+
+ o 'environ' is the WSGI environment.
+
+ o 'request_classification' is the classification returned during
+ ingress by the request classifier.
+
+ o 'status' is the status written into start_response by
+ the downstream application.
+
+ o 'headers' is the headers tuple written into start_response
+ by the downstream application.
+ """
class IExtractorPlugin(Interface):
@@ -48,7 +57,34 @@
o Only extraction plugins which match one of the the current
request's classifications will be asked to perform extraction.
"""
-
+
+class IPostExtractorPlugin(Interface):
+ """ On ingress: allow the plugin to have a chance to influence the
+ environment once credentials are established and return extra
+ headers that will be set in the eventual response.
+
+ Each post-extractor matching the request classification is called
+ unconditionally after extraction.
+ """
+
+ def post_extract(environ, credentials, extractor):
+ """ args -> [ (header-name, header-value), ..] | None
+
+ o 'environ' is the WSGI environment.
+
+ o credentials are the credentials that were extracted by
+ repoze.pam during the extraction step.
+
+ o 'extractor' is the plugin instance that provided the
+ credentials. If no plugin instance provided credentials to
+ repoze.pam, this will be None.
+
+ The return value should be a list of tuples, where each tuple is
+ in the form (header-name, header-value), e.g.
+ [ ('Set-Cookie', 'cookie_name=foo; Path=/') ] or None if
+ no headers should be set.
+ """
+
class IAuthenticatorPlugin(Interface):
""" On ingress: Map credentials to a user ID.
@@ -70,21 +106,29 @@
class IChallengerPlugin(Interface):
- """ On egress: Initiate a challenge to the user to provide credentials.
-
- o 'environ' is the WSGI environment.
+ """ On egress: Conditionally initiate a challenge to the user to
+ provide credentials.
- o Only challenge plugins which match one of the the current
- request's classifications will be asked to perform a
- challenge.
+ Only challenge plugins which match one of the current
+ response's classifications will be asked to perform a
+ challenge.
"""
- def challenge(environ, request_classifier, headers, exception):
+ def challenge(environ, status, headers):
+ """ args -> WSGI application or None
+
+ o 'environ' is the WSGI environment.
+
+ o 'status' is the status written into start_response by the
+ downstream application.
- """ Examine the values passed in and perform an arbitrary action
- (usually mutating environ or raising an exception) to cause a
- challenge to be raised to the user.
+ o 'headers' is the headers tuple written into start_response by the
+ downstream application.
- The return value of this method is ignored.
+ Examine the values passed in and return a WSGI application
+ (a callable which accepts environ and start_response as its
+ two positional arguments, ala PEP 333) which causes a
+ challenge to be performed. Return None to forego performing a
+ challenge.
"""
Modified: repoze.pam/trunk/repoze/pam/middleware.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/middleware.py (original)
+++ repoze.pam/trunk/repoze/pam/middleware.py Sun Feb 24 22:02:50 2008
@@ -1,113 +1,290 @@
+import logging
+from StringIO import StringIO
+import sys
+
+from paste.httpheaders import REMOTE_USER
+
from repoze.pam.interfaces import IAuthenticatorPlugin
from repoze.pam.interfaces import IExtractorPlugin
+from repoze.pam.interfaces import IPostExtractorPlugin
from repoze.pam.interfaces import IChallengerPlugin
+class StartResponseWrapper(object):
+ def __init__(self, start_response, extra_headers):
+ self.start_response = start_response
+ self.extra_headers = extra_headers
+ self.headers = []
+ self.buffer = StringIO()
+
+ def wrap_start_response(self, status, headers, exc_info=None):
+ self.headers = headers
+ self.status = status
+ return self.buffer.write
+
+ def finish_response(self):
+ headers = self.headers + self.extra_headers
+ write = self.start_response(self.status, headers)
+ if write:
+ self.buffer.seek(0)
+ write(self.buffer.getvalue())
+ if hasattr(write, 'close'):
+ write.close()
+
+_STARTED = '-- repoze.pam request started --'
+_ENDED = '-- repoze.pam request ended --'
+
class PluggableAuthenticationMiddleware(object):
- def __init__(self, app, registry, request_classifier, response_classifier,
- add_credentials=True):
+ def __init__(self, app,
+ registry,
+ request_classifier,
+ response_classifier,
+ add_credentials=False,
+ log_stream=None,
+ log_level=logging.INFO):
self.registry = registry
self.app = app
self.request_classifier = request_classifier
- self.response_classififer = response_classifier
+ self.response_classifier = response_classifier
self.add_credentials = add_credentials
+ self.logger = None
+ if log_stream:
+ handler = logging.StreamHandler(log_stream)
+ fmt = '%(asctime)s %(message)s'
+ formatter = logging.Formatter(fmt)
+ handler.setFormatter(formatter)
+ self.logger = logging.Logger('repoze.pam')
+ self.logger.addHandler(handler)
+ self.logger.setLevel(log_level)
def __call__(self, environ, start_response):
- request_classification = self.on_ingress(environ)
- return self.app(environ, start_response)
- # XXX on_egress
+ logger = self.logger
+ logger and logger.info(_STARTED)
+ classification, extra_headers = self.modify_environment(environ)
+
+ wrapper = StartResponseWrapper(start_response, extra_headers)
+ app_iter = self.app(environ, wrapper.wrap_start_response)
+
+ challenge_app = self.challenge(
+ environ,
+ classification,
+ wrapper.status,
+ wrapper.headers
+ )
+ logger and logger.info('challenge app used: %s' % challenge_app)
+
+ if challenge_app is not None:
+ if hasattr(app_iter, 'close'):
+ app_iter.close()
+ logger and logger.info(_ENDED)
+ return challenge_app(environ, start_response)
+ else:
+ wrapper.finish_response()
+ logger and logger.info(_ENDED)
+ return app_iter
- def on_ingress(self, environ):
+ def modify_environment(self, environ):
+ # happens on ingress
classification = self.request_classifier(environ)
- credentials = self.extract(environ, classification)
-
+ logger = self.logger
+ logger and logger.info('request classification: %s' % classification)
+ credentials, extractor = self.extract(environ, classification)
+ headers = self.after_extract(environ, credentials, extractor,
+ classification)
userid = None
if credentials:
- userid = self.authenticate(environ, credentials, classification)
+ userid, authenticator = self.authenticate(environ,
+ credentials,
+ classification)
+ else:
+ logger and logger.info(
+ 'no authenticator plugin used (no credentials)')
if self.add_credentials:
environ['repoze.pam.credentials'] = credentials
- if userid:
- environ['REMOTE_USER'] = userid
-
- return classification
+ remote_user_not_set = not REMOTE_USER(environ)
- def on_egress(self, environ, request_classifier, headers, exception):
- self.challenge(environ, request_classifier, headers, exception)
+ if remote_user_not_set and userid:
+ # only set REMOTE_USER if it's not yet set
+ logger and logger.info('REMOTE_USER set to %s' % userid)
+ environ['REMOTE_USER'] = userid
+ else:
+ logger and logger.info('REMOTE_USER not set')
- def extract(self, environ, classifier):
- extractor_candidates = self.registry.get(IExtractorPlugin)
- extractors = self._match_classifier(extractor_candidates, classifier)
+ return classification, headers
+
+ def extract(self, environ, classification):
+ # happens on ingress
+ candidates = self.registry.get(IExtractorPlugin, ())
+ plugins = self._match_classification(candidates, classification,
+ 'request_classifications')
+ logger = self.logger
+ logger and self.logger.info(
+ 'extractor plugins consulted %s' % plugins)
- for extractor in extractors:
- creds = extractor.extract(environ)
+ for plugin in plugins:
+ creds = plugin.extract(environ)
+ logger and logger.debug(
+ 'credentials returned from extractor %s: %s' %
+ (plugin, creds)
+ )
if creds:
# XXX PAS returns all credentials (it fully iterates over all
# extraction plugins)
- return creds
- return {}
+ logger and logger.info(
+ 'using credentials returned from extractor %s' % plugin)
+ return creds, plugin
+ logger and logger.info('no extractor plugins found credentials')
+ return {}, None
+
+ def after_extract(self, environ, credentials, extractor, classification):
+ candidates = self.registry.get(IPostExtractorPlugin, ())
+ plugins = self._match_classification(candidates,
+ classification,
+ 'request_classifications')
+ logger = self.logger
+ logger and logger.info(
+ 'post-extractor plugins consulted %s' % plugins)
+
+ extra_headers = {}
+ for plugin in plugins:
+ headers = plugin.post_extract(environ, credentials, extractor)
+ logger and logger.debug(
+ 'headers returned from post-extractor %s: %s' %
+ (plugin, headers)
+ )
+ if headers:
+ extra_headers[plugin] = headers
+
+ logger and logger.info('extra headers gathered: %s' % extra_headers)
+
+ return flatten(extra_headers.values())
def authenticate(self, environ, credentials, classification):
- # on ingress
- userid = None
- auth_candidates = self.registry.get(IAuthenticatorPlugin)
- authenticators = self._match_classifier(auth_candidates, classification)
- for authenticator in authenticators:
- userid = authenticator.authenticate(environ, credentials)
+ # happens on ingress
+ candidates = self.registry.get(IAuthenticatorPlugin, ())
+ plugins = self._match_classification(candidates,
+ classification,
+ 'request_classifications')
+ logger = self.logger
+
+ logger and logger.info(
+ 'authenticator plugins consulted %s' % plugins)
+
+ for plugin in plugins:
+ userid = plugin.authenticate(environ, credentials)
+ logger and logger.info(
+ 'userid returned from authenticator %s: %s' %
+ (plugin, userid)
+ )
if userid:
- # XXX PAS calls all authenticators (it fully iterates over all
- # authenticator plugins)
- break
- return userid
-
- def challenge(self, environ, request_classification, headers, exception):
- # on egress
- classification = self.response_classifier(environ,
- request_classification,
- headers,
- exception)
- challenger_candidates = self.registry.get(IChallengerPlugin)
- challengers = self._match_classifier(challenger_candidates,
- classification)
- for challenger in challengers:
- new_headers, new_status = challengers.challenge(environ)
+ logger and logger.info(
+ 'using userid returned from authenticator %s' % plugin)
+ return userid, plugin
+
+ logger and logger.info('no authenticator plugin authenticated a userid')
+ return None, None
+
+ def challenge(self, environ, request_classification, status, headers):
+ # happens on egress
+ classification = self.response_classifier(
+ environ,
+ request_classification,
+ status,
+ headers
+ )
+
+ logger = self.logger
+ logger and logger.info('response classification: %s' % classification)
+
+ candidates = self.registry.get(IChallengerPlugin, ())
+ plugins = self._match_classification(candidates,
+ classification,
+ 'response_classifications')
+ logger and logger.info('challenger plugins consulted: %s' % plugins)
+ for plugin in plugins:
+ app = plugin.challenge(environ, status, headers)
+ logger and logger.debug('app returned from challenger %s: %s' %
+ (plugin, app)
+ )
+ if app is not None:
+ # new WSGI application
+ logger and logger.info(
+ 'challenger plugin %s returned an app: %s' % (plugin, app))
+ return app
+ logger and logger.info('no challenge app returned')
+ # signifies no challenge
+ return None
- def _match_classifier(self, plugins, classifier):
+ def _match_classification(self, plugins, classification, attr):
result = []
for plugin in plugins:
- plugin_classifiers = getattr(plugin, 'classifiers', None)
- if not plugin_classifiers: # good for any
+ plugin_classifications = getattr(plugin, attr, None)
+ if not plugin_classifications: # good for any
result.append(plugin)
continue
- if classifier in plugin_classifiers:
+ if classification in plugin_classifications:
result.append(plugin)
return result
+def flatten(L):
+ result = []
+ for seq in L:
+ for item in seq:
+ result.append(item)
+ return result
+
def make_middleware(app, global_conf, config_file=None):
if config_file is None:
raise ValueError('config_file must be specified')
return PluggableAuthenticationMiddleware(app)
def make_test_middleware(app, global_conf):
- # no config file required
+ # be able to test without a config file
from repoze.pam.plugins.basicauth import BasicAuthPlugin
from repoze.pam.plugins.htpasswd import HTPasswdPlugin
+ from repoze.pam.plugins.cookie import InsecureCookiePlugin
+ from repoze.pam.plugins.form import FormPlugin
basicauth = BasicAuthPlugin('repoze.pam')
- basicauth.classifiers = set() # good for any
+ any = set() # means good for any classification
+ basicauth.request_classifications = any
+ basicauth.response_classifications = any
from StringIO import StringIO
from repoze.pam.plugins.htpasswd import crypt_check
- io = StringIO('chrism:aajfMKNH1hTm2\n')
+ io = StringIO()
+ salt = 'aa'
+ import crypt
+ for name, password in [ ('admin', 'admin') ]:
+ io.write('name:%s\n' % crypt.crypt(password, salt))
htpasswd = HTPasswdPlugin(io, crypt_check)
- htpasswd.classifiers = set() # good for any
- registry = make_registry((basicauth,), (htpasswd,), (basicauth,))
- class DummyClassifier:
- def __call__(self, *arg, **kw):
- return None
- classifier = DummyClassifier()
- middleware = PluggableAuthenticationMiddleware(app, registry,
- classifier, classifier)
+ htpasswd.request_classifications = any
+ htpasswd.response_classifications = any
+ cookie = InsecureCookiePlugin('oatmeal')
+ cookie.request_classifications = any
+ cookie.response_classifications = any
+ form = FormPlugin('__do_login')
+ # only do form extract/challenge for browser requests
+ form.request_classifications = set(('browser',))
+ form.response_classifications = set(('browser',))
+ registry = make_registry(
+ extractors = (cookie, basicauth, form),
+ post_extractors = (cookie, basicauth),
+ authenticators = (htpasswd,),
+ challengers = (form, basicauth),
+ )
+ from repoze.pam.classifiers import DefaultRequestClassifier
+ from repoze.pam.classifiers import DefaultResponseClassifier
+ request_classifier = DefaultRequestClassifier()
+ response_classifier = DefaultResponseClassifier()
+ middleware = PluggableAuthenticationMiddleware(app,
+ registry,
+ request_classifier,
+ response_classifier,
+ log_stream=sys.stdout,
+ log_level = logging.DEBUG
+ )
return middleware
def verify(plugins, iface):
@@ -115,10 +292,12 @@
for plugin in plugins:
verifyObject(iface, plugin, tentative=True)
-def make_registry(extractors, authenticators, challengers):
+def make_registry(extractors, post_extractors, authenticators, challengers):
registry = {}
verify(extractors, IExtractorPlugin)
registry[IExtractorPlugin] = extractors
+ verify(post_extractors, IExtractorPlugin)
+ registry[IPostExtractorPlugin] = post_extractors
verify(authenticators, IAuthenticatorPlugin)
registry[IAuthenticatorPlugin] = authenticators
verify(challengers, IChallengerPlugin)
Modified: repoze.pam/trunk/repoze/pam/plugins/basicauth.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/plugins/basicauth.py (original)
+++ repoze.pam/trunk/repoze/pam/plugins/basicauth.py Sun Feb 24 22:02:50 2008
@@ -8,43 +8,51 @@
from repoze.pam.interfaces import IChallengerPlugin
from repoze.pam.interfaces import IExtractorPlugin
+from repoze.pam.interfaces import IPostExtractorPlugin
class BasicAuthPlugin(object):
- implements(IChallengerPlugin, IExtractorPlugin)
+ implements(IChallengerPlugin, IExtractorPlugin, IPostExtractorPlugin)
def __init__(self, realm):
self.realm = realm
# IChallengerPlugin
- def challenge(self, environ, request_classifier, headers, exception):
- head = WWW_AUTHENTICATE.tuples('Basic realm="%s"' % self.realm)
- raise HTTPUnauthorized(headers=head)
+ def challenge(self, environ, status, headers):
+ if status == '401 Unauthorized':
+ headers = WWW_AUTHENTICATE.tuples('Basic realm="%s"' % self.realm)
+ return HTTPUnauthorized(headers=headers)
# IExtractorPlugin
def extract(self, environ):
authorization = AUTHORIZATION(environ)
try:
authmeth, auth = authorization.split(' ', 1)
- except ValueError:
- # not enough values to unpack
+ except ValueError: # not enough values to unpack
return {}
if authmeth.lower() == 'basic':
try:
auth = auth.strip().decode('base64')
- except binascii.Error:
- # can't decode
+ except binascii.Error: # can't decode
return {}
try:
login, password = auth.split(':', 1)
- except ValueError:
- # not enough values to unpack
+ except ValueError: # not enough values to unpack
return {}
-
- return {'login':login, 'password':password}
+ auth = {'login':login, 'password':password}
+ return auth
return {}
+ # IPostExtractorPlugin
+ def post_extract(self, environ, credentials, extractor):
+ if credentials:
+ if not AUTHORIZATION(environ):
+ auth = '%(login)s:%(password)s' % credentials
+ auth = auth.encode('base64').rstrip()
+ header = 'Basic %s' % auth
+ environ['HTTP_AUTHORIZATION'] = header
+
def make_plugin(pam_conf, realm='basic'):
plugin = BasicAuthPlugin(realm)
return plugin
Added: repoze.pam/trunk/repoze/pam/plugins/cookie.py
==============================================================================
--- (empty file)
+++ repoze.pam/trunk/repoze/pam/plugins/cookie.py Sun Feb 24 22:02:50 2008
@@ -0,0 +1,57 @@
+import binascii
+
+from paste.request import get_cookies
+
+from zope.interface import implements
+
+from repoze.pam.interfaces import IExtractorPlugin
+from repoze.pam.interfaces import IPostExtractorPlugin
+
+class InsecureCookiePlugin(object):
+
+ implements(IExtractorPlugin, IPostExtractorPlugin)
+
+ def __init__(self, cookie_name):
+ self.cookie_name = cookie_name
+
+ # IExtractorPlugin
+ def extract(self, environ):
+ cookies = get_cookies(environ)
+ cookie = cookies.get(self.cookie_name)
+
+ if cookie is None:
+ return {}
+
+ try:
+ auth = cookie.value.decode('base64')
+ except binascii.Error: # can't decode
+ return {}
+
+ try:
+ login, password = auth.split(':', 1)
+ return {'login':login, 'password':password}
+ except ValueError: # not enough values to unpack
+ return {}
+
+ # IPostExtractorPlugin
+ def post_extract(self, environ, credentials, extractor):
+ if credentials:
+ cookie_value = '%(login)s:%(password)s' % credentials
+ cookie_value = cookie_value.encode('base64').rstrip()
+ cookies = get_cookies(environ)
+ existing = cookies.get(self.cookie_name)
+ value = getattr(existing, 'value', None)
+ if value != cookie_value:
+ # go ahead and set it in the environment for downstream
+ # apps to consume (XXX?)
+ cookies[self.cookie_name] = cookie_value
+ output = cookies.output(header='', sep='').lstrip()
+ environ['HTTP_COOKIE'] = output
+ # return a Set-Cookie header
+ set_cookie = '%s=%s; Path=/;' % (self.cookie_name, cookie_value)
+ return [('Set-Cookie', set_cookie)]
+
+def make_plugin(pam_conf, cookie_name='repoze.pam.plugins.cookie'):
+ plugin = InsecureCookiePlugin(cookie_name)
+ return plugin
+
Added: repoze.pam/trunk/repoze/pam/plugins/form.py
==============================================================================
--- (empty file)
+++ repoze.pam/trunk/repoze/pam/plugins/form.py Sun Feb 24 22:02:50 2008
@@ -0,0 +1,90 @@
+from paste.httpheaders import CONTENT_LENGTH
+from paste.httpheaders import CONTENT_TYPE
+
+from paste.request import parse_dict_querystring
+from paste.request import parse_formvars
+
+from zope.interface import implements
+
+from repoze.pam.interfaces import IChallengerPlugin
+from repoze.pam.interfaces import IExtractorPlugin
+
+_DEFAULT_FORM = """
+<html>
+<head>
+ <title>Login Form</title>
+</head>
+<body>
+ <div>
+ <b>Log In</b>
+ </div>
+ <br/>
+ <form method="POST" action="?__do_login=true">
+ <table border="0">
+ <tr>
+ <td>User Name</td>
+ <td><input type="text" name="login"></input></td>
+ </tr>
+ <tr>
+ <td>Password</td>
+ <td><input type="password" name="password"></input></td>
+ </tr>
+ <tr>
+ <td></td>
+ <td><input type="submit" name="submit" value="Log In"/></td>
+ </tr>
+ </table>
+ </form>
+ <pre>
+ %s
+ </pre>
+</body>
+</html>
+"""
+
+def auth_form(environ, start_response):
+ import pprint
+ form = _DEFAULT_FORM % pprint.pformat(environ)
+ content_length = CONTENT_LENGTH.tuples(str(len(form)))
+ content_type = CONTENT_TYPE.tuples('text/html')
+ headers = content_length + content_type
+ start_response('200 OK', headers)
+ return [form]
+
+class FormPlugin(object):
+
+ implements(IChallengerPlugin, IExtractorPlugin)
+
+ def __init__(self, login_form_qs):
+ self.login_form_qs = login_form_qs
+
+ # IExtractorPlugin
+ def extract(self, environ):
+ query = parse_dict_querystring(environ)
+ # If the extractor finds a special query string on any request,
+ # it will attempt to find the values in the input body.
+ if query.get(self.login_form_qs):
+ form = parse_formvars(environ)
+ from StringIO import StringIO
+ # XXX we need to replace wsgi.input because we've read it
+ # this smells funny
+ environ['wsgi.input'] = StringIO()
+ form.update(query)
+ try:
+ login = form['login']
+ password = form['password']
+ except KeyError:
+ return {}
+ return {'login':login, 'password':password}
+
+ return {}
+
+ # IChallengerPlugin
+ def challenge(self, environ, status, headers):
+ if status == '401 Unauthorized':
+ return auth_form
+
+def make_plugin(pam_conf, login_form_qs='__do_login'):
+ plugin = FormPlugin(login_form_qs)
+ return plugin
+
Modified: repoze.pam/trunk/repoze/pam/tests.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/tests.py (original)
+++ repoze.pam/trunk/repoze/pam/tests.py Sun Feb 24 22:02:50 2008
@@ -44,31 +44,37 @@
def test_extract_success(self):
environ = self._makeEnviron()
mw = self._makeOne()
- creds = mw.extract(environ, None)
+ creds, plugin = mw.extract(environ, None)
self.assertEqual(creds['login'], 'chris')
self.assertEqual(creds['password'], 'password')
+ self.failIf(plugin is None)
def test_extract_fail(self):
environ = self._makeEnviron()
from repoze.pam.interfaces import IExtractorPlugin
+ extractor = DummyNoResultsExtractor()
registry = {
- IExtractorPlugin:[DummyNoResultsExtractor()]
+ IExtractorPlugin:[extractor]
}
mw = self._makeOne(registry=registry)
- creds = mw.extract(environ, None)
+ creds, plugin = mw.extract(environ, None)
self.assertEqual(creds, {})
+ self.assertEqual(plugin, None)
def test_extract_success_skip_noresults(self):
environ = self._makeEnviron()
mw = self._makeOne()
from repoze.pam.interfaces import IExtractorPlugin
+ extractor1 = DummyNoResultsExtractor()
+ extractor2 = DummyExtractor()
registry = {
- IExtractorPlugin:[DummyNoResultsExtractor(), DummyExtractor()]
+ IExtractorPlugin:[extractor1, extractor2]
}
mw = self._makeOne(registry=registry)
- creds = mw.extract(environ, None)
+ creds, plugin = mw.extract(environ, None)
self.assertEqual(creds['login'], 'chris')
self.assertEqual(creds['password'], 'password')
+ self.assertEqual(plugin, extractor2)
def test_extract_success_firstwins(self):
environ = self._makeEnviron()
@@ -80,46 +86,49 @@
IExtractorPlugin:[extractor1, extractor2]
}
mw = self._makeOne(registry=registry)
- creds = mw.extract(environ, None)
+ creds, plugin = mw.extract(environ, None)
self.assertEqual(creds['login'], 'fred')
self.assertEqual(creds['password'], 'fred')
+ self.assertEqual(plugin, extractor1)
def test_extract_find_implicit_classifier(self):
environ = self._makeEnviron()
mw = self._makeOne()
from repoze.pam.interfaces import IExtractorPlugin
extractor1 = DummyExtractor({'login':'fred','password':'fred'})
- extractor1.classifiers = set(['nomatch'])
+ extractor1.request_classifications = set(['nomatch'])
extractor2 = DummyExtractor({'login':'bob','password':'bob'})
registry = {
IExtractorPlugin:[extractor1, extractor2]
}
mw = self._makeOne(registry=registry)
- creds = mw.extract(environ, None)
+ creds, plugin = mw.extract(environ, None)
self.assertEqual(creds['login'], 'bob')
self.assertEqual(creds['password'], 'bob')
+ self.assertEqual(plugin, extractor2)
def test_extract_find_explicit_classifier(self):
environ = self._makeEnviron()
mw = self._makeOne()
from repoze.pam.interfaces import IExtractorPlugin
extractor1 = DummyExtractor({'login':'fred','password':'fred'})
- extractor1.classifiers = set(['nomatch'])
+ extractor1.request_classifications = set(['nomatch'])
extractor2 = DummyExtractor({'login':'bob','password':'bob'})
- extractor2.classifiers = set(['match'])
+ extractor2.request_classifications = set(['match'])
registry = {
IExtractorPlugin:[extractor1, extractor2]
}
mw = self._makeOne(registry=registry)
- creds = mw.extract(environ, 'match')
+ creds, plugin = mw.extract(environ, 'match')
self.assertEqual(creds['login'], 'bob')
self.assertEqual(creds['password'], 'bob')
+ self.assertEqual(plugin, extractor2)
def test_authenticate_success(self):
environ = self._makeEnviron()
mw = self._makeOne()
creds = {'login':'chris', 'password':'password'}
- userid = mw.authenticate(environ, creds, None)
+ userid, plugin = mw.authenticate(environ, creds, None)
self.assertEqual(userid, 'chris')
def test_authenticate_fail(self):
@@ -131,95 +140,132 @@
IAuthenticatorPlugin:[DummyFailAuthenticator()]
}
mw = self._makeOne(registry=registry)
- userid = mw.authenticate(environ, creds, None)
+ userid, plugin = mw.authenticate(environ, creds, None)
self.assertEqual(userid, None)
+ self.assertEqual(plugin, None)
def test_authenticate_success_skip_fail(self):
environ = self._makeEnviron()
mw = self._makeOne()
from repoze.pam.interfaces import IAuthenticatorPlugin
+ plugin1 = DummyFailAuthenticator()
+ plugin2 = DummyAuthenticator()
registry = {
- IAuthenticatorPlugin:[DummyFailAuthenticator(),DummyAuthenticator()]
+ IAuthenticatorPlugin:[plugin1, plugin2]
}
mw = self._makeOne(registry=registry)
creds = {'login':'chris', 'password':'password'}
- userid = mw.authenticate(environ, creds, None)
+ userid, plugin = mw.authenticate(environ, creds, None)
self.assertEqual(userid, 'chris')
+ self.assertEqual(plugin, plugin2)
def test_authenticate_success_firstwins(self):
environ = self._makeEnviron()
mw = self._makeOne()
from repoze.pam.interfaces import IAuthenticatorPlugin
+ plugin1 = DummyAuthenticator('chris_id1')
+ plugin2 = DummyAuthenticator('chris_id2')
registry = {
- IAuthenticatorPlugin:[DummyAuthenticator('chris_id1'),
- DummyAuthenticator('chris_id2')]
+ IAuthenticatorPlugin:[plugin1, plugin2]
}
mw = self._makeOne(registry=registry)
creds = {'login':'chris', 'password':'password'}
- userid = mw.authenticate(environ, creds, None)
+ userid, plugin = mw.authenticate(environ, creds, None)
self.assertEqual(userid, 'chris_id1')
+ self.assertEqual(plugin, plugin1)
def test_authenticate_find_implicit_classifier(self):
environ = self._makeEnviron()
mw = self._makeOne()
from repoze.pam.interfaces import IAuthenticatorPlugin
plugin1 = DummyAuthenticator('chris_id1')
- plugin1.classifiers = set(['nomatch'])
+ plugin1.request_classifications = set(['nomatch'])
plugin2 = DummyAuthenticator('chris_id2')
registry = {
IAuthenticatorPlugin:[plugin1, plugin2]
}
mw = self._makeOne(registry=registry)
creds = {'login':'chris', 'password':'password'}
- userid = mw.authenticate(environ, creds, None)
+ userid, plugin = mw.authenticate(environ, creds, None)
self.assertEqual(userid, 'chris_id2')
+ self.assertEqual(plugin, plugin2)
def test_authenticate_find_explicit_classifier(self):
environ = self._makeEnviron()
mw = self._makeOne()
from repoze.pam.interfaces import IAuthenticatorPlugin
plugin1 = DummyAuthenticator('chris_id1')
- plugin1.classifiers = set(['nomatch'])
+ plugin1.request_classifications = set(['nomatch'])
plugin2 = DummyAuthenticator('chris_id2')
- plugin2.classifiers = set(['match'])
+ plugin2.request_classifications = set(['match'])
registry = {
IAuthenticatorPlugin:[plugin1, plugin2]
}
mw = self._makeOne(registry=registry)
creds = {'login':'chris', 'password':'password'}
- userid = mw.authenticate(environ, creds, 'match')
+ userid, plugin = mw.authenticate(environ, creds, 'match')
self.assertEqual(userid, 'chris_id2')
+ self.assertEqual(plugin, plugin2)
- def test_on_ingress_success_addcredentials(self):
+ def test_modify_environment_success_addcredentials(self):
environ = self._makeEnviron()
mw = self._makeOne()
- classification = mw.on_ingress(environ)
+ classification, headers = mw.modify_environment(environ)
self.assertEqual(classification, 'browser')
self.assertEqual(environ['REMOTE_USER'], 'chris')
self.assertEqual(environ['repoze.pam.credentials'],
{'login':'chris','password':'password'})
+ self.assertEqual(headers, [])
- def test_on_ingress_success_noaddcredentials(self):
+ def test_modify_environment_noaddcredentials(self):
environ = self._makeEnviron()
mw = self._makeOne()
mw.add_credentials = False
- classification = mw.on_ingress(environ)
+ classification, headers = mw.modify_environment(environ)
self.assertEqual(classification, 'browser')
self.assertEqual(environ['REMOTE_USER'], 'chris')
self.failIf(environ.has_key('repoze.pam.credentials'))
+ self.assertEqual(headers, [])
- def test_on_ingress_nocredentials(self):
+ def test_modify_environment_nocredentials(self):
environ = self._makeEnviron()
from repoze.pam.interfaces import IExtractorPlugin
registry = {
IExtractorPlugin:[DummyNoResultsExtractor()],
}
mw = self._makeOne(registry=registry)
- classification = mw.on_ingress(environ)
+ classification, headers = mw.modify_environment(environ)
self.assertEqual(classification, 'browser')
self.assertEqual(environ.get('REMOTE_USER'), None)
self.assertEqual(environ['repoze.pam.credentials'], {})
+ self.assertEqual(headers, [])
+ def test_modify_environment_remoteuser_already_set(self):
+ environ = self._makeEnviron({'REMOTE_USER':'admin'})
+ mw = self._makeOne()
+ classification, headers = mw.modify_environment(environ)
+ self.assertEqual(classification, 'browser')
+ self.assertEqual(environ.get('REMOTE_USER'), 'admin')
+ self.assertEqual(environ['repoze.pam.credentials'],
+ {'login':'chris', 'password':'password'})
+ self.assertEqual(headers, [])
+
+ def test_modify_environment_with_postextractor(self):
+ environ = self._makeEnviron({'REMOTE_USER':'admin'})
+ from repoze.pam.interfaces import IExtractorPlugin
+ from repoze.pam.interfaces import IPostExtractorPlugin
+ registry = {
+ IExtractorPlugin:[DummyExtractor()],
+ IPostExtractorPlugin:[DummyPostExtractor()],
+ }
+ mw = self._makeOne(registry=registry)
+ classification, headers = mw.modify_environment(environ)
+ self.assertEqual(classification, 'browser')
+ self.assertEqual(environ.get('REMOTE_USER'), 'admin')
+ self.assertEqual(environ['repoze.pam.credentials'],
+ {'login':'chris', 'password':'password'})
+ self.assertEqual(headers, [('foo', 'bar')])
+
class TestBasicAuthPlugin(Base):
def _getTargetClass(self):
from repoze.pam.plugins.basicauth import BasicAuthPlugin
@@ -237,56 +283,87 @@
verifyClass(IChallengerPlugin, klass)
verifyClass(IExtractorPlugin, klass)
- def test_challenge(self):
+ def test_challenge_non401(self):
plugin = self._makeOne('realm')
environ = self._makeEnviron()
- from paste.httpexceptions import HTTPUnauthorized
- self.assertRaises(HTTPUnauthorized, plugin.challenge, environ,
- None, None, None)
+ result = plugin.challenge(environ, '200 OK', {})
+ self.assertEqual(result, None)
+
+ def test_challenge_401(self):
+ plugin = self._makeOne('realm')
+ environ = self._makeEnviron()
+ result = plugin.challenge(environ, '401 Unauthorized', {})
+ self.assertNotEqual(result, None)
+ app_iter = result(environ, lambda *arg: None)
+ items = []
+ for item in app_iter:
+ items.append(item)
+ response = ''.join(items)
+ self.failUnless(response.startswith('401 Unauthorized'))
def test_extract_noauthinfo(self):
plugin = self._makeOne('realm')
environ = self._makeEnviron()
- result = plugin.extract(environ)
- self.assertEqual(result, {})
-
- def test_extract_nonbasic(self):
- plugin = self._makeOne('realm')
- environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Digest abc'})
- result = plugin.extract(environ)
- self.assertEqual(result, {})
+ creds = plugin.extract(environ)
+ self.assertEqual(creds, {})
def test_extract_nonbasic(self):
plugin = self._makeOne('realm')
environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Digest abc'})
- result = plugin.extract(environ)
- self.assertEqual(result, {})
+ creds = plugin.extract(environ)
+ self.assertEqual(creds, {})
def test_extract_basic_badencoding(self):
plugin = self._makeOne('realm')
environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic abc'})
- result = plugin.extract(environ)
- self.assertEqual(result, {})
+ creds = plugin.extract(environ)
+ self.assertEqual(creds, {})
def test_extract_basic_badrepr(self):
plugin = self._makeOne('realm')
value = 'foo'.encode('base64')
environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic %s' % value})
- result = plugin.extract(environ)
- self.assertEqual(result, {})
+ creds = plugin.extract(environ)
+ self.assertEqual(creds, {})
def test_extract_basic_ok(self):
plugin = self._makeOne('realm')
value = 'foo:bar'.encode('base64')
environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic %s' % value})
- result = plugin.extract(environ)
- self.assertEqual(result, {'login':'foo', 'password':'bar'})
+ creds = plugin.extract(environ)
+ self.assertEqual(creds, {'login':'foo', 'password':'bar'})
+
+ def test_post_extract_nocreds(self):
+ plugin = self._makeOne('realm')
+ creds = {}
+ environ = self._makeEnviron()
+ result = plugin.post_extract(environ, creds, plugin)
+ self.assertEqual(result, None)
+ self.assertEqual(environ.get('HTTP_AUTHORIZATION'), None)
+
+ def test_post_extract_creds_withauthorization(self):
+ plugin = self._makeOne('realm')
+ creds = {'login':'foo', 'password':'password'}
+ environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic foo'})
+ result = plugin.post_extract(environ, creds, plugin)
+ self.assertEqual(result, None)
+ self.assertEqual(environ['HTTP_AUTHORIZATION'], 'Basic foo')
+ def test_post_extract_creds_mutates(self):
+ plugin = self._makeOne('realm')
+ creds = {'login':'foo', 'password':'password'}
+ environ = self._makeEnviron()
+ result = plugin.post_extract(environ, creds, plugin)
+ self.assertEqual(result, None)
+ auth = 'foo:password'.encode('base64').rstrip()
+ auth = 'Basic ' + auth
+ self.assertEqual(environ['HTTP_AUTHORIZATION'], auth)
+
def test_factory(self):
from repoze.pam.plugins.basicauth import make_plugin
plugin = make_plugin({}, 'realm')
self.assertEqual(plugin.realm, 'realm')
-
+
class TestHTPasswdPlugin(Base):
def _getTargetClass(self):
from repoze.pam.plugins.htpasswd import HTPasswdPlugin
@@ -376,7 +453,84 @@
'repoze.pam.plugins.htpasswd:crypt_check')
self.assertEqual(plugin.filename, 'foo')
self.assertEqual(plugin.check, crypt_check)
+
+
+class TestInsecureCookiePlugin(Base):
+ def _getTargetClass(self):
+ from repoze.pam.plugins.cookie import InsecureCookiePlugin
+ return InsecureCookiePlugin
+
+ def _makeOne(self, *arg, **kw):
+ plugin = self._getTargetClass()(*arg, **kw)
+ return plugin
+
+ def test_implements(self):
+ from zope.interface.verify import verifyClass
+ from repoze.pam.interfaces import IExtractorPlugin
+ from repoze.pam.interfaces import IPostExtractorPlugin
+ klass = self._getTargetClass()
+ verifyClass(IExtractorPlugin, klass)
+ verifyClass(IPostExtractorPlugin, klass)
+
+ def test_extract_nocookies(self):
+ plugin = self._makeOne('oatmeal')
+ environ = self._makeEnviron()
+ result = plugin.extract(environ)
+ self.assertEqual(result, {})
+ def test_extract_badcookies(self):
+ plugin = self._makeOne('oatmeal')
+ environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=a'})
+ result = plugin.extract(environ)
+ self.assertEqual(result, {})
+
+ def test_extract_badcookies(self):
+ plugin = self._makeOne('oatmeal')
+ environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=a'})
+ result = plugin.extract(environ)
+ self.assertEqual(result, {})
+
+ def test_extract_success(self):
+ plugin = self._makeOne('oatmeal')
+ auth = 'foo:password'.encode('base64').rstrip()
+ environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=%s;' % auth})
+ result = plugin.extract(environ)
+ self.assertEqual(result, {'login':'foo', 'password':'password'})
+
+ def test_post_extract_nocreds(self):
+ plugin = self._makeOne('oatmeal')
+ creds = {}
+ environ = self._makeEnviron()
+ result = plugin.post_extract(environ, creds, plugin)
+ self.assertEqual(result, None)
+ self.assertEqual(environ.get('HTTP_COOKIE'), None)
+
+ def test_post_extract_creds_same(self):
+ plugin = self._makeOne('oatmeal')
+ creds = {'login':'foo', 'password':'password'}
+ auth = 'foo:password'.encode('base64').rstrip()
+ auth = 'oatmeal=%s;' % auth
+ environ = self._makeEnviron({'HTTP_COOKIE':auth})
+ result = plugin.post_extract(environ, creds, plugin)
+ self.assertEqual(result, None)
+ self.assertEqual(environ.get('HTTP_COOKIE'), auth)
+
+ def test_post_extract_creds_different(self):
+ plugin = self._makeOne('oatmeal')
+ creds = {'login':'bar', 'password':'password'}
+ auth = 'foo:password'.encode('base64').rstrip()
+ creds_auth = 'bar:password'.encode('base64').rstrip()
+ environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=%s;' % auth})
+ result = plugin.post_extract(environ, creds, plugin)
+ expected = 'oatmeal=%s; Path=/;' % creds_auth
+ self.assertEqual(result, [('Set-Cookie', expected)])
+ self.assertEqual(environ['HTTP_COOKIE'], 'oatmeal=%s;' % creds_auth)
+
+ def test_factory(self):
+ from repoze.pam.plugins.cookie import make_plugin
+ plugin = make_plugin(None, 'foo')
+ self.assertEqual(plugin.cookie_name, 'foo')
+
class TestDefaultRequestClassifier(Base):
def _getTargetClass(self):
@@ -477,5 +631,10 @@
return None
class DummyChallenger:
- def challenge(self, environ, request_classifier, headers, exception):
+ def challenge(self, environ, status, headers):
environ['challenged'] = True
+
+class DummyPostExtractor:
+ def post_extract(self, environ, credentials, extractor):
+ return [ ('foo', 'bar') ]
+
More information about the Repoze-checkins
mailing list