You don’t need a monitoring stack that requires its own monitoring. If you’re running a handful of servers, Prometheus and Grafana are often more work than the problems they’re meant to solve.

If you can ssh user@server, you already have everything you need.

In this tutorial, we’ll build a lightweight, no-agent server monitoring tool using Python and SSH, designed for indie hackers, solo founders, and small teams. By the end, you’ll have a single script that checks CPU, memory, disk usage, and critical services across multiple servers, renders a clean terminal dashboard, and sends alerts when something actually breaks.

What we’ll build:

  • Monitor multiple servers over SSH (no agents, no daemons)
  • A real-time terminal dashboard with colors and status indicators
  • Alerts via ntfy.sh
  • Remote server updates with push notifications
  • One Python script you can read, trust, and extend

Why Not Just Use Prometheus?

Before we dive in, let’s get this out of the way: Prometheus and Grafana are excellent tools. At scale, they’re hard to beat, and they’re industry standard for a reason.

But if you’re running a small setup, say 1–20 servers, a few side projects, or a lean SaaS, they often introduce more complexity than actual value.

What starts as “just add monitoring” quickly turns into:

  • Heavy infrastructure: Prometheus, Grafana, node exporters, persistent storage, backups
  • Ongoing maintenance: Upgrades, config drift, broken dashboards at the worst time
  • A steep learning curve: PromQL, recording rules, and yet another YAML ecosystem

All of that just to answer a few basic questions:

Is the server up?
Is disk space running out?
Did nginx or Docker crash?

Meanwhile, SSH is (probably) already open on your servers, and Python is already installed (or trivial to install). You don’t need a full metrics pipeline to check system health; you just need a reliable way to run commands and interpret the results.

If you can ssh user@server, you can monitor it.

That’s the premise of this article. We’ll keep the stack boring, the moving parts minimal, and the script small enough that you actually understand, and trust, it.


Project Setup

Let's start by creating a new project directory and setting up the environment:

mkdir server-monitor
cd server-monitor

Then, create a requirements.txt file:

paramiko>=4.0.0
rich>=14.3.2
pyyaml>=6.0.3
requests>=2.32.5

And install the dependencies:

pip install -r requirements.txt

What each library does:

  • paramiko -> SSH connections and command execution
  • rich -> Beautiful terminal output with tables and colors
  • pyyaml -> Parse our server configuration file
  • requests -> Send alert notifications to ntfy.sh over HTTP

Now create a servers.yml file to define which servers to monitor:

servers:
  - name: prd-dokku
    host: XX.XX.XX.XX
    user: root
    key_file: id_rsa
    services:
      - docker

  - name: prd-ghost
    host: XX.XX.XX.XX
    user: root
    key_file: id_rsa
    services:
      - docker

  - name: prd-n8n
    host: XX.XX.XX.XX
    user: root
    key_file: id_rsa
    services:
      - docker

thresholds:
  cpu_load_factor: 2.0 # Alert if 1-minute load average exceeds this value
  memory_percent: 85
  disk_percent: 80

Replace the IP addresses and usernames with those of your actual servers.

Make sure you have SSH key authentication set up for each server. If your key has a passphrase, use the SSH_KEY_PASSPHRASE environment variable or add key_passphrase to a server entry; the script will also prompt interactively if needed.
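The YAML above parses into a plain Python dict, so it's easy to sanity-check your config before wiring it into the monitor. A quick sketch, using placeholder values (the host is a documentation-reserved address, not a real server):

```python
import yaml

# Placeholder config mirroring the structure of servers.yml
config_text = """
servers:
  - name: prd-dokku
    host: 203.0.113.10
    user: root
    key_file: id_rsa
    services:
      - docker

thresholds:
  cpu_load_factor: 2.0
  memory_percent: 85
  disk_percent: 80
"""

config = yaml.safe_load(config_text)
print(config["servers"][0]["name"])            # prd-dokku
print(config["thresholds"]["memory_percent"])  # 85
```

Running this before the first real monitoring pass catches indentation mistakes and typos in key names early, instead of deep inside the metrics code.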


SSH Connection Manager

Let's start building the monitor. Create a file called monitor.py; you'll build it up piece by piece.

First, you'll need a function to connect to servers and run commands over SSH:

"""
Simple server monitoring over SSH
"""

import os
import sys
import threading
import getpass
import paramiko
import yaml
from typing import Dict, List, Optional
from dataclasses import dataclass, field


@dataclass
class ServerMetrics:
    """Container for server metrics"""
    name: str
    reachable: bool = True
    cpu_load: Optional[float] = None
    memory_percent: Optional[float] = None
    disk_percent: Optional[float] = None
    services: Dict[str, str] = field(default_factory=dict)
    error: Optional[str] = None


class SSHClient:
    """Manages SSH connections to servers"""

    def __init__(
        self,
        host: str,
        user: str,
        key_file: Optional[str] = None,
        key_passphrase: Optional[str] = None,
    ):
        self.host = host
        self.user = user
        self.key_file = key_file
        self.key_passphrase = key_passphrase
        self.client = None

    def _load_pkey(self, passphrase: Optional[str] = None) -> Optional[paramiko.PKey]:
        """Load private key, trying RSA then Ed25519. Returns None to fall back to key_filename."""
        if not self.key_file:
            return None
        password = passphrase or self.key_passphrase
        # Try RSA key first (PEM format)
        try:
            return paramiko.RSAKey.from_private_key_file(
                self.key_file, password=password
            )
        except paramiko.ssh_exception.PasswordRequiredException:
            if password is None:
                password = getpass.getpass(f"SSH key passphrase for {self.key_file}: ")
            return paramiko.RSAKey.from_private_key_file(
                self.key_file, password=password
            )
        except (paramiko.ssh_exception.SSHException, OSError):
            pass
        # Try Ed25519
        try:
            return paramiko.Ed25519Key.from_private_key_file(
                self.key_file, password=password
            )
        except paramiko.ssh_exception.PasswordRequiredException:
            if password is None:
                password = getpass.getpass(f"SSH key passphrase for {self.key_file}: ")
            return paramiko.Ed25519Key.from_private_key_file(
                self.key_file, password=password
            )
        except (paramiko.ssh_exception.SSHException, OSError):
            pass
        return None

    def connect(self) -> bool:
        """Establish SSH connection"""
        try:
            self.client = paramiko.SSHClient()
            self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

            connect_kwargs = {
                'hostname': self.host,
                'username': self.user,
                'timeout': 10,
                'look_for_keys': True,
                'allow_agent': True,
            }

            # Load key explicitly (RSA / Ed25519) with optional passphrase
            if self.key_file:
                pkey = self._load_pkey()
                if pkey is not None:
                    connect_kwargs['pkey'] = pkey
                else:
                    connect_kwargs['key_filename'] = self.key_file
                    if self.key_passphrase:
                        connect_kwargs['passphrase'] = self.key_passphrase

            self.client.connect(**connect_kwargs)
            return True

        except Exception as e:
            print(f"❌ Failed to connect to {self.host}: {e}")
            return False

    def run_command(
        self, command: str, stream_output: bool = False
    ) -> tuple[str, str, int]:
        """
        Run a command over SSH.
        Returns: (stdout, stderr, exit_code).
        If stream_output=True, print stdout/stderr as it arrives (for long-running commands).
        """
        if not self.client:
            return "", "Not connected", 1

        try:
            stdin, stdout, stderr = self.client.exec_command(command)
            channel = stdout.channel

            if stream_output:
                out_lines = []
                err_lines = []

                def read_stdout():
                    for line in iter(stdout.readline, ""):
                        out_lines.append(line)
                        print(line, end="")

                def read_stderr():
                    for line in iter(stderr.readline, ""):
                        err_lines.append(line)
                        print(line, end="", file=sys.stderr)

                t1 = threading.Thread(target=read_stdout)
                t2 = threading.Thread(target=read_stderr)
                t1.daemon = True
                t2.daemon = True
                t1.start()
                t2.start()
                exit_code = channel.recv_exit_status()
                t1.join(timeout=1)
                t2.join(timeout=1)
                return ("".join(out_lines).strip(), "".join(err_lines).strip(), exit_code)

            # Read output before asking for the exit status, so a command
            # with large output can't fill the channel buffer and stall
            out = stdout.read().decode("utf-8", errors="replace").strip()
            err = stderr.read().decode("utf-8", errors="replace").strip()
            exit_code = channel.recv_exit_status()
            return (out, err, exit_code)
        except Exception as e:
            return "", str(e), 1

    def close(self):
        """Close SSH connection"""
        if self.client:
            self.client.close()


def load_config(config_file: str = 'servers.yml') -> dict:
    """Load server configuration from YAML"""
    with open(config_file, 'r') as f:
        return yaml.safe_load(f)

This gives you the foundation: a clean SSH client that can connect and run commands, plus a data structure to hold the metrics.

It defines:

  • A ServerMetrics dataclass to store health information per server (reachability, CPU, memory, disk, service status, and errors).
  • An SSHClient wrapper around Paramiko that:
    • Handles SSH connections using RSA or Ed25519 keys (with optional passphrases)
    • Falls back to SSH agents or default keys when possible
    • Executes remote commands and captures output, errors, and exit codes (with optional streaming for long-running commands like apt upgrade)
    • Manages connection lifecycle cleanly
  • A load_config helper to read server definitions from a YAML file.

If you want to go deeper into related Python techniques, such as monitoring at the file-system level (e.g. watching config or log files), see Mastering File System Monitoring with Watchdog in Python.

Note on SSH key passphrase: If your private key has a passphrase you can (1) set the SSH_KEY_PASSPHRASE environment variable (recommended for a cron job), (2) add key_passphrase to a server entry in servers.yml, or (3) let the script prompt you interactively when needed.


Collecting Metrics

Now you'll add functions to gather actual metrics from each server. Add these functions to monitor.py:

def get_cpu_load(ssh: SSHClient) -> Optional[float]:
    """Get 1-minute load average from uptime command"""
    stdout, stderr, code = ssh.run_command('uptime')
    if code != 0:
        return None

    # Parse: "... load average: 0.52, 0.58, 0.59"
    try:
        load_str = stdout.split('load average:')[1].split(',')[0].strip()
        return float(load_str)
    except (IndexError, ValueError):
        return None


def get_memory_usage(ssh: SSHClient) -> Optional[float]:
    """Get memory usage percentage from free command"""
    stdout, stderr, code = ssh.run_command('free -m')
    if code != 0:
        return None

    # Parse output to get used/total
    try:
        lines = stdout.split('\n')
        mem_line = [l for l in lines if l.startswith('Mem:')][0]
        parts = mem_line.split()
        total = float(parts[1])
        used = float(parts[2])
        return (used / total) * 100
    except (IndexError, ValueError):
        return None


def get_disk_usage(ssh: SSHClient) -> Optional[float]:
    """Get root disk usage percentage from df command"""
    stdout, stderr, code = ssh.run_command('df -h /')
    if code != 0:
        return None

    # Parse: "Filesystem Size Used Avail Use% Mounted"
    try:
        lines = stdout.split('\n')
        data_line = lines[1]
        use_percent = data_line.split()[4].rstrip('%')
        return float(use_percent)
    except (IndexError, ValueError):
        return None


def get_service_status(ssh: SSHClient, service_name: str) -> str:
    """Check if a systemd service is active"""
    stdout, stderr, code = ssh.run_command(f'systemctl is-active {service_name}')

    if stdout == 'active':
        return 'active'
    elif code != 0:
        # Could be 'inactive', 'failed', or service doesn't exist
        return stdout if stdout else 'unknown'
    return 'unknown'


def collect_server_metrics(server_config: dict, thresholds: dict) -> ServerMetrics:
    """
    Connect to a server and collect all metrics
    """
    name = server_config['name']
    host = server_config['host']
    user = server_config['user']
    services = server_config.get('services', [])
    key_file = server_config.get('key_file')
    key_passphrase = server_config.get('key_passphrase') or os.environ.get('SSH_KEY_PASSPHRASE')

    metrics = ServerMetrics(name=name)
    ssh = SSHClient(host, user, key_file, key_passphrase)

    # Try to connect
    if not ssh.connect():
        metrics.reachable = False
        metrics.error = f"Could not connect to {host}"
        return metrics

    try:
        # Gather all metrics
        metrics.cpu_load = get_cpu_load(ssh)
        metrics.memory_percent = get_memory_usage(ssh)
        metrics.disk_percent = get_disk_usage(ssh)

        # Check services
        for service in services:
            metrics.services[service] = get_service_status(ssh, service)

    except Exception as e:
        metrics.error = str(e)

    finally:
        ssh.close()

    return metrics

This code collects basic server health metrics over SSH using standard Linux commands.

It:

  • Retrieves CPU load, memory usage, and disk usage by running uptime, free, and df remotely and parsing their output
  • Checks the status of systemd services via systemctl is-active
  • Wraps all results in a ServerMetrics object
  • Handles connection failures and parsing errors gracefully
  • Uses SSH keys (with optional passphrases) for secure, agentless access

If something fails, it returns None or a status string rather than crashing, which is important when monitoring multiple servers.
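The parsing logic is easy to verify offline. Here is the same string handling applied to canned command output (sample values, no SSH involved):

```python
# Sample outputs as the remote commands would return them
uptime_out = "10:14:02 up 5 days,  3:02,  1 user,  load average: 0.52, 0.58, 0.59"
free_out = (
    "              total        used        free\n"
    "Mem:           7936        3175        4761\n"
    "Swap:             0           0           0"
)
df_out = (
    "Filesystem      Size  Used Avail Use% Mounted on\n"
    "/dev/vda1        78G   21G   54G  28% /"
)

# Same parsing as get_cpu_load: grab the 1-minute load average
load = float(uptime_out.split("load average:")[1].split(",")[0].strip())

# Same parsing as get_memory_usage: used / total from the Mem: line
mem_line = [l for l in free_out.split("\n") if l.startswith("Mem:")][0]
parts = mem_line.split()
mem_pct = float(parts[2]) / float(parts[1]) * 100

# Same parsing as get_disk_usage: the Use% column of the root filesystem
disk_pct = float(df_out.split("\n")[1].split()[4].rstrip("%"))

print(load, round(mem_pct, 1), disk_pct)  # 0.52 40.0 28.0
```

If a distribution ever changes its output format, a quick offline test like this is the fastest way to confirm where the parsing breaks.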


Terminal Dashboard with Rich

Now for the fun part: displaying our metrics in a beautiful terminal table. Add this to monitor.py:

from rich.console import Console
from rich.table import Table
from rich import box


def create_status_icon(value: Optional[float], threshold: float,
                       reverse: bool = False) -> str:
    """
    Create a colored status icon based on value vs threshold.
    For every metric here a higher value is worse: ✅ while comfortably
    under the threshold, ⚠️ past 70% of it, ❌ once it is exceeded.
    """
    if value is None:
        return "❓"

    if reverse:
        # Percentage metrics (memory, disk)
        if value < threshold * 0.7:
            return "✅"
        elif value < threshold:
            return "⚠️"
        else:
            return "❌"
    else:
        # Load metrics (CPU): same idea, alert once load exceeds the factor
        if value > threshold:
            return "❌"
        elif value > threshold * 0.7:
            return "⚠️"
        else:
            return "✅"


def format_metric_value(value: Optional[float], suffix: str = "",
                       decimals: int = 1) -> str:
    """Format a metric value for display"""
    if value is None:
        return "—"
    return f"{value:.{decimals}f}{suffix}"


def display_dashboard(metrics_list: List[ServerMetrics], thresholds: dict):
    """Display monitoring dashboard using Rich tables"""
    console = Console()

    # Create table
    table = Table(title="🖥️  Server Monitoring Dashboard",
                  box=box.ROUNDED,
                  show_header=True,
                  header_style="bold cyan")

    # Add columns
    table.add_column("Server", style="bold white", no_wrap=True)
    table.add_column("Status", justify="center")
    table.add_column("CPU Load", justify="right")
    table.add_column("Memory", justify="right")
    table.add_column("Disk", justify="right")
    table.add_column("Services", justify="left")

    # Add rows
    for metrics in metrics_list:
        # Overall status
        if not metrics.reachable:
            status = "🔴 Down"
        elif metrics.error:
            status = "⚠️ Error"
        else:
            status = "🟢 Up"

        # CPU with status icon
        cpu_display = format_metric_value(metrics.cpu_load)
        cpu_icon = create_status_icon(
            metrics.cpu_load,
            thresholds.get('cpu_load_factor', 2.0),
            reverse=False
        )
        cpu_cell = f"{cpu_icon} {cpu_display}"

        # Memory with status icon and percentage
        mem_display = format_metric_value(metrics.memory_percent, "%")
        mem_icon = create_status_icon(
            metrics.memory_percent,
            thresholds.get('memory_percent', 85),
            reverse=True
        )
        mem_cell = f"{mem_icon} {mem_display}"

        # Disk with status icon and percentage
        disk_display = format_metric_value(metrics.disk_percent, "%")
        disk_icon = create_status_icon(
            metrics.disk_percent,
            thresholds.get('disk_percent', 80),
            reverse=True
        )
        disk_cell = f"{disk_icon} {disk_display}"

        # Services status
        if metrics.services:
            service_items = []
            for svc, status_val in metrics.services.items():
                icon = "✅" if status_val == "active" else "❌"
                service_items.append(f"{icon} {svc}")
            services_cell = "\n".join(service_items)
        else:
            services_cell = "—"

        # Add row to table
        table.add_row(
            metrics.name,
            status,
            cpu_cell,
            mem_cell,
            disk_cell,
            services_cell
        )

    # Display
    console.print()
    console.print(table)
    console.print()

The rich library makes this surprisingly easy.

It:

  • Converts raw metric values into clear status icons (✅ ⚠️ ❌) based on configurable thresholds
  • Formats CPU, memory, and disk metrics for human-friendly display
  • Builds a styled Rich table showing per-server health, resource usage, and service status
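The three-tier threshold logic is simple enough to sketch standalone. This simplified status_icon helper mirrors the icon boundaries used above (✅ below 70% of the threshold, ⚠️ between 70% and 100%, ❌ beyond); note it treats exact equality slightly differently than the article's function:

```python
from typing import Optional


def status_icon(value: Optional[float], threshold: float) -> str:
    # Higher is worse for every metric we track (load, memory %, disk %)
    if value is None:
        return "❓"
    if value >= threshold:
        return "❌"
    if value >= threshold * 0.7:
        return "⚠️"
    return "✅"


# With a memory threshold of 85%:
print(status_icon(40.0, 85))  # ✅  comfortably under
print(status_icon(65.0, 85))  # ⚠️  past 59.5, the 70% warning line
print(status_icon(92.0, 85))  # ❌  over the threshold
print(status_icon(None, 85))  # ❓  metric could not be collected
```

Tuning thresholds in servers.yml shifts both the warning and alert boundaries at once, since the ⚠️ line is always derived as 70% of the configured value.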

Alert System

Monitoring without alerts is just anxiety. Let's add a simple alert system that can notify you via ntfy.sh. Add these functions: