[Repoze-checkins] r937 - in repoze.who/trunk: . repoze/who
Tres Seaver
tseaver at palladion.com
Sat May 3 13:12:24 EDT 2008
Author: Tres Seaver <tseaver at palladion.com>
Date: Sat May 3 13:12:24 2008
New Revision: 937
Log:
Added ConfigParser-based WhoConfig, implementing the spec outlined at
http://www.plope.com/static/misc/sphinxtest/intro.html#middleware-configuration-via-config-file, with the following changes:
o "Bare" plugins (requiring no configuration options) may be specified
as either egg entry points (e.g., 'egg:distname#entry_point_name') or
as dotted-path-with-colon (e.g., 'dotted.name:object_id').
o Therefore, the separator between a plugin and its classifier is
now a semicolon, rather than a colon. E.g.:
[plugins:id_plugin]
use = egg:another.package#identify_with_frobnatz
frobnatz = baz
[identifiers]
plugins =
egg:my.egg#identify;browser
dotted.name:identifier
id_plugin
Added:
repoze.who/trunk/repoze/who/config.py
Modified:
repoze.who/trunk/CHANGES.txt
repoze.who/trunk/repoze/who/tests.py
Modified: repoze.who/trunk/CHANGES.txt
==============================================================================
--- repoze.who/trunk/CHANGES.txt (original)
+++ repoze.who/trunk/CHANGES.txt Sat May 3 13:12:24 2008
@@ -1,6 +1,28 @@
repoze.who changes
==================
+After 0.9.1
+
+ - Added ConfigParser-based WhoConfig, implementing the spec outlined at
+ http://www.plope.com/static/misc/sphinxtest/intro.html#middleware-configuration-via-config-file, with the following changes:
+
+ o "Bare" plugins (requiring no configuration options) may be specified
+ as either egg entry points (e.g., 'egg:distname#entry_point_name') or
+ as dotted-path-with-colon (e.g., 'dotted.name:object_id').
+
+ o Therefore, the separator between a plugin and its classifier is
+ now a semicolon, rather than a colon. E.g.:
+
+ [plugins:id_plugin]
+ use = egg:another.package#identify_with_frobnatz
+ frobnatz = baz
+
+ [identifiers]
+ plugins =
+ egg:my.egg#identify;browser
+ dotted.name:identifier
+ id_plugin
+
0.9.1 (2008-04-27)
- Fix auth_tkt plugin to be able to encode and decode integer user
Added: repoze.who/trunk/repoze/who/config.py
==============================================================================
--- (empty file)
+++ repoze.who/trunk/repoze/who/config.py Sat May 3 13:12:24 2008
@@ -0,0 +1,90 @@
+""" Configuration parser
+"""
+from ConfigParser import ConfigParser
+from StringIO import StringIO
+from pkg_resources import EntryPoint
+
+from repoze.who.interfaces import IChallengeDecider
+from repoze.who.interfaces import IRequestClassifier
+
+def _resolve(name):
+ if name:
+ return EntryPoint.parse('x=%s' % name).load(False)
+
+def _isClassOrType(obj):
+ return type(obj) in (type(WhoConfig), type)
+
+class WhoConfig:
+ def __init__(self):
+ self.request_classifier = None
+ self.challenge_decider = None
+ self.plugins = {}
+ self.identifiers = []
+ self.authenticators = []
+ self.challengers = []
+ self.mdproviders = []
+
+ def _getPlugin(self, name):
+ obj = self.plugins.get(name)
+ if obj is None:
+ obj = _resolve(name)
+ if _isClassOrType(obj):
+ obj = obj()
+ return obj
+
+ def _parsePluginSequence(self, attr, proptext):
+ lines = proptext.split()
+ for line in lines:
+ if ';' in line:
+ plugin_name, classifier = line.split(';')
+ else:
+ plugin_name = line
+ classifier = None
+ attr.append({'plugin': self._getPlugin(plugin_name),
+ 'classifier': classifier
+ })
+
+ def parse(self, text):
+ if getattr(text, 'readline', None) is None:
+ text = StringIO(text)
+ cp = ConfigParser()
+ cp.readfp(text)
+
+ for s_id in [x for x in cp.sections() if x.startswith('plugin:')]:
+ plugin_id = s_id[len('plugin:'):]
+ options = dict(cp.items(s_id))
+ if 'use' in options:
+ obj = _resolve(options['use'])
+ if _isClassOrType(obj):
+ del options['use']
+ obj = obj(**options)
+ self.plugins[plugin_id] = obj
+
+ if 'general' in cp.sections():
+ general = dict(cp.items('general'))
+
+ rc = general.get('request_classifier')
+ self.request_classifier = self._getPlugin(rc)
+
+ cd = general.get('challenge_decider')
+ self.challenge_decider = self._getPlugin(cd)
+
+ if 'identifiers' in cp.sections():
+ identifiers = dict(cp.items('identifiers'))
+ self._parsePluginSequence(self.identifiers,
+ identifiers['plugins'])
+
+ if 'authenticators' in cp.sections():
+ authenticators = dict(cp.items('authenticators'))
+ self._parsePluginSequence(self.authenticators,
+ authenticators['plugins'])
+
+ if 'challengers' in cp.sections():
+ challengers = dict(cp.items('challengers'))
+ self._parsePluginSequence(self.challengers,
+ challengers['plugins'])
+
+ if 'mdproviders' in cp.sections():
+ mdproviders = dict(cp.items('mdproviders'))
+ self._parsePluginSequence(self.mdproviders,
+ mdproviders['plugins'])
Modified: repoze.who/trunk/repoze/who/tests.py
==============================================================================
--- repoze.who/trunk/repoze/who/tests.py (original)
+++ repoze.who/trunk/repoze/who/tests.py Sat May 3 13:12:24 2008
@@ -1816,6 +1816,272 @@
self.failUnless(str(identity).startswith('<repoze.who identity'))
self.assertEqual(identity['foo'], 1)
+class TestWhoConfig(unittest.TestCase):
+
+ def _getTargetClass(self):
+ from repoze.who.config import WhoConfig
+ return WhoConfig
+
+ def _makeOne(self, *args, **kw):
+ return self._getTargetClass()(*args, **kw)
+
+ def test_defaults_before_parse(self):
+ config = self._makeOne()
+ self.assertEqual(config.request_classifier, None)
+ self.assertEqual(config.challenge_decider, None)
+ self.assertEqual(len(config.plugins), 0)
+ self.assertEqual(len(config.identifiers), 0)
+ self.assertEqual(len(config.authenticators), 0)
+ self.assertEqual(len(config.challengers), 0)
+ self.assertEqual(len(config.mdproviders), 0)
+
+ def test_parse_empty_string(self):
+ config = self._makeOne()
+ config.parse('')
+ self.assertEqual(config.request_classifier, None)
+ self.assertEqual(config.challenge_decider, None)
+ self.assertEqual(len(config.plugins), 0)
+ self.assertEqual(len(config.identifiers), 0)
+ self.assertEqual(len(config.authenticators), 0)
+ self.assertEqual(len(config.challengers), 0)
+ self.assertEqual(len(config.mdproviders), 0)
+
+ def test_parse_empty_file(self):
+ from StringIO import StringIO
+ config = self._makeOne()
+ config.parse(StringIO())
+ self.assertEqual(config.request_classifier, None)
+ self.assertEqual(config.challenge_decider, None)
+ self.assertEqual(len(config.plugins), 0)
+ self.assertEqual(len(config.identifiers), 0)
+ self.assertEqual(len(config.authenticators), 0)
+ self.assertEqual(len(config.challengers), 0)
+ self.assertEqual(len(config.mdproviders), 0)
+
+ def test_parse_plugins(self):
+ config = self._makeOne()
+ config.parse(PLUGINS_ONLY)
+ self.assertEqual(len(config.plugins), 2)
+ self.failUnless(isinstance(config.plugins['foo'],
+ DummyRequestClassifier))
+ bar = config.plugins['bar']
+ self.failUnless(isinstance(bar, DummyIdentifier))
+ self.assertEqual(bar.credentials, 'qux')
+
+ def test_parse_general_empty(self):
+ config = self._makeOne()
+ config.parse('[general]')
+ self.assertEqual(config.request_classifier, None)
+ self.assertEqual(config.challenge_decider, None)
+ self.assertEqual(len(config.plugins), 0)
+
+ def test_parse_general_only(self):
+ config = self._makeOne()
+ config.parse(GENERAL_ONLY)
+ self.failUnless(isinstance(config.request_classifier,
+ DummyRequestClassifier))
+ self.failUnless(isinstance(config.challenge_decider,
+ DummyChallengeDecider))
+ self.assertEqual(len(config.plugins), 0)
+
+ def test_parse_general_with_plugins(self):
+ config = self._makeOne()
+ config.parse(GENERAL_WITH_PLUGINS)
+ self.failUnless(isinstance(config.request_classifier,
+ DummyRequestClassifier))
+ self.failUnless(isinstance(config.challenge_decider,
+ DummyChallengeDecider))
+
+ def test_parse_identifiers_only(self):
+ config = self._makeOne()
+ config.parse(IDENTIFIERS_ONLY)
+ identifiers = config.identifiers
+ self.assertEqual(len(identifiers), 2)
+ self.failUnless(identifiers[0]['plugin'] is cp_first)
+ self.assertEqual(identifiers[0]['classifier'], 'klass1')
+ self.failUnless(identifiers[1]['plugin'] is cp_second)
+ self.assertEqual(identifiers[1]['classifier'], None)
+
+ def test_parse_identifiers_with_plugins(self):
+ config = self._makeOne()
+ config.parse(IDENTIFIERS_WITH_PLUGINS)
+ identifiers = config.identifiers
+ self.assertEqual(len(identifiers), 2)
+ self.failUnless(identifiers[0]['plugin'] is cp_first)
+ self.assertEqual(identifiers[0]['classifier'], 'klass1')
+ self.failUnless(identifiers[1]['plugin'] is cp_second)
+ self.assertEqual(identifiers[1]['classifier'], None)
+
+ def test_parse_authenticators_only(self):
+ config = self._makeOne()
+ config.parse(AUTHENTICATORS_ONLY)
+ authenticators = config.authenticators
+ self.assertEqual(len(authenticators), 2)
+ self.failUnless(authenticators[0]['plugin'] is cp_first)
+ self.assertEqual(authenticators[0]['classifier'], 'klass1')
+ self.failUnless(authenticators[1]['plugin'] is cp_second)
+ self.assertEqual(authenticators[1]['classifier'], None)
+
+ def test_parse_authenticators_with_plugins(self):
+ config = self._makeOne()
+ config.parse(AUTHENTICATORS_WITH_PLUGINS)
+ authenticators = config.authenticators
+ self.assertEqual(len(authenticators), 2)
+ self.failUnless(authenticators[0]['plugin'] is cp_first)
+ self.assertEqual(authenticators[0]['classifier'], 'klass1')
+ self.failUnless(authenticators[1]['plugin'] is cp_second)
+ self.assertEqual(authenticators[1]['classifier'], None)
+
+ def test_parse_challengers_only(self):
+ config = self._makeOne()
+ config.parse(CHALLENGERS_ONLY)
+ challengers = config.challengers
+ self.assertEqual(len(challengers), 2)
+ self.failUnless(challengers[0]['plugin'] is cp_first)
+ self.assertEqual(challengers[0]['classifier'], 'klass1')
+ self.failUnless(challengers[1]['plugin'] is cp_second)
+ self.assertEqual(challengers[1]['classifier'], None)
+
+ def test_parse_challengers_with_plugins(self):
+ config = self._makeOne()
+ config.parse(CHALLENGERS_WITH_PLUGINS)
+ challengers = config.challengers
+ self.assertEqual(len(challengers), 2)
+ self.failUnless(challengers[0]['plugin'] is cp_first)
+ self.assertEqual(challengers[0]['classifier'], 'klass1')
+ self.failUnless(challengers[1]['plugin'] is cp_second)
+ self.assertEqual(challengers[1]['classifier'], None)
+
+ def test_parse_mdproviders_only(self):
+ config = self._makeOne()
+ config.parse(MDPROVIDERS_ONLY)
+ mdproviders = config.mdproviders
+ self.assertEqual(len(mdproviders), 2)
+ self.failUnless(mdproviders[0]['plugin'] is cp_first)
+ self.assertEqual(mdproviders[0]['classifier'], 'klass1')
+ self.failUnless(mdproviders[1]['plugin'] is cp_second)
+ self.assertEqual(mdproviders[1]['classifier'], None)
+
+ def test_parse_mdproviders_with_plugins(self):
+ config = self._makeOne()
+ config.parse(MDPROVIDERS_WITH_PLUGINS)
+ mdproviders = config.mdproviders
+ self.assertEqual(len(mdproviders), 2)
+ self.failUnless(mdproviders[0]['plugin'] is cp_first)
+ self.assertEqual(mdproviders[0]['classifier'], 'klass1')
+ self.failUnless(mdproviders[1]['plugin'] is cp_second)
+ self.assertEqual(mdproviders[1]['classifier'], None)
+
+cp_first = object()
+cp_second = object()
+
+PLUGINS_ONLY = """\
+[plugin:foo]
+use = repoze.who.tests:DummyRequestClassifier
+
+[plugin:bar]
+use = repoze.who.tests:DummyIdentifier
+credentials = qux
+"""
+
+GENERAL_ONLY = """\
+[general]
+request_classifier = repoze.who.tests:DummyRequestClassifier
+challenge_decider = repoze.who.tests:DummyChallengeDecider
+"""
+
+GENERAL_WITH_PLUGINS = """\
+[general]
+request_classifier = classifier
+challenge_decider = decider
+
+[plugin:classifier]
+use = repoze.who.tests:DummyRequestClassifier
+
+[plugin:decider]
+use = repoze.who.tests:DummyChallengeDecider
+"""
+
+IDENTIFIERS_ONLY = """\
+[identifiers]
+plugins =
+ repoze.who.tests:cp_first;klass1
+ repoze.who.tests:cp_second
+"""
+
+IDENTIFIERS_WITH_PLUGINS = """\
+[identifiers]
+plugins =
+ foo;klass1
+ bar
+
+[plugin:foo]
+use = repoze.who.tests:cp_first
+
+[plugin:bar]
+use = repoze.who.tests:cp_second
+"""
+
+AUTHENTICATORS_ONLY = """\
+[authenticators]
+plugins =
+ repoze.who.tests:cp_first;klass1
+ repoze.who.tests:cp_second
+"""
+
+AUTHENTICATORS_WITH_PLUGINS = """\
+[authenticators]
+plugins =
+ foo;klass1
+ bar
+
+[plugin:foo]
+use = repoze.who.tests:cp_first
+
+[plugin:bar]
+use = repoze.who.tests:cp_second
+"""
+
+CHALLENGERS_ONLY = """\
+[challengers]
+plugins =
+ repoze.who.tests:cp_first;klass1
+ repoze.who.tests:cp_second
+"""
+
+CHALLENGERS_WITH_PLUGINS = """\
+[challengers]
+plugins =
+ foo;klass1
+ bar
+
+[plugin:foo]
+use = repoze.who.tests:cp_first
+
+[plugin:bar]
+use = repoze.who.tests:cp_second
+"""
+
+MDPROVIDERS_ONLY = """\
+[mdproviders]
+plugins =
+ repoze.who.tests:cp_first;klass1
+ repoze.who.tests:cp_second
+"""
+
+MDPROVIDERS_WITH_PLUGINS = """\
+[mdproviders]
+plugins =
+ foo;klass1
+ bar
+
+[plugin:foo]
+use = repoze.who.tests:cp_first
+
+[plugin:bar]
+use = repoze.who.tests:cp_second
+"""
+
def compare_success(*arg):
return True
More information about the Repoze-checkins
mailing list