test: Create AWS-based external account integration tests (#731)

diff --git a/scripts/setup_external_accounts.sh b/scripts/setup_external_accounts.sh
index 2fd04e2..ecc879b 100644
--- a/scripts/setup_external_accounts.sh
+++ b/scripts/setup_external_accounts.sh
@@ -110,3 +110,4 @@
 
 echo "OIDC audience: "$oidc_aud
 echo "AWS audience: "$aws_aud
+echo "AWS role: arn:aws:iam::$aws_account_id:role/$aws_role_name"
diff --git a/system_tests/system_tests_sync/conftest.py b/system_tests/system_tests_sync/conftest.py
index 37a6fd3..16caa65 100644
--- a/system_tests/system_tests_sync/conftest.py
+++ b/system_tests/system_tests_sync/conftest.py
@@ -54,15 +54,41 @@
 
 
 @pytest.fixture(params=["urllib3", "requests"])
-def http_request(request):
+def request_type(request):
+    yield request.param
+
+
[email protected]
+def http_request(request_type):
     """A transport.request object."""
-    if request.param == "urllib3":
+    if request_type == "urllib3":
         yield google.auth.transport.urllib3.Request(URLLIB3_HTTP)
-    elif request.param == "requests":
+    elif request_type == "requests":
         yield google.auth.transport.requests.Request(REQUESTS_SESSION)
 
 
 @pytest.fixture
+def authenticated_request(request_type):
+    """A transport.request object that takes credentials"""
+    if request_type == "urllib3":
+
+        def wrapper(credentials):
+            return google.auth.transport.urllib3.AuthorizedHttp(
+                credentials, http=URLLIB3_HTTP
+            ).request
+
+        yield wrapper
+    elif request_type == "requests":
+
+        def wrapper(credentials):
+            session = google.auth.transport.requests.AuthorizedSession(credentials)
+            session.verify = False
+            return google.auth.transport.requests.Request(session)
+
+        yield wrapper
+
+
[email protected]
 def token_info(http_request):
     """Returns a function that obtains OAuth2 token info."""
 
diff --git a/system_tests/system_tests_sync/test_external_accounts.py b/system_tests/system_tests_sync/test_external_accounts.py
index db6f281..e24c7b4 100644
--- a/system_tests/system_tests_sync/test_external_accounts.py
+++ b/system_tests/system_tests_sync/test_external_accounts.py
@@ -48,6 +48,8 @@
 
 # Populate values from the output of scripts/setup_external_accounts.sh.
 _AUDIENCE_OIDC = "//iam.googleapis.com/projects/79992041559/locations/global/workloadIdentityPools/pool-73wslmxn/providers/oidc-73wslmxn"
+_AUDIENCE_AWS = "//iam.googleapis.com/projects/79992041559/locations/global/workloadIdentityPools/pool-73wslmxn/providers/aws-73wslmxn"
+_ROLE_AWS = "arn:aws:iam::077071391996:role/ci-python-test"
 
 
 def dns_access_direct(request, project_id):
@@ -100,6 +102,27 @@
         yield json.load(f)
 
 
[email protected]
+def aws_oidc_credentials(
+    service_account_file, service_account_info, authenticated_request
+):
+    credentials = service_account.Credentials.from_service_account_file(
+        service_account_file, scopes=["https://www.googleapis.com/auth/cloud-platform"]
+    )
+    result = authenticated_request(credentials)(
+        url="https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:generateIdToken".format(
+            service_account_info["client_email"]
+        ),
+        method="POST",
+        body=json.dumps(
+            {"audience": service_account_info["client_id"], "includeEmail": True}
+        ),
+    )
+    assert result.status == 200
+
+    yield json.loads(result.data)["token"]
+
+
 # Our external accounts tests involve setting up some preconditions, setting a
 # credential file, and then making sure that our client libraries can work with
 # the set credentials.
@@ -115,6 +138,14 @@
             return dns_access()
 
 
+def get_xml_value_by_tagname(data, tagname):
+    startIndex = data.index("<{}>".format(tagname))
+    if startIndex >= 0:
+        endIndex = data.index("</{}>".format(tagname), startIndex)
+        if endIndex > startIndex:
+            return data[startIndex + len(tagname) + 2 : endIndex]
+
+
 # This test makes sure that setting an accesible credential file
 # works to allow access to Google resources.
 def test_file_based_external_account(
@@ -211,3 +242,64 @@
                 },
             },
         )
+
+
+# AWS provider tests for AWS credentials
+# The test suite will also run tests for AWS credentials. This works as
+# follows. (Note prequisite setup is needed. This is documented in
+# setup_external_accounts.sh).
+# - iamcredentials:generateIdToken is used to generate a Google ID token using
+#   the service account access token. The service account client_id is used as
+#   audience.
+# - AWS STS AssumeRoleWithWebIdentity API is used to exchange this token for
+#   temporary AWS security credentials for a specified AWS ARN role.
+# - AWS_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_SESSION_TOKEN
+#   environment variables are set using these credentials before the test is
+#   run simulating an AWS VM.
+# - The test can now be run.
+def test_aws_based_external_account(
+    aws_oidc_credentials, service_account_info, dns_access, http_request
+):
+
+    response = http_request(
+        url=(
+            "https://sts.amazonaws.com/"
+            "?Action=AssumeRoleWithWebIdentity"
+            "&Version=2011-06-15"
+            "&DurationSeconds=3600"
+            "&RoleSessionName=python-test"
+            "&RoleArn={}"
+            "&WebIdentityToken={}"
+        ).format(_ROLE_AWS, aws_oidc_credentials)
+    )
+    assert response.status == 200
+
+    # The returned data is in XML, but loading an XML parser would be overkill.
+    # Searching the return text manually for the start and finish tag.
+    data = response.data.decode("utf-8")
+
+    with patch.dict(
+        os.environ,
+        {
+            "AWS_REGION": "us-east-2",
+            "AWS_ACCESS_KEY_ID": get_xml_value_by_tagname(data, "AccessKeyId"),
+            "AWS_SECRET_ACCESS_KEY": get_xml_value_by_tagname(data, "SecretAccessKey"),
+            "AWS_SESSION_TOKEN": get_xml_value_by_tagname(data, "SessionToken"),
+        },
+    ):
+        assert get_project_dns(
+            dns_access,
+            {
+                "type": "external_account",
+                "audience": _AUDIENCE_AWS,
+                "subject_token_type": "urn:ietf:params:aws:token-type:aws4_request",
+                "token_url": "https://sts.googleapis.com/v1/token",
+                "service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:generateAccessToken".format(
+                    service_account_info["client_email"]
+                ),
+                "credential_source": {
+                    "environment_id": "aws1",
+                    "regional_cred_verification_url": "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
+                },
+            },
+        )