903 lines
38 KiB
Python
Executable File
903 lines
38 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Oculog Client Agent
|
|
Collects system metrics and sends them to the server
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import json
|
|
import socket
|
|
import requests
|
|
import psutil
|
|
import logging
|
|
import subprocess
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
|
|
# Configuration
CONFIG_FILE = '/etc/oculog/client.conf'
LOG_FILE = '/var/log/oculog/client.log'
PID_FILE = '/var/run/oculog-client.pid'
CLIENT_SCRIPT_PATH = '/opt/oculog/client.py'

# Client version - build timestamp in format year-month-day-hour-minute
# This will be injected by the server when serving the script
# Format: CLIENT_VERSION_BUILD_TIMESTAMP = "YYYY-MM-DD-HH-MM"
# If not injected, will use file modification time
CLIENT_VERSION_BUILD_TIMESTAMP = None  # Will be injected by server


def get_client_version():
    """Get client version from build timestamp or file modification time.

    Returns:
        str: Version in 'YYYY-MM-DD-HH-MM' format. Resolution order:
            1. server-injected CLIENT_VERSION_BUILD_TIMESTAMP,
            2. modification time of the client script on disk,
            3. current local time (fresh installations).
    """
    # First check if build timestamp was injected
    if CLIENT_VERSION_BUILD_TIMESTAMP:
        return CLIENT_VERSION_BUILD_TIMESTAMP

    # Fallback to file modification time
    try:
        script_dir = os.path.dirname(os.path.abspath(__file__))
        script_path = os.path.join(script_dir, 'client.py')
        if not os.path.exists(script_path):
            script_path = CLIENT_SCRIPT_PATH

        if os.path.exists(script_path):
            mtime = os.path.getmtime(script_path)
            dt = datetime.fromtimestamp(mtime)
            return dt.strftime('%Y-%m-%d-%H-%M')
    except Exception as e:
        # BUG FIX: this function executes at import time, *before* the
        # module-level `logger` is created further down, so referencing
        # `logger` here raised NameError. logging.getLogger is always safe.
        logging.getLogger('oculog-client').warning(
            f"Could not determine client version: {e}")

    # Final fallback: use current time (for new installations)
    return datetime.now().strftime('%Y-%m-%d-%H-%M')
|
|
|
|
# Resolve the version once at import time; it is attached to every
# metrics payload and used for the auto-update comparison.
CLIENT_VERSION = get_client_version()

# Ensure log directory exists
log_dir = os.path.dirname(LOG_FILE)
os.makedirs(log_dir, exist_ok=True)

