[Repoze-checkins] r851 - repoze.browserid/trunk/repoze/browserid

Chris McDonough chrism at agendaless.com
Fri Mar 21 14:00:38 EDT 2008


Author: Chris McDonough <chrism at agendaless.com>
Date: Fri Mar 21 14:00:38 2008
New Revision: 851

Log:
Use sha instead of struct.pack to generate browser ids; allow an hmac to be sent along with the browser id so that tampering can be detected and so that policies like "browser ids must be sent from the same IP address" can be enforced, and so on.


Modified:
   repoze.browserid/trunk/repoze/browserid/middleware.py
   repoze.browserid/trunk/repoze/browserid/tests.py

Modified: repoze.browserid/trunk/repoze/browserid/middleware.py
==============================================================================
--- repoze.browserid/trunk/repoze/browserid/middleware.py	(original)
+++ repoze.browserid/trunk/repoze/browserid/middleware.py	Fri Mar 21 14:00:38 2008
@@ -12,46 +12,89 @@
 #
 ##############################################################################
 
+import hmac
+import os
 import random
-import sys
 import StringIO
-import struct
+import sha
 import time
+import threading
 
 from paste.request import get_cookies
 
+_RANDS = []
+_CURRENT_PERIOD = None
+_LOCK = threading.Lock()
+
 class BrowserIdMiddleware(object):
+
     def __init__(self, app,
+                 secret_key,
                  cookie_name,
                  cookie_path='/',
                  cookie_domain=None,
                  cookie_lifetime=None,
-                 cookie_secure=False):
+                 cookie_secure=False,
+                 vary=(),
+                 ):
 
         self.app = app
+        self.secret_key = secret_key
         self.cookie_name = cookie_name
         self.cookie_path = cookie_path
         self.cookie_domain = cookie_domain
         self.cookie_lifetime = cookie_lifetime
         self.cookie_secure = cookie_secure
-
-    def __call__(self, environ, start_response, time=time, random=random):
+        self.vary = vary
+        self.randint = random.randint # for testing
+        self.time = time.time # for testing
+        try:
+            self.pid = os.getpid()
+        except AttributeError:
+            # no getpid in Jython
+            self.pid = 1
+
+    def __call__(self, environ, start_response):
+        """
+        If the remote browser has a cookie with a browser id value,
+        and the value hasn't been tampered with, set the value as
+        'repoze.browserid' in the environ and call the downstream
+        application.
+
+        Otherwise, create one and set that as 'repoze.browserid' in
+        the environ, then call the downstream application.  On egress,
+        set a Set-Cookie header with the value so we can retrieve it
+        next time around.
+
+        We use the secret key and the values in self.vary to compose
+        the 'tamper key' when creating a browser id.  This allows a
+        configurer to vary the tamper key on, e.g. 'REMOTE_ADDR' if he
+        believes that the same browser id should always be sent from
+        the same IP address, or 'HTTP_USER_AGENT' if he believes it
+        should always come from the same user agent, or some arbitrary
+        combination thereof made out of environ keys.
+        """
         cookies = get_cookies(environ)
         cookie = cookies.get(self.cookie_name)
         if cookie is not None:
             # this browser already has an id
-            return self.app(environ, start_response)
-            
+            browser_id = cookie.value
+            if not self.tampered(environ, browser_id):
+                environ['repoze.browserid'] = browser_id
+                return self.app(environ, start_response)
+
+        now = self.time()
+        browser_id = self.make_browser_id(now, environ)
+        environ['repoze.browserid'] = browser_id
         wrapper = StartResponseWrapper(start_response)
         app_iter = self.app(environ, wrapper.wrap_start_response)
-        browser_id = make_browser_id(time, random)
         set_cookie = '%s=%s; ' % (self.cookie_name, browser_id)
         if self.cookie_path:
             set_cookie += 'Path=%s; ' % self.cookie_path
         if self.cookie_domain:
             set_cookie += 'Domain=%s; ' % self.cookie_domain
         if self.cookie_lifetime:
