[Repoze-checkins] r1236 - in repoze.bfg/trunk/repoze/bfg: . tests

Chris McDonough chrism at agendaless.com
Mon Jul 7 00:44:58 EDT 2008


Author: Chris McDonough <chrism at agendaless.com>
Date: Mon Jul  7 00:44:57 2008
New Revision: 1236

Log:
Look up a view after traversal; adapt it to IWSGIApplication.


Added:
   repoze.bfg/trunk/repoze/bfg/location.py   (contents, props changed)
   repoze.bfg/trunk/repoze/bfg/mapply.py   (contents, props changed)
   repoze.bfg/trunk/repoze/bfg/tests/test_wsgiadapter.py   (contents, props changed)
   repoze.bfg/trunk/repoze/bfg/wsgiadapter.py   (contents, props changed)
Modified:
   repoze.bfg/trunk/repoze/bfg/interfaces.py
   repoze.bfg/trunk/repoze/bfg/router.py
   repoze.bfg/trunk/repoze/bfg/tests/test_router.py
   repoze.bfg/trunk/repoze/bfg/tests/test_traversal.py
   repoze.bfg/trunk/repoze/bfg/traversal.py

Modified: repoze.bfg/trunk/repoze/bfg/interfaces.py
==============================================================================
--- repoze.bfg/trunk/repoze/bfg/interfaces.py	(original)
+++ repoze.bfg/trunk/repoze/bfg/interfaces.py	Mon Jul  7 00:44:57 2008
@@ -1,29 +1,45 @@
 from zope.interface import Interface
+from zope.interface import Attribute
 
-class IWSGIApplicationFactory(Interface):
-    def __call__(context):
-        """ Return a WSGI (PEP333) application """
+class IResponse(Interface):
+    status = Attribute('WSGI status code of response')
+    headerlist = Attribute('List of response headers')
+    app_iter = Attribute('Iterable representing the response body')
+
+class IView(Interface):
+    def __call__(*arg, **kw):
+        """ Must return an object that implements IResponse; args are
+        mapped into an IView's __call__ by mapply-like code """
+        
+class IViewFactory(Interface):
+    def __call__(context, request):
+        """ Return an object that implements IView """
 
 class IRootPolicy(Interface):
     def __call__(environ):
         """ Return a root object """
 
-class ITraversalPolicy(Interface):
-    def __call__(environ, root):
-        """ Return a tuple in the form (context, name, subpath) """
-
-class ISecurityPolicy(Interface):
-    def __call__(environ, context, name):
-        """ Return a WSGI app on unauthorized or None to signify that
-        the request is allowed to continue """
-        
-class ITraverser(Interface):
-    def __init__(context):
-        """ Accept a context """
+class IPublishTraverser(Interface):
+    def __call__(path):
+        """ Return a tuple in the form (context, name, subpath), typically
+        the result of an object graph traversal """
 
-    def __call__(environ, name):
-        """ Return a subcontext or based on name """
-        
-class IWebObRequest(Interface):
-    """ Marker interface for a webob.Request object """
+class IPublishTraverserFactory(Interface):
+    def __call__(context, request):
+        """ Return an object that implements IPublishTraverser """
+
+class IWSGIApplication(Interface):
+    def __call__(environ, start_response):
+        """ A PEP 333 application """
+
+class IWSGIApplicationFactory(Interface):
+    def __call__(view, request):
+        """ Return an object that implements IWSGIApplication """
+
+class IRequest(Interface):
+    """ Marker interface for a request object """
     
+class ILocation(Interface):
+    """Objects that have a structural location"""
+    __parent__ = Attribute("The parent in the location hierarchy")
+    __name__ = Attribute("The name of the object")

