[Repoze-checkins] r768 - in repoze.pam/trunk: . repoze/pam repoze/pam/plugins

Chris McDonough chrism at agendaless.com
Tue Mar 4 18:55:26 UTC 2008


Author: Chris McDonough <chrism at agendaless.com>
Date: Tue Mar  4 13:55:25 2008
New Revision: 768

Log:
0.2 (03-04-2008)

  - Added SQLAuthenticatorPlugin (see plugins/sql.py).




Added:
   repoze.pam/trunk/repoze/pam/plugins/sql.py   (contents, props changed)
Modified:
   repoze.pam/trunk/   (props changed)
   repoze.pam/trunk/CHANGES.txt
   repoze.pam/trunk/repoze/pam/tests.py
   repoze.pam/trunk/setup.py

Modified: repoze.pam/trunk/CHANGES.txt
==============================================================================
--- repoze.pam/trunk/CHANGES.txt	(original)
+++ repoze.pam/trunk/CHANGES.txt	Tue Mar  4 13:55:25 2008
@@ -1,3 +1,7 @@
+0.2 (03-04-2008)
+
+  - Added SQLAuthenticatorPlugin (see plugins/sql.py).
+
 0.1 (02-27-2008)
 
   Initial release (no configuration file support yet).

Added: repoze.pam/trunk/repoze/pam/plugins/sql.py
==============================================================================
--- (empty file)
+++ repoze.pam/trunk/repoze/pam/plugins/sql.py	Tue Mar  4 13:55:25 2008
@@ -0,0 +1,73 @@
+from zope.interface import implements
+
+from repoze.pam.interfaces import IAuthenticator
+
+def default_password_compare(cleartext_password, stored_password_hash):
+    import sha
+    import binascii
+
+    # the stored password is stored as '{SHA}<base64(<binary SHA digest>)>'.
+    # or as a cleartext password (no {SHA} prefix)
+
+    if stored_password_hash.startswith('{SHA}'):
+        stored_password_hash = stored_password_hash[5:]
+        digest = sha.new(cleartext_password).digest()
+        try:
+            stored_password_hash = stored_password_hash.decode('base64')
+        except binascii.Error:
+            return False
+    else:
+        digest = cleartext_password
+        
+    if stored_password_hash == digest:
+        return True
+
+    return False
+
+def psycopg_connect(dsn):
+    # convenience (I always seem to use Postgres)
+    import psycopg2
+    return psycopg2.connect(dsn)
+
+class SQLAuthenticatorPlugin:
+    implements(IAuthenticator)
+
+    def __init__(self, dsn, statement, compare_fn, conn_factory):
+        self.dsn = dsn
+        # statement should be pyformat dbapi binding-style, e.g.
+        # "select user_id, password from users where login=%(login)s"
+        self.statement = statement
+        self.compare_fn = compare_fn or default_password_compare
+        self.conn_factory = conn_factory or psycopg_connect
+        self.conn = None
+
+    def _connect(self):
+        return self.conn_factory(self.dsn)
+
+    # IAuthenticator
+    def authenticate(self, environ, identity):
+        if not self.conn:
+            self.conn = self._connect()
+        curs = self.conn.cursor()
+        curs.execute(self.statement, identity)
+        result = curs.fetchone()
+        curs.close()
+        if result:
+            user_id, password = result
+            if self.compare_fn(identity['password'], password):
+                return user_id
+
+def make_plugin(pam_conf, dsn=None, statement=None, compare_fn=None,
+                conn_factory=None):
+    from repoze.pam.utils import resolveDotted
+    if dsn is None:
+        raise ValueError('dsn must be specified')
+    if statement is None:
+        raise ValueError('statement must be specified')
+    if compare_fn is not None:
+        compare_fn = resolveDotted(compare_fn)
+    if conn_factory is not None:
+        conn_factory = resolveDotted(conn_factory)
+    return SQLAuthenticatorPlugin(dsn, statement, compare_fn, conn_factory)
+
+    

