blob: 39d7d59a497c748e64f2cd9f5c54f840ba54f00f [file] [log] [blame]
Sergey Shepelev3a6d7cd2019-09-26 22:11:25 +03001import httplib2
2import pytest
3from six.moves import urllib
4import socket
5import ssl
6import tests
7
8
def test_get_via_https():
    # Smoke test: a GET issued to the URI yielded by
    # tests.server_const_http(tls=True) must come back with status 200
    # when the client is configured with the tests.CA_CERTS bundle.
    client = httplib2.Http(ca_certs=tests.CA_CERTS)
    with tests.server_const_http(tls=True) as uri:
        reply, _body = client.request(uri, "GET")
        assert reply.status == 200
15
16
def test_get_301_via_https():
    # A 301 whose Location header is an absolute URI must be followed over
    # HTTPS, and the intermediate redirect response must stay reachable
    # through response.previous.
    client = httplib2.Http(ca_certs=tests.CA_CERTS)
    # One-element list used as a writable cell: the handler closes over it,
    # and the real target is only known once the server URI is available.
    redirect_target = [""]

    def handler(request):
        if request.uri != "/final":
            return tests.http_response_bytes(
                status="301 goto", headers={"location": redirect_target[0]}
            )
        return tests.http_response_bytes(body=b"final")

    with tests.server_request(handler, request_count=2, tls=True) as uri:
        redirect_target[0] = urllib.parse.urljoin(uri, "/final")
        reply, body = client.request(uri, "GET")
        assert reply.status == 200
        assert body == b"final"
        first_hop = reply.previous
        assert first_hop.status == 301
        assert first_hop["location"] == redirect_target[0]
33
34
def test_get_301_via_https_spec_violation_on_location():
    # Redirects over HTTPS must still be followed when the server violates
    # the spec by sending a relative Location header ("/final") instead of
    # an absolute URI.
    def handler(request):
        if request.uri != "/final":
            return tests.http_response_bytes(
                status="301 goto", headers={"location": "/final"}
            )
        return tests.http_response_bytes(body=b"final")

    client = httplib2.Http(ca_certs=tests.CA_CERTS)
    with tests.server_request(handler, request_count=2, tls=True) as uri:
        reply, body = client.request(uri, "GET")
        assert reply.status == 200
        assert body == b"final"
        assert reply.previous.status == 301
51
52
def test_invalid_ca_certs_path():
    # Pointing ca_certs at "/nosuchfile" (presumably a path that does not
    # exist) must make the request fail with IOError. The helper server is
    # started with request_count=0.
    client = httplib2.Http(ca_certs="/nosuchfile")
    with tests.server_const_http(request_count=0, tls=True) as uri, tests.assert_raises(IOError):
        client.request(uri, "GET")
58
59
def test_not_trusted_ca():
    # Connecting with tests.CA_UNUSED_CERTS (a bundle that is not expected to
    # contain the test server's CA) must fail certificate verification.
    client = httplib2.Http(ca_certs=tests.CA_UNUSED_CERTS)
    with tests.server_const_http(tls=True) as uri:
        try:
            client.request(uri, "GET")
        except ssl.SSLError as err:
            assert err.reason == "CERTIFICATE_VERIFY_FAILED"
        except httplib2.SSLHandshakeError:
            # Python2 reports the failure with httplib2's own exception type.
            pass
        else:
            # The request went through, so verification did not reject it.
            assert False, "expected CERTIFICATE_VERIFY_FAILED"
72
73
@pytest.mark.skipif(
    not hasattr(tests.ssl_context(), "minimum_version"),
    reason="ssl doesn't support TLS min/max",
)
def test_set_min_tls_version():
    # Test setting minimum TLS version.
    # RuntimeError is only acceptable when the ssl module cannot set a
    # minimum version (Python < 3.7 or OpenSSL < 1.1); a socket error is only
    # acceptable when it can.
    #
    # Probe with tests.ssl_context(), the same factory the skip guard above
    # uses, instead of a bare ssl.SSLContext(): calling the constructor
    # without a protocol is deprecated on recent Python versions and rejected
    # on old ones, and sharing the factory keeps guard and body in agreement.
    expect_success = hasattr(tests.ssl_context(), "minimum_version")
    try:
        http = httplib2.Http(tls_minimum_version="TLSv1_2")
        # NOTE(review): tests.DUMMY_HTTPS_URL presumably has no server behind
        # it, hence the socket error on the supported path -- confirm.
        http.request(tests.DUMMY_HTTPS_URL)
    except RuntimeError:
        assert not expect_success
    except socket.error:
        assert expect_success
89
90
@pytest.mark.skipif(
    not hasattr(tests.ssl_context(), "maximum_version"),
    reason="ssl doesn't support TLS min/max",
)
def test_set_max_tls_version():
    # Test setting maximum TLS version.
    # We expect RuntimeError on Python < 3.7 or OpenSSL < 1.1.
    # We expect socket error otherwise.
    #
    # Probe with tests.ssl_context(), the same factory the skip guard above
    # uses, instead of a bare ssl.SSLContext(): calling the constructor
    # without a protocol is deprecated on recent Python versions and rejected
    # on old ones, and sharing the factory keeps guard and body in agreement.
    expect_success = hasattr(tests.ssl_context(), "maximum_version")
    try:
        http = httplib2.Http(tls_maximum_version="TLSv1_2")
        # NOTE(review): tests.DUMMY_HTTPS_URL presumably has no server behind
        # it, hence the socket error on the supported path -- confirm.
        http.request(tests.DUMMY_HTTPS_URL)
    except RuntimeError:
        assert not expect_success
    except socket.error:
        assert expect_success
