| # Copyright 2020 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import datetime |
| import json |
| |
| import mock |
| import pytest |
| from six.moves import http_client |
| from six.moves import urllib |
| |
| from google.auth import _helpers |
| from google.auth import aws |
| from google.auth import environment_vars |
| from google.auth import exceptions |
| from google.auth import transport |
| |
| |
| CLIENT_ID = "username" |
| CLIENT_SECRET = "password" |
| # Base64 encoding of "username:password". |
| BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ=" |
| SERVICE_ACCOUNT_EMAIL = "[email protected]" |
| SERVICE_ACCOUNT_IMPERSONATION_URL = ( |
| "https://us-east1-iamcredentials.googleapis.com/v1/projects/-" |
| + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL) |
| ) |
| QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID" |
| SCOPES = ["scope1", "scope2"] |
| TOKEN_URL = "https://sts.googleapis.com/v1/token" |
| SUBJECT_TOKEN_TYPE = "urn:ietf:params:aws:token-type:aws4_request" |
| AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID" |
| REGION_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone" |
| SECURITY_CREDS_URL = "http://169.254.169.254/latest/meta-data/iam/security-credentials" |
| CRED_VERIFICATION_URL = ( |
| "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15" |
| ) |
| # Sample AWS security credentials to be used with tests that require a session token. |
| ACCESS_KEY_ID = "ASIARD4OQDT6A77FR3CL" |
| SECRET_ACCESS_KEY = "Y8AfSaucF37G4PpvfguKZ3/l7Id4uocLXxX0+VTx" |
| TOKEN = "IQoJb3JpZ2luX2VjEIz//////////wEaCXVzLWVhc3QtMiJGMEQCIH7MHX/Oy/OB8OlLQa9GrqU1B914+iMikqWQW7vPCKlgAiA/Lsv8Jcafn14owfxXn95FURZNKaaphj0ykpmS+Ki+CSq0AwhlEAAaDDA3NzA3MTM5MTk5NiIMx9sAeP1ovlMTMKLjKpEDwuJQg41/QUKx0laTZYjPlQvjwSqS3OB9P1KAXPWSLkliVMMqaHqelvMF/WO/glv3KwuTfQsavRNs3v5pcSEm4SPO3l7mCs7KrQUHwGP0neZhIKxEXy+Ls//1C/Bqt53NL+LSbaGv6RPHaX82laz2qElphg95aVLdYgIFY6JWV5fzyjgnhz0DQmy62/Vi8pNcM2/VnxeCQ8CC8dRDSt52ry2v+nc77vstuI9xV5k8mPtnaPoJDRANh0bjwY5Sdwkbp+mGRUJBAQRlNgHUJusefXQgVKBCiyJY4w3Csd8Bgj9IyDV+Azuy1jQqfFZWgP68LSz5bURyIjlWDQunO82stZ0BgplKKAa/KJHBPCp8Qi6i99uy7qh76FQAqgVTsnDuU6fGpHDcsDSGoCls2HgZjZFPeOj8mmRhFk1Xqvkbjuz8V1cJk54d3gIJvQt8gD2D6yJQZecnuGWd5K2e2HohvCc8Fc9kBl1300nUJPV+k4tr/A5R/0QfEKOZL1/k5lf1g9CREnrM8LVkGxCgdYMxLQow1uTL+QU67AHRRSp5PhhGX4Rek+01vdYSnJCMaPhSEgcLqDlQkhk6MPsyT91QMXcWmyO+cAZwUPwnRamFepuP4K8k2KVXs/LIJHLELwAZ0ekyaS7CptgOqS7uaSTFG3U+vzFZLEnGvWQ7y9IPNQZ+Dffgh4p3vF4J68y9049sI6Sr5d5wbKkcbm8hdCDHZcv4lnqohquPirLiFQ3q7B17V9krMPu3mz1cg4Ekgcrn/E09NTsxAqD8NcZ7C7ECom9r+X3zkDOxaajW6hu3Az8hGlyylDaMiFfRbBJpTIlxp7jfa7CxikNgNtEKLH9iCzvuSg2vhA==" |
| # To avoid json.dumps() differing behavior from one version to other, |
| # the JSON payload is hardcoded. |
| REQUEST_PARAMS = '{"KeySchema":[{"KeyType":"HASH","AttributeName":"Id"}],"TableName":"TestTable","AttributeDefinitions":[{"AttributeName":"Id","AttributeType":"S"}],"ProvisionedThroughput":{"WriteCapacityUnits":5,"ReadCapacityUnits":5}}' |
| # Each tuple contains the following entries: |
| # region, time, credentials, original_request, signed_request |
| TEST_FIXTURES = [ |
| # GET request (AWS botocore tests). |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla.req |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla.sreq |
| ( |
| "us-east-1", |
| "2011-09-09T23:36:00Z", |
| { |
| "access_key_id": "AKIDEXAMPLE", |
| "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY", |
| }, |
| { |
| "method": "GET", |
| "url": "https://host.foo.com", |
| "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"}, |
| }, |
| { |
| "url": "https://host.foo.com", |
| "method": "GET", |
| "headers": { |
| "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b27ccfbfa7df52a200ff74193ca6e32d4b48b8856fab7ebf1c595d0670a7e470", |
| "host": "host.foo.com", |
| "date": "Mon, 09 Sep 2011 23:36:00 GMT", |
| }, |
| }, |
| ), |
| # GET request with relative path (AWS botocore tests). |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-relative-relative.req |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-relative-relative.sreq |
| ( |
| "us-east-1", |
| "2011-09-09T23:36:00Z", |
| { |
| "access_key_id": "AKIDEXAMPLE", |
| "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY", |
| }, |
| { |
| "method": "GET", |
| "url": "https://host.foo.com/foo/bar/../..", |
| "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"}, |
| }, |
| { |
| "url": "https://host.foo.com/foo/bar/../..", |
| "method": "GET", |
| "headers": { |
| "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b27ccfbfa7df52a200ff74193ca6e32d4b48b8856fab7ebf1c595d0670a7e470", |
| "host": "host.foo.com", |
| "date": "Mon, 09 Sep 2011 23:36:00 GMT", |
| }, |
| }, |
| ), |
| # GET request with /./ path (AWS botocore tests). |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-dot-slash.req |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-dot-slash.sreq |
| ( |
| "us-east-1", |
| "2011-09-09T23:36:00Z", |
| { |
| "access_key_id": "AKIDEXAMPLE", |
| "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY", |
| }, |
| { |
| "method": "GET", |
| "url": "https://host.foo.com/./", |
| "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"}, |
| }, |
| { |
| "url": "https://host.foo.com/./", |
| "method": "GET", |
| "headers": { |
| "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b27ccfbfa7df52a200ff74193ca6e32d4b48b8856fab7ebf1c595d0670a7e470", |
| "host": "host.foo.com", |
| "date": "Mon, 09 Sep 2011 23:36:00 GMT", |
| }, |
| }, |
| ), |
| # GET request with pointless dot path (AWS botocore tests). |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-pointless-dot.req |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-pointless-dot.sreq |
| ( |
| "us-east-1", |
| "2011-09-09T23:36:00Z", |
| { |
| "access_key_id": "AKIDEXAMPLE", |
| "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY", |
| }, |
| { |
| "method": "GET", |
| "url": "https://host.foo.com/./foo", |
| "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"}, |
| }, |
| { |
| "url": "https://host.foo.com/./foo", |
| "method": "GET", |
| "headers": { |
| "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=910e4d6c9abafaf87898e1eb4c929135782ea25bb0279703146455745391e63a", |
| "host": "host.foo.com", |
| "date": "Mon, 09 Sep 2011 23:36:00 GMT", |
| }, |
| }, |
| ), |
| # GET request with utf8 path (AWS botocore tests). |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-utf8.req |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-utf8.sreq |
| ( |
| "us-east-1", |
| "2011-09-09T23:36:00Z", |
| { |
| "access_key_id": "AKIDEXAMPLE", |
| "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY", |
| }, |
| { |
| "method": "GET", |
| "url": "https://host.foo.com/%E1%88%B4", |
| "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"}, |
| }, |
| { |
| "url": "https://host.foo.com/%E1%88%B4", |
| "method": "GET", |
| "headers": { |
| "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=8d6634c189aa8c75c2e51e106b6b5121bed103fdb351f7d7d4381c738823af74", |
| "host": "host.foo.com", |
| "date": "Mon, 09 Sep 2011 23:36:00 GMT", |
| }, |
| }, |
| ), |
| # GET request with duplicate query key (AWS botocore tests). |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-key-case.req |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-key-case.sreq |
| ( |
| "us-east-1", |
| "2011-09-09T23:36:00Z", |
| { |
| "access_key_id": "AKIDEXAMPLE", |
| "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY", |
| }, |
| { |
| "method": "GET", |
| "url": "https://host.foo.com/?foo=Zoo&foo=aha", |
| "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"}, |
| }, |
| { |
| "url": "https://host.foo.com/?foo=Zoo&foo=aha", |
| "method": "GET", |
| "headers": { |
| "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=be7148d34ebccdc6423b19085378aa0bee970bdc61d144bd1a8c48c33079ab09", |
| "host": "host.foo.com", |
| "date": "Mon, 09 Sep 2011 23:36:00 GMT", |
| }, |
| }, |
| ), |
| # GET request with duplicate out of order query key (AWS botocore tests). |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-value.req |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-value.sreq |
| ( |
| "us-east-1", |
| "2011-09-09T23:36:00Z", |
| { |
| "access_key_id": "AKIDEXAMPLE", |
| "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY", |
| }, |
| { |
| "method": "GET", |
| "url": "https://host.foo.com/?foo=b&foo=a", |
| "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"}, |
| }, |
| { |
| "url": "https://host.foo.com/?foo=b&foo=a", |
| "method": "GET", |
| "headers": { |
| "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=feb926e49e382bec75c9d7dcb2a1b6dc8aa50ca43c25d2bc51143768c0875acc", |
| "host": "host.foo.com", |
| "date": "Mon, 09 Sep 2011 23:36:00 GMT", |
| }, |
| }, |
| ), |
| # GET request with utf8 query (AWS botocore tests). |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-ut8-query.req |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-ut8-query.sreq |
| ( |
| "us-east-1", |
| "2011-09-09T23:36:00Z", |
| { |
| "access_key_id": "AKIDEXAMPLE", |
| "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY", |
| }, |
| { |
| "method": "GET", |
| "url": "https://host.foo.com/?{}=bar".format( |
| urllib.parse.unquote("%E1%88%B4") |
| ), |
| "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"}, |
| }, |
| { |
| "url": "https://host.foo.com/?{}=bar".format( |
| urllib.parse.unquote("%E1%88%B4") |
| ), |
| "method": "GET", |
| "headers": { |
| "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=6fb359e9a05394cc7074e0feb42573a2601abc0c869a953e8c5c12e4e01f1a8c", |
| "host": "host.foo.com", |
| "date": "Mon, 09 Sep 2011 23:36:00 GMT", |
| }, |
| }, |
| ), |
| # POST request with sorted headers (AWS botocore tests). |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-key-sort.req |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-key-sort.sreq |
| ( |
| "us-east-1", |
| "2011-09-09T23:36:00Z", |
| { |
| "access_key_id": "AKIDEXAMPLE", |
| "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY", |
| }, |
| { |
| "method": "POST", |
| "url": "https://host.foo.com/", |
| "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT", "ZOO": "zoobar"}, |
| }, |
| { |
| "url": "https://host.foo.com/", |
| "method": "POST", |
| "headers": { |
| "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host;zoo, Signature=b7a95a52518abbca0964a999a880429ab734f35ebbf1235bd79a5de87756dc4a", |
| "host": "host.foo.com", |
| "date": "Mon, 09 Sep 2011 23:36:00 GMT", |
| "ZOO": "zoobar", |
| }, |
| }, |
| ), |
| # POST request with upper case header value from AWS Python test harness. |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-value-case.req |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-value-case.sreq |
| ( |
| "us-east-1", |
| "2011-09-09T23:36:00Z", |
| { |
| "access_key_id": "AKIDEXAMPLE", |
| "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY", |
| }, |
| { |
| "method": "POST", |
| "url": "https://host.foo.com/", |
| "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT", "zoo": "ZOOBAR"}, |
| }, |
| { |
| "url": "https://host.foo.com/", |
| "method": "POST", |
| "headers": { |
| "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host;zoo, Signature=273313af9d0c265c531e11db70bbd653f3ba074c1009239e8559d3987039cad7", |
| "host": "host.foo.com", |
| "date": "Mon, 09 Sep 2011 23:36:00 GMT", |
| "zoo": "ZOOBAR", |
| }, |
| }, |
| ), |
| # POST request with header and no body (AWS botocore tests). |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-header-value-trim.req |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-header-value-trim.sreq |
| ( |
| "us-east-1", |
| "2011-09-09T23:36:00Z", |
| { |
| "access_key_id": "AKIDEXAMPLE", |
| "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY", |
| }, |
| { |
| "method": "POST", |
| "url": "https://host.foo.com/", |
| "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT", "p": "phfft"}, |
| }, |
| { |
| "url": "https://host.foo.com/", |
| "method": "POST", |
| "headers": { |
| "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host;p, Signature=debf546796015d6f6ded8626f5ce98597c33b47b9164cf6b17b4642036fcb592", |
| "host": "host.foo.com", |
| "date": "Mon, 09 Sep 2011 23:36:00 GMT", |
| "p": "phfft", |
| }, |
| }, |
| ), |
| # POST request with body and no header (AWS botocore tests). |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-x-www-form-urlencoded.req |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-x-www-form-urlencoded.sreq |
| ( |
| "us-east-1", |
| "2011-09-09T23:36:00Z", |
| { |
| "access_key_id": "AKIDEXAMPLE", |
| "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY", |
| }, |
| { |
| "method": "POST", |
| "url": "https://host.foo.com/", |
| "headers": { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "date": "Mon, 09 Sep 2011 23:36:00 GMT", |
| }, |
| "data": "foo=bar", |
| }, |
| { |
| "url": "https://host.foo.com/", |
| "method": "POST", |
| "headers": { |
| "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=content-type;date;host, Signature=5a15b22cf462f047318703b92e6f4f38884e4a7ab7b1d6426ca46a8bd1c26cbc", |
| "host": "host.foo.com", |
| "Content-Type": "application/x-www-form-urlencoded", |
| "date": "Mon, 09 Sep 2011 23:36:00 GMT", |
| }, |
| "data": "foo=bar", |
| }, |
| ), |
| # POST request with querystring (AWS botocore tests). |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-vanilla-query.req |
| # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-vanilla-query.sreq |
| ( |
| "us-east-1", |
| "2011-09-09T23:36:00Z", |
| { |
| "access_key_id": "AKIDEXAMPLE", |
| "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY", |
| }, |
| { |
| "method": "POST", |
| "url": "https://host.foo.com/?foo=bar", |
| "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"}, |
| }, |
| { |
| "url": "https://host.foo.com/?foo=bar", |
| "method": "POST", |
| "headers": { |
| "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b6e3b79003ce0743a491606ba1035a804593b0efb1e20a11cba83f8c25a57a92", |
| "host": "host.foo.com", |
| "date": "Mon, 09 Sep 2011 23:36:00 GMT", |
| }, |
| }, |
| ), |
| # GET request with session token credentials. |
| ( |
| "us-east-2", |
| "2020-08-11T06:55:22Z", |
| { |
| "access_key_id": ACCESS_KEY_ID, |
| "secret_access_key": SECRET_ACCESS_KEY, |
| "security_token": TOKEN, |
| }, |
| { |
| "method": "GET", |
| "url": "https://ec2.us-east-2.amazonaws.com?Action=DescribeRegions&Version=2013-10-15", |
| }, |
| { |
| "url": "https://ec2.us-east-2.amazonaws.com?Action=DescribeRegions&Version=2013-10-15", |
| "method": "GET", |
| "headers": { |
| "Authorization": "AWS4-HMAC-SHA256 Credential=" |
| + ACCESS_KEY_ID |
| + "/20200811/us-east-2/ec2/aws4_request, SignedHeaders=host;x-amz-date;x-amz-security-token, Signature=631ea80cddfaa545fdadb120dc92c9f18166e38a5c47b50fab9fce476e022855", |
| "host": "ec2.us-east-2.amazonaws.com", |
| "x-amz-date": "20200811T065522Z", |
| "x-amz-security-token": TOKEN, |
| }, |
| }, |
| ), |
| # POST request with session token credentials. |
| ( |
| "us-east-2", |
| "2020-08-11T06:55:22Z", |
| { |
| "access_key_id": ACCESS_KEY_ID, |
| "secret_access_key": SECRET_ACCESS_KEY, |
| "security_token": TOKEN, |
| }, |
| { |
| "method": "POST", |
| "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15", |
| }, |
| { |
| "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15", |
| "method": "POST", |
| "headers": { |
| "Authorization": "AWS4-HMAC-SHA256 Credential=" |
| + ACCESS_KEY_ID |
| + "/20200811/us-east-2/sts/aws4_request, SignedHeaders=host;x-amz-date;x-amz-security-token, Signature=73452984e4a880ffdc5c392355733ec3f5ba310d5e0609a89244440cadfe7a7a", |
| "host": "sts.us-east-2.amazonaws.com", |
| "x-amz-date": "20200811T065522Z", |
| "x-amz-security-token": TOKEN, |
| }, |
| }, |
| ), |
| # POST request with computed x-amz-date and no data. |
| ( |
| "us-east-2", |
| "2020-08-11T06:55:22Z", |
| {"access_key_id": ACCESS_KEY_ID, "secret_access_key": SECRET_ACCESS_KEY}, |
| { |
| "method": "POST", |
| "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15", |
| }, |
| { |
| "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15", |
| "method": "POST", |
| "headers": { |
| "Authorization": "AWS4-HMAC-SHA256 Credential=" |
| + ACCESS_KEY_ID |
| + "/20200811/us-east-2/sts/aws4_request, SignedHeaders=host;x-amz-date, Signature=d095ba304919cd0d5570ba8a3787884ee78b860f268ed040ba23831d55536d56", |
| "host": "sts.us-east-2.amazonaws.com", |
| "x-amz-date": "20200811T065522Z", |
| }, |
| }, |
| ), |
| # POST request with session token and additional headers/data. |
| ( |
| "us-east-2", |
| "2020-08-11T06:55:22Z", |
| { |
| "access_key_id": ACCESS_KEY_ID, |
| "secret_access_key": SECRET_ACCESS_KEY, |
| "security_token": TOKEN, |
| }, |
| { |
| "method": "POST", |
| "url": "https://dynamodb.us-east-2.amazonaws.com/", |
| "headers": { |
| "Content-Type": "application/x-amz-json-1.0", |
| "x-amz-target": "DynamoDB_20120810.CreateTable", |
| }, |
| "data": REQUEST_PARAMS, |
| }, |
| { |
| "url": "https://dynamodb.us-east-2.amazonaws.com/", |
| "method": "POST", |
| "headers": { |
| "Authorization": "AWS4-HMAC-SHA256 Credential=" |
| + ACCESS_KEY_ID |
| + "/20200811/us-east-2/dynamodb/aws4_request, SignedHeaders=content-type;host;x-amz-date;x-amz-security-token;x-amz-target, Signature=fdaa5b9cc9c86b80fe61eaf504141c0b3523780349120f2bd8145448456e0385", |
| "host": "dynamodb.us-east-2.amazonaws.com", |
| "x-amz-date": "20200811T065522Z", |
| "Content-Type": "application/x-amz-json-1.0", |
| "x-amz-target": "DynamoDB_20120810.CreateTable", |
| "x-amz-security-token": TOKEN, |
| }, |
| "data": REQUEST_PARAMS, |
| }, |
| ), |
| ] |
| |
| |
| class TestRequestSigner(object): |
| @pytest.mark.parametrize( |
| "region, time, credentials, original_request, signed_request", TEST_FIXTURES |
| ) |
| @mock.patch("google.auth._helpers.utcnow") |
| def test_get_request_options( |
| self, utcnow, region, time, credentials, original_request, signed_request |
| ): |
| utcnow.return_value = datetime.datetime.strptime(time, "%Y-%m-%dT%H:%M:%SZ") |
| request_signer = aws.RequestSigner(region) |
| actual_signed_request = request_signer.get_request_options( |
| credentials, |
| original_request.get("url"), |
| original_request.get("method"), |
| original_request.get("data"), |
| original_request.get("headers"), |
| ) |
| |
| assert actual_signed_request == signed_request |
| |
| def test_get_request_options_with_missing_scheme_url(self): |
| request_signer = aws.RequestSigner("us-east-2") |
| |
| with pytest.raises(ValueError) as excinfo: |
| request_signer.get_request_options( |
| { |
| "access_key_id": ACCESS_KEY_ID, |
| "secret_access_key": SECRET_ACCESS_KEY, |
| }, |
| "invalid", |
| "POST", |
| ) |
| |
| assert excinfo.match(r"Invalid AWS service URL") |
| |
| def test_get_request_options_with_invalid_scheme_url(self): |
| request_signer = aws.RequestSigner("us-east-2") |
| |
| with pytest.raises(ValueError) as excinfo: |
| request_signer.get_request_options( |
| { |
| "access_key_id": ACCESS_KEY_ID, |
| "secret_access_key": SECRET_ACCESS_KEY, |
| }, |
| "http://invalid", |
| "POST", |
| ) |
| |
| assert excinfo.match(r"Invalid AWS service URL") |
| |
| def test_get_request_options_with_missing_hostname_url(self): |
| request_signer = aws.RequestSigner("us-east-2") |
| |
| with pytest.raises(ValueError) as excinfo: |
| request_signer.get_request_options( |
| { |
| "access_key_id": ACCESS_KEY_ID, |
| "secret_access_key": SECRET_ACCESS_KEY, |
| }, |
| "https://", |
| "POST", |
| ) |
| |
| assert excinfo.match(r"Invalid AWS service URL") |
| |
| |
| class TestCredentials(object): |
| AWS_REGION = "us-east-2" |
| AWS_ROLE = "gcp-aws-role" |
| AWS_SECURITY_CREDENTIALS_RESPONSE = { |
| "AccessKeyId": ACCESS_KEY_ID, |
| "SecretAccessKey": SECRET_ACCESS_KEY, |
| "Token": TOKEN, |
| } |
| AWS_SIGNATURE_TIME = "2020-08-11T06:55:22Z" |
| CREDENTIAL_SOURCE = { |
| "environment_id": "aws1", |
| "region_url": REGION_URL, |
| "url": SECURITY_CREDS_URL, |
| "regional_cred_verification_url": CRED_VERIFICATION_URL, |
| } |
| SUCCESS_RESPONSE = { |
| "access_token": "ACCESS_TOKEN", |
| "issued_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "token_type": "Bearer", |
| "expires_in": 3600, |
| "scope": " ".join(SCOPES), |
| } |
| |
| @classmethod |
| def make_serialized_aws_signed_request( |
| cls, |
| aws_security_credentials, |
| region_name="us-east-2", |
| url="https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15", |
| ): |
| """Utility to generate serialize AWS signed requests. |
| This makes it easy to assert generated subject tokens based on the |
| provided AWS security credentials, regions and AWS STS endpoint. |
| """ |
| request_signer = aws.RequestSigner(region_name) |
| signed_request = request_signer.get_request_options( |
| aws_security_credentials, url, "POST" |
| ) |
| reformatted_signed_request = { |
| "url": signed_request.get("url"), |
| "method": signed_request.get("method"), |
| "headers": [ |
| { |
| "key": "Authorization", |
| "value": signed_request.get("headers").get("Authorization"), |
| }, |
| {"key": "host", "value": signed_request.get("headers").get("host")}, |
| { |
| "key": "x-amz-date", |
| "value": signed_request.get("headers").get("x-amz-date"), |
| }, |
| ], |
| } |
| # Include security token if available. |
| if "security_token" in aws_security_credentials: |
| reformatted_signed_request.get("headers").append( |
| { |
| "key": "x-amz-security-token", |
| "value": signed_request.get("headers").get("x-amz-security-token"), |
| } |
| ) |
| # Append x-goog-cloud-target-resource header. |
| reformatted_signed_request.get("headers").append( |
| {"key": "x-goog-cloud-target-resource", "value": AUDIENCE} |
| ), |
| return urllib.parse.quote( |
| json.dumps( |
| reformatted_signed_request, separators=(",", ":"), sort_keys=True |
| ) |
| ) |
| |
| @classmethod |
| def make_mock_request( |
| cls, |
| region_status=None, |
| region_name=None, |
| role_status=None, |
| role_name=None, |
| security_credentials_status=None, |
| security_credentials_data=None, |
| token_status=None, |
| token_data=None, |
| impersonation_status=None, |
| impersonation_data=None, |
| ): |
| """Utility function to generate a mock HTTP request object. |
| This will facilitate testing various edge cases by specify how the |
| various endpoints will respond while generating a Google Access token |
| in an AWS environment. |
| """ |
| responses = [] |
| if region_status: |
| # AWS region request. |
| region_response = mock.create_autospec(transport.Response, instance=True) |
| region_response.status = region_status |
| if region_name: |
| region_response.data = "{}b".format(region_name).encode("utf-8") |
| responses.append(region_response) |
| |
| if role_status: |
| # AWS role name request. |
| role_response = mock.create_autospec(transport.Response, instance=True) |
| role_response.status = role_status |
| if role_name: |
| role_response.data = role_name.encode("utf-8") |
| responses.append(role_response) |
| |
| if security_credentials_status: |
| # AWS security credentials request. |
| security_credentials_response = mock.create_autospec( |
| transport.Response, instance=True |
| ) |
| security_credentials_response.status = security_credentials_status |
| if security_credentials_data: |
| security_credentials_response.data = json.dumps( |
| security_credentials_data |
| ).encode("utf-8") |
| responses.append(security_credentials_response) |
| |
| if token_status: |
| # GCP token exchange request. |
| token_response = mock.create_autospec(transport.Response, instance=True) |
| token_response.status = token_status |
| token_response.data = json.dumps(token_data).encode("utf-8") |
| responses.append(token_response) |
| |
| if impersonation_status: |
| # Service account impersonation request. |
| impersonation_response = mock.create_autospec( |
| transport.Response, instance=True |
| ) |
| impersonation_response.status = impersonation_status |
| impersonation_response.data = json.dumps(impersonation_data).encode("utf-8") |
| responses.append(impersonation_response) |
| |
| request = mock.create_autospec(transport.Request) |
| request.side_effect = responses |
| |
| return request |
| |
| @classmethod |
| def make_credentials( |
| cls, |
| credential_source, |
| client_id=None, |
| client_secret=None, |
| quota_project_id=None, |
| scopes=None, |
| default_scopes=None, |
| service_account_impersonation_url=None, |
| ): |
| return aws.Credentials( |
| audience=AUDIENCE, |
| subject_token_type=SUBJECT_TOKEN_TYPE, |
| token_url=TOKEN_URL, |
| service_account_impersonation_url=service_account_impersonation_url, |
| credential_source=credential_source, |
| client_id=client_id, |
| client_secret=client_secret, |
| quota_project_id=quota_project_id, |
| scopes=scopes, |
| default_scopes=default_scopes, |
| ) |
| |
| @classmethod |
| def assert_aws_metadata_request_kwargs(cls, request_kwargs, url, headers=None): |
| assert request_kwargs["url"] == url |
| # All used AWS metadata server endpoints use GET HTTP method. |
| assert request_kwargs["method"] == "GET" |
| if headers: |
| assert request_kwargs["headers"] == headers |
| else: |
| assert "headers" not in request_kwargs |
| # None of the endpoints used require any data in request. |
| assert "body" not in request_kwargs |
| |
| @classmethod |
| def assert_token_request_kwargs( |
| cls, request_kwargs, headers, request_data, token_url=TOKEN_URL |
| ): |
| assert request_kwargs["url"] == token_url |
| assert request_kwargs["method"] == "POST" |
| assert request_kwargs["headers"] == headers |
| assert request_kwargs["body"] is not None |
| body_tuples = urllib.parse.parse_qsl(request_kwargs["body"]) |
| assert len(body_tuples) == len(request_data.keys()) |
| for (k, v) in body_tuples: |
| assert v.decode("utf-8") == request_data[k.decode("utf-8")] |
| |
| @classmethod |
| def assert_impersonation_request_kwargs( |
| cls, |
| request_kwargs, |
| headers, |
| request_data, |
| service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, |
| ): |
| assert request_kwargs["url"] == service_account_impersonation_url |
| assert request_kwargs["method"] == "POST" |
| assert request_kwargs["headers"] == headers |
| assert request_kwargs["body"] is not None |
| body_json = json.loads(request_kwargs["body"].decode("utf-8")) |
| assert body_json == request_data |
| |
| @mock.patch.object(aws.Credentials, "__init__", return_value=None) |
| def test_from_info_full_options(self, mock_init): |
| credentials = aws.Credentials.from_info( |
| { |
| "audience": AUDIENCE, |
| "subject_token_type": SUBJECT_TOKEN_TYPE, |
| "token_url": TOKEN_URL, |
| "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, |
| "client_id": CLIENT_ID, |
| "client_secret": CLIENT_SECRET, |
| "quota_project_id": QUOTA_PROJECT_ID, |
| "credential_source": self.CREDENTIAL_SOURCE, |
| } |
| ) |
| |
| # Confirm aws.Credentials instance initialized with the expected parameters. |
| assert isinstance(credentials, aws.Credentials) |
| mock_init.assert_called_once_with( |
| audience=AUDIENCE, |
| subject_token_type=SUBJECT_TOKEN_TYPE, |
| token_url=TOKEN_URL, |
| service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, |
| client_id=CLIENT_ID, |
| client_secret=CLIENT_SECRET, |
| credential_source=self.CREDENTIAL_SOURCE, |
| quota_project_id=QUOTA_PROJECT_ID, |
| ) |
| |
| @mock.patch.object(aws.Credentials, "__init__", return_value=None) |
| def test_from_info_required_options_only(self, mock_init): |
| credentials = aws.Credentials.from_info( |
| { |
| "audience": AUDIENCE, |
| "subject_token_type": SUBJECT_TOKEN_TYPE, |
| "token_url": TOKEN_URL, |
| "credential_source": self.CREDENTIAL_SOURCE, |
| } |
| ) |
| |
| # Confirm aws.Credentials instance initialized with the expected parameters. |
| assert isinstance(credentials, aws.Credentials) |
| mock_init.assert_called_once_with( |
| audience=AUDIENCE, |
| subject_token_type=SUBJECT_TOKEN_TYPE, |
| token_url=TOKEN_URL, |
| service_account_impersonation_url=None, |
| client_id=None, |
| client_secret=None, |
| credential_source=self.CREDENTIAL_SOURCE, |
| quota_project_id=None, |
| ) |
| |
| @mock.patch.object(aws.Credentials, "__init__", return_value=None) |
| def test_from_file_full_options(self, mock_init, tmpdir): |
| info = { |
| "audience": AUDIENCE, |
| "subject_token_type": SUBJECT_TOKEN_TYPE, |
| "token_url": TOKEN_URL, |
| "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, |
| "client_id": CLIENT_ID, |
| "client_secret": CLIENT_SECRET, |
| "quota_project_id": QUOTA_PROJECT_ID, |
| "credential_source": self.CREDENTIAL_SOURCE, |
| } |
| config_file = tmpdir.join("config.json") |
| config_file.write(json.dumps(info)) |
| credentials = aws.Credentials.from_file(str(config_file)) |
| |
| # Confirm aws.Credentials instance initialized with the expected parameters. |
| assert isinstance(credentials, aws.Credentials) |
| mock_init.assert_called_once_with( |
| audience=AUDIENCE, |
| subject_token_type=SUBJECT_TOKEN_TYPE, |
| token_url=TOKEN_URL, |
| service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, |
| client_id=CLIENT_ID, |
| client_secret=CLIENT_SECRET, |
| credential_source=self.CREDENTIAL_SOURCE, |
| quota_project_id=QUOTA_PROJECT_ID, |
| ) |
| |
| @mock.patch.object(aws.Credentials, "__init__", return_value=None) |
| def test_from_file_required_options_only(self, mock_init, tmpdir): |
| info = { |
| "audience": AUDIENCE, |
| "subject_token_type": SUBJECT_TOKEN_TYPE, |
| "token_url": TOKEN_URL, |
| "credential_source": self.CREDENTIAL_SOURCE, |
| } |
| config_file = tmpdir.join("config.json") |
| config_file.write(json.dumps(info)) |
| credentials = aws.Credentials.from_file(str(config_file)) |
| |
| # Confirm aws.Credentials instance initialized with the expected parameters. |
| assert isinstance(credentials, aws.Credentials) |
| mock_init.assert_called_once_with( |
| audience=AUDIENCE, |
| subject_token_type=SUBJECT_TOKEN_TYPE, |
| token_url=TOKEN_URL, |
| service_account_impersonation_url=None, |
| client_id=None, |
| client_secret=None, |
| credential_source=self.CREDENTIAL_SOURCE, |
| quota_project_id=None, |
| ) |
| |
| def test_constructor_invalid_credential_source(self): |
| # Provide invalid credential source. |
| credential_source = {"unsupported": "value"} |
| |
| with pytest.raises(ValueError) as excinfo: |
| self.make_credentials(credential_source=credential_source) |
| |
| assert excinfo.match(r"No valid AWS 'credential_source' provided") |
| |
| def test_constructor_invalid_environment_id(self): |
| # Provide invalid environment_id. |
| credential_source = self.CREDENTIAL_SOURCE.copy() |
| credential_source["environment_id"] = "azure1" |
| |
| with pytest.raises(ValueError) as excinfo: |
| self.make_credentials(credential_source=credential_source) |
| |
| assert excinfo.match(r"No valid AWS 'credential_source' provided") |
| |
| def test_constructor_missing_cred_verification_url(self): |
| # regional_cred_verification_url is a required field. |
| credential_source = self.CREDENTIAL_SOURCE.copy() |
| credential_source.pop("regional_cred_verification_url") |
| |
| with pytest.raises(ValueError) as excinfo: |
| self.make_credentials(credential_source=credential_source) |
| |
| assert excinfo.match(r"No valid AWS 'credential_source' provided") |
| |
| def test_constructor_invalid_environment_id_version(self): |
| # Provide an unsupported version. |
| credential_source = self.CREDENTIAL_SOURCE.copy() |
| credential_source["environment_id"] = "aws3" |
| |
| with pytest.raises(ValueError) as excinfo: |
| self.make_credentials(credential_source=credential_source) |
| |
| assert excinfo.match(r"aws version '3' is not supported in the current build.") |
| |
| def test_info(self): |
| credentials = self.make_credentials( |
| credential_source=self.CREDENTIAL_SOURCE.copy() |
| ) |
| |
| assert credentials.info == { |
| "type": "external_account", |
| "audience": AUDIENCE, |
| "subject_token_type": SUBJECT_TOKEN_TYPE, |
| "token_url": TOKEN_URL, |
| "credential_source": self.CREDENTIAL_SOURCE, |
| } |
| |
| def test_retrieve_subject_token_missing_region_url(self): |
| # When AWS_REGION envvar is not available, region_url is required for |
| # determining the current AWS region. |
| credential_source = self.CREDENTIAL_SOURCE.copy() |
| credential_source.pop("region_url") |
| credentials = self.make_credentials(credential_source=credential_source) |
| |
| with pytest.raises(exceptions.RefreshError) as excinfo: |
| credentials.retrieve_subject_token(None) |
| |
| assert excinfo.match(r"Unable to determine AWS region") |
| |
| @mock.patch("google.auth._helpers.utcnow") |
| def test_retrieve_subject_token_success_temp_creds_no_environment_vars( |
| self, utcnow |
| ): |
| utcnow.return_value = datetime.datetime.strptime( |
| self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ" |
| ) |
| request = self.make_mock_request( |
| region_status=http_client.OK, |
| region_name=self.AWS_REGION, |
| role_status=http_client.OK, |
| role_name=self.AWS_ROLE, |
| security_credentials_status=http_client.OK, |
| security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE, |
| ) |
| credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE) |
| |
| subject_token = credentials.retrieve_subject_token(request) |
| |
| assert subject_token == self.make_serialized_aws_signed_request( |
| { |
| "access_key_id": ACCESS_KEY_ID, |
| "secret_access_key": SECRET_ACCESS_KEY, |
| "security_token": TOKEN, |
| } |
| ) |
| # Assert region request. |
| self.assert_aws_metadata_request_kwargs( |
| request.call_args_list[0][1], REGION_URL |
| ) |
| # Assert role request. |
| self.assert_aws_metadata_request_kwargs( |
| request.call_args_list[1][1], SECURITY_CREDS_URL |
| ) |
| # Assert security credentials request. |
| self.assert_aws_metadata_request_kwargs( |
| request.call_args_list[2][1], |
| "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE), |
| {"Content-Type": "application/json"}, |
| ) |
| |
| # Retrieve subject_token again. Region should not be queried again. |
| new_request = self.make_mock_request( |
| role_status=http_client.OK, |
| role_name=self.AWS_ROLE, |
| security_credentials_status=http_client.OK, |
| security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE, |
| ) |
| |
| credentials.retrieve_subject_token(new_request) |
| |
| # Only 2 requests should be sent as the region is cached. |
| assert len(new_request.call_args_list) == 2 |
| # Assert role request. |
| self.assert_aws_metadata_request_kwargs( |
| new_request.call_args_list[0][1], SECURITY_CREDS_URL |
| ) |
| # Assert security credentials request. |
| self.assert_aws_metadata_request_kwargs( |
| new_request.call_args_list[1][1], |
| "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE), |
| {"Content-Type": "application/json"}, |
| ) |
| |
| @mock.patch("google.auth._helpers.utcnow") |
| def test_retrieve_subject_token_success_permanent_creds_no_environment_vars( |
| self, utcnow |
| ): |
| # Simualte a permanent credential without a session token is |
| # returned by the security-credentials endpoint. |
| security_creds_response = self.AWS_SECURITY_CREDENTIALS_RESPONSE.copy() |
| security_creds_response.pop("Token") |
| utcnow.return_value = datetime.datetime.strptime( |
| self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ" |
| ) |
| request = self.make_mock_request( |
| region_status=http_client.OK, |
| region_name=self.AWS_REGION, |
| role_status=http_client.OK, |
| role_name=self.AWS_ROLE, |
| security_credentials_status=http_client.OK, |
| security_credentials_data=security_creds_response, |
| ) |
| credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE) |
| |
| subject_token = credentials.retrieve_subject_token(request) |
| |
| assert subject_token == self.make_serialized_aws_signed_request( |
| {"access_key_id": ACCESS_KEY_ID, "secret_access_key": SECRET_ACCESS_KEY} |
| ) |
| |
| @mock.patch("google.auth._helpers.utcnow") |
| def test_retrieve_subject_token_success_environment_vars(self, utcnow, monkeypatch): |
| monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID) |
| monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY) |
| monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN) |
| monkeypatch.setenv(environment_vars.AWS_REGION, self.AWS_REGION) |
| utcnow.return_value = datetime.datetime.strptime( |
| self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ" |
| ) |
| credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE) |
| |
| subject_token = credentials.retrieve_subject_token(None) |
| |
| assert subject_token == self.make_serialized_aws_signed_request( |
| { |
| "access_key_id": ACCESS_KEY_ID, |
| "secret_access_key": SECRET_ACCESS_KEY, |
| "security_token": TOKEN, |
| } |
| ) |
| |
| @mock.patch("google.auth._helpers.utcnow") |
| def test_retrieve_subject_token_success_environment_vars_with_default_region( |
| self, utcnow, monkeypatch |
| ): |
| monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID) |
| monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY) |
| monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN) |
| monkeypatch.setenv(environment_vars.AWS_DEFAULT_REGION, self.AWS_REGION) |
| utcnow.return_value = datetime.datetime.strptime( |
| self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ" |
| ) |
| credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE) |
| |
| subject_token = credentials.retrieve_subject_token(None) |
| |
| assert subject_token == self.make_serialized_aws_signed_request( |
| { |
| "access_key_id": ACCESS_KEY_ID, |
| "secret_access_key": SECRET_ACCESS_KEY, |
| "security_token": TOKEN, |
| } |
| ) |
| |
| @mock.patch("google.auth._helpers.utcnow") |
| def test_retrieve_subject_token_success_environment_vars_with_both_regions_set( |
| self, utcnow, monkeypatch |
| ): |
| monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID) |
| monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY) |
| monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN) |
| monkeypatch.setenv(environment_vars.AWS_DEFAULT_REGION, "Malformed AWS Region") |
| # This test makes sure that the AWS_REGION gets used over AWS_DEFAULT_REGION, |
| # So, AWS_DEFAULT_REGION is set to something that would cause the test to fail, |
| # And AWS_REGION is set to the a valid value, and it should succeed |
| monkeypatch.setenv(environment_vars.AWS_REGION, self.AWS_REGION) |
| utcnow.return_value = datetime.datetime.strptime( |
| self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ" |
| ) |
| credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE) |
| |
| subject_token = credentials.retrieve_subject_token(None) |
| |
| assert subject_token == self.make_serialized_aws_signed_request( |
| { |
| "access_key_id": ACCESS_KEY_ID, |
| "secret_access_key": SECRET_ACCESS_KEY, |
| "security_token": TOKEN, |
| } |
| ) |
| |
| @mock.patch("google.auth._helpers.utcnow") |
| def test_retrieve_subject_token_success_environment_vars_no_session_token( |
| self, utcnow, monkeypatch |
| ): |
| monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID) |
| monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY) |
| monkeypatch.setenv(environment_vars.AWS_REGION, self.AWS_REGION) |
| utcnow.return_value = datetime.datetime.strptime( |
| self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ" |
| ) |
| credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE) |
| |
| subject_token = credentials.retrieve_subject_token(None) |
| |
| assert subject_token == self.make_serialized_aws_signed_request( |
| {"access_key_id": ACCESS_KEY_ID, "secret_access_key": SECRET_ACCESS_KEY} |
| ) |
| |
| @mock.patch("google.auth._helpers.utcnow") |
| def test_retrieve_subject_token_success_environment_vars_except_region( |
| self, utcnow, monkeypatch |
| ): |
| monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID) |
| monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY) |
| monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN) |
| utcnow.return_value = datetime.datetime.strptime( |
| self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ" |
| ) |
| # Region will be queried since it is not found in envvars. |
| request = self.make_mock_request( |
| region_status=http_client.OK, region_name=self.AWS_REGION |
| ) |
| credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE) |
| |
| subject_token = credentials.retrieve_subject_token(request) |
| |
| assert subject_token == self.make_serialized_aws_signed_request( |
| { |
| "access_key_id": ACCESS_KEY_ID, |
| "secret_access_key": SECRET_ACCESS_KEY, |
| "security_token": TOKEN, |
| } |
| ) |
| |
| def test_retrieve_subject_token_error_determining_aws_region(self): |
| # Simulate error in retrieving the AWS region. |
| request = self.make_mock_request(region_status=http_client.BAD_REQUEST) |
| credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE) |
| |
| with pytest.raises(exceptions.RefreshError) as excinfo: |
| credentials.retrieve_subject_token(request) |
| |
| assert excinfo.match(r"Unable to retrieve AWS region") |
| |
| def test_retrieve_subject_token_error_determining_aws_role(self): |
| # Simulate error in retrieving the AWS role name. |
| request = self.make_mock_request( |
| region_status=http_client.OK, |
| region_name=self.AWS_REGION, |
| role_status=http_client.BAD_REQUEST, |
| ) |
| credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE) |
| |
| with pytest.raises(exceptions.RefreshError) as excinfo: |
| credentials.retrieve_subject_token(request) |
| |
| assert excinfo.match(r"Unable to retrieve AWS role name") |
| |
| def test_retrieve_subject_token_error_determining_security_creds_url(self): |
| # Simulate the security-credentials url is missing. This is needed for |
| # determining the AWS security credentials when not found in envvars. |
| credential_source = self.CREDENTIAL_SOURCE.copy() |
| credential_source.pop("url") |
| request = self.make_mock_request( |
| region_status=http_client.OK, region_name=self.AWS_REGION |
| ) |
| credentials = self.make_credentials(credential_source=credential_source) |
| |
| with pytest.raises(exceptions.RefreshError) as excinfo: |
| credentials.retrieve_subject_token(request) |
| |
| assert excinfo.match( |
| r"Unable to determine the AWS metadata server security credentials endpoint" |
| ) |
| |
| def test_retrieve_subject_token_error_determining_aws_security_creds(self): |
| # Simulate error in retrieving the AWS security credentials. |
| request = self.make_mock_request( |
| region_status=http_client.OK, |
| region_name=self.AWS_REGION, |
| role_status=http_client.OK, |
| role_name=self.AWS_ROLE, |
| security_credentials_status=http_client.BAD_REQUEST, |
| ) |
| credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE) |
| |
| with pytest.raises(exceptions.RefreshError) as excinfo: |
| credentials.retrieve_subject_token(request) |
| |
| assert excinfo.match(r"Unable to retrieve AWS security credentials") |
| |
| @mock.patch("google.auth._helpers.utcnow") |
| def test_refresh_success_without_impersonation_ignore_default_scopes(self, utcnow): |
| utcnow.return_value = datetime.datetime.strptime( |
| self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ" |
| ) |
| expected_subject_token = self.make_serialized_aws_signed_request( |
| { |
| "access_key_id": ACCESS_KEY_ID, |
| "secret_access_key": SECRET_ACCESS_KEY, |
| "security_token": TOKEN, |
| } |
| ) |
| token_headers = { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "Authorization": "Basic " + BASIC_AUTH_ENCODING, |
| } |
| token_request_data = { |
| "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", |
| "audience": AUDIENCE, |
| "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "scope": " ".join(SCOPES), |
| "subject_token": expected_subject_token, |
| "subject_token_type": SUBJECT_TOKEN_TYPE, |
| } |
| request = self.make_mock_request( |
| region_status=http_client.OK, |
| region_name=self.AWS_REGION, |
| role_status=http_client.OK, |
| role_name=self.AWS_ROLE, |
| security_credentials_status=http_client.OK, |
| security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE, |
| token_status=http_client.OK, |
| token_data=self.SUCCESS_RESPONSE, |
| ) |
| credentials = self.make_credentials( |
| client_id=CLIENT_ID, |
| client_secret=CLIENT_SECRET, |
| credential_source=self.CREDENTIAL_SOURCE, |
| quota_project_id=QUOTA_PROJECT_ID, |
| scopes=SCOPES, |
| # Default scopes should be ignored. |
| default_scopes=["ignored"], |
| ) |
| |
| credentials.refresh(request) |
| |
| assert len(request.call_args_list) == 4 |
| # Fourth request should be sent to GCP STS endpoint. |
| self.assert_token_request_kwargs( |
| request.call_args_list[3][1], token_headers, token_request_data |
| ) |
| assert credentials.token == self.SUCCESS_RESPONSE["access_token"] |
| assert credentials.quota_project_id == QUOTA_PROJECT_ID |
| assert credentials.scopes == SCOPES |
| assert credentials.default_scopes == ["ignored"] |
| |
| @mock.patch("google.auth._helpers.utcnow") |
| def test_refresh_success_without_impersonation_use_default_scopes(self, utcnow): |
| utcnow.return_value = datetime.datetime.strptime( |
| self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ" |
| ) |
| expected_subject_token = self.make_serialized_aws_signed_request( |
| { |
| "access_key_id": ACCESS_KEY_ID, |
| "secret_access_key": SECRET_ACCESS_KEY, |
| "security_token": TOKEN, |
| } |
| ) |
| token_headers = { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "Authorization": "Basic " + BASIC_AUTH_ENCODING, |
| } |
| token_request_data = { |
| "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", |
| "audience": AUDIENCE, |
| "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "scope": " ".join(SCOPES), |
| "subject_token": expected_subject_token, |
| "subject_token_type": SUBJECT_TOKEN_TYPE, |
| } |
| request = self.make_mock_request( |
| region_status=http_client.OK, |
| region_name=self.AWS_REGION, |
| role_status=http_client.OK, |
| role_name=self.AWS_ROLE, |
| security_credentials_status=http_client.OK, |
| security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE, |
| token_status=http_client.OK, |
| token_data=self.SUCCESS_RESPONSE, |
| ) |
| credentials = self.make_credentials( |
| client_id=CLIENT_ID, |
| client_secret=CLIENT_SECRET, |
| credential_source=self.CREDENTIAL_SOURCE, |
| quota_project_id=QUOTA_PROJECT_ID, |
| scopes=None, |
| # Default scopes should be used since user specified scopes are none. |
| default_scopes=SCOPES, |
| ) |
| |
| credentials.refresh(request) |
| |
| assert len(request.call_args_list) == 4 |
| # Fourth request should be sent to GCP STS endpoint. |
| self.assert_token_request_kwargs( |
| request.call_args_list[3][1], token_headers, token_request_data |
| ) |
| assert credentials.token == self.SUCCESS_RESPONSE["access_token"] |
| assert credentials.quota_project_id == QUOTA_PROJECT_ID |
| assert credentials.scopes is None |
| assert credentials.default_scopes == SCOPES |
| |
| @mock.patch("google.auth._helpers.utcnow") |
| def test_refresh_success_with_impersonation_ignore_default_scopes(self, utcnow): |
| utcnow.return_value = datetime.datetime.strptime( |
| self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ" |
| ) |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600) |
| ).isoformat("T") + "Z" |
| expected_subject_token = self.make_serialized_aws_signed_request( |
| { |
| "access_key_id": ACCESS_KEY_ID, |
| "secret_access_key": SECRET_ACCESS_KEY, |
| "security_token": TOKEN, |
| } |
| ) |
| token_headers = { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "Authorization": "Basic " + BASIC_AUTH_ENCODING, |
| } |
| token_request_data = { |
| "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", |
| "audience": AUDIENCE, |
| "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "scope": "https://www.googleapis.com/auth/iam", |
| "subject_token": expected_subject_token, |
| "subject_token_type": SUBJECT_TOKEN_TYPE, |
| } |
| # Service account impersonation request/response. |
| impersonation_response = { |
| "accessToken": "SA_ACCESS_TOKEN", |
| "expireTime": expire_time, |
| } |
| impersonation_headers = { |
| "Content-Type": "application/json", |
| "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), |
| "x-goog-user-project": QUOTA_PROJECT_ID, |
| } |
| impersonation_request_data = { |
| "delegates": None, |
| "scope": SCOPES, |
| "lifetime": "3600s", |
| } |
| request = self.make_mock_request( |
| region_status=http_client.OK, |
| region_name=self.AWS_REGION, |
| role_status=http_client.OK, |
| role_name=self.AWS_ROLE, |
| security_credentials_status=http_client.OK, |
| security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE, |
| token_status=http_client.OK, |
| token_data=self.SUCCESS_RESPONSE, |
| impersonation_status=http_client.OK, |
| impersonation_data=impersonation_response, |
| ) |
| credentials = self.make_credentials( |
| client_id=CLIENT_ID, |
| client_secret=CLIENT_SECRET, |
| credential_source=self.CREDENTIAL_SOURCE, |
| service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, |
| quota_project_id=QUOTA_PROJECT_ID, |
| scopes=SCOPES, |
| # Default scopes should be ignored. |
| default_scopes=["ignored"], |
| ) |
| |
| credentials.refresh(request) |
| |
| assert len(request.call_args_list) == 5 |
| # Fourth request should be sent to GCP STS endpoint. |
| self.assert_token_request_kwargs( |
| request.call_args_list[3][1], token_headers, token_request_data |
| ) |
| # Fifth request should be sent to iamcredentials endpoint for service |
| # account impersonation. |
| self.assert_impersonation_request_kwargs( |
| request.call_args_list[4][1], |
| impersonation_headers, |
| impersonation_request_data, |
| ) |
| assert credentials.token == impersonation_response["accessToken"] |
| assert credentials.quota_project_id == QUOTA_PROJECT_ID |
| assert credentials.scopes == SCOPES |
| assert credentials.default_scopes == ["ignored"] |
| |
| @mock.patch("google.auth._helpers.utcnow") |
| def test_refresh_success_with_impersonation_use_default_scopes(self, utcnow): |
| utcnow.return_value = datetime.datetime.strptime( |
| self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ" |
| ) |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600) |
| ).isoformat("T") + "Z" |
| expected_subject_token = self.make_serialized_aws_signed_request( |
| { |
| "access_key_id": ACCESS_KEY_ID, |
| "secret_access_key": SECRET_ACCESS_KEY, |
| "security_token": TOKEN, |
| } |
| ) |
| token_headers = { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "Authorization": "Basic " + BASIC_AUTH_ENCODING, |
| } |
| token_request_data = { |
| "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", |
| "audience": AUDIENCE, |
| "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "scope": "https://www.googleapis.com/auth/iam", |
| "subject_token": expected_subject_token, |
| "subject_token_type": SUBJECT_TOKEN_TYPE, |
| } |
| # Service account impersonation request/response. |
| impersonation_response = { |
| "accessToken": "SA_ACCESS_TOKEN", |
| "expireTime": expire_time, |
| } |
| impersonation_headers = { |
| "Content-Type": "application/json", |
| "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), |
| "x-goog-user-project": QUOTA_PROJECT_ID, |
| } |
| impersonation_request_data = { |
| "delegates": None, |
| "scope": SCOPES, |
| "lifetime": "3600s", |
| } |
| request = self.make_mock_request( |
| region_status=http_client.OK, |
| region_name=self.AWS_REGION, |
| role_status=http_client.OK, |
| role_name=self.AWS_ROLE, |
| security_credentials_status=http_client.OK, |
| security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE, |
| token_status=http_client.OK, |
| token_data=self.SUCCESS_RESPONSE, |
| impersonation_status=http_client.OK, |
| impersonation_data=impersonation_response, |
| ) |
| credentials = self.make_credentials( |
| client_id=CLIENT_ID, |
| client_secret=CLIENT_SECRET, |
| credential_source=self.CREDENTIAL_SOURCE, |
| service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, |
| quota_project_id=QUOTA_PROJECT_ID, |
| scopes=None, |
| # Default scopes should be used since user specified scopes are none. |
| default_scopes=SCOPES, |
| ) |
| |
| credentials.refresh(request) |
| |
| assert len(request.call_args_list) == 5 |
| # Fourth request should be sent to GCP STS endpoint. |
| self.assert_token_request_kwargs( |
| request.call_args_list[3][1], token_headers, token_request_data |
| ) |
| # Fifth request should be sent to iamcredentials endpoint for service |
| # account impersonation. |
| self.assert_impersonation_request_kwargs( |
| request.call_args_list[4][1], |
| impersonation_headers, |
| impersonation_request_data, |
| ) |
| assert credentials.token == impersonation_response["accessToken"] |
| assert credentials.quota_project_id == QUOTA_PROJECT_ID |
| assert credentials.scopes is None |
| assert credentials.default_scopes == SCOPES |
| |
| def test_refresh_with_retrieve_subject_token_error(self): |
| request = self.make_mock_request(region_status=http_client.BAD_REQUEST) |
| credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE) |
| |
| with pytest.raises(exceptions.RefreshError) as excinfo: |
| credentials.refresh(request) |
| |
| assert excinfo.match(r"Unable to retrieve AWS region") |