Files
oculog/clients/ubuntu/client.py
2026-02-12 14:52:37 -06:00

903 lines
38 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Oculog Client Agent
Collects system metrics and sends them to the server
"""
import os
import sys
import time
import json
import socket
import requests
import psutil
import logging
import subprocess
from pathlib import Path
from datetime import datetime, timezone
# Configuration paths used by the agent.
CONFIG_FILE = '/etc/oculog/client.conf'
LOG_FILE = '/var/log/oculog/client.log'
PID_FILE = '/var/run/oculog-client.pid'
CLIENT_SCRIPT_PATH = '/opt/oculog/client.py'

# Client version - build timestamp in format year-month-day-hour-minute
# This will be injected by the server when serving the script
# Format: CLIENT_VERSION_BUILD_TIMESTAMP = "YYYY-MM-DD-HH-MM"
# If not injected, will use file modification time
CLIENT_VERSION_BUILD_TIMESTAMP = None  # Will be injected by server


def get_client_version():
    """Return the client version string in ``YYYY-MM-DD-HH-MM`` format.

    Resolution order:
    1. the server-injected build timestamp, if present;
    2. the modification time of the installed client script;
    3. the current time (fresh installations).
    """
    # First check if build timestamp was injected
    if CLIENT_VERSION_BUILD_TIMESTAMP:
        return CLIENT_VERSION_BUILD_TIMESTAMP
    # Fallback to file modification time
    try:
        script_dir = os.path.dirname(os.path.abspath(__file__))
        script_path = os.path.join(script_dir, 'client.py')
        if not os.path.exists(script_path):
            script_path = CLIENT_SCRIPT_PATH
        if os.path.exists(script_path):
            mtime = os.path.getmtime(script_path)
            return datetime.fromtimestamp(mtime).strftime('%Y-%m-%d-%H-%M')
    except Exception as e:
        # BUG FIX: this function runs at import time, *before* the module-level
        # ``logger`` is created (further down the file), so calling
        # logger.warning() here raised NameError. Report on stderr instead.
        print(f"Could not determine client version: {e}", file=sys.stderr)
    # Final fallback: use current time (for new installations)
    return datetime.now().strftime('%Y-%m-%d-%H-%M')


CLIENT_VERSION = get_client_version()
# --- Logging setup ---------------------------------------------------------
# Make sure the directory holding the log file exists before the
# FileHandler tries to open it.
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)

# Emit every record to both the log file and stdout using a common format.
logging.basicConfig(
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler(sys.stdout),
    ],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger('oculog-client')
class VulnerabilityScanner:
    """Collects the installed-package inventory for server-side vulnerability
    matching, throttled to one scan per day."""

    def __init__(self, enabled=True):
        self.enabled = enabled
        self.last_scan_time = 0          # epoch seconds of the last scan (0 = never)
        self.scan_interval = 86400       # 1 day in seconds

    def get_installed_packages(self):
        """Return every package in "install ok installed" state.

        Packages that were removed but still have configuration files left
        behind (dpkg status "deinstall ok config-files", i.e. state ``rc``)
        are excluded.
        """
        try:
            completed = subprocess.run(
                ['dpkg-query', '-W', '-f=${Status}\t${Package}\t${Version}\n'],
                capture_output=True,
                text=True,
                timeout=10,
            )
            if completed.returncode != 0:
                logger.error(f"dpkg-query failed: {completed.stderr}")
                return []
            installed = []
            for row in completed.stdout.strip().split('\n'):
                if not row.strip():
                    continue
                fields = row.split('\t')
                if len(fields) < 3:
                    continue
                status, pkg_name, pkg_version = fields[0], fields[1], fields[2]
                # Keep only fully-installed packages.
                if 'install ok installed' not in status:
                    continue
                installed.append({
                    'name': pkg_name.strip(),
                    'version': pkg_version.strip(),
                })
            return installed
        except subprocess.TimeoutExpired:
            logger.error("dpkg-query timed out")
            return []
        except Exception as e:
            logger.error(f"Error getting installed packages: {e}")
            return []

    def should_scan(self):
        """Return True when scanning is enabled and the interval has elapsed."""
        if not self.enabled:
            return False
        return (time.time() - self.last_scan_time) >= self.scan_interval

    def get_os_version(self):
        """Read the Ubuntu version from /etc/os-release (None if unknown)."""
        try:
            if not os.path.exists('/etc/os-release'):
                return None
            with open('/etc/os-release', 'r') as release_file:
                for raw_line in release_file:
                    entry = raw_line.strip()
                    if entry.startswith('VERSION_ID='):
                        # Preferred field; strip surrounding quotes.
                        return entry.split('=', 1)[1].strip('"').strip("'")
                    if entry.startswith('VERSION='):
                        # Fallback: pull a numeric version (e.g. "22.04")
                        # out of the free-form VERSION field.
                        text = entry.split('=', 1)[1].strip('"').strip("'")
                        import re
                        found = re.search(r'(\d+\.\d+)', text)
                        if found:
                            return found.group(1)
            return None
        except Exception as e:
            logger.warning(f"Error reading OS version: {e}")
        return None

    def scan(self):
        """Run one scan; return {'packages', 'os_version'} or False/None."""
        try:
            logger.info("Starting vulnerability scan...")
            found = self.get_installed_packages()
            if not found:
                logger.warning("No packages found to scan")
                return False
            logger.info(f"Scanning {len(found)} packages for vulnerabilities")
            self.last_scan_time = time.time()
            return {
                'packages': found,
                'os_version': self.get_os_version(),
            }
        except Exception as e:
            logger.error(f"Error during vulnerability scan: {e}")
            return None
class MetricsCollector:
    """Collects system metrics and general server information.

    Slow or rarely-changing lookups (OS release, public IP, docker
    containers) are cached on the instance so the regular collection
    cycle stays cheap.
    """

    def __init__(self):
        self.hostname = socket.gethostname()
        # Previous network counters/timestamp, used to derive throughput rates.
        self.last_network_stats = None
        self.last_network_time = None
        # Cache for rarely-changing data
        self.cached_os_release = None
        self.cached_public_ip = None
        self.cached_public_ip_time = 0
        self.public_ip_cache_duration = 3600  # Cache public IP for 1 hour
        self.last_docker_check = 0
        self.docker_check_interval = 300  # Check docker every 5 minutes
        self.cached_docker_available = None
        self.cached_containers = None

    def get_cpu_metrics(self):
        """Return CPU usage percent and logical core count."""
        try:
            # Use interval=None to avoid blocking - first call returns 0.0,
            # but since client runs continuously, subsequent calls will be accurate
            cpu_percent = psutil.cpu_percent(interval=None)
            cpu_count = psutil.cpu_count()
            return {
                'usage': round(cpu_percent, 2),
                'cores': cpu_count
            }
        except Exception as e:
            logger.error(f"Error collecting CPU metrics: {e}")
            return {'usage': 0, 'cores': 0}

    def get_memory_metrics(self):
        """Return virtual-memory totals (GB) and used percent."""
        try:
            mem = psutil.virtual_memory()
            return {
                'total': round(mem.total / (1024**3), 2),          # GB
                'used': round(mem.used / (1024**3), 2),            # GB
                'available': round(mem.available / (1024**3), 2),  # GB
                'percent': round(mem.percent, 2)
            }
        except Exception as e:
            logger.error(f"Error collecting memory metrics: {e}")
            return {'total': 0, 'used': 0, 'available': 0, 'percent': 0}

    def get_swap_metrics(self):
        """Return swap totals (GB) and used percent."""
        try:
            swap = psutil.swap_memory()
            return {
                'total': round(swap.total / (1024**3), 2),  # GB
                'used': round(swap.used / (1024**3), 2),    # GB
                'free': round(swap.free / (1024**3), 2),    # GB
                'percent': round(swap.percent, 2)
            }
        except Exception as e:
            logger.error(f"Error collecting swap metrics: {e}")
            return {'total': 0, 'used': 0, 'free': 0, 'percent': 0}

    def get_process_count(self):
        """Return the total number of processes (0 on failure)."""
        try:
            # Count lazily instead of materialising a full process list.
            return sum(1 for _ in psutil.process_iter())
        except Exception as e:
            logger.error(f"Error collecting process count: {e}")
            return 0

    def get_disk_metrics(self):
        """Get disk usage for all mounted filesystems.

        Returns the root filesystem's numbers at the top level (backward
        compatibility) plus a 'disks' list describing every real mount.
        """
        try:
            disks = []
            partitions = psutil.disk_partitions()
            for partition in partitions:
                try:
                    # Skip virtual filesystems and network mounts
                    if partition.fstype in ['tmpfs', 'devtmpfs', 'sysfs', 'proc', 'devpts', 'cgroup', 'cgroup2', 'pstore', 'bpf', 'tracefs', 'debugfs', 'securityfs', 'hugetlbfs', 'mqueue', 'overlay', 'autofs', 'squashfs']:
                        continue
                    # Skip network filesystems (optional - comment out if you want to include them)
                    if partition.fstype.startswith('nfs') or partition.fstype.startswith('cifs') or partition.fstype.startswith('smb'):
                        continue
                    # Skip loop devices (snap packages, etc.)
                    if partition.device.startswith('/dev/loop'):
                        continue
                    # Skip snap mount points
                    if partition.mountpoint.startswith('/snap/'):
                        continue
                    disk_usage = psutil.disk_usage(partition.mountpoint)
                    disk_info = {
                        'mountpoint': partition.mountpoint,
                        'device': partition.device,
                        'fstype': partition.fstype,
                        'total': round(disk_usage.total / (1024**3), 2),  # GB
                        'used': round(disk_usage.used / (1024**3), 2),    # GB
                        'free': round(disk_usage.free / (1024**3), 2),    # GB
                        'percent': round(disk_usage.percent, 2)
                    }
                    disks.append(disk_info)
                    logger.debug(f"Collected disk metrics: {partition.mountpoint} ({partition.device}) - {disk_info['used']:.2f}GB / {disk_info['total']:.2f}GB ({disk_info['percent']:.1f}%)")
                except PermissionError:
                    # Skip partitions we don't have permission to access
                    logger.debug(f"Skipping {partition.mountpoint} due to permission error")
                    continue
                except Exception as e:
                    logger.warning(f"Error getting disk usage for {partition.mountpoint}: {e}")
                    continue
            # Return root disk for backward compatibility, plus all disks
            root_disk = next((d for d in disks if d['mountpoint'] == '/'), disks[0] if disks else None)
            logger.info(f"Collected metrics for {len(disks)} disk(s): {[d['mountpoint'] for d in disks]}")
            if root_disk:
                return {
                    'total': root_disk['total'],
                    'used': root_disk['used'],
                    'free': root_disk['free'],
                    'percent': root_disk['percent'],
                    'disks': disks  # Include all disks
                }
            else:
                return {
                    'total': 0,
                    'used': 0,
                    'free': 0,
                    'percent': 0,
                    'disks': disks
                }
        except Exception as e:
            logger.error(f"Error collecting disk metrics: {e}")
            return {'total': 0, 'used': 0, 'free': 0, 'percent': 0, 'disks': []}

    def get_network_metrics(self):
        """Return network throughput (Mbps, derived from counter deltas)
        and cumulative totals (GB). The very first call only primes the
        baseline and reports zeros."""
        try:
            net_io = psutil.net_io_counters()
            current_time = time.time()
            if self.last_network_stats is None:
                # No previous sample yet: record a baseline, report zeros.
                self.last_network_stats = net_io
                self.last_network_time = current_time
                return {'rx': 0, 'tx': 0, 'rx_total': 0, 'tx_total': 0}
            time_delta = current_time - self.last_network_time
            if time_delta == 0:
                # Called twice within clock resolution; avoid divide-by-zero.
                return {'rx': 0, 'tx': 0, 'rx_total': 0, 'tx_total': 0}
            rx_bytes = net_io.bytes_recv - self.last_network_stats.bytes_recv
            tx_bytes = net_io.bytes_sent - self.last_network_stats.bytes_sent
            rx_mbps = (rx_bytes * 8) / (time_delta * 1024 * 1024)  # Mbps
            tx_mbps = (tx_bytes * 8) / (time_delta * 1024 * 1024)  # Mbps
            self.last_network_stats = net_io
            self.last_network_time = current_time
            return {
                'rx': round(rx_mbps, 2),                            # Mbps
                'tx': round(tx_mbps, 2),                            # Mbps
                'rx_total': round(net_io.bytes_recv / (1024**3), 2),  # GB
                'tx_total': round(net_io.bytes_sent / (1024**3), 2)   # GB
            }
        except Exception as e:
            logger.error(f"Error collecting network metrics: {e}")
            return {'rx': 0, 'tx': 0, 'rx_total': 0, 'tx_total': 0}

    def get_server_info(self):
        """Get server information: OS release, status, processes, containers, IP info.

        Each section fails independently: on error the corresponding key
        stays None so one broken probe does not lose the others.
        """
        server_info = {
            'os_release': None,
            'live_status': None,
            'top_processes': None,
            'containers': None,
            'ip_info': None
        }
        # Get OS release info (cached since it rarely changes)
        if self.cached_os_release is None:
            try:
                os_release = {}
                if os.path.exists('/etc/os-release'):
                    with open('/etc/os-release', 'r') as f:
                        for line in f:
                            line = line.strip()
                            if '=' in line and not line.startswith('#'):
                                key, value = line.split('=', 1)
                                # Remove quotes from value
                                value = value.strip('"').strip("'")
                                os_release[key] = value
                self.cached_os_release = os_release if os_release else None
            except Exception as e:
                logger.warning(f"Error reading /etc/os-release: {e}")
                self.cached_os_release = None
        server_info['os_release'] = self.cached_os_release
        # Get live status (uptime, load average)
        try:
            uptime_seconds = time.time() - psutil.boot_time()
            uptime_days = int(uptime_seconds // 86400)
            uptime_hours = int((uptime_seconds % 86400) // 3600)
            uptime_minutes = int((uptime_seconds % 3600) // 60)
            load_avg = os.getloadavg()
            server_info['live_status'] = {
                'uptime_days': uptime_days,
                'uptime_hours': uptime_hours,
                'uptime_minutes': uptime_minutes,
                'uptime_seconds': round(uptime_seconds, 2),
                'load_average_1min': round(load_avg[0], 2),
                'load_average_5min': round(load_avg[1], 2),
                'load_average_15min': round(load_avg[2], 2)
            }
        except Exception as e:
            logger.warning(f"Error getting live status: {e}")
        # Get top processes by CPU usage
        try:
            # Collect CPU usage for all processes, then sort to get actual top
            # processes regardless of iteration order.
            processes = []
            for proc in psutil.process_iter(['pid', 'name', 'memory_percent', 'username']):
                try:
                    # First cpu_percent() call per process may report 0.0; the
                    # client runs continuously so later cycles are accurate.
                    cpu_pct = proc.cpu_percent(interval=None)
                    if cpu_pct is None:
                        cpu_pct = 0.0
                    proc_info = proc.info
                    proc_info['cpu_percent'] = round(cpu_pct, 2)
                    processes.append(proc_info)
                except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                    continue
            # Sort by CPU usage and take top 10
            processes.sort(key=lambda x: x.get('cpu_percent', 0), reverse=True)
            server_info['top_processes'] = processes[:10]
        except Exception as e:
            logger.warning(f"Error getting top processes: {e}")
        # Get running containers (Docker) - check less frequently
        current_time = time.time()
        if current_time - self.last_docker_check >= self.docker_check_interval:
            self.last_docker_check = current_time
            try:
                # First check if docker is available (cached)
                if self.cached_docker_available is None:
                    # Check if docker command exists
                    docker_check = subprocess.run(
                        ['which', 'docker'],
                        capture_output=True,
                        timeout=1
                    )
                    self.cached_docker_available = docker_check.returncode == 0
                if self.cached_docker_available:
                    result = subprocess.run(
                        ['docker', 'ps', '--format', 'json'],
                        capture_output=True,
                        text=True,
                        timeout=5
                    )
                    if result.returncode == 0:
                        containers = []
                        # `docker ps --format json` emits one JSON object per line.
                        for line in result.stdout.strip().split('\n'):
                            if line.strip():
                                try:
                                    container = json.loads(line)
                                    containers.append({
                                        'id': container.get('ID', '')[:12],
                                        'image': container.get('Image', ''),
                                        'command': container.get('Command', ''),
                                        'created': container.get('CreatedAt', ''),
                                        'status': container.get('Status', ''),
                                        'ports': container.get('Ports', ''),
                                        'names': container.get('Names', '')
                                    })
                                except json.JSONDecodeError:
                                    continue
                        self.cached_containers = containers if containers else None
                    else:
                        self.cached_containers = None
                else:
                    self.cached_containers = None
            except Exception as e:
                # Docker might not be installed or not running. (Previously
                # caught a redundant tuple of TimeoutExpired/FileNotFoundError/
                # Exception; Exception alone covers them all.)
                logger.debug(f"Docker check failed: {e}")
                self.cached_docker_available = False
                self.cached_containers = None
        # Use cached containers value
        server_info['containers'] = self.cached_containers
        # Get IP info (private and public)
        try:
            ip_info = {
                'private': [],
                'public': None
            }
            # Get private IPs from network interfaces
            for interface, addrs in psutil.net_if_addrs().items():
                for addr in addrs:
                    if addr.family == socket.AF_INET:  # IPv4
                        ip_addr = addr.address
                        # Skip loopback and link-local
                        if not ip_addr.startswith('127.') and not ip_addr.startswith('169.254.'):
                            ip_info['private'].append({
                                'interface': interface,
                                'ip': ip_addr,
                                'netmask': addr.netmask
                            })
            # Try to get public IP (cached since it rarely changes)
            current_time = time.time()
            if self.cached_public_ip is None or (current_time - self.cached_public_ip_time) >= self.public_ip_cache_duration:
                try:
                    # Try multiple services for reliability
                    public_ip_services = [
                        'https://api.ipify.org?format=json',
                        'https://ifconfig.me/ip',
                        'https://icanhazip.com'
                    ]
                    for service_url in public_ip_services:
                        try:
                            response = requests.get(service_url, timeout=3)
                            if response.status_code == 200:
                                # Handle different response formats
                                if 'ipify' in service_url:
                                    public_ip = response.json().get('ip', '').strip()
                                else:
                                    public_ip = response.text.strip()
                                if public_ip:
                                    self.cached_public_ip = public_ip
                                    self.cached_public_ip_time = current_time
                                    ip_info['public'] = public_ip
                                    break
                        except Exception:
                            # FIX: was a bare `except:` which also swallowed
                            # SystemExit/KeyboardInterrupt.
                            continue
                except Exception as e:
                    logger.debug(f"Could not fetch public IP: {e}")
            else:
                # Use cached public IP
                ip_info['public'] = self.cached_public_ip
            server_info['ip_info'] = ip_info if ip_info['private'] or ip_info['public'] else None
        except Exception as e:
            logger.warning(f"Error getting IP info: {e}")
        return server_info

    def collect_all_metrics(self):
        """Collect all system metrics into one payload dict for the server."""
        server_info = self.get_server_info()
        # Extract load average and uptime from server_info for metrics payload
        load_avg = None
        uptime_seconds = None
        if server_info.get('live_status'):
            live_status = server_info['live_status']
            load_avg = {
                '1min': live_status.get('load_average_1min'),
                '5min': live_status.get('load_average_5min'),
                '15min': live_status.get('load_average_15min')
            }
            uptime_seconds = live_status.get('uptime_seconds')
        return {
            'hostname': self.hostname,
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'cpu': self.get_cpu_metrics(),
            'memory': self.get_memory_metrics(),
            'swap': self.get_swap_metrics(),
            'disk': self.get_disk_metrics(),
            'network': self.get_network_metrics(),
            'process_count': self.get_process_count(),
            'load_avg': load_avg,
            'uptime_seconds': uptime_seconds,
            'server_info': server_info,
            'client_version': CLIENT_VERSION
        }
class OculogClient:
    """Main client class.

    Runs the collection loop: POSTs metrics to the server every
    ``interval`` seconds, runs a daily vulnerability scan, and checks
    hourly for a newer client build (self-updating via systemd restart).
    """

    def __init__(self, config):
        """``config`` is a dict; see load_config() for the recognised keys."""
        self.config = config
        self.server_url = config.get('server_url', 'http://localhost:3001')
        self.server_id = config.get('server_id', socket.gethostname())
        self.api_key = config.get('api_key', '')
        self.interval = config.get('interval', 30)  # seconds between cycles
        self.collector = MetricsCollector()
        # Vulnerability scanning is enabled by default, but can be disabled via config
        vulnerability_scan_enabled = config.get('vulnerability_scan_enabled', True)
        self.vulnerability_scanner = VulnerabilityScanner(enabled=vulnerability_scan_enabled)
        self.running = False
        self.last_update_check = 0
        self.update_check_interval = 3600  # Check for updates every hour

    def send_metrics(self, metrics):
        """POST one metrics payload to the server. Returns True on success."""
        try:
            url = f"{self.server_url}/api/servers/{self.server_id}/metrics"
            headers = {
                'Content-Type': 'application/json',
                'X-API-Key': self.api_key
            }
            if not self.api_key:
                logger.warning("No API key configured. Metrics may be rejected by server.")
            response = requests.post(
                url,
                json=metrics,
                timeout=10,
                headers=headers
            )
            response.raise_for_status()
            logger.info(f"Metrics sent successfully: {response.json()}")
            return True
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to send metrics: {e}")
            return False

    def send_vulnerability_scan(self, scan_data):
        """Send vulnerability scan results to server in batches if needed.

        ``scan_data`` is either a dict with 'packages'/'os_version' keys or
        (legacy format) a bare list of packages. Returns True if at least
        one batch was accepted.
        """
        try:
            url = f"{self.server_url}/api/servers/{self.server_id}/vulnerabilities/scan"
            headers = {
                'Content-Type': 'application/json',
                'X-API-Key': self.api_key
            }
            if not self.api_key:
                logger.warning("No API key configured. Vulnerability scan may be rejected by server.")
                return False
            # Extract packages and os_version
            if isinstance(scan_data, dict) and 'packages' in scan_data:
                packages = scan_data['packages']
                os_version = scan_data.get('os_version')
            else:
                # Legacy format: just a list of packages
                packages = scan_data
                os_version = None
            # Batch packages to avoid payload size limits (send 500 packages per batch)
            batch_size = 500
            total_packages = len(packages)
            # Hoisted out of the loop so it is always defined (it was only
            # assigned inside the loop before) and not recomputed per batch.
            total_batches = (total_packages + batch_size - 1) // batch_size
            total_vulnerabilities = 0
            success_count = 0
            logger.info(f"Sending vulnerability scan for {total_packages} packages in batches of {batch_size}")
            for i in range(0, total_packages, batch_size):
                batch = packages[i:i + batch_size]
                batch_num = (i // batch_size) + 1
                payload = {
                    'packages': batch,
                    'os_version': os_version,
                    'batch_info': {
                        'batch_number': batch_num,
                        'total_batches': total_batches,
                        'is_last_batch': (i + batch_size >= total_packages)
                    }
                }
                try:
                    response = requests.post(
                        url,
                        json=payload,
                        timeout=120,  # Longer timeout for vulnerability scans
                        headers=headers
                    )
                    response.raise_for_status()
                    result = response.json()
                    batch_vulns = result.get('vulnerabilities_found', 0)
                    total_vulnerabilities += batch_vulns
                    success_count += 1
                    logger.info(f"Batch {batch_num}/{total_batches} sent successfully: {len(batch)} packages, {batch_vulns} vulnerabilities found")
                except requests.exceptions.RequestException as e:
                    logger.error(f"Failed to send vulnerability scan batch {batch_num}/{total_batches}: {e}")
                    # Continue with other batches even if one fails
                    continue
            if success_count > 0:
                logger.info(f"Vulnerability scan completed: {success_count}/{total_batches} batches successful, {total_vulnerabilities} total vulnerabilities found")
                return True
            else:
                logger.error("All vulnerability scan batches failed")
                return False
        except Exception as e:
            logger.error(f"Failed to send vulnerability scan: {e}")
            return False

    def check_for_updates(self):
        """Return True when the server reports a newer client version."""
        try:
            url = f"{self.server_url}/api/client-version/latest"
            headers = {
                'X-API-Key': self.api_key
            }
            logger.debug(f"Checking for updates: current version={CLIENT_VERSION}, server={url}")
            response = requests.get(url, timeout=5, headers=headers)
            response.raise_for_status()
            data = response.json()
            latest_version = data.get('version')
            if not latest_version:
                logger.warning(f"Update check returned no version data from server")
                return False
            logger.debug(f"Server reports latest version: {latest_version}")
            # Compare versions (format: YYYY-MM-DD-HH-MM)
            # Simple string comparison works for this zero-padded format
            if CLIENT_VERSION < latest_version:
                logger.info(f"Update available: current={CLIENT_VERSION}, latest={latest_version}")
                return True
            else:
                logger.debug(f"No update needed: current={CLIENT_VERSION}, latest={latest_version}")
                return False
        except requests.exceptions.RequestException as e:
            logger.warning(f"Could not check for updates (network error): {e}")
            return False
        except Exception as e:
            logger.warning(f"Could not check for updates (unexpected error): {e}")
            return False

    def perform_auto_update(self):
        """Download and install the latest client script.

        Returns True on success, after which the caller exits so systemd
        restarts the service with the new version. On failure the previous
        script is restored from backup (best effort).
        """
        try:
            logger.info("Starting automatic client update...")
            # Download updated client script
            script_url = f"{self.server_url}/api/client-script"
            temp_script = '/opt/oculog/client.py.new'
            response = requests.get(script_url, timeout=30)
            response.raise_for_status()
            # Backup current script
            if os.path.exists(CLIENT_SCRIPT_PATH):
                backup_path = f"{CLIENT_SCRIPT_PATH}.backup"
                subprocess.run(['cp', CLIENT_SCRIPT_PATH, backup_path], check=False)
            # Write new script
            with open(temp_script, 'wb') as f:
                f.write(response.content)
            # Make executable
            os.chmod(temp_script, 0o755)
            # Replace old script atomically
            subprocess.run(['mv', temp_script, CLIENT_SCRIPT_PATH], check=True)
            logger.info("Client script updated successfully")
            # Trigger systemd restart by exiting (systemd will restart due to Restart=always)
            # We use exit code 0 to indicate successful update
            logger.info("Exiting to allow systemd to restart with new version")
            return True
        except Exception as e:
            logger.error(f"Auto-update failed: {e}")
            # Try to restore backup if update failed
            backup_path = f"{CLIENT_SCRIPT_PATH}.backup"
            if os.path.exists(backup_path):
                try:
                    subprocess.run(['cp', backup_path, CLIENT_SCRIPT_PATH], check=False)
                    logger.info("Restored backup after failed update")
                except Exception as restore_error:
                    # FIX: was a bare `except: pass` that hid restore failures
                    # (and swallowed SystemExit/KeyboardInterrupt).
                    logger.error(f"Failed to restore backup: {restore_error}")
            return False

    def run(self):
        """Main loop: update check, vulnerability scan, metrics cycle, sleep."""
        self.running = True
        logger.info(f"Starting Oculog client (server_id: {self.server_id}, version: {CLIENT_VERSION}, interval: {self.interval}s)")
        # Initial network stats collection (needed for first calculation)
        self.collector.get_network_metrics()
        time.sleep(1)
        while self.running:
            try:
                # Check for updates periodically
                current_time = time.time()
                if current_time - self.last_update_check >= self.update_check_interval:
                    logger.info(f"Checking for client updates (current version: {CLIENT_VERSION})...")
                    self.last_update_check = current_time
                    try:
                        if self.check_for_updates():
                            logger.info("Newer version detected, performing auto-update...")
                            if self.perform_auto_update():
                                logger.info("Auto-update completed, exiting to allow restart...")
                                # Exit gracefully - systemd will restart with Restart=always
                                self.running = False
                                sys.exit(0)
                            else:
                                logger.warning("Auto-update failed, continuing with current version")
                    except Exception as e:
                        logger.error(f"Error during update check: {e}")
                        # Continue running even if update check fails
                # Run the vulnerability scan when due (daily by default;
                # see VulnerabilityScanner.scan_interval)
                if self.vulnerability_scanner.should_scan():
                    try:
                        scan_data = self.vulnerability_scanner.scan()
                        if scan_data:
                            package_count = len(scan_data['packages']) if isinstance(scan_data, dict) else len(scan_data)
                            logger.info(f"Sending vulnerability scan for {package_count} packages...")
                            if self.send_vulnerability_scan(scan_data):
                                logger.info("Vulnerability scan completed successfully")
                            else:
                                logger.warning("Vulnerability scan failed to send, will retry on next cycle")
                    except Exception as e:
                        logger.error(f"Error during vulnerability scan: {e}")
                        # Continue running even if vulnerability scan fails
                metrics = self.collector.collect_all_metrics()
                logger.debug(f"Collected metrics: {json.dumps(metrics, indent=2)}")
                if self.send_metrics(metrics):
                    logger.info("Metrics collection cycle completed successfully")
                else:
                    logger.warning("Failed to send metrics, will retry on next cycle")
                time.sleep(self.interval)
            except KeyboardInterrupt:
                logger.info("Received interrupt signal, shutting down...")
                self.running = False
            except Exception as e:
                logger.error(f"Unexpected error in main loop: {e}")
                time.sleep(self.interval)

    def stop(self):
        """Ask the main loop to exit after the current cycle."""
        self.running = False
def load_config():
    """Return the agent configuration.

    Starts from built-in defaults and overlays the JSON config file when
    one exists; a malformed or unreadable file falls back to the defaults.
    """
    settings = {
        'server_url': 'http://localhost:3001',
        'server_id': socket.gethostname(),
        'interval': 30
    }
    if not os.path.exists(CONFIG_FILE):
        logger.info(f"Config file not found at {CONFIG_FILE}, using defaults")
        return settings
    try:
        with open(CONFIG_FILE, 'r') as handle:
            settings.update(json.load(handle))
        logger.info(f"Loaded configuration from {CONFIG_FILE}")
    except Exception as e:
        logger.warning(f"Failed to load config file, using defaults: {e}")
    return settings
def write_pid_file():
    """Record the current process id so service tooling can find us.

    Failures (e.g. missing permissions) are logged, never raised.
    """
    try:
        os.makedirs(os.path.dirname(PID_FILE), exist_ok=True)
        with open(PID_FILE, 'w') as handle:
            handle.write(str(os.getpid()))
    except Exception as e:
        logger.error(f"Failed to write PID file: {e}")
def remove_pid_file():
    """Delete the PID file if present; a missing file is a silent no-op and
    other failures are only logged."""
    try:
        os.remove(PID_FILE)
    except FileNotFoundError:
        pass
    except Exception as e:
        logger.error(f"Failed to remove PID file: {e}")
def main():
    """Entry point: handle CLI flags, then start the agent loop."""
    cli_args = sys.argv[1:]
    if cli_args:
        flag = cli_args[0]
        if flag == '--version':
            print(f"Oculog Client version {CLIENT_VERSION}")
            sys.exit(0)
        elif flag == '--help':
            print("Usage: oculog-client [--version|--help]")
            print("\nOculog Client Agent")
            print("Collects system metrics and sends them to the server")
            sys.exit(0)
    # Ensure log directory exists
    os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
    agent = OculogClient(load_config())
    try:
        write_pid_file()
        agent.run()
    finally:
        remove_pid_file()


if __name__ == '__main__':
    main()