107
108
@pytest.mark.skipif(
    not hasattr(tests.ssl_context(), "minimum_version"),
    reason="ssl doesn't support TLS min/max",
)
def test_min_tls_version():
    # A client demanding at least TLSv1_2 must refuse a server whose context
    # is built with ssl.PROTOCOL_TLSv1_1.
    def setup_tls(context, server, skip_errors):
        # The context passed in is deliberately unused: a fresh TLS 1.1
        # context replaces it for this server socket.
        skip_errors.append("WRONG_VERSION_NUMBER")
        tls11_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
        tls11_context.load_cert_chain(tests.SERVER_CHAIN)
        return tls11_context.wrap_socket(server, server_side=True)

    client = httplib2.Http(ca_certs=tests.CA_CERTS, tls_minimum_version="TLSv1_2")
    with tests.server_const_http(tls=setup_tls) as uri:
        try:
            client.request(uri)
        except ssl.SSLError as err:
            # Either reason string is accepted by this test.
            assert err.reason in ("UNSUPPORTED_PROTOCOL", "VERSION_TOO_LOW")
        else:
            assert False, "expected SSLError"
127
128
@pytest.mark.skipif(
    not hasattr(tests.ssl_context(), "maximum_version"),
    reason="ssl doesn't support TLS min/max",
)
def test_max_tls_version():
    # With tls_maximum_version="TLSv1" the protocol reported by the
    # connection's socket cipher() must be "TLSv1.0".
    client = httplib2.Http(ca_certs=tests.CA_CERTS, tls_maximum_version="TLSv1")
    with tests.server_const_http(tls=True) as uri:
        client.request(uri)
        # Take a connection out of the client's connection map and inspect
        # its socket.
        _conn_key, connection = client.connections.popitem()
        _cipher_name, negotiated_version, _secret_bits = connection.sock.cipher()
        assert negotiated_version == "TLSv1.0"
139
140
def test_client_cert_verified():
    # The server requires a client certificate (CERT_REQUIRED); the handler
    # records what getpeercert() reports so the serial number of the
    # certificate presented by the client can be checked afterwards.
    seen_certs = []

    def setup_tls(context, server, skip_errors):
        context.load_verify_locations(cafile=tests.CA_CERTS)
        context.verify_mode = ssl.CERT_REQUIRED
        return context.wrap_socket(server, server_side=True)

    def handler(request):
        peer_cert = request.client_sock.getpeercert()
        seen_certs.append(peer_cert)
        return tests.http_response_bytes()

    client = httplib2.Http(ca_certs=tests.CA_CERTS)
    with tests.server_request(handler, tls=setup_tls) as uri:
        netloc = urllib.parse.urlparse(uri).netloc
        # The same PEM file is passed for both the key and the certificate.
        client.add_certificate(tests.CLIENT_PEM, tests.CLIENT_PEM, netloc)
        client.request(uri)

    assert len(seen_certs) == 1
    # TODO extract serial from tests.CLIENT_PEM
    assert seen_certs[0]["serialNumber"] == "E2AA6A96D1BF1AEC"
162
163
def test_client_cert_password_verified():
    # Same flow as the plain client-certificate test, but the client
    # registers tests.CLIENT_ENCRYPTED_PEM together with a password.
    seen_certs = []

    def setup_tls(context, server, skip_errors):
        context.load_verify_locations(cafile=tests.CA_CERTS)
        context.verify_mode = ssl.CERT_REQUIRED
        return context.wrap_socket(server, server_side=True)

    def handler(request):
        peer_cert = request.client_sock.getpeercert()
        seen_certs.append(peer_cert)
        return tests.http_response_bytes()

    client = httplib2.Http(ca_certs=tests.CA_CERTS)
    with tests.server_request(handler, tls=setup_tls) as uri:
        netloc = urllib.parse.urlparse(uri).netloc
        # The same PEM file is passed for both the key and the certificate.
        client.add_certificate(
            tests.CLIENT_ENCRYPTED_PEM,
            tests.CLIENT_ENCRYPTED_PEM,
            netloc,
            password="12345"
        )
        client.request(uri)

    assert len(seen_certs) == 1
    # TODO extract serial from tests.CLIENT_ENCRYPTED_PEM
    assert seen_certs[0]["serialNumber"] == "E2AA6A96D1BF1AED"
186
187
@pytest.mark.skipif(
    not hasattr(tests.ssl_context(), "set_servername_callback"),
    reason="SSLContext.set_servername_callback is not available",
)
def test_sni_set_servername_callback():
    # The server-side servername callback records every hostname it is
    # handed; after one request the log must hold exactly the hostname
    # parsed from the request URI.
    seen_hostnames = []

    def record_servername(_sock, hostname, _context):
        # Implicitly returns None, just as the list.append() result would.
        seen_hostnames.append(hostname)

    def setup_tls(context, server, skip_errors):
        context.set_servername_callback(record_servername)
        return context.wrap_socket(server, server_side=True)

    client = httplib2.Http(ca_certs=tests.CA_CERTS)
    with tests.server_const_http(tls=setup_tls) as uri:
        expected_hostname = urllib.parse.urlparse(uri).hostname
        client.request(uri)
        assert seen_hostnames == [expected_hostname]