[Repoze-checkins] r937 - in repoze.who/trunk: . repoze/who

Tres Seaver tseaver at palladion.com
Sat May 3 13:12:24 EDT 2008


Author: Tres Seaver <tseaver at palladion.com>
Date: Sat May  3 13:12:24 2008
New Revision: 937

Log:
Added ConfigParser-based WhoConfig, implementing the spec outlined at
http://www.plope.com/static/misc/sphinxtest/intro.html#middleware-configuration-via-config-file, with the following changes:

   o "Bare" plugins (requiring no configuration options) may be specified
     as either egg entry points (e.g., 'egg:distname#entry_point_name') or
     as dotted-path-with-colon (e.g., 'dotted.name:object_id').

   o Therefore, the separator between a plugin and its classifier is
     now a semicolon, rather than a colon. E.g.:

     [plugin:id_plugin]
     use = egg:another.package#identify_with_frobnatz
     frobnatz = baz

     [identifiers]
     plugins =
       egg:my.egg#identify;browser
       dotted.name:identifier
       id_plugin


Added:
   repoze.who/trunk/repoze/who/config.py
Modified:
   repoze.who/trunk/CHANGES.txt
   repoze.who/trunk/repoze/who/tests.py

Modified: repoze.who/trunk/CHANGES.txt
==============================================================================
--- repoze.who/trunk/CHANGES.txt	(original)
+++ repoze.who/trunk/CHANGES.txt	Sat May  3 13:12:24 2008
@@ -1,6 +1,28 @@
 repoze.who changes
 ==================
 
+After 0.9.1
+
+ - Added ConfigParser-based WhoConfig, implementing the spec outlined at
+   http://www.plope.com/static/misc/sphinxtest/intro.html#middleware-configuration-via-config-file, with the following changes:
+
+   o "Bare" plugins (requiring no configuration options) may be specified
+     as either egg entry points (e.g., 'egg:distname#entry_point_name') or
+     as dotted-path-with-colon (e.g., 'dotted.name:object_id').
+
+   o Therefore, the separator between a plugin and its classifier is
+     now a semicolon, rather than a colon. E.g.:
+
+     [plugin:id_plugin]
+     use = egg:another.package#identify_with_frobnatz
+     frobnatz = baz
+
+     [identifiers]
+     plugins =
+       egg:my.egg#identify;browser
+       dotted.name:identifier
+       id_plugin
+
 0.9.1 (2008-04-27)
 
  - Fix auth_tkt plugin to be able to encode and decode integer user

Added: repoze.who/trunk/repoze/who/config.py
==============================================================================
--- (empty file)
+++ repoze.who/trunk/repoze/who/config.py	Sat May  3 13:12:24 2008
@@ -0,0 +1,90 @@
+""" Configuration parser
+"""
+from ConfigParser import ConfigParser
+from StringIO import StringIO
+from pkg_resources import EntryPoint
+
+from repoze.who.interfaces import IChallengeDecider
+from repoze.who.interfaces import IRequestClassifier
+
+def _resolve(name):
+    if name:
+        return EntryPoint.parse('x=%s' % name).load(False)
+
+def _isClassOrType(obj):
+    return type(obj) in (type(WhoConfig), type)
+
+class WhoConfig:
+    def __init__(self):
+        self.request_classifier = None
+        self.challenge_decider = None
+        self.plugins = {}
+        self.identifiers = []
+        self.authenticators = []
+        self.challengers = []
+        self.mdproviders = []
+
+    def _getPlugin(self, name):
+        obj = self.plugins.get(name)
+        if obj is None:
+            obj = _resolve(name)
+            if _isClassOrType(obj):
+                obj = obj()
+        return obj
+
+    def _parsePluginSequence(self, attr, proptext):
+        lines = proptext.split()
+        for line in lines:
+            if ';' in line:
+                plugin_name, classifier = line.split(';')
+            else:
+                plugin_name = line
+                classifier = None
+            attr.append({'plugin': self._getPlugin(plugin_name),
+                         'classifier': classifier
+                        })
+
+    def parse(self, text):
+        if getattr(text, 'readline', None) is None:
+            text = StringIO(text)
+        cp = ConfigParser()
+        cp.readfp(text)
+
+        for s_id in [x for x in cp.sections() if x.startswith('plugin:')]:
+            plugin_id = s_id[len('plugin:'):]
+            options = dict(cp.items(s_id))
+            if 'use' in options:
+                obj = _resolve(options['use'])
+                if _isClassOrType(obj):
+                    del options['use']
+                    obj = obj(**options)
+                self.plugins[plugin_id] = obj
+
+        if 'general' in cp.sections():
+            general = dict(cp.items('general'))
+
+            rc = general.get('request_classifier')
+            self.request_classifier = self._getPlugin(rc)
+
+            cd = general.get('challenge_decider')
+            self.challenge_decider = self._getPlugin(cd)
+
+        if 'identifiers' in cp.sections():
+            identifiers = dict(cp.items('identifiers'))
+            self._parsePluginSequence(self.identifiers,
+                                      identifiers['plugins'])
+
+        if 'authenticators' in cp.sections():
+            authenticators = dict(cp.items('authenticators'))
+            self._parsePluginSequence(self.authenticators,
+                                      authenticators['plugins'])
+
+        if 'challengers' in cp.sections():
+            challengers = dict(cp.items('challengers'))
+            self._parsePluginSequence(self.challengers,
+                                      challengers['plugins'])
+
+        if 'mdproviders' in cp.sections():
+            mdproviders = dict(cp.items('mdproviders'))
+            self._parsePluginSequence(self.mdproviders,
+                                      mdproviders['plugins'])

