blob: b1592d299dbdbd1f8fccac4ac5148b1abc002a7d [file] [log] [blame]
Luis Hector Chavez05392b82018-10-28 21:40:10 -07001#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2018 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17"""Unittests for the compiler module."""
18
19from __future__ import print_function
20
21import os
Luis Hector Chavezb21da7a2018-11-01 16:50:36 -070022import random
23import shutil
Luis Hector Chavez05392b82018-10-28 21:40:10 -070024import tempfile
25import unittest
26
27import arch
28import bpf
29import compiler
30import parser # pylint: disable=wrong-import-order
31
32ARCH_64 = arch.Arch.load_from_json(
33 os.path.join(
34 os.path.dirname(os.path.abspath(__file__)), 'testdata/arch_64.json'))
35
36
37class CompileFilterStatementTests(unittest.TestCase):
38 """Tests for PolicyCompiler.compile_filter_statement."""
39
40 def setUp(self):
41 self.arch = ARCH_64
42 self.compiler = compiler.PolicyCompiler(self.arch)
43
44 def _compile(self, line):
45 with tempfile.NamedTemporaryFile(mode='w') as policy_file:
46 policy_file.write(line)
47 policy_file.flush()
48 policy_parser = parser.PolicyParser(
49 self.arch, kill_action=bpf.KillProcess())
50 parsed_policy = policy_parser.parse_file(policy_file.name)
51 assert len(parsed_policy.filter_statements) == 1
52 return self.compiler.compile_filter_statement(
53 parsed_policy.filter_statements[0],
54 kill_action=bpf.KillProcess())
55
56 def test_allow(self):
57 """Accept lines where the syscall is accepted unconditionally."""
58 block = self._compile('read: allow')
59 self.assertEqual(block.filter, None)
60 self.assertEqual(
61 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
62 0)[1], 'ALLOW')
63 self.assertEqual(
64 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
65 1)[1], 'ALLOW')
66
67 def test_arg0_eq_generated_code(self):
68 """Accept lines with an argument filter with ==."""
69 block = self._compile('read: arg0 == 0x100')
70 # It might be a bit brittle to check the generated code in each test
71 # case instead of just the behavior, but there should be at least one
72 # test where this happens.
73 self.assertEqual(
74 block.filter.instructions,
75 [
76 bpf.SockFilter(bpf.BPF_LD | bpf.BPF_W | bpf.BPF_ABS, 0, 0,
77 bpf.arg_offset(0, True)),
78 # Jump to KILL_PROCESS if the high word does not match.
79 bpf.SockFilter(bpf.BPF_JMP | bpf.BPF_JEQ | bpf.BPF_K, 0, 2, 0),
80 bpf.SockFilter(bpf.BPF_LD | bpf.BPF_W | bpf.BPF_ABS, 0, 0,
81 bpf.arg_offset(0, False)),
82 # Jump to KILL_PROCESS if the low word does not match.
83 bpf.SockFilter(bpf.BPF_JMP | bpf.BPF_JEQ | bpf.BPF_K, 1, 0,
84 0x100),
85 bpf.SockFilter(bpf.BPF_RET, 0, 0,
86 bpf.SECCOMP_RET_KILL_PROCESS),
87 bpf.SockFilter(bpf.BPF_RET, 0, 0, bpf.SECCOMP_RET_ALLOW),
88 ])
89
90 def test_arg0_comparison_operators(self):
91 """Accept lines with an argument filter with comparison operators."""
92 biases = (-1, 0, 1)
93 # For each operator, store the expectations of simulating the program
94 # against the constant plus each entry from the |biases| array.
95 cases = (
96 ('==', ('KILL_PROCESS', 'ALLOW', 'KILL_PROCESS')),
97 ('!=', ('ALLOW', 'KILL_PROCESS', 'ALLOW')),
98 ('<', ('ALLOW', 'KILL_PROCESS', 'KILL_PROCESS')),
99 ('<=', ('ALLOW', 'ALLOW', 'KILL_PROCESS')),
100 ('>', ('KILL_PROCESS', 'KILL_PROCESS', 'ALLOW')),
101 ('>=', ('KILL_PROCESS', 'ALLOW', 'ALLOW')),
102 )
103 for operator, expectations in cases:
104 block = self._compile('read: arg0 %s 0x100' % operator)
105
106 # Check the filter's behavior.
107 for bias, expectation in zip(biases, expectations):
108 self.assertEqual(
109 block.simulate(self.arch.arch_nr,
110 self.arch.syscalls['read'],
111 0x100 + bias)[1], expectation)
112
113 def test_arg0_mask_operator(self):
114 """Accept lines with an argument filter with &."""
115 block = self._compile('read: arg0 & 0x3')
116
117 self.assertEqual(
118 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
119 0)[1], 'KILL_PROCESS')
120 self.assertEqual(
121 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
122 1)[1], 'ALLOW')
123 self.assertEqual(
124 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
125 2)[1], 'ALLOW')
126 self.assertEqual(
127 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
128 3)[1], 'ALLOW')
129 self.assertEqual(
130 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
131 4)[1], 'KILL_PROCESS')
132 self.assertEqual(
133 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
134 5)[1], 'ALLOW')
135 self.assertEqual(
136 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
137 6)[1], 'ALLOW')
138 self.assertEqual(
139 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
140 7)[1], 'ALLOW')
141 self.assertEqual(
142 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
143 8)[1], 'KILL_PROCESS')
144
145 def test_arg0_in_operator(self):
146 """Accept lines with an argument filter with in."""
147 block = self._compile('read: arg0 in 0x3')
148
149 # The 'in' operator only ensures that no bits outside the mask are set,
150 # which means that 0 is always allowed.
151 self.assertEqual(
152 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
153 0)[1], 'ALLOW')
154 self.assertEqual(
155 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
156 1)[1], 'ALLOW')
157 self.assertEqual(
158 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
159 2)[1], 'ALLOW')
160 self.assertEqual(
161 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
162 3)[1], 'ALLOW')
163 self.assertEqual(
164 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
165 4)[1], 'KILL_PROCESS')
166 self.assertEqual(
167 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
168 5)[1], 'KILL_PROCESS')
169 self.assertEqual(
170 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
171 6)[1], 'KILL_PROCESS')
172 self.assertEqual(
173 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
174 7)[1], 'KILL_PROCESS')
175 self.assertEqual(
176 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
177 8)[1], 'KILL_PROCESS')
178
179 def test_arg0_short_gt_ge_comparisons(self):
180 """Ensure that the short comparison optimization kicks in."""
181 if self.arch.bits == 32:
182 return
183 short_constant_str = '0xdeadbeef'
184 short_constant = int(short_constant_str, base=0)
185 long_constant_str = '0xbadc0ffee0ddf00d'
186 long_constant = int(long_constant_str, base=0)
187 biases = (-1, 0, 1)
188 # For each operator, store the expectations of simulating the program
189 # against the constant plus each entry from the |biases| array.
190 cases = (
191 ('<', ('ALLOW', 'KILL_PROCESS', 'KILL_PROCESS')),
192 ('<=', ('ALLOW', 'ALLOW', 'KILL_PROCESS')),
193 ('>', ('KILL_PROCESS', 'KILL_PROCESS', 'ALLOW')),
194 ('>=', ('KILL_PROCESS', 'ALLOW', 'ALLOW')),
195 )
196 for operator, expectations in cases:
197 short_block = self._compile(
198 'read: arg0 %s %s' % (operator, short_constant_str))
199 long_block = self._compile(
200 'read: arg0 %s %s' % (operator, long_constant_str))
201
202 # Check that the emitted code is shorter when the high word of the
203 # constant is zero.
204 self.assertLess(
205 len(short_block.filter.instructions),
206 len(long_block.filter.instructions))
207
208 # Check the filter's behavior.
209 for bias, expectation in zip(biases, expectations):
210 self.assertEqual(
211 long_block.simulate(self.arch.arch_nr,
212 self.arch.syscalls['read'],
213 long_constant + bias)[1], expectation)
214 self.assertEqual(
215 short_block.simulate(
216 self.arch.arch_nr, self.arch.syscalls['read'],
217 short_constant + bias)[1], expectation)
218
219 def test_and_or(self):
220 """Accept lines with a complex expression in DNF."""
221 block = self._compile('read: arg0 == 0 && arg1 == 0 || arg0 == 1')
222
223 self.assertEqual(
224 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 0,
225 0)[1], 'ALLOW')
226 self.assertEqual(
227 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 0,
228 1)[1], 'KILL_PROCESS')
229 self.assertEqual(
230 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 1,
231 0)[1], 'ALLOW')
232 self.assertEqual(
233 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 1,
234 1)[1], 'ALLOW')
235
Luis Hector Chavez89a27102019-03-09 18:45:52 -0800236 def test_trap(self):
237 """Accept lines that trap unconditionally."""
238 block = self._compile('read: trap')
239
240 self.assertEqual(
241 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
242 0)[1], 'TRAP')
243
Luis Hector Chavez05392b82018-10-28 21:40:10 -0700244 def test_ret_errno(self):
245 """Accept lines that return errno."""
246 block = self._compile('read : arg0 == 0 || arg0 == 1 ; return 1')
247
248 self.assertEqual(
249 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
250 0)[1:], ('ERRNO', 1))
251 self.assertEqual(
252 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
253 1)[1:], ('ERRNO', 1))
254 self.assertEqual(
255 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
256 2)[1], 'KILL_PROCESS')
257
258 def test_ret_errno_unconditionally(self):
259 """Accept lines that return errno unconditionally."""
260 block = self._compile('read: return 1')
261
262 self.assertEqual(
263 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
264 0)[1:], ('ERRNO', 1))
265
Luis Hector Chavez89a27102019-03-09 18:45:52 -0800266 def test_trace(self):
267 """Accept lines that trace unconditionally."""
268 block = self._compile('read: trace')
269
270 self.assertEqual(
271 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
272 0)[1], 'TRACE')
273
Luis Héctor Chávez59a64492021-01-03 05:46:47 -0800274 def test_user_notify(self):
275 """Accept lines that notify unconditionally."""
276 block = self._compile('read: user-notify')
277
278 self.assertEqual(
279 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
280 0)[1], 'USER_NOTIF')
281
Luis Hector Chavez89a27102019-03-09 18:45:52 -0800282 def test_log(self):
283 """Accept lines that log unconditionally."""
284 block = self._compile('read: log')
285
286 self.assertEqual(
287 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
288 0)[1], 'LOG')
289
Luis Hector Chavez05392b82018-10-28 21:40:10 -0700290 def test_mmap_write_xor_exec(self):
291 """Accept the idiomatic filter for mmap."""
292 block = self._compile(
293 'read : arg0 in ~PROT_WRITE || arg0 in ~PROT_EXEC')
294
295 prot_exec_and_write = 6
296 for prot in range(0, 0xf):
297 if (prot & prot_exec_and_write) == prot_exec_and_write:
298 self.assertEqual(
299 block.simulate(self.arch.arch_nr,
300 self.arch.syscalls['read'], prot)[1],
301 'KILL_PROCESS')
302 else:
303 self.assertEqual(
304 block.simulate(self.arch.arch_nr,
305 self.arch.syscalls['read'], prot)[1],
306 'ALLOW')
307
308
Luis Hector Chavez7a21ffe2018-12-05 16:54:30 -0800309class CompileFileTests(unittest.TestCase):
310 """Tests for PolicyCompiler.compile_file."""
311
312 def setUp(self):
313 self.arch = ARCH_64
314 self.compiler = compiler.PolicyCompiler(self.arch)
315 self.tempdir = tempfile.mkdtemp()
316
317 def tearDown(self):
318 shutil.rmtree(self.tempdir)
319
320 def _write_file(self, filename, contents):
321 """Helper to write out a file for testing."""
322 path = os.path.join(self.tempdir, filename)
323 with open(path, 'w') as outf:
324 outf.write(contents)
325 return path
326
Luis Hector Chaveza54812b2018-11-01 20:02:22 -0700327 def test_compile(self):
328 """Ensure compilation works with all strategies."""
Luis Hector Chavez7a21ffe2018-12-05 16:54:30 -0800329 self._write_file(
330 'test.frequency', """
331 read: 1
332 close: 10
333 """)
334 path = self._write_file(
335 'test.policy', """
336 @frequency ./test.frequency
337 read: 1
338 close: 1
339 """)
340
341 program = self.compiler.compile_file(
342 path,
343 optimization_strategy=compiler.OptimizationStrategy.LINEAR,
344 kill_action=bpf.KillProcess())
345 self.assertGreater(
346 bpf.simulate(program.instructions, self.arch.arch_nr,
347 self.arch.syscalls['read'], 0)[0],
348 bpf.simulate(program.instructions, self.arch.arch_nr,
349 self.arch.syscalls['close'], 0)[0],
350 )
351
352 def test_compile_bst(self):
353 """Ensure compilation with BST is cheaper than the linear model."""
354 self._write_file(
355 'test.frequency', """
356 read: 1
357 close: 10
358 """)
359 path = self._write_file(
360 'test.policy', """
361 @frequency ./test.frequency
362 read: 1
363 close: 1
364 """)
365
Luis Hector Chaveza54812b2018-11-01 20:02:22 -0700366 for strategy in list(compiler.OptimizationStrategy):
367 program = self.compiler.compile_file(
368 path,
369 optimization_strategy=strategy,
370 kill_action=bpf.KillProcess())
371 self.assertGreater(
372 bpf.simulate(program.instructions, self.arch.arch_nr,
373 self.arch.syscalls['read'], 0)[0],
374 bpf.simulate(program.instructions, self.arch.arch_nr,
375 self.arch.syscalls['close'], 0)[0],
376 )
377 self.assertEqual(
378 bpf.simulate(program.instructions, self.arch.arch_nr,
379 self.arch.syscalls['read'], 0)[1], 'ALLOW')
380 self.assertEqual(
381 bpf.simulate(program.instructions, self.arch.arch_nr,
382 self.arch.syscalls['close'], 0)[1], 'ALLOW')
Luis Hector Chavez7a21ffe2018-12-05 16:54:30 -0800383
384 def test_compile_empty_file(self):
385 """Accept empty files."""
386 path = self._write_file(
387 'test.policy', """
388 @default kill-thread
389 """)
390
391 for strategy in list(compiler.OptimizationStrategy):
392 program = self.compiler.compile_file(
393 path,
394 optimization_strategy=strategy,
395 kill_action=bpf.KillProcess())
396 self.assertEqual(
397 bpf.simulate(program.instructions, self.arch.arch_nr,
398 self.arch.syscalls['read'], 0)[1], 'KILL_THREAD')
399
Luis Hector Chavezb21da7a2018-11-01 16:50:36 -0700400 def test_compile_simulate(self):
401 """Ensure policy reflects script by testing some random scripts."""
Luis Hector Chaveza54812b2018-11-01 20:02:22 -0700402 iterations = 5
Luis Hector Chavezb21da7a2018-11-01 16:50:36 -0700403 for i in range(iterations):
Luis Hector Chavez89a27102019-03-09 18:45:52 -0800404 num_entries = 64 * (i + 1) // iterations
Luis Hector Chavezb21da7a2018-11-01 16:50:36 -0700405 syscalls = dict(
406 zip(
407 random.sample(self.arch.syscalls.keys(), num_entries),
408 (random.randint(1, 1024) for _ in range(num_entries)),
409 ))
410
411 frequency_contents = '\n'.join(
412 '%s: %d' % s for s in syscalls.items())
413 policy_contents = '@frequency ./test.frequency\n' + '\n'.join(
414 '%s: 1' % s[0] for s in syscalls.items())
415
416 self._write_file('test.frequency', frequency_contents)
417 path = self._write_file('test.policy', policy_contents)
418
419 for strategy in list(compiler.OptimizationStrategy):
420 program = self.compiler.compile_file(
421 path,
422 optimization_strategy=strategy,
423 kill_action=bpf.KillProcess())
424 for name, number in self.arch.syscalls.items():
425 expected_result = ('ALLOW'
426 if name in syscalls else 'KILL_PROCESS')
427 self.assertEqual(
428 bpf.simulate(program.instructions, self.arch.arch_nr,
429 number, 0)[1], expected_result,
430 ('syscall name: %s, syscall number: %d, '
431 'strategy: %s, policy:\n%s') %
432 (name, number, strategy, policy_contents))
433
Luis Hector Chavez89a27102019-03-09 18:45:52 -0800434 @unittest.skipIf(not int(os.getenv('SLOW_TESTS', '0')), 'slow')
435 def test_compile_huge_policy(self):
436 """Ensure jumps while compiling a huge policy are still valid."""
437 # Given that the BST strategy is O(n^3), don't choose a crazy large
438 # value, but it still needs to be around 128 so that we exercise the
439 # codegen paths that depend on the length of the jump.
440 #
441 # Immediate jump offsets in BPF comparison instructions are limited to
442 # 256 instructions, so given that every syscall filter consists of a
443 # load and jump instructions, with 128 syscalls there will be at least
444 # one jump that's further than 256 instructions.
445 num_entries = 128
446 syscalls = dict(random.sample(self.arch.syscalls.items(), num_entries))
447 # Here we force every single filter to be distinct. Otherwise the
448 # codegen layer will coalesce filters that compile to the same
449 # instructions.
450 policy_contents = '\n'.join(
451 '%s: arg0 == %d' % s for s in syscalls.items())
452
453 path = self._write_file('test.policy', policy_contents)
454
455 program = self.compiler.compile_file(
456 path,
457 optimization_strategy=compiler.OptimizationStrategy.BST,
458 kill_action=bpf.KillProcess())
459 for name, number in self.arch.syscalls.items():
460 expected_result = ('ALLOW'
461 if name in syscalls else 'KILL_PROCESS')
462 self.assertEqual(
463 bpf.simulate(program.instructions, self.arch.arch_nr,
464 self.arch.syscalls[name], number)[1],
465 expected_result)
466 self.assertEqual(
467 bpf.simulate(program.instructions, self.arch.arch_nr,
468 self.arch.syscalls[name], number + 1)[1],
469 'KILL_PROCESS')
470
Luis Hector Chavez3b41ae32019-03-09 18:46:20 -0800471 def test_compile_huge_filter(self):
472 """Ensure jumps while compiling a huge policy are still valid."""
473 # This is intended to force cases where the AST visitation would result
474 # in a combinatorial explosion of calls to Block.accept(). An optimized
475 # implementation should be O(n).
476 num_entries = 128
477 syscalls = {}
478 # Here we force every single filter to be distinct. Otherwise the
479 # codegen layer will coalesce filters that compile to the same
480 # instructions.
481 policy_contents = []
482 for name in random.sample(self.arch.syscalls.keys(), num_entries):
483 values = random.sample(range(1024), num_entries)
484 syscalls[name] = values
485 policy_contents.append(
486 '%s: %s' % (name, ' || '.join('arg0 == %d' % value
487 for value in values)))
488
489 path = self._write_file('test.policy', '\n'.join(policy_contents))
490
491 program = self.compiler.compile_file(
492 path,
493 optimization_strategy=compiler.OptimizationStrategy.LINEAR,
494 kill_action=bpf.KillProcess())
495 for name, values in syscalls.items():
496 self.assertEqual(
497 bpf.simulate(program.instructions,
498 self.arch.arch_nr, self.arch.syscalls[name],
499 random.choice(values))[1], 'ALLOW')
500 self.assertEqual(
501 bpf.simulate(program.instructions, self.arch.arch_nr,
502 self.arch.syscalls[name], 1025)[1],
503 'KILL_PROCESS')
504
Luis Hector Chavez7a21ffe2018-12-05 16:54:30 -0800505
Luis Hector Chavez05392b82018-10-28 21:40:10 -0700506if __name__ == '__main__':
507 unittest.main()