00001
00002
00003
00004
00005
00006
00007 from __future__ import absolute_import, division, with_statement
00008 from tornado.auth import OpenIdMixin, OAuthMixin, OAuth2Mixin
00009 from tornado.escape import json_decode
00010 from tornado.testing import AsyncHTTPTestCase, LogTrapTestCase
00011 from tornado.util import b
00012 from tornado.web import RequestHandler, Application, asynchronous
00013
00014
class OpenIdClientLoginHandler(RequestHandler, OpenIdMixin):
    """Client-side login handler exercising OpenIdMixin.

    The OpenID endpoint is pointed at a URL served by the test case itself
    (``/openid/server/authenticate``), obtained through ``test.get_url``.
    """

    def initialize(self, test):
        # ``test`` is the running test case; get_url() turns the path into
        # an absolute URL for the test's own HTTP server.
        endpoint = test.get_url('/openid/server/authenticate')
        self._OPENID_ENDPOINT = endpoint

    @asynchronous
    def get(self):
        """Start the login redirect, or complete it if ``openid.mode`` is set."""
        mode = self.get_argument('openid.mode', None)
        if not mode:
            # First leg: no OpenID response in the query string yet.
            self.authenticate_redirect()
        else:
            # Return leg: pass on_user as the callback and use the HTTP
            # client the test put into the application settings.
            client = self.settings['http_client']
            self.get_authenticated_user(self.on_user, http_client=client)

    def on_user(self, user):
        """Callback for get_authenticated_user: send ``user`` as the response."""
        assert user is not None
        self.finish(user)
00030
00031
class OpenIdServerAuthenticateHandler(RequestHandler):
    """Fake OpenID server endpoint that approves every verification request."""

    def post(self):
        """Require ``openid.mode=check_authentication`` and answer ``is_valid:true``."""
        # Kept as a single assert expression so that nothing is evaluated
        # when assertions are disabled, exactly as before.
        assert 'check_authentication' == self.get_argument('openid.mode')
        self.write('is_valid:true')
00036
00037
class OAuth1ClientLoginHandler(RequestHandler, OAuthMixin):
    """Client-side login handler exercising OAuthMixin for a given version.

    The request-token, authorize and access-token URLs are all resolved
    through ``test.get_url`` so they point at the test's own HTTP server.
    """

    def initialize(self, test, version):
        # ``version`` is supplied by the route definition ('1.0' or '1.0a'
        # in this file) and stored where the mixin reads it.
        self._OAUTH_VERSION = version
        url_for = test.get_url
        self._OAUTH_REQUEST_TOKEN_URL = url_for('/oauth1/server/request_token')
        self._OAUTH_AUTHORIZE_URL = url_for('/oauth1/server/authorize')
        self._OAUTH_ACCESS_TOKEN_URL = url_for('/oauth1/server/access_token')

    def _oauth_consumer_token(self):
        """Return the fixed consumer key/secret used by these tests."""
        return {'key': 'asdf', 'secret': 'qwer'}

    @asynchronous
    def get(self):
        """Start the authorize redirect, or finish login if ``oauth_token`` is set."""
        token = self.get_argument('oauth_token', None)
        # Both legs use the HTTP client stored in the application settings.
        client = self.settings['http_client']
        if token:
            self.get_authenticated_user(self.on_user, http_client=client)
        else:
            self.authorize_redirect(http_client=client)

    def on_user(self, user):
        """Callback for get_authenticated_user: send ``user`` as the response."""
        assert user is not None
        self.finish(user)

    def _oauth_get_user(self, access_token, callback):
        """Check the access token and hand a canned user dict to ``callback``."""
        # The expected values match the body written by
        # OAuth1ServerAccessTokenHandler; b() produces byte strings.
        expected = {'key': b('uiop'), 'secret': b('5678')}
        assert access_token == expected, access_token
        callback({'email': 'foo@example.com'})
