| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| # |
| # Copyright (C) 2018 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Unittests for the parser module.""" |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import os |
| import shutil |
| import tempfile |
| import unittest |
| |
| import arch |
| import bpf |
| import parser # pylint: disable=wrong-import-order |
| |
# Architecture description shared by all test cases: a 64-bit test arch
# loaded from the checked-in JSON fixture next to this file.
_ARCH_64_JSON = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), 'testdata/arch_64.json')
ARCH_64 = arch.Arch.load_from_json(_ARCH_64_JSON)
| |
| |
class TokenizerTests(unittest.TestCase):
    """Tests for ParserState.tokenize."""

    @staticmethod
    def _tokenize(line):
        """Return the token list produced for a single policy line."""
        state = parser.ParserState('<memory>')
        per_line_tokens = list(state.tokenize([line]))
        return per_line_tokens[0]

    def test_tokenize(self):
        """Accept valid tokens."""

        def pairs(line):
            """Reduce tokens to comparable (type, value) tuples."""
            return [(token.type, token.value)
                    for token in TokenizerTests._tokenize(line)]

        self.assertEqual(
            pairs('@include /minijail.policy'), [
                ('INCLUDE', '@include'),
                ('PATH', '/minijail.policy'),
            ])
        self.assertEqual(
            pairs('@include ./minijail.policy'), [
                ('INCLUDE', '@include'),
                ('PATH', './minijail.policy'),
            ])
        # The trailing '# ignored' comment produces no tokens at all.
        self.assertEqual(
            pairs('read: arg0 in ~0xffff || arg0 & (1|2) && arg0 == 0o755; '
                  'return ENOSYS # ignored'),
            [
                ('IDENTIFIER', 'read'),
                ('COLON', ':'),
                ('ARGUMENT', 'arg0'),
                ('OP', 'in'),
                ('BITWISE_COMPLEMENT', '~'),
                ('NUMERIC_CONSTANT', '0xffff'),
                ('OR', '||'),
                ('ARGUMENT', 'arg0'),
                ('OP', '&'),
                ('LPAREN', '('),
                ('NUMERIC_CONSTANT', '1'),
                ('BITWISE_OR', '|'),
                ('NUMERIC_CONSTANT', '2'),
                ('RPAREN', ')'),
                ('AND', '&&'),
                ('ARGUMENT', 'arg0'),
                ('OP', '=='),
                ('NUMERIC_CONSTANT', '0o755'),
                ('SEMICOLON', ';'),
                ('RETURN', 'return'),
                ('IDENTIFIER', 'ENOSYS'),
            ])

    def test_tokenize_invalid_token(self):
        """Reject tokenizer errors."""
        # The error message points at the offending column with a caret.
        expected_message = (r'<memory>\(1:1\): invalid token\n'
                            r'    %invalid-token%\n'
                            r'    \^')
        with self.assertRaisesRegex(parser.ParseException, expected_message):
            TokenizerTests._tokenize('%invalid-token%')
