Add a permission test for SOCK_DESTROY.
Test: This *is* a test.
Change-Id: I0df394d059c5327eadc1f8f1f189174d5ca906b4
diff --git a/net/test/sock_diag_test.py b/net/test/sock_diag_test.py
index 28a1a20..fb1e344 100755
--- a/net/test/sock_diag_test.py
+++ b/net/test/sock_diag_test.py
@@ -26,6 +26,7 @@
import multinetwork_base
import net_test
+import netlink
import packets
import sock_diag
import tcp_test
@@ -727,6 +728,32 @@
self.CloseDuringBlockingCall(s, lambda sock: sock.recv(4096),
ECONNABORTED)
+class SockDestroyPermissionTest(SockDiagBaseTest):
+
+ def CheckPermissions(self, socktype):
+ s = socket(AF_INET6, socktype, 0)
+ self.SelectInterface(s, random.choice(self.NETIDS), "mark")
+ if socktype == SOCK_STREAM:
+ s.listen(1)
+ expectedstate = tcp_test.TCP_LISTEN
+ else:
+ s.connect((self.GetRemoteAddress(6), 53))
+ expectedstate = tcp_test.TCP_ESTABLISHED
+
+ with net_test.RunAsUid(12345):
+ self.assertRaisesErrno(
+ EPERM, self.sock_diag.CloseSocketFromFd, s)
+
+ self.sock_diag.CloseSocketFromFd(s)
+ self.assertRaises(ValueError, self.sock_diag.CloseSocketFromFd, s)
+
+
+ def testUdp(self):
+ self.CheckPermissions(SOCK_DGRAM)
+
+ def testTcp(self):
+ self.CheckPermissions(SOCK_STREAM)
+
@unittest.skipUnless(net_test.LINUX_VERSION >= (4, 9, 0), "does not yet exist")
class SockDiagMarkTest(tcp_test.TcpBaseTest, SockDiagBaseTest):