00063
00064
class OAuth1ClientRequestParametersHandler(RequestHandler, OAuthMixin):
    """Handler that exposes the output of ``_oauth_request_parameters``.

    The generated parameter dict is written as the response body so the
    test can inspect it.
    """

    def initialize(self, version):
        # ``version`` is supplied by the route definition ('1.0' or '1.0a'
        # in this file) and stored where the mixin reads it.
        self._OAUTH_VERSION = version

    def _oauth_consumer_token(self):
        """Return the fixed consumer key/secret used by these tests."""
        return dict(key='asdf', secret='qwer')

    def get(self):
        """Build OAuth request parameters for a fixed URL/token and write them."""
        params = self._oauth_request_parameters(
            'http://www.example.com/api/asdf',
            dict(key='uiop', secret='5678'),
            parameters=dict(foo='bar'))
        # urlencode lives in ``urllib`` on Python 2 and ``urllib.parse`` on
        # Python 3; the previous ``urllib.urlencode`` lookup only worked on
        # Python 2 (or after 2to3 translation).
        try:
            from urllib import urlencode
        except ImportError:
            from urllib.parse import urlencode
        # The result is discarded: the call is kept only so that an error is
        # raised here if ``params`` cannot be urlencoded.
        # NOTE(review): presumably a guard that the mixin produces plain
        # string values -- confirm the original intent.
        urlencode(params)
        self.write(params)
00080
00081
class OAuth1ServerRequestTokenHandler(RequestHandler):
    """Fake OAuth1 server endpoint that hands out a fixed request token."""

    def get(self):
        """Write a form-encoded request token/secret pair."""
        body = 'oauth_token=zxcv&oauth_token_secret=1234'
        self.write(body)
00085
00086
class OAuth1ServerAccessTokenHandler(RequestHandler):
    """Fake OAuth1 server endpoint that hands out a fixed access token."""

    def get(self):
        """Write a form-encoded access token/secret pair."""
        body = 'oauth_token=uiop&oauth_token_secret=5678'
        self.write(body)
00090
00091
class OAuth2ClientLoginHandler(RequestHandler, OAuth2Mixin):
    """Client-side login handler exercising OAuth2Mixin's redirect."""

    def initialize(self, test):
        # Point the authorize URL at the test's own HTTP server.
        authorize_url = test.get_url('/oauth2/server/authorize')
        self._OAUTH_AUTHORIZE_URL = authorize_url

    def get(self):
        """Redirect to the authorize URL with the mixin's default arguments."""
        self.authorize_redirect()
