[Repoze-checkins] r784 - in repoze.pam/trunk: . repoze/pam repoze/pam/fixtures repoze/pam/plugins
Chris McDonough
chrism at agendaless.com
Sat Mar 8 17:44:23 UTC 2008
Author: Chris McDonough <chrism at agendaless.com>
Date: Sat Mar 8 12:44:22 2008
New Revision: 784
Log:
- Allow a downstream application to reset the identity during egress
(e.g. if a user changes his username or password and the
application would rather not require that the he logs out and logs
back in for the change to "take"). The application should set a
key in the WSGI environment: 'repoze.pam.identity_reset', which
should have a value that is an identity dictionary that can be
consumed by the "remember" method of the identifier that was used
for the current request.
- Add 'repoze.pam.identifier' to environment on ingress, which will
be the identifier plugin instance which gave PAM the identity that
became REMOTE_USER (to support introspection by the application, so
it has a shot at knowing the identity format before it sets the
value of 'repoze.pam.identity_reset' ).
- Allow "remote user key" (default: REMOTE_USER) to be overridden
(pass in remote_user_key to middleware constructor).
- Allow form plugin to override the default form.
Added:
repoze.pam/trunk/repoze/pam/fixtures/form.html
Modified:
repoze.pam/trunk/CHANGES.txt
repoze.pam/trunk/repoze/pam/middleware.py
repoze.pam/trunk/repoze/pam/plugins/form.py
repoze.pam/trunk/repoze/pam/tests.py
Modified: repoze.pam/trunk/CHANGES.txt
==============================================================================
--- repoze.pam/trunk/CHANGES.txt (original)
+++ repoze.pam/trunk/CHANGES.txt Sat Mar 8 12:44:22 2008
@@ -1,3 +1,25 @@
+Unreleased
+
+ - Allow a downstream application to reset the identity during egress
+ (e.g. if a user changes his username or password and the
+ application would rather not require that the he logs out and logs
+ back in for the change to "take"). The application should set a
+ key in the WSGI environment: 'repoze.pam.identity_reset', which
+ should have a value that is an identity dictionary that can be
+ consumed by the "remember" method of the identifier that was used
+ for the current request.
+
+ - Add 'repoze.pam.identifier' to environment on ingress, which will
+ be the identifier plugin instance which gave PAM the identity that
+ became REMOTE_USER (to support introspection by the application, so
+ it has a shot at knowing the identity format before it sets the
+ value of 'repoze.pam.identity_reset' ).
+
+ - Allow "remote user key" (default: REMOTE_USER) to be overridden
+ (pass in remote_user_key to middleware constructor).
+
+ - Allow form plugin to override the default form.
+
0.4 (03-07-2008)
- Allow plugins to specify a classifiers list per interface (instead
Added: repoze.pam/trunk/repoze/pam/fixtures/form.html
==============================================================================
--- (empty file)
+++ repoze.pam/trunk/repoze/pam/fixtures/form.html Sat Mar 8 12:44:22 2008
@@ -0,0 +1,2 @@
+<html>
+</html>
Modified: repoze.pam/trunk/repoze/pam/middleware.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/middleware.py (original)
+++ repoze.pam/trunk/repoze/pam/middleware.py Sat Mar 8 12:44:22 2008
@@ -18,8 +18,9 @@
challengers,
classifier,
challenge_decider,
- log_stream=None,
- log_level=logging.INFO
+ log_stream = None,
+ log_level = logging.INFO,
+ remote_user_key = 'REMOTE_USER',
):
iregistry, nregistry = make_registries(identifiers, authenticators,
challengers)
@@ -28,6 +29,7 @@
self.app = app
self.classifier = classifier
self.challenge_decider = challenge_decider
+ self.remote_user_key = remote_user_key
self.logger = None
if log_stream:
handler = logging.StreamHandler(log_stream)
@@ -39,8 +41,9 @@
self.logger.setLevel(log_level)
def __call__(self, environ, start_response):
- if REMOTE_USER(environ):
- # act as a pass through if REMOTE_USER is already set
+ if self.remote_user_key in environ:
+ # act as a pass through if REMOTE_USER (or whatever) is
+ # already set
return self.app(environ, start_response)
environ['repoze.pam.plugins'] = self.name_registry
@@ -62,10 +65,16 @@
if auth_ids:
auth_ids.sort()
best = auth_ids[0]
+ authenticator = best[0][1]
+ identifier = best[1][1]
identity = best[2]
userid = best[3]
- identifier = best[1][1]
- environ['REMOTE_USER'] = userid
+ # add the identifier plugin to the environment; it may
+ # allow a downstream application to better decide how
+ # to manufacture a 'repoze.pam.identity_reset'
+ environ['repoze.pam.identifier'] = identifier
+ # set the REMOTE_USER
+ environ[self.remote_user_key] = userid
else:
logger and logger.info('no identities found, not authenticating')
@@ -95,6 +104,17 @@
else:
logger and logger.info('no challenge required')
remember_headers = []
+ app_identity = environ.get('repoze.pam.identity_reset')
+ if app_identity:
+ # A downstream application has requested an "identity
+ # reset" (e.g. a user changed his username or password
+ # or both, and the application doesn't want to require
+ # that he log in again for the new identity to
+ # 'take'). We don't want to expose the identity to
+ # upstream consumers as it may contain cleartext
+ # password info.
+ del environ['repoze.pam.identity_reset']
+ identity = app_identity
if identifier:
remember_headers = identifier.remember(environ, identity)
if remember_headers:
@@ -316,7 +336,7 @@
challengers,
default_request_classifier,
default_challenge_decider,
- log_stream= log_stream,
+ log_stream = log_stream,
log_level = logging.DEBUG
)
return middleware
Modified: repoze.pam/trunk/repoze/pam/plugins/form.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/plugins/form.py (original)
+++ repoze.pam/trunk/repoze/pam/plugins/form.py Sat Mar 8 12:44:22 2008
@@ -12,7 +12,7 @@
_DEFAULT_FORM = """
<html>
<head>
- <title>Login Form</title>
+ <title>Log In</title>
</head>
<body>
<div>
@@ -36,7 +36,6 @@
</table>
</form>
<pre>
- %s
</pre>
</body>
</html>
@@ -46,12 +45,13 @@
implements(IChallenger, IIdentifier)
- def __init__(self, login_form_qs, rememberer_name):
+ def __init__(self, login_form_qs, rememberer_name, formbody=None):
self.login_form_qs = login_form_qs
# rememberer_name is the name of another configured plugin which
# implements IIdentifier, to handle remember and forget duties
# (ala a cookie plugin or a session plugin)
self.rememberer_name = rememberer_name
+ self.formbody = formbody
# IIdentifier
def identify(self, environ):
@@ -91,9 +91,8 @@
# IChallenger
def challenge(self, environ, status, app_headers, forget_headers):
# heck yeah.
+ form = self.formbody or _DEFAULT_FORM
def auth_form(environ, start_response):
- import pprint
- form = _DEFAULT_FORM % pprint.pformat(environ)
content_length = CONTENT_LENGTH.tuples(str(len(form)))
content_type = CONTENT_TYPE.tuples('text/html')
headers = content_length + content_type + forget_headers
@@ -105,10 +104,13 @@
def __repr__(self):
return '<%s %s>' % (self.__class__.__name__, id(self))
-def make_plugin(pam_conf, login_form_qs='__do_login', rememberer_name=None):
+def make_plugin(pam_conf, login_form_qs='__do_login', rememberer_name=None,
+ form=None):
if rememberer_name is None:
raise ValueError(
'must include rememberer key (name of another IIdentifier plugin)')
- plugin = FormPlugin(login_form_qs, rememberer_name)
+ if form is not None:
+ form = open(form).read()
+ plugin = FormPlugin(login_form_qs, rememberer_name, form)
return plugin
Modified: repoze.pam/trunk/repoze/pam/tests.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/tests.py (original)
+++ repoze.pam/trunk/repoze/pam/tests.py Sat Mar 8 12:44:22 2008
@@ -276,7 +276,7 @@
[], identifier, identity)
self.assertEqual(result, None)
self.assertEqual(environ['challenged'], None)
- self.assertEqual(identifier.forgotten, True)
+ self.assertEqual(identifier.forgotten, identity)
def test_challenge_identifier_app(self):
environ = self._makeEnviron()
@@ -290,7 +290,7 @@
[], identifier, identity)
self.assertEqual(result, app)
self.assertEqual(environ['challenged'], app)
- self.assertEqual(identifier.forgotten, True)
+ self.assertEqual(identifier.forgotten, identity)
def test_multi_challenge_firstwins(self):
environ = self._makeEnviron()
@@ -306,7 +306,7 @@
[], identifier, identity)
self.assertEqual(result, app1)
self.assertEqual(environ['challenged'], app1)
- self.assertEqual(identifier.forgotten, True)
+ self.assertEqual(identifier.forgotten, identity)
def test_multi_challenge_skipnomatch_findimplicit(self):
environ = self._makeEnviron()
@@ -325,7 +325,7 @@
[], identifier, identity)
self.assertEqual(result, app2)
self.assertEqual(environ['challenged'], app2)
- self.assertEqual(identifier.forgotten, True)
+ self.assertEqual(identifier.forgotten, identity)
def test_multi_challenge_skipnomatch_findexplicit(self):
environ = self._makeEnviron()
@@ -344,7 +344,7 @@
[], identifier, identity)
self.assertEqual(result, app2)
self.assertEqual(environ['challenged'], app2)
- self.assertEqual(identifier.forgotten, True)
+ self.assertEqual(identifier.forgotten, identity)
def test_call_remoteuser_already_set(self):
environ = self._makeEnviron({'REMOTE_USER':'admin'})
@@ -439,8 +439,9 @@
result = mw(environ, start_response)
self.assertEqual(environ['challenged'], challenge_app)
self.failUnless(result[0].startswith('401 Unauthorized\r\n'))
- self.assertEqual(identifier.forgotten, True)
+ self.assertEqual(identifier.forgotten, identifier.credentials)
self.assertEqual(environ['REMOTE_USER'], 'chris')
+ self.assertEqual(environ['repoze.pam.identifier'], identifier)
def test_call_200_challenger_and_identifier_and_authenticator(self):
environ = self._makeEnviron()
@@ -461,9 +462,34 @@
result = mw(environ, start_response)
self.assertEqual(environ.get('challenged'), None)
self.assertEqual(identifier.forgotten, False)
- self.assertEqual(identifier.remembered, True)
+ self.assertEqual(identifier.remembered, identifier.credentials)
self.assertEqual(environ['REMOTE_USER'], 'chris')
+ self.assertEqual(environ['repoze.pam.identifier'], identifier)
+ def test_call_200_identity_reset(self):
+ environ = self._makeEnviron()
+ headers = [('a', '1')]
+ new_identity = {'user_id':'foo', 'password':'bar'}
+ app = DummyIdentityResetApp('200 OK', headers, new_identity)
+ from paste.httpexceptions import HTTPUnauthorized
+ challenge_app = HTTPUnauthorized()
+ challenge = DummyChallenger(challenge_app)
+ challengers = [ ('challenge', challenge) ]
+ identifier = DummyIdentifier()
+ identifiers = [ ('identifier', identifier) ]
+ authenticator = DummyAuthenticator()
+ authenticators = [ ('authenticator', authenticator) ]
+ mw = self._makeOne(app=app, challengers=challengers,
+ identifiers=identifiers,
+ authenticators=authenticators)
+ start_response = DummyStartResponse()
+ result = mw(environ, start_response)
+ self.failIf(environ.has_key('repoze.pam.identity_reset'))
+ self.assertEqual(environ.get('challenged'), None)
+ self.assertEqual(identifier.forgotten, False)
+ self.assertEqual(identifier.remembered, new_identity)
+ self.assertEqual(environ['REMOTE_USER'], 'chris')
+ self.assertEqual(environ['repoze.pam.identifier'], identifier)
# XXX need more call tests:
@@ -807,8 +833,10 @@
from repoze.pam.plugins.form import FormPlugin
return FormPlugin
- def _makeOne(self, login_form_qs='__do_login', rememberer_name='cookie'):
- plugin = self._getTargetClass()(login_form_qs, rememberer_name)
+ def _makeOne(self, login_form_qs='__do_login', rememberer_name='cookie',
+ formbody=None):
+ plugin = self._getTargetClass()(login_form_qs, rememberer_name,
+ formbody)
return plugin
def _makeFormEnviron(self, login=None, password=None, do_login=False):
@@ -873,24 +901,70 @@
def test_remember(self):
plugin = self._makeOne()
environ = self._makeFormEnviron()
- result = plugin.remember(environ, {})
+ identity = {}
+ result = plugin.remember(environ, identity)
self.assertEqual(result, None)
self.assertEqual(environ['repoze.pam.plugins']['cookie'].remembered,
- True)
+ identity)
def test_forget(self):
plugin = self._makeOne()
environ = self._makeFormEnviron()
- result = plugin.forget(environ, {})
+ identity = {}
+ result = plugin.forget(environ, identity)
self.assertEqual(result, None)
self.assertEqual(environ['repoze.pam.plugins']['cookie'].forgotten,
- True)
-
- def test_factory(self):
- from repoze.pam.plugins.cookie import make_plugin
- plugin = make_plugin(None, 'foo')
- self.assertEqual(plugin.cookie_name, 'foo')
+ identity
+ )
+ def test_challenge_defaultform(self):
+ from repoze.pam.plugins.form import _DEFAULT_FORM
+ plugin = self._makeOne()
+ environ = self._makeFormEnviron()
+ app = plugin.challenge(environ, '401 Unauthorized', [], [])
+ sr = DummyStartResponse()
+ result = app(environ, sr)
+ self.assertEqual(''.join(result), _DEFAULT_FORM)
+ self.assertEqual(len(sr.headers), 2)
+ cl = str(len(_DEFAULT_FORM))
+ self.assertEqual(sr.headers[0], ('Content-Length', cl))
+ self.assertEqual(sr.headers[1], ('Content-Type', 'text/html'))
+ self.assertEqual(sr.status, '200 OK')
+
+ def test_challenge_customform(self):
+ here = os.path.dirname(__file__)
+ fixtures = os.path.join(here, 'fixtures')
+ form = os.path.join(fixtures, 'form.html')
+ formbody = open(form).read()
+ plugin = self._makeOne(formbody=formbody)
+ environ = self._makeFormEnviron()
+ app = plugin.challenge(environ, '401 Unauthorized', [], [])
+ sr = DummyStartResponse()
+ result = app(environ, sr)
+ self.assertEqual(''.join(result), formbody)
+ self.assertEqual(len(sr.headers), 2)
+ cl = str(len(formbody))
+ self.assertEqual(sr.headers[0], ('Content-Length', cl))
+ self.assertEqual(sr.headers[1], ('Content-Type', 'text/html'))
+ self.assertEqual(sr.status, '200 OK')
+
+ def test_factory_withform(self):
+ from repoze.pam.plugins.form import make_plugin
+ here = os.path.dirname(__file__)
+ fixtures = os.path.join(here, 'fixtures')
+ form = os.path.join(fixtures, 'form.html')
+ formbody = open(form).read()
+ plugin = make_plugin(None, '__login', 'cookie', form)
+ self.assertEqual(plugin.login_form_qs, '__login')
+ self.assertEqual(plugin.rememberer_name, 'cookie')
+ self.assertEqual(plugin.formbody, formbody)
+
+ def test_factory_defaultform(self):
+ from repoze.pam.plugins.form import make_plugin
+ plugin = make_plugin(None, '__login', 'cookie')
+ self.assertEqual(plugin.login_form_qs, '__login')
+ self.assertEqual(plugin.rememberer_name, 'cookie')
+ self.assertEqual(plugin.formbody, None)
class TestDefaultRequestClassifier(Base):
def _getFUT(self):
@@ -1146,6 +1220,18 @@
self.environ = environ
start_response(self.status, self.headers)
return ['body']
+
+class DummyIdentityResetApp:
+ def __init__(self, status, headers, new_identity):
+ self.status = status
+ self.headers = headers
+ self.new_identity = new_identity
+
+ def __call__(self, environ, start_response):
+ self.environ = environ
+ environ['repoze.pam.identity_reset'] = self.new_identity
+ start_response(self.status, self.headers)
+ return ['body']
class DummyRequestClassifier:
def __call__(self, environ):
@@ -1163,10 +1249,10 @@
return self.credentials
def forget(self, environ, identity):
- self.forgotten = True
+ self.forgotten = identity
def remember(self, environ, identity):
- self.remembered = True
+ self.remembered = identity
class DummyNoResultsIdentifier:
def identify(self, environ):
More information about the Repoze-checkins
mailing list