#!/usr/bin/env python3
"""Example client for SciStarter's OpenID Connect / OAuth 2 integration.

Run as a script, this walks through the whole flow: discovery,
interactive authorization, token refresh, profile retrieval and
recording a participation event.

The client credentials are read from the SCISTARTER_CLIENT_ID and
SCISTARTER_CLIENT_SECRET environment variables.
"""

import json
import webbrowser
import secrets
from os import environ
from urllib.request import Request, urlopen
from urllib.parse import urlencode, urlparse, parse_qs
from pprint import pprint

DISCOVERY_URL = "https://scistarter.org/.well-known/openid-configuration"
PARTICIPATION_URL = "https://scistarter.org/api/participation/openid"

# This URL would never be the one you really use. Instead, you will
# have provided one or more URLs on your own domain to us when you
# requested a client_id. The redirect_uri parameter must be one of
# those URLs, and the token request must repeat the same value that
# was used in the authorization request.
REDIRECT_URI = "https://example.com/openid/scistarter"


def _fetch_json(target):
    """Open a URL or Request and return its decoded JSON body.

    The response is always closed, even when an error is raised.

    Raises Exception(status, reason) if the response status is not
    200. Note that urlopen itself raises urllib.error.HTTPError for
    error responses before this check is reached.
    """
    with urlopen(target) as r:
        if r.status != 200:
            raise Exception(r.status, r.reason)
        return json.loads(r.read())


def _extract_authorization_code(redirected_to, expected_state):
    """Return the authorization code carried by a redirect URL.

    redirected_to is the full URL the user was sent to after granting
    (or denying) permissions. expected_state is the state value that
    was sent with the authorization request.

    Raises Exception if the URL carries no code, or if its state
    parameter does not match expected_state.
    """
    params = parse_qs(urlparse(redirected_to).query)

    try:
        code = params["code"][0]
    except (KeyError, IndexError):
        raise Exception("No authorization code was provided") from None

    # The state must come back unchanged. If it does not, this
    # redirect was not produced by the request we started, and the
    # code must not be used. A missing state never matches, because
    # parse_qs omits blank values and we fall back to "".
    returned_state = params.get("state", [""])[0]
    if not secrets.compare_digest(
        returned_state.encode("utf8"), expected_state.encode("utf8")
    ):
        raise Exception("State mismatch in authorization response")

    return code


def openid_discovery():
    """Perform OpenID Discovery

    SciStarter supports OpenID Connect Discovery, so there is a
    standardized, machine readable description of our OpenID/OAuth
    configuration which can be retrieved at any time.

    Normally, there is no need to perform discovery very often. Just
    cache the results.

    Returns the decoded configuration document.
    """
    return _fetch_json(DISCOVERY_URL)


def authorize(config):
    """Perform the OAuth flow to retrieve an access token

    Authorization in OpenID Connect is via OAuth 2.

    config is the discovery document; its authorization_endpoint and
    token_endpoint entries are used.

    Returns a tuple (access_token, refresh_token).

    Raises Exception if the pasted redirect URL has no authorization
    code, if its state does not match the one we sent, or if the
    token request does not succeed.
    """

    # If we were writing a server-side request handler, it would be a
    # good idea to save the URL for the next page the user expects to
    # see at this time. Usually, we would store it in the session.
    # Then, once the authorization was complete, it would be easy to
    # forward user to where they expected to go.

    # This is a nonce which has no purpose or meaning except to make
    # it impossible for a hostile middleman to replay the request. The
    # state, too, should be saved in the session so as to be
    # accessible later in the process.
    state = secrets.token_hex(16)

    authorization_query = urlencode(
        {
            # The client_id is provided by SciStarter, email us at
            # info@scistarter.org to get one.
            "client_id": environ["SCISTARTER_CLIENT_ID"],
            "response_type": "code",
            # Requesting the openid scope is a protocol requirement
            # Requesting the profile scope gives us read access to
            # basic user information, if the user grants it.
            # Requesting the participation scope gives us the
            # right to add participation events to the user's
            # record, if the user grants it.
            "scope": "openid profile participation",
            "state": state,
            "redirect_uri": REDIRECT_URI,
        }
    )

    # Normally we would be doing this in a server-side HTTP request
    # handler rather than in a standalone program, in which case we
    # would simply return a redirect response rather than messing
    # about with the webbrowser module.
    webbrowser.open_new_tab(
        config["authorization_endpoint"] + "?" + authorization_query
    )

    # Once the user has granted permissions, scistarter will redirect
    # the user to the specified redirect_uri. That would result in
    # another request handler on your server processing that request to
    # complete the authorization. Since we're not in a server here, we
    # do this instead:
    redirected_to = input(
        "Please perform the authorization in your browser, then copy and paste the URL you get redirected to here: "
    )

    code = _extract_authorization_code(redirected_to, state)

    # Now that the user has identified themself to SciStarter and
    # granted permissions, we have to identify ourself as the rightful
    # recipient of those permissions and retrieve an access token.
    req = Request(
        method="POST",
        url=config["token_endpoint"],
        data=urlencode(
            {
                "client_id": environ["SCISTARTER_CLIENT_ID"],
                "client_secret": environ["SCISTARTER_CLIENT_SECRET"],
                # redirect_uri must match the redirect_uri used above
                "redirect_uri": REDIRECT_URI,
                "grant_type": "authorization_code",
                "code": code,
                "state": state,
            }
        ).encode("utf8"),
    )

    authorization = _fetch_json(req)

    # Now we have an access token, which will be good for several
    # hours and which will allow us to access SciStarter on the user's
    # behalf.
    access_token = authorization["access_token"]

    # We also have a refresh token, which can be used to retrieve a
    # new access token after the current one expires, as long as the
    # user does not have it revoked.
    refresh_token = authorization["refresh_token"]

    return access_token, refresh_token