Modified: repoze.who/trunk/repoze/who/tests.py
==============================================================================
--- repoze.who/trunk/repoze/who/tests.py	(original)
+++ repoze.who/trunk/repoze/who/tests.py	Sat May  3 13:12:24 2008
@@ -1816,6 +1816,272 @@
         self.failUnless(str(identity).startswith('<repoze.who identity'))
         self.assertEqual(identity['foo'], 1)
 
+class TestWhoConfig(unittest.TestCase):
+
+    def _getTargetClass(self):
+        from repoze.who.config import WhoConfig
+        return WhoConfig
+
+    def _makeOne(self, *args, **kw):
+        return self._getTargetClass()(*args, **kw)
+
+    def test_defaults_before_parse(self):
+        config = self._makeOne()
+        self.assertEqual(config.request_classifier, None)
+        self.assertEqual(config.challenge_decider, None)
+        self.assertEqual(len(config.plugins), 0)
+        self.assertEqual(len(config.identifiers), 0)
+        self.assertEqual(len(config.authenticators), 0)
+        self.assertEqual(len(config.challengers), 0)
+        self.assertEqual(len(config.mdproviders), 0)
+
+    def test_parse_empty_string(self):
+        config = self._makeOne()
+        config.parse('')
+        self.assertEqual(config.request_classifier, None)
+        self.assertEqual(config.challenge_decider, None)
+        self.assertEqual(len(config.plugins), 0)
+        self.assertEqual(len(config.identifiers), 0)
+        self.assertEqual(len(config.authenticators), 0)
+        self.assertEqual(len(config.challengers), 0)
+        self.assertEqual(len(config.mdproviders), 0)
+
+    def test_parse_empty_file(self):
+        from StringIO import StringIO
+        config = self._makeOne()
+        config.parse(StringIO())
+        self.assertEqual(config.request_classifier, None)
+        self.assertEqual(config.challenge_decider, None)
+        self.assertEqual(len(config.plugins), 0)
+        self.assertEqual(len(config.identifiers), 0)
+        self.assertEqual(len(config.authenticators), 0)
+        self.assertEqual(len(config.challengers), 0)
+        self.assertEqual(len(config.mdproviders), 0)
+
+    def test_parse_plugins(self):
+        config = self._makeOne()
+        config.parse(PLUGINS_ONLY)
+        self.assertEqual(len(config.plugins), 2)
+        self.failUnless(isinstance(config.plugins['foo'],
+                                   DummyRequestClassifier))
+        bar = config.plugins['bar']
+        self.failUnless(isinstance(bar, DummyIdentifier))
+        self.assertEqual(bar.credentials, 'qux')
+
+    def test_parse_general_empty(self):
+        config = self._makeOne()
+        config.parse('[general]')
+        self.assertEqual(config.request_classifier, None)
+        self.assertEqual(config.challenge_decider, None)
+        self.assertEqual(len(config.plugins), 0)
+
+    def test_parse_general_only(self):
+        config = self._makeOne()
+        config.parse(GENERAL_ONLY)
+        self.failUnless(isinstance(config.request_classifier,
+                                   DummyRequestClassifier))
+        self.failUnless(isinstance(config.challenge_decider,
+                                   DummyChallengeDecider))
+        self.assertEqual(len(config.plugins), 0)
+
+    def test_parse_general_with_plugins(self):
+        config = self._makeOne()
+        config.parse(GENERAL_WITH_PLUGINS)
+        self.failUnless(isinstance(config.request_classifier,
+                                   DummyRequestClassifier))
+        self.failUnless(isinstance(config.challenge_decider,
+                                   DummyChallengeDecider))
+
+    def test_parse_identifiers_only(self):
+        config = self._makeOne()
+        config.parse(IDENTIFIERS_ONLY)
+        identifiers = config.identifiers
+        self.assertEqual(len(identifiers), 2)
+        self.failUnless(identifiers[0]['plugin'] is cp_first)
+        self.assertEqual(identifiers[0]['classifier'], 'klass1')
+        self.failUnless(identifiers[1]['plugin'] is cp_second)
+        self.assertEqual(identifiers[1]['classifier'], None)
+
+    def test_parse_identifiers_with_plugins(self):
+        config = self._makeOne()
+        config.parse(IDENTIFIERS_WITH_PLUGINS)
+        identifiers = config.identifiers
+        self.assertEqual(len(identifiers), 2)
+        self.failUnless(identifiers[0]['plugin'] is cp_first)
+        self.assertEqual(identifiers[0]['classifier'], 'klass1')
+        self.failUnless(identifiers[1]['plugin'] is cp_second)
+        self.assertEqual(identifiers[1]['classifier'], None)
+
+    def test_parse_authenticators_only(self):
+        config = self._makeOne()
+        config.parse(AUTHENTICATORS_ONLY)
+        authenticators = config.authenticators
+        self.assertEqual(len(authenticators), 2)
+        self.failUnless(authenticators[0]['plugin'] is cp_first)
+        self.assertEqual(authenticators[0]['classifier'], 'klass1')
+        self.failUnless(authenticators[1]['plugin'] is cp_second)
+        self.assertEqual(authenticators[1]['classifier'], None)
+
+    def test_parse_authenticators_with_plugins(self):
+        config = self._makeOne()
+        config.parse(AUTHENTICATORS_WITH_PLUGINS)
+        authenticators = config.authenticators
+        self.assertEqual(len(authenticators), 2)
+        self.failUnless(authenticators[0]['plugin'] is cp_first)
+        self.assertEqual(authenticators[0]['classifier'], 'klass1')
+        self.failUnless(authenticators[1]['plugin'] is cp_second)
+        self.assertEqual(authenticators[1]['classifier'], None)
+
+    def test_parse_challengers_only(self):
+        config = self._makeOne()
+        config.parse(CHALLENGERS_ONLY)
+        challengers = config.challengers
+        self.assertEqual(len(challengers), 2)
+        self.failUnless(challengers[0]['plugin'] is cp_first)
+        self.assertEqual(challengers[0]['classifier'], 'klass1')
+        self.failUnless(challengers[1]['plugin'] is cp_second)
+        self.assertEqual(challengers[1]['classifier'], None)
+
+    def test_parse_challengers_with_plugins(self):
+        config = self._makeOne()
+        config.parse(CHALLENGERS_WITH_PLUGINS)
+        challengers = config.challengers
+        self.assertEqual(len(challengers), 2)
+        self.failUnless(challengers[0]['plugin'] is cp_first)
+        self.assertEqual(challengers[0]['classifier'], 'klass1')
+        self.failUnless(challengers[1]['plugin'] is cp_second)
+        self.assertEqual(challengers[1]['classifier'], None)
+
+    def test_parse_mdproviders_only(self):
+        config = self._makeOne()
+        config.parse(MDPROVIDERS_ONLY)
+        mdproviders = config.mdproviders
+        self.assertEqual(len(mdproviders), 2)
+        self.failUnless(mdproviders[0]['plugin'] is cp_first)
+        self.assertEqual(mdproviders[0]['classifier'], 'klass1')
+        self.failUnless(mdproviders[1]['plugin'] is cp_second)
+        self.assertEqual(mdproviders[1]['classifier'], None)
+
+    def test_parse_mdproviders_with_plugins(self):
+        config = self._makeOne()
+        config.parse(MDPROVIDERS_WITH_PLUGINS)
+        mdproviders = config.mdproviders
+        self.assertEqual(len(mdproviders), 2)
+        self.failUnless(mdproviders[0]['plugin'] is cp_first)
+        self.assertEqual(mdproviders[0]['classifier'], 'klass1')
+        self.failUnless(mdproviders[1]['plugin'] is cp_second)
+        self.assertEqual(mdproviders[1]['classifier'], None)
+
+cp_first = object()
+cp_second = object()
+
+PLUGINS_ONLY = """\
+[plugin:foo]
+use = repoze.who.tests:DummyRequestClassifier
+
+[plugin:bar]
+use = repoze.who.tests:DummyIdentifier
+credentials = qux
+"""
+
+GENERAL_ONLY = """\
+[general]
+request_classifier = repoze.who.tests:DummyRequestClassifier
+challenge_decider = repoze.who.tests:DummyChallengeDecider
+"""
+
+GENERAL_WITH_PLUGINS = """\
+[general]
+request_classifier = classifier
+challenge_decider = decider
+
+[plugin:classifier]
+use = repoze.who.tests:DummyRequestClassifier
+
+[plugin:decider]
+use = repoze.who.tests:DummyChallengeDecider
+"""
+
+IDENTIFIERS_ONLY = """\
+[identifiers]
+plugins = 
+    repoze.who.tests:cp_first;klass1
+    repoze.who.tests:cp_second
+"""
+
+IDENTIFIERS_WITH_PLUGINS = """\
+[identifiers]
+plugins = 
+    foo;klass1
+    bar
+
+[plugin:foo]
+use = repoze.who.tests:cp_first
+
+[plugin:bar]
+use = repoze.who.tests:cp_second
+"""
+
+AUTHENTICATORS_ONLY = """\
+[authenticators]
+plugins = 
+    repoze.who.tests:cp_first;klass1
+    repoze.who.tests:cp_second
+"""
+
+AUTHENTICATORS_WITH_PLUGINS = """\
+[authenticators]
+plugins = 
+    foo;klass1
+    bar
+
+[plugin:foo]
+use = repoze.who.tests:cp_first
+
+[plugin:bar]
+use = repoze.who.tests:cp_second
+"""
+
+CHALLENGERS_ONLY = """\
+[challengers]
+plugins = 
+    repoze.who.tests:cp_first;klass1
+    repoze.who.tests:cp_second
+"""
+
+CHALLENGERS_WITH_PLUGINS = """\
+[challengers]
+plugins = 
+    foo;klass1
+    bar
+
+[plugin:foo]
+use = repoze.who.tests:cp_first
+
+[plugin:bar]
+use = repoze.who.tests:cp_second
+"""
+
+MDPROVIDERS_ONLY = """\
+[mdproviders]
+plugins = 
+    repoze.who.tests:cp_first;klass1
+    repoze.who.tests:cp_second
+"""
+
+MDPROVIDERS_WITH_PLUGINS = """\
+[mdproviders]
+plugins = 
+    foo;klass1
+    bar
+
+[plugin:foo]
+use = repoze.who.tests:cp_first
+
+[plugin:bar]
+use = repoze.who.tests:cp_second
+"""
+
 def compare_success(*arg):
     return True
 


More information about the Repoze-checkins mailing list