-            expires = time.gmtime(time.time() + self.cookie_lifetime)
+            expires = time.gmtime(now + self.cookie_lifetime)
             expires = time.strftime('%a %d-%b-%Y %H:%M:%S GMT', expires)
             set_cookie += 'Expires=%s; ' % expires
         if self.cookie_secure:
@@ -59,44 +102,69 @@
         wrapper.finish_response([('Set-Cookie', set_cookie)])
         return app_iter
 
-def make_browser_id(time=time, random=random):
-    """ Returns 40-character string browser id
-    'AAAAAAAABBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB'
-    where:
-
-    A == 8-byte string representation of random integer
-    B == 32-byte hex representation of a timetime value
-
-    An example is: 0000000047e0d7e3000000006468f3ff
-    """
-    rand = random.randint(0, 99999999)
-    stamp = make_timestamp(time)
-    browser_id = '%08i%s' % (rand, stamp)
-    return browser_id
-
-def time_from_browser_id(browser_id):
-    stamp = browser_id[8:]
-    return timestamp_to_time(stamp)
-        
-def make_timestamp(time=time):
-    now = time.time()
-    int_part = int(now)
-    frac_part = int((now - int_part) * sys.maxint)
-    stamp = struct.pack(">QQ", int_part, frac_part).encode('hex')
-    return stamp
-
-def timestamp_to_time(stamp):
-    try:
-        binary = stamp.decode('hex')
-    except TypeError:
-        return None
-    try:
-        int_part, frac_part = struct.unpack('>QQ', binary)
-    except struct.error:
-        return None
-    frac_part = float(frac_part / sys.maxint)
-    time = int_part + frac_part
-    return time
+    def _get_tamper_key(self, environ):
+        key = self.secret_key
+        for name in self.vary:
+            key = key + environ.get(name, '')
+        return key
+
+    def tampered(self, environ, browser_id):
+        try:
+            component, provided_h = browser_id.split('!')
+        except ValueError:
+            return True
+        key = self._get_tamper_key(environ)
+        computed_h = hmac.new(key, component).hexdigest()
+        return computed_h != provided_h
+
+    def make_browser_id(self, when, environ):
+        """ Returns opaque browser id
+
+        The id has the form '<sha hexdigest>!<hmac hexdigest>'.
+        """
+        rand = self.get_rand_for(when)
+        source = '%s%s%s' % (rand, when, self.pid)
+        component = sha.new(source).hexdigest()
+        key = self._get_tamper_key(environ)
+        h = hmac.new(key, component).hexdigest()
+        browser_id = '%s!%s' % (component, h)
+        return browser_id
+
+    def get_rand_for(self, when):
+        """
+        There is a good chance that two simultaneous callers will
+        obtain the same random number when the system first starts, as
+        all Python threads/interpreters will start with the same
+        random seed (the time) when they come up on platforms that
+        don't have an entropy generator.
+
+        We'd really like to be sure that two callers never get the
+        same browser id, so this is a problem.  But since our browser
+        id has a time component and a random component, the random
+        component only needs to be unique within the resolution of the
+        time component to ensure browser id uniqueness.
+
+        We keep around a set of recently-generated random numbers at a
+        global scope for the past second, only returning numbers that
+        aren't in this set.  The lowest-known-resolution time.time
+        timer is on Windows, which changes 18.2 times per second, so
+        using a period of one second should be conservative enough.
+        """
+        period = 1
+        this_period = int(when - (when % period))
+        _LOCK.acquire()
+        try:
+            while 1:
+                rand = self.randint(0, 99999999)
+                global _CURRENT_PERIOD
+                if this_period != _CURRENT_PERIOD:
+                    _CURRENT_PERIOD = this_period
+                    _RANDS[:] = []
+                if rand not in _RANDS:
+                    _RANDS.append(rand)
+                    return rand
+        finally:
+            _LOCK.release()
 
 class StartResponseWrapper(object):
     def __init__(self, start_response):
@@ -124,3 +192,4 @@
                 write(value)
             if hasattr(write, 'close'):
                 write.close()