# Setup logging
# Output goes both to the log file and to stdout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler(sys.stdout)
    ]
)
# Module-wide logger used by all classes and helpers below.
logger = logging.getLogger('oculog-client')
|
|
|
|
|
|
class VulnerabilityScanner:
    """Scans for vulnerabilities in installed packages.

    The inventory comes from dpkg, so this only works on Debian/Ubuntu
    hosts. The actual vulnerability matching is done server-side; this
    class just collects package names/versions plus the OS version.
    """

    def __init__(self, enabled=True):
        # enabled: scanning can be switched off via client config
        self.enabled = enabled
        self.last_scan_time = 0
        self.scan_interval = 86400  # 1 day in seconds

    def get_installed_packages(self):
        """Get list of all installed packages (excluding removed packages).

        Returns:
            list[dict]: [{'name': ..., 'version': ...}, ...]; empty on error.
        """
        try:
            # Use dpkg-query with status to filter out removed packages
            # Status format: "want ok installed", config-files, half-configured, etc.
            # We only want packages with "install ok installed" status
            result = subprocess.run(
                ['dpkg-query', '-W', '-f=${Status}\t${Package}\t${Version}\n'],
                capture_output=True,
                text=True,
                timeout=10
            )

            if result.returncode != 0:
                logger.error(f"dpkg-query failed: {result.stderr}")
                return []

            packages = []
            for line in result.stdout.strip().split('\n'):
                if not line.strip():
                    continue
                parts = line.split('\t')
                if len(parts) >= 3:
                    status, name, version = parts[0], parts[1], parts[2]
                    # Only include packages that are actually installed.
                    # "install ok installed" means fully installed;
                    # "deinstall ok config-files" (rc) means removed but
                    # config remains - exclude these.
                    if 'install ok installed' in status:
                        packages.append({
                            'name': name.strip(),
                            'version': version.strip()
                        })

            return packages
        except subprocess.TimeoutExpired:
            logger.error("dpkg-query timed out")
            return []
        except Exception as e:
            logger.error(f"Error getting installed packages: {e}")
            return []

    def should_scan(self):
        """Check if it's time to run a vulnerability scan."""
        if not self.enabled:
            return False
        current_time = time.time()
        return (current_time - self.last_scan_time) >= self.scan_interval

    def get_os_version(self):
        """Get Ubuntu version (e.g. '22.04') from /etc/os-release, or None."""
        try:
            if os.path.exists('/etc/os-release'):
                # Hoisted out of the per-line loop: the original re-executed
                # `import re` for every VERSION= line encountered.
                import re
                with open('/etc/os-release', 'r') as f:
                    for line in f:
                        line = line.strip()
                        if line.startswith('VERSION_ID='):
                            # Extract version, remove quotes
                            version = line.split('=', 1)[1].strip('"').strip("'")
                            return version
                        elif line.startswith('VERSION='):
                            # Fallback: extract a version number from the
                            # free-form VERSION field (e.g. "22.04" from "22.04 LTS")
                            version_str = line.split('=', 1)[1].strip('"').strip("'")
                            match = re.search(r'(\d+\.\d+)', version_str)
                            if match:
                                return match.group(1)
            return None
        except Exception as e:
            logger.warning(f"Error reading OS version: {e}")
            return None

    def scan(self):
        """Perform vulnerability scan.

        Returns:
            dict with 'packages' and 'os_version' on success, False when no
            packages were found, None on unexpected error. (Callers only
            truth-test the result, so all failure values behave the same.)
        """
        try:
            logger.info("Starting vulnerability scan...")
            packages = self.get_installed_packages()

            if not packages:
                logger.warning("No packages found to scan")
                return False

            logger.info(f"Scanning {len(packages)} packages for vulnerabilities")
            self.last_scan_time = time.time()

            # Return packages with OS version info
            return {
                'packages': packages,
                'os_version': self.get_os_version()
            }
        except Exception as e:
            logger.error(f"Error during vulnerability scan: {e}")
            return None