Modified: repoze.pam/trunk/repoze/pam/tests.py
==============================================================================
--- repoze.pam/trunk/repoze/pam/tests.py	(original)
+++ repoze.pam/trunk/repoze/pam/tests.py	Tue Mar  4 13:55:25 2008
@@ -931,6 +931,182 @@
         self.assertEqual(name_reg['auth'], dummy_auth)
         self.assertEqual(name_reg['challenger'], dummy_challenger)
 
+class TestSQLAuthenticatorPlugin(unittest.TestCase):
+    def _makeEnviron(self, kw=None):
+        environ = {}
+        environ['wsgi.version'] = (1,0)
+        if kw is not None:
+            environ.update(kw)
+        return environ
+
+    def _getTargetClass(self):
+        from repoze.pam.plugins.sql import SQLAuthenticatorPlugin
+        return SQLAuthenticatorPlugin
+
+    def _makeOne(self, dsn, statement, compare_fn, cfactory):
+        plugin = self._getTargetClass()(dsn, statement, compare_fn, cfactory)
+        return plugin
+
+    def _makeConnectionFactory(self, result):
+        cursor = DummyCursor(result)
+        def connect(dsn):
+            conn = DummyConnection(dsn, cursor)
+            return conn
+        return connect
+    
+    def test_implements(self):
+        from zope.interface.verify import verifyClass
+        from repoze.pam.interfaces import IAuthenticator
+        klass = self._getTargetClass()
+        verifyClass(IAuthenticator, klass)
+
+    def test_authenticate_noresults(self):
+        conn_factory = self._makeConnectionFactory(())
+        plugin = self._makeOne('dsn', 'statement', compare_fail, conn_factory)
+        environ = self._makeEnviron()
+        identity = {'login':'foo', 'password':'bar'}
+        result = plugin.authenticate(environ, identity)
+        self.assertEqual(result, None)
+        self.assertEqual(plugin.conn.dsn, 'dsn')
+        self.assertEqual(plugin.conn.curs.statement, 'statement')
+        self.assertEqual(plugin.conn.curs.bindargs, identity)
+        self.assertEqual(plugin.conn.curs.closed, True)
+
+    def test_authenticate_comparefail(self):
+        conn_factory = self._makeConnectionFactory(('user_id', 'password'))
+        plugin = self._makeOne('dsn', 'statement', compare_fail, conn_factory)
+        environ = self._makeEnviron()
+        identity = {'login':'user_id', 'password':'bar'}
+        result = plugin.authenticate(environ, identity)
+        self.assertEqual(result, None)
+        self.assertEqual(plugin.conn.dsn, 'dsn')
+        self.assertEqual(plugin.conn.curs.statement, 'statement')
+        self.assertEqual(plugin.conn.curs.bindargs, identity)
+        self.assertEqual(plugin.conn.curs.closed, True)
+
+    def test_authenticate_comparesuccess(self):
+        conn_factory = self._makeConnectionFactory(('userid', 'password'))
+        plugin = self._makeOne('dsn', 'statement', compare_success,
+                               conn_factory)
+        environ = self._makeEnviron()
+        identity = {'login':'foo', 'password':'bar'}
+        result = plugin.authenticate(environ, identity)
+        self.assertEqual(result, 'userid')
+        self.assertEqual(plugin.conn.dsn, 'dsn')
+        self.assertEqual(plugin.conn.curs.statement, 'statement')
+        self.assertEqual(plugin.conn.curs.bindargs, identity)
+        self.assertEqual(plugin.conn.curs.closed, True)
+
+class TestDefaultPasswordCompare(unittest.TestCase):
+    def _getFUT(self):
+        from repoze.pam.plugins.sql import default_password_compare
+        return default_password_compare
+
+    def test_shaprefix_bad_decode(self):
+        compare = self._getFUT()
+        result = compare('password', '{SHA}undecodable')
+        self.assertEqual(result, False)
+
+    def test_shaprefix_success(self):
+        import sha
+        stored = sha.new('password').digest().encode('base64').rstrip()
+        stored = '{SHA}' + stored
+        compare = self._getFUT()
+        result = compare('password', stored)
+        self.assertEqual(result, True)
+
+    def test_shaprefix_fail(self):
+        import sha
+        stored = sha.new('password').digest().encode('base64').rstrip()
+        stored = '{SHA}' + stored
+        compare = self._getFUT()
+        result = compare('notpassword', stored)
+        self.assertEqual(result, False)
+
+    def test_noprefix_success(self):
+        stored = 'password'
+        compare = self._getFUT()
+        result = compare('password', stored)
+        self.assertEqual(result, True)
+
+    def test_noprefix_fail(self):
+        stored = 'password'
+        compare = self._getFUT()
+        result = compare('notpassword', stored)
+        self.assertEqual(result, False)
+
+class TestMakeSQLAuthenticatorPlugin(unittest.TestCase):
+    def _getFUT(self):
+        from repoze.pam.plugins.sql import make_plugin
+        return make_plugin
+
+    def test_nodsn(self):
+        f = self._getFUT()
+        self.assertRaises(ValueError, f, None, None, 'statement')
+
+    def test_nostatement(self):
+        f = self._getFUT()
+        self.assertRaises(ValueError, f, None, 'dsn', None)
+
+    def test_comparefunc_specd(self):
+        f = self._getFUT()
+        plugin = f(None, 'dsn', 'statement',
+                   'repoze.pam.plugins.sql:make_plugin')
+        self.assertEqual(plugin.dsn, 'dsn')
+        self.assertEqual(plugin.statement, 'statement')
+        self.assertEqual(plugin.compare_fn, f)
+
+    def test_connfactory_specd(self):
+        f = self._getFUT()
+        plugin = f(None, 'dsn', 'statement', None,
+                   'repoze.pam.plugins.sql:make_plugin')
+        self.assertEqual(plugin.dsn, 'dsn')
+        self.assertEqual(plugin.statement, 'statement')
+        self.assertEqual(plugin.conn_factory, f)
+
+    def test_onlydsnandstatement(self):
+        f = self._getFUT()
+        plugin = f(None, 'dsn', 'statement')
+        self.assertEqual(plugin.dsn, 'dsn')
+        self.assertEqual(plugin.statement, 'statement')
+        from repoze.pam.plugins.sql import psycopg_connect
+        from repoze.pam.plugins.sql import default_password_compare
+        self.assertEqual(plugin.conn_factory, psycopg_connect)
+        self.assertEqual(plugin.compare_fn, default_password_compare)
+        
+
+def compare_success(*arg):
+    return True
+
+def compare_fail(*arg):
+    return False
+
+class DummyCursor:
+    def __init__(self, result):
+        self.result = result
+        self.statement = None
+        self.bindargs = None
+        self.closed = False
+
+    def execute(self, statement, bindargs):
+        self.statement = statement
+        self.bindargs = bindargs
+
+    def close(self):
+        self.closed = True
+
+    def fetchone(self):
+        return self.result
+
+class DummyConnection:
+    def __init__(self, dsn, cursor):
+        self.dsn = dsn
+        self.curs = cursor
+
+    def cursor(self):
+        return self.curs
+
+
 # XXX need make_middleware tests
 
 class DummyApp:

Modified: repoze.pam/trunk/setup.py
==============================================================================
--- repoze.pam/trunk/setup.py	(original)
+++ repoze.pam/trunk/setup.py	Tue Mar  4 13:55:25 2008
@@ -12,7 +12,7 @@
 #
 ##############################################################################
 
-__version__ = '0.1'
+__version__ = '0.2'
 
 import os
 


More information about the Repoze-checkins mailing list