Added: repoze.bfg/trunk/repoze/bfg/location.py
==============================================================================
--- (empty file)
+++ repoze.bfg/trunk/repoze/bfg/location.py	Mon Jul  7 00:44:57 2008
@@ -0,0 +1,118 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import zope.interface
+from zope.proxy import ProxyBase
+from zope.proxy import getProxiedObject
+from zope.proxy import non_overridable
+from zope.proxy.decorator import DecoratorSpecificationDescriptor
+from repoze.bfg.interfaces import ILocation
+
+class Location(object):
+    """Stupid mix-in that defines `__parent__` and `__name__` attributes."""
+
+    zope.interface.implements(ILocation)
+
+    __parent__ = __name__ = None
+
+def locate(object, parent, name=None):
+    """Locate an object in another
+
+    This method should only be called from trusted code, because it
+    sets attributes that are normally unsettable.
+    """
+
+    object.__parent__ = parent
+    object.__name__ = name
+
+
+def located(object, parent, name=None):
+    """Locate an object in another and return it.
+
+    If the object does not provide ILocation, a LocationProxy is returned.
+
+    """
+    if ILocation.providedBy(object):
+        if parent is not object.__parent__ or name != object.__name__:
+            locate(object, parent, name)
+        return object
+    return LocationProxy(object, parent, name)
+
+
+def LocationIterator(object):
+    while object is not None:
+        yield object
+        object = getattr(object, '__parent__', None)
+
+
+def inside(l1, l2):
+    """Is l1 inside l2
+
+    l1 is inside l2 if l2 is l1 itself or an ancestor of l1.
+
+    """
+    while l1 is not None:
+        if l1 is l2:
+            return True
+        l1 = l1.__parent__
+
+    return False
+
+
+class ClassAndInstanceDescr(object):
+
+    def __init__(self, *args):
+        self.funcs = args
+
+    def __get__(self, inst, cls):
+        if inst is None:
+            return self.funcs[1](cls)
+        return self.funcs[0](inst)
+
+
+class LocationProxy(ProxyBase):
+    """Location-object proxy
+
+    This is a non-picklable proxy that can be put around objects that
+    don't implement `ILocation`.
+
+    """
+
+    zope.interface.implements(ILocation)
+
+    __slots__ = '__parent__', '__name__'
+    __safe_for_unpickling__ = True
+
+    def __new__(self, ob, container=None, name=None):
+        return ProxyBase.__new__(self, ob)
+
+    def __init__(self, ob, container=None, name=None):
+        ProxyBase.__init__(self, ob)
+        self.__parent__ = container
+        self.__name__ = name
+
+    @non_overridable
+    def __reduce__(self, proto=None):
+        raise TypeError("Not picklable")
+
+
+    __doc__ = ClassAndInstanceDescr(
+        lambda inst: getProxiedObject(inst).__doc__,
+        lambda cls, __doc__ = __doc__: __doc__,
+        )
+
+    __reduce_ex__ = __reduce__
+
+    __providedBy__ = DecoratorSpecificationDescriptor()
+

