00001
00002
00003
00004
00005
00006
00007 from __future__ import absolute_import, division, print_function, with_statement
00008 from tornado.auth import OpenIdMixin, OAuthMixin, OAuth2Mixin, TwitterMixin, GoogleMixin, AuthError
00009 from tornado.concurrent import Future
00010 from tornado.escape import json_decode
00011 from tornado import gen
00012 from tornado.log import gen_log
00013 from tornado.testing import AsyncHTTPTestCase, ExpectLog
00014 from tornado.util import u
00015 from tornado.web import RequestHandler, Application, asynchronous, HTTPError
00016
00017
class OpenIdClientLoginHandler(RequestHandler, OpenIdMixin):
    """Client-side OpenID login handler exercised by the tests.

    The OpenID endpoint is pointed at a URL served by the test itself, and
    the HTTP client comes from the application settings.
    """

    def initialize(self, test):
        """Store the endpoint URL obtained from ``test.get_url``."""
        endpoint = test.get_url('/openid/server/authenticate')
        self._OPENID_ENDPOINT = endpoint

    @asynchronous
    def get(self):
        """Start the redirect, or look the user up when ``openid.mode`` is set."""
        if not self.get_argument('openid.mode', None):
            redirect_future = self.authenticate_redirect()
            # The redirect is expected to hand back an already-resolved Future.
            assert isinstance(redirect_future, Future)
            assert redirect_future.done()
        else:
            client = self.settings['http_client']
            self.get_authenticated_user(self.on_user, http_client=client)

    def on_user(self, user):
        """Finish the request with ``user``; raise if no user was produced."""
        if user is not None:
            self.finish(user)
        else:
            raise Exception("user is None")
00036
00037
class OpenIdServerAuthenticateHandler(RequestHandler):
    """Stand-in OpenID provider endpoint used by the client handlers."""

    def post(self):
        """Approve a ``check_authentication`` request.

        Writes ``is_valid:true`` when the ``openid.mode`` argument equals
        ``check_authentication``.

        Raises:
            Exception: if ``openid.mode`` has any other value; the message
                includes the value that was received.
        """
        mode = self.get_argument('openid.mode')
        if mode != 'check_authentication':
            # Interpolate the received value so the failure is diagnosable
            # (a bare "%r" in the message would tell the reader nothing).
            raise Exception("incorrect openid.mode %r" % (mode,))
        self.write('is_valid:true')
00043
00044
class OAuth1ClientLoginHandler(RequestHandler, OAuthMixin):
    """Client-side OAuth 1.0/1.0a login handler exercised by the tests.

    The protocol version and the three OAuth URLs are injected through
    ``initialize`` so the same handler can be mounted more than once.
    """

    def initialize(self, test, version):
        """Record the OAuth version and the URLs built by ``test.get_url``."""
        url_for = test.get_url
        self._OAUTH_VERSION = version
        self._OAUTH_REQUEST_TOKEN_URL = url_for('/oauth1/server/request_token')
        self._OAUTH_AUTHORIZE_URL = url_for('/oauth1/server/authorize')
        self._OAUTH_ACCESS_TOKEN_URL = url_for('/oauth1/server/access_token')

    def _oauth_consumer_token(self):
        """Return the fixed consumer key/secret pair used by the tests."""
        return {'key': 'asdf', 'secret': 'qwer'}

    @asynchronous
    def get(self):
        """Start the redirect, or look the user up when ``oauth_token`` is set."""
        client = self.settings['http_client']
        if self.get_argument('oauth_token', None):
            self.get_authenticated_user(self.on_user, http_client=client)
        else:
            redirect_future = self.authorize_redirect(http_client=client)
            assert isinstance(redirect_future, Future)

    def on_user(self, user):
        """Finish the request with ``user``; raise if no user was produced."""
        if user is not None:
            self.finish(user)
        else:
            raise Exception("user is None")

    def _oauth_get_user(self, access_token, callback):
        """Invoke ``callback`` with a canned user for the expected token.

        Raises:
            Exception: if the ``fail_in_get_user`` argument is present, or
                if ``access_token`` is not the expected key/secret pair.
        """
        expected_token = {'key': 'uiop', 'secret': '5678'}
        if self.get_argument('fail_in_get_user', None):
            raise Exception("failing in get_user")
        if access_token != expected_token:
            raise Exception("incorrect access token %r" % access_token)
        user = {'email': 'foo@example.com'}
        callback(user)