+

Modified: repoze.browserid/trunk/repoze/browserid/tests.py
==============================================================================
--- repoze.browserid/trunk/repoze/browserid/tests.py	(original)
+++ repoze.browserid/trunk/repoze/browserid/tests.py	Fri Mar 21 14:00:38 2008
@@ -1,121 +1,174 @@
 import unittest
 
+_DEFAULT_BID = """\
+e193a01ecf8d30ad0affefd332ce934e32ffce72!1f2115dde0ba7312bdc94942e227666c
+"""
+
+# same as _DEFAULT_BID, except the final hmac character is "d" instead of "c"
+_BAD_BID = """\
+e193a01ecf8d30ad0affefd332ce934e32ffce72!1f2115dde0ba7312bdc94942e227666d
+"""
+
 class TestBrowserIdMiddleware(unittest.TestCase):
     def _getTargetClass(self):
         from repoze.browserid.middleware import BrowserIdMiddleware
         return BrowserIdMiddleware
 
+    def tearDown(self):
+        import repoze.browserid.middleware
+        repoze.browserid.middleware._RANDS[:] = []
+        repoze.browserid.middleware._CURRENT_PERIOD = None
+        self.headers = None
+        self.status = None
+        self.exc_info = None
+
+    def _assertBrowserId(self, browser_id,
+                        rand=0, when=0, pid=1, secret='secret'):
+        import sha
+        import hmac
+        component = '%s%s%s' % (rand, when, pid)
+        component = sha.new(component).hexdigest()
+        hmac = hmac.new(secret, component).hexdigest()
+        self.assertEqual(browser_id, '%s!%s' % (component, hmac))
+
     def _makeOne(self, *arg, **kw):
         klass = self._getTargetClass()
         app = DummyApp()
-        return klass(app, *arg, **kw)
+        mw = klass(app, *arg, **kw)
+        mw.randint = lambda *arg: 0
+        mw.time = lambda *arg: 0
+        mw.pid = 1
+        return mw
+
+    def _start_response(self, status, headers, exc_info=None):
+        self.status = status
+        self.headers = headers
+        self.exc_info = exc_info
+
+    def _get_cookie_components(self, cookie_str):
+        components =  [ x.strip() for x in cookie_str.rstrip().split(';') if x ]
+        return components
 
     def test_defaults_nocookie(self):
-        middleware = self._makeOne('thecookiename')
+        middleware = self._makeOne('secret', 'thecookiename')
         environ = {}
