testing: handling non-root users with VNETs in pytest-based tests.

Currently isolation and resource requirements are handled directly
 by the kyua runner, based on the requirements specified by the test.
It works well for simple tests, but may cause discrepancy with tests
 doing complex pre-setups. For example, all tests that perform
 VNET setups require root access to properly function.

This change adds additional handling of the "require_user" property
 within the python testing framework. Specifically, it requests
 root access if the test class signals its root requirements and
 drops privileges to the desired user after performing the pre-setup.

Differential Revision: https://reviews.freebsd.org/D37923
MFC after:	2 weeks
This commit is contained in:
Alexander V. Chernikov 2023-02-09 14:31:34 +00:00
parent 5a5436eb5d
commit 6332ef8941
4 changed files with 92 additions and 14 deletions

View File

@ -3,6 +3,7 @@ from typing import Any
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Tuple
import pytest
@ -51,10 +52,32 @@ class ATFTestObj(object):
return line
return obj.name
@staticmethod
def _convert_user_mark(mark, obj, ret: Dict):
username = mark.args[0]
if username == "unprivileged":
# Special unprivileged user requested.
# First, require the unprivileged-user config option presence
key = "require.config"
if key not in ret:
ret[key] = "unprivileged_user"
else:
ret[key] = "{} {}".format(ret[key], "unprivileged_user")
# Check if the framework requires root
test_cls = ATFHandler.get_test_class(obj)
if test_cls and getattr(test_cls, "NEED_ROOT", False):
# Yes, so we ask kyua to run us under root instead
# It is up to the implementation to switch back to the desired
# user
ret["require.user"] = "root"
else:
ret["require.user"] = username
def _convert_marks(self, obj) -> Dict[str, Any]:
wj_func = lambda x: " ".join(x) # noqa: E731
_map: Dict[str, Dict] = {
"require_user": {"name": "require.user"},
"require_user": {"handler": self._convert_user_mark},
"require_arch": {"name": "require.arch", "fmt": wj_func},
"require_diskspace": {"name": "require.diskspace"},
"require_files": {"name": "require.files", "fmt": wj_func},
@ -66,6 +89,9 @@ class ATFTestObj(object):
ret = {}
for mark in obj.iter_markers():
if mark.name in _map:
if "handler" in _map[mark.name]:
_map[mark.name]["handler"](mark, obj, ret)
continue
name = _map[mark.name].get("name", mark.name)
if "fmt" in _map[mark.name]:
val = _map[mark.name]["fmt"](mark.args[0])
@ -91,8 +117,24 @@ class ATFHandler(object):
state: str
reason: str
def __init__(self):
def __init__(self, report_file_name: Optional[str]):
self._tests_state_map: Dict[str, ReportStatus] = {}
self._report_file_name = report_file_name
self._report_file_handle = None
def setup_configure(self):
fname = self._report_file_name
if fname:
self._report_file_handle = open(fname, mode="w")
def setup_method_pre(self, item):
"""Called before actually running the test setup_method"""
# Check if we need to manually drop the privileges
for mark in item.iter_markers():
if mark.name == "require_user":
cls = self.get_test_class(item)
cls.TARGET_USER = mark.args[0]
break
def override_runtest(self, obj):
# Override basic runtest command
@ -220,7 +262,9 @@ class ATFHandler(object):
# global failure
self.set_report_state(test_name, state, reason)
def write_report(self, path):
def write_report(self):
if self._report_file_handle is None:
return
if self._tests_state_map:
# If we're executing in ATF mode, there has to be just one test
# Anyway, deterministically pick the first one
@ -230,8 +274,8 @@ class ATFHandler(object):
line = test.state
else:
line = "{}: {}".format(test.state, test.reason)
with open(path, mode="w") as f:
print(line, file=f)
print(line, file=self._report_file_handle)
self._report_file_handle.close()
@staticmethod
def get_atf_vars() -> Dict[str, str]:

View File

@ -329,6 +329,7 @@ class ObjectsMap(NamedTuple):
class VnetTestTemplate(BaseTest):
NEED_ROOT: bool = True
TOPOLOGY = {}
def _get_vnet_handler(self, vnet_alias: str):
@ -374,6 +375,7 @@ class VnetTestTemplate(BaseTest):
# Do unbuffered stdout for children
# so the logs are present if the child hangs
sys.stdout.reconfigure(line_buffering=True)
self.drop_privileges()
handler(vnet)
def setup_topology(self, topo: Dict, topology_id: str):
@ -465,6 +467,7 @@ class VnetTestTemplate(BaseTest):
# Save state for the main handler
self.iface_map = obj_map.iface_map
self.vnet_map = obj_map.vnet_map
self.drop_privileges()
def cleanup(self, test_id: str):
# pytest test id: file::class::test_name

View File

@ -1,8 +1,10 @@
#!/usr/bin/env python3
import os
import pwd
from ctypes import CDLL
from ctypes import get_errno
from ctypes.util import find_library
from typing import Dict
from typing import List
from typing import Optional
@ -31,6 +33,8 @@ libc = LibCWrapper()
class BaseTest(object):
NEED_ROOT: bool = False # True if the class needs root privileges for the setup
TARGET_USER = None # Set to the target user by the framework
REQUIRED_MODULES: List[str] = []
def _check_modules(self):
@ -41,9 +45,26 @@ class BaseTest(object):
pytest.skip(
"kernel module '{}' not available: {}".format(mod_name, err_str)
)
@property
def atf_vars(self) -> Dict[str, str]:
px = "_ATF_VAR_"
return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)}
def drop_privileges_user(self, user: str):
uid = pwd.getpwnam(user)[2]
print("Dropping privs to {}/{}".format(user, uid))
os.setuid(uid)
def drop_privileges(self):
if self.TARGET_USER:
if self.TARGET_USER == "unprivileged":
user = self.atf_vars["unprivileged-user"]
else:
user = self.TARGET_USER
self.drop_privileges_user(user)
@property
def test_id(self):
def test_id(self) -> str:
# 'test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif] (setup)'
return os.environ.get("PYTEST_CURRENT_TEST").split(" ")[0]