00075
00076
class OAuth1ClientLoginCoroutineHandler(OAuth1ClientLoginHandler):
    """Replaces OAuth1ClientLoginHandler's get() with a coroutine."""

    @gen.coroutine
    def get(self):
        """Redirect when no ``oauth_token`` is given, else fetch the user.

        An exception raised while fetching the user is reported to the
        client as a 503 response carrying the exception text.
        """
        if not self.get_argument('oauth_token', None):
            yield self.authorize_redirect()
            return
        # Any failure surfaces when the yielded result is resumed, so it is
        # caught here and reported in the response body.
        try:
            yield self.get_authenticated_user()
        except Exception as err:
            self.set_status(503)
            self.write("got exception: %s" % err)
00091
00092
class OAuth1ClientRequestParametersHandler(RequestHandler, OAuthMixin):
    """Writes out the OAuth request parameters computed for a fixed request."""

    def initialize(self, version):
        """Record the OAuth protocol version to use."""
        self._OAUTH_VERSION = version

    def _oauth_consumer_token(self):
        """Return the fixed consumer key/secret pair used by the tests."""
        return {'key': 'asdf', 'secret': 'qwer'}

    def get(self):
        """Write the result of ``_oauth_request_parameters`` as the response."""
        url = 'http://www.example.com/api/asdf'
        access_token = {'key': 'uiop', 'secret': '5678'}
        extra = {'foo': 'bar'}
        signed = self._oauth_request_parameters(
            url, access_token, parameters=extra)
        self.write(signed)
00106
00107
class OAuth1ServerRequestTokenHandler(RequestHandler):
    """Simulated OAuth1 request-token endpoint returning a fixed token."""

    def get(self):
        """Write a fixed form-encoded request token and secret."""
        token_body = 'oauth_token=zxcv&oauth_token_secret=1234'
        self.write(token_body)
00111
00112
class OAuth1ServerAccessTokenHandler(RequestHandler):
    """Simulated OAuth1 access-token endpoint returning a fixed token."""

    def get(self):
        """Write a fixed form-encoded access token and secret."""
        token_body = 'oauth_token=uiop&oauth_token_secret=5678'
        self.write(token_body)
00116
00117
class OAuth2ClientLoginHandler(RequestHandler, OAuth2Mixin):
    """Client-side OAuth2 login handler exercised by the tests."""

    def initialize(self, test):
        """Store the authorize URL obtained from ``test.get_url``."""
        authorize_url = test.get_url('/oauth2/server/authorize')
        self._OAUTH_AUTHORIZE_URL = authorize_url

    def get(self):
        """Issue the authorize redirect and check the Future it returns."""
        redirect_future = self.authorize_redirect()
        # The redirect is expected to hand back an already-resolved Future.
        assert isinstance(redirect_future, Future)
        assert redirect_future.done()
00126
00127
class TwitterClientHandler(RequestHandler, TwitterMixin):
    """Shared base for the Twitter client handlers in this module.

    Overrides the mixin's URL attributes with URLs built by the test and
    supplies the HTTP client from the application settings.
    """

    def initialize(self, test):
        """Record the OAuth and API URLs built by ``test.get_url``."""
        url_for = test.get_url
        self._OAUTH_REQUEST_TOKEN_URL = url_for('/oauth1/server/request_token')
        self._OAUTH_ACCESS_TOKEN_URL = url_for('/twitter/server/access_token')
        self._OAUTH_AUTHORIZE_URL = url_for('/oauth1/server/authorize')
        self._TWITTER_BASE_URL = url_for('/twitter/api')

    def get_auth_http_client(self):
        """Return the HTTP client stored in the application settings."""
        settings = self.settings
        return settings['http_client']