| |
| |
class ParseConstantTests(unittest.TestCase):
    """Tests for PolicyParser.parse_value."""

    def setUp(self):
        """Create a fresh 64-bit policy parser for each test."""
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _tokenize(self, line):
        """Return the tokens of a single policy line."""
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def _parse(self, line):
        """Tokenize |line| and parse it as a constant value."""
        return self.parser.parse_value(self._tokenize(line))

    def test_parse_constant_unsigned(self):
        """Accept reasonably-sized unsigned constants."""
        self.assertEqual(self._parse('0x80000000'), 0x80000000)
        if self.arch.bits == 64:
            self.assertEqual(
                self._parse('0x8000000000000000'), 0x8000000000000000)

    def test_parse_constant_unsigned_too_big(self):
        """Reject unreasonably-sized unsigned constants."""
        if self.arch.bits == 32:
            with self.assertRaisesRegex(parser.ParseException,
                                        'unsigned overflow'):
                self._parse('0x100000000')
        with self.assertRaisesRegex(parser.ParseException,
                                    'unsigned overflow'):
            self._parse('0x10000000000000000')

    def test_parse_constant_signed(self):
        """Accept reasonably-sized signed constants."""
        # -1 wraps around to the all-ones unsigned value for the arch.
        self.assertEqual(self._parse('-1'), self.arch.max_unsigned)

    def test_parse_constant_signed_too_negative(self):
        """Reject unreasonably-sized signed constants."""
        if self.arch.bits == 32:
            with self.assertRaisesRegex(parser.ParseException,
                                        'signed underflow'):
                self._parse('-0x800000001')
        with self.assertRaisesRegex(parser.ParseException, 'signed underflow'):
            self._parse('-0x8000000000000001')

    def test_parse_mask(self):
        """Accept parsing a mask value."""
        self.assertEqual(self._parse('0x1|0x2|0x4|0x8'), 0xf)

    def test_parse_parenthesized_expressions(self):
        """Accept parsing parenthesized expressions."""
        # Malformed expressions must raise.
        for expression in ('(1', '|(1)', '(1)|', '()', '(', '((', '(()',
                           '(()1'):
            with self.assertRaises(parser.ParseException, msg=expression):
                self._parse(expression)

        # These parse a valid prefix but must leave trailing tokens behind.
        for expression in ('1)', '(1)1', '1(0)'):
            tokens = self._tokenize(expression)
            self.parser.parse_value(tokens)
            self.assertNotEqual(tokens, [])

        # All of these evaluate to 3.
        for expression in ('(3)', '(1)|2', '1|(2)', '(1)|(2)', '((3))',
                           '0|(1|2)', '(0|1|2)'):
            self.assertEqual(self._parse(expression), 3)

    def test_parse_constant_complements(self):
        """Accept complementing constants."""
        self.assertEqual(self._parse('~0'), self.arch.max_unsigned)
        self.assertEqual(self._parse('~0|~0'), self.arch.max_unsigned)
        if self.arch.bits == 32:
            self.assertEqual(
                self._parse('~0x005AF0FF|~0xFFA50FFF'), 0xFFFFFF00)
            self.assertEqual(
                self._parse('0x0F|~(0x005AF000|0x00A50FFF)|0xF0'), 0xFF0000FF)
        else:
            self.assertEqual(
                self._parse('~0x00005A5AF0F0FFFF|~0xFFFFA5A50F0FFFFF'),
                0xFFFFFFFFFFFF0000)
            self.assertEqual(
                self._parse(
                    '0x00FF|~(0x00005A5AF0F00000|0x0000A5A50F0FFFFF)|0xFF00'),
                0xFFFF00000000FFFF)

    def test_parse_double_complement(self):
        """Reject double-complementing constants."""
        with self.assertRaisesRegex(parser.ParseException,
                                    'double complement'):
            self._parse('~~0')

    def test_parse_empty_complement(self):
        """Reject complementing nothing."""
        with self.assertRaisesRegex(parser.ParseException, 'empty complement'):
            self._parse('0|~')

    def test_parse_named_constant(self):
        """Accept parsing a named constant."""
        self.assertEqual(self._parse('O_RDONLY'), 0)

    def test_parse_empty_constant(self):
        """Reject parsing nothing."""
        with self.assertRaisesRegex(parser.ParseException, 'empty constant'):
            self.parser.parse_value([])
        with self.assertRaisesRegex(parser.ParseException, 'empty constant'):
            self._parse('0|')

    def test_parse_invalid_constant(self):
        """Reject parsing invalid constants."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid constant'):
            self._parse('foo')
| |
| |
class ParseFilterExpressionTests(unittest.TestCase):
    """Tests for PolicyParser.parse_argument_expression."""

    def setUp(self):
        """Create a fresh 64-bit policy parser for each test."""
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _parse_expression(self, line):
        """Tokenize |line| and parse it as an argument expression."""
        # pylint: disable=protected-access
        tokens = list(self.parser._parser_state.tokenize([line]))[0]
        return self.parser.parse_argument_expression(tokens)

    def test_parse_argument_expression(self):
        """Accept valid argument expressions."""
        # '||' separates clauses; '&&' separates atoms within a clause.
        clauses = self._parse_expression(
            'arg0 in 0xffff || arg0 == PROT_EXEC && arg1 == PROT_WRITE')
        self.assertEqual(clauses, [
            [parser.Atom(0, 'in', 0xffff)],
            [parser.Atom(0, '==', 4),
             parser.Atom(1, '==', 2)],
        ])

    def test_parse_empty_argument_expression(self):
        """Reject empty argument expressions."""
        with self.assertRaisesRegex(parser.ParseException,
                                    'empty argument expression'):
            self._parse_expression('arg0 in 0xffff ||')

    def test_parse_empty_clause(self):
        """Reject empty clause."""
        with self.assertRaisesRegex(parser.ParseException, 'empty clause'):
            self._parse_expression('arg0 in 0xffff &&')

    def test_parse_invalid_argument(self):
        """Reject invalid argument."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid argument'):
            self._parse_expression('argX in 0xffff')

    def test_parse_invalid_operator(self):
        """Reject invalid operator."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid operator'):
            self._parse_expression('arg0 = 0xffff')

    def test_parse_missing_operator(self):
        """Reject missing operator."""
        with self.assertRaisesRegex(parser.ParseException, 'missing operator'):
            self._parse_expression('arg0')

    def test_parse_missing_operand(self):
        """Reject missing operand."""
        with self.assertRaisesRegex(parser.ParseException, 'empty constant'):
            self._parse_expression('arg0 ==')
| |
| |
class ParseFilterTests(unittest.TestCase):
    """Tests for PolicyParser.parse_filter."""

    def setUp(self):
        """Create a fresh 64-bit policy parser for each test."""
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _parse_filter(self, line):
        """Tokenize |line| and parse it as a filter."""
        # pylint: disable=protected-access
        tokens = list(self.parser._parser_state.tokenize([line]))[0]
        return self.parser.parse_filter(tokens)

    def test_parse_filter(self):
        """Accept valid filters."""
        # An argument expression with no explicit action implies Allow.
        self.assertEqual(
            self._parse_filter('arg0 == 0'), [
                parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
            ])
        # Bare action keywords become unconditional filters.
        keyword_actions = (
            ('kill-process', bpf.KillProcess()),
            ('kill-thread', bpf.KillThread()),
            ('trap', bpf.Trap()),
            ('return ENOSYS', bpf.ReturnErrno(self.arch.constants['ENOSYS'])),
            ('trace', bpf.Trace()),
            ('log', bpf.Log()),
            ('allow', bpf.Allow()),
            ('1', bpf.Allow()),
        )
        for line, action in keyword_actions:
            self.assertEqual(
                self._parse_filter(line), [parser.Filter(None, action)],
                msg=line)
        # Braced groups expand into one filter per comma-separated entry.
        self.assertEqual(
            self._parse_filter(
                '{ arg0 == 0, arg0 == 1; return ENOSYS, trap }'),
            [
                parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                parser.Filter([[parser.Atom(0, '==', 1)]],
                              bpf.ReturnErrno(self.arch.constants['ENOSYS'])),
                parser.Filter(None, bpf.Trap()),
            ])

    def test_parse_missing_return_value(self):
        """Reject missing return value."""
        with self.assertRaisesRegex(parser.ParseException,
                                    'missing return value'):
            self._parse_filter('return')

    def test_parse_invalid_return_value(self):
        """Reject invalid return value."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid constant'):
            self._parse_filter('return arg0')

    def test_parse_unclosed_brace(self):
        """Reject unclosed brace."""
        with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'):
            self._parse_filter('{ allow')