|
|
|
|
|
|
class MetricsCollector:
    """Collects system metrics via psutil plus a few shell/HTTP helpers.

    Instances keep state between calls:
      - the previous network counters/timestamp, used for throughput deltas;
      - caches for rarely-changing data (os-release contents, public IP,
        docker availability and container list).
    """

    def __init__(self):
        self.hostname = socket.gethostname()
        # Previous snapshot used by get_network_metrics to compute rates
        self.last_network_stats = None
        self.last_network_time = None
        # Cache for rarely-changing data
        self.cached_os_release = None
        self.cached_public_ip = None
        self.cached_public_ip_time = 0
        self.public_ip_cache_duration = 3600  # Cache public IP for 1 hour
        self.last_docker_check = 0
        self.docker_check_interval = 300  # Check docker every 5 minutes
        self.cached_docker_available = None
        self.cached_containers = None

    def get_cpu_metrics(self):
        """Get CPU usage percentage and logical core count."""
        try:
            # Use interval=None to avoid blocking - first call returns 0.0,
            # but since client runs continuously, subsequent calls will be accurate
            cpu_percent = psutil.cpu_percent(interval=None)
            cpu_count = psutil.cpu_count()
            return {
                'usage': round(cpu_percent, 2),
                'cores': cpu_count
            }
        except Exception as e:
            logger.error(f"Error collecting CPU metrics: {e}")
            return {'usage': 0, 'cores': 0}

    def get_memory_metrics(self):
        """Get memory usage in GB plus the used percentage."""
        try:
            mem = psutil.virtual_memory()
            return {
                'total': round(mem.total / (1024**3), 2),  # GB
                'used': round(mem.used / (1024**3), 2),  # GB
                'available': round(mem.available / (1024**3), 2),  # GB
                'percent': round(mem.percent, 2)
            }
        except Exception as e:
            logger.error(f"Error collecting memory metrics: {e}")
            return {'total': 0, 'used': 0, 'available': 0, 'percent': 0}

    def get_swap_metrics(self):
        """Get swap usage in GB plus the used percentage."""
        try:
            swap = psutil.swap_memory()
            return {
                'total': round(swap.total / (1024**3), 2),  # GB
                'used': round(swap.used / (1024**3), 2),  # GB
                'free': round(swap.free / (1024**3), 2),  # GB
                'percent': round(swap.percent, 2)
            }
        except Exception as e:
            logger.error(f"Error collecting swap metrics: {e}")
            return {'total': 0, 'used': 0, 'free': 0, 'percent': 0}

    def get_process_count(self):
        """Get total process count; 0 on error."""
        try:
            # sum() over the iterator counts without materializing a list
            return sum(1 for _ in psutil.process_iter())
        except Exception as e:
            logger.error(f"Error collecting process count: {e}")
            return 0

    def get_disk_metrics(self):
        """Get disk usage for all mounted filesystems.

        Returns a dict with the root filesystem's numbers at the top level
        (backward compatibility) and a 'disks' list with every physical
        mount. Virtual/pseudo filesystems, network mounts, loop devices
        and snap mounts are skipped.
        """
        try:
            disks = []
            partitions = psutil.disk_partitions()

            for partition in partitions:
                try:
                    # Skip virtual filesystems and network mounts
                    if partition.fstype in ['tmpfs', 'devtmpfs', 'sysfs', 'proc', 'devpts', 'cgroup', 'cgroup2', 'pstore', 'bpf', 'tracefs', 'debugfs', 'securityfs', 'hugetlbfs', 'mqueue', 'overlay', 'autofs', 'squashfs']:
                        continue

                    # Skip network filesystems (optional - comment out if you want to include them)
                    if partition.fstype.startswith('nfs') or partition.fstype.startswith('cifs') or partition.fstype.startswith('smb'):
                        continue

                    # Skip loop devices (snap packages, etc.)
                    if partition.device.startswith('/dev/loop'):
                        continue

                    # Skip snap mount points
                    if partition.mountpoint.startswith('/snap/'):
                        continue

                    disk_usage = psutil.disk_usage(partition.mountpoint)
                    disk_info = {
                        'mountpoint': partition.mountpoint,
                        'device': partition.device,
                        'fstype': partition.fstype,
                        'total': round(disk_usage.total / (1024**3), 2),  # GB
                        'used': round(disk_usage.used / (1024**3), 2),  # GB
                        'free': round(disk_usage.free / (1024**3), 2),  # GB
                        'percent': round(disk_usage.percent, 2)
                    }
                    disks.append(disk_info)
                    logger.debug(f"Collected disk metrics: {partition.mountpoint} ({partition.device}) - {disk_info['used']:.2f}GB / {disk_info['total']:.2f}GB ({disk_info['percent']:.1f}%)")
                except PermissionError:
                    # Skip partitions we don't have permission to access
                    logger.debug(f"Skipping {partition.mountpoint} due to permission error")
                    continue
                except Exception as e:
                    logger.warning(f"Error getting disk usage for {partition.mountpoint}: {e}")
                    continue

            # Return root disk for backward compatibility, plus all disks
            root_disk = next((d for d in disks if d['mountpoint'] == '/'), disks[0] if disks else None)

            logger.info(f"Collected metrics for {len(disks)} disk(s): {[d['mountpoint'] for d in disks]}")

            if root_disk:
                return {
                    'total': root_disk['total'],
                    'used': root_disk['used'],
                    'free': root_disk['free'],
                    'percent': root_disk['percent'],
                    'disks': disks  # Include all disks
                }
            else:
                return {
                    'total': 0,
                    'used': 0,
                    'free': 0,
                    'percent': 0,
                    'disks': disks
                }
        except Exception as e:
            logger.error(f"Error collecting disk metrics: {e}")
            return {'total': 0, 'used': 0, 'free': 0, 'percent': 0, 'disks': []}

    def get_network_metrics(self):
        """Get network throughput (Mbps since last call) and GB totals.

        The very first call only seeds the baseline and reports zeros.
        """
        try:
            net_io = psutil.net_io_counters()
            current_time = time.time()

            if self.last_network_stats is None:
                # First call: remember the baseline, nothing to diff against yet
                self.last_network_stats = net_io
                self.last_network_time = current_time
                return {'rx': 0, 'tx': 0, 'rx_total': 0, 'tx_total': 0}

            time_delta = current_time - self.last_network_time
            if time_delta == 0:
                # Guard against division by zero on back-to-back calls
                return {'rx': 0, 'tx': 0, 'rx_total': 0, 'tx_total': 0}

            rx_bytes = net_io.bytes_recv - self.last_network_stats.bytes_recv
            tx_bytes = net_io.bytes_sent - self.last_network_stats.bytes_sent

            rx_mbps = (rx_bytes * 8) / (time_delta * 1024 * 1024)  # Mbps
            tx_mbps = (tx_bytes * 8) / (time_delta * 1024 * 1024)  # Mbps

            self.last_network_stats = net_io
            self.last_network_time = current_time

            return {
                'rx': round(rx_mbps, 2),  # Mbps
                'tx': round(tx_mbps, 2),  # Mbps
                'rx_total': round(net_io.bytes_recv / (1024**3), 2),  # GB
                'tx_total': round(net_io.bytes_sent / (1024**3), 2)  # GB
            }
        except Exception as e:
            logger.error(f"Error collecting network metrics: {e}")
            return {'rx': 0, 'tx': 0, 'rx_total': 0, 'tx_total': 0}

    def get_server_info(self):
        """Get server information: OS release, status, processes, containers, IP info.

        Each section fails independently: on error it stays None and is
        logged, so one broken probe never hides the others.
        """
        server_info = {
            'os_release': None,
            'live_status': None,
            'top_processes': None,
            'containers': None,
            'ip_info': None
        }

        # Get OS release info (cached since it rarely changes)
        if self.cached_os_release is None:
            try:
                os_release = {}
                if os.path.exists('/etc/os-release'):
                    with open('/etc/os-release', 'r') as f:
                        for line in f:
                            line = line.strip()
                            if '=' in line and not line.startswith('#'):
                                key, value = line.split('=', 1)
                                # Remove quotes from value
                                value = value.strip('"').strip("'")
                                os_release[key] = value
                self.cached_os_release = os_release if os_release else None
            except Exception as e:
                logger.warning(f"Error reading /etc/os-release: {e}")
                self.cached_os_release = None
        server_info['os_release'] = self.cached_os_release

        # Get live status (uptime, load average)
        try:
            uptime_seconds = time.time() - psutil.boot_time()
            uptime_days = int(uptime_seconds // 86400)
            uptime_hours = int((uptime_seconds % 86400) // 3600)
            uptime_minutes = int((uptime_seconds % 3600) // 60)

            load_avg = os.getloadavg()

            server_info['live_status'] = {
                'uptime_days': uptime_days,
                'uptime_hours': uptime_hours,
                'uptime_minutes': uptime_minutes,
                'uptime_seconds': round(uptime_seconds, 2),
                'load_average_1min': round(load_avg[0], 2),
                'load_average_5min': round(load_avg[1], 2),
                'load_average_15min': round(load_avg[2], 2)
            }
        except Exception as e:
            logger.warning(f"Error getting live status: {e}")

        # Get top processes by CPU usage
        try:
            # Collect CPU usage for all processes, then sort to get actual top processes
            # This ensures we find the highest CPU-consuming processes regardless of process order
            processes = []

            for proc in psutil.process_iter(['pid', 'name', 'memory_percent', 'username']):
                try:
                    # Get CPU percent - first call may return 0.0, but that's acceptable
                    # The client runs continuously so subsequent calls will have accurate values
                    cpu_pct = proc.cpu_percent(interval=None)
                    if cpu_pct is None:
                        cpu_pct = 0.0

                    proc_info = proc.info
                    proc_info['cpu_percent'] = round(cpu_pct, 2)
                    processes.append(proc_info)
                except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                    continue

            # Sort by CPU usage and take top 10
            processes.sort(key=lambda x: x.get('cpu_percent', 0), reverse=True)
            server_info['top_processes'] = processes[:10]
        except Exception as e:
            logger.warning(f"Error getting top processes: {e}")

        # Get running containers (Docker) - check less frequently
        current_time = time.time()
        if current_time - self.last_docker_check >= self.docker_check_interval:
            self.last_docker_check = current_time
            try:
                # First check if docker is available (cached)
                if self.cached_docker_available is None:
                    # Check if docker command exists
                    docker_check = subprocess.run(
                        ['which', 'docker'],
                        capture_output=True,
                        timeout=1
                    )
                    self.cached_docker_available = docker_check.returncode == 0

                if self.cached_docker_available:
                    result = subprocess.run(
                        ['docker', 'ps', '--format', 'json'],
                        capture_output=True,
                        text=True,
                        timeout=5
                    )
                    if result.returncode == 0:
                        containers = []
                        for line in result.stdout.strip().split('\n'):
                            if line.strip():
                                try:
                                    container = json.loads(line)
                                    containers.append({
                                        'id': container.get('ID', '')[:12],
                                        'image': container.get('Image', ''),
                                        'command': container.get('Command', ''),
                                        'created': container.get('CreatedAt', ''),
                                        'status': container.get('Status', ''),
                                        'ports': container.get('Ports', ''),
                                        'names': container.get('Names', '')
                                    })
                                except json.JSONDecodeError:
                                    continue
                        self.cached_containers = containers if containers else None
                    else:
                        self.cached_containers = None
                else:
                    self.cached_containers = None
            except Exception:
                # FIX: the original caught (TimeoutExpired, FileNotFoundError,
                # Exception) - Exception already subsumes the other two.
                # Docker might not be installed or not running.
                self.cached_docker_available = False
                self.cached_containers = None

        # Use cached containers value
        server_info['containers'] = self.cached_containers

        # Get IP info (private and public)
        try:
            ip_info = {
                'private': [],
                'public': None
            }

            # Get private IPs from network interfaces
            for interface, addrs in psutil.net_if_addrs().items():
                for addr in addrs:
                    if addr.family == socket.AF_INET:  # IPv4
                        ip_addr = addr.address
                        # Skip loopback and link-local
                        if not ip_addr.startswith('127.') and not ip_addr.startswith('169.254.'):
                            ip_info['private'].append({
                                'interface': interface,
                                'ip': ip_addr,
                                'netmask': addr.netmask
                            })

            # Try to get public IP (cached since it rarely changes)
            current_time = time.time()
            if self.cached_public_ip is None or (current_time - self.cached_public_ip_time) >= self.public_ip_cache_duration:
                try:
                    # Try multiple services for reliability
                    public_ip_services = [
                        'https://api.ipify.org?format=json',
                        'https://ifconfig.me/ip',
                        'https://icanhazip.com'
                    ]

                    for service_url in public_ip_services:
                        try:
                            response = requests.get(service_url, timeout=3)
                            if response.status_code == 200:
                                # Handle different response formats
                                if 'ipify' in service_url:
                                    public_ip = response.json().get('ip', '').strip()
                                else:
                                    public_ip = response.text.strip()

                                if public_ip:
                                    self.cached_public_ip = public_ip
                                    self.cached_public_ip_time = current_time
                                    ip_info['public'] = public_ip
                                    break
                        except Exception:
                            # FIX: was a bare `except:` which also swallowed
                            # SystemExit/KeyboardInterrupt; try the next service.
                            continue
                except Exception as e:
                    logger.debug(f"Could not fetch public IP: {e}")
            else:
                # Use cached public IP
                ip_info['public'] = self.cached_public_ip

            server_info['ip_info'] = ip_info if ip_info['private'] or ip_info['public'] else None
        except Exception as e:
            logger.warning(f"Error getting IP info: {e}")

        return server_info

    def collect_all_metrics(self):
        """Collect all system metrics into one payload dict for the server."""
        server_info = self.get_server_info()

        # Extract load average and uptime from server_info for metrics payload
        load_avg = None
        uptime_seconds = None
        if server_info.get('live_status'):
            live_status = server_info['live_status']
            load_avg = {
                '1min': live_status.get('load_average_1min'),
                '5min': live_status.get('load_average_5min'),
                '15min': live_status.get('load_average_15min')
            }
            uptime_seconds = live_status.get('uptime_seconds')

        return {
            'hostname': self.hostname,
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'cpu': self.get_cpu_metrics(),
            'memory': self.get_memory_metrics(),
            'swap': self.get_swap_metrics(),
            'disk': self.get_disk_metrics(),
            'network': self.get_network_metrics(),
            'process_count': self.get_process_count(),
            'load_avg': load_avg,
            'uptime_seconds': uptime_seconds,
            'server_info': server_info,
            'client_version': CLIENT_VERSION
        }
|
|
|
|
|
|
class OculogClient:
    """Main client class: collects metrics and ships them to the server.

    Also performs periodic vulnerability scans and self-updates. `run()`
    blocks until interrupted or an auto-update triggers a clean exit.
    """

    def __init__(self, config):
        self.config = config
        self.server_url = config.get('server_url', 'http://localhost:3001')
        self.server_id = config.get('server_id', socket.gethostname())
        self.api_key = config.get('api_key', '')
        self.interval = config.get('interval', 30)  # seconds
        self.collector = MetricsCollector()
        # Vulnerability scanning is enabled by default, but can be disabled via config
        vulnerability_scan_enabled = config.get('vulnerability_scan_enabled', True)
        self.vulnerability_scanner = VulnerabilityScanner(enabled=vulnerability_scan_enabled)
        self.running = False
        self.last_update_check = 0
        self.update_check_interval = 3600  # Check for updates every hour

    def send_metrics(self, metrics):
        """Send a metrics payload to the server; True on HTTP success."""
        try:
            url = f"{self.server_url}/api/servers/{self.server_id}/metrics"
            headers = {
                'Content-Type': 'application/json',
                'X-API-Key': self.api_key
            }

            if not self.api_key:
                logger.warning("No API key configured. Metrics may be rejected by server.")

            response = requests.post(
                url,
                json=metrics,
                timeout=10,
                headers=headers
            )
            response.raise_for_status()
            logger.info(f"Metrics sent successfully: {response.json()}")
            return True
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to send metrics: {e}")
            return False

    def send_vulnerability_scan(self, scan_data):
        """Send vulnerability scan results to server in batches if needed.

        Accepts either the dict form ({'packages': ..., 'os_version': ...})
        or the legacy plain package list. Returns True when at least one
        batch was accepted by the server.
        """
        try:
            url = f"{self.server_url}/api/servers/{self.server_id}/vulnerabilities/scan"
            headers = {
                'Content-Type': 'application/json',
                'X-API-Key': self.api_key
            }

            if not self.api_key:
                logger.warning("No API key configured. Vulnerability scan may be rejected by server.")
                return False

            # Extract packages and os_version
            if isinstance(scan_data, dict) and 'packages' in scan_data:
                packages = scan_data['packages']
                os_version = scan_data.get('os_version')
            else:
                # Legacy format: just a list of packages
                packages = scan_data
                os_version = None

            # Batch packages to avoid payload size limits (send 500 packages per batch)
            batch_size = 500
            total_packages = len(packages)
            total_vulnerabilities = 0
            success_count = 0

            logger.info(f"Sending vulnerability scan for {total_packages} packages in batches of {batch_size}")

            for i in range(0, total_packages, batch_size):
                batch = packages[i:i + batch_size]
                batch_num = (i // batch_size) + 1
                total_batches = (total_packages + batch_size - 1) // batch_size

                payload = {
                    'packages': batch,
                    'os_version': os_version,
                    'batch_info': {
                        'batch_number': batch_num,
                        'total_batches': total_batches,
                        'is_last_batch': (i + batch_size >= total_packages)
                    }
                }

                try:
                    response = requests.post(
                        url,
                        json=payload,
                        timeout=120,  # Longer timeout for vulnerability scans
                        headers=headers
                    )
                    response.raise_for_status()
                    result = response.json()
                    batch_vulns = result.get('vulnerabilities_found', 0)
                    total_vulnerabilities += batch_vulns
                    success_count += 1
                    logger.info(f"Batch {batch_num}/{total_batches} sent successfully: {len(batch)} packages, {batch_vulns} vulnerabilities found")
                except requests.exceptions.RequestException as e:
                    logger.error(f"Failed to send vulnerability scan batch {batch_num}/{total_batches}: {e}")
                    # Continue with other batches even if one fails
                    continue

            if success_count > 0:
                logger.info(f"Vulnerability scan completed: {success_count}/{total_batches} batches successful, {total_vulnerabilities} total vulnerabilities found")
                return True
            else:
                logger.error("All vulnerability scan batches failed")
                return False
        except Exception as e:
            logger.error(f"Failed to send vulnerability scan: {e}")
            return False

    def check_for_updates(self):
        """Check if a newer client version is available.

        Returns True only when the server reports a strictly newer version;
        False on any error (so the client keeps running).
        """
        try:
            url = f"{self.server_url}/api/client-version/latest"
            headers = {
                'X-API-Key': self.api_key
            }

            logger.debug(f"Checking for updates: current version={CLIENT_VERSION}, server={url}")
            response = requests.get(url, timeout=5, headers=headers)
            response.raise_for_status()
            data = response.json()
            latest_version = data.get('version')

            if not latest_version:
                # Note: was an f-string with no placeholders
                logger.warning("Update check returned no version data from server")
                return False

            logger.debug(f"Server reports latest version: {latest_version}")

            # Compare versions (format: YYYY-MM-DD-HH-MM)
            # Simple string comparison works for this zero-padded format
            if CLIENT_VERSION < latest_version:
                logger.info(f"Update available: current={CLIENT_VERSION}, latest={latest_version}")
                return True
            else:
                logger.debug(f"No update needed: current={CLIENT_VERSION}, latest={latest_version}")

            return False
        except requests.exceptions.RequestException as e:
            logger.warning(f"Could not check for updates (network error): {e}")
            return False
        except Exception as e:
            logger.warning(f"Could not check for updates (unexpected error): {e}")
            return False

    def perform_auto_update(self):
        """Perform automatic update of the client.

        Downloads the new script, backs up the current one, and swaps them.
        On failure the backup is restored. Returns True on success; the
        caller is expected to exit so systemd restarts the new version.
        """
        try:
            logger.info("Starting automatic client update...")

            # Download updated client script
            script_url = f"{self.server_url}/api/client-script"
            temp_script = '/opt/oculog/client.py.new'

            response = requests.get(script_url, timeout=30)
            response.raise_for_status()

            # Backup current script
            if os.path.exists(CLIENT_SCRIPT_PATH):
                backup_path = f"{CLIENT_SCRIPT_PATH}.backup"
                subprocess.run(['cp', CLIENT_SCRIPT_PATH, backup_path], check=False)

            # Write new script
            with open(temp_script, 'wb') as f:
                f.write(response.content)

            # Make executable
            os.chmod(temp_script, 0o755)

            # Replace old script atomically
            subprocess.run(['mv', temp_script, CLIENT_SCRIPT_PATH], check=True)

            logger.info("Client script updated successfully")

            # Trigger systemd restart by exiting (systemd will restart due to Restart=always)
            # We use exit code 0 to indicate successful update
            logger.info("Exiting to allow systemd to restart with new version")
            return True
        except Exception as e:
            logger.error(f"Auto-update failed: {e}")
            # Try to restore backup if update failed
            backup_path = f"{CLIENT_SCRIPT_PATH}.backup"
            if os.path.exists(backup_path):
                try:
                    subprocess.run(['cp', backup_path, CLIENT_SCRIPT_PATH], check=False)
                    logger.info("Restored backup after failed update")
                except Exception:
                    # FIX: was a bare `except:` which would also swallow
                    # SystemExit/KeyboardInterrupt during the restore.
                    pass
            return False

    def run(self):
        """Main loop: update check, vulnerability scan, metrics push, sleep."""
        self.running = True
        logger.info(f"Starting Oculog client (server_id: {self.server_id}, version: {CLIENT_VERSION}, interval: {self.interval}s)")

        # Initial network stats collection (needed for first calculation)
        self.collector.get_network_metrics()
        time.sleep(1)

        while self.running:
            try:
                # Check for updates periodically
                current_time = time.time()
                if current_time - self.last_update_check >= self.update_check_interval:
                    logger.info(f"Checking for client updates (current version: {CLIENT_VERSION})...")
                    self.last_update_check = current_time
                    try:
                        if self.check_for_updates():
                            logger.info("Newer version detected, performing auto-update...")
                            if self.perform_auto_update():
                                logger.info("Auto-update completed, exiting to allow restart...")
                                # Exit gracefully - systemd will restart with Restart=always
                                self.running = False
                                sys.exit(0)
                            else:
                                logger.warning("Auto-update failed, continuing with current version")
                    except Exception as e:
                        logger.error(f"Error during update check: {e}")
                        # Continue running even if update check fails

                # Check for vulnerability scan (scanner's scan_interval is 1 day;
                # the old comment claimed "hourly", which was wrong)
                if self.vulnerability_scanner.should_scan():
                    try:
                        scan_data = self.vulnerability_scanner.scan()
                        if scan_data:
                            package_count = len(scan_data['packages']) if isinstance(scan_data, dict) else len(scan_data)
                            logger.info(f"Sending vulnerability scan for {package_count} packages...")
                            if self.send_vulnerability_scan(scan_data):
                                logger.info("Vulnerability scan completed successfully")
                            else:
                                logger.warning("Vulnerability scan failed to send, will retry on next cycle")
                    except Exception as e:
                        logger.error(f"Error during vulnerability scan: {e}")
                        # Continue running even if vulnerability scan fails

                metrics = self.collector.collect_all_metrics()
                logger.debug(f"Collected metrics: {json.dumps(metrics, indent=2)}")

                if self.send_metrics(metrics):
                    logger.info("Metrics collection cycle completed successfully")
                else:
                    logger.warning("Failed to send metrics, will retry on next cycle")

                time.sleep(self.interval)
            except KeyboardInterrupt:
                logger.info("Received interrupt signal, shutting down...")
                self.running = False
            except Exception as e:
                logger.error(f"Unexpected error in main loop: {e}")
                time.sleep(self.interval)

    def stop(self):
        """Stop the client loop after the current cycle."""
        self.running = False
|
|
|
|
|
|
def load_config():
    """Load configuration from CONFIG_FILE, falling back to defaults.

    The JSON file's keys are merged over the defaults; on any read or
    parse error the defaults are kept and a warning is logged.
    """
    config = {
        'server_url': 'http://localhost:3001',
        'server_id': socket.gethostname(),
        'interval': 30
    }

    if not os.path.exists(CONFIG_FILE):
        logger.info(f"Config file not found at {CONFIG_FILE}, using defaults")
        return config

    try:
        with open(CONFIG_FILE, 'r') as f:
            overrides = json.load(f)
        config.update(overrides)
        logger.info(f"Loaded configuration from {CONFIG_FILE}")
    except Exception as e:
        logger.warning(f"Failed to load config file, using defaults: {e}")

    return config
|
|
|
|
|
|
def write_pid_file():
    """Write the current PID to PID_FILE, creating its directory first.

    Failures are logged and swallowed so the client can still run.
    """
    try:
        os.makedirs(os.path.dirname(PID_FILE), exist_ok=True)
        with open(PID_FILE, 'w') as handle:
            handle.write(str(os.getpid()))
    except Exception as e:
        logger.error(f"Failed to write PID file: {e}")
|
|
|
|
|
|
def remove_pid_file():
    """Delete PID_FILE if it exists; failures are logged, never raised."""
    try:
        if not os.path.exists(PID_FILE):
            return
        os.remove(PID_FILE)
    except Exception as e:
        logger.error(f"Failed to remove PID file: {e}")
|
|
|
|
|
|
def main():
    """Main entry point: handle CLI flags, then run the client loop.

    Recognized flags are --version and --help; anything else (or no
    argument) starts the agent. The PID file is removed on any exit.
    """
    args = sys.argv[1:]
    if args:
        flag = args[0]
        if flag == '--version':
            print(f"Oculog Client version {CLIENT_VERSION}")
            sys.exit(0)
        elif flag == '--help':
            print("Usage: oculog-client [--version|--help]")
            print("\nOculog Client Agent")
            print("Collects system metrics and sends them to the server")
            sys.exit(0)

    # Ensure log directory exists
    os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)

    client = OculogClient(load_config())

    try:
        write_pid_file()
        client.run()
    finally:
        remove_pid_file()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|
|
|