X7ROOT File Manager
Current Path:
/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
defence360agent
/
utils
/
??
..
??
__init__.py
(55.96 KB)
??
__pycache__
??
_shutil.py
(795 B)
??
antivirus_mode.py
(497 B)
??
async_utils.py
(718 B)
??
benchmark.py
(538 B)
??
buffer.py
(1.24 KB)
??
check_db.py
(7.72 KB)
??
check_lock.py
(636 B)
??
cli.py
(7.39 KB)
??
common.py
(14.41 KB)
??
config.py
(999 B)
??
cronjob.py
(902 B)
??
doctor.py
(1 KB)
??
hyperscan.py
(149 B)
??
importer.py
(2.67 KB)
??
ipecho.py
(3.17 KB)
??
json.py
(953 B)
??
kwconfig.py
(1.56 KB)
??
parsers.py
(11.12 KB)
??
resource_limits.py
(2.29 KB)
??
safe_fileops.py
(7.99 KB)
??
safe_sequence.py
(363 B)
??
serialization.py
(1.72 KB)
??
sshutil.py
(7.94 KB)
??
subprocess.py
(1.53 KB)
??
support.py
(5.2 KB)
??
threads.py
(1005 B)
??
validate.py
(4.27 KB)
??
whmcs.py
(7.6 KB)
??
wordpress_mu_plugin.py
(1.41 KB)
Editing: sshutil.py
import asyncio import re import subprocess import urllib.request import os from logging import getLogger from urllib.error import URLError from pathlib import Path from defence360agent.utils import atomic_rewrite logger = getLogger(__name__) ANALYST_PUB_KEY_URL = ( "https://repo.imunify360.cloudlinux.com/defense360/assisted-cleanup.pub" ) # Define the key pattern to match (FROM ANALYST_PUB_KEY_URL) KEY_PATTERN = r"clsupport@sshbox\.cloudlinux\.com" SSH_CONFIG_PATH = Path("/etc/ssh/sshd_config") SSH_CONFIG_DIR = Path("/etc/ssh/sshd_config.d") async def get_ssh_port(): """ Detect SSH port from config and its overrides. Searches configs in reverse order to find the last override first. """ port = 22 # default port try: # Collect and sort config files config_files = [SSH_CONFIG_PATH] if SSH_CONFIG_DIR.exists(): config_files.extend(sorted(SSH_CONFIG_DIR.glob("*.conf"))) # Process files for config_file in reversed(config_files): try: for line in config_file.read_text().splitlines(): line = line.strip() if line.startswith("Port ") and not line.startswith("#"): try: # return first match # since we are searching backwards port = int(line.split()[1]) return port except (IndexError, ValueError): continue except IOError as e: logger.warning(f"Failed to read {config_file}: {e}") continue except Exception as e: logger.warning(f"Failed to get SSH port: {e}") finally: return port async def check_ssh_connection(port=22): """Test if port is actually an SSH port by checking the server banner""" try: reader, writer = await asyncio.open_connection("127.0.0.1", port) try: banner = await asyncio.wait_for(reader.readline(), timeout=5.0) banner = banner.decode("utf-8", errors="ignore").strip() if re.match(r"^SSH-[12]\.", banner): logger.info( f"Port {port} is confirmed as SSH (banner: {banner})" ) return True else: logger.warning( f"Port {port} is open but not SSH (got: {banner})" ) return False except asyncio.TimeoutError: logger.warning(f"Timeout waiting for SSH banner on port {port}") return False finally: writer.close() await writer.wait_closed() except (ConnectionRefusedError, OSError) as e: logger.warning(f"Failed to connect to port {port}: {e}") return False except Exception as e: logger.warning(f"Unexpected error checking SSH port {port}: {e}") return False async def install_pub_key(username="root"): """Install analyst public key for the user downloads the public key from ANALYST_PUB_KEY_URL and adds it to the user's authorized_keys file """ try: # Determine the path to the authorized_keys file if username == "root": auth_keys_path = Path("/root/.ssh/authorized_keys") else: auth_keys_path = Path(f"/home/{username}/.ssh/authorized_keys") # If not running as root, fail if os.geteuid() != 0: logger.error("Function must be run as root") return False # Download the public key try: pub_key = ( urllib.request.urlopen(ANALYST_PUB_KEY_URL) .read() .decode() .strip() ) except URLError as e: logger.error(f"Failed to download public key: {e}") return False # Check if the authorized_keys directory exists, create if not auth_keys_dir = auth_keys_path.parent if not auth_keys_dir.exists(): try: auth_keys_dir.mkdir(mode=0o700, parents=True, exist_ok=True) # Set proper ownership for the .ssh directory if username != "root": subprocess.run( ["chown", f"{username}:{username}", str(auth_keys_dir)] ) except Exception as e: logger.error( f"Failed to create directory {auth_keys_dir}: {e}" ) return False # Check if the authorized_keys file exists, create if not if not auth_keys_path.exists(): try: auth_keys_path.touch(mode=0o600) # Set proper ownership for the authorized_keys file if username != "root": subprocess.run( [ "chown", f"{username}:{username}", str(auth_keys_path), ] ) except Exception as e: logger.error(f"Failed to create file {auth_keys_path}: {e}") return False try: # Read existing content to avoid adding duplicate keys content = auth_keys_path.read_text() if pub_key in content: logger.info(f"Key already exists in {auth_keys_path}") return True # Append the key to the authorized_keys file with open(auth_keys_path, "a") as f: f.write(f"{pub_key}\n") logger.info(f"Successfully installed key for user {username}") return True except IOError as e: logger.error(f"Failed to write to {auth_keys_path}: {e}") return False except Exception as e: logger.error(f"Failed to install public key: {e}") return False def remove_pub_key(username="root") -> bool: """Remove analyst public key for the specified user This function removes the analyst's public key that was previously installed using the install_pub_key function. returns: True if key was successfully removed, False otherwise. """ try: # Determine the path to the authorized_keys file if username == "root": auth_keys_path = Path("/root/.ssh/authorized_keys") else: auth_keys_path = Path(f"/home/{username}/.ssh/authorized_keys") # Check if the file exists if not auth_keys_path.exists(): logger.warning( f"authorized_keys file not found at {auth_keys_path}" ) return False # Read the current content of the file try: content = auth_keys_path.read_text() except IOError as e: logger.error(f"Failed to read {auth_keys_path}: {e}") return False # Check if the key exists in the file if not re.search(KEY_PATTERN, content): logger.info(f"Analyst public key not found in {auth_keys_path}") return False # Remove the key (including the line it's on) new_content = re.sub(r".*" + KEY_PATTERN + r".*\n?", "", content) # If the file ends up empty, consider adding a note if not new_content.strip(): logger.info(f"File {auth_keys_path} will be empty after removal") # Write the updated content back to the file try: atomic_rewrite( auth_keys_path, new_content, backup=True, ) logger.info( "Successfully removed analyst public key from" f" {auth_keys_path}" ) return True except IOError as e: logger.error(f"Failed to write to {auth_keys_path}: {e}") return False except Exception as e: logger.error(f"Failed to remove public key: {e}") return False
Upload File
Create Folder