[Repoze-checkins] r768 - in repoze.pam/trunk: . repoze/pam repoze/pam/plugins
Chris McDonough
chrism at agendaless.com
Tue Mar 4 18:55:26 UTC 2008
Author: Chris McDonough <chrism at agendaless.com>
Date: Tue Mar 4 13:55:25 2008
New Revision: 768
Log:
0.2 (03-04-2008)
- Added SQLAuthenticatorPlugin (see plugins/sql.py).
Added:
repoze.pam/trunk/repoze/pam/plugins/sql.py (contents, props changed)
Modified:
repoze.pam/trunk/ (props changed)
repoze.pam/trunk/CHANGES.txt
repoze.pam/trunk/repoze/pam/tests.py
repoze.pam/trunk/setup.py
Modified: repoze.pam/trunk/CHANGES.txt
==============================================================================
--- repoze.pam/trunk/CHANGES.txt (original)
+++ repoze.pam/trunk/CHANGES.txt Tue Mar 4 13:55:25 2008
@@ -1,3 +1,7 @@
+0.2 (03-04-2008)
+
+ - Added SQLAuthenticatorPlugin (see plugins/sql.py).
+
0.1 (02-27-2008)
Initial release (no configuration file support yet).
Added: repoze.pam/trunk/repoze/pam/plugins/sql.py
==============================================================================
--- (empty file)
+++ repoze.pam/trunk/repoze/pam/plugins/sql.py Tue Mar 4 13:55:25 2008
@@ -0,0 +1,73 @@
+from zope.interface import implements
+
+from repoze.pam.interfaces import IAuthenticator
+
+def default_password_compare(cleartext_password, stored_password_hash):
+ import sha
+ import binascii
+
+ # the stored password is stored as '{SHA}<base64(<binary SHA digest>)>'.
+ # or as a cleartext password (no {SHA} prefix)
+
+ if stored_password_hash.startswith('{SHA}'):
+ stored_password_hash = stored_password_hash[5:]
+ digest = sha.new(cleartext_password).digest()
+ try:
+ stored_password_hash = stored_password_hash.decode('base64')
+ except binascii.Error:
+ return False
+ else:
+ digest = cleartext_password
+
+ if stored_password_hash == digest:
+ return True
+
+ return False
+
+def psycopg_connect(dsn):
+ # convenience (I always seem to use Postgres)
+ import psycopg2
+ return psycopg2.connect(dsn)
+
+class SQLAuthenticatorPlugin:
+ implements(IAuthenticator)
+
+ def __init__(self, dsn, statement, compare_fn, conn_factory):
+ self.dsn = dsn
+ # statement should be pyformat dbapi binding-style, e.g.
+ # "select user_id, password from users where login=%(login)s"
+ self.statement = statement
+ self.compare_fn = compare_fn or default_password_compare
+ self.conn_factory = conn_factory or psycopg_connect
+ self.conn = None
+
+ def _connect(self):
+ return self.conn_factory(self.dsn)
+
+ # IAuthenticator
+ def authenticate(self, environ, identity):
+ if not self.conn:
+ self.conn = self._connect()
+ curs = self.conn.cursor()
+ curs.execute(self.statement, identity)
+ result = curs.fetchone()
+ curs.close()
+ if result:
+ user_id, password = result
+ if self.compare_fn(identity['password'], password):
+ return user_id
+
+def make_plugin(pam_conf, dsn=None, statement=None, compare_fn=None,
+ conn_factory=None):
+ from repoze.pam.utils import resolveDotted
+ if dsn is None:
+ raise ValueError('dsn must be specified')
+ if statement is None:
+ raise ValueError('statement must be specified')
+ if compare_fn is not None:
+ compare_fn = resolveDotted(compare_fn)
+ if conn_factory is not None:
+ conn_factory = resolveDotted(conn_factory)
+ return SQLAuthenticatorPlugin(dsn, statement, compare_fn, conn_factory)
+
+
Modified: repoze.pam/trunk/repoze/pam/tests.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/tests.py (original)
+++ repoze.pam/trunk/repoze/pam/tests.py Tue Mar 4 13:55:25 2008
@@ -931,6 +931,182 @@
self.assertEqual(name_reg['auth'], dummy_auth)
self.assertEqual(name_reg['challenger'], dummy_challenger)
+class TestSQLAuthenticatorPlugin(unittest.TestCase):
+ def _makeEnviron(self, kw=None):
+ environ = {}
+ environ['wsgi.version'] = (1,0)
+ if kw is not None:
+ environ.update(kw)
+ return environ
+
+ def _getTargetClass(self):
+ from repoze.pam.plugins.sql import SQLAuthenticatorPlugin
+ return SQLAuthenticatorPlugin
+
+ def _makeOne(self, dsn, statement, compare_fn, cfactory):
+ plugin = self._getTargetClass()(dsn, statement, compare_fn, cfactory)
+ return plugin
+
+ def _makeConnectionFactory(self, result):
+ cursor = DummyCursor(result)
+ def connect(dsn):
+ conn = DummyConnection(dsn, cursor)
+ return conn
+ return connect
+
+ def test_implements(self):
+ from zope.interface.verify import verifyClass
+ from repoze.pam.interfaces import IAuthenticator
+ klass = self._getTargetClass()
+ verifyClass(IAuthenticator, klass)
+
+ def test_authenticate_noresults(self):
+ conn_factory = self._makeConnectionFactory(())
+ plugin = self._makeOne('dsn', 'statement', compare_fail, conn_factory)
+ environ = self._makeEnviron()
+ identity = {'login':'foo', 'password':'bar'}
+ result = plugin.authenticate(environ, identity)
+ self.assertEqual(result, None)
+ self.assertEqual(plugin.conn.dsn, 'dsn')
+ self.assertEqual(plugin.conn.curs.statement, 'statement')
+ self.assertEqual(plugin.conn.curs.bindargs, identity)
+ self.assertEqual(plugin.conn.curs.closed, True)
+
+ def test_authenticate_comparefail(self):
+ conn_factory = self._makeConnectionFactory(('user_id', 'password'))
+ plugin = self._makeOne('dsn', 'statement', compare_fail, conn_factory)
+ environ = self._makeEnviron()
+ identity = {'login':'user_id', 'password':'bar'}
+ result = plugin.authenticate(environ, identity)
+ self.assertEqual(result, None)
+ self.assertEqual(plugin.conn.dsn, 'dsn')
+ self.assertEqual(plugin.conn.curs.statement, 'statement')
+ self.assertEqual(plugin.conn.curs.bindargs, identity)
+ self.assertEqual(plugin.conn.curs.closed, True)
+
+ def test_authenticate_comparesuccess(self):
+ conn_factory = self._makeConnectionFactory(('userid', 'password'))
+ plugin = self._makeOne('dsn', 'statement', compare_success,
+ conn_factory)
+ environ = self._makeEnviron()
+ identity = {'login':'foo', 'password':'bar'}
+ result = plugin.authenticate(environ, identity)
+ self.assertEqual(result, 'userid')
+ self.assertEqual(plugin.conn.dsn, 'dsn')
+ self.assertEqual(plugin.conn.curs.statement, 'statement')
+ self.assertEqual(plugin.conn.curs.bindargs, identity)
+ self.assertEqual(plugin.conn.curs.closed, True)
+
+class TestDefaultPasswordCompare(unittest.TestCase):
+ def _getFUT(self):
+ from repoze.pam.plugins.sql import default_password_compare
+ return default_password_compare
+
+ def test_shaprefix_bad_decode(self):
+ compare = self._getFUT()
+ result = compare('password', '{SHA}undecodable')
+ self.assertEqual(result, False)
+
+ def test_shaprefix_success(self):
+ import sha
+ stored = sha.new('password').digest().encode('base64').rstrip()
+ stored = '{SHA}' + stored
+ compare = self._getFUT()
+ result = compare('password', stored)
+ self.assertEqual(result, True)
+
+ def test_shaprefix_fail(self):
+ import sha
+ stored = sha.new('password').digest().encode('base64').rstrip()
+ stored = '{SHA}' + stored
+ compare = self._getFUT()
+ result = compare('notpassword', stored)
+ self.assertEqual(result, False)
+
+ def test_noprefix_success(self):
+ stored = 'password'
+ compare = self._getFUT()
+ result = compare('password', stored)
+ self.assertEqual(result, True)
+
+ def test_noprefix_fail(self):
+ stored = 'password'
+ compare = self._getFUT()
+ result = compare('notpassword', stored)
+ self.assertEqual(result, False)
+
+class TestMakeSQLAuthenticatorPlugin(unittest.TestCase):
+ def _getFUT(self):
+ from repoze.pam.plugins.sql import make_plugin
+ return make_plugin
+
+ def test_nodsn(self):
+ f = self._getFUT()
+ self.assertRaises(ValueError, f, None, None, 'statement')
+
+ def test_nostatement(self):
+ f = self._getFUT()
+ self.assertRaises(ValueError, f, None, 'dsn', None)
+
+ def test_comparefunc_specd(self):
+ f = self._getFUT()
+ plugin = f(None, 'dsn', 'statement',
+ 'repoze.pam.plugins.sql:make_plugin')
+ self.assertEqual(plugin.dsn, 'dsn')
+ self.assertEqual(plugin.statement, 'statement')
+ self.assertEqual(plugin.compare_fn, f)
+
+ def test_connfactory_specd(self):
+ f = self._getFUT()
+ plugin = f(None, 'dsn', 'statement', None,
+ 'repoze.pam.plugins.sql:make_plugin')
+ self.assertEqual(plugin.dsn, 'dsn')
+ self.assertEqual(plugin.statement, 'statement')
+ self.assertEqual(plugin.conn_factory, f)
+
+ def test_onlydsnandstatement(self):
+ f = self._getFUT()
+ plugin = f(None, 'dsn', 'statement')
+ self.assertEqual(plugin.dsn, 'dsn')
+ self.assertEqual(plugin.statement, 'statement')
+ from repoze.pam.plugins.sql import psycopg_connect
+ from repoze.pam.plugins.sql import default_password_compare
+ self.assertEqual(plugin.conn_factory, psycopg_connect)
+ self.assertEqual(plugin.compare_fn, default_password_compare)
+
+
+def compare_success(*arg):
+ return True
+
+def compare_fail(*arg):
+ return False
+
+class DummyCursor:
+ def __init__(self, result):
+ self.result = result
+ self.statement = None
+ self.bindargs = None
+ self.closed = False
+
+ def execute(self, statement, bindargs):
+ self.statement = statement
+ self.bindargs = bindargs
+
+ def close(self):
+ self.closed = True
+
+ def fetchone(self):
+ return self.result
+
+class DummyConnection:
+ def __init__(self, dsn, cursor):
+ self.dsn = dsn
+ self.curs = cursor
+
+ def cursor(self):
+ return self.curs
+
+
# XXX need make_middleware tests
class DummyApp:
Modified: repoze.pam/trunk/setup.py
==============================================================================
--- repoze.pam/trunk/setup.py (original)
+++ repoze.pam/trunk/setup.py Tue Mar 4 13:55:25 2008
@@ -12,7 +12,7 @@
#
##############################################################################
-__version__ = '0.1'
+__version__ = '0.2'
import os
More information about the Repoze-checkins
mailing list