00137
00138
class TwitterClientLoginHandler(TwitterClientHandler):
    """Callback-style Twitter login handler."""

    @asynchronous
    def get(self):
        """Start the redirect, or look the user up when ``oauth_token`` is set."""
        if not self.get_argument("oauth_token", None):
            self.authorize_redirect()
        else:
            self.get_authenticated_user(self.on_user)

    def on_user(self, user):
        """Finish the request with ``user``; raise if no user was produced."""
        if user is not None:
            self.finish(user)
        else:
            raise Exception("user is None")
00151
00152
class TwitterClientLoginGenEngineHandler(TwitterClientHandler):
    """Twitter login handler written with ``@asynchronous`` + ``@gen.engine``."""

    @asynchronous
    @gen.engine
    def get(self):
        """Redirect when no ``oauth_token`` is given, else finish with the user."""
        if not self.get_argument("oauth_token", None):
            # In this variant the result of authorize_redirect is
            # deliberately not yielded.
            self.authorize_redirect()
            return
        authenticated = yield self.get_authenticated_user()
        self.finish(authenticated)
00164
00165
class TwitterClientLoginGenCoroutineHandler(TwitterClientHandler):
    """Twitter login handler written as a ``@gen.coroutine``."""

    @gen.coroutine
    def get(self):
        """Redirect when no ``oauth_token`` is given, else finish with the user."""
        if not self.get_argument("oauth_token", None):
            # In this variant the result of authorize_redirect is yielded,
            # so the coroutine waits for it before returning.
            yield self.authorize_redirect()
            return
        authenticated = yield self.get_authenticated_user()
        self.finish(authenticated)
00176
00177
class TwitterClientShowUserHandler(TwitterClientHandler):
    """Fetches a user through ``twitter_request`` using ``gen.Task``."""

    @asynchronous
    @gen.engine
    def get(self):
        """Finish with the API response, or a 500 if the response is None.

        The access token is hard-coded; no login flow is performed here.
        """
        api_path = '/users/show/%s' % self.get_argument('name')
        token = dict(key='hjkl', secret='vbnm')
        response = yield gen.Task(self.twitter_request, api_path,
                                  access_token=token)
        if response is not None:
            self.finish(response)
        else:
            self.set_status(500)
            self.finish('error from twitter request')
00192
00193
class TwitterClientShowUserFutureHandler(TwitterClientHandler):
    """Fetches a user by yielding the result of ``twitter_request`` directly."""

    @asynchronous
    @gen.engine
    def get(self):
        """Finish with the API response, or a 500 carrying the AuthError text."""
        try:
            response = yield self.twitter_request(
                '/users/show/%s' % self.get_argument('name'),
                access_token=dict(key='hjkl', secret='vbnm'))
        except AuthError as err:
            self.set_status(500)
            self.finish(str(err))
        else:
            # On this path a failure arrives as AuthError, never as None.
            assert response is not None
            self.finish(response)
00208
00209
class TwitterServerAccessTokenHandler(RequestHandler):
    """Simulated Twitter access-token endpoint returning a fixed token."""

    def get(self):
        """Write a fixed form-encoded token, secret and screen name."""
        token_body = 'oauth_token=hjkl&oauth_token_secret=vbnm&screen_name=foo'
        self.write(token_body)
00213
00214
class TwitterServerShowUserHandler(RequestHandler):
    """Simulated Twitter ``users/show`` endpoint."""

    def get(self, screen_name):
        """Check the OAuth arguments and write a user dict for ``screen_name``.

        Raises:
            HTTPError: with status 500 when ``screen_name`` is ``'error'``.
        """
        if screen_name == 'error':
            raise HTTPError(500)

        # Arguments that only need to be present.
        request_args = self.request.arguments
        for required in ('oauth_nonce', 'oauth_timestamp', 'oauth_signature'):
            assert required in request_args

        # Arguments that must carry a specific value.
        expected_values = (
            ('oauth_consumer_key', 'test_twitter_consumer_key'),
            ('oauth_signature_method', 'HMAC-SHA1'),
            ('oauth_version', '1.0'),
            ('oauth_token', 'hjkl'),
        )
        for arg_name, expected in expected_values:
            assert self.get_argument(arg_name) == expected

        display_name = screen_name.capitalize()
        self.write(dict(screen_name=screen_name, name=display_name))