Added: repoze.bfg/trunk/repoze/bfg/mapply.py
==============================================================================
--- (empty file)
+++ repoze.bfg/trunk/repoze/bfg/mapply.py	Mon Jul  7 00:44:57 2008
@@ -0,0 +1,71 @@
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Provide an apply-like facility that works with any mapping object
+"""
+
+def mapply(object,
+           positional=(),
+           keyword={},
+           maybe=True,
+           ):
+
+    if hasattr(object,'__bases__'): # the object is a class
+        raise TypeError('Cannot publish class %s' % object)
+    else:
+        f=object
+        im=0
+        if hasattr(f, 'im_func'):
+            im=1
+        elif not hasattr(f,'func_defaults'):
+            if hasattr(f, '__call__'):
+                f=f.__call__
+                if hasattr(f, 'im_func'):
+                    im=1
+                elif not hasattr(f,'func_defaults') and maybe:
+                    return object
+            elif maybe:
+                return object
+
+        if im:
+            f=f.im_func
+            c=f.func_code
+            defaults=f.func_defaults
+            names=c.co_varnames[1:c.co_argcount]
+        else:
+            defaults=f.func_defaults
+            c=f.func_code
+            names=c.co_varnames[:c.co_argcount]
+
+    nargs=len(names)
+    if positional:
+        positional=list(positional)
+        if len(positional) > nargs:
+            raise TypeError('too many arguments')
+        args=positional
+    else:
+        args=[]
+
+    get=keyword.get
+    nrequired=len(names) - (len(defaults or ()))
+    for index in range(len(args), len(names)):
+        name=names[index]
+        v=get(name, args)
+        if v is args:
+            if index < nrequired:
+                raise TypeError('Argument %s was omitted' % name)
+            else:
+                v=defaults[index-nrequired]
+        args.append(v)
+
+    args=tuple(args)
+    return object(*args)

Modified: repoze.bfg/trunk/repoze/bfg/router.py
==============================================================================
--- repoze.bfg/trunk/repoze/bfg/router.py	(original)
+++ repoze.bfg/trunk/repoze/bfg/router.py	Mon Jul  7 00:44:57 2008
@@ -1,28 +1,45 @@
+from zope.component import getGlobalSiteManager
+from zope.component import getMultiAdapter
 from zope.component import queryMultiAdapter
 from zope.interface import directlyProvides
 
 from webob import Request
 from webob.exc import HTTPNotFound
 
+from repoze.bfg.interfaces import IPublishTraverserFactory
+from repoze.bfg.interfaces import IViewFactory
 from repoze.bfg.interfaces import IWSGIApplicationFactory
-from repoze.bfg.interfaces import IWebObRequest
+
+from repoze.bfg.interfaces import IRequest
+
+_marker = ()
 
 class Router:
-    def __init__(self, root_policy, traversal_policy, security_policy):
+    def __init__(self, root_policy):
         self.root_policy = root_policy
-        self.traversal_policy = traversal_policy
-        self.security_policy = security_policy
 
     def __call__(self, environ, start_response):
+        request = Request(environ)
+        directlyProvides(request, IRequest)
         root = self.root_policy(environ)
-        context, name, subpath = self.traversal_policy(environ, root)
-        app = self.security_policy(environ, context, name)
-        if app is None:
-            environ['repoze.bfg.subpath'] = subpath
-            request = Request(environ)
-            directlyProvides(request, IWebObRequest)
-            app = queryMultiAdapter((context, request),
-                                    IWSGIApplicationFactory, name=name)
-            if app is None:
-                app = HTTPNotFound(request.url)
+        path = environ.get('PATH_INFO', '/')
+        traverser = getMultiAdapter((root, request), IPublishTraverserFactory)
+        context, name, subpath = traverser(path)
+        request.subpath = subpath
+        app = queryMultiAdapter((context, request), IViewFactory, name=name,
+                                default=_marker)
+        if app is _marker:
+            app = HTTPNotFound(request.url)
+        else:
+            app = getMultiAdapter((app, request), IWSGIApplicationFactory)
         return app(environ, start_response)
+
+def make_app(**config):
+    from repoze.bfg.traversal import NaivePublishTraverser
+    from repoze.bfg.wsgiadapter import NaiveWSGIViewAdapter
+    gsm = getGlobalSiteManager()
+    gsm.registerAdapter(NaivePublishTraverser, (None, None),
+                        IPublishTraverserFactory)
+    gsm.registerAdapter(NaiveWSGIViewAdapter, (None, None),
+                        IWSGIApplicationFactory)
+    

Modified: repoze.bfg/trunk/repoze/bfg/tests/test_router.py
==============================================================================
--- repoze.bfg/trunk/repoze/bfg/tests/test_router.py	(original)
+++ repoze.bfg/trunk/repoze/bfg/tests/test_router.py	Mon Jul  7 00:44:57 2008
@@ -9,12 +9,24 @@
     def tearDown(self):
         PlacelessSetup.tearDown(self)
 
-    def _registerFactory(self, app, name, *for_):
+    def _registerTraverserFactory(self, app, name, *for_):
+        import zope.component
+        gsm = zope.component.getGlobalSiteManager()
+        from repoze.bfg.interfaces import IPublishTraverserFactory
+        gsm.registerAdapter(app, for_, IPublishTraverserFactory, name)
+
+    def _registerViewFactory(self, app, name, *for_):
+        import zope.component
+        gsm = zope.component.getGlobalSiteManager()
+        from repoze.bfg.interfaces import IViewFactory
+        gsm.registerAdapter(app, for_, IViewFactory, name)
+
+    def _registerWSGIFactory(self, app, name, *for_):
         import zope.component
         gsm = zope.component.getGlobalSiteManager()
         from repoze.bfg.interfaces import IWSGIApplicationFactory
         gsm.registerAdapter(app, for_, IWSGIApplicationFactory, name)
-        
+
     def _getTargetClass(self):
         from repoze.bfg.router import Router
         return Router
@@ -33,151 +45,166 @@
         environ.update(extras)
         return environ
 
-    def test_call_no_app_registered(self):
-        statii = []
-        headerii = []
-        def rootpolicy(environ):
-            return None
-        def traversalpolicy(environ, root):
-            return DummyContext(), 'foo', []
-        def securitypolicy(environ, context, name):
-            return None
-        def start_response(status, headers):
-            statii[:] = [status]
-            headerii[:] = [headers]
+    def test_call_no_view_registered(self):
+        rootpolicy = make_rootpolicy(None)
         environ = self._makeEnviron()
-        router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
+        context = DummyContext()
+        traversalfactory = make_traversal_factory(context, '', [])
+        self._registerTraverserFactory(traversalfactory, '', None, None)
+        router = self._makeOne(rootpolicy)
+        start_response = DummyStartResponse()
         result = router(environ, start_response)
-        headers = headerii[0]
+        headers = start_response.headers
         self.assertEqual(len(headers), 2)
-        status = statii[0]
+        status = start_response.status
         self.assertEqual(status, '404 Not Found')
         self.failUnless('http://localhost:8080' in result[0], result)
 
-    def test_call_securitypolicy_denies(self):
-        statii = []
-        headerii = []
-        def rootpolicy(environ):
-            return None
-        def traversalpolicy(environ, root):
-            return DummyContext(), 'foo', []
-        def securitypolicy(environ, context, name):
-            from webob.exc import HTTPUnauthorized
-            return HTTPUnauthorized()
-        def start_response(status, headers):
-            statii[:] = [status]
-            headerii[:] = [headers]
-        environ = self._makeEnviron()
-        router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
-        result = router(environ, start_response)
-        headers = headerii[0]
-        self.assertEqual(len(headers), 2)
-        status = statii[0]
-        self.assertEqual(status, '401 Unauthorized')
-
-    def test_call_app_registered_nonspecific_default_path(self):
-        def rootpolicy(environ):
-            return None
+    def test_call_view_registered_nonspecific_default_path(self):
+        rootpolicy = make_rootpolicy(None)
         context = DummyContext()
-        def traversalpolicy(environ, root):
-            return context, '', []
-        def start_response(status, headers):
-            pass
-        def securitypolicy(environ, context, name):
-            return None
-        environ = self._makeEnviron()
-        self._registerFactory(DummyWSGIApplicationFactory, '', None, None)
-        router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
-        result = router(environ, start_response)
-        self.failUnless(result[0] is context)
-        import webob
-        self.failUnless(isinstance(result[1], webob.Request))
-        self.assertEqual(environ['repoze.bfg.subpath'], [])
-
-    def test_call_app_registered_nonspecific_nondefault_path_and_subpath(self):
-        def rootpolicy(environ):
-            return None
-        context = DummyContext()
-        def traversalpolicy(environ, root):
-            return context, 'foo', ['bar', 'baz']
-        def start_response(status, headers):
-            pass
-        def securitypolicy(environ, context, name):
-            return None
-        environ = self._makeEnviron()
-        self._registerFactory(DummyWSGIApplicationFactory, 'foo', None, None)
-        router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
-        result = router(environ, start_response)
-        self.failUnless(result[0] is context)
-        import webob
-        self.failUnless(isinstance(result[1], webob.Request))
-        self.assertEqual(environ['repoze.bfg.subpath'], ['bar', 'baz'])
-
-    def test_call_app_registered_specific_success(self):
-        def rootpolicy(environ):
-            return None
+        traversalfactory = make_traversal_factory(context, '', [])
+        response = DummyResponse()
+        viewfactory = make_view_factory(response)
+        wsgifactory = make_wsgi_factory('200 OK', (), ['Hello world'])
+        environ = self._makeEnviron()
+        self._registerTraverserFactory(traversalfactory, '', None, None)
+        self._registerViewFactory(viewfactory, '', None, None)
+        self._registerWSGIFactory(wsgifactory, '', None, None)
+        router = self._makeOne(rootpolicy)
+        start_response = DummyStartResponse()
+        result = router(environ, start_response)
+        self.assertEqual(result, ['Hello world'])
+        self.assertEqual(start_response.headers, ())
+        self.assertEqual(start_response.status, '200 OK')
+        request = environ['request']
+        self.assertEqual(environ['request'].subpath, [])
+        self.assertEqual(environ['view'].context, context)
+
+    def test_call_view_registered_nonspecific_nondefault_path_and_subpath(self):
+        rootpolicy = make_rootpolicy(None)
         context = DummyContext()
+        traversalfactory = make_traversal_factory(context, 'foo', ['bar'])
+        response = DummyResponse()
+        viewfactory = make_view_factory(response)
+        wsgifactory = make_wsgi_factory('200 OK', (), ['Hello world'])
+        environ = self._makeEnviron()
+        self._registerTraverserFactory(traversalfactory, '', None, None)
+        self._registerViewFactory(viewfactory, 'foo', None, None)
+        self._registerWSGIFactory(wsgifactory, '', None, None)
+        router = self._makeOne(rootpolicy)
+        start_response = DummyStartResponse()
+        result = router(environ, start_response)
+        self.assertEqual(result, ['Hello world'])
+        self.assertEqual(start_response.headers, ())
+        self.assertEqual(start_response.status, '200 OK')
+        request = environ['request']
+        self.assertEqual(environ['request'].subpath, ['bar'])
+        self.assertEqual(environ['view'].context, context)
+
+    def test_call_view_registered_specific_success(self):
+        rootpolicy = make_rootpolicy(None)
         from zope.interface import Interface
         from zope.interface import directlyProvides
         class IContext(Interface):
             pass
+        from repoze.bfg.interfaces import IRequest
+        context = DummyContext()
         directlyProvides(context, IContext)
-        def traversalpolicy(environ, root):
-            return context, 'foo', ['bar', 'baz']
-        def start_response(status, headers):
-            pass
-        def securitypolicy(environ, context, name):
-            return None
-        environ = self._makeEnviron()
-        from repoze.bfg.interfaces import IWebObRequest
-        self._registerFactory(DummyWSGIApplicationFactory, 'foo', IContext,
-                              IWebObRequest)
-        router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
-        result = router(environ, start_response)
-        self.failUnless(result[0] is context)
-        import webob
-        self.failUnless(isinstance(result[1], webob.Request))
-        self.assertEqual(environ['repoze.bfg.subpath'], ['bar', 'baz'])
+        traversalfactory = make_traversal_factory(context, '', [])
+        response = DummyResponse()
+        viewfactory = make_view_factory(response)
+        wsgifactory = make_wsgi_factory('200 OK', (), ['Hello world'])
+        environ = self._makeEnviron()
+        self._registerTraverserFactory(traversalfactory, '', None, None)
+        self._registerViewFactory(viewfactory, '', IContext, IRequest)
+        self._registerWSGIFactory(wsgifactory, '', None, None)
+        router = self._makeOne(rootpolicy)
+        start_response = DummyStartResponse()
+        result = router(environ, start_response)
+        self.assertEqual(result, ['Hello world'])
+        self.assertEqual(start_response.headers, ())
+        self.assertEqual(start_response.status, '200 OK')
+        request = environ['request']
+        self.assertEqual(environ['request'].subpath, [])
+        self.assertEqual(environ['view'].context, context)
 
-    def test_call_app_registered_specific_fail(self):
-        context = DummyContext()
+    def test_call_view_registered_specific_fail(self):
+        rootpolicy = make_rootpolicy(None)
         from zope.interface import Interface
         from zope.interface import directlyProvides
-        class INotContext(Interface):
-            pass
         class IContext(Interface):
             pass
+        class INotContext(Interface):
+            pass
+        from repoze.bfg.interfaces import IRequest
+        context = DummyContext()
         directlyProvides(context, INotContext)
-        statii = []
-        headerii = []
-        def rootpolicy(environ):
-            return None
-        def traversalpolicy(environ, root):
-            return context, 'foo', []
-        def start_response(status, headers):
-            statii[:] = [status]
-            headerii[:] = [headers]
-        def securitypolicy(environ, context, name):
-            return None
-        environ = self._makeEnviron()
-        from repoze.bfg.interfaces import IWebObRequest
-        self._registerFactory(DummyWSGIApplicationFactory, 'foo', IContext,
-                              IWebObRequest)
-        router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
+        traversalfactory = make_traversal_factory(context, '', [''])
+        response = DummyResponse()
+        viewfactory = make_view_factory(response)
+        wsgifactory = make_wsgi_factory('200 OK', (), ['Hello world'])
+        environ = self._makeEnviron()
+        self._registerTraverserFactory(traversalfactory, '', None, None)
+        self._registerViewFactory(viewfactory, '', IContext, IRequest)
+        self._registerWSGIFactory(wsgifactory, '', None, None)
+        router = self._makeOne(rootpolicy)
+        start_response = DummyStartResponse()
         result = router(environ, start_response)
-        headers = headerii[0]
-        self.assertEqual(len(headers), 2)
-        status = statii[0]
-        self.assertEqual(status, '404 Not Found')
-        self.failUnless('http://localhost:8080' in result[0], result)
+        self.failUnless('404' in result[0])
+        self.assertEqual(start_response.status, '404 Not Found')
 
 class DummyContext:
     pass
 
-class DummyWSGIApplicationFactory:
-    def __init__(self, context, request):
-        self.context = context
-        self.request = request
-
-    def __call__(self, environ, start_response):
-        return self.context, self.request
+def make_wsgi_factory(status, headers, app_iter):
+    class DummyWSGIApplicationFactory:
+        def __init__(self, view, request):
+            self.view = view
+            self.request = request
+
+        def __call__(self, environ, start_response):
+            environ['view'] = self.view
+            environ['request'] = self.request
+            start_response(status, headers)
+            return app_iter
+
+    return DummyWSGIApplicationFactory
+
+def make_view_factory(response):
+    class DummyViewFactory:
+        def __init__(self, context, request):
+            self.context = context
+            self.request = request
+
+        def __call__(self):
+            return response
+    return DummyViewFactory
+
+def make_traversal_factory(context, name, subpath):
+    class DummyTraversalFactory:
+        def __init__(self, root, request):
+            self.root = root
+            self.request = request
+
+        def __call__(self, path):
+            return context, name, subpath
+    return DummyTraversalFactory
+
+def make_rootpolicy(root):
+    def rootpolicy(environ):
+        return root
+    return rootpolicy
+
+class DummyStartResponse:
+    status = ()
+    headers = ()
+    def __call__(self, status, headers):
+        self.status = status
+        self.headers = headers
+        
+class DummyResponse:
+    status = '200 OK'
+    headerlist = ()
+    app_iter = ()
+    

Modified: repoze.bfg/trunk/repoze/bfg/tests/test_traversal.py
==============================================================================
--- repoze.bfg/trunk/repoze/bfg/tests/test_traversal.py	(original)
+++ repoze.bfg/trunk/repoze/bfg/tests/test_traversal.py	Mon Jul  7 00:44:57 2008
@@ -36,62 +36,61 @@
         PlacelessSetup.tearDown(self)
         
     def _getTargetClass(self):
-        from repoze.bfg.traversal import NaiveTraversalPolicy
-        return NaiveTraversalPolicy
+        from repoze.bfg.traversal import NaivePublishTraverser
+        return NaivePublishTraverser
 
     def _makeOne(self, *arg, **kw):
         import zope.component
         gsm = zope.component.getGlobalSiteManager()
-        from repoze.bfg.interfaces import ITraverser
-        gsm.registerAdapter(DummyTraverser, (None,), ITraverser, '')
         klass = self._getTargetClass()
         return klass(*arg, **kw)
 
-    def test_class_conforms_to_ITraversalPolicy(self):
+    def test_class_conforms_to_IPublishTraverser(self):
         from zope.interface.verify import verifyClass
-        from repoze.bfg.interfaces import ITraversalPolicy
-        verifyClass(ITraversalPolicy, self._getTargetClass())
+        from repoze.bfg.interfaces import IPublishTraverser
+        verifyClass(IPublishTraverser, self._getTargetClass())
 
-    def test_instance_conforms_to_ITraversalPolicy(self):
+    def test_instance_conforms_to_IPublishTraverser(self):
         from zope.interface.verify import verifyObject
-        from repoze.bfg.interfaces import ITraversalPolicy
-        verifyObject(ITraversalPolicy, self._makeOne())
+        from repoze.bfg.interfaces import IPublishTraverser
+        context = DummyContext()
+        request = DummyRequest()
+        verifyObject(IPublishTraverser, self._makeOne(context, request))
 
-    def test_call_nonkeyerror_raises(self):
-        policy = self._makeOne()
-        environ = {'PATH_INFO':'/foo'}
-        root = None
-        self.assertRaises(TypeError, policy, environ, root)
+    def test_call_pathel_with_no_getitem(self):
+        request = DummyRequest()
+        policy = self._makeOne(None, request)
+        ctx, name, subpath = policy('/foo/bar')
+        self.assertEqual(ctx, None)
+        self.assertEqual(name, 'foo')
+        self.assertEqual(subpath, ['bar'])
 
     def test_call_withconn_getitem_emptypath_nosubpath(self):
-        policy = self._makeOne()
-        context = DummyContext()
-        environ = {'PATH_INFO':''}
-        root = context
-        ctx, name, subpath = policy(environ, root)
-        self.assertEqual(context, ctx)
+        root = DummyContext()
+        request = DummyRequest()
+        policy = self._makeOne(root, request)
+        ctx, name, subpath = policy('')
+        self.assertEqual(ctx, root)
         self.assertEqual(name, '')
         self.assertEqual(subpath, [])
 
     def test_call_withconn_getitem_withpath_nosubpath(self):
-        policy = self._makeOne()
-        context = DummyContext()
-        context2 = DummyContext(context)
-        environ = {'PATH_INFO':'/foo/bar'}
-        root = context
-        ctx, name, subpath = policy(environ, root)
-        self.assertEqual(context, ctx)
+        foo = DummyContext()
+        root = DummyContext(foo)
+        request = DummyRequest()
+        policy = self._makeOne(root, request)
+        ctx, name, subpath = policy('/foo/bar')
+        self.assertEqual(ctx, foo)
         self.assertEqual(name, 'bar')
         self.assertEqual(subpath, [])
 
     def test_call_withconn_getitem_withpath_withsubpath(self):
-        policy = self._makeOne()
-        context = DummyContext()
-        context2 = DummyContext(context)
-        environ = {'PATH_INFO':'/foo/bar/baz/buz'}
-        root = context
-        ctx, name, subpath = policy(environ, root)
-        self.assertEqual(context, ctx)
+        foo = DummyContext()
+        request = DummyRequest()
+        root = DummyContext(foo)
+        policy = self._makeOne(root, request)
+        ctx, name, subpath = policy('/foo/bar/baz/buz')
+        self.assertEqual(ctx, foo)
         self.assertEqual(name, 'bar')
         self.assertEqual(subpath, ['baz', 'buz'])
 
@@ -103,6 +102,9 @@
         if self.next is None:
             raise KeyError, name
         return self.next
+
+class DummyRequest:
+    pass
     
 class DummyTraverser:
     def __init__(self, context):

Added: repoze.bfg/trunk/repoze/bfg/tests/test_wsgiadapter.py
==============================================================================
--- (empty file)
+++ repoze.bfg/trunk/repoze/bfg/tests/test_wsgiadapter.py	Mon Jul  7 00:44:57 2008
@@ -0,0 +1,75 @@
+import unittest
+
+class NaiveWSGIAdapterTests(unittest.TestCase):
+    def _getTargetClass(self):
+        from repoze.bfg.wsgiadapter import NaiveWSGIViewAdapter
+        return NaiveWSGIViewAdapter
+
+    def _makeOne(self, *arg, **kw):
+        klass = self._getTargetClass()
+        return klass(*arg, **kw)
+
+    def test_view_takes_no_args(self):
+        response = DummyResponse()
+        response.app_iter = ['Hello world']
+        def view():
+            return response
+        request = DummyRequest()
+        adapter = self._makeOne(view, request)
+        environ = {}
+        start_response = DummyStartResponse()
+        result = adapter(environ, start_response)
+        self.assertEqual(result, ['Hello world'])
+        self.assertEqual(start_response.headers, ())
+        self.assertEqual(start_response.status, '200 OK')
+
+    def test_view_takes_pep_333_args(self):
+        response = DummyResponse()
+        response.app_iter = ['Hello world']
+        def view(environ, start_response):
+            response.environ = environ
+            response.start_response = start_response
+            return response
+        request = DummyRequest()
+        adapter = self._makeOne(view, request)
+        environ = {}
+        start_response = DummyStartResponse()
+        result = adapter(environ, start_response)
+        self.assertEqual(result, ['Hello world'])
+        self.assertEqual(start_response.headers, ())
+        self.assertEqual(start_response.status, '200 OK')
+        self.assertEqual(response.environ, environ)
+        self.assertEqual(response.start_response, start_response)
+
+    def test_view_takes_zopey_args(self):
+        request = DummyRequest()
+        response = DummyResponse()
+        response.app_iter = ['Hello world']
+        def view(request):
+            response.request = request
+            return response
+        adapter = self._makeOne(view, request)
+        environ = {}
+        start_response = DummyStartResponse()
+        result = adapter(environ, start_response)
+        self.assertEqual(result, ['Hello world'])
+        self.assertEqual(start_response.headers, ())
+        self.assertEqual(start_response.status, '200 OK')
+        self.assertEqual(response.request, request)
+
+class DummyRequest:
+    pass
+
+class DummyResponse:
+    status = '200 OK'
+    headerlist = ()
+    app_iter = ()
+    
+class DummyStartResponse:
+    status = None
+    headers = None
+    def __call__(self, status, headers):
+        self.status = status
+        self.headers = headers
+        
+

Modified: repoze.bfg/trunk/repoze/bfg/traversal.py
==============================================================================
--- repoze.bfg/trunk/repoze/bfg/traversal.py	(original)
+++ repoze.bfg/trunk/repoze/bfg/traversal.py	Mon Jul  7 00:44:57 2008
@@ -1,9 +1,10 @@
 import urllib
 
+from zope.interface import classProvides
 from zope.interface import implements
 
-from repoze.bfg.interfaces import ITraversalPolicy
-from repoze.bfg.interfaces import ITraverser
+from repoze.bfg.interfaces import IPublishTraverser
+from repoze.bfg.interfaces import IPublishTraverserFactory
 
 def split_path(path):
     if path.startswith('/'):
@@ -21,27 +22,37 @@
             clean.append(segment)
     return clean
 
-class NaiveTraversalPolicy:
-    implements(ITraversalPolicy)
+def step(ob, name, default):
+    if not hasattr(ob, '__getitem__'):
+        return default
+    try:
+        return ob[name]
+    except KeyError:
+        return default
+
+_marker = ()
+
+class NaivePublishTraverser:
+    classProvides(IPublishTraverserFactory)
+    implements(IPublishTraverser)
+    def __init__(self, root, request):
+        self.root = root
+        self.request = request
 
-    def __call__(self, environ, root):
-        path = split_path(environ['PATH_INFO'])
-        
-        ob = root
+    def __call__(self, path):
+        path = split_path(path)
+
+        ob = self.root
         name = ''
 
         while path:
-            segment = pop(path)
-            traverser = ITraverser(ob)
-            next = traverser(environ, segment)
-            if next is None:
-                if path:
-                    name = pop(path)
+            segment = path.pop(0)
+            next = step(ob, segment, _marker)
+            if next is _marker:
+                name = segment
                 break
             ob = next
 
         return ob, name, path
 
-def pop(path):
-    return path.pop(0)
 

Added: repoze.bfg/trunk/repoze/bfg/wsgiadapter.py
==============================================================================
--- (empty file)
+++ repoze.bfg/trunk/repoze/bfg/wsgiadapter.py	Mon Jul  7 00:44:57 2008
@@ -0,0 +1,29 @@
+from zope.interface import implements
+from zope.interface import classProvides
+
+from repoze.bfg.interfaces import IWSGIApplicationFactory
+from repoze.bfg.interfaces import IWSGIApplication
+from repoze.bfg.mapply import mapply
+
+class NaiveWSGIViewAdapter:
+    classProvides(IWSGIApplicationFactory)
+    implements(IWSGIApplication)
+
+    def __init__(self, view, request):
+        self.view = view
+        self.request = request
+
+    def __call__(self, environ, start_response):
+        catch_response = []
+        def replace_start_response(status, headers):
+            catch_response[:] = (status, headers)
+        kwdict = {
+            'request':self.request,
+            'environ':environ,
+            'start_response':start_response,
+            }
+        response =  mapply(self.view, positional = (), keyword = kwdict)
+        if not catch_response:
+            catch_response = (response.status, response.headerlist)
+        start_response(*catch_response)
+        return response.app_iter


More information about the Repoze-checkins mailing list