blob: c58a17e202b7ab7902508ee3f13615bd1c9e72fd [file] [log] [blame] [edit]
import time
import urllib
import pytest
import httplib2
import tests
def test_credentials():
c = httplib2.Credentials()
c.add("joe", "password")
assert tuple(c.iter("bitworking.org"))[0] == ("joe", "password")
assert tuple(c.iter(""))[0] == ("joe", "password")
c.add("fred", "password2", "wellformedweb.org")
assert tuple(c.iter("bitworking.org"))[0] == ("joe", "password")
assert len(tuple(c.iter("bitworking.org"))) == 1
assert len(tuple(c.iter("wellformedweb.org"))) == 2
assert ("fred", "password2") in tuple(c.iter("wellformedweb.org"))
c.clear()
assert len(tuple(c.iter("bitworking.org"))) == 0
c.add("fred", "password2", "wellformedweb.org")
assert ("fred", "password2") in tuple(c.iter("wellformedweb.org"))
assert len(tuple(c.iter("bitworking.org"))) == 0
assert len(tuple(c.iter(""))) == 0
def test_basic():
# Test Basic Authentication
http = httplib2.Http()
password = tests.gen_password()
handler = tests.http_reflect_with_auth(allow_scheme="basic", allow_credentials=(("joe", password),))
with tests.server_request(handler, request_count=3) as uri:
response, content = http.request(uri, "GET")
assert response.status == 401
http.add_credentials("joe", password)
response, content = http.request(uri, "GET")
assert response.status == 200
def test_basic_for_domain():
# Test Basic Authentication
http = httplib2.Http()
password = tests.gen_password()
handler = tests.http_reflect_with_auth(allow_scheme="basic", allow_credentials=(("joe", password),))
with tests.server_request(handler, request_count=4) as uri:
response, content = http.request(uri, "GET")
assert response.status == 401
http.add_credentials("joe", password, "example.org")
response, content = http.request(uri, "GET")
assert response.status == 401
domain = urllib.parse.urlparse(uri)[1]
http.add_credentials("joe", password, domain)
response, content = http.request(uri, "GET")
assert response.status == 200
def test_basic_two_credentials():
# Test Basic Authentication with multiple sets of credentials
http = httplib2.Http()
password1 = tests.gen_password()
password2 = tests.gen_password()
allowed = [("joe", password1)] # exploit shared mutable list
handler = tests.http_reflect_with_auth(allow_scheme="basic", allow_credentials=allowed)
with tests.server_request(handler, request_count=7) as uri:
http.add_credentials("fred", password2)
response, content = http.request(uri, "GET")
assert response.status == 401
http.add_credentials("joe", password1)
response, content = http.request(uri, "GET")
assert response.status == 200
allowed[0] = ("fred", password2)
response, content = http.request(uri, "GET")
assert response.status == 200
def test_digest():
# Test that we support Digest Authentication
http = httplib2.Http()
password = tests.gen_password()
handler = tests.http_reflect_with_auth(allow_scheme="digest", allow_credentials=(("joe", password),))
with tests.server_request(handler, request_count=3) as uri:
response, content = http.request(uri, "GET")
assert response.status == 401
http.add_credentials("joe", password)
response, content = http.request(uri, "GET")
assert response.status == 200, content.decode()
def test_digest_next_nonce_nc():
# Test that if the server sets nextnonce that we reset
# the nonce count back to 1
http = httplib2.Http()
password = tests.gen_password()
grenew_nonce = [None]
handler = tests.http_reflect_with_auth(
allow_scheme="digest", allow_credentials=(("joe", password),), out_renew_nonce=grenew_nonce,
)
with tests.server_request(handler, request_count=5) as uri:
http.add_credentials("joe", password)
response1, _ = http.request(uri, "GET")
info = httplib2.auth._parse_authentication_info(response1)
print("debug: response1 authentication-info: {}\nparsed: {}".format(response1.get("authentication-info"), info))
assert response1.status == 200
assert info.get("nc") == "00000001", info
assert not info.get("digest", {}).get("nextnonce"), info
response2, _ = http.request(uri, "GET")
info2 = httplib2.auth._parse_authentication_info(response2)
assert info2.get("nc") == "00000002", info2
grenew_nonce[0]()
response3, content = http.request(uri, "GET")
info3 = httplib2.auth._parse_authentication_info(response3)
assert response3.status == 200
assert info3.get("nc") == "00000001", info3
def test_digest_auth_stale():
# Test that we can handle a nonce becoming stale
http = httplib2.Http()
password = tests.gen_password()
grenew_nonce = [None]
requests = []
handler = tests.http_reflect_with_auth(
allow_scheme="digest",
allow_credentials=(("joe", password),),
out_renew_nonce=grenew_nonce,
out_requests=requests,
)
with tests.server_request(handler, request_count=4) as uri:
http.add_credentials("joe", password)
response, _ = http.request(uri, "GET")
assert response.status == 200
info = httplib2.auth._parse_www_authenticate(requests[0][1].headers, "www-authenticate")
grenew_nonce[0]()
response, _ = http.request(uri, "GET")
assert response.status == 200
assert not response.fromcache
assert getattr(response, "_stale_digest", False)
info2 = httplib2.auth._parse_www_authenticate(requests[2][1].headers, "www-authenticate")
nonce1 = info.get("digest", {}).get("nonce", "")
nonce2 = info2.get("digest", {}).get("nonce", "")
assert nonce1 != ""
assert nonce2 != ""
assert nonce1 != nonce2, (nonce1, nonce2)
@pytest.mark.parametrize(
"data",
(
({}, {}),
({"www-authenticate": ""}, {}),
(
{"www-authenticate": 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'},
{"test": {"realm": "test realm", "foo": "foo", "bar": "bar", "baz": "baz", "qux": "qux"}},
),
(
{"www-authenticate": 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'},
{"t*!%#st": {"realm": "to*!%#en", "to*!%#en": "quoted string"}},
),
({"www-authenticate": 'Test realm="a \\"test\\" realm"'}, {"test": {"realm": 'a "test" realm'}},),
({"www-authenticate": 'Basic realm="me"'}, {"basic": {"realm": "me"}}),
({"www-authenticate": 'Basic realm="me", algorithm="MD5"'}, {"basic": {"realm": "me", "algorithm": "MD5"}},),
({"www-authenticate": 'Basic realm="me", algorithm=MD5'}, {"basic": {"realm": "me", "algorithm": "MD5"}},),
({"www-authenticate": 'Basic realm="me",other="fred" '}, {"basic": {"realm": "me", "other": "fred"}},),
({"www-authenticate": 'Basic REAlm="me" '}, {"basic": {"realm": "me"}}),
(
{"www-authenticate": 'Digest realm="digest1", qop="auth,auth-int", nonce="7102dd2", opaque="e9517f"'},
{"digest": {"realm": "digest1", "qop": "auth,auth-int", "nonce": "7102dd2", "opaque": "e9517f"}},
),
# comma between schemas (glue for multiple headers with same name)
(
{"www-authenticate": 'Digest realm="2-comma-d", qop="auth-int", nonce="c0c8ff1", Basic realm="2-comma-b"'},
{"digest": {"realm": "2-comma-d", "qop": "auth-int", "nonce": "c0c8ff1"}, "basic": {"realm": "2-comma-b"}},
),
# comma between schemas + WSSE (glue for multiple headers with same name)
(
{"www-authenticate": 'Digest realm="com3d", Basic realm="com3b", WSSE realm="com3w", profile="token"'},
{"digest": {"realm": "com3d"}, "basic": {"realm": "com3b"}, "wsse": {"realm": "com3w", "profile": "token"}},
),
# FIXME
# multiple syntax figures
# ({'www-authenticate':
# 'Digest realm="brig", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc"' +
# ', Basic REAlm="zoo", WSSE realm="very", profile="UsernameToken"'},
# {'digest': {'realm': 'brig', 'qop': 'auth,auth-int', 'nonce': '(*)&^&$%#', 'opaque': '5ccc'},
# 'basic': {'realm': 'zoo'},
# 'wsse': {'realm': 'very', 'profile': 'UsernameToken'}}),
# more quote combos
(
{"www-authenticate": 'Digest realm="myrealm", nonce="KBAA=3", algorithm=MD5, qop="auth", stale=true'},
{"digest": {"realm": "myrealm", "nonce": "KBAA=3", "algorithm": "MD5", "qop": "auth", "stale": "true"}},
),
({"www-authenticate": "Basic param='single quote'"}, {"basic": {"param": "'single"}}),
),
ids=lambda data: str(data[0]),
)
@pytest.mark.parametrize("strict", (True, False), ids=("strict", "relax"))
def test_parse_www_authenticate_correct(data, strict):
headers, info = data
# FIXME: move strict to parse argument
httplib2.USE_WWW_AUTH_STRICT_PARSING = strict
try:
assert httplib2.auth._parse_www_authenticate(headers) == info
finally:
httplib2.USE_WWW_AUTH_STRICT_PARSING = 0
@pytest.mark.parametrize(
"data",
(({"www-authenticate": 'OAuth "Facebook Platform" "invalid_token" "Invalid OAuth access token."'}, None),),
ids=lambda data: str(data[0]),
)
def test_parse_www_authenticate_malformed(data):
# TODO: test (and fix) header value 'barbqwnbm-bb...:asd' leads to dead loop
headers, info = data
try:
result = httplib2.auth._parse_www_authenticate(headers)
except httplib2.error.MalformedHeader:
assert info is None, "unexpected MalformedHeader"
else:
assert result == info
assert info is not None, "expected parsing error"
def test_parse_www_authenticate_complexity():
# TODO just use time.process_time() after python2 support is removed
process_time = getattr(time, "process_time", time.time)
def check(size):
header = {"www-authenticate": 'scheme {0}key=value,{0}quoted="foo=bar"'.format(" \t" * size)}
tbegin = process_time()
result = httplib2.auth._parse_www_authenticate(header)
tend = process_time()
assert result == {"scheme": {"key": "value", "quoted": "foo=bar"}}
elapsed_us = round((tend * 1e6) - (tbegin * 1e6), 0)
return elapsed_us
n1, n2, repeat = 50, 100, 7
time1 = min(check(n1) for _ in range(repeat))
time2 = min(check(n2) for _ in range(repeat))
speed1 = round(time1 / n1, 1)
speed2 = round(time2 / n2, 1)
expect2 = round(speed1 * (float(n2) / n1), 1)
error = round(speed2 / expect2, 1)
print("x{}: time={}us speed={} us/op".format(n1, time1, speed1))
print("x{}: time={}us speed={} us/op expected={} us/op error={}".format(n2, time2, speed2, expect2, error))
assert error < 2, "_parse_www_authenticate scales too fast"
@tests.skip_benchmark
@pytest.mark.parametrize(
"data",
(
'Basic realm="me", algorithm="MD5"',
'Digest realm="digest1", qop="auth,auth-int", nonce="7102dd2", opaque="e9517f"',
'Digest realm="2-comma-d", qop="auth-int", nonce="c0c8ff1", Basic realm="2-comma-b"',
"Bearer 0b79bab50daca910b000d4f1a2b675d604257e42",
),
)
def test_benchmark_parse_www_authenticate(data):
# TODO just use time.process_time() after python2 support is removed
process_time = getattr(time, "process_time", time.time)
def measure(header, loop):
tbegin = process_time()
for _ in range(loop):
httplib2.auth._parse_www_authenticate(header)
tend = process_time()
elapsed_us = round((tend * 1e6) - (tbegin * 1e6), 0)
return elapsed_us
header = {"www-authenticate": data}
min_time = 1e6
repeat = 7
# timeit.autorange
loop = 1
while True:
elapsed_us = measure(header, loop)
if elapsed_us >= min_time:
break
elif elapsed_us < min_time / 10:
k = 10.0
else:
k = 1.2
loop = int(loop * k) + 1
fastest = min(measure(header, loop) for _ in range(repeat))
speed = int(fastest / loop)
print("x{} time={}us speed={} us/op".format(loop, fastest, speed))
def test_digest_object():
credentials = ("joe", "password")
host = None
request_uri = "/test/digest/"
headers = {}
response = {"www-authenticate": 'Digest realm="myrealm", nonce="KBAA=35", algorithm=MD5, qop="auth"'}
content = b""
d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
our_request = "authorization: " + headers["authorization"]
working_request = (
'authorization: Digest username="joe", realm="myrealm", '
'nonce="KBAA=35", uri="/test/digest/"' + ', algorithm=MD5, response="de6d4a123b80801d0e94550411b6283f", '
'qop=auth, nc=00000001, cnonce="33033375ec278a46"'
)
assert our_request == working_request
def test_digest_object_with_opaque():
credentials = ("joe", "password")
host = None
request_uri = "/digest/opaque/"
headers = {}
response = {
"www-authenticate": 'Digest realm="myrealm", nonce="30352fd", algorithm=MD5, ' 'qop="auth", opaque="atestopaque"'
}
content = ""
d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
d.request("GET", request_uri, headers, content, cnonce="5ec2")
our_request = "authorization: " + headers["authorization"]
working_request = (
'authorization: Digest username="joe", realm="myrealm", '
'nonce="30352fd", uri="/digest/opaque/", algorithm=MD5'
+ ', response="a1fab43041f8f3789a447f48018bee48", qop=auth, nc=00000001, '
'cnonce="5ec2", opaque="atestopaque"'
)
assert our_request == working_request
def test_digest_object_stale():
credentials = ("joe", "password")
host = None
request_uri = "/digest/stale/"
headers = {}
response = httplib2.Response({})
response["www-authenticate"] = 'Digest realm="myrealm", nonce="bd669f", ' 'algorithm=MD5, qop="auth", stale=true'
response.status = 401
content = b""
d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
# Returns true to force a retry
assert d.response(response, content)
def test_digest_object_auth_info():
credentials = ("joe", "password")
host = None
request_uri = "/digest/nextnonce/"
headers = {}
response = httplib2.Response({})
response["www-authenticate"] = 'Digest realm="myrealm", nonce="barney", ' 'algorithm=MD5, qop="auth", stale=true'
response["authentication-info"] = 'nextnonce="fred"'
content = b""
d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
# Returns true to force a retry
assert not d.response(response, content)
assert d.challenge["nonce"] == "fred"
assert d.challenge["nc"] == 1
def test_wsse_algorithm():
digest = httplib2._wsse_username_token("d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm")
expected = "quR/EWLAV4xLf9Zqyw4pDmfV9OY="
assert expected == digest
def test_wsse_invalid():
http = httplib2.Http()
username = "user294"
password = tests.gen_password()
grenew_nonce = [None]
requests = []
handler = tests.http_reflect_with_auth(
allow_scheme="wsse",
allow_credentials=((username, password),),
out_renew_nonce=grenew_nonce,
out_requests=requests,
)
http.add_credentials(username, "wrong" + password)
with tests.server_request(handler, request_count=2) as uri:
response, _ = http.request(uri)
assert requests[0][1].status == 401
assert requests[1][0].headers["authorization"] == 'WSSE profile="UsernameToken"'
assert requests[1][1].status == 401
assert response.status == 401
def test_wsse_ok():
http = httplib2.Http()
username = "user314"
password = tests.gen_password()
grenew_nonce = [None]
requests = []
handler = tests.http_reflect_with_auth(
allow_scheme="wsse",
allow_credentials=((username, password),),
out_renew_nonce=grenew_nonce,
out_requests=requests,
)
http.add_credentials(username, password)
with tests.server_request(handler, request_count=2) as uri:
response, _ = http.request(uri)
assert requests[0][1].status == 401
assert requests[1][0].headers["authorization"] == 'WSSE profile="UsernameToken"'
assert response.status == 200