[Repoze-checkins] r735 - in repoze.pam/trunk: . repoze/pam repoze/pam/fixtures repoze/pam/plugins
Chris McDonough
chrism at agendaless.com
Tue Feb 26 01:44:48 UTC 2008
Author: Chris McDonough <chrism at agendaless.com>
Date: Mon Feb 25 20:44:48 2008
New Revision: 735
Log:
Extractor -> Identifier
Make pam responsible for deciding whether a challenge is required,
rather than individual challenge plugins.
Identifier plugins now have a forget method (used by the challenge app to
log out)
Identifier plugins now have a remember method (e.g. to set cookies
when a challenge is not required).
Remove "Plugin" from the names of interfaces.
IPostExtractor removed.
Add a simple test WSGI application to poke at through the browser.
Multiple identities and authentications are consulted, and a "best" one wins.
Added:
repoze.pam/trunk/repoze/pam/fixtures/__init__.py (contents, props changed)
repoze.pam/trunk/repoze/pam/fixtures/testapp.py (contents, props changed)
Modified:
repoze.pam/trunk/README.txt
repoze.pam/trunk/repoze/pam/classifiers.py
repoze.pam/trunk/repoze/pam/interfaces.py
repoze.pam/trunk/repoze/pam/middleware.py
repoze.pam/trunk/repoze/pam/plugins/basicauth.py
repoze.pam/trunk/repoze/pam/plugins/cookie.py
repoze.pam/trunk/repoze/pam/plugins/form.py
repoze.pam/trunk/repoze/pam/plugins/htpasswd.py
repoze.pam/trunk/repoze/pam/tests.py
Modified: repoze.pam/trunk/README.txt
==============================================================================
--- repoze.pam/trunk/README.txt (original)
+++ repoze.pam/trunk/README.txt Mon Feb 25 20:44:48 2008
@@ -8,18 +8,18 @@
Description
repoze.pam's ideas are largely culled from Zope 2's Pluggable
- Authentication Service (PAS) (but it is not dependent on Zope).
- Unlike PAS, it provides no facilities for creating user objects,
- assigning roles or groups to users, retrieving or changing user
- properties, or enumerating users, groups, or roles. These
- responsibilities are assumed to be the domain of the WSGI
- application you're serving. It also provides no facility for
- authorization (ensuring whether a user can or cannot perform the
- operation implied by the request). This is also the domain of the
- WSGI application.
+ Authentication Service (PAS) (but repoze.pam is not dependent on
+ Zope 2 in any way). Unlike PAS, it provides no facilities for
+ creating user objects, assigning roles or groups to users,
+ retrieving or changing user properties, or enumerating users,
+ groups, or roles. These responsibilities are assumed to be the
+ domain of the WSGI application you're serving. It also provides no
+ facility for authorization (ensuring whether a user can or cannot
+ perform the operation implied by the request). This is also the
+ domain of the WSGI application.
It attempts to reuse implementations from AuthKit and paste.auth for
- some of its functionality.
+ some of its functionality. XXX this is, so far, untrue
Middleware Responsibilities
@@ -82,10 +82,10 @@
a WSGI request, and gives some subset of them a chance to influence
what is added to the WSGI environment.
-Ingress Stages
+Request (Ingress) Stages
repoze.pam performs the following operations in the following order
- during request ingress:
+ during middleware ingress:
1. Request Classification
@@ -116,10 +116,10 @@
extractor, the userid of the user would be returned (which would
be the same as the login name).
-Egress Stages
+Response (Egress) Stages
repoze.pam performs the following operations in the following order
- during request egress:
+ during middleware egress:
1. Response Classification
@@ -226,19 +226,19 @@
response_classifier = egg:repoze.pam#defaultresponseclassifier
[extractors]
- # plugin_name:ingressclassifier_name:.. or just plugin_name (good for any)
+ # plugin_name:requestclassifier_name:.. or just plugin_name (good for any)
plugins =
cookieauth:browser
basicauth
[authenticators]
- # plugin_name (ingress classifiers ignored)
+ # plugin_name:requestclassifier_name.. or just plugin_name (good for any)
plugins =
fileusers
sqlusers
[challengers]
- # plugin_name:egressclassifier_name:.. or just plugin_name (good for any)
+ # plugin_name:responseclassifier_name:.. or just plugin_name (good for any)
plugins =
cookieauth:browser
basicauth
@@ -255,12 +255,12 @@
The extractors section provides an ordered list of plugins that are
willing to provide extraction capability. These will be consulted
in the defined order. The tokens on each line of the plugin= key
- are in the form "plugin_name:classifier" (or just "plugin_name" if
- the plugin can be consulted regardless of the classification of the
- request). The configuration above indicates that the system will
- look for credentials using the cookie auth plugin (if the request is
- classified as a browser request), then the basic auth plugin
- (unconditionally).
+ are in the form "plugin_name:requestclassifier_name" (or just
+ "plugin_name" if the plugin can be consulted regardless of the
+ classification of the request). The configuration above indicates
+ that the system will look for credentials using the cookie auth
+ plugin (if the request is classified as a browser request), then the
+ basic auth plugin (unconditionally).
The authenticators section provides an ordered list of plugins that
provide authenticator capability. These will be consulted in the
@@ -274,11 +274,12 @@
provide challenger capability. These will be consulted in the
defined order, so the system will consult the cookie auth plugin
first, then the basic auth plugin. Each will have a chance, based
- on the request, to initiate a challenge.
+ on the response classification, to initiate a challenge. The above
+ configuration indicates that the cookieauth challenger will fire if
+ it's a browser request, and the basic auth challenger will fire if
+ it's not (fallback).
Interfaces
- The following interfaces are expected to be provided by plugins
- which the configuration asserts they're willing to provide::
+ See repoze.pam.interfaces.
- XXX see interfaces.py
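Note on the README hunk above: the "plugin_name:classifier_name" token syntax can be illustrated with a small sketch. This revision does not include a configuration parser, so parse_plugin_spec and parse_plugins below are hypothetical helpers written only to show how such tokens might be split. They are not part of repoze.pam.

```python
def parse_plugin_spec(token):
    """Split 'plugin_name:classifier1:classifier2' into (name, classifiers).

    Hypothetical helper for illustration only. An empty classifier set
    stands for "good for any classification".
    """
    parts = [p.strip() for p in token.strip().split(':')]
    name = parts[0]
    classifiers = set(p for p in parts[1:] if p)
    return name, classifiers


def parse_plugins(value):
    """Parse the multi-line value of a 'plugins =' key, preserving order."""
    return [parse_plugin_spec(line)
            for line in value.splitlines() if line.strip()]


# The [extractors] example from the README, as a multi-line string.
specs = parse_plugins("""
    cookieauth:browser
    basicauth
""")
for name, classifiers in specs:
    print(name, sorted(classifiers) or 'any')
```

Order is preserved because the README says plugins are consulted in the order they are defined.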
Modified: repoze.pam/trunk/repoze/pam/classifiers.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/classifiers.py (original)
+++ repoze.pam/trunk/repoze/pam/classifiers.py Mon Feb 25 20:44:48 2008
@@ -2,58 +2,54 @@
from paste.httpheaders import REQUEST_METHOD
from paste.httpheaders import CONTENT_TYPE
-from zope.interface import implements
-
+import zope.interface
from repoze.pam.interfaces import IRequestClassifier
-from repoze.pam.interfaces import IResponseClassifier
-
-class DefaultRequestClassifier(object):
- implements(IRequestClassifier)
-
- _DAV_METHODS = (
- 'OPTIONS',
- 'PROPFIND',
- 'PROPPATCH',
- 'MKCOL',
- 'LOCK',
- 'UNLOCK',
- 'TRACE',
- 'DELETE',
- 'COPY',
- 'MOVE'
- )
-
- _DAV_USERAGENTS = ( # convenience, override as necessary
- 'Microsoft Data Access Internet Publishing Provider',
- 'WebDrive',
- 'Zope External Editor',
- 'WebDAVFS',
- 'Goliath',
- 'neon',
- 'davlib',
- 'wsAPI',
- 'Microsoft-WebDAV'
- )
+from repoze.pam.interfaces import IChallengeDecider
- def __call__(self, environ):
- """ Returns one of the classifiers 'dav', 'xmlpost', or
- 'browser', depending on the imperative logic below"""
- request_method = REQUEST_METHOD(environ)
- if request_method in self._DAV_METHODS:
- return 'dav'
- useragent = USER_AGENT(environ)
- if useragent:
- for agent in self._DAV_USERAGENTS:
- if useragent.find(agent) != -1:
- return 'dav'
- if request_method == 'POST':
- if CONTENT_TYPE(environ) == 'text/xml':
- return 'xmlpost'
- return 'browser'
-
-class DefaultResponseClassifier(object):
- implements(IResponseClassifier)
- def __call__(self, environ, request_classification, status, headers):
- """ By default, return the request classification """
- return request_classification
-
+_DAV_METHODS = (
+ 'OPTIONS',
+ 'PROPFIND',
+ 'PROPPATCH',
+ 'MKCOL',
+ 'LOCK',
+ 'UNLOCK',
+ 'TRACE',
+ 'DELETE',
+ 'COPY',
+ 'MOVE'
+ )
+
+_DAV_USERAGENTS = (
+ 'Microsoft Data Access Internet Publishing Provider',
+ 'WebDrive',
+ 'Zope External Editor',
+ 'WebDAVFS',
+ 'Goliath',
+ 'neon',
+ 'davlib',
+ 'wsAPI',
+ 'Microsoft-WebDAV'
+ )
+
+@zope.interface.implementer(IRequestClassifier)
+def default_request_classifier(environ):
+ """ Returns one of the classifiers 'dav', 'xmlpost', or 'browser',
+ depending on the imperative logic below"""
+ request_method = REQUEST_METHOD(environ)
+ if request_method in _DAV_METHODS:
+ return 'dav'
+ useragent = USER_AGENT(environ)
+ if useragent:
+ for agent in _DAV_USERAGENTS:
+ if useragent.find(agent) != -1:
+ return 'dav'
+ if request_method == 'POST':
+ if CONTENT_TYPE(environ) == 'text/xml':
+ return 'xmlpost'
+ return 'browser'
+
+@zope.interface.implementer(IChallengeDecider)
+def default_challenge_decider(environ, status, headers):
+ if status.startswith('401 '):
+ return True
+ return False
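Note on the classifiers.py hunk above: a challenge decider is a plain callable taking (environ, status, headers) and returning True or False. The sketch below restates the default decider without the zope.interface decorator so it runs without dependencies. It then shows a variant, forbidden_too_decider, which is purely hypothetical and not something this revision provides.

```python
def default_challenge_decider(environ, status, headers):
    # Same logic as the function added in this revision, minus the
    # zope.interface decorator so the sketch has no dependencies.
    return status.startswith('401 ')


def forbidden_too_decider(environ, status, headers):
    # Hypothetical variant (not in repoze.pam): also challenge on 403.
    return status.startswith(('401 ', '403 '))


environ = {'REQUEST_METHOD': 'GET'}
for status in ('200 OK', '401 Unauthorized', '403 Forbidden'):
    print(status,
          default_challenge_decider(environ, status, []),
          forbidden_too_decider(environ, status, []))
```

Because the decider is passed to the middleware constructor as challenge_decider, swapping in a different policy should not require touching any challenger plugin.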
Added: repoze.pam/trunk/repoze/pam/fixtures/__init__.py
==============================================================================
--- (empty file)
+++ repoze.pam/trunk/repoze/pam/fixtures/__init__.py Mon Feb 25 20:44:48 2008
@@ -0,0 +1 @@
+# this is a package
Added: repoze.pam/trunk/repoze/pam/fixtures/testapp.py
==============================================================================
--- (empty file)
+++ repoze.pam/trunk/repoze/pam/fixtures/testapp.py Mon Feb 25 20:44:48 2008
@@ -0,0 +1,46 @@
+def deny(start_response, msg):
+ ct = 'text/plain'
+ cl = str(len(msg))
+ start_response('401 Unauthorized',
+ [ ('Content-Type', ct),
+ ('Content-Length', cl) ],
+ )
+
+def allow(start_response, msg):
+ ct = 'text/plain'
+ cl = str(len(msg))
+ start_response('200 OK',
+ [ ('Content-Type', ct),
+ ('Content-Length', cl) ],
+ )
+ return [msg]
+
+def app(environ, start_response):
+ path_info = environ['PATH_INFO']
+ remote_user = environ.get('REMOTE_USER')
+ if path_info.endswith('/shared'):
+ if not remote_user:
+ return deny(start_response, 'You cant do that')
+ else:
+ return allow(start_response, 'Welcome to the shared area, %s' %
+ remote_user)
+ elif path_info.endswith('/admin'):
+ if remote_user != 'admin':
+ return deny(start_response, 'Only admin can do that')
+ else:
+ return allow(start_response, 'Hello, admin!')
+ elif path_info.endswith('/chris'):
+ if remote_user != 'chris':
+ return deny(start_response, 'Only chris can do that')
+ else:
+ return allow(start_response, 'Hello, chris!')
+ else:
+ return allow(start_response, 'Unprotected page')
+
+def make_app(global_config, **kw):
+ return app
+
+
+
+
+
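Note on the testapp.py fixture above: an application of this shape can be exercised without a browser by calling it with a hand-built environ and a recording start_response. The sketch below is a reduced Python 3 approximation covering only the /admin branch, not the committed file. The names respond and call_app are hypothetical, and unlike the committed deny(), which returns nothing, the sketch returns the body for denied requests as well.

```python
def respond(start_response, status, msg):
    # Hypothetical helper combining the fixture's allow() and deny();
    # it always returns the body as a WSGI iterable of bytes.
    body = msg.encode('utf-8')
    start_response(status, [('Content-Type', 'text/plain'),
                            ('Content-Length', str(len(body)))])
    return [body]


def app(environ, start_response):
    # Reduced version of the fixture: only the /admin rule is kept.
    remote_user = environ.get('REMOTE_USER')
    if environ['PATH_INFO'].endswith('/admin'):
        if remote_user != 'admin':
            return respond(start_response, '401 Unauthorized',
                           'Only admin can do that')
        return respond(start_response, '200 OK', 'Hello, admin!')
    return respond(start_response, '200 OK', 'Unprotected page')


def call_app(wsgi_app, path, remote_user=None):
    """Call a WSGI app directly and return (status, body)."""
    captured = {}

    def start_response(status, headers, exc_info=None):
        captured['status'] = status
        captured['headers'] = headers

    environ = {'PATH_INFO': path}
    if remote_user:
        environ['REMOTE_USER'] = remote_user
    body = b''.join(wsgi_app(environ, start_response))
    return captured['status'], body


print(call_app(app, '/admin'))
print(call_app(app, '/admin', remote_user='admin'))
```

The 401 status on the first call is what the default challenge decider keys on.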
Modified: repoze.pam/trunk/repoze/pam/interfaces.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/interfaces.py (original)
+++ repoze.pam/trunk/repoze/pam/interfaces.py Mon Feb 25 20:44:48 2008
@@ -12,99 +12,110 @@
o 'environ' is the WSGI environment.
"""
-class IResponseClassifier(Interface):
- """ On egress: classify a response.
+class IChallengeDecider(Interface):
+ """ On egress: decide whether a challenge needs to be presented
+ to the user.
"""
- def __call__(environ, request_classification, status, headers):
- """ args -> response classifier string
-
- This interface is responsible for returning a string representing
- a response classification.
+ def __call__(environ, status, headers):
+ """ args -> True | False
o 'environ' is the WSGI environment.
- o 'request_classification' is the classification returned during
- ingress by the request classifier.
+ o 'status' is the HTTP status as returned by the downstream
+ WSGI application.
+
+ o 'headers' are the headers returned by the downstream WSGI
+ application.
+
+ This interface is responsible for returning True if
+ a challenge needs to be presented to the user, False otherwise.
+ """
- o 'status' is the status written into start_response by
- the downstream application.
+class IIdentifier(Interface):
- o 'headers' is the headers tuple written into start_response
- by the downstream application.
- """
+ """
+ On ingress: Extract credentials from the WSGI environment and
+ turn them into an identity.
-class IExtractorPlugin(Interface):
+ On egress (remember): Conditionally set information in the response headers
+ allowing the remote system to remember this identity.
- """ On ingress: Extract credentials from the WSGI environment.
+ On egress (forget): Conditionally set information in the response
+ headers allowing the remote system to forget this identity (during
+ a challenge).
"""
- def extract(environ):
- """ environ -> { 'login' : login
+ def identify(environ):
+ """ On ingress:
+
+ environ -> { 'login' : login
, 'password' : password
, k1 : v1
, ...
, kN : vN
- } | {}
+ } | None
o 'environ' is the WSGI environment.
- o If credentials are found, the returned mapping will contain at
- least 'login', 'password', 'remote_host' and 'remote_addr' keys.
+ o If credentials are found, the returned identity mapping will
+ contain at least 'login' and 'password' keys (and others as
+ necessary for special-case needs).
- o Return an empty mapping to indicate that the plugin found no
- appropriate credentials.
+ o Return None to indicate that the plugin found no appropriate
+ credentials.
- o Only extraction plugins which match one of the the current
- request's classifications will be asked to perform extraction.
+ o Only IIdentifier plugins which match one of the current
+ request's classifications will be asked to perform
+ identification.
"""
-class IPostExtractorPlugin(Interface):
- """ On ingress: allow the plugin to have a chance to influence the
- environment once credentials are established and return extra
- headers that will be set in the eventual response.
+ def remember(environ, identity):
+ """ On egress (no challenge required):
- Each post-extractor matching the request classification is called
- unconditionally after extraction.
- """
-
- def post_extract(environ, credentials, extractor):
- """ args -> [ (header-name, header-value), ..] | None
+ args -> [ (header-name, header-value), ...] | None
- o 'environ' is the WSGI environment.
+ Return a list of headers suitable for allowing the requesting
+ system to remember the identification information (e.g. a
+ Set-Cookie header). Return None if no headers need to be set.
+ These headers will be appended to any headers returned by the
+ downstream application.
+ """
- o credentials are the credentials that were extracted by
- repoze.pam during the extraction step.
+ def forget(environ, identity):
+ """ On egress (challenge required):
- o 'extractor' is the plugin instance that provided the
- credentials. If no plugin instance provided credentials to
- repoze.pam, this will be None.
+ args -> [ (header-name, header-value), ...] | None
- The return value should be a list of tuples, where each tuple is
- in the form (header-name, header-value), e.g.
- [ ('Set-Cookie', 'cookie_name=foo; Path=/') ] or None if
- no headers should be set.
+ Return a list of headers suitable for allowing the requesting
+ system to forget the identification information (e.g. a
+ Set-Cookie header with an expires date in the past). Return
+ None if no headers need to be set. These headers will be
+ included in the response provided by the challenge app.
"""
-class IAuthenticatorPlugin(Interface):
+class IAuthenticator(Interface):
- """ On ingress: Map credentials to a user ID.
+ """ On ingress: validate the identity and return a user id or None.
"""
- def authenticate(environ, credentials):
- """ credentials -> userid
+ def authenticate(environ, identity):
+ """ identity -> 'userid' | None
o 'environ' is the WSGI environment.
- o 'credentials' will be a mapping, as returned by IExtractionPlugin.
+ o 'identity' will be a mapping containing at least 'login' and
+ 'password' key/value pairs.
- o If credentials are found, the userid will be returned; this will
- be the value placed into the REMOTE_USER key in the environ
- to be used by downstream applications.
+ o The IAuthenticator should return a single user id (should be
+ a string) or None if the identity cannot be authenticated.
- o If the credentials cannot be authenticated, None will be returned.
+ Each instance of a registered IAuthenticator plugin that
+ matches the request classifier will be called N times during a
+ single request, where N is the number of identities found by
+ any IIdentifier plugin instances.
"""
-class IChallengerPlugin(Interface):
+class IChallenger(Interface):
""" On egress: Conditionally initiate a challenge to the user to
provide credentials.
@@ -114,7 +125,7 @@
challenge.
"""
- def challenge(environ, status, headers):
+ def challenge(environ, status, app_headers, forget_headers):
""" args -> WSGI application or None
o 'environ' is the WSGI environment.
@@ -122,9 +133,14 @@
o 'status' is the status written into start_response by the
downstream application.
- o 'headers' is the headers tuple written into start_response by the
+ o 'app_headers' is the headers list written into start_response by the
downstream application.
+ o 'forget_headers' is a list of headers which must be passed
+ back in the response in order to perform credentials reset
+ (logout). These come from the 'forget' method of
+ IIdentifier plugin used to do the request's identification.
+
Examine the values passed in and return a WSGI application
(a callable which accepts environ and start_response as its
two positional arguments, ala PEP 333) which causes a
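Note on the interfaces.py hunk above: the three IIdentifier methods (identify, remember, forget) might fit together as in the sketch below. SketchCookieIdentifier is a hypothetical class, not the InsecureCookiePlugin shipped in repoze.pam/plugins/cookie.py. The hex encoding of "login:password" is an arbitrary and insecure choice made only to keep the example short, and zope.interface declarations are omitted so it runs standalone.

```python
from http.cookies import SimpleCookie


class SketchCookieIdentifier:
    """Hypothetical IIdentifier-shaped plugin (illustration only)."""

    def __init__(self, cookie_name):
        self.cookie_name = cookie_name

    def identify(self, environ):
        # On ingress: turn the cookie into an identity mapping, or None.
        cookies = SimpleCookie(environ.get('HTTP_COOKIE', ''))
        morsel = cookies.get(self.cookie_name)
        if morsel is None:
            return None
        try:
            decoded = bytes.fromhex(morsel.value).decode('utf-8')
            login, password = decoded.split(':', 1)
        except ValueError:
            # Malformed hex, bad UTF-8, or no ':' separator.
            return None
        return {'login': login, 'password': password}

    def remember(self, environ, identity):
        # On egress, no challenge required: ask the client to keep it.
        raw = '%(login)s:%(password)s' % identity
        value = raw.encode('utf-8').hex()
        return [('Set-Cookie', '%s=%s; Path=/' % (self.cookie_name, value))]

    def forget(self, environ, identity):
        # On egress, challenge required: expire the cookie (logout).
        expired = '%s=""; Path=/; Expires=Sun, 10-May-1971 11:59:00 GMT' % (
            self.cookie_name)
        return [('Set-Cookie', expired)]


plugin = SketchCookieIdentifier('oatmeal')
identity = {'login': 'chris', 'password': 'chris'}
set_cookie = plugin.remember({}, identity)[0][1]
# Simulate the browser sending the cookie back on the next request.
next_environ = {'HTTP_COOKIE': set_cookie.split(';')[0]}
print(plugin.identify(next_environ))
```

In the middleware, the headers returned by forget are what get handed to a challenger as forget_headers.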
Modified: repoze.pam/trunk/repoze/pam/middleware.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/middleware.py (original)
+++ repoze.pam/trunk/repoze/pam/middleware.py Mon Feb 25 20:44:48 2008
@@ -4,48 +4,30 @@
from paste.httpheaders import REMOTE_USER
-from repoze.pam.interfaces import IAuthenticatorPlugin
-from repoze.pam.interfaces import IExtractorPlugin
-from repoze.pam.interfaces import IPostExtractorPlugin
-from repoze.pam.interfaces import IChallengerPlugin
-
-class StartResponseWrapper(object):
- def __init__(self, start_response, extra_headers):
- self.start_response = start_response
- self.extra_headers = extra_headers
- self.headers = []
- self.buffer = StringIO()
-
- def wrap_start_response(self, status, headers, exc_info=None):
- self.headers = headers
- self.status = status
- return self.buffer.write
-
- def finish_response(self):
- headers = self.headers + self.extra_headers
- write = self.start_response(self.status, headers)
- if write:
- self.buffer.seek(0)
- write(self.buffer.getvalue())
- if hasattr(write, 'close'):
- write.close()
+from repoze.pam.interfaces import IIdentifier
+from repoze.pam.interfaces import IAuthenticator
+from repoze.pam.interfaces import IChallenger
_STARTED = '-- repoze.pam request started --'
_ENDED = '-- repoze.pam request ended --'
class PluggableAuthenticationMiddleware(object):
def __init__(self, app,
- registry,
- request_classifier,
- response_classifier,
- add_credentials=False,
+ identifiers,
+ authenticators,
+ challengers,
+ classifier,
+ challenge_decider,
log_stream=None,
- log_level=logging.INFO):
- self.registry = registry
+ log_level=logging.INFO
+ ):
+ iregistry, nregistry = make_registries(identifiers, authenticators,
+ challengers)
+ self.registry = iregistry
+ self.name_registry = nregistry
self.app = app
- self.request_classifier = request_classifier
- self.response_classifier = response_classifier
- self.add_credentials = add_credentials
+ self.classifier = classifier
+ self.challenge_decider = challenge_decider
self.logger = None
if log_stream:
handler = logging.StreamHandler(log_stream)
@@ -57,170 +39,145 @@
self.logger.setLevel(log_level)
def __call__(self, environ, start_response):
- logger = self.logger
- logger and logger.info(_STARTED)
- classification, extra_headers = self.modify_environment(environ)
+ if REMOTE_USER(environ):
+ # act as a pass through if REMOTE_USER is already set
+ return self.app(environ, start_response)
- wrapper = StartResponseWrapper(start_response, extra_headers)
- app_iter = self.app(environ, wrapper.wrap_start_response)
+ environ['repoze.pam.plugins'] = self.name_registry
- challenge_app = self.challenge(
- environ,
- classification,
- wrapper.status,
- wrapper.headers
- )
- logger and logger.info('challenge app used: %s' % challenge_app)
-
- if challenge_app is not None:
- if hasattr(app_iter, 'close'):
- app_iter.close()
- logger and logger.info(_ENDED)
- return challenge_app(environ, start_response)
- else:
- wrapper.finish_response()
- logger and logger.info(_ENDED)
- return app_iter
-
- def modify_environment(self, environ):
- # happens on ingress
- classification = self.request_classifier(environ)
logger = self.logger
+ logger and logger.info(_STARTED)
+ classification = self.classifier(environ)
logger and logger.info('request classification: %s' % classification)
- credentials, extractor = self.extract(environ, classification)
- headers = self.after_extract(environ, credentials, extractor,
- classification)
userid = None
+ identity = None
+ identifier = None
- if credentials:
- userid, authenticator = self.authenticate(environ,
- credentials,
- classification)
- else:
- logger and logger.info(
- 'no authenticator plugin used (no credentials)')
+ ids = self.identify(environ, classification)
+ # ids will be list of tuples: [ (IIdentifier, identity) ]
+ if ids:
+ auth_ids = self.authenticate(environ, classification, ids)
+ # auth_ids will be a list of four-tuples; when sorted,
+ # its first element will be the "best" identity. The fourth
+ # element in the tuple is the user_id.
+ if auth_ids:
+ auth_ids.sort()
+ best = auth_ids[0]
+ identity = best[2]
+ userid = best[3]
+ identifier = best[1][1]
+ environ['REMOTE_USER'] = userid
- if self.add_credentials:
- environ['repoze.pam.credentials'] = credentials
+ wrapper = StartResponseWrapper(start_response)
+ app_iter = self.app(environ, wrapper.wrap_start_response)
- remote_user_not_set = not REMOTE_USER(environ)
+ if self.challenge_decider(environ, wrapper.status, wrapper.headers):
- if remote_user_not_set and userid:
- # only set REMOTE_USER if it's not yet set
- logger and logger.info('REMOTE_USER set to %s' % userid)
- environ['REMOTE_USER'] = userid
+ challenge_app = self.challenge(
+ environ,
+ classification,
+ wrapper.status,
+ wrapper.headers,
+ identifier,
+ identity
+ )
+ if challenge_app is not None:
+ if app_iter:
+ list(app_iter) # unwind the original app iterator
+ # replace the downstream app with the challenge app
+ app_iter = challenge_app(environ, start_response)
+ else:
+ raise RuntimeError('no challengers found')
else:
- logger and logger.info('REMOTE_USER not set')
+ wrapper.finish_response()
+ logger and logger.info(_ENDED)
- return classification, headers
-
- def extract(self, environ, classification):
- # happens on ingress
- candidates = self.registry.get(IExtractorPlugin, ())
- plugins = self._match_classification(candidates, classification,
- 'request_classifications')
- logger = self.logger
- logger and self.logger.info(
- 'extractor plugins consulted %s' % plugins)
+ logger and logger.info(_ENDED)
+ return app_iter
- for plugin in plugins:
- creds = plugin.extract(environ)
- logger and logger.debug(
- 'credentials returned from extractor %s: %s' %
- (plugin, creds)
- )
- if creds:
- # XXX PAS returns all credentials (it fully iterates over all
- # extraction plugins)
- logger and logger.info(
- 'using credentials returned from extractor %s' % plugin)
- return creds, plugin
- logger and logger.info('no extractor plugins found credentials')
- return {}, None
-
- def after_extract(self, environ, credentials, extractor, classification):
- candidates = self.registry.get(IPostExtractorPlugin, ())
- plugins = self._match_classification(candidates,
- classification,
- 'request_classifications')
+ def identify(self, environ, classification):
logger = self.logger
- logger and logger.info(
- 'post-extractor plugins consulted %s' % plugins)
+ candidates = self.registry.get(IIdentifier, ())
+ logger and self.logger.info('identifier plugins registered %s' %
+ candidates)
+ plugins = self._match_classification(candidates, classification)
+ logger and self.logger.info(
+ 'identifier plugins matched for '
+ 'classification "%s": %s' % (classification, plugins))
- extra_headers = {}
+ results = []
for plugin in plugins:
- headers = plugin.post_extract(environ, credentials, extractor)
- logger and logger.debug(
- 'headers returned from post-extractor %s: %s' %
- (plugin, headers)
- )
- if headers:
- extra_headers[plugin] = headers
-
- logger and logger.info('extra headers gathered: %s' % extra_headers)
+ identity = plugin.identify(environ)
+ if identity:
+ logger and logger.debug(
+ 'identity returned from %s: %s' % (plugin, identity))
+ results.append((plugin, identity))
+ else:
+ logger and logger.debug(
+ 'no identity returned from %s (%s)' % (plugin, identity))
+
+ logger and logger.debug('identities found: %s' % results)
+ return results
+
+ def authenticate(self, environ, classification, identities):
+ candidates = self.registry.get(IAuthenticator, ())
+ plugins = self._match_classification(candidates, classification)
- return flatten(extra_headers.values())
-
- def authenticate(self, environ, credentials, classification):
- # happens on ingress
- candidates = self.registry.get(IAuthenticatorPlugin, ())
- plugins = self._match_classification(candidates,
- classification,
- 'request_classifications')
- logger = self.logger
-
- logger and logger.info(
- 'authenticator plugins consulted %s' % plugins)
+ results = []
+ auth_rank = 0
for plugin in plugins:
- userid = plugin.authenticate(environ, credentials)
- logger and logger.info(
- 'userid returned from authenticator %s: %s' %
- (plugin, userid)
- )
- if userid:
- logger and logger.info(
- 'using userid returned from authenticator %s' % plugin)
- return userid, plugin
+ identifier_rank = 0
+ for identifier, identity in identities:
+ userid = plugin.authenticate(environ, identity)
+ if userid:
+ tup = ( (auth_rank, plugin),
+ (identifier_rank, identifier),
+ identity,
+ userid
+ )
+ results.append(tup)
+ identifier_rank += 1
+ auth_rank += 1
- logger and logger.info('no authenticator plugin authenticated a userid')
- return None, None
+ return results
- def challenge(self, environ, request_classification, status, headers):
+ def challenge(self, environ, classification, status, app_headers,
+ identifier, identity):
# happens on egress
- classification = self.response_classifier(
- environ,
- request_classification,
- status,
- headers
- )
-
logger = self.logger
- logger and logger.info('response classification: %s' % classification)
- candidates = self.registry.get(IChallengerPlugin, ())
- plugins = self._match_classification(candidates,
- classification,
- 'response_classifications')
- logger and logger.info('challenger plugins consulted: %s' % plugins)
+ forget_headers = []
+
+ if identifier:
+ forget_headers = identifier.forget(environ, identity)
+ if forget_headers is None:
+ forget_headers = []
+
+ candidates = self.registry.get(IChallenger, ())
+ logger and logger.info('challengers registered: %s' % candidates)
+ plugins = self._match_classification(candidates, classification)
+ logger and logger.info('challengers matched for '
+ 'classification "%s": %s' % (classification,
+ plugins))
for plugin in plugins:
- app = plugin.challenge(environ, status, headers)
- logger and logger.debug('app returned from challenger %s: %s' %
- (plugin, app)
- )
+ app = plugin.challenge(environ, status, app_headers,
+ forget_headers)
if app is not None:
# new WSGI application
logger and logger.info(
- 'challenger plugin %s returned an app: %s' % (plugin, app))
+ 'challenger plugin %s "challenge" returned an app' % (
+ plugin))
return app
- logger and logger.info('no challenge app returned')
+
# signifies no challenge
+ logger and logger.info('no challenge app returned')
return None
- def _match_classification(self, plugins, classification, attr):
+ def _match_classification(self, plugins, classification):
result = []
for plugin in plugins:
- plugin_classifications = getattr(plugin, attr, None)
+ plugin_classifications = getattr(plugin, 'classifications', None)
if not plugin_classifications: # good for any
result.append(plugin)
continue
@@ -229,6 +186,28 @@
return result
+class StartResponseWrapper(object):
+ def __init__(self, start_response):
+ self.start_response = start_response
+ self.headers = []
+ self.buffer = StringIO()
+
+ def wrap_start_response(self, status, headers, exc_info=None):
+ self.headers = headers
+ self.status = status
+ return self.buffer.write
+
+ def finish_response(self):
+ headers = self.headers
+ write = self.start_response(self.status, headers)
+ if write:
+ self.buffer.seek(0)
+ value = self.buffer.getvalue()
+ if value:
+ write(value)
+ if hasattr(write, 'close'):
+ write.close()
+
def flatten(L):
result = []
for seq in L:
@@ -248,59 +227,61 @@
from repoze.pam.plugins.cookie import InsecureCookiePlugin
from repoze.pam.plugins.form import FormPlugin
basicauth = BasicAuthPlugin('repoze.pam')
- any = set() # means good for any classification
- basicauth.request_classifications = any
- basicauth.response_classifications = any
+ any = None # means good for any classification
+ basicauth.classifications = any
from StringIO import StringIO
from repoze.pam.plugins.htpasswd import crypt_check
io = StringIO()
salt = 'aa'
import crypt
- for name, password in [ ('admin', 'admin') ]:
- io.write('name:%s\n' % crypt.crypt(password, salt))
+ for name, password in [ ('admin', 'admin'), ('chris', 'chris') ]:
+ io.write('%s:%s\n' % (name, crypt.crypt(password, salt)))
+ io.seek(0)
htpasswd = HTPasswdPlugin(io, crypt_check)
- htpasswd.request_classifications = any
- htpasswd.response_classifications = any
+ htpasswd.classifications = any
cookie = InsecureCookiePlugin('oatmeal')
- cookie.request_classifications = any
- cookie.response_classifications = any
- form = FormPlugin('__do_login')
- # only do form extract/challenge for browser requests
- form.request_classifications = set(('browser',))
- form.response_classifications = set(('browser',))
- registry = make_registry(
- extractors = (cookie, basicauth, form),
- post_extractors = (cookie, basicauth),
- authenticators = (htpasswd,),
- challengers = (form, basicauth),
+ cookie.classifications = any
+ form = FormPlugin('__do_login', rememberer_name='cookie')
+ form.classifications = set(('browser',)) # only for browser requests
+ identifiers = [('form', form),('cookie',cookie),('basicauth',basicauth) ]
+ authenticators = [('htpasswd', htpasswd)]
+ challengers = [('form',form), ('basicauth',basicauth)]
+ from repoze.pam.classifiers import default_request_classifier
+ from repoze.pam.classifiers import default_challenge_decider
+ middleware = PluggableAuthenticationMiddleware(
+ app,
+ identifiers,
+ authenticators,
+ challengers,
+ default_request_classifier,
+ default_challenge_decider,
+ log_stream=sys.stdout,
+ log_level = logging.DEBUG
)
- from repoze.pam.classifiers import DefaultRequestClassifier
- from repoze.pam.classifiers import DefaultResponseClassifier
- request_classifier = DefaultRequestClassifier()
- response_classifier = DefaultResponseClassifier()
- middleware = PluggableAuthenticationMiddleware(app,
- registry,
- request_classifier,
- response_classifier,
- log_stream=sys.stdout,
- log_level = logging.DEBUG
- )
return middleware
-def verify(plugins, iface):
+def verify(plugin, iface):
from zope.interface.verify import verifyObject
- for plugin in plugins:
- verifyObject(iface, plugin, tentative=True)
+ verifyObject(iface, plugin, tentative=True)
-def make_registry(extractors, post_extractors, authenticators, challengers):
- registry = {}
- verify(extractors, IExtractorPlugin)
- registry[IExtractorPlugin] = extractors
- verify(post_extractors, IExtractorPlugin)
- registry[IPostExtractorPlugin] = post_extractors
- verify(authenticators, IAuthenticatorPlugin)
- registry[IAuthenticatorPlugin] = authenticators
- verify(challengers, IChallengerPlugin)
- registry[IChallengerPlugin] = challengers
- return registry
+def make_registries(identifiers, authenticators, challengers):
+ from zope.interface.verify import BrokenImplementation
+ interface_registry = {}
+ name_registry = {}
+
+ for supplied, iface in [ (identifiers, IIdentifier),
+ (authenticators, IAuthenticator),
+ (challengers, IChallenger) ]:
+ for name, value in supplied:
+ try:
+ verify(value, iface)
+ except BrokenImplementation, why:
+ why = str(why)
+ raise ValueError(name + ': ' + why)
+ L = interface_registry.setdefault(iface, [])
+ L.append(value)
+ name_registry[name] = value
+
+ return interface_registry, name_registry
+
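The two registries built above can be sketched without zope.interface. This is only an illustration written for Python 3, not the code from this revision: the `required` method table and the `DummyIdentifier` class are hypothetical stand-ins for `verifyObject` and a real plugin, and roles are keyed by plain strings rather than interface objects.

```python
def make_registries(identifiers, authenticators, challengers):
    """Build (role_registry, name_registry) from lists of (name, plugin)."""
    # Hypothetical replacement for interface verification: each role is
    # described by the method names a plugin must provide.
    required = {
        'identifier': ('identify', 'remember', 'forget'),
        'authenticator': ('authenticate',),
        'challenger': ('challenge',),
    }
    role_registry = {}
    name_registry = {}
    for role, supplied in (('identifier', identifiers),
                           ('authenticator', authenticators),
                           ('challenger', challengers)):
        for name, plugin in supplied:
            missing = [m for m in required[role]
                       if not callable(getattr(plugin, m, None))]
            if missing:
                # Prefix the plugin name so the bad entry is easy to find.
                raise ValueError('%s: missing %s' % (name, ', '.join(missing)))
            role_registry.setdefault(role, []).append(plugin)
            name_registry[name] = plugin
    return role_registry, name_registry


class DummyIdentifier(object):
    """Hypothetical plugin that satisfies the identifier role."""
    def identify(self, environ):
        return {}

    def remember(self, environ, identity):
        return None

    def forget(self, environ, identity):
        return None
```

Keeping a second registry keyed by name is what lets one plugin look up another at request time, as the form plugin does with its rememberer.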
Modified: repoze.pam/trunk/repoze/pam/plugins/basicauth.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/plugins/basicauth.py (original)
+++ repoze.pam/trunk/repoze/pam/plugins/basicauth.py Mon Feb 25 20:44:48 2008
@@ -6,25 +6,18 @@
from zope.interface import implements
-from repoze.pam.interfaces import IChallengerPlugin
-from repoze.pam.interfaces import IExtractorPlugin
-from repoze.pam.interfaces import IPostExtractorPlugin
+from repoze.pam.interfaces import IIdentifier
+from repoze.pam.interfaces import IChallenger
class BasicAuthPlugin(object):
- implements(IChallengerPlugin, IExtractorPlugin, IPostExtractorPlugin)
+ implements(IIdentifier, IChallenger)
def __init__(self, realm):
self.realm = realm
- # IChallengerPlugin
- def challenge(self, environ, status, headers):
- if status == '401 Unauthorized':
- headers = WWW_AUTHENTICATE.tuples('Basic realm="%s"' % self.realm)
- return HTTPUnauthorized(headers=headers)
-
- # IExtractorPlugin
- def extract(self, environ):
+ # IIdentifier
+ def identify(self, environ):
authorization = AUTHORIZATION(environ)
try:
authmeth, auth = authorization.split(' ', 1)
@@ -44,14 +37,26 @@
return {}
- # IPostExtractorPlugin
- def post_extract(self, environ, credentials, extractor):
- if credentials:
- if not AUTHORIZATION(environ):
- auth = '%(login)s:%(password)s' % credentials
- auth = auth.encode('base64').rstrip()
- header = 'Basic %s' % auth
- environ['HTTP_AUTHORIZATION'] = header
+ # IIdentifier
+ def remember(self, environ, identity):
+ # we need to do nothing here; the browser remembers the basic
+ # auth info as a result of the user typing it in.
+ pass
+
+ def _get_wwwauth(self):
+ head = WWW_AUTHENTICATE.tuples('Basic realm="%s"' % self.realm)
+ return head
+
+ # IIdentifier
+ def forget(self, environ, identity):
+ return self._get_wwwauth()
+
+ # IChallenger
+ def challenge(self, environ, status, app_headers, forget_headers):
+ head = self._get_wwwauth()
+ if head != forget_headers:
+ head = head + forget_headers
+ return HTTPUnauthorized(headers=head)
def make_plugin(pam_conf, realm='basic'):
plugin = BasicAuthPlugin(realm)
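The body of `identify` is elided by the hunk boundary above. A minimal sketch of what a basic-auth identifier does, assuming Python 3 and reading the header straight from the WSGI environ instead of using the `AUTHORIZATION` helper, might look like this. The function name `identify_basic` is hypothetical and not part of repoze.pam.

```python
import base64
import binascii


def identify_basic(environ):
    """Return {'login': ..., 'password': ...} or {} for a WSGI environ."""
    authorization = environ.get('HTTP_AUTHORIZATION', '')
    try:
        authmeth, auth = authorization.split(' ', 1)
    except ValueError:  # header missing or has no credentials part
        return {}
    if authmeth.lower() != 'basic':
        return {}
    try:
        decoded = base64.b64decode(auth.strip(), validate=True).decode('utf-8')
    except (binascii.Error, UnicodeDecodeError):  # not valid base64 / text
        return {}
    try:
        login, password = decoded.split(':', 1)
    except ValueError:  # decoded value has no "login:password" separator
        return {}
    return {'login': login, 'password': password}
```

Returning an empty dict in every failure case mirrors the `return {}` context line in the diff and the `test_identify_*` expectations in tests.py.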
Modified: repoze.pam/trunk/repoze/pam/plugins/cookie.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/plugins/cookie.py (original)
+++ repoze.pam/trunk/repoze/pam/plugins/cookie.py Mon Feb 25 20:44:48 2008
@@ -4,18 +4,17 @@
from zope.interface import implements
-from repoze.pam.interfaces import IExtractorPlugin
-from repoze.pam.interfaces import IPostExtractorPlugin
+from repoze.pam.interfaces import IIdentifier
class InsecureCookiePlugin(object):
- implements(IExtractorPlugin, IPostExtractorPlugin)
+ implements(IIdentifier)
def __init__(self, cookie_name):
self.cookie_name = cookie_name
- # IExtractorPlugin
- def extract(self, environ):
+ # IIdentifier
+ def identify(self, environ):
cookies = get_cookies(environ)
cookie = cookies.get(self.cookie_name)
@@ -33,23 +32,24 @@
except ValueError: # not enough values to unpack
return {}
- # IPostExtractorPlugin
- def post_extract(self, environ, credentials, extractor):
- if credentials:
- cookie_value = '%(login)s:%(password)s' % credentials
- cookie_value = cookie_value.encode('base64').rstrip()
- cookies = get_cookies(environ)
- existing = cookies.get(self.cookie_name)
- value = getattr(existing, 'value', None)
- if value != cookie_value:
- # go ahead and set it in the environment for downstream
- # apps to consume (XXX?)
- cookies[self.cookie_name] = cookie_value
- output = cookies.output(header='', sep='').lstrip()
- environ['HTTP_COOKIE'] = output
- # return a Set-Cookie header
- set_cookie = '%s=%s; Path=/;' % (self.cookie_name, cookie_value)
- return [('Set-Cookie', set_cookie)]
+ # IIdentifier
+ def forget(self, environ, identity):
+ # return an expired Set-Cookie header
+ expired = ('%s=""; Path=/; Expires=Sun, 10-May-1971 11:59:00 GMT' %
+ self.cookie_name)
+ return [('Set-Cookie', expired)]
+
+ # IIdentifier
+ def remember(self, environ, identity):
+ cookie_value = '%(login)s:%(password)s' % identity
+ cookie_value = cookie_value.encode('base64').rstrip()
+ cookies = get_cookies(environ)
+ existing = cookies.get(self.cookie_name)
+ value = getattr(existing, 'value', None)
+ if value != cookie_value:
+ # return a Set-Cookie header
+ set_cookie = '%s=%s; Path=/;' % (self.cookie_name, cookie_value)
+ return [('Set-Cookie', set_cookie)]
def make_plugin(pam_conf, cookie_name='repoze.pam.plugins.cookie'):
plugin = InsecureCookiePlugin(cookie_name)
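The remember/forget pair can be exercised outside the middleware. The sketch below is a Python 3 approximation, not the plugin itself: it uses the standard library's `http.cookies.SimpleCookie` in place of `get_cookies`, a hypothetical default cookie name `auth`, and an epoch expiry date chosen for illustration.

```python
import base64
from http.cookies import SimpleCookie


def remember(environ, identity, cookie_name='auth'):
    """Return Set-Cookie headers, or None if the cookie is already current."""
    value = '%(login)s:%(password)s' % identity
    value = base64.b64encode(value.encode('utf-8')).decode('ascii')
    cookies = SimpleCookie(environ.get('HTTP_COOKIE', ''))
    existing = cookies.get(cookie_name)
    if getattr(existing, 'value', None) != value:
        return [('Set-Cookie', '%s=%s; Path=/;' % (cookie_name, value))]
    return None


def forget(environ, identity, cookie_name='auth'):
    """Return a Set-Cookie header that expires the cookie."""
    expired = ('%s=""; Path=/; Expires=Thu, 01 Jan 1970 00:00:00 GMT'
               % cookie_name)
    return [('Set-Cookie', expired)]
```

As the class name `InsecureCookiePlugin` suggests, base64 is an encoding and not encryption, so the password is recoverable by anyone who can read the cookie.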
Modified: repoze.pam/trunk/repoze/pam/plugins/form.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/plugins/form.py (original)
+++ repoze.pam/trunk/repoze/pam/plugins/form.py Mon Feb 25 20:44:48 2008
@@ -6,8 +6,8 @@
from zope.interface import implements
-from repoze.pam.interfaces import IChallengerPlugin
-from repoze.pam.interfaces import IExtractorPlugin
+from repoze.pam.interfaces import IChallenger
+from repoze.pam.interfaces import IIdentifier
_DEFAULT_FORM = """
<html>
@@ -42,24 +42,19 @@
</html>
"""
-def auth_form(environ, start_response):
- import pprint
- form = _DEFAULT_FORM % pprint.pformat(environ)
- content_length = CONTENT_LENGTH.tuples(str(len(form)))
- content_type = CONTENT_TYPE.tuples('text/html')
- headers = content_length + content_type
- start_response('200 OK', headers)
- return [form]
-
class FormPlugin(object):
- implements(IChallengerPlugin, IExtractorPlugin)
+ implements(IChallenger, IIdentifier)
- def __init__(self, login_form_qs):
+ def __init__(self, login_form_qs, rememberer_name):
self.login_form_qs = login_form_qs
+ # rememberer_name is the name of another configured plugin which
+ # implements IIdentifier, to handle remember and forget duties
+ # (such as a cookie plugin or a session plugin)
+ self.rememberer_name = rememberer_name
- # IExtractorPlugin
- def extract(self, environ):
+ # IIdentifier
+ def identify(self, environ):
query = parse_dict_querystring(environ)
# If the extractor finds a special query string on any request,
# it will attempt to find the values in the input body.
@@ -79,12 +74,38 @@
return {}
- # IChallengerPlugin
- def challenge(self, environ, status, headers):
- if status == '401 Unauthorized':
- return auth_form
-
-def make_plugin(pam_conf, login_form_qs='__do_login'):
- plugin = FormPlugin(login_form_qs)
+ def _get_rememberer(self, environ):
+ rememberer = environ['repoze.pam.plugins'][self.rememberer_name]
+ return rememberer
+
+ # IIdentifier
+ def remember(self, environ, identity):
+ rememberer = self._get_rememberer(environ)
+ return rememberer.remember(environ, identity)
+
+ # IIdentifier
+ def forget(self, environ, identity):
+ rememberer = self._get_rememberer(environ)
+ return rememberer.forget(environ, identity)
+
+ # IChallenger
+ def challenge(self, environ, status, app_headers, forget_headers):
+ # heck yeah.
+ def auth_form(environ, start_response):
+ import pprint
+ form = _DEFAULT_FORM % pprint.pformat(environ)
+ content_length = CONTENT_LENGTH.tuples(str(len(form)))
+ content_type = CONTENT_TYPE.tuples('text/html')
+ headers = content_length + content_type + forget_headers
+ start_response('200 OK', headers)
+ return [form]
+
+ return auth_form
+
+def make_plugin(pam_conf, login_form_qs='__do_login', rememberer_name=None):
+ if rememberer_name is None:
+ raise ValueError(
+ 'must include rememberer key (name of another IIdentifier plugin)')
+ plugin = FormPlugin(login_form_qs, rememberer_name)
return plugin
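The delegation to a named rememberer can be shown in isolation. In this sketch (Python 3) only the `repoze.pam.plugins` environ key comes from the diff; `RecordingRememberer` and `DelegatingForm` are hypothetical classes, and it assumes the middleware has already placed a name-to-plugin mapping in the environ.

```python
class RecordingRememberer(object):
    """Hypothetical stand-in for a cookie-style identifier plugin."""

    def remember(self, environ, identity):
        return [('Set-Cookie', 'auth=%s; Path=/;' % identity['login'])]

    def forget(self, environ, identity):
        return [('Set-Cookie', 'auth=""; Path=/; Max-Age=0')]


class DelegatingForm(object):
    """Looks up another plugin by name for remember and forget duties."""

    def __init__(self, rememberer_name):
        self.rememberer_name = rememberer_name

    def _get_rememberer(self, environ):
        # Assumes the name registry is stored under this environ key.
        return environ['repoze.pam.plugins'][self.rememberer_name]

    def remember(self, environ, identity):
        return self._get_rememberer(environ).remember(environ, identity)

    def forget(self, environ, identity):
        # Return the headers so the caller can add them to the response.
        return self._get_rememberer(environ).forget(environ, identity)
```

Resolving the rememberer at call time rather than in `__init__` means the form plugin can be constructed before the plugin it depends on.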
Modified: repoze.pam/trunk/repoze/pam/plugins/htpasswd.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/plugins/htpasswd.py (original)
+++ repoze.pam/trunk/repoze/pam/plugins/htpasswd.py Mon Feb 25 20:44:48 2008
@@ -1,23 +1,23 @@
from zope.interface import implements
-from repoze.pam.interfaces import IAuthenticatorPlugin
+from repoze.pam.interfaces import IAuthenticator
from repoze.pam.utils import resolveDotted
class HTPasswdPlugin(object):
- implements(IAuthenticatorPlugin)
+ implements(IAuthenticator)
def __init__(self, filename, check):
self.filename = filename
self.check = check
# IAuthenticatorPlugin
- def authenticate(self, environ, credentials):
+ def authenticate(self, environ, identity):
try:
- login = credentials['login']
- password = credentials['password']
+ login = identity['login']
+ password = identity['password']
except KeyError:
- return False
+ return None
if hasattr(self.filename, 'seek'):
# assumed to have a readline
@@ -27,7 +27,7 @@
try:
f = open(self.filename, 'r')
except IOError:
- return False
+ return None
for line in f:
try:
@@ -35,8 +35,9 @@
except ValueError:
continue
if username == login:
- return self.check(password, hashed)
- return False
+ if self.check(password, hashed):
+ return username
+ return None
def crypt_check(password, hashed):
from crypt import crypt
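With this change an authenticator returns a user id or None instead of True or False. A rough Python 3 sketch of that contract follows, assuming an already-open file-like object. The standalone `authenticate` function and the `plain_check` comparison are hypothetical, and a real deployment would compare hashes rather than plain text.

```python
import io


def plain_check(password, hashed):
    """Hypothetical checker: treats the stored value as plain text."""
    return password == hashed


def authenticate(htpasswd_file, identity, check):
    """Return the matching username, or None if authentication fails."""
    try:
        login = identity['login']
        password = identity['password']
    except KeyError:
        return None
    htpasswd_file.seek(0)
    for line in htpasswd_file:
        try:
            username, hashed = line.rstrip().split(':', 1)
        except ValueError:  # malformed line without a colon; skip it
            continue
        if username == login:
            if check(password, hashed):
                return username
    return None
```

Returning the username gives the middleware a value it can use directly as the user id, which is what the updated `test_authenticate_match` asserts.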
Modified: repoze.pam/trunk/repoze/pam/tests.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/tests.py (original)
+++ repoze.pam/trunk/repoze/pam/tests.py Mon Feb 25 20:44:48 2008
@@ -1,8 +1,6 @@
import os
import unittest
-here = os.path.abspath(os.path.dirname(__file__))
-
class Base(unittest.TestCase):
def _makeEnviron(self, kw=None):
environ = {}
@@ -18,254 +16,235 @@
def _makeOne(self,
app=None,
- registry=None,
- request_classifier=None,
- response_classifier=None,
- add_credentials=True
+ identifiers=None,
+ authenticators=None,
+ challengers=None,
+ classifier=None,
+ challenge_decider=None,
+ log_stream=None,
+ log_level=None,
):
- if registry is None:
- registry = {}
- from repoze.pam.interfaces import IAuthenticatorPlugin
- from repoze.pam.interfaces import IExtractorPlugin
- from repoze.pam.interfaces import IChallengerPlugin
- registry[IExtractorPlugin] = [ DummyExtractor() ]
- registry[IAuthenticatorPlugin] = [ DummyAuthenticator() ]
- registry[IChallengerPlugin] = [ DummyChallenger() ]
if app is None:
app = DummyApp()
- if request_classifier is None:
- request_classifier = DummyRequestClassifier()
- if response_classifier is None:
- response_classifier = DummyResponseClassifier()
- mw = self._getTargetClass()(app, registry, request_classifier,
- response_classifier, add_credentials)
+ if identifiers is None:
+ identifiers = []
+ if authenticators is None:
+ authenticators = []
+ if challengers is None:
+ challengers = []
+ if classifier is None:
+ classifier = DummyRequestClassifier()
+ if challenge_decider is None:
+ challenge_decider = DummyChallengeDecider()
+ if log_level is None:
+ import logging
+ log_level = logging.DEBUG
+ mw = self._getTargetClass()(app,
+ identifiers,
+ authenticators,
+ challengers,
+ classifier,
+ challenge_decider,
+ log_stream,
+ log_level=log_level)
return mw
- def test_extract_success(self):
- environ = self._makeEnviron()
- mw = self._makeOne()
- creds, plugin = mw.extract(environ, None)
- self.assertEqual(creds['login'], 'chris')
- self.assertEqual(creds['password'], 'password')
- self.failIf(plugin is None)
-
- def test_extract_fail(self):
- environ = self._makeEnviron()
- from repoze.pam.interfaces import IExtractorPlugin
- extractor = DummyNoResultsExtractor()
- registry = {
- IExtractorPlugin:[extractor]
- }
- mw = self._makeOne(registry=registry)
- creds, plugin = mw.extract(environ, None)
- self.assertEqual(creds, {})
- self.assertEqual(plugin, None)
-
- def test_extract_success_skip_noresults(self):
- environ = self._makeEnviron()
- mw = self._makeOne()
- from repoze.pam.interfaces import IExtractorPlugin
- extractor1 = DummyNoResultsExtractor()
- extractor2 = DummyExtractor()
- registry = {
- IExtractorPlugin:[extractor1, extractor2]
- }
- mw = self._makeOne(registry=registry)
- creds, plugin = mw.extract(environ, None)
- self.assertEqual(creds['login'], 'chris')
- self.assertEqual(creds['password'], 'password')
- self.assertEqual(plugin, extractor2)
-
- def test_extract_success_firstwins(self):
- environ = self._makeEnviron()
- mw = self._makeOne()
- from repoze.pam.interfaces import IExtractorPlugin
- extractor1 = DummyExtractor({'login':'fred','password':'fred'})
- extractor2 = DummyExtractor({'login':'bob','password':'bob'})
- registry = {
- IExtractorPlugin:[extractor1, extractor2]
- }
- mw = self._makeOne(registry=registry)
- creds, plugin = mw.extract(environ, None)
- self.assertEqual(creds['login'], 'fred')
- self.assertEqual(creds['password'], 'fred')
- self.assertEqual(plugin, extractor1)
-
- def test_extract_find_implicit_classifier(self):
+ def test_identify_success(self):
environ = self._makeEnviron()
- mw = self._makeOne()
- from repoze.pam.interfaces import IExtractorPlugin
- extractor1 = DummyExtractor({'login':'fred','password':'fred'})
- extractor1.request_classifications = set(['nomatch'])
- extractor2 = DummyExtractor({'login':'bob','password':'bob'})
- registry = {
- IExtractorPlugin:[extractor1, extractor2]
- }
- mw = self._makeOne(registry=registry)
- creds, plugin = mw.extract(environ, None)
+ identifier = DummyIdentifier()
+ identifiers = [ ('i', identifier) ]
+ mw = self._makeOne(identifiers=identifiers)
+ results = mw.identify(environ, None)
+ self.assertEqual(len(results), 1)
+ new_identifier, identity = results[0]
+ self.assertEqual(new_identifier, identifier)
+ self.assertEqual(identity['login'], 'chris')
+ self.assertEqual(identity['password'], 'password')
+
+ def test_identify_fail(self):
+ environ = self._makeEnviron()
+ plugin = DummyNoResultsIdentifier()
+ plugins = [ ('dummy', plugin) ]
+ mw = self._makeOne(identifiers=plugins)
+ results = mw.identify(environ, None)
+ self.assertEqual(len(results), 0)
+
+ def test_identify_success_skip_noresults(self):
+ environ = self._makeEnviron()
+ mw = self._makeOne()
+ plugin1 = DummyNoResultsIdentifier()
+ plugin2 = DummyIdentifier()
+ plugins = [ ('identifier1', plugin1), ('identifier2', plugin2) ]
+ mw = self._makeOne(identifiers=plugins)
+ results = mw.identify(environ, None)
+ self.assertEqual(len(results), 1)
+ new_identifier, identity = results[0]
+ self.assertEqual(new_identifier, plugin2)
+ self.assertEqual(identity['login'], 'chris')
+ self.assertEqual(identity['password'], 'password')
+
+ def test_identify_success_multiresults(self):
+ environ = self._makeEnviron()
+ mw = self._makeOne()
+ plugin1 = DummyIdentifier({'login':'fred','password':'fred'})
+ plugin2 = DummyIdentifier({'login':'bob','password':'bob'})
+ plugins = [ ('identifier1', plugin1), ('identifier2', plugin2) ]
+ mw = self._makeOne(identifiers=plugins)
+ results = mw.identify(environ, None)
+ self.assertEqual(len(results), 2)
+ new_identifier, identity = results[0]
+ self.assertEqual(new_identifier, plugin1)
+ self.assertEqual(identity['login'], 'fred')
+ self.assertEqual(identity['password'], 'fred')
+ new_identifier, identity = results[1]
+ self.assertEqual(new_identifier, plugin2)
+ self.assertEqual(identity['login'], 'bob')
+ self.assertEqual(identity['password'], 'bob')
+
+ def test_identify_find_implicit_classifier(self):
+ environ = self._makeEnviron()
+ mw = self._makeOne()
+ plugin1 = DummyIdentifier({'login':'fred','password':'fred'})
+ plugin1.classifications = set(['nomatch'])
+ plugin2 = DummyIdentifier({'login':'bob','password':'bob'})
+ plugins = [ ('identifier1', plugin1), ('identifier2', plugin2) ]
+ mw = self._makeOne(identifiers=plugins)
+ results = mw.identify(environ, 'match')
+ self.assertEqual(len(results), 1)
+ plugin, creds = results[0]
self.assertEqual(creds['login'], 'bob')
self.assertEqual(creds['password'], 'bob')
- self.assertEqual(plugin, extractor2)
+ self.assertEqual(plugin, plugin2)
- def test_extract_find_explicit_classifier(self):
+ def test_identify_find_explicit_classifier(self):
environ = self._makeEnviron()
- mw = self._makeOne()
- from repoze.pam.interfaces import IExtractorPlugin
- extractor1 = DummyExtractor({'login':'fred','password':'fred'})
- extractor1.request_classifications = set(['nomatch'])
- extractor2 = DummyExtractor({'login':'bob','password':'bob'})
- extractor2.request_classifications = set(['match'])
- registry = {
- IExtractorPlugin:[extractor1, extractor2]
- }
- mw = self._makeOne(registry=registry)
- creds, plugin = mw.extract(environ, 'match')
+ plugin1 = DummyIdentifier({'login':'fred','password':'fred'})
+ plugin1.classifications = set(['nomatch'])
+ plugin2 = DummyIdentifier({'login':'bob','password':'bob'})
+ plugin2.classifications = set(['match'])
+ plugins= [ ('identifier1', plugin1), ('identifier2', plugin2) ]
+ mw = self._makeOne(identifiers=plugins)
+ results = mw.identify(environ, 'match')
+ self.assertEqual(len(results), 1)
+ plugin, creds = results[0]
self.assertEqual(creds['login'], 'bob')
self.assertEqual(creds['password'], 'bob')
- self.assertEqual(plugin, extractor2)
+ self.assertEqual(plugin, plugin2)
def test_authenticate_success(self):
environ = self._makeEnviron()
- mw = self._makeOne()
- creds = {'login':'chris', 'password':'password'}
- userid, plugin = mw.authenticate(environ, creds, None)
- self.assertEqual(userid, 'chris')
+ plugin1 = DummyAuthenticator('a')
+ plugins = [ ('identifier1', plugin1) ]
+ mw = self._makeOne(authenticators=plugins)
+ identities = [ (None, {'login':'chris', 'password':'password'}) ]
+ results = mw.authenticate(environ, None, identities)
+ self.assertEqual(len(results), 1)
+ result = results[0]
+ authinfo, identinfo, creds, userid = result
+ self.assertEqual(authinfo, (0, plugin1))
+ self.assertEqual(identinfo, (0, None))
+ self.assertEqual(creds['login'], 'chris')
+ self.assertEqual(creds['password'], 'password')
+ self.assertEqual(userid, 'a')
def test_authenticate_fail(self):
environ = self._makeEnviron()
- mw = self._makeOne()
- creds = {'login':'chris', 'password':'password'}
- from repoze.pam.interfaces import IAuthenticatorPlugin
- registry = {
- IAuthenticatorPlugin:[DummyFailAuthenticator()]
- }
- mw = self._makeOne(registry=registry)
- userid, plugin = mw.authenticate(environ, creds, None)
- self.assertEqual(userid, None)
- self.assertEqual(plugin, None)
+ mw = self._makeOne() # no authenticators
+ identities = [ (None, {'login':'chris', 'password':'password'}) ]
+ result = mw.authenticate(environ, None, identities)
+ self.assertEqual(len(result), 0)
def test_authenticate_success_skip_fail(self):
environ = self._makeEnviron()
mw = self._makeOne()
- from repoze.pam.interfaces import IAuthenticatorPlugin
plugin1 = DummyFailAuthenticator()
plugin2 = DummyAuthenticator()
- registry = {
- IAuthenticatorPlugin:[plugin1, plugin2]
- }
- mw = self._makeOne(registry=registry)
+ plugins = [ ('dummy1', plugin1), ('dummy2', plugin2) ]
+ mw = self._makeOne(authenticators=plugins)
creds = {'login':'chris', 'password':'password'}
- userid, plugin = mw.authenticate(environ, creds, None)
+ identities = [ (None, {'login':'chris', 'password':'password'}) ]
+ results = mw.authenticate(environ, None, identities)
+ self.assertEqual(len(results), 1)
+ result = results[0]
+ authinfo, identinfo, creds, userid = result
+ self.assertEqual(authinfo, (1, plugin2))
+ self.assertEqual(identinfo, (0, None))
+ self.assertEqual(creds['login'], 'chris')
+ self.assertEqual(creds['password'], 'password')
self.assertEqual(userid, 'chris')
- self.assertEqual(plugin, plugin2)
- def test_authenticate_success_firstwins(self):
+ def test_authenticate_success_multiresult(self):
environ = self._makeEnviron()
mw = self._makeOne()
- from repoze.pam.interfaces import IAuthenticatorPlugin
plugin1 = DummyAuthenticator('chris_id1')
plugin2 = DummyAuthenticator('chris_id2')
- registry = {
- IAuthenticatorPlugin:[plugin1, plugin2]
- }
- mw = self._makeOne(registry=registry)
+ plugins = [ ('dummy1',plugin1), ('dummy2',plugin2) ]
+ mw = self._makeOne(authenticators=plugins)
creds = {'login':'chris', 'password':'password'}
- userid, plugin = mw.authenticate(environ, creds, None)
+ identities = [ (None, {'login':'chris', 'password':'password'}) ]
+ results = mw.authenticate(environ, None, identities)
+ self.assertEqual(len(results), 2)
+ result = results[0]
+ authinfo, identinfo, creds, userid = result
+ self.assertEqual(authinfo, (0, plugin1))
+ self.assertEqual(identinfo, (0, None))
+ self.assertEqual(creds['login'], 'chris')
+ self.assertEqual(creds['password'], 'password')
self.assertEqual(userid, 'chris_id1')
- self.assertEqual(plugin, plugin1)
+ result = results[1]
+ authinfo, identinfo, creds, userid = result
+ self.assertEqual(authinfo, (1, plugin2))
+ self.assertEqual(identinfo, (0, None))
+ self.assertEqual(creds['login'], 'chris')
+ self.assertEqual(creds['password'], 'password')
+ self.assertEqual(userid, 'chris_id2')
def test_authenticate_find_implicit_classifier(self):
environ = self._makeEnviron()
mw = self._makeOne()
- from repoze.pam.interfaces import IAuthenticatorPlugin
plugin1 = DummyAuthenticator('chris_id1')
- plugin1.request_classifications = set(['nomatch'])
+ plugin1.classifications = set(['nomatch'])
plugin2 = DummyAuthenticator('chris_id2')
- registry = {
- IAuthenticatorPlugin:[plugin1, plugin2]
- }
- mw = self._makeOne(registry=registry)
- creds = {'login':'chris', 'password':'password'}
- userid, plugin = mw.authenticate(environ, creds, None)
+ plugins = [ ('auth1', plugin1), ('auth2', plugin2) ]
+ mw = self._makeOne(authenticators = plugins)
+ identities = [ (None, {'login':'chris', 'password':'password'}) ]
+ results = mw.authenticate(environ, 'match', identities)
+ self.assertEqual(len(results), 1)
+ result = results[0]
+ authinfo, identinfo, creds, userid = result
+ self.assertEqual(authinfo, (0, plugin2))
+ self.assertEqual(identinfo, (0, None))
+ self.assertEqual(creds['login'], 'chris')
+ self.assertEqual(creds['password'], 'password')
self.assertEqual(userid, 'chris_id2')
- self.assertEqual(plugin, plugin2)
def test_authenticate_find_explicit_classifier(self):
environ = self._makeEnviron()
mw = self._makeOne()
- from repoze.pam.interfaces import IAuthenticatorPlugin
plugin1 = DummyAuthenticator('chris_id1')
- plugin1.request_classifications = set(['nomatch'])
+ plugin1.classifications = set(['nomatch'])
plugin2 = DummyAuthenticator('chris_id2')
- plugin2.request_classifications = set(['match'])
- registry = {
- IAuthenticatorPlugin:[plugin1, plugin2]
- }
- mw = self._makeOne(registry=registry)
- creds = {'login':'chris', 'password':'password'}
- userid, plugin = mw.authenticate(environ, creds, 'match')
+ plugin2.classifications = set(['match'])
+ plugins = [ ('auth1', plugin1), ('auth2', plugin2) ]
+ mw = self._makeOne(authenticators = plugins)
+ identities = [ (None, {'login':'chris', 'password':'password'}) ]
+ results = mw.authenticate(environ, 'match', identities)
+ self.assertEqual(len(results), 1)
+ result = results[0]
+ authinfo, identinfo, creds, userid = result
+ self.assertEqual(authinfo, (0, plugin2))
+ self.assertEqual(identinfo, (0, None))
+ self.assertEqual(creds['login'], 'chris')
+ self.assertEqual(creds['password'], 'password')
self.assertEqual(userid, 'chris_id2')
- self.assertEqual(plugin, plugin2)
-
- def test_modify_environment_success_addcredentials(self):
- environ = self._makeEnviron()
- mw = self._makeOne()
- classification, headers = mw.modify_environment(environ)
- self.assertEqual(classification, 'browser')
- self.assertEqual(environ['REMOTE_USER'], 'chris')
- self.assertEqual(environ['repoze.pam.credentials'],
- {'login':'chris','password':'password'})
- self.assertEqual(headers, [])
-
- def test_modify_environment_noaddcredentials(self):
- environ = self._makeEnviron()
- mw = self._makeOne()
- mw.add_credentials = False
- classification, headers = mw.modify_environment(environ)
- self.assertEqual(classification, 'browser')
- self.assertEqual(environ['REMOTE_USER'], 'chris')
- self.failIf(environ.has_key('repoze.pam.credentials'))
- self.assertEqual(headers, [])
-
- def test_modify_environment_nocredentials(self):
- environ = self._makeEnviron()
- from repoze.pam.interfaces import IExtractorPlugin
- registry = {
- IExtractorPlugin:[DummyNoResultsExtractor()],
- }
- mw = self._makeOne(registry=registry)
- classification, headers = mw.modify_environment(environ)
- self.assertEqual(classification, 'browser')
- self.assertEqual(environ.get('REMOTE_USER'), None)
- self.assertEqual(environ['repoze.pam.credentials'], {})
- self.assertEqual(headers, [])
- def test_modify_environment_remoteuser_already_set(self):
+ def test_call_remoteuser_already_set(self):
environ = self._makeEnviron({'REMOTE_USER':'admin'})
mw = self._makeOne()
- classification, headers = mw.modify_environment(environ)
- self.assertEqual(classification, 'browser')
- self.assertEqual(environ.get('REMOTE_USER'), 'admin')
- self.assertEqual(environ['repoze.pam.credentials'],
- {'login':'chris', 'password':'password'})
- self.assertEqual(headers, [])
+ result = mw(environ, None)
+ self.assertEqual(mw.app.environ, environ)
+ self.assertEqual(result, [])
- def test_modify_environment_with_postextractor(self):
- environ = self._makeEnviron({'REMOTE_USER':'admin'})
- from repoze.pam.interfaces import IExtractorPlugin
- from repoze.pam.interfaces import IPostExtractorPlugin
- registry = {
- IExtractorPlugin:[DummyExtractor()],
- IPostExtractorPlugin:[DummyPostExtractor()],
- }
- mw = self._makeOne(registry=registry)
- classification, headers = mw.modify_environment(environ)
- self.assertEqual(classification, 'browser')
- self.assertEqual(environ.get('REMOTE_USER'), 'admin')
- self.assertEqual(environ['repoze.pam.credentials'],
- {'login':'chris', 'password':'password'})
- self.assertEqual(headers, [('foo', 'bar')])
-
class TestBasicAuthPlugin(Base):
def _getTargetClass(self):
from repoze.pam.plugins.basicauth import BasicAuthPlugin
@@ -277,22 +256,16 @@
def test_implements(self):
from zope.interface.verify import verifyClass
- from repoze.pam.interfaces import IChallengerPlugin
- from repoze.pam.interfaces import IExtractorPlugin
+ from repoze.pam.interfaces import IChallenger
+ from repoze.pam.interfaces import IIdentifier
klass = self._getTargetClass()
- verifyClass(IChallengerPlugin, klass)
- verifyClass(IExtractorPlugin, klass)
-
- def test_challenge_non401(self):
- plugin = self._makeOne('realm')
- environ = self._makeEnviron()
- result = plugin.challenge(environ, '200 OK', {})
- self.assertEqual(result, None)
+ verifyClass(IChallenger, klass)
+ verifyClass(IIdentifier, klass)
- def test_challenge_401(self):
+ def test_challenge(self):
plugin = self._makeOne('realm')
environ = self._makeEnviron()
- result = plugin.challenge(environ, '401 Unauthorized', {})
+ result = plugin.challenge(environ, '401 Unauthorized', [], [])
self.assertNotEqual(result, None)
app_iter = result(environ, lambda *arg: None)
items = []
@@ -301,63 +274,53 @@
response = ''.join(items)
self.failUnless(response.startswith('401 Unauthorized'))
- def test_extract_noauthinfo(self):
+ def test_identify_noauthinfo(self):
plugin = self._makeOne('realm')
environ = self._makeEnviron()
- creds = plugin.extract(environ)
+ creds = plugin.identify(environ)
self.assertEqual(creds, {})
- def test_extract_nonbasic(self):
+ def test_identify_nonbasic(self):
plugin = self._makeOne('realm')
environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Digest abc'})
- creds = plugin.extract(environ)
+ creds = plugin.identify(environ)
self.assertEqual(creds, {})
- def test_extract_basic_badencoding(self):
+ def test_identify_basic_badencoding(self):
plugin = self._makeOne('realm')
environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic abc'})
- creds = plugin.extract(environ)
+ creds = plugin.identify(environ)
self.assertEqual(creds, {})
- def test_extract_basic_badrepr(self):
+ def test_identify_basic_badrepr(self):
plugin = self._makeOne('realm')
value = 'foo'.encode('base64')
environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic %s' % value})
- creds = plugin.extract(environ)
+ creds = plugin.identify(environ)
self.assertEqual(creds, {})
- def test_extract_basic_ok(self):
+ def test_identify_basic_ok(self):
plugin = self._makeOne('realm')
value = 'foo:bar'.encode('base64')
environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic %s' % value})
- creds = plugin.extract(environ)
+ creds = plugin.identify(environ)
self.assertEqual(creds, {'login':'foo', 'password':'bar'})
- def test_post_extract_nocreds(self):
+ def test_remember(self):
plugin = self._makeOne('realm')
creds = {}
environ = self._makeEnviron()
- result = plugin.post_extract(environ, creds, plugin)
- self.assertEqual(result, None)
- self.assertEqual(environ.get('HTTP_AUTHORIZATION'), None)
-
- def test_post_extract_creds_withauthorization(self):
- plugin = self._makeOne('realm')
- creds = {'login':'foo', 'password':'password'}
- environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic foo'})
- result = plugin.post_extract(environ, creds, plugin)
+ result = plugin.remember(environ, creds)
self.assertEqual(result, None)
- self.assertEqual(environ['HTTP_AUTHORIZATION'], 'Basic foo')
- def test_post_extract_creds_mutates(self):
+ def test_forget(self):
plugin = self._makeOne('realm')
creds = {'login':'foo', 'password':'password'}
environ = self._makeEnviron()
- result = plugin.post_extract(environ, creds, plugin)
- self.assertEqual(result, None)
- auth = 'foo:password'.encode('base64').rstrip()
- auth = 'Basic ' + auth
- self.assertEqual(environ['HTTP_AUTHORIZATION'], auth)
+ result = plugin.forget(environ, creds)
+ self.assertEqual(result, [('WWW-Authenticate', 'Basic realm="realm"')] )
+
+ # XXX need challenge tests
def test_factory(self):
from repoze.pam.plugins.basicauth import make_plugin
@@ -375,9 +338,9 @@
def test_implements(self):
from zope.interface.verify import verifyClass
- from repoze.pam.interfaces import IAuthenticatorPlugin
+ from repoze.pam.interfaces import IAuthenticator
klass = self._getTargetClass()
- verifyClass(IAuthenticatorPlugin, klass)
+ verifyClass(IAuthenticator, klass)
def test_authenticate_nocreds(self):
from StringIO import StringIO
@@ -386,7 +349,7 @@
environ = self._makeEnviron()
creds = {}
result = plugin.authenticate(environ, creds)
- self.assertEqual(result, False)
+ self.assertEqual(result, None)
def test_authenticate_nolines(self):
from StringIO import StringIO
@@ -395,7 +358,7 @@
environ = self._makeEnviron()
creds = {'login':'chrism', 'password':'pass'}
result = plugin.authenticate(environ, creds)
- self.assertEqual(result, False)
+ self.assertEqual(result, None)
def test_authenticate_nousermatch(self):
from StringIO import StringIO
@@ -404,7 +367,7 @@
environ = self._makeEnviron()
creds = {'login':'chrism', 'password':'pass'}
result = plugin.authenticate(environ, creds)
- self.assertEqual(result, False)
+ self.assertEqual(result, None)
def test_authenticate_match(self):
from StringIO import StringIO
@@ -415,7 +378,7 @@
environ = self._makeEnviron()
creds = {'login':'chrism', 'password':'pass'}
result = plugin.authenticate(environ, creds)
- self.assertEqual(result, True)
+ self.assertEqual(result, 'chrism')
def test_authenticate_badline(self):
from StringIO import StringIO
@@ -426,9 +389,10 @@
environ = self._makeEnviron()
creds = {'login':'chrism', 'password':'pass'}
result = plugin.authenticate(environ, creds)
- self.assertEqual(result, True)
+ self.assertEqual(result, 'chrism')
def test_authenticate_filename(self):
+ here = os.path.abspath(os.path.dirname(__file__))
htpasswd = os.path.join(here, 'fixtures', 'test.htpasswd')
def check(password, hashed):
return True
@@ -436,7 +400,7 @@
environ = self._makeEnviron()
creds = {'login':'chrism', 'password':'pass'}
result = plugin.authenticate(environ, creds)
- self.assertEqual(result, True)
+ self.assertEqual(result, 'chrism')
def test_crypt_check(self):
from crypt import crypt
@@ -466,65 +430,53 @@
def test_implements(self):
from zope.interface.verify import verifyClass
- from repoze.pam.interfaces import IExtractorPlugin
- from repoze.pam.interfaces import IPostExtractorPlugin
+ from repoze.pam.interfaces import IIdentifier
klass = self._getTargetClass()
- verifyClass(IExtractorPlugin, klass)
- verifyClass(IPostExtractorPlugin, klass)
+ verifyClass(IIdentifier, klass)
- def test_extract_nocookies(self):
+ def test_identify_nocookies(self):
plugin = self._makeOne('oatmeal')
environ = self._makeEnviron()
- result = plugin.extract(environ)
+ result = plugin.identify(environ)
self.assertEqual(result, {})
- def test_extract_badcookies(self):
+ def test_identify_badcookies(self):
plugin = self._makeOne('oatmeal')
environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=a'})
- result = plugin.extract(environ)
+ result = plugin.identify(environ)
self.assertEqual(result, {})
- def test_extract_badcookies(self):
+ def test_identify_badcookies(self):
plugin = self._makeOne('oatmeal')
environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=a'})
- result = plugin.extract(environ)
+ result = plugin.identify(environ)
self.assertEqual(result, {})
- def test_extract_success(self):
+ def test_identify_success(self):
plugin = self._makeOne('oatmeal')
auth = 'foo:password'.encode('base64').rstrip()
environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=%s;' % auth})
- result = plugin.extract(environ)
+ result = plugin.identify(environ)
self.assertEqual(result, {'login':'foo', 'password':'password'})
- def test_post_extract_nocreds(self):
- plugin = self._makeOne('oatmeal')
- creds = {}
- environ = self._makeEnviron()
- result = plugin.post_extract(environ, creds, plugin)
- self.assertEqual(result, None)
- self.assertEqual(environ.get('HTTP_COOKIE'), None)
-
- def test_post_extract_creds_same(self):
+ def test_remember_creds_same(self):
plugin = self._makeOne('oatmeal')
creds = {'login':'foo', 'password':'password'}
auth = 'foo:password'.encode('base64').rstrip()
auth = 'oatmeal=%s;' % auth
environ = self._makeEnviron({'HTTP_COOKIE':auth})
- result = plugin.post_extract(environ, creds, plugin)
+ result = plugin.remember(environ, creds)
self.assertEqual(result, None)
- self.assertEqual(environ.get('HTTP_COOKIE'), auth)
- def test_post_extract_creds_different(self):
+ def test_remember_creds_different(self):
plugin = self._makeOne('oatmeal')
creds = {'login':'bar', 'password':'password'}
auth = 'foo:password'.encode('base64').rstrip()
creds_auth = 'bar:password'.encode('base64').rstrip()
environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=%s;' % auth})
- result = plugin.post_extract(environ, creds, plugin)
+ result = plugin.remember(environ, creds)
expected = 'oatmeal=%s; Path=/;' % creds_auth
self.assertEqual(result, [('Set-Cookie', expected)])
- self.assertEqual(environ['HTTP_COOKIE'], 'oatmeal=%s;' % creds_auth)
def test_factory(self):
from repoze.pam.plugins.cookie import make_plugin
@@ -533,69 +485,40 @@
class TestDefaultRequestClassifier(Base):
- def _getTargetClass(self):
- from repoze.pam.classifiers import DefaultRequestClassifier
- return DefaultRequestClassifier
-
- def _makeOne(self, *arg, **kw):
- classifier = self._getTargetClass()(*arg, **kw)
- return classifier
-
- def test_implements(self):
- from zope.interface.verify import verifyClass
- from repoze.pam.interfaces import IRequestClassifier
- klass = self._getTargetClass()
- verifyClass(IRequestClassifier, klass)
+ def _getFUT(self):
+ from repoze.pam.classifiers import default_request_classifier
+ return default_request_classifier
def test_classify_dav_method(self):
- classifier = self._makeOne()
+ classifier = self._getFUT()
environ = self._makeEnviron({'REQUEST_METHOD':'COPY'})
result = classifier(environ)
self.assertEqual(result, 'dav')
def test_classify_dav_useragent(self):
- classifier = self._makeOne()
+ classifier = self._getFUT()
environ = self._makeEnviron({'HTTP_USER_AGENT':'WebDrive'})
result = classifier(environ)
self.assertEqual(result, 'dav')
def test_classify_xmlpost(self):
- classifier = self._makeOne()
+ classifier = self._getFUT()
environ = self._makeEnviron({'CONTENT_TYPE':'text/xml',
'REQUEST_METHOD':'POST'})
result = classifier(environ)
self.assertEqual(result, 'xmlpost')
def test_classify_browser(self):
- classifier = self._makeOne()
+ classifier = self._getFUT()
environ = self._makeEnviron({'CONTENT_TYPE':'text/xml',
'REQUEST_METHOD':'GET'})
result = classifier(environ)
self.assertEqual(result, 'browser')
-class TestDefaultResponseClassifier(Base):
- def _getTargetClass(self):
- from repoze.pam.classifiers import DefaultResponseClassifier
- return DefaultResponseClassifier
-
- def _makeOne(self, *arg, **kw):
- classifier = self._getTargetClass()(*arg, **kw)
- return classifier
-
- def test_implements(self):
- from zope.interface.verify import verifyClass
- from repoze.pam.interfaces import IResponseClassifier
- klass = self._getTargetClass()
- verifyClass(IResponseClassifier, klass)
-
- def test_classify(self):
- classifier = self._makeOne()
- result = classifier(None, 'dav', None, None)
- self.assertEqual(result, 'dav')
-
class DummyApp:
def __call__(self, environ, start_response):
- return ['a']
+ self.environ = environ
+ return []
class DummyRequestClassifier:
def __call__(self, environ):
@@ -605,17 +528,30 @@
def __call__(self, environ, request_classification, headers, exception):
return request_classification
-class DummyExtractor:
+class DummyIdentifier:
def __init__(self, credentials=None):
if credentials is None:
credentials = {'login':'chris', 'password':'password'}
self.credentials = credentials
- def extract(self, environ):
+
+ def identify(self, environ):
return self.credentials
-class DummyNoResultsExtractor:
- def extract(self, environ):
+ def forget(self, environ, identity):
+ pass
+
+ def remember(self, environ, identity):
+ pass
+
+class DummyNoResultsIdentifier:
+ def identify(self, environ):
return {}
+
+ def remember(self, *arg, **kw):
+ pass
+
+ def forget(self, *arg, **kw):
+ pass
class DummyAuthenticator:
def __init__(self, userid=None):
@@ -631,10 +567,11 @@
return None
class DummyChallenger:
- def challenge(self, environ, status, headers):
+ def challenge(self, environ, status, app_headers, forget_headers):
environ['challenged'] = True
-class DummyPostExtractor:
- def post_extract(self, environ, credentials, extractor):
- return [ ('foo', 'bar') ]
-
+class DummyChallengeDecider:
+ def __call__(self, environ, status, headers):
+ if status.startswith('401 '):
+ return True
+
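Note on the contract these test changes exercise: an authenticator's authenticate() now returns a userid (or None) instead of True/False, and the middleware consults a separate challenge decider rather than leaving the decision to each challenge plugin. The sketch below is only an illustration of that contract, not code from this revision. DictAuthenticator and challenge_decider are hypothetical names, the dict-backed password store is an assumption, and the decider returns an explicit False where DummyChallengeDecider falls through to None.

```python
# Illustrative sketch only; the names here are hypothetical and are not
# part of repoze.pam.

def challenge_decider(environ, status, headers):
    """Return True when the downstream response calls for a challenge.

    Mirrors DummyChallengeDecider in the diff: only a 401 status
    triggers a challenge.
    """
    return status.startswith('401 ')


class DictAuthenticator(object):
    """Hypothetical authenticator backed by a dict of login -> password."""

    def __init__(self, passwords):
        self.passwords = passwords

    def authenticate(self, environ, identity):
        # Per the updated tests: return the userid on success and
        # None (not False) when the identity cannot be authenticated.
        login = identity.get('login')
        password = identity.get('password')
        if login is None or password is None:
            return None
        if self.passwords.get(login) == password:
            return login
        return None
```

Returning the userid rather than a boolean lets the caller compare results from several authenticators and pick one, which fits the "best one wins" behaviour described in the log message.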