00227
00228
class TwitterServerVerifyCredentialsHandler(RequestHandler):
    """Simulated Twitter ``account/verify_credentials`` endpoint."""

    def get(self):
        """Check the OAuth arguments and write a fixed user dict."""
        # Arguments that only need to be present.
        request_args = self.request.arguments
        for required in ('oauth_nonce', 'oauth_timestamp', 'oauth_signature'):
            assert required in request_args

        # Arguments that must carry a specific value.
        expected_values = (
            ('oauth_consumer_key', 'test_twitter_consumer_key'),
            ('oauth_signature_method', 'HMAC-SHA1'),
            ('oauth_version', '1.0'),
            ('oauth_token', 'hjkl'),
        )
        for arg_name, expected in expected_values:
            assert self.get_argument(arg_name) == expected

        self.write(dict(screen_name='foo', name='Foo'))
00239
00240
class GoogleOpenIdClientLoginHandler(RequestHandler, GoogleMixin):
    """Client-side Google OpenID login handler exercised by the tests."""

    def initialize(self, test):
        """Store the endpoint URL obtained from ``test.get_url``."""
        endpoint = test.get_url('/openid/server/authenticate')
        self._OPENID_ENDPOINT = endpoint

    @asynchronous
    def get(self):
        """Start the redirect, or look the user up when ``openid.mode`` is set."""
        if not self.get_argument("openid.mode", None):
            redirect_future = self.authenticate_redirect()
            # The redirect is expected to hand back an already-resolved Future.
            assert isinstance(redirect_future, Future)
            assert redirect_future.done()
        else:
            self.get_authenticated_user(self.on_user)

    def on_user(self, user):
        """Finish the request with ``user``; raise if no user was produced."""
        if user is not None:
            self.finish(user)
        else:
            raise Exception("user is None")

    def get_auth_http_client(self):
        """Return the HTTP client stored in the application settings."""
        settings = self.settings
        return settings['http_client']