def refresh(config, refresh_token):
    """Retrieve a new access token, given a refresh token

    Access tokens expire after a time. Refresh tokens don't expire
    until they are used or revoked. For use cases where you're not
    using OpenID Connect as your primary way of authenticating the
    user, it's quite possible for the user to come back to your site
    and authenticate through other means long after the access token
    is no longer valid. When this happens, you can use the refresh
    token to renew your access to SciStarter, without interrupting the
    user to ask for permissions that you have already been granted.

    Refresh tokens should never leave your server, except to be sent
    to ours over an encrypted connection.

    SciStarter's refresh tokens are single use. Using a refresh token
    consumes it, and issues a new one along with an access token.

    Returns a tuple (access_token, refresh_token) holding the newly
    issued tokens.
    """
    req = Request(
        method="POST",
        url=config["token_endpoint"],
        data=urlencode(
            {
                "client_id": environ["SCISTARTER_CLIENT_ID"],
                "client_secret": environ["SCISTARTER_CLIENT_SECRET"],
                "grant_type": "refresh_token",
                "refresh_token": refresh_token,
            }
        ).encode("utf8"),
    )

    authorization = _fetch_json(req)

    return authorization["access_token"], authorization["refresh_token"]


def get_profile(config, token):
    """Retrieve user information

    In the process of granting the token, the user was prompted to
    allow (or deny) us access to their information. The specific
    information depends on the scopes we requested, but whatever
    information they allowed is available via the userinfo endpoint.

    If participation API access was granted, the
    'participation_api_granted' field in the return value will be
    true.

    It's not necessary to retrieve the profile information every time
    you are authorized. Once is usually sufficient.

    Returns the decoded JSON response of the userinfo endpoint.
    """
    req = Request(
        method="GET",
        url=config["userinfo_endpoint"],
        # Any request that is authorized by the token should have this
        # Authorization HTTP header
        headers={"Authorization": "Bearer " + token},
    )

    return _fetch_json(req)


def record_participation(
    config,
    token,
    project_slug=None,
    participation_type="classification",
    duration=31,
):
    """Record a participation event for the user

    This will only work if the participation scope is available,
    which means that the user specifically granted permission to
    share information with SciStarter.

    There are two variations on this process: if we only have one
    project associated with the client_id, all we need is our token.
    On the other hand, if we have more than one project for the same
    client_id, we also need to provide the slug which identifies the
    specific project that the user participated in.

    The project_slug parameter should contain the textual unique
    identifier of the project. It is easily accessible from the
    project URL. In the URL
    https://scistarter.org/airborne-walrus-capture the slug is the
    string airborne-walrus-capture

    participation_type is the kind of event being recorded:
    'classification' (the default), 'collection' or 'signup'.

    duration is the number of seconds the user spent participating,
    or an estimate.

    The config parameter is accepted for symmetry with the other
    calls; the participation endpoint is fixed and does not come
    from the discovery document.

    Returns the decoded JSON response.
    """
    if project_slug is None:
        endpoint = PARTICIPATION_URL
    else:
        endpoint = PARTICIPATION_URL + "/" + project_slug

    req = Request(
        method="POST",
        url=endpoint,
        headers={"Authorization": "Bearer " + token},
        data=urlencode(
            {
                "type": participation_type,
                "duration": duration,
            }
        ).encode("utf8"),
    )

    return _fetch_json(req)


def main():
    """Run the complete example flow, printing each intermediate result."""
    config = openid_discovery()
    print("CONFIG")
    pprint(config)

    access_token, refresh_token = authorize(config)
    print("REFRESH_TOKEN")
    print(refresh_token)

    # We don't need to do this at this time, except to demonstrate
    # how. The current access token is valid, since we just retrieved
    # it, and does not need to be refreshed.
    access_token, refresh_token = refresh(config, refresh_token)
    print("ACCESS TOKEN")
    pprint(access_token)

    profile = get_profile(config, access_token)
    print("PROFILE")
    pprint(profile)

    result = record_participation(config, access_token)
    print("RESULT")
    pprint(result)


if __name__ == "__main__":
    main()
# Website Security Test