auth_test.py
Go to the documentation of this file.
00001 # These tests do not currently do much to verify the correct implementation
00002 # of the openid/oauth protocols, they just exercise the major code paths
00003 # and ensure that it doesn't blow up (e.g. with unicode/bytes issues in
00004 # python 3)
00005 
00006 
00007 from __future__ import absolute_import, division, with_statement
00008 from tornado.auth import OpenIdMixin, OAuthMixin, OAuth2Mixin
00009 from tornado.escape import json_decode
00010 from tornado.testing import AsyncHTTPTestCase, LogTrapTestCase
00011 from tornado.util import b
00012 from tornado.web import RequestHandler, Application, asynchronous
00013 
00014 
00015 class OpenIdClientLoginHandler(RequestHandler, OpenIdMixin):
00016     def initialize(self, test):
00017         self._OPENID_ENDPOINT = test.get_url('/openid/server/authenticate')
00018 
00019     @asynchronous
00020     def get(self):
00021         if self.get_argument('openid.mode', None):
00022             self.get_authenticated_user(
00023                 self.on_user, http_client=self.settings['http_client'])
00024             return
00025         self.authenticate_redirect()
00026 
00027     def on_user(self, user):
00028         assert user is not None
00029         self.finish(user)
00030 
00031 
00032 class OpenIdServerAuthenticateHandler(RequestHandler):
00033     def post(self):
00034         assert self.get_argument('openid.mode') == 'check_authentication'
00035         self.write('is_valid:true')
00036 
00037 
00038 class OAuth1ClientLoginHandler(RequestHandler, OAuthMixin):
00039     def initialize(self, test, version):
00040         self._OAUTH_VERSION = version
00041         self._OAUTH_REQUEST_TOKEN_URL = test.get_url('/oauth1/server/request_token')
00042         self._OAUTH_AUTHORIZE_URL = test.get_url('/oauth1/server/authorize')
00043         self._OAUTH_ACCESS_TOKEN_URL = test.get_url('/oauth1/server/access_token')
00044 
00045     def _oauth_consumer_token(self):
00046         return dict(key='asdf', secret='qwer')
00047 
00048     @asynchronous
00049     def get(self):
00050         if self.get_argument('oauth_token', None):
00051             self.get_authenticated_user(
00052                 self.on_user, http_client=self.settings['http_client'])
00053             return
00054         self.authorize_redirect(http_client=self.settings['http_client'])
00055 
00056     def on_user(self, user):
00057         assert user is not None
00058         self.finish(user)
00059 
00060     def _oauth_get_user(self, access_token, callback):
00061         assert access_token == dict(key=b('uiop'), secret=b('5678')), access_token
00062         callback(dict(email='foo@example.com'))
00063 
00064 
00065 class OAuth1ClientRequestParametersHandler(RequestHandler, OAuthMixin):
00066     def initialize(self, version):
00067         self._OAUTH_VERSION = version
00068 
00069     def _oauth_consumer_token(self):
00070         return dict(key='asdf', secret='qwer')
00071 
00072     def get(self):
00073         params = self._oauth_request_parameters(
00074             'http://www.example.com/api/asdf',
00075             dict(key='uiop', secret='5678'),
00076             parameters=dict(foo='bar'))
00077         import urllib
00078         urllib.urlencode(params)
00079         self.write(params)
00080 
00081 
00082 class OAuth1ServerRequestTokenHandler(RequestHandler):
00083     def get(self):
00084         self.write('oauth_token=zxcv&oauth_token_secret=1234')
00085 
00086 
00087 class OAuth1ServerAccessTokenHandler(RequestHandler):
00088     def get(self):
00089         self.write('oauth_token=uiop&oauth_token_secret=5678')
00090 
00091 
00092 class OAuth2ClientLoginHandler(RequestHandler, OAuth2Mixin):
00093     def initialize(self, test):
00094         self._OAUTH_AUTHORIZE_URL = test.get_url('/oauth2/server/authorize')
00095 
00096     def get(self):
00097         self.authorize_redirect()
00098 
00099 
00100 class AuthTest(AsyncHTTPTestCase, LogTrapTestCase):
00101     def get_app(self):
00102         return Application(
00103             [
00104                 # test endpoints
00105                 ('/openid/client/login', OpenIdClientLoginHandler, dict(test=self)),
00106                 ('/oauth10/client/login', OAuth1ClientLoginHandler,
00107                  dict(test=self, version='1.0')),
00108                 ('/oauth10/client/request_params',
00109                  OAuth1ClientRequestParametersHandler,
00110                  dict(version='1.0')),
00111                 ('/oauth10a/client/login', OAuth1ClientLoginHandler,
00112                  dict(test=self, version='1.0a')),
00113                 ('/oauth10a/client/request_params',
00114                  OAuth1ClientRequestParametersHandler,
00115                  dict(version='1.0a')),
00116                 ('/oauth2/client/login', OAuth2ClientLoginHandler, dict(test=self)),
00117 
00118                 # simulated servers
00119                 ('/openid/server/authenticate', OpenIdServerAuthenticateHandler),
00120                 ('/oauth1/server/request_token', OAuth1ServerRequestTokenHandler),
00121                 ('/oauth1/server/access_token', OAuth1ServerAccessTokenHandler),
00122                 ],
00123             http_client=self.http_client)
00124 
00125     def test_openid_redirect(self):
00126         response = self.fetch('/openid/client/login', follow_redirects=False)
00127         self.assertEqual(response.code, 302)
00128         self.assertTrue(
00129             '/openid/server/authenticate?' in response.headers['Location'])
00130 
00131     def test_openid_get_user(self):
00132         response = self.fetch('/openid/client/login?openid.mode=blah&openid.ns.ax=http://openid.net/srv/ax/1.0&openid.ax.type.email=http://axschema.org/contact/email&openid.ax.value.email=foo@example.com')
00133         response.rethrow()
00134         parsed = json_decode(response.body)
00135         self.assertEqual(parsed["email"], "foo@example.com")
00136 
00137     def test_oauth10_redirect(self):
00138         response = self.fetch('/oauth10/client/login', follow_redirects=False)
00139         self.assertEqual(response.code, 302)
00140         self.assertTrue(response.headers['Location'].endswith(
00141             '/oauth1/server/authorize?oauth_token=zxcv'))
00142         # the cookie is base64('zxcv')|base64('1234')
00143         self.assertTrue(
00144             '_oauth_request_token="enhjdg==|MTIzNA=="' in response.headers['Set-Cookie'],
00145             response.headers['Set-Cookie'])
00146 
00147     def test_oauth10_get_user(self):
00148         response = self.fetch(
00149             '/oauth10/client/login?oauth_token=zxcv',
00150             headers={'Cookie': '_oauth_request_token=enhjdg==|MTIzNA=='})
00151         response.rethrow()
00152         parsed = json_decode(response.body)
00153         self.assertEqual(parsed['email'], 'foo@example.com')
00154         self.assertEqual(parsed['access_token'], dict(key='uiop', secret='5678'))
00155 
00156     def test_oauth10_request_parameters(self):
00157         response = self.fetch('/oauth10/client/request_params')
00158         response.rethrow()
00159         parsed = json_decode(response.body)
00160         self.assertEqual(parsed['oauth_consumer_key'], 'asdf')
00161         self.assertEqual(parsed['oauth_token'], 'uiop')
00162         self.assertTrue('oauth_nonce' in parsed)
00163         self.assertTrue('oauth_signature' in parsed)
00164 
00165     def test_oauth10a_redirect(self):
00166         response = self.fetch('/oauth10a/client/login', follow_redirects=False)
00167         self.assertEqual(response.code, 302)
00168         self.assertTrue(response.headers['Location'].endswith(
00169             '/oauth1/server/authorize?oauth_token=zxcv'))
00170         # the cookie is base64('zxcv')|base64('1234')
00171         self.assertTrue(
00172             '_oauth_request_token="enhjdg==|MTIzNA=="' in response.headers['Set-Cookie'],
00173             response.headers['Set-Cookie'])
00174 
00175     def test_oauth10a_get_user(self):
00176         response = self.fetch(
00177             '/oauth10a/client/login?oauth_token=zxcv',
00178             headers={'Cookie': '_oauth_request_token=enhjdg==|MTIzNA=='})
00179         response.rethrow()
00180         parsed = json_decode(response.body)
00181         self.assertEqual(parsed['email'], 'foo@example.com')
00182         self.assertEqual(parsed['access_token'], dict(key='uiop', secret='5678'))
00183 
00184     def test_oauth10a_request_parameters(self):
00185         response = self.fetch('/oauth10a/client/request_params')
00186         response.rethrow()
00187         parsed = json_decode(response.body)
00188         self.assertEqual(parsed['oauth_consumer_key'], 'asdf')
00189         self.assertEqual(parsed['oauth_token'], 'uiop')
00190         self.assertTrue('oauth_nonce' in parsed)
00191         self.assertTrue('oauth_signature' in parsed)
00192 
00193     def test_oauth2_redirect(self):
00194         response = self.fetch('/oauth2/client/login', follow_redirects=False)
00195         self.assertEqual(response.code, 302)
00196         self.assertTrue('/oauth2/server/authorize?' in response.headers['Location'])


rosbridge_server
Author(s): Jonathan Mace
autogenerated on Thu Jan 2 2014 11:53:55