00261
00262
class AuthTest(AsyncHTTPTestCase):
    """Smoke tests driving the auth client handlers against local endpoints.

    ``get_app`` mounts the client handlers under test and the simulated
    provider endpoints in one application, so every request stays inside
    the test's own HTTP server.
    """

    # Request-token cookie sent by the "get user" tests; the two halves are
    # base64 of 'zxcv' and '1234'.  Kept as a string (not a headers dict) so
    # every fetch gets a fresh dict and no state is shared between tests.
    _REQUEST_TOKEN_COOKIE = '_oauth_request_token=enhjdg==|MTIzNA=='
    # The same cookie in the quoted form expected in a Set-Cookie header.
    _REQUEST_TOKEN_SET_COOKIE = '_oauth_request_token="enhjdg==|MTIzNA=="'
    # Query string describing a user with an email address via OpenID AX.
    _OPENID_USER_QUERY = (
        'openid.mode=blah'
        '&openid.ns.ax=http://openid.net/srv/ax/1.0'
        '&openid.ax.type.email=http://axschema.org/contact/email'
        '&openid.ax.value.email=foo@example.com')

    def get_app(self):
        """Build the application containing client and server handlers."""
        return Application(
            [
                # Client-side handlers under test.
                ('/openid/client/login', OpenIdClientLoginHandler, dict(test=self)),
                ('/oauth10/client/login', OAuth1ClientLoginHandler,
                 dict(test=self, version='1.0')),
                ('/oauth10/client/request_params',
                 OAuth1ClientRequestParametersHandler,
                 dict(version='1.0')),
                ('/oauth10a/client/login', OAuth1ClientLoginHandler,
                 dict(test=self, version='1.0a')),
                ('/oauth10a/client/login_coroutine',
                 OAuth1ClientLoginCoroutineHandler,
                 dict(test=self, version='1.0a')),
                ('/oauth10a/client/request_params',
                 OAuth1ClientRequestParametersHandler,
                 dict(version='1.0a')),
                ('/oauth2/client/login', OAuth2ClientLoginHandler, dict(test=self)),

                ('/twitter/client/login', TwitterClientLoginHandler, dict(test=self)),
                ('/twitter/client/login_gen_engine', TwitterClientLoginGenEngineHandler, dict(test=self)),
                ('/twitter/client/login_gen_coroutine', TwitterClientLoginGenCoroutineHandler, dict(test=self)),
                ('/twitter/client/show_user', TwitterClientShowUserHandler, dict(test=self)),
                ('/twitter/client/show_user_future', TwitterClientShowUserFutureHandler, dict(test=self)),
                ('/google/client/openid_login', GoogleOpenIdClientLoginHandler, dict(test=self)),

                # Simulated provider endpoints the clients talk to.
                ('/openid/server/authenticate', OpenIdServerAuthenticateHandler),
                ('/oauth1/server/request_token', OAuth1ServerRequestTokenHandler),
                ('/oauth1/server/access_token', OAuth1ServerAccessTokenHandler),

                ('/twitter/server/access_token', TwitterServerAccessTokenHandler),
                (r'/twitter/api/users/show/(.*)\.json', TwitterServerShowUserHandler),
                (r'/twitter/api/account/verify_credentials\.json', TwitterServerVerifyCredentialsHandler),
            ],
            http_client=self.http_client,
            twitter_consumer_key='test_twitter_consumer_key',
            twitter_consumer_secret='test_twitter_consumer_secret')

    # -- shared assertions -------------------------------------------------

    def _assert_openid_redirect(self, url):
        """Check that ``url`` redirects to the OpenID authenticate endpoint."""
        response = self.fetch(url, follow_redirects=False)
        self.assertEqual(response.code, 302)
        self.assertIn('/openid/server/authenticate?',
                      response.headers['Location'])

    def _assert_openid_get_user(self, path, **fetch_kwargs):
        """Check that ``path`` plus the AX query yields the expected email."""
        response = self.fetch(path + '?' + self._OPENID_USER_QUERY,
                              **fetch_kwargs)
        response.rethrow()
        parsed = json_decode(response.body)
        self.assertEqual(parsed["email"], "foo@example.com")

    def _assert_oauth1_redirect(self, url):
        """Check the OAuth1 authorize redirect and its request-token cookie."""
        response = self.fetch(url, follow_redirects=False)
        self.assertEqual(response.code, 302)
        location = response.headers['Location']
        self.assertTrue(
            location.endswith('/oauth1/server/authorize?oauth_token=zxcv'),
            location)
        self.assertIn(self._REQUEST_TOKEN_SET_COOKIE,
                      response.headers['Set-Cookie'])

    def _assert_oauth1_get_user(self, url):
        """Check that ``url`` returns the canned user and access token."""
        response = self.fetch(
            url, headers={'Cookie': self._REQUEST_TOKEN_COOKIE})
        response.rethrow()
        parsed = json_decode(response.body)
        self.assertEqual(parsed['email'], 'foo@example.com')
        self.assertEqual(parsed['access_token'], dict(key='uiop', secret='5678'))

    def _assert_oauth1_request_parameters(self, url):
        """Check the keys and fixed values of the signed request parameters."""
        response = self.fetch(url)
        response.rethrow()
        parsed = json_decode(response.body)
        self.assertEqual(parsed['oauth_consumer_key'], 'asdf')
        self.assertEqual(parsed['oauth_token'], 'uiop')
        self.assertIn('oauth_nonce', parsed)
        self.assertIn('oauth_signature', parsed)

    # -- OpenID ------------------------------------------------------------

    def test_openid_redirect(self):
        self._assert_openid_redirect('/openid/client/login')

    def test_openid_get_user(self):
        self._assert_openid_get_user('/openid/client/login')

    # -- OAuth 1.0 / 1.0a --------------------------------------------------

    def test_oauth10_redirect(self):
        self._assert_oauth1_redirect('/oauth10/client/login')

    def test_oauth10_get_user(self):
        self._assert_oauth1_get_user('/oauth10/client/login?oauth_token=zxcv')

    def test_oauth10_request_parameters(self):
        self._assert_oauth1_request_parameters('/oauth10/client/request_params')

    def test_oauth10a_redirect(self):
        self._assert_oauth1_redirect('/oauth10a/client/login')

    def test_oauth10a_get_user(self):
        self._assert_oauth1_get_user('/oauth10a/client/login?oauth_token=zxcv')

    def test_oauth10a_request_parameters(self):
        self._assert_oauth1_request_parameters('/oauth10a/client/request_params')

    def test_oauth10a_get_user_coroutine_exception(self):
        # fail_in_get_user makes the handler's _oauth_get_user raise; the
        # coroutine handler is expected to turn that into a 503.
        response = self.fetch(
            '/oauth10a/client/login_coroutine?oauth_token=zxcv&fail_in_get_user=true',
            headers={'Cookie': self._REQUEST_TOKEN_COOKIE})
        self.assertEqual(response.code, 503)

    # -- OAuth 2 -----------------------------------------------------------

    def test_oauth2_redirect(self):
        response = self.fetch('/oauth2/client/login', follow_redirects=False)
        self.assertEqual(response.code, 302)
        self.assertIn('/oauth2/server/authorize?', response.headers['Location'])

    # -- Twitter -----------------------------------------------------------

    def base_twitter_redirect(self, url):
        """Check the Twitter redirect; same expectations as plain OAuth1."""
        self._assert_oauth1_redirect(url)

    def test_twitter_redirect(self):
        self.base_twitter_redirect('/twitter/client/login')

    def test_twitter_redirect_gen_engine(self):
        self.base_twitter_redirect('/twitter/client/login_gen_engine')

    def test_twitter_redirect_gen_coroutine(self):
        self.base_twitter_redirect('/twitter/client/login_gen_coroutine')

    def test_twitter_get_user(self):
        response = self.fetch(
            '/twitter/client/login?oauth_token=zxcv',
            headers={'Cookie': self._REQUEST_TOKEN_COOKIE})
        response.rethrow()
        parsed = json_decode(response.body)
        self.assertEqual(parsed,
                         {u('access_token'): {u('key'): u('hjkl'),
                                              u('screen_name'): u('foo'),
                                              u('secret'): u('vbnm')},
                          u('name'): u('Foo'),
                          u('screen_name'): u('foo'),
                          u('username'): u('foo')})

    def test_twitter_show_user(self):
        response = self.fetch('/twitter/client/show_user?name=somebody')
        response.rethrow()
        self.assertEqual(json_decode(response.body),
                         {'name': 'Somebody', 'screen_name': 'somebody'})

    def test_twitter_show_user_error(self):
        # The failed upstream request is expected to be logged on gen_log.
        with ExpectLog(gen_log, 'Error response HTTP 500'):
            response = self.fetch('/twitter/client/show_user?name=error')
        self.assertEqual(response.code, 500)
        self.assertEqual(response.body, b'error from twitter request')

    def test_twitter_show_user_future(self):
        response = self.fetch('/twitter/client/show_user_future?name=somebody')
        response.rethrow()
        self.assertEqual(json_decode(response.body),
                         {'name': 'Somebody', 'screen_name': 'somebody'})

    def test_twitter_show_user_future_error(self):
        response = self.fetch('/twitter/client/show_user_future?name=error')
        self.assertEqual(response.code, 500)
        self.assertIn(b'Error response HTTP 500', response.body)

    # -- Google OpenID -----------------------------------------------------

    def test_google_redirect(self):
        self._assert_openid_redirect('/google/client/openid_login')

    def test_google_get_user(self):
        self._assert_openid_get_user('/google/client/openid_login',
                                     follow_redirects=False)