X7ROOT File Manager
Current Path:
/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
defence360agent
/
utils
/
??
..
??
__init__.py
(55.96 KB)
??
__pycache__
??
_shutil.py
(795 B)
??
antivirus_mode.py
(497 B)
??
async_utils.py
(718 B)
??
benchmark.py
(538 B)
??
buffer.py
(1.24 KB)
??
check_db.py
(7.72 KB)
??
check_lock.py
(636 B)
??
cli.py
(7.39 KB)
??
common.py
(14.41 KB)
??
config.py
(999 B)
??
cronjob.py
(902 B)
??
doctor.py
(1 KB)
??
hyperscan.py
(149 B)
??
importer.py
(2.67 KB)
??
ipecho.py
(3.17 KB)
??
json.py
(953 B)
??
kwconfig.py
(1.56 KB)
??
parsers.py
(11.12 KB)
??
resource_limits.py
(2.29 KB)
??
safe_fileops.py
(7.99 KB)
??
safe_sequence.py
(363 B)
??
serialization.py
(1.72 KB)
??
sshutil.py
(7.94 KB)
??
subprocess.py
(1.53 KB)
??
support.py
(5.2 KB)
??
threads.py
(1005 B)
??
validate.py
(4.27 KB)
??
whmcs.py
(7.6 KB)
??
wordpress_mu_plugin.py
(1.41 KB)
Editing: resource_limits.py
import asyncio import logging from enum import Enum from os import fsdecode from pathlib import Path from typing import List from defence360agent.utils import OsReleaseInfo logger = logging.getLogger(__name__) RUN_WITH_INTENSITY = "/usr/libexec/run-with-intensity" LVECTL_BIN_PATH = Path("/usr/sbin/lvectl") PROC_LVE_LIST_PATH = Path("/proc/lve/list") class LimitsMethod(Enum): NICE = "nice" LVE = "lve" CGROUPS = "cgroups" async def get_current_method() -> LimitsMethod: """Returns limit method, used in run-with-intensity tool.""" proc = await asyncio.create_subprocess_exec( RUN_WITH_INTENSITY, "show", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await proc.communicate() stdout = fsdecode(stdout).strip() if stdout == "nice": return LimitsMethod.NICE if stdout == "lve": return LimitsMethod.LVE if stdout == "cgroups": return LimitsMethod.CGROUPS raise LookupError( "Parsing of used limitation method failed\nstdout: {}\nstderr: {}" .format(stdout, fsdecode(stderr).strip()) ) async def create_subprocess( cmd: List[str], key: str, intensity_cpu: int, intensity_io: int, **subprocess_kwargs ) -> asyncio.subprocess.Process: """ Creates asyncio.Process with limited resources (cpu & io), using run-with-intensity tool. :param cmd: command to execute :param intensity_cpu: cpu intensity limit :param intensity_io: io intensity limit :param subprocess_kwargs: keyword arguments for create_subprocess_exec func :return: executed Process """ limits_cmd = [ RUN_WITH_INTENSITY, "run", "--intensity-cpu", str(intensity_cpu), "--intensity-io", str(intensity_io), ] limits_cmd.extend(["--key", key]) return await asyncio.create_subprocess_exec( *(limits_cmd + cmd), **subprocess_kwargs ) def is_lve_active() -> bool: """Checks that LVE-utils is active resource limiter.""" # to avoid possible errors such as DEF-11941 # make sure that OS is CL return PROC_LVE_LIST_PATH.exists() and OsReleaseInfo.is_cloudlinux() def has_lvectl() -> bool: """Checks that LVE-utils is installed.""" return LVECTL_BIN_PATH.exists()
Upload File
Create Folder