#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
17"""Unittests for the compiler module."""
18
19from __future__ import print_function
20
21import os
Luis Hector Chavezb21da7a2018-11-01 16:50:36 -070022import random
23import shutil
Luis Hector Chavez05392b82018-10-28 21:40:10 -070024import tempfile
25import unittest
26
27import arch
28import bpf
29import compiler
30import parser # pylint: disable=wrong-import-order
31
# 64-bit architecture description used by the test cases in this module. It
# is loaded once, at import time, from the JSON fixture that lives in the
# testdata directory next to this file.
_ARCH_64_JSON_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), 'testdata/arch_64.json')
ARCH_64 = arch.Arch.load_from_json(_ARCH_64_JSON_PATH)
35
36
class CompileFilterStatementTests(unittest.TestCase):
    """Tests for PolicyCompiler.compile_filter_statement.

    Every test compiles a single policy line for the 'read' syscall and then
    either inspects the generated instructions or simulates the resulting
    block against concrete argument values.
    """

    def setUp(self):
        self.arch = ARCH_64
        self.compiler = compiler.PolicyCompiler(self.arch)

    def _compile(self, line):
        """Parse a one-statement policy and compile that statement.

        Args:
            line: Policy text that must contain exactly one filter statement.

        Returns:
            The result of PolicyCompiler.compile_filter_statement for the
            single parsed statement.
        """
        with tempfile.NamedTemporaryFile(mode='w') as policy_file:
            policy_file.write(line)
            # parse_file() is handed the file *name*, so the buffered text
            # has to reach the file before parsing starts.
            policy_file.flush()
            policy_parser = parser.PolicyParser(
                self.arch, kill_action=bpf.KillProcess())
            parsed_policy = policy_parser.parse_file(policy_file.name)
            # A unittest assertion (unlike a bare `assert`) still runs when
            # Python is started with -O.
            self.assertEqual(len(parsed_policy.filter_statements), 1)
            return self.compiler.compile_filter_statement(
                parsed_policy.filter_statements[0],
                kill_action=bpf.KillProcess())

    def _simulate_read(self, block, *args):
        """Simulate |block| for the 'read' syscall with the given arguments.

        Args:
            block: A compiled block, as returned by _compile().
            *args: Syscall argument values forwarded to block.simulate().

        Returns:
            The unmodified return value of block.simulate().
        """
        return block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
                              *args)

    def test_allow(self):
        """Accept lines where the syscall is accepted unconditionally."""
        block = self._compile('read: allow')
        self.assertIsNone(block.filter)
        for arg0 in (0, 1):
            with self.subTest(arg0=arg0):
                self.assertEqual(self._simulate_read(block, arg0)[1], 'ALLOW')

    def test_arg0_eq_generated_code(self):
        """Accept lines with an argument filter with ==."""
        block = self._compile('read: arg0 == 0x100')
        # It might be a bit brittle to check the generated code in each test
        # case instead of just the behavior, but there should be at least one
        # test where this happens.
        self.assertEqual(
            block.filter.instructions,
            [
                bpf.SockFilter(bpf.BPF_LD | bpf.BPF_W | bpf.BPF_ABS, 0, 0,
                               bpf.arg_offset(0, True)),
                # Jump to KILL_PROCESS if the high word does not match.
                bpf.SockFilter(bpf.BPF_JMP | bpf.BPF_JEQ | bpf.BPF_K, 0, 2, 0),
                bpf.SockFilter(bpf.BPF_LD | bpf.BPF_W | bpf.BPF_ABS, 0, 0,
                               bpf.arg_offset(0, False)),
                # Jump to KILL_PROCESS if the low word does not match.
                bpf.SockFilter(bpf.BPF_JMP | bpf.BPF_JEQ | bpf.BPF_K, 1, 0,
                               0x100),
                bpf.SockFilter(bpf.BPF_RET, 0, 0,
                               bpf.SECCOMP_RET_KILL_PROCESS),
                bpf.SockFilter(bpf.BPF_RET, 0, 0, bpf.SECCOMP_RET_ALLOW),
            ])

    def test_arg0_comparison_operators(self):
        """Accept lines with an argument filter with comparison operators."""
        biases = (-1, 0, 1)
        # For each operator, store the expectations of simulating the program
        # against the constant plus each entry from the |biases| array.
        cases = (
            ('==', ('KILL_PROCESS', 'ALLOW', 'KILL_PROCESS')),
            ('!=', ('ALLOW', 'KILL_PROCESS', 'ALLOW')),
            ('<', ('ALLOW', 'KILL_PROCESS', 'KILL_PROCESS')),
            ('<=', ('ALLOW', 'ALLOW', 'KILL_PROCESS')),
            ('>', ('KILL_PROCESS', 'KILL_PROCESS', 'ALLOW')),
            ('>=', ('KILL_PROCESS', 'ALLOW', 'ALLOW')),
        )
        for operator, expectations in cases:
            block = self._compile('read: arg0 %s 0x100' % operator)

            # Check the filter's behavior.
            for bias, expectation in zip(biases, expectations):
                with self.subTest(operator=operator, bias=bias):
                    self.assertEqual(
                        self._simulate_read(block, 0x100 + bias)[1],
                        expectation)

    def test_arg0_mask_operator(self):
        """Accept lines with an argument filter with &."""
        block = self._compile('read: arg0 & 0x3')

        # (arg0, expected action): values with neither of the two low bits
        # set are rejected, everything else is allowed.
        cases = (
            (0, 'KILL_PROCESS'),
            (1, 'ALLOW'),
            (2, 'ALLOW'),
            (3, 'ALLOW'),
            (4, 'KILL_PROCESS'),
            (5, 'ALLOW'),
            (6, 'ALLOW'),
            (7, 'ALLOW'),
            (8, 'KILL_PROCESS'),
        )
        for arg0, expectation in cases:
            with self.subTest(arg0=arg0):
                self.assertEqual(
                    self._simulate_read(block, arg0)[1], expectation)

    def test_arg0_in_operator(self):
        """Accept lines with an argument filter with in."""
        block = self._compile('read: arg0 in 0x3')

        # The 'in' operator only ensures that no bits outside the mask are set,
        # which means that 0 is always allowed.
        cases = (
            (0, 'ALLOW'),
            (1, 'ALLOW'),
            (2, 'ALLOW'),
            (3, 'ALLOW'),
            (4, 'KILL_PROCESS'),
            (5, 'KILL_PROCESS'),
            (6, 'KILL_PROCESS'),
            (7, 'KILL_PROCESS'),
            (8, 'KILL_PROCESS'),
        )
        for arg0, expectation in cases:
            with self.subTest(arg0=arg0):
                self.assertEqual(
                    self._simulate_read(block, arg0)[1], expectation)

    def test_arg0_short_gt_ge_comparisons(self):
        """Ensure that the short comparison optimization kicks in."""
        if self.arch.bits == 32:
            # Report the test as skipped instead of silently passing.
            self.skipTest('test needs 64-bit syscall arguments')
        short_constant_str = '0xdeadbeef'
        short_constant = int(short_constant_str, base=0)
        long_constant_str = '0xbadc0ffee0ddf00d'
        long_constant = int(long_constant_str, base=0)
        biases = (-1, 0, 1)
        # For each operator, store the expectations of simulating the program
        # against the constant plus each entry from the |biases| array.
        cases = (
            ('<', ('ALLOW', 'KILL_PROCESS', 'KILL_PROCESS')),
            ('<=', ('ALLOW', 'ALLOW', 'KILL_PROCESS')),
            ('>', ('KILL_PROCESS', 'KILL_PROCESS', 'ALLOW')),
            ('>=', ('KILL_PROCESS', 'ALLOW', 'ALLOW')),
        )
        for operator, expectations in cases:
            short_block = self._compile(
                'read: arg0 %s %s' % (operator, short_constant_str))
            long_block = self._compile(
                'read: arg0 %s %s' % (operator, long_constant_str))

            # Check that the emitted code is shorter when the high word of the
            # constant is zero.
            self.assertLess(
                len(short_block.filter.instructions),
                len(long_block.filter.instructions))

            # Check the filter's behavior.
            for bias, expectation in zip(biases, expectations):
                with self.subTest(operator=operator, bias=bias):
                    self.assertEqual(
                        self._simulate_read(long_block,
                                            long_constant + bias)[1],
                        expectation)
                    self.assertEqual(
                        self._simulate_read(short_block,
                                            short_constant + bias)[1],
                        expectation)

    def test_and_or(self):
        """Accept lines with a complex expression in DNF."""
        block = self._compile('read: arg0 == 0 && arg1 == 0 || arg0 == 1')

        # ((arg0, arg1), expected action)
        cases = (
            ((0, 0), 'ALLOW'),
            ((0, 1), 'KILL_PROCESS'),
            ((1, 0), 'ALLOW'),
            ((1, 1), 'ALLOW'),
        )
        for args, expectation in cases:
            with self.subTest(args=args):
                self.assertEqual(
                    self._simulate_read(block, *args)[1], expectation)

    def test_ret_errno(self):
        """Accept lines that return errno."""
        block = self._compile('read : arg0 == 0 || arg0 == 1 ; return 1')

        # Both matching values yield the errno action together with its value.
        for arg0 in (0, 1):
            with self.subTest(arg0=arg0):
                self.assertEqual(
                    self._simulate_read(block, arg0)[1:], ('ERRNO', 1))
        self.assertEqual(self._simulate_read(block, 2)[1], 'KILL_PROCESS')

    def test_ret_errno_unconditionally(self):
        """Accept lines that return errno unconditionally."""
        block = self._compile('read: return 1')

        self.assertEqual(self._simulate_read(block, 0)[1:], ('ERRNO', 1))

    def test_mmap_write_xor_exec(self):
        """Accept the idiomatic filter for mmap."""
        # The expression is attached to 'read', like every other policy line
        # compiled in this class.
        block = self._compile(
            'read : arg0 in ~PROT_WRITE || arg0 in ~PROT_EXEC')

        # Mask with both the write and the exec protection bits set; only
        # values that have both bits at once are expected to be rejected.
        prot_exec_and_write = 6
        for prot in range(0, 0xf):
            if (prot & prot_exec_and_write) == prot_exec_and_write:
                expectation = 'KILL_PROCESS'
            else:
                expectation = 'ALLOW'
            with self.subTest(prot=prot):
                self.assertEqual(
                    self._simulate_read(block, prot)[1], expectation)