View File

@ -7,10 +7,14 @@ PLUGIN_ENABLED = False
DEFAULT_HANDLER = None
def set_handler(config):
global DEFAULT_HANDLER, PLUGIN_ENABLED
DEFAULT_HANDLER = ATFHandler(report_file_name=config.option.atf_file)
PLUGIN_ENABLED = True
return DEFAULT_HANDLER
def get_handler():
global DEFAULT_HANDLER
if DEFAULT_HANDLER is None:
DEFAULT_HANDLER = ATFHandler()
return DEFAULT_HANDLER
@ -81,11 +85,9 @@ def pytest_configure(config):
"markers", "timeout(dur): int/float with max duration in sec"
)
global PLUGIN_ENABLED
PLUGIN_ENABLED = config.option.atf
if not PLUGIN_ENABLED:
if not config.option.atf:
return
get_handler()
handler = set_handler(config)
if config.option.collectonly:
# Need to output list of tests to stdout, hence override
@ -93,6 +95,8 @@ def pytest_configure(config):
reporter = config.pluginmanager.getplugin("terminalreporter")
if reporter:
config.pluginmanager.unregister(reporter)
else:
handler.setup_configure()
def pytest_collection_modifyitems(session, config, items):
@ -114,6 +118,12 @@ def pytest_collection_finish(session):
handler.list_tests(session.items)
def pytest_runtest_setup(item):
if PLUGIN_ENABLED:
handler = get_handler()
handler.setup_method_pre(item)
def pytest_runtest_logreport(report):
if PLUGIN_ENABLED:
handler = get_handler()
@ -123,4 +133,4 @@ def pytest_runtest_logreport(report):
def pytest_unconfigure(config):
if PLUGIN_ENABLED and config.option.atf_file:
handler = get_handler()
handler.write_report(config.option.atf_file)
handler.write_report()