-        headerses = []
-        def start_response(status, headers, exc_info=None):
-            headerses.append(headers)
-        import time
-        dummy_t = DummyTime(0, time.gmtime(0), '<timestr>')
-        dummy_random = DummyRandom(0)
-        app_iter = middleware(environ, start_response, time=dummy_t,
-                              random=dummy_random)
-        headers = headerses[0]
-        self.assertEqual(len(headers), 1)
-        header = headers[0]
-        self.assertEqual(header[0], 'Set-Cookie')
-        self.assertEqual(header[1], 'thecookiename=' + '0' * 40 + '; Path=/; ')
-
-    def test_defaults_withcookie(self):
-        middleware = self._makeOne('thecookiename')
-        environ = {'HTTP_COOKIE':'thecookiename=1'}
-        headerses = []
-        def start_response(status, headers, exc_info=None):
-            headerses.append(headers)
-        result = middleware(environ, start_response)
-        self.assertEqual(headerses, [[]])
-
-class TestMakeBrowserId(unittest.TestCase):
-    def _getFUT(self):
-        from repoze.browserid.middleware import make_browser_id
-        return make_browser_id
-
-    def test_it(self):
-        f = self._getFUT()
-        time = DummyTime(0)
-        random = DummyRandom(0)
-        browser_id = f(time, random)
-        self.assertEqual(browser_id, '0' * 40)
-
-class TestMakeTimestamp(unittest.TestCase):
-    def _getFUT(self):
-        from repoze.browserid.middleware import make_timestamp
-        return make_timestamp
-
-    def test_zero(self):
-        t = DummyTime(0)
-        f = self._getFUT()
-        ts = f(t)
-        self.assertEqual(ts, '0'*32)
-
-    def test_mah_birthday(self):
-        import time
-        mah_birthday = (1971, 5, 10, 0, 0, 0, 0, 0, -1)
-        mah_birthday = time.mktime(mah_birthday)
-        t = DummyTime(mah_birthday)
-        f = self._getFUT()
-        ts = f(t)
-        self.assertEqual(ts, '00000000028b7d400000000000000000')
-
-class TestTimestampToTime(unittest.TestCase):
-    def _getFUT(self):
-        from repoze.browserid.middleware import timestamp_to_time
-        return timestamp_to_time
-
-    def test_zero(self):
-        f = self._getFUT()
-        stamp = '0' * 32
-        t = f(stamp)
-        self.assertEqual(t, 0)
-
-    def test_mah_birthday(self):
+        app_iter = middleware(environ, self._start_response)
+        self.assertEqual(len(self.headers), 1)
+        header = self.headers[0]
+        header_name, header_val = header
+        self.assertEqual(header_name, 'Set-Cookie')
+        cookie_val, path = self._get_cookie_components(header_val)
+        name, cookie = cookie_val.split('=')
+        self.assertEqual(name, 'thecookiename')
+        self._assertBrowserId(cookie)
+        self.assertEqual(path, 'Path=/')
+        self.assertEqual(app_iter, [])
+
+    def test_cookie_path(self):
+        middleware = self._makeOne('secret', 'thecookiename',
+                                   cookie_path='/subpath')
+        environ = {}
+        app_iter = middleware(environ, self._start_response)
+        self.assertEqual(len(self.headers), 1)
+        header = self.headers[0]
+        header_name, header_val = header
+        self.assertEqual(header_name, 'Set-Cookie')
+        cookie_val, path = self._get_cookie_components(header_val)
+        name, cookie = cookie_val.split('=')
+        self.assertEqual(name, 'thecookiename')
+        self._assertBrowserId(cookie)
+        self.assertEqual(path, 'Path=/subpath')
+        self.assertEqual(app_iter, [])
+
+    def test_cookie_domain(self):
+        middleware = self._makeOne('secret', 'thecookiename',
+                                   cookie_domain='repoze.org')
+        environ = {}
+        app_iter = middleware(environ, self._start_response)
+        self.assertEqual(len(self.headers), 1)
+        header = self.headers[0]
+        header_name, header_val = header
+        self.assertEqual(header_name, 'Set-Cookie')
+        cookie_val, path, domain = self._get_cookie_components(header_val)
+        name, cookie = cookie_val.split('=')
+        self.assertEqual(name, 'thecookiename')
+        self._assertBrowserId(cookie)
+        self.assertEqual(path, 'Path=/')
+        self.assertEqual(domain, 'Domain=repoze.org')
+        self.assertEqual(app_iter, [])
+
+    def test_cookie_lifetime(self):
+        lifetime = 86400
+        middleware = self._makeOne('secret', 'thecookiename',
+                                   cookie_lifetime=lifetime)
+        environ = {}
+        app_iter = middleware(environ, self._start_response)
+        self.assertEqual(len(self.headers), 1)
+        header = self.headers[0]
+        header_name, header_val = header
+        self.assertEqual(header_name, 'Set-Cookie')
+        cookie_val, path, expiresh = self._get_cookie_components(header_val)
+        name, cookie = cookie_val.split('=')
+        self.assertEqual(name, 'thecookiename')
+        self._assertBrowserId(cookie)
+        self.assertEqual(path, 'Path=/')
         import time