275
276
class CompileFileTests(unittest.TestCase):
    """Tests for PolicyCompiler.compile_file."""

    def setUp(self):
        self.arch = ARCH_64
        self.compiler = compiler.PolicyCompiler(self.arch)
        # Each test gets its own scratch directory for policy files.
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tempdir)

    def _write_file(self, filename, contents):
        """Helper to write out a file for testing.

        Args:
            filename: Name of the file, relative to the scratch directory.
            contents: Text to store in the file.

        Returns:
            The full path of the file that was written.
        """
        path = os.path.join(self.tempdir, filename)
        with open(path, 'w') as outf:
            outf.write(contents)
        return path

    def _write_read_close_policy(self):
        """Write a policy allowing read/close plus its frequency file.

        The frequency file lists 'close' with a weight of 10 and 'read' with
        a weight of 1.

        Returns:
            The path of the policy file.
        """
        self._write_file(
            'test.frequency', """
            read: 1
            close: 10
        """)
        return self._write_file(
            'test.policy', """
            @frequency ./test.frequency
            read: 1
            close: 1
        """)

    def _simulate(self, program, syscall_number):
        """Run bpf.simulate() on |program| for |syscall_number| with arg 0.

        Returns:
            The unmodified return value of bpf.simulate().
        """
        return bpf.simulate(program.instructions, self.arch.arch_nr,
                            syscall_number, 0)

    def test_compile(self):
        """Ensure the linear strategy takes the frequency file into account."""
        path = self._write_read_close_policy()

        program = self.compiler.compile_file(
            path,
            optimization_strategy=compiler.OptimizationStrategy.LINEAR,
            kill_action=bpf.KillProcess())
        # 'close' is declared ten times more frequent than 'read', so the
        # first element of the simulation result (presumably the evaluation
        # cost -- confirm against bpf.simulate) must be larger for 'read'.
        self.assertGreater(
            self._simulate(program, self.arch.syscalls['read'])[0],
            self._simulate(program, self.arch.syscalls['close'])[0],
        )

    def test_compile_bst(self):
        """Ensure every strategy honors frequencies and allows the syscalls."""
        path = self._write_read_close_policy()

        for strategy in compiler.OptimizationStrategy:
            with self.subTest(strategy=strategy):
                program = self.compiler.compile_file(
                    path,
                    optimization_strategy=strategy,
                    kill_action=bpf.KillProcess())
                read_result = self._simulate(program,
                                             self.arch.syscalls['read'])
                close_result = self._simulate(program,
                                              self.arch.syscalls['close'])
                self.assertGreater(read_result[0], close_result[0])
                self.assertEqual(read_result[1], 'ALLOW')
                self.assertEqual(close_result[1], 'ALLOW')

    def test_compile_empty_file(self):
        """Accept empty files."""
        # The policy contains no syscall entries, only a default action.
        path = self._write_file(
            'test.policy', """
            @default kill-thread
        """)

        for strategy in compiler.OptimizationStrategy:
            with self.subTest(strategy=strategy):
                program = self.compiler.compile_file(
                    path,
                    optimization_strategy=strategy,
                    kill_action=bpf.KillProcess())
                self.assertEqual(
                    self._simulate(program, self.arch.syscalls['read'])[1],
                    'KILL_THREAD')

    def test_compile_simulate(self):
        """Ensure policy reflects script by testing some random scripts."""
        iterations = 5
        for i in range(iterations):
            # Grow the policy from one fifth of the syscalls up to all of
            # them over the iterations.
            num_entries = len(self.arch.syscalls) * (i + 1) // iterations
            # random.sample() needs a real sequence: passing a dict view
            # raises TypeError on Python 3.11 and newer.
            chosen_names = random.sample(
                list(self.arch.syscalls), num_entries)
            syscalls = {
                name: random.randint(1, 1024) for name in chosen_names
            }

            frequency_contents = '\n'.join(
                '%s: %d' % entry for entry in syscalls.items())
            policy_contents = '@frequency ./test.frequency\n' + '\n'.join(
                '%s: 1' % name for name in syscalls)

            self._write_file('test.frequency', frequency_contents)
            path = self._write_file('test.policy', policy_contents)

            for strategy in compiler.OptimizationStrategy:
                program = self.compiler.compile_file(
                    path,
                    optimization_strategy=strategy,
                    kill_action=bpf.KillProcess())
                # Every known syscall must be allowed if and only if it was
                # written to the policy.
                for name, number in self.arch.syscalls.items():
                    expected_result = ('ALLOW'
                                       if name in syscalls else 'KILL_PROCESS')
                    self.assertEqual(
                        self._simulate(program, number)[1], expected_result,
                        ('syscall name: %s, syscall number: %d, '
                         'strategy: %s, policy:\n%s') %
                        (name, number, strategy, policy_contents))
401
Luis Hector Chavez7a21ffe2018-12-05 16:54:30 -0800402
if __name__ == '__main__':
    # Run all test cases in this module when it is executed as a script.
    unittest.main()