| |
| |
class ParseFilterStatementTests(unittest.TestCase):
    """Tests for PolicyParser.parse_filter_statement."""

    def setUp(self):
        """Create a fresh 64-bit policy parser for each test."""
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _tokenize(self, line):
        """Return the tokens of a single policy line."""
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def _parse_statement(self, line):
        """Tokenize |line| and parse it as a full filter statement."""
        return self.parser.parse_filter_statement(self._tokenize(line))

    def test_parse_filter_statement(self):
        """Accept valid filter statements."""
        allow_when_arg0_zero = [
            parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
        ]
        self.assertEqual(
            self._parse_statement('read: arg0 == 0'),
            parser.ParsedFilterStatement(
                (parser.Syscall('read', 0), ), allow_when_arg0_zero))
        # Brace groups and syscall-group aliases all expand to the same
        # pair of syscalls.
        read_write = (
            parser.Syscall('read', 0),
            parser.Syscall('write', 1),
        )
        for line in ('{read, write}: arg0 == 0',
                     'io@libc: arg0 == 0',
                     'file-io@systemd: arg0 == 0'):
            self.assertEqual(
                self._parse_statement(line),
                parser.ParsedFilterStatement(read_write,
                                             allow_when_arg0_zero),
                msg=line)

    def test_parse_metadata(self):
        """Accept valid filter statements with metadata."""
        allow_when_arg0_zero = [
            parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
        ]
        self.assertEqual(
            self._parse_statement('read[arch=test]: arg0 == 0'),
            parser.ParsedFilterStatement(
                (parser.Syscall('read', 0), ), allow_when_arg0_zero))
        # Entries whose metadata targets another arch are dropped from the
        # resulting statement.
        self.assertEqual(
            self._parse_statement(
                '{read, nonexistent[arch=nonexistent]}: arg0 == 0'),
            parser.ParsedFilterStatement(
                (parser.Syscall('read', 0), ), allow_when_arg0_zero))

    def test_parse_unclosed_brace(self):
        """Reject unclosed brace."""
        # Mirrors ParseFilterTests.test_parse_unclosed_brace and, like it,
        # exercises parse_filter directly.
        with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'):
            self.parser.parse_filter(self._tokenize('{ allow'))

    def test_parse_invalid_syscall_group(self):
        """Reject invalid syscall groups."""
        with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'):
            self._parse_statement('{ read, write: arg0 == 0')

    def test_parse_missing_colon(self):
        """Reject missing colon."""
        with self.assertRaisesRegex(parser.ParseException, 'missing colon'):
            self._parse_statement('read')

    def test_parse_invalid_colon(self):
        """Reject invalid colon."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid colon'):
            self._parse_statement('read arg0')

    def test_parse_missing_filter(self):
        """Reject missing filter."""
        with self.assertRaisesRegex(parser.ParseException, 'missing filter'):
            self._parse_statement('read:')
| |
| |
class ParseFileTests(unittest.TestCase):
    """Tests for PolicyParser.parse_file."""

    def setUp(self):
        # Fresh parser plus a scratch directory for the policy/frequency
        # files each test writes to disk.
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        # Remove the scratch directory and everything written into it.
        shutil.rmtree(self.tempdir)

    def _write_file(self, filename, contents):
        """Helper to write out a file for testing.

        Returns the path of the file written under the scratch directory.
        """
        path = os.path.join(self.tempdir, filename)
        with open(path, 'w') as outf:
            outf.write(contents)
        return path

    def test_parse_simple(self):
        """Allow simple policy files."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            read: allow
            write: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_multiline(self):
        """Allow simple multi-line policy files."""
        # NOTE(review): the single trailing backslash below is consumed by
        # Python's own string-literal line continuation, so the file written
        # to disk contains 'read: ... allow' on one physical line rather
        # than a policy-level continuation. If the intent is to exercise the
        # policy parser's backslash-continuation handling, this presumably
        # needs to be '\\' -- confirm against the tokenizer's behavior.
        path = self._write_file(
            'test.policy', """
            # Comment.
            read: \
                allow
            write: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_default(self):
        """Allow defining a default action."""
        path = self._write_file(
            'test.policy', """
            @default kill-thread
            read: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillThread(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_default_permissive(self):
        """Reject defining a permissive default action."""
        path = self._write_file(
            'test.policy', """
            @default log
            read: allow
        """)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'invalid permissive default action'):
            self.parser.parse_file(path)

    def test_parse_simple_grouped(self):
        """Allow simple policy files."""
        # A brace group expands into one filter statement per syscall.
        path = self._write_file(
            'test.policy', """
            # Comment.
            {read, write}: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_other_arch(self):
        """Allow entries that only target another architecture."""
        # The read entry targets a nonexistent arch, so only write survives.
        path = self._write_file(
            'test.policy', """
            # Comment.
            read[arch=nonexistent]: allow
            write: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_include(self):
        """Allow including policy files."""
        path = self._write_file(
            'test.include.policy', """
            {read, write}: arg0 == 0; allow
        """)
        # |path| is deliberately reassigned: only the outer policy file is
        # handed to parse_file; it pulls in the include by relative path.
        path = self._write_file(
            'test.policy', """
            @include ./test.include.policy
            read: return ENOSYS
        """)

        # Included filters come first; each statement then ends with the
        # policy's own filter for that syscall (read) or the kill action
        # (write, which has only conditional filters).
        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter([[parser.Atom(0, '==', 0)]],
                                          bpf.Allow()),
                            parser.Filter(
                                None,
                                bpf.ReturnErrno(
                                    self.arch.constants['ENOSYS'])),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter([[parser.Atom(0, '==', 0)]],
                                          bpf.Allow()),
                            parser.Filter(None, bpf.KillProcess()),
                        ]),
                ]))

    def test_parse_invalid_include(self):
        """Reject including invalid policy files."""
        # @include with no argument.
        with self.assertRaisesRegex(parser.ParseException,
                                    r'empty include path'):
            path = self._write_file(
                'test.policy', """
                @include
            """)
            self.parser.parse_file(path)

        # @include with a non-path argument.
        with self.assertRaisesRegex(parser.ParseException,
                                    r'invalid include path'):
            path = self._write_file(
                'test.policy', """
                @include arg0
            """)
            self.parser.parse_file(path)

        # A policy that includes itself recurses past the nesting limit.
        with self.assertRaisesRegex(parser.ParseException,
                                    r'@include statement nested too deep'):
            path = self._write_file(
                'test.policy', """
                @include ./test.policy
            """)
            self.parser.parse_file(path)

        # @include pointing at a file that does not exist.
        with self.assertRaisesRegex(parser.ParseException,
                                    r'Could not @include .*'):
            path = self._write_file(
                'test.policy', """
                @include ./nonexistent.policy
            """)
            self.parser.parse_file(path)

    def test_parse_frequency(self):
        """Allow including frequency files."""
        self._write_file(
            'test.frequency', """
            read: 2
            write: 3
        """)
        path = self._write_file(
            'test.policy', """
            @frequency ./test.frequency
            read: allow
        """)

        # Only read appears in the policy, so only read gets a statement;
        # its frequency comes from the frequency file.
        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=2,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_invalid_frequency(self):
        """Reject including invalid frequency files."""
        # One policy file referencing ./test.frequency; the frequency file
        # is rewritten with different invalid contents for each case below.
        path = self._write_file('test.policy',
                                """@frequency ./test.frequency""")

        with self.assertRaisesRegex(parser.ParseException, r'missing colon'):
            self._write_file('test.frequency', """
                read
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r'invalid colon'):
            self._write_file('test.frequency', """
                read foo
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r'missing number'):
            self._write_file('test.frequency', """
                read:
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r'invalid number'):
            self._write_file('test.frequency', """
                read: foo
            """)
            self.parser.parse_file(path)

        # Negative frequencies are also rejected as invalid numbers.
        with self.assertRaisesRegex(parser.ParseException, r'invalid number'):
            self._write_file('test.frequency', """
                read: -1
            """)
            self.parser.parse_file(path)

        # The remaining cases rewrite the policy file itself.
        with self.assertRaisesRegex(parser.ParseException,
                                    r'empty frequency path'):
            path = self._write_file(
                'test.policy', """
                @frequency
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'invalid frequency path'):
            path = self._write_file(
                'test.policy', """
                @frequency arg0
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'Could not open frequency file.*'):
            path = self._write_file(
                'test.policy', """
                @frequency ./nonexistent.frequency
            """)
            self.parser.parse_file(path)

    def test_parse_multiple_unconditional(self):
        """Reject actions after an unconditional action."""
        path = self._write_file(
            'test.policy', """
            read: allow
            read: allow
        """)

        with self.assertRaisesRegex(
                parser.ParseException,
                r'Syscall read.*already had an unconditional action applied'):
            self.parser.parse_file(path)

        # Order matters: the unconditional entry comes first, so the later
        # conditional entry for the same syscall is rejected.
        path = self._write_file(
            'test.policy', """
            read: log
            read: arg0 == 0; log
        """)

        with self.assertRaisesRegex(
                parser.ParseException,
                r'Syscall read.*already had an unconditional action applied'):
            self.parser.parse_file(path)
| |
| |
# Run the full test suite when this file is executed directly.
if __name__ == '__main__':
    unittest.main()