-        mah_birthday = (1971, 5, 10, 0, 0, 0, 0, 0, -1)
-        mah_birthday = time.mktime(mah_birthday)
-        f = self._getFUT()
-        stamp = '00000000028b7d400000000000000000'
-        t = f(stamp)
-        self.assertEqual(t, mah_birthday)
-
-    def test_hexdecode_bad(self):
-        f = self._getFUT()
-        stamp = 'bogus'
-        t = f(stamp)
-        self.assertEqual(t, None)
-
-    def test_unpack_bad(self):
-        f = self._getFUT()
-        stamp = '0' * 16 # not long enough
-        t = f(stamp)
-        self.assertEqual(t, None)
-
-class TestTimeFromBrowserId(unittest.TestCase):
-    def _getFUT(self):
-        from repoze.browserid.middleware import time_from_browser_id
-        return time_from_browser_id
-
-    def test_working(self):
-        browser_id = '0' * 40
-        f = self._getFUT()
-        t = f(browser_id)
-        self.assertEqual(t, 0)
-
-    def test_bogus(self):
-        f = self._getFUT()
-        stamp = '0' * 16 # not long enough
-        t = f(stamp)
-        self.assertEqual(t, None)
+        expires = time.gmtime(lifetime)
+        expires = time.strftime('%a %d-%b-%Y %H:%M:%S GMT', expires)
+        self.assertEqual(expiresh, 'Expires=%s' % expires)
+        self.assertEqual(app_iter, [])
+
+    def test_cookie_secure(self):
+        middleware = self._makeOne('secret', 'thecookiename',
+                                   cookie_secure=True)
+        environ = {}
+        app_iter = middleware(environ, self._start_response)
+        self.assertEqual(len(self.headers), 1)
+        header = self.headers[0]
+        header_name, header_val = header
+        self.assertEqual(header_name, 'Set-Cookie')
+        cookie_val, path, secure = self._get_cookie_components(header_val)
+        name, cookie = cookie_val.split('=')
+        self.assertEqual(name, 'thecookiename')
+        self._assertBrowserId(cookie)
+        self.assertEqual(path, 'Path=/')
+        self.assertEqual(secure, 'Secure')
+        self.assertEqual(app_iter, [])
+
+    def test_defaults_withcookie_untampered(self):
+        middleware = self._makeOne('secret', 'thecookiename')
+        environ = {'HTTP_COOKIE':'thecookiename=%s; Path=/;' % _DEFAULT_BID}
+        result = middleware(environ, self._start_response)
+        self.assertEqual(self.headers, [])
+        self.assertEqual(result, [])
+
+    def test_defaults_withcookie_tampered_badformat(self):
+        middleware = self._makeOne('secret', 'thecookiename')
+        environ = {'HTTP_COOKIE':'thecookiename=bad; Path=/;'}
+        result = middleware(environ, self._start_response)
+        # headers were set because the format of the browser id was bad
+        self.assertEqual(len(self.headers), 1)
+        self.assertEqual(result, [])
+
+    def test_defaults_withcookie_tampered_badhmac(self):
+        middleware = self._makeOne('secret', 'thecookiename')
+        environ = {'HTTP_COOKIE':'thecookiename=%s; Path=/;' % _BAD_BID}
+        result = middleware(environ, self._start_response)
+        # headers were set because the hmac of the browser id was wrong
+        self.assertEqual(len(self.headers), 1)
+        self.assertEqual(result, [])
+
+    def test_make_browser_id(self):
+        middleware = self._makeOne('secret', 'thecookiename')
+        browser_id = middleware.make_browser_id(0, {})
+        self._assertBrowserId(browser_id)
+
+    def test_make_browser_id_vary(self):
+        middleware = self._makeOne('secret', 'thecookiename')
+        middleware.vary = ('REMOTE_ADDR', 'HTTP_USER_AGENT', 'NONEXISTENT')
+        environ = {'REMOTE_ADDR':'127.0.0.1', 'HTTP_USER_AGENT':'Fluzbox'}
+        browser_id = middleware.make_browser_id(0, environ)
+        self._assertBrowserId(browser_id, secret='secret127.0.0.1Fluzbox')
 
 class TestStartResponseWrapper(unittest.TestCase):
     def _getTargetClass(self):


More information about the Repoze-checkins mailing list