blob: f6e0aae55f76cef00b213795f47ec326952d5819 [file] [log] [blame]
# Postgres Related regression tests
#
# Type the following command to launch start the tests:
# $ test/run_tests -P "load_contrib('postgres')" -t test/contrib/postgres.uts
+ postgres
= postgres initialization
from scapy.contrib.postgres import *
ssl_request = "\x00\x00\x00\x08\x04\xd2\x16\x2f"
startup = Startup(
b"\x00\x00\x00\x57\x00\x03\x00\x00\x75\x73\x65\x72\x00\x70\x6f\x73"
b"\x74\x67\x72\x65\x73\x00\x64\x61\x74\x61\x62\x61\x73\x65\x00\x70"
b"\x6f\x73\x74\x67\x72\x65\x73\x00\x61\x70\x70\x6c\x69\x63\x61\x74"
b"\x69\x6f\x6e\x5f\x6e\x61\x6d\x65\x00\x70\x73\x71\x6c\x00\x63\x6c"
b"\x69\x65\x6e\x74\x5f\x65\x6e\x63\x6f\x64\x69\x6e\x67\x00\x57\x49"
b"\x4e\x31\x32\x35\x32\x00\x00"
)
assert startup.len == 87
assert startup.protocol_version_major == 3
assert startup.protocol_version_minor == 0
assert (
startup.options
== b"user\x00postgres\x00database\x00postgres\x00application_name\x00psql\x00client_encoding\x00WIN1252\x00\x00"
)
init_packet = (
b"\x52\x00\x00\x00\x08\x00\x00\x00\x00\x53\x00\x00\x00\x1a\x61\x70"
b"\x70\x6c\x69\x63\x61\x74\x69\x6f\x6e\x5f\x6e\x61\x6d\x65\x00\x70"
b"\x73\x71\x6c\x00\x53\x00\x00\x00\x1c\x63\x6c\x69\x65\x6e\x74\x5f"
b"\x65\x6e\x63\x6f\x64\x69\x6e\x67\x00\x57\x49\x4e\x31\x32\x35\x32"
b"\x00\x53\x00\x00\x00\x17\x44\x61\x74\x65\x53\x74\x79\x6c\x65\x00"
b"\x49\x53\x4f\x2c\x20\x4d\x44\x59\x00\x53\x00\x00\x00\x26\x64\x65"
b"\x66\x61\x75\x6c\x74\x5f\x74\x72\x61\x6e\x73\x61\x63\x74\x69\x6f"
b"\x6e\x5f\x72\x65\x61\x64\x5f\x6f\x6e\x6c\x79\x00\x6f\x66\x66\x00"
b"\x53\x00\x00\x00\x17\x69\x6e\x5f\x68\x6f\x74\x5f\x73\x74\x61\x6e"
b"\x64\x62\x79\x00\x6f\x66\x66\x00\x53\x00\x00\x00\x19\x69\x6e\x74"
b"\x65\x67\x65\x72\x5f\x64\x61\x74\x65\x74\x69\x6d\x65\x73\x00\x6f"
b"\x6e\x00\x53\x00\x00\x00\x1b\x49\x6e\x74\x65\x72\x76\x61\x6c\x53"
b"\x74\x79\x6c\x65\x00\x70\x6f\x73\x74\x67\x72\x65\x73\x00\x53\x00"
b"\x00\x00\x14\x69\x73\x5f\x73\x75\x70\x65\x72\x75\x73\x65\x72\x00"
b"\x6f\x6e\x00\x53\x00\x00\x00\x19\x73\x65\x72\x76\x65\x72\x5f\x65"
b"\x6e\x63\x6f\x64\x69\x6e\x67\x00\x55\x54\x46\x38\x00\x53\x00\x00"
b"\x00\x32\x73\x65\x72\x76\x65\x72\x5f\x76\x65\x72\x73\x69\x6f\x6e"
b"\x00\x31\x34\x2e\x32\x20\x28\x44\x65\x62\x69\x61\x6e\x20\x31\x34"
b"\x2e\x32\x2d\x31\x2e\x70\x67\x64\x67\x31\x31\x30\x2b\x31\x29\x00"
b"\x53\x00\x00\x00\x23\x73\x65\x73\x73\x69\x6f\x6e\x5f\x61\x75\x74"
b"\x68\x6f\x72\x69\x7a\x61\x74\x69\x6f\x6e\x00\x70\x6f\x73\x74\x67"
b"\x72\x65\x73\x00\x53\x00\x00\x00\x23\x73\x74\x61\x6e\x64\x61\x72"
b"\x64\x5f\x63\x6f\x6e\x66\x6f\x72\x6d\x69\x6e\x67\x5f\x73\x74\x72"
b"\x69\x6e\x67\x73\x00\x6f\x6e\x00\x53\x00\x00\x00\x15\x54\x69\x6d"
b"\x65\x5a\x6f\x6e\x65\x00\x45\x74\x63\x2f\x55\x54\x43\x00\x4b\x00"
b"\x00\x00\x0c\x00\x00\x01\x7f\x43\x4c\x36\xa5\x5a\x00\x00\x00\x05\x49"
)
= postgres backend sequence
init = PostgresBackend(init_packet)
assert isinstance(init.contents[0], Authentication)
assert init.contents[0].len == 8
assert init.contents[0].method == 0
assert len(init.contents) == 16
assert isinstance(init.contents[1], ParameterStatus)
assert init.contents[1].len == 26
assert init.contents[1].parameter == b"application_name"
assert init.contents[1].value == b"psql"
= simple queries
simple_query_packet = (
b"\x51\x00\x00\x00\x15\x53\x45\x4c\x45\x43\x54\x20\x56\x45\x52\x53"
b"\x49\x4f\x4e\x28\x29\x00"
)
simple_query = PostgresFrontend(simple_query_packet)
assert isinstance(simple_query.contents[0], Query)
assert simple_query.contents[0].len == 21
assert simple_query.contents[0].query == b"SELECT VERSION()"
pair = SignedIntStrPair(b"\x00\x00\x00\x04\x01\x02\x03\x04")
assert pair.len == 4
assert pair.data == b"\x01\x02\x03\x04"
command_response_packet = (
b"\x54\x00\x00\x00\x20\x00\x01\x76\x65\x72\x73\x69\x6f\x6e\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x19\xff\xff\xff\xff\xff\xff\x00"
b"\x00\x44\x00\x00\x00\x85\x00\x01\x00\x00\x00\x7b\x50\x6f\x73\x74"
b"\x67\x72\x65\x53\x51\x4c\x20\x31\x34\x2e\x32\x20\x28\x44\x65\x62"
b"\x69\x61\x6e\x20\x31\x34\x2e\x32\x2d\x31\x2e\x70\x67\x64\x67\x31"
b"\x31\x30\x2b\x31\x29\x20\x6f\x6e\x20\x78\x38\x36\x5f\x36\x34\x2d"
b"\x70\x63\x2d\x6c\x69\x6e\x75\x78\x2d\x67\x6e\x75\x2c\x20\x63\x6f"
b"\x6d\x70\x69\x6c\x65\x64\x20\x62\x79\x20\x67\x63\x63\x20\x28\x44"
b"\x65\x62\x69\x61\x6e\x20\x31\x30\x2e\x32\x2e\x31\x2d\x36\x29\x20"
b"\x31\x30\x2e\x32\x2e\x31\x20\x32\x30\x32\x31\x30\x31\x31\x30\x2c"
b"\x20\x36\x34\x2d\x62\x69\x74\x43\x00\x00\x00\x0d\x53\x45\x4c\x45"
b"\x43\x54\x20\x31\x00\x5a\x00\x00\x00\x05\x49"
)
= row data response
command_response = PostgresBackend(command_response_packet)
assert len(command_response.contents) == 4
assert isinstance(command_response.contents[0], RowDescription)
rd = command_response.contents[0]
assert rd.len == 32
assert rd.numfields == 1
assert rd.cols[0].col == b"version"
assert rd.cols[0].tableoid == 0
assert rd.cols[0].colno == 0
assert rd.cols[0].typeoid == 25
assert rd.cols[0].typelen == -1
assert rd.cols[0].format == 0
assert rd.cols[0].typemod == -1
assert isinstance(command_response.contents[1], DataRow)
assert command_response.contents[1].len == 133
assert command_response.contents[1].numfields == 1
assert len(command_response.contents[1].data) == 1
assert isinstance(command_response.contents[1].data[0], SignedIntStrPair)
assert command_response.contents[1].data[0].len == 123
assert (
command_response.contents[1].data[0].data
== b"PostgreSQL 14.2 (Debian 14.2-1.pgdg110+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 10.2.1-6) 10.2.1 20210110, 64-bit"
)
assert isinstance(command_response.contents[2], CommandComplete)
assert isinstance(command_response.contents[3], ReadyForQuery)
three_col_rd = RowDescription(
b"\x54\x00\x00\x00\x55\x00\x03\x6e\x61\x6d\x65\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x19\xff\xff\xff\xff\xff\xff\x00\x00\x73\x65"
b"\x74\x74\x69\x6e\x67\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19"
b"\xff\xff\xff\xff\xff\xff\x00\x00\x64\x65\x73\x63\x72\x69\x70\x74"
b"\x69\x6f\x6e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19\xff\xff"
b"\xff\xff\xff\xff\x00\x00"
)
assert three_col_rd.len == 85
assert three_col_rd.numfields == 3
assert len(three_col_rd.cols) == 3
three_col_dr = DataRow(
b"\x44\x00\x00\x00\x63\x00\x03\x00\x00\x00\x17\x61\x6c\x6c\x6f\x77"
b"\x5f\x73\x79\x73\x74\x65\x6d\x5f\x74\x61\x62\x6c\x65\x5f\x6d\x6f"
b"\x64\x73\x00\x00\x00\x03\x6f\x66\x66\x00\x00\x00\x37\x41\x6c\x6c"
b"\x6f\x77\x73\x20\x6d\x6f\x64\x69\x66\x69\x63\x61\x74\x69\x6f\x6e"
b"\x73\x20\x6f\x66\x20\x74\x68\x65\x20\x73\x74\x72\x75\x63\x74\x75"
b"\x72\x65\x20\x6f\x66\x20\x73\x79\x73\x74\x65\x6d\x20\x74\x61\x62"
b"\x6c\x65\x73\x2e"
)
assert three_col_dr.numfields == 3
assert len(three_col_dr.data) == 3
assert three_col_dr.data[0].len == 23
assert three_col_dr.data[0].data == b"allow_system_table_mods"
assert three_col_dr.data[1].len == 3
assert three_col_dr.data[1].data == b"off"
assert three_col_dr.data[2].len == 55
assert (
three_col_dr.data[2].data
== b"Allows modifications of the structure of system tables."
)
= errors
error_response = ErrorResponse(
b"\x45\x00\x00\x00\x69\x53\x45\x52\x52\x4f\x52\x00\x56\x45\x52\x52"
b"\x4f\x52\x00\x43\x34\x32\x50\x30\x31\x00\x4d\x72\x65\x6c\x61\x74"
b"\x69\x6f\x6e\x20\x22\x66\x6f\x6f\x62\x61\x72\x22\x20\x64\x6f\x65"
b"\x73\x20\x6e\x6f\x74\x20\x65\x78\x69\x73\x74\x00\x50\x31\x35\x00"
b"\x46\x70\x61\x72\x73\x65\x5f\x72\x65\x6c\x61\x74\x69\x6f\x6e\x2e"
b"\x63\x00\x4c\x31\x33\x38\x31\x00\x52\x70\x61\x72\x73\x65\x72\x4f"
b"\x70\x65\x6e\x54\x61\x62\x6c\x65\x00\x00"
)
assert len(error_response.error_fields) == 8
assert error_response.error_fields[0] == ("Severity", b"ERROR")
assert error_response.error_fields[7] == ("Routine", b"parserOpenTable")
= copy data response and request
copyin_response = CopyInResponse(b"\x47\x00\x00\x00\x0f\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00")
assert copyin_response.len == 15
assert copyin_response.format == 0 # Text
assert len(copyin_response.cols) == 4
assert copyin_response.ncols == 4
assert copyin_response.cols[0] == 0 # Text
assert copyin_response.cols[1] == 0 # Text
assert copyin_response.cols[2] == 0 # Text
assert copyin_response.cols[3] == 0 # Text
copydata_in = PostgresFrontend(b"\x64\x00\x00\x00\x10\x31\x2c\x42\x6f\x62\x2c\x32\x33\x2c\x31\x0d" \
b"\x0a\x64\x00\x00\x00\x12\x32\x2c\x53\x61\x6c\x6c\x79\x2c\x34\x33" \
b"\x2c\x32\x0d\x0a\x64\x00\x00\x00\x14\x33\x2c\x50\x61\x72\x64\x65" \
b"\x65\x70\x2c\x35\x34\x2c\x33\x0d\x0a\x64\x00\x00\x00\x0f\x34\x2c" \
b"\x53\x75\x2c\x33\x32\x2c\x34\x0d\x0a\x64\x00\x00\x00\x0f\x35\x2c" \
b"\x58\x69\x2c\x34\x33\x2c\x35\x0d\x0a\x64\x00\x00\x00\x0e\x36\x2c" \
b"\x50\x69\x70\x2c\x36\x36\x2c\x36\x63\x00\x00\x00\x04"
)
copyout_response = CopyOutResponse(b"\x48\x00\x00\x00\x0f\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00")
assert copyout_response.len == 15
# Combined message
copydata_out = PostgresBackend(b"\x48\x00\x00\x00\x0f\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00" \
b"\x64\x00\x00\x00\x0f\x31\x09\x42\x6f\x62\x09\x32\x33\x09\x31\x0a" \
b"\x64\x00\x00\x00\x11\x32\x09\x53\x61\x6c\x6c\x79\x09\x34\x33\x09" \
b"\x32\x0a\x64\x00\x00\x00\x13\x33\x09\x50\x61\x72\x64\x65\x65\x70" \
b"\x09\x35\x34\x09\x33\x0a\x64\x00\x00\x00\x0e\x34\x09\x53\x75\x09" \
b"\x33\x32\x09\x34\x0a\x64\x00\x00\x00\x0e\x35\x09\x58\x69\x09\x34" \
b"\x33\x09\x35\x0a\x64\x00\x00\x00\x0f\x36\x09\x50\x69\x70\x09\x36" \
b"\x36\x09\x36\x0a\x63\x00\x00\x00\x04\x43\x00\x00\x00\x0b\x43\x4f" \
b"\x50\x59\x20\x36\x00\x5a\x00\x00\x00\x05\x49")
assert len(copydata_out.contents) == 10
assert copydata_out.contents[0].len == 15
assert isinstance(copydata_out.contents[0], CopyOutResponse)
assert isinstance(copydata_out.contents[1], CopyData)
assert copydata_out.contents[1].len == 15
assert copydata_out.contents[1].data == b'1\tBob\t23\t1\n'
assert isinstance(copydata_out.contents[2], CopyData)
assert copydata_out.contents[2].data == b'2\tSally\t43\t2\n'
assert isinstance(copydata_out.contents[3], CopyData)
assert copydata_out.contents[3].data == b'3\tPardeep\t54\t3\n'
assert isinstance(copydata_out.contents[4], CopyData)
assert copydata_out.contents[4].data == b'4\tSu\t32\t4\n'
assert isinstance(copydata_out.contents[5], CopyData)
assert copydata_out.contents[5].data == b'5\tXi\t43\t5\n'
assert isinstance(copydata_out.contents[6], CopyData)
assert copydata_out.contents[6].data == b'6\tPip\t66\t6\n'
assert isinstance(copydata_out.contents[7], CopyDone)
assert isinstance(copydata_out.contents[8], CommandComplete)
assert isinstance(copydata_out.contents[9], ReadyForQuery)
= Check example request packet
request = PostgresFrontend(
b"\x50\x00\x00\x00\x64\x00\x53\x45\x4c\x45\x43\x54\x20\x44\x5f\x4e"
b"\x45\x58\x54\x5f\x4f\x5f\x49\x44\x2c\x20\x44\x5f\x54\x41\x58\x20"
b"\x20\x20\x46\x52\x4f\x4d\x20\x64\x69\x73\x74\x72\x69\x63\x74\x20"
b"\x57\x48\x45\x52\x45\x20\x44\x5f\x57\x5f\x49\x44\x20\x3d\x20\x24"
b"\x31\x20\x41\x4e\x44\x20\x44\x5f\x49\x44\x20\x3d\x20\x24\x32\x20"
b"\x46\x4f\x52\x20\x55\x50\x44\x41\x54\x45\x00\x00\x02\x00\x00\x00"
b"\x17\x00\x00\x00\x17\x42\x00\x00\x00\x20\x00\x00\x00\x02\x00\x01"
b"\x00\x01\x00\x02\x00\x00\x00\x04\x00\x00\x00\x14\x00\x00\x00\x04"
b"\x00\x00\x00\x0a\x00\x00\x44\x00\x00\x00\x06\x50\x00\x45\x00\x00"
b"\x00\x09\x00\x00\x00\x00\x00\x53\x00\x00\x00\x04"
)
assert len(request.contents) == 5
assert isinstance(request.contents[0], Parse)
assert isinstance(request.contents[1], Bind)
assert isinstance(request.contents[2], Describe)
assert isinstance(request.contents[3], Execute)
assert isinstance(request.contents[4], Sync)
= Check parse decoding
parse_msg = request.contents[0]
assert parse_msg.len == 100
assert parse_msg.destination == b""
assert parse_msg.query == b"SELECT D_NEXT_O_ID, D_TAX FROM district WHERE D_W_ID = $1 AND D_ID = $2 FOR UPDATE"
assert parse_msg.num_param_dtypes == 2
assert parse_msg.params[0] == 23
assert parse_msg.params[1] == 23
= Check bind decoding
bind_msg = request.contents[1]
assert bind_msg.len == 32
assert bind_msg.destination == b""
assert bind_msg.statement == b""
assert bind_msg.codes_count == 2
assert bind_msg.codes[0] == 1
assert bind_msg.codes[1] == 1
assert bind_msg.values_count == 2
assert bind_msg.values[0].len == 4
assert bind_msg.values[0].data == b"\x00\x00\x00\x14"
assert bind_msg.values[1].len == 4
assert bind_msg.values[1].data == b"\x00\x00\x00\x0a"
assert bind_msg.results_count == 0
= Check describe decoding
describe_msg = request.contents[2]
assert describe_msg.len == 6
assert describe_msg.close_type == b"P"
assert describe_msg.statement == b""
= Check execute decoding
exec_msg = request.contents[3]
assert exec_msg.len == 9
assert exec_msg.portal == b""
assert exec_msg.rows == 0
= Check sync decoding
sync_msg = request.contents[4]
assert sync_msg.len == 4