Fix ResourceWarnings when running the xfrm tests.
Test: passes on Pixel kernel
Change-Id: Id480917605ac8d64807319e3dff8f27cc680d3c4
diff --git a/net/test/multinetwork_base.py b/net/test/multinetwork_base.py
index 97a6f2b..940be49 100644
--- a/net/test/multinetwork_base.py
+++ b/net/test/multinetwork_base.py
@@ -443,6 +443,7 @@
cls._RunSetupCommands(netid, False)
cls.tuns[netid].close()
+ cls.iproute.close()
cls._RestoreSysctls()
cls.SetConsoleLogLevel(cls.loglevel)
diff --git a/net/test/net_test.py b/net/test/net_test.py
index 72b3fb1..0dae1c2 100644
--- a/net/test/net_test.py
+++ b/net/test/net_test.py
@@ -213,35 +213,35 @@
def GetInterfaceIndex(ifname):
- s = UDPSocket(AF_INET)
- ifr = struct.pack("%dsi" % IFNAMSIZ, ifname.encode(), 0)
- ifr = fcntl.ioctl(s, scapy.SIOCGIFINDEX, ifr)
- return struct.unpack("%dsi" % IFNAMSIZ, ifr)[1]
+ with UDPSocket(AF_INET) as s:
+ ifr = struct.pack("%dsi" % IFNAMSIZ, ifname.encode(), 0)
+ ifr = fcntl.ioctl(s, scapy.SIOCGIFINDEX, ifr)
+ return struct.unpack("%dsi" % IFNAMSIZ, ifr)[1]
def SetInterfaceHWAddr(ifname, hwaddr):
- s = UDPSocket(AF_INET)
- hwaddr = hwaddr.replace(":", "")
- hwaddr = binascii.unhexlify(hwaddr)
- if len(hwaddr) != 6:
- raise ValueError("Unknown hardware address length %d" % len(hwaddr))
- ifr = struct.pack("%dsH6s" % IFNAMSIZ, ifname.encode(), scapy.ARPHDR_ETHER,
- hwaddr)
- fcntl.ioctl(s, SIOCSIFHWADDR, ifr)
+ with UDPSocket(AF_INET) as s:
+ hwaddr = hwaddr.replace(":", "")
+ hwaddr = binascii.unhexlify(hwaddr)
+ if len(hwaddr) != 6:
+ raise ValueError("Unknown hardware address length %d" % len(hwaddr))
+ ifr = struct.pack("%dsH6s" % IFNAMSIZ, ifname.encode(), scapy.ARPHDR_ETHER,
+ hwaddr)
+ fcntl.ioctl(s, SIOCSIFHWADDR, ifr)
def SetInterfaceState(ifname, up):
ifname_bytes = ifname.encode()
- s = UDPSocket(AF_INET)
- ifr = struct.pack("%dsH" % IFNAMSIZ, ifname_bytes, 0)
- ifr = fcntl.ioctl(s, scapy.SIOCGIFFLAGS, ifr)
- _, flags = struct.unpack("%dsH" % IFNAMSIZ, ifr)
- if up:
- flags |= scapy.IFF_UP
- else:
- flags &= ~scapy.IFF_UP
- ifr = struct.pack("%dsH" % IFNAMSIZ, ifname_bytes, flags)
- ifr = fcntl.ioctl(s, scapy.SIOCSIFFLAGS, ifr)
+ with UDPSocket(AF_INET) as s:
+ ifr = struct.pack("%dsH" % IFNAMSIZ, ifname_bytes, 0)
+ ifr = fcntl.ioctl(s, scapy.SIOCGIFFLAGS, ifr)
+ _, flags = struct.unpack("%dsH" % IFNAMSIZ, ifr)
+ if up:
+ flags |= scapy.IFF_UP
+ else:
+ flags &= ~scapy.IFF_UP
+ ifr = struct.pack("%dsH" % IFNAMSIZ, ifname_bytes, flags)
+ ifr = fcntl.ioctl(s, scapy.SIOCSIFFLAGS, ifr)
def SetInterfaceUp(ifname):
diff --git a/net/test/netlink.py b/net/test/netlink.py
index 2360400..190667c 100644
--- a/net/test/netlink.py
+++ b/net/test/netlink.py
@@ -181,6 +181,9 @@
self.sock = self._OpenNetlinkSocket(family, groups)
self.pid = self.sock.getsockname()[1]
+ def close(self):
+ self.sock.close()
+
def MaybeDebugCommand(self, command, flags, data):
# Default no-op implementation to be overridden by subclasses.
pass
diff --git a/net/test/xfrm_base.py b/net/test/xfrm_base.py
index 4ce194b..06e8cf2 100644
--- a/net/test/xfrm_base.py
+++ b/net/test/xfrm_base.py
@@ -324,3 +324,4 @@
super(XfrmBaseTest, self).tearDown()
self.xfrm.FlushSaInfo()
self.xfrm.FlushPolicyInfo()
+ self.xfrm.close()
diff --git a/net/test/xfrm_test.py b/net/test/xfrm_test.py
index 803bb18..6296b40 100755
--- a/net/test/xfrm_test.py
+++ b/net/test/xfrm_test.py
@@ -194,11 +194,15 @@
xfrm_base.SetPolicySockopt(s, family, None)
s.sendto(net_test.UDP_PAYLOAD, (remotesockaddr, 53))
self.ExpectPacketOn(netid, "Send after clear 2, expected %s" % desc, pkt)
+ s.close()
# Clearing if a policy was never set is safe.
s = socket(AF_INET6, SOCK_DGRAM, 0)
xfrm_base.SetPolicySockopt(s, family, None)
+ s.close()
+ s2.close()
+
def testSocketPolicyIPv4(self):
self._TestSocketPolicy(4)
@@ -329,6 +333,8 @@
sport=srcport, dport=53) / net_test.UDP_PAYLOAD)
self.assertRaisesErrno(EAGAIN, twisted_socket.recv, 4096)
+ twisted_socket.close()
+
def _RunEncapSocketPolicyTest(self, in_spi, out_spi, use_null_auth):
netid, myaddr, remoteaddr, encap_sock, encap_port, s = \
self._SetupUdpEncapSockets()
@@ -339,6 +345,8 @@
# Check that UDP encap sockets work with socket policy and given SAs
self._VerifyUdpEncapSocket(netid, remoteaddr, myaddr, encap_port, s, in_spi,
out_spi, use_null_auth, 1)
+ encap_sock.close()
+ s.close()
# TODO: Add tests for ESP (non-encap) sockets.
def testUdpEncapSameSpisNullAuth(self):
@@ -397,6 +405,8 @@
# Check that UDP encap socket works with updated socket policy and new SAs
self._VerifyUdpEncapSocket(netid, remoteaddr, myaddr, encap_port, s,
rekey_spi, rekey_spi, True, 3)
+ encap_sock.close()
+ s.close()
def testAllocSpecificSpi(self):
spi = 0xABCD
@@ -467,6 +477,7 @@
with self.assertRaisesErrno(EAGAIN):
s.send(net_test.UDP_PAYLOAD)
self.ExpectNoPacketsOn(netid, "Packet not blocked by policy")
+ s.close()
def _CheckNullEncryptionTunnelMode(self, version):
family = net_test.GetAddressFamily(version)
@@ -530,6 +541,7 @@
# length of the payload plus the UDP header
self.assertEqual(b"output hello", bytes(output_pkt[scapy.UDP].payload))
self.assertEqual(0xABCD, esp_hdr.spi)
+ sock.close()
def testNullEncryptionTunnelMode(self):
"""Verify null encryption in tunnel mode.
@@ -593,6 +605,7 @@
self.assertEqual(remote_port, output_pkt[scapy.UDP].dport)
self.assertEqual(b"output hello", bytes(output_pkt[scapy.UDP].payload))
self.assertEqual(0xABCD, esp_hdr.spi)
+ sock.close()
def testNullEncryptionTransportMode(self):
"""Verify null encryption in transport mode.
@@ -732,6 +745,8 @@
with self.assertRaisesErrno(ENETUNREACH):
s.sendto(net_test.UDP_PAYLOAD, (remoteaddr, 53))
+ s.close()
+
def testTunnelModeOutputMarkIPv4(self):
for netid in self.NETIDS:
tunsrc = self.MyAddress(4, netid)
@@ -901,5 +916,7 @@
self.xfrm.DeleteSaInfo(remote, TEST_SPI, IPPROTO_ESP, mark)
self.xfrm.DeletePolicyInfo(sel, xfrm.XFRM_POLICY_OUT, mark)
+ s.close()
+
if __name__ == "__main__":
unittest.main()