00098
00099
class AuthTest(AsyncHTTPTestCase, LogTrapTestCase):
    """Runs the auth client handlers against fake servers in the same app.

    Comments (not docstrings) are used on the ``test_*`` methods so that
    unittest's verbose test descriptions stay unchanged.
    """

    def get_app(self):
        """Build the application hosting both the clients and the fake servers."""
        # Client-side handlers under test.
        client_routes = [
            ('/openid/client/login', OpenIdClientLoginHandler, dict(test=self)),
            ('/oauth10/client/login', OAuth1ClientLoginHandler,
             dict(test=self, version='1.0')),
            ('/oauth10/client/request_params',
             OAuth1ClientRequestParametersHandler,
             dict(version='1.0')),
            ('/oauth10a/client/login', OAuth1ClientLoginHandler,
             dict(test=self, version='1.0a')),
            ('/oauth10a/client/request_params',
             OAuth1ClientRequestParametersHandler,
             dict(version='1.0a')),
            ('/oauth2/client/login', OAuth2ClientLoginHandler, dict(test=self)),
        ]
        # Fake server endpoints.  No ``authorize`` routes are registered:
        # the redirect tests fetch with follow_redirects=False and only
        # inspect the Location header.
        server_routes = [
            ('/openid/server/authenticate', OpenIdServerAuthenticateHandler),
            ('/oauth1/server/request_token', OAuth1ServerRequestTokenHandler),
            ('/oauth1/server/access_token', OAuth1ServerAccessTokenHandler),
        ]
        return Application(client_routes + server_routes,
                           http_client=self.http_client)

    # -- shared checks for the two OAuth1 versions -------------------------

    def _check_oauth1_redirect(self, login_url):
        """Fetch ``login_url`` and verify the redirect target and token cookie."""
        response = self.fetch(login_url, follow_redirects=False)
        self.assertEqual(response.code, 302)
        location = response.headers['Location']
        self.assertTrue(location.endswith(
            '/oauth1/server/authorize?oauth_token=zxcv'))
        # The cookie value is base64('zxcv') + '|' + base64('1234'), i.e. the
        # token and secret served by OAuth1ServerRequestTokenHandler.
        set_cookie = response.headers['Set-Cookie']
        self.assertTrue(
            '_oauth_request_token="enhjdg==|MTIzNA=="' in set_cookie,
            set_cookie)

    def _check_oauth1_get_user(self, login_url):
        """Fetch ``login_url`` with the token cookie and verify the user body."""
        cookie_header = {'Cookie': '_oauth_request_token=enhjdg==|MTIzNA=='}
        response = self.fetch(login_url, headers=cookie_header)
        response.rethrow()
        user = json_decode(response.body)
        self.assertEqual(user['email'], 'foo@example.com')
        self.assertEqual(user['access_token'], dict(key='uiop', secret='5678'))

    def _check_oauth1_request_parameters(self, params_url):
        """Fetch ``params_url`` and verify the generated OAuth parameters."""
        response = self.fetch(params_url)
        response.rethrow()
        params = json_decode(response.body)
        expected_values = (
            ('oauth_consumer_key', 'asdf'),
            ('oauth_token', 'uiop'),
        )
        for name, value in expected_values:
            self.assertEqual(params[name], value)
        for name in ('oauth_nonce', 'oauth_signature'):
            self.assertTrue(name in params)

    # -- OpenID ------------------------------------------------------------

    def test_openid_redirect(self):
        # Without openid.mode the client must redirect to the endpoint.
        response = self.fetch('/openid/client/login', follow_redirects=False)
        self.assertEqual(response.code, 302)
        location = response.headers['Location']
        self.assertTrue('/openid/server/authenticate?' in location)

    def test_openid_get_user(self):
        # Adjacent literals concatenate to the same single query string.
        login_url = (
            '/openid/client/login'
            '?openid.mode=blah'
            '&openid.ns.ax=http://openid.net/srv/ax/1.0'
            '&openid.ax.type.email=http://axschema.org/contact/email'
            '&openid.ax.value.email=foo@example.com')
        response = self.fetch(login_url)
        response.rethrow()
        user = json_decode(response.body)
        self.assertEqual(user["email"], "foo@example.com")

    # -- OAuth 1.0 ---------------------------------------------------------

    def test_oauth10_redirect(self):
        self._check_oauth1_redirect('/oauth10/client/login')

    def test_oauth10_get_user(self):
        self._check_oauth1_get_user('/oauth10/client/login?oauth_token=zxcv')

    def test_oauth10_request_parameters(self):
        self._check_oauth1_request_parameters('/oauth10/client/request_params')

    # -- OAuth 1.0a --------------------------------------------------------

    def test_oauth10a_redirect(self):
        self._check_oauth1_redirect('/oauth10a/client/login')

    def test_oauth10a_get_user(self):
        self._check_oauth1_get_user('/oauth10a/client/login?oauth_token=zxcv')

    def test_oauth10a_request_parameters(self):
        self._check_oauth1_request_parameters('/oauth10a/client/request_params')

    # -- OAuth 2 -----------------------------------------------------------

    def test_oauth2_redirect(self):
        response = self.fetch('/oauth2/client/login', follow_redirects=False)
        self.assertEqual(response.code, 302)
        location = response.headers['Location']
        self.assertTrue('/oauth2/server/authorize?' in location)