[Repoze-checkins] r851 - repoze.browserid/trunk/repoze/browserid
Chris McDonough
chrism at agendaless.com
Fri Mar 21 14:00:38 EDT 2008
Author: Chris McDonough <chrism at agendaless.com>
Date: Fri Mar 21 14:00:38 2008
New Revision: 851
Log:
Use sha instead of struct.pack; allow an hmac to be sent along with the browser id to allow for things like "browser ids must be sent from the same IP address", and so on.
Modified:
repoze.browserid/trunk/repoze/browserid/middleware.py
repoze.browserid/trunk/repoze/browserid/tests.py
Modified: repoze.browserid/trunk/repoze/browserid/middleware.py
==============================================================================
--- repoze.browserid/trunk/repoze/browserid/middleware.py (original)
+++ repoze.browserid/trunk/repoze/browserid/middleware.py Fri Mar 21 14:00:38 2008
@@ -12,46 +12,89 @@
#
##############################################################################
+import hmac
+import os
import random
-import sys
import StringIO
-import struct
+import sha
import time
+import threading
from paste.request import get_cookies
+_RANDS = []
+_CURRENT_PERIOD = None
+_LOCK = threading.Lock()
+
class BrowserIdMiddleware(object):
+
def __init__(self, app,
+ secret_key,
cookie_name,
cookie_path='/',
cookie_domain=None,
cookie_lifetime=None,
- cookie_secure=False):
+ cookie_secure=False,
+ vary=(),
+ ):
self.app = app
+ self.secret_key = secret_key
self.cookie_name = cookie_name
self.cookie_path = cookie_path
self.cookie_domain = cookie_domain
self.cookie_lifetime = cookie_lifetime
self.cookie_secure = cookie_secure
-
- def __call__(self, environ, start_response, time=time, random=random):
+ self.vary = vary
+ self.randint = random.randint # for testing
+ self.time = time.time # for testing
+ try:
+ self.pid = os.getpid()
+ except AttributeError:
+ # no getpid in Jython
+ self.pid = 1
+
+ def __call__(self, environ, start_response):
+ """
+ If the remote browser has a cookie with a browser id value,
+ and the value hasn't been tampered with, set the value as
+ 'repoze.browserid' in the environ and call the downstream
+ application.
+
+ Otherwise, create one and set that as 'repoze.browserid' in
+ the environ, then call the downstream application. On egress,
+ set a Set-Cookie header with the value so we can retrieve it
+ next time around.
+
+ We use the secret key and the values in self.vary to compose
+ the 'tamper key' when creating a browser id. This allows a
+ configurer to vary the tamper key on, e.g. 'REMOTE_ADDR' if he
+ believes that the same browser id should always be sent from
+ the same IP address, or 'HTTP_USER_AGENT' if he believes it
+ should always come from the same user agent, or some arbitrary
+ combination thereof made out of environ keys.
+ """
cookies = get_cookies(environ)
cookie = cookies.get(self.cookie_name)
if cookie is not None:
# this browser already has an id
- return self.app(environ, start_response)
-
+ browser_id = cookie.value
+ if not self.tampered(environ, browser_id):
+ environ['repoze.browserid'] = browser_id
+ return self.app(environ, start_response)
+
+ now = self.time()
+ browser_id = self.make_browser_id(now, environ)
+ environ['repoze.browserid'] = browser_id
wrapper = StartResponseWrapper(start_response)
app_iter = self.app(environ, wrapper.wrap_start_response)
- browser_id = make_browser_id(time, random)
set_cookie = '%s=%s; ' % (self.cookie_name, browser_id)
if self.cookie_path:
set_cookie += 'Path=%s; ' % self.cookie_path
if self.cookie_domain:
set_cookie += 'Domain=%s; ' % self.cookie_domain
if self.cookie_lifetime:
- expires = time.gmtime(time.time() + self.cookie_lifetime)
+ expires = time.gmtime(now + self.cookie_lifetime)
expires = time.strftime('%a %d-%b-%Y %H:%M:%S GMT', expires)
set_cookie += 'Expires=%s; ' % expires
if self.cookie_secure:
@@ -59,44 +102,69 @@
wrapper.finish_response([('Set-Cookie', set_cookie)])
return app_iter
-def make_browser_id(time=time, random=random):
- """ Returns 40-character string browser id
- 'AAAAAAAABBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB'
- where:
-
- A == 8-byte string representation of random integer
- B == 32-byte hex representation of a timetime value
-
- An example is: 0000000047e0d7e3000000006468f3ff
- """
- rand = random.randint(0, 99999999)
- stamp = make_timestamp(time)
- browser_id = '%08i%s' % (rand, stamp)
- return browser_id
-
-def time_from_browser_id(browser_id):
- stamp = browser_id[8:]
- return timestamp_to_time(stamp)
-
-def make_timestamp(time=time):
- now = time.time()
- int_part = int(now)
- frac_part = int((now - int_part) * sys.maxint)
- stamp = struct.pack(">QQ", int_part, frac_part).encode('hex')
- return stamp
-
-def timestamp_to_time(stamp):
- try:
- binary = stamp.decode('hex')
- except TypeError:
- return None
- try:
- int_part, frac_part = struct.unpack('>QQ', binary)
- except struct.error:
- return None
- frac_part = float(frac_part / sys.maxint)
- time = int_part + frac_part
- return time
+ def _get_tamper_key(self, environ):
+ key = self.secret_key
+ for name in self.vary:
+ key = key + environ.get(name, '')
+ return key
+
+ def tampered(self, environ, browser_id):
+ try:
+ component, provided_h = browser_id.split('!')
+ except ValueError:
+ return True
+ key = self._get_tamper_key(environ)
+ computed_h = hmac.new(key, component).hexdigest()
+ return computed_h != provided_h
+
+ def make_browser_id(self, when, environ):
+ """ Returns opaque browser id
+
+ The id has the form '<sha hexdigest>!<hmac hexdigest>'
+ """
+ rand = self.get_rand_for(when)
+ source = '%s%s%s' % (rand, when, self.pid)
+ component = sha.new(source).hexdigest()
+ key = self._get_tamper_key(environ)
+ h = hmac.new(key, component).hexdigest()
+ browser_id = '%s!%s' % (component, h)
+ return browser_id
+
+ def get_rand_for(self, when):
+ """
+ There is a good chance that two simultaneous callers will
+ obtain the same random number when the system first starts, as
+ all Python threads/interpreters will start with the same
+ random seed (the time) when they come up on platforms that
+ don't have an entropy generator.
+
+ We'd really like to be sure that two callers never get the
+ same browser id, so this is a problem. But since our browser
+ id has a time component and a random component, the random
+ component only needs to be unique within the resolution of the
+ time component to ensure browser id uniqueness.
+
+ We keep around a set of recently-generated random numbers at a
+ global scope for the past second, only returning numbers that
+ aren't in this set. The lowest-known-resolution time.time
+ timer is on Windows, which changes 18.2 times per second, so
+ using a period of one second should be conservative enough.
+ """
+ period = 1
+ this_period = int(when - (when % period))
+ _LOCK.acquire()
+ try:
+ while 1:
+ rand = self.randint(0, 99999999)
+ global _CURRENT_PERIOD
+ if this_period != _CURRENT_PERIOD:
+ _CURRENT_PERIOD = this_period
+ _RANDS[:] = []
+ if rand not in _RANDS:
+ _RANDS.append(rand)
+ return rand
+ finally:
+ _LOCK.release()
class StartResponseWrapper(object):
def __init__(self, start_response):
@@ -124,3 +192,4 @@
write(value)
if hasattr(write, 'close'):
write.close()
+
Modified: repoze.browserid/trunk/repoze/browserid/tests.py
==============================================================================
--- repoze.browserid/trunk/repoze/browserid/tests.py (original)
+++ repoze.browserid/trunk/repoze/browserid/tests.py Fri Mar 21 14:00:38 2008
@@ -1,121 +1,174 @@
import unittest
+_DEFAULT_BID = """\
+e193a01ecf8d30ad0affefd332ce934e32ffce72!1f2115dde0ba7312bdc94942e227666c
+"""
+
+# see the "d" at the end?
+_BAD_BID = """\
+e193a01ecf8d30ad0affefd332ce934e32ffce72!1f2115dde0ba7312bdc94942e227666d
+"""
+
class TestBrowserIdMiddleware(unittest.TestCase):
def _getTargetClass(self):
from repoze.browserid.middleware import BrowserIdMiddleware
return BrowserIdMiddleware
+ def tearDown(self):
+ import repoze.browserid.middleware
+ repoze.browserid.middleware._RANDS[:] = []
+ repoze.browserid.middleware._CURRENT_PERIOD = None
+ self.headers = None
+ self.status = None
+ self.exc_info = None
+
+ def _assertBrowserId(self, browser_id,
+ rand=0, when=0, pid=1, secret='secret'):
+ import sha
+ import hmac
+ component = '%s%s%s' % (rand, when, pid)
+ component = sha.new(component).hexdigest()
+ hmac = hmac.new(secret, component).hexdigest()
+ self.assertEqual(browser_id, '%s!%s' % (component, hmac))
+
def _makeOne(self, *arg, **kw):
klass = self._getTargetClass()
app = DummyApp()
- return klass(app, *arg, **kw)
+ mw = klass(app, *arg, **kw)
+ mw.randint = lambda *arg: 0
+ mw.time = lambda *arg: 0
+ mw.pid = 1
+ return mw
+
+ def _start_response(self, status, headers, exc_info=None):
+ self.status = status
+ self.headers = headers
+ self.exc_info = exc_info
+
+ def _get_cookie_components(self, cookie_str):
+ components = [ x.strip() for x in cookie_str.rstrip().split(';') if x ]
+ return components
def test_defaults_nocookie(self):
- middleware = self._makeOne('thecookiename')
+ middleware = self._makeOne('secret', 'thecookiename')
environ = {}
- headerses = []
- def start_response(status, headers, exc_info=None):
- headerses.append(headers)
- import time
- dummy_t = DummyTime(0, time.gmtime(0), '<timestr>')
- dummy_random = DummyRandom(0)
- app_iter = middleware(environ, start_response, time=dummy_t,
- random=dummy_random)
- headers = headerses[0]
- self.assertEqual(len(headers), 1)
- header = headers[0]
- self.assertEqual(header[0], 'Set-Cookie')
- self.assertEqual(header[1], 'thecookiename=' + '0' * 40 + '; Path=/; ')
-
- def test_defaults_withcookie(self):
- middleware = self._makeOne('thecookiename')
- environ = {'HTTP_COOKIE':'thecookiename=1'}
- headerses = []
- def start_response(status, headers, exc_info=None):
- headerses.append(headers)
- result = middleware(environ, start_response)
- self.assertEqual(headerses, [[]])
-
-class TestMakeBrowserId(unittest.TestCase):
- def _getFUT(self):
- from repoze.browserid.middleware import make_browser_id
- return make_browser_id
-
- def test_it(self):
- f = self._getFUT()
- time = DummyTime(0)
- random = DummyRandom(0)
- browser_id = f(time, random)
- self.assertEqual(browser_id, '0' * 40)
-
-class TestMakeTimestamp(unittest.TestCase):
- def _getFUT(self):
- from repoze.browserid.middleware import make_timestamp
- return make_timestamp
-
- def test_zero(self):
- t = DummyTime(0)
- f = self._getFUT()
- ts = f(t)
- self.assertEqual(ts, '0'*32)
-
- def test_mah_birthday(self):
- import time
- mah_birthday = (1971, 5, 10, 0, 0, 0, 0, 0, -1)
- mah_birthday = time.mktime(mah_birthday)
- t = DummyTime(mah_birthday)
- f = self._getFUT()
- ts = f(t)
- self.assertEqual(ts, '00000000028b7d400000000000000000')
-
-class TestTimestampToTime(unittest.TestCase):
- def _getFUT(self):
- from repoze.browserid.middleware import timestamp_to_time
- return timestamp_to_time
-
- def test_zero(self):
- f = self._getFUT()
- stamp = '0' * 32
- t = f(stamp)
- self.assertEqual(t, 0)
-
- def test_mah_birthday(self):
+ app_iter = middleware(environ, self._start_response)
+ self.assertEqual(len(self.headers), 1)
+ header = self.headers[0]
+ header_name, header_val = header
+ self.assertEqual(header_name, 'Set-Cookie')
+ cookie_val, path = self._get_cookie_components(header_val)
+ name, cookie = cookie_val.split('=')
+ self.assertEqual(name, 'thecookiename')
+ self._assertBrowserId(cookie)
+ self.assertEqual(path, 'Path=/')
+ self.assertEqual(app_iter, [])
+
+ def test_cookie_path(self):
+ middleware = self._makeOne('secret', 'thecookiename',
+ cookie_path='/subpath')
+ environ = {}
+ app_iter = middleware(environ, self._start_response)
+ self.assertEqual(len(self.headers), 1)
+ header = self.headers[0]
+ header_name, header_val = header
+ self.assertEqual(header_name, 'Set-Cookie')
+ cookie_val, path = self._get_cookie_components(header_val)
+ name, cookie = cookie_val.split('=')
+ self.assertEqual(name, 'thecookiename')
+ self._assertBrowserId(cookie)
+ self.assertEqual(path, 'Path=/subpath')
+ self.assertEqual(app_iter, [])
+
+ def test_cookie_domain(self):
+ middleware = self._makeOne('secret', 'thecookiename',
+ cookie_domain='repoze.org')
+ environ = {}
+ app_iter = middleware(environ, self._start_response)
+ self.assertEqual(len(self.headers), 1)
+ header = self.headers[0]
+ header_name, header_val = header
+ self.assertEqual(header_name, 'Set-Cookie')
+ cookie_val, path, domain = self._get_cookie_components(header_val)
+ name, cookie = cookie_val.split('=')
+ self.assertEqual(name, 'thecookiename')
+ self._assertBrowserId(cookie)
+ self.assertEqual(path, 'Path=/')
+ self.assertEqual(domain, 'Domain=repoze.org')
+ self.assertEqual(app_iter, [])
+
+ def test_cookie_lifetime(self):
+ lifetime = 86400
+ middleware = self._makeOne('secret', 'thecookiename',
+ cookie_lifetime=lifetime)
+ environ = {}
+ app_iter = middleware(environ, self._start_response)
+ self.assertEqual(len(self.headers), 1)
+ header = self.headers[0]
+ header_name, header_val = header
+ self.assertEqual(header_name, 'Set-Cookie')
+ cookie_val, path, expiresh = self._get_cookie_components(header_val)
+ name, cookie = cookie_val.split('=')
+ self.assertEqual(name, 'thecookiename')
+ self._assertBrowserId(cookie)
+ self.assertEqual(path, 'Path=/')
import time
- mah_birthday = (1971, 5, 10, 0, 0, 0, 0, 0, -1)
- mah_birthday = time.mktime(mah_birthday)
- f = self._getFUT()
- stamp = '00000000028b7d400000000000000000'
- t = f(stamp)
- self.assertEqual(t, mah_birthday)
-
- def test_hexdecode_bad(self):
- f = self._getFUT()
- stamp = 'bogus'
- t = f(stamp)
- self.assertEqual(t, None)
-
- def test_unpack_bad(self):
- f = self._getFUT()
- stamp = '0' * 16 # not long enough
- t = f(stamp)
- self.assertEqual(t, None)
-
-class TestTimeFromBrowserId(unittest.TestCase):
- def _getFUT(self):
- from repoze.browserid.middleware import time_from_browser_id
- return time_from_browser_id
-
- def test_working(self):
- browser_id = '0' * 40
- f = self._getFUT()
- t = f(browser_id)
- self.assertEqual(t, 0)
-
- def test_bogus(self):
- f = self._getFUT()
- stamp = '0' * 16 # not long enough
- t = f(stamp)
- self.assertEqual(t, None)
+ expires = time.gmtime(lifetime)
+ expires = time.strftime('%a %d-%b-%Y %H:%M:%S GMT', expires)
+ self.assertEqual(expiresh, 'Expires=%s' % expires)
+ self.assertEqual(app_iter, [])
+
+ def test_cookie_secure(self):
+ middleware = self._makeOne('secret', 'thecookiename',
+ cookie_secure=True)
+ environ = {}
+ app_iter = middleware(environ, self._start_response)
+ self.assertEqual(len(self.headers), 1)
+ header = self.headers[0]
+ header_name, header_val = header
+ self.assertEqual(header_name, 'Set-Cookie')
+ cookie_val, path, secure = self._get_cookie_components(header_val)
+ name, cookie = cookie_val.split('=')
+ self.assertEqual(name, 'thecookiename')
+ self._assertBrowserId(cookie)
+ self.assertEqual(path, 'Path=/')
+ self.assertEqual(secure, 'Secure')
+ self.assertEqual(app_iter, [])
+
+ def test_defaults_withcookie_untampered(self):
+ middleware = self._makeOne('secret', 'thecookiename')
+ environ = {'HTTP_COOKIE':'thecookiename=%s; Path=/;' % _DEFAULT_BID}
+ result = middleware(environ, self._start_response)
+ self.assertEqual(self.headers, [])
+ self.assertEqual(result, [])
+
+ def test_defaults_withcookie_tampered_badformat(self):
+ middleware = self._makeOne('secret', 'thecookiename')
+ environ = {'HTTP_COOKIE':'thecookiename=bad; Path=/;'}
+ result = middleware(environ, self._start_response)
+ # headers were set because the format of the browser id was bad
+ self.assertEqual(len(self.headers), 1)
+ self.assertEqual(result, [])
+
+ def test_defaults_withcookie_tampered_badhmac(self):
+ middleware = self._makeOne('secret', 'thecookiename')
+ environ = {'HTTP_COOKIE':'thecookiename=%s; Path=/;' % _BAD_BID}
+ result = middleware(environ, self._start_response)
+ # headers were set because the hmac of the browser id was wrong
+ self.assertEqual(len(self.headers), 1)
+ self.assertEqual(result, [])
+
+ def test_make_browser_id(self):
+ middleware = self._makeOne('secret', 'thecookiename')
+ browser_id = middleware.make_browser_id(0, {})
+ self._assertBrowserId(browser_id)
+
+ def test_make_browser_id_vary(self):
+ middleware = self._makeOne('secret', 'thecookiename')
+ middleware.vary = ('REMOTE_ADDR', 'HTTP_USER_AGENT', 'NONEXISTENT')
+ environ = {'REMOTE_ADDR':'127.0.0.1', 'HTTP_USER_AGENT':'Fluzbox'}
+ browser_id = middleware.make_browser_id(0, environ)
+ self._assertBrowserId(browser_id, secret='secret127.0.0.1Fluzbox')
class TestStartResponseWrapper(unittest.TestCase):
def _getTargetClass(self):
More information about the Repoze-checkins
mailing list