auth_test.py
Go to the documentation of this file.
00001 # These tests do not currently do much to verify the correct implementation
00002 # of the openid/oauth protocols, they just exercise the major code paths
00003 # and ensure that it doesn't blow up (e.g. with unicode/bytes issues in
00004 # python 3)
00005 
00006 
00007 from __future__ import absolute_import, division, print_function, with_statement
00008 from tornado.auth import OpenIdMixin, OAuthMixin, OAuth2Mixin, TwitterMixin, GoogleMixin, AuthError
00009 from tornado.concurrent import Future
00010 from tornado.escape import json_decode
00011 from tornado import gen
00012 from tornado.log import gen_log
00013 from tornado.testing import AsyncHTTPTestCase, ExpectLog
00014 from tornado.util import u
00015 from tornado.web import RequestHandler, Application, asynchronous, HTTPError
00016 
00017 
00018 class OpenIdClientLoginHandler(RequestHandler, OpenIdMixin):
00019     def initialize(self, test):
00020         self._OPENID_ENDPOINT = test.get_url('/openid/server/authenticate')
00021 
00022     @asynchronous
00023     def get(self):
00024         if self.get_argument('openid.mode', None):
00025             self.get_authenticated_user(
00026                 self.on_user, http_client=self.settings['http_client'])
00027             return
00028         res = self.authenticate_redirect()
00029         assert isinstance(res, Future)
00030         assert res.done()
00031 
00032     def on_user(self, user):
00033         if user is None:
00034             raise Exception("user is None")
00035         self.finish(user)
00036 
00037 
00038 class OpenIdServerAuthenticateHandler(RequestHandler):
00039     def post(self):
00040         if self.get_argument('openid.mode') != 'check_authentication':
00041             raise Exception("incorrect openid.mode %r")
00042         self.write('is_valid:true')
00043 
00044 
00045 class OAuth1ClientLoginHandler(RequestHandler, OAuthMixin):
00046     def initialize(self, test, version):
00047         self._OAUTH_VERSION = version
00048         self._OAUTH_REQUEST_TOKEN_URL = test.get_url('/oauth1/server/request_token')
00049         self._OAUTH_AUTHORIZE_URL = test.get_url('/oauth1/server/authorize')
00050         self._OAUTH_ACCESS_TOKEN_URL = test.get_url('/oauth1/server/access_token')
00051 
00052     def _oauth_consumer_token(self):
00053         return dict(key='asdf', secret='qwer')
00054 
00055     @asynchronous
00056     def get(self):
00057         if self.get_argument('oauth_token', None):
00058             self.get_authenticated_user(
00059                 self.on_user, http_client=self.settings['http_client'])
00060             return
00061         res = self.authorize_redirect(http_client=self.settings['http_client'])
00062         assert isinstance(res, Future)
00063 
00064     def on_user(self, user):
00065         if user is None:
00066             raise Exception("user is None")
00067         self.finish(user)
00068 
00069     def _oauth_get_user(self, access_token, callback):
00070         if self.get_argument('fail_in_get_user', None):
00071             raise Exception("failing in get_user")
00072         if access_token != dict(key='uiop', secret='5678'):
00073             raise Exception("incorrect access token %r" % access_token)
00074         callback(dict(email='foo@example.com'))
00075 
00076 
00077 class OAuth1ClientLoginCoroutineHandler(OAuth1ClientLoginHandler):
00078     """Replaces OAuth1ClientLoginCoroutineHandler's get() with a coroutine."""
00079     @gen.coroutine
00080     def get(self):
00081         if self.get_argument('oauth_token', None):
00082             # Ensure that any exceptions are set on the returned Future,
00083             # not simply thrown into the surrounding StackContext.
00084             try:
00085                 yield self.get_authenticated_user()
00086             except Exception as e:
00087                 self.set_status(503)
00088                 self.write("got exception: %s" % e)
00089         else:
00090             yield self.authorize_redirect()
00091 
00092 
00093 class OAuth1ClientRequestParametersHandler(RequestHandler, OAuthMixin):
00094     def initialize(self, version):
00095         self._OAUTH_VERSION = version
00096 
00097     def _oauth_consumer_token(self):
00098         return dict(key='asdf', secret='qwer')
00099 
00100     def get(self):
00101         params = self._oauth_request_parameters(
00102             'http://www.example.com/api/asdf',
00103             dict(key='uiop', secret='5678'),
00104             parameters=dict(foo='bar'))
00105         self.write(params)
00106 
00107 
00108 class OAuth1ServerRequestTokenHandler(RequestHandler):
00109     def get(self):
00110         self.write('oauth_token=zxcv&oauth_token_secret=1234')
00111 
00112 
00113 class OAuth1ServerAccessTokenHandler(RequestHandler):
00114     def get(self):
00115         self.write('oauth_token=uiop&oauth_token_secret=5678')
00116 
00117 
00118 class OAuth2ClientLoginHandler(RequestHandler, OAuth2Mixin):
00119     def initialize(self, test):
00120         self._OAUTH_AUTHORIZE_URL = test.get_url('/oauth2/server/authorize')
00121 
00122     def get(self):
00123         res = self.authorize_redirect()
00124         assert isinstance(res, Future)
00125         assert res.done()
00126 
00127 
00128 class TwitterClientHandler(RequestHandler, TwitterMixin):
00129     def initialize(self, test):
00130         self._OAUTH_REQUEST_TOKEN_URL = test.get_url('/oauth1/server/request_token')
00131         self._OAUTH_ACCESS_TOKEN_URL = test.get_url('/twitter/server/access_token')
00132         self._OAUTH_AUTHORIZE_URL = test.get_url('/oauth1/server/authorize')
00133         self._TWITTER_BASE_URL = test.get_url('/twitter/api')
00134 
00135     def get_auth_http_client(self):
00136         return self.settings['http_client']
00137 
00138 
00139 class TwitterClientLoginHandler(TwitterClientHandler):
00140     @asynchronous
00141     def get(self):
00142         if self.get_argument("oauth_token", None):
00143             self.get_authenticated_user(self.on_user)
00144             return
00145         self.authorize_redirect()
00146 
00147     def on_user(self, user):
00148         if user is None:
00149             raise Exception("user is None")
00150         self.finish(user)
00151 
00152 
00153 class TwitterClientLoginGenEngineHandler(TwitterClientHandler):
00154     @asynchronous
00155     @gen.engine
00156     def get(self):
00157         if self.get_argument("oauth_token", None):
00158             user = yield self.get_authenticated_user()
00159             self.finish(user)
00160         else:
00161             # Old style: with @gen.engine we can ignore the Future from
00162             # authorize_redirect.
00163             self.authorize_redirect()
00164 
00165 
00166 class TwitterClientLoginGenCoroutineHandler(TwitterClientHandler):
00167     @gen.coroutine
00168     def get(self):
00169         if self.get_argument("oauth_token", None):
00170             user = yield self.get_authenticated_user()
00171             self.finish(user)
00172         else:
00173             # New style: with @gen.coroutine the result must be yielded
00174             # or else the request will be auto-finished too soon.
00175             yield self.authorize_redirect()
00176 
00177 
00178 class TwitterClientShowUserHandler(TwitterClientHandler):
00179     @asynchronous
00180     @gen.engine
00181     def get(self):
00182         # TODO: would be nice to go through the login flow instead of
00183         # cheating with a hard-coded access token.
00184         response = yield gen.Task(self.twitter_request,
00185                                   '/users/show/%s' % self.get_argument('name'),
00186                                   access_token=dict(key='hjkl', secret='vbnm'))
00187         if response is None:
00188             self.set_status(500)
00189             self.finish('error from twitter request')
00190         else:
00191             self.finish(response)
00192 
00193 
00194 class TwitterClientShowUserFutureHandler(TwitterClientHandler):
00195     @asynchronous
00196     @gen.engine
00197     def get(self):
00198         try:
00199             response = yield self.twitter_request(
00200                 '/users/show/%s' % self.get_argument('name'),
00201                 access_token=dict(key='hjkl', secret='vbnm'))
00202         except AuthError as e:
00203             self.set_status(500)
00204             self.finish(str(e))
00205             return
00206         assert response is not None
00207         self.finish(response)
00208 
00209 
00210 class TwitterServerAccessTokenHandler(RequestHandler):
00211     def get(self):
00212         self.write('oauth_token=hjkl&oauth_token_secret=vbnm&screen_name=foo')
00213 
00214 
00215 class TwitterServerShowUserHandler(RequestHandler):
00216     def get(self, screen_name):
00217         if screen_name == 'error':
00218             raise HTTPError(500)
00219         assert 'oauth_nonce' in self.request.arguments
00220         assert 'oauth_timestamp' in self.request.arguments
00221         assert 'oauth_signature' in self.request.arguments
00222         assert self.get_argument('oauth_consumer_key') == 'test_twitter_consumer_key'
00223         assert self.get_argument('oauth_signature_method') == 'HMAC-SHA1'
00224         assert self.get_argument('oauth_version') == '1.0'
00225         assert self.get_argument('oauth_token') == 'hjkl'
00226         self.write(dict(screen_name=screen_name, name=screen_name.capitalize()))
00227 
00228 
00229 class TwitterServerVerifyCredentialsHandler(RequestHandler):
00230     def get(self):
00231         assert 'oauth_nonce' in self.request.arguments
00232         assert 'oauth_timestamp' in self.request.arguments
00233         assert 'oauth_signature' in self.request.arguments
00234         assert self.get_argument('oauth_consumer_key') == 'test_twitter_consumer_key'
00235         assert self.get_argument('oauth_signature_method') == 'HMAC-SHA1'
00236         assert self.get_argument('oauth_version') == '1.0'
00237         assert self.get_argument('oauth_token') == 'hjkl'
00238         self.write(dict(screen_name='foo', name='Foo'))
00239 
00240 
00241 class GoogleOpenIdClientLoginHandler(RequestHandler, GoogleMixin):
00242     def initialize(self, test):
00243         self._OPENID_ENDPOINT = test.get_url('/openid/server/authenticate')
00244 
00245     @asynchronous
00246     def get(self):
00247         if self.get_argument("openid.mode", None):
00248             self.get_authenticated_user(self.on_user)
00249             return
00250         res = self.authenticate_redirect()
00251         assert isinstance(res, Future)
00252         assert res.done()
00253 
00254     def on_user(self, user):
00255         if user is None:
00256             raise Exception("user is None")
00257         self.finish(user)
00258 
00259     def get_auth_http_client(self):
00260         return self.settings['http_client']
00261 
00262 
00263 class AuthTest(AsyncHTTPTestCase):
00264     def get_app(self):
00265         return Application(
00266             [
00267                 # test endpoints
00268                 ('/openid/client/login', OpenIdClientLoginHandler, dict(test=self)),
00269                 ('/oauth10/client/login', OAuth1ClientLoginHandler,
00270                  dict(test=self, version='1.0')),
00271                 ('/oauth10/client/request_params',
00272                  OAuth1ClientRequestParametersHandler,
00273                  dict(version='1.0')),
00274                 ('/oauth10a/client/login', OAuth1ClientLoginHandler,
00275                  dict(test=self, version='1.0a')),
00276                 ('/oauth10a/client/login_coroutine',
00277                  OAuth1ClientLoginCoroutineHandler,
00278                  dict(test=self, version='1.0a')),
00279                 ('/oauth10a/client/request_params',
00280                  OAuth1ClientRequestParametersHandler,
00281                  dict(version='1.0a')),
00282                 ('/oauth2/client/login', OAuth2ClientLoginHandler, dict(test=self)),
00283 
00284                 ('/twitter/client/login', TwitterClientLoginHandler, dict(test=self)),
00285                 ('/twitter/client/login_gen_engine', TwitterClientLoginGenEngineHandler, dict(test=self)),
00286                 ('/twitter/client/login_gen_coroutine', TwitterClientLoginGenCoroutineHandler, dict(test=self)),
00287                 ('/twitter/client/show_user', TwitterClientShowUserHandler, dict(test=self)),
00288                 ('/twitter/client/show_user_future', TwitterClientShowUserFutureHandler, dict(test=self)),
00289                 ('/google/client/openid_login', GoogleOpenIdClientLoginHandler, dict(test=self)),
00290 
00291                 # simulated servers
00292                 ('/openid/server/authenticate', OpenIdServerAuthenticateHandler),
00293                 ('/oauth1/server/request_token', OAuth1ServerRequestTokenHandler),
00294                 ('/oauth1/server/access_token', OAuth1ServerAccessTokenHandler),
00295 
00296                 ('/twitter/server/access_token', TwitterServerAccessTokenHandler),
00297                 (r'/twitter/api/users/show/(.*)\.json', TwitterServerShowUserHandler),
00298                 (r'/twitter/api/account/verify_credentials\.json', TwitterServerVerifyCredentialsHandler),
00299             ],
00300             http_client=self.http_client,
00301             twitter_consumer_key='test_twitter_consumer_key',
00302             twitter_consumer_secret='test_twitter_consumer_secret')
00303 
00304     def test_openid_redirect(self):
00305         response = self.fetch('/openid/client/login', follow_redirects=False)
00306         self.assertEqual(response.code, 302)
00307         self.assertTrue(
00308             '/openid/server/authenticate?' in response.headers['Location'])
00309 
00310     def test_openid_get_user(self):
00311         response = self.fetch('/openid/client/login?openid.mode=blah&openid.ns.ax=http://openid.net/srv/ax/1.0&openid.ax.type.email=http://axschema.org/contact/email&openid.ax.value.email=foo@example.com')
00312         response.rethrow()
00313         parsed = json_decode(response.body)
00314         self.assertEqual(parsed["email"], "foo@example.com")
00315 
00316     def test_oauth10_redirect(self):
00317         response = self.fetch('/oauth10/client/login', follow_redirects=False)
00318         self.assertEqual(response.code, 302)
00319         self.assertTrue(response.headers['Location'].endswith(
00320             '/oauth1/server/authorize?oauth_token=zxcv'))
00321         # the cookie is base64('zxcv')|base64('1234')
00322         self.assertTrue(
00323             '_oauth_request_token="enhjdg==|MTIzNA=="' in response.headers['Set-Cookie'],
00324             response.headers['Set-Cookie'])
00325 
00326     def test_oauth10_get_user(self):
00327         response = self.fetch(
00328             '/oauth10/client/login?oauth_token=zxcv',
00329             headers={'Cookie': '_oauth_request_token=enhjdg==|MTIzNA=='})
00330         response.rethrow()
00331         parsed = json_decode(response.body)
00332         self.assertEqual(parsed['email'], 'foo@example.com')
00333         self.assertEqual(parsed['access_token'], dict(key='uiop', secret='5678'))
00334 
00335     def test_oauth10_request_parameters(self):
00336         response = self.fetch('/oauth10/client/request_params')
00337         response.rethrow()
00338         parsed = json_decode(response.body)
00339         self.assertEqual(parsed['oauth_consumer_key'], 'asdf')
00340         self.assertEqual(parsed['oauth_token'], 'uiop')
00341         self.assertTrue('oauth_nonce' in parsed)
00342         self.assertTrue('oauth_signature' in parsed)
00343 
00344     def test_oauth10a_redirect(self):
00345         response = self.fetch('/oauth10a/client/login', follow_redirects=False)
00346         self.assertEqual(response.code, 302)
00347         self.assertTrue(response.headers['Location'].endswith(
00348             '/oauth1/server/authorize?oauth_token=zxcv'))
00349         # the cookie is base64('zxcv')|base64('1234')
00350         self.assertTrue(
00351             '_oauth_request_token="enhjdg==|MTIzNA=="' in response.headers['Set-Cookie'],
00352             response.headers['Set-Cookie'])
00353 
00354     def test_oauth10a_get_user(self):
00355         response = self.fetch(
00356             '/oauth10a/client/login?oauth_token=zxcv',
00357             headers={'Cookie': '_oauth_request_token=enhjdg==|MTIzNA=='})
00358         response.rethrow()
00359         parsed = json_decode(response.body)
00360         self.assertEqual(parsed['email'], 'foo@example.com')
00361         self.assertEqual(parsed['access_token'], dict(key='uiop', secret='5678'))
00362 
00363     def test_oauth10a_request_parameters(self):
00364         response = self.fetch('/oauth10a/client/request_params')
00365         response.rethrow()
00366         parsed = json_decode(response.body)
00367         self.assertEqual(parsed['oauth_consumer_key'], 'asdf')
00368         self.assertEqual(parsed['oauth_token'], 'uiop')
00369         self.assertTrue('oauth_nonce' in parsed)
00370         self.assertTrue('oauth_signature' in parsed)
00371 
00372     def test_oauth10a_get_user_coroutine_exception(self):
00373         response = self.fetch(
00374             '/oauth10a/client/login_coroutine?oauth_token=zxcv&fail_in_get_user=true',
00375             headers={'Cookie': '_oauth_request_token=enhjdg==|MTIzNA=='})
00376         self.assertEqual(response.code, 503)
00377 
00378     def test_oauth2_redirect(self):
00379         response = self.fetch('/oauth2/client/login', follow_redirects=False)
00380         self.assertEqual(response.code, 302)
00381         self.assertTrue('/oauth2/server/authorize?' in response.headers['Location'])
00382 
00383     def base_twitter_redirect(self, url):
00384         # Same as test_oauth10a_redirect
00385         response = self.fetch(url, follow_redirects=False)
00386         self.assertEqual(response.code, 302)
00387         self.assertTrue(response.headers['Location'].endswith(
00388             '/oauth1/server/authorize?oauth_token=zxcv'))
00389         # the cookie is base64('zxcv')|base64('1234')
00390         self.assertTrue(
00391             '_oauth_request_token="enhjdg==|MTIzNA=="' in response.headers['Set-Cookie'],
00392             response.headers['Set-Cookie'])
00393 
00394     def test_twitter_redirect(self):
00395         self.base_twitter_redirect('/twitter/client/login')
00396 
00397     def test_twitter_redirect_gen_engine(self):
00398         self.base_twitter_redirect('/twitter/client/login_gen_engine')
00399 
00400     def test_twitter_redirect_gen_coroutine(self):
00401         self.base_twitter_redirect('/twitter/client/login_gen_coroutine')
00402 
00403     def test_twitter_get_user(self):
00404         response = self.fetch(
00405             '/twitter/client/login?oauth_token=zxcv',
00406             headers={'Cookie': '_oauth_request_token=enhjdg==|MTIzNA=='})
00407         response.rethrow()
00408         parsed = json_decode(response.body)
00409         self.assertEqual(parsed,
00410                          {u('access_token'): {u('key'): u('hjkl'),
00411                                               u('screen_name'): u('foo'),
00412                                               u('secret'): u('vbnm')},
00413                           u('name'): u('Foo'),
00414                           u('screen_name'): u('foo'),
00415                           u('username'): u('foo')})
00416 
00417     def test_twitter_show_user(self):
00418         response = self.fetch('/twitter/client/show_user?name=somebody')
00419         response.rethrow()
00420         self.assertEqual(json_decode(response.body),
00421                          {'name': 'Somebody', 'screen_name': 'somebody'})
00422 
00423     def test_twitter_show_user_error(self):
00424         with ExpectLog(gen_log, 'Error response HTTP 500'):
00425             response = self.fetch('/twitter/client/show_user?name=error')
00426         self.assertEqual(response.code, 500)
00427         self.assertEqual(response.body, b'error from twitter request')
00428 
00429     def test_twitter_show_user_future(self):
00430         response = self.fetch('/twitter/client/show_user_future?name=somebody')
00431         response.rethrow()
00432         self.assertEqual(json_decode(response.body),
00433                          {'name': 'Somebody', 'screen_name': 'somebody'})
00434 
00435     def test_twitter_show_user_future_error(self):
00436         response = self.fetch('/twitter/client/show_user_future?name=error')
00437         self.assertEqual(response.code, 500)
00438         self.assertIn(b'Error response HTTP 500', response.body)
00439 
00440     def test_google_redirect(self):
00441         # same as test_openid_redirect
00442         response = self.fetch('/google/client/openid_login', follow_redirects=False)
00443         self.assertEqual(response.code, 302)
00444         self.assertTrue(
00445             '/openid/server/authenticate?' in response.headers['Location'])
00446 
00447     def test_google_get_user(self):
00448         response = self.fetch('/google/client/openid_login?openid.mode=blah&openid.ns.ax=http://openid.net/srv/ax/1.0&openid.ax.type.email=http://axschema.org/contact/email&openid.ax.value.email=foo@example.com', follow_redirects=False)
00449         response.rethrow()
00450         parsed = json_decode(response.body)
00451         self.assertEqual(parsed["email"], "foo@example.com")


rosbridge_server
Author(s): Jonathan Mace
autogenerated on Thu Jun 6 2019 21:51:50