From 112d777ec0570bc0e516ac98fcf43cb129596f1e Mon Sep 17 00:00:00 2001 From: hanish520 Date: Wed, 10 Jun 2026 23:50:06 -0700 Subject: [PATCH] feat: add netemu package for per-flow network emulation Adds the netemu package which uses Linux tc (HTB + netem) and iptables to enforce per-flow bandwidth and latency between replicas, configured via a declarative YAML file. Includes a runnable example targeting the bbchain cluster with LAN/WAN group profiles. Co-Authored-By: Claude Sonnet 4.6 --- go.mod | 1 + go.sum | 2 + netemu/config.go | 225 ++++++++++++++++++++ netemu/emulate.go | 122 +++++++++++ netemu/example/check_modules.sh | 44 ++++ netemu/example/install_modules.sh | 65 ++++++ netemu/example/main.go | 71 +++++++ netemu/example/network.yaml | 55 +++++ netemu/example/setup_ssh.sh | 26 +++ netemu/example/verify_ssh.sh | 35 ++++ netemu/marks.go | 33 +++ netemu/netemu_test.go | 327 ++++++++++++++++++++++++++++++ netemu/rules.go | 137 +++++++++++++ 13 files changed, 1143 insertions(+) create mode 100644 netemu/config.go create mode 100644 netemu/emulate.go create mode 100755 netemu/example/check_modules.sh create mode 100644 netemu/example/install_modules.sh create mode 100644 netemu/example/main.go create mode 100644 netemu/example/network.yaml create mode 100644 netemu/example/setup_ssh.sh create mode 100644 netemu/example/verify_ssh.sh create mode 100644 netemu/marks.go create mode 100644 netemu/netemu_test.go create mode 100644 netemu/rules.go diff --git a/go.mod b/go.mod index 51a3df5..f991edd 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/relab/container v0.0.0-20260109140004-4adfae874bb5 github.com/relab/wrfs v0.0.0-20220416082020-a641cd350078 golang.org/x/crypto v0.50.0 + gopkg.in/yaml.v3 v3.0.1 ) require ( diff --git a/go.sum b/go.sum index fcdfb4f..615dd37 100644 --- a/go.sum +++ b/go.sum @@ -20,5 +20,7 @@ golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.42.0 h1:UiKe+zDFmJobeJ5ggPwOshJIVt6/Ft0rcfrXZDLWAWY= golang.org/x/term v0.42.0/go.mod h1:Dq/D+snpsbazcBG5+F9Q1n2rXV8Ma+71xEjTRufARgY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/netemu/config.go b/netemu/config.go new file mode 100644 index 0000000..f58e23f --- /dev/null +++ b/netemu/config.go @@ -0,0 +1,225 @@ +package netemu + +import ( + "fmt" + "os" + + "gopkg.in/yaml.v3" +) + +// Config describes the bandwidth emulation to apply across a set of replicas. +type Config struct { + // Interface is the NIC to apply egress shaping rules on (e.g. "eth0"). + Interface string `yaml:"interface"` + // IFB is the name of the Intermediate Functional Block device used for + // ingress shaping. It is created by SetupEmulation when ShapeIngress is true. + IFB string `yaml:"ifb"` + + // Profiles are named bandwidth templates that can be referenced by + // LinkProfiles or Links. + Profiles map[string]Profile `yaml:"profiles"` + + // DefaultProfile names the Profile to apply to any replica pair not + // covered by LinkProfiles or Links. Leave empty to leave such pairs + // unconstrained (default tc catch-all class). + DefaultProfile string `yaml:"default_profile"` + + Replicas []Replica `yaml:"replicas"` + LinkProfiles []GroupLinkProfile `yaml:"link_profiles"` + + // Links are per-pair overrides. They take precedence over LinkProfiles + // and DefaultProfile. Either set Profile to name a predefined profile, + // or set Rate/Delay/Loss directly (direct fields win over Profile). + Links []Link `yaml:"links"` +} + +// Profile is a reusable bandwidth template. +type Profile struct { + // Rate is passed directly to tc (e.g. "100mbit", "1gbit"). + Rate string `yaml:"rate"` + // Delay is passed directly to netem (e.g. "50ms"). + Delay string `yaml:"delay"` + // Loss is the packet loss percentage passed to netem (e.g. 0.1 for 0.1%). + Loss float64 `yaml:"loss"` +} + +// Replica is a single process endpoint identified by its host and listening port. +// Host may be an IP address or a hostname resolvable from the controller. +// IP, if set, is used instead of Host in iptables rules (required when the +// node running iptables cannot resolve Host via DNS). +// Multiple replicas may share the same host with distinct ports. +type Replica struct { + ID string `yaml:"id"` + Host string `yaml:"host"` + IP string `yaml:"ip,omitempty"` + Port int `yaml:"port"` + Group string `yaml:"group"` +} + +// dst returns the IP address to use in iptables --dst rules: the explicit IP +// field if set, otherwise Host. +func (r Replica) dst() string { + if r.IP != "" { + return r.IP + } + return r.Host +} + +// GroupLinkProfile assigns a named Profile to all (from, to) pairs whose +// replicas belong to the specified groups. +type GroupLinkProfile struct { + FromGroup string `yaml:"from_group"` + ToGroup string `yaml:"to_group"` + Profile string `yaml:"profile"` +} + +// Link is an explicit per-pair bandwidth override. It takes precedence over +// any matching GroupLinkProfile or DefaultProfile. +// Set Profile to reference a named profile, or set Rate/Delay/Loss directly. +// Direct fields win over Profile when both are present. +type Link struct { + From string `yaml:"from"` + To string `yaml:"to"` + Profile string `yaml:"profile"` + Rate string `yaml:"rate"` + Delay string `yaml:"delay"` + Loss float64 `yaml:"loss"` +} + +// LoadConfig reads and validates a YAML config file. +func LoadConfig(path string) (*Config, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, err + } + var cfg Config + if err := yaml.Unmarshal(data, &cfg); err != nil { + return nil, err + } + if err := cfg.validate(); err != nil { + return nil, err + } + return &cfg, nil +} + +// resolve returns the effective Profile for a given (fromID, toID) pair, +// following the precedence: explicit Link > GroupLinkProfile > DefaultProfile. +// Returns (profile, false) if no rule matches and DefaultProfile is unset. +func (c *Config) resolve(fromID, toID string) (Profile, bool) { + fromGroup := c.groupOf(fromID) + toGroup := c.groupOf(toID) + + // 1. Explicit per-pair link. + for _, l := range c.Links { + if l.From == fromID && l.To == toID { + return c.mergeLink(l), true + } + } + + // 2. Group-to-group profile. + for _, glp := range c.LinkProfiles { + if glp.FromGroup == fromGroup && glp.ToGroup == toGroup { + p, ok := c.Profiles[glp.Profile] + return p, ok + } + } + + // 3. Default profile. + if c.DefaultProfile != "" { + p, ok := c.Profiles[c.DefaultProfile] + return p, ok + } + + return Profile{}, false +} + +// mergeLink builds a Profile from a Link entry. Direct fields (Rate, Delay, +// Loss) take precedence over any named Profile the Link references. +func (c *Config) mergeLink(l Link) Profile { + base := Profile{} + if l.Profile != "" { + base = c.Profiles[l.Profile] + } + if l.Rate != "" { + base.Rate = l.Rate + } + if l.Delay != "" { + base.Delay = l.Delay + } + if l.Loss != 0 { + base.Loss = l.Loss + } + return base +} + +func (c *Config) groupOf(id string) string { + for _, r := range c.Replicas { + if r.ID == id { + return r.Group + } + } + return "" +} + +func (c *Config) validate() error { + if c.Interface == "" { + return fmt.Errorf("netemu: interface is required") + } + + ids := make(map[string]bool, len(c.Replicas)) + for _, r := range c.Replicas { + if r.ID == "" { + return fmt.Errorf("netemu: replica has empty id") + } + if ids[r.ID] { + return fmt.Errorf("netemu: duplicate replica id %q", r.ID) + } + if r.Host == "" { + return fmt.Errorf("netemu: replica %q has empty host", r.ID) + } + if r.Port <= 0 || r.Port > 65535 { + return fmt.Errorf("netemu: replica %q has invalid port %d", r.ID, r.Port) + } + ids[r.ID] = true + } + + for name, p := range c.Profiles { + if p.Rate == "" { + return fmt.Errorf("netemu: profile %q is missing rate", name) + } + if p.Delay == "" { + return fmt.Errorf("netemu: profile %q is missing delay", name) + } + } + + for _, glp := range c.LinkProfiles { + if _, ok := c.Profiles[glp.Profile]; !ok { + return fmt.Errorf("netemu: link_profile references unknown profile %q", glp.Profile) + } + } + + if c.DefaultProfile != "" { + if _, ok := c.Profiles[c.DefaultProfile]; !ok { + return fmt.Errorf("netemu: default_profile references unknown profile %q", c.DefaultProfile) + } + } + + for _, l := range c.Links { + if !ids[l.From] { + return fmt.Errorf("netemu: link references unknown replica %q", l.From) + } + if !ids[l.To] { + return fmt.Errorf("netemu: link references unknown replica %q", l.To) + } + if l.From == l.To { + return fmt.Errorf("netemu: link from replica to itself: %q", l.From) + } + if l.Profile != "" { + if _, ok := c.Profiles[l.Profile]; !ok { + return fmt.Errorf("netemu: link references unknown profile %q", l.Profile) + } + } + } + + return nil +} diff --git a/netemu/emulate.go b/netemu/emulate.go new file mode 100644 index 0000000..42bda05 --- /dev/null +++ b/netemu/emulate.go @@ -0,0 +1,122 @@ +// Package netemu provides Iago actions for per-flow bandwidth emulation using +// Linux tc (traffic control) and iptables. It shapes traffic between replicas +// identified by TCP port, supporting asymmetric bandwidth and latency via a +// declarative YAML configuration. +package netemu + +import ( + "bytes" + "context" + "fmt" + "io" + "strings" + + "github.com/relab/iago" +) + +// SetupEmulation is an Iago action that installs tc HTB qdiscs, netem leaf +// disciplines, and iptables MANGLE rules on a host to enforce the per-flow +// bandwidth and latency declared in Config. +// +// It requires CAP_NET_ADMIN on the target host (run as root or with sudo). +// Use TeardownEmulation to remove the rules when the experiment ends. +type SetupEmulation struct { + Config *Config + // ShapeIngress installs a matching rule set on an IFB device so that + // incoming traffic is also shaped. When false only egress is shaped. + ShapeIngress bool + // Sudo prepends "sudo" to every privileged command. Set to true when the + // SSH user is not root but has passwordless sudo. + Sudo bool +} + +// Apply implements the iago action interface. +func (s SetupEmulation) Apply(ctx context.Context, host iago.Host) error { + cmds := egressSetupCmds(s.Config) + if s.ShapeIngress { + if s.Config.IFB == "" { + return fmt.Errorf("netemu: ShapeIngress is true but config.ifb is empty") + } + cmds = append(cmds, ingressSetupCmds(s.Config)...) + } + if err := runScript(ctx, host, s.Sudo, true, cmds); err != nil { + return fmt.Errorf("netemu setup on %s: %w", host.Name(), err) + } + return nil +} + +// TeardownEmulation is an Iago action that removes the tc qdiscs and iptables +// rules installed by SetupEmulation. +// +// Teardown commands that fail (e.g. because rules were never installed) are +// silently ignored so that TeardownEmulation is safe to call as a deferred +// cleanup regardless of whether SetupEmulation succeeded. +type TeardownEmulation struct { + Config *Config + ShapeIngress bool + Sudo bool +} + +// Apply implements the iago action interface. +func (t TeardownEmulation) Apply(ctx context.Context, host iago.Host) error { + cmds := egressTeardownCmds(t.Config) + if t.ShapeIngress { + cmds = append(cmds, ingressTeardownCmds(t.Config)...) + } + // Intentionally ignore errors: del/flush on non-existent rules exits non-zero. + runScript(ctx, host, t.Sudo, false, cmds) //nolint:errcheck + return nil +} + +// runScript pipes cmds to a single remote sh session, reducing SSH round-trips +// from O(n²) to 1 for n replicas. When failFast is true the shell runs with +// set -e so any failing command aborts the script. +func runScript(ctx context.Context, host iago.Host, sudo, failFast bool, cmds []string) error { + runner, err := host.NewCommand() + if err != nil { + return err + } + stderrPipe, err := runner.StderrPipe() + if err != nil { + return err + } + stdin, err := runner.StdinPipe() + if err != nil { + return err + } + shell := "sh" + if failFast { + shell = "sh -e" + } + if sudo { + shell = "sudo " + shell + } + if err := runner.Start(shell); err != nil { + return err + } + var stderrBuf bytes.Buffer + stderrDone := make(chan struct{}) + go func() { + io.Copy(&stderrBuf, stderrPipe) //nolint:errcheck + close(stderrDone) + }() + for _, cmd := range cmds { + if _, err := fmt.Fprintln(stdin, cmd); err != nil { + stdin.Close() + <-stderrDone + return err + } + } + if err := stdin.Close(); err != nil { + <-stderrDone + return err + } + <-stderrDone + if err := runner.Wait(); err != nil { + if stderrBuf.Len() > 0 { + return fmt.Errorf("%w: %s", err, strings.TrimSpace(stderrBuf.String())) + } + return err + } + return nil +} diff --git a/netemu/example/check_modules.sh b/netemu/example/check_modules.sh new file mode 100755 index 0000000..530976f --- /dev/null +++ b/netemu/example/check_modules.sh @@ -0,0 +1,44 @@ +#!/bin/sh +# Run on bbchain1 as hgogada. +# Checks that all kernel modules required by netemu are present on bbchain2..bbchain30. +set -eu + +USER="hgogada" + +# xt_MARK is not a loadable module on iptables-nft systems; checked functionally below. +MODULES="sch_htb sch_netem cls_fw ifb act_mirred cls_u32" + +PASS=0 +FAIL=0 + +for i in $(seq 2 30); do + node="bbchain${i}" + missing="" + + for mod in $MODULES; do + if ! ssh -o BatchMode=yes -o ConnectTimeout=5 "${USER}@${node}" \ + "lsmod | grep -q '^${mod}' || modinfo ${mod} > /dev/null 2>&1"; then + missing="${missing} ${mod}" + fi + done + + # xt_MARK may be built-in: check the iptables target list instead + if ! ssh -o BatchMode=yes -o ConnectTimeout=5 "${USER}@${node}" \ + "grep -qw MARK /proc/net/ip_tables_targets 2>/dev/null || \ + sudo iptables -t mangle -A OUTPUT -j MARK --set-mark 1 2>/dev/null && \ + sudo iptables -t mangle -D OUTPUT -j MARK --set-mark 1 2>/dev/null"; then + missing="${missing} xt_MARK" + fi + + if [ -z "$missing" ]; then + echo "OK ${node}" + PASS=$((PASS + 1)) + else + echo "FAIL ${node}: missing:${missing}" + FAIL=$((FAIL + 1)) + fi +done + +echo "" +echo "results: ${PASS} ok, ${FAIL} failed out of $((PASS + FAIL)) nodes" +[ "$FAIL" -eq 0 ] diff --git a/netemu/example/install_modules.sh b/netemu/example/install_modules.sh new file mode 100644 index 0000000..914cc5d --- /dev/null +++ b/netemu/example/install_modules.sh @@ -0,0 +1,65 @@ +#!/bin/sh +# Run on bbchain1 as hgogada. +# Installs and loads missing netemu kernel modules on bbchain2..bbchain30. +set -eu + +USER="hgogada" + +# xt_MARK is not a loadable module on iptables-nft systems; MARK target works natively. +MODULES="sch_htb sch_netem cls_fw ifb act_mirred cls_u32" + +PASS=0 +FAIL=0 + +# If a node name is passed as argument, run on that node only; otherwise all nodes. +if [ $# -gt 0 ]; then + NODES="$1" +else + NODES=$(seq 2 30 | awk '{printf "bbchain%d ", $1}') +fi + +for node in $NODES; do + + result=$(ssh -o BatchMode=yes -o ConnectTimeout=5 "${USER}@${node}" " + set -e + KERNEL=\$(uname -r) + PKG=\"linux-modules-extra-\${KERNEL}\" + FAILED='' + + # ensure modules-extra package is installed (provides xt_MARK.ko) + if ! dpkg -l \"\$PKG\" 2>/dev/null | grep -q '^ii'; then + echo \" installing \$PKG...\" + sudo apt-get install -y \"\$PKG\" 2>&1 || { echo \"FAIL apt-get install \$PKG failed\"; exit 1; } + fi + + for mod in $MODULES; do + if lsmod | grep -q \"^\${mod}\"; then + continue + fi + if sudo modprobe \"\$mod\" 2>/dev/null; then + echo \" loaded: \$mod\" + else + FAILED=\"\$FAILED \$mod\" + fi + done + + if [ -n \"\$FAILED\" ]; then + echo \"FAIL still missing:\$FAILED\" + else + echo 'OK' + fi + " 2>&1) + + if echo "$result" | grep -q '^OK'; then + echo "OK ${node}" + PASS=$((PASS + 1)) + else + echo "FAIL ${node}:" + echo "$result" | sed 's/^/ /' + FAIL=$((FAIL + 1)) + fi +done + +echo "" +echo "results: ${PASS} ok, ${FAIL} failed out of $((PASS + FAIL)) nodes" +[ "$FAIL" -eq 0 ] diff --git a/netemu/example/main.go b/netemu/example/main.go new file mode 100644 index 0000000..3ba434f --- /dev/null +++ b/netemu/example/main.go @@ -0,0 +1,71 @@ +// Example program that sets up and tears down network emulation across a +// cluster of machines using the netemu package. +// +// Usage: +// +// go run ./netemu/example --config network.yaml [--teardown-only] +// +// Prerequisites: +// - Passwordless SSH access from this machine to all nodes +// - Passwordless sudo on all nodes +// - Node aliases defined in ~/.ssh/config +package main + +import ( + "flag" + "log" + + "github.com/relab/iago" + "github.com/relab/iago/netemu" +) + +var ( + configFile = flag.String("config", "network.yaml", "path to netemu YAML config") + teardownOnly = flag.Bool("teardown-only", false, "only run teardown, skip setup") +) + +// nodes lists the SSH aliases for all machines in the cluster. +// Each alias must be defined in ~/.ssh/config with the correct HostName. +var nodes = []string{ + "bbchain2", "bbchain3", "bbchain4", "bbchain5", "bbchain6", + "bbchain7", "bbchain8", "bbchain9", "bbchain10", "bbchain11", + "bbchain12", "bbchain13", "bbchain14", "bbchain15", "bbchain16", + "bbchain17", "bbchain18", "bbchain19", "bbchain20", "bbchain21", + "bbchain22", "bbchain23", "bbchain24", "bbchain25", "bbchain26", + "bbchain27", "bbchain28", "bbchain29", "bbchain30", +} + +func main() { + flag.Parse() + + cfg, err := netemu.LoadConfig(*configFile) + if err != nil { + log.Fatalf("load config: %v", err) + } + + group, err := iago.NewSSHGroup(nodes, "") + if err != nil { + log.Fatalf("ssh group: %v", err) + } + defer group.Close() + + setup := netemu.SetupEmulation{Config: cfg, Sudo: true} + teardown := netemu.TeardownEmulation{Config: cfg, Sudo: true} + + // always start from a clean slate + log.Println("tearing down any existing rules...") + group.Run("netemu-teardown", teardown.Apply) + + if *teardownOnly { + log.Println("teardown complete") + return + } + + log.Println("setting up network emulation...") + group.Run("netemu-setup", setup.Apply) + log.Println("setup complete") + + log.Println("tearing down rules...") + group.Run("netemu-teardown", teardown.Apply) + log.Println("teardown complete") +} diff --git a/netemu/example/network.yaml b/netemu/example/network.yaml new file mode 100644 index 0000000..f40bec9 --- /dev/null +++ b/netemu/example/network.yaml @@ -0,0 +1,55 @@ +# netemu test configuration +# Controller: bbchain1 +# Nodes: bbchain2..bbchain15 → group: lan (1gbit, 1ms) +# bbchain16..bbchain30 → group: wan (100mbit, 50ms) +# Cross-group (lan↔wan) links use the wan profile. + +interface: enp1s0f0np0 + +profiles: + lan: + rate: 1gbit + delay: 1ms + wan: + rate: 100mbit + delay: 50ms + loss: 0.1 + +link_profiles: + - {from_group: lan, to_group: lan, profile: lan} + - {from_group: wan, to_group: wan, profile: wan} + - {from_group: lan, to_group: wan, profile: wan} + - {from_group: wan, to_group: lan, profile: wan} + +replicas: + # LAN group (bbchain2–bbchain15) + - {id: "2", host: bbchain2, ip: 152.94.162.12, port: 7000, group: lan} + - {id: "3", host: bbchain3, ip: 152.94.162.13, port: 7000, group: lan} + - {id: "4", host: bbchain4, ip: 152.94.162.14, port: 7000, group: lan} + - {id: "5", host: bbchain5, ip: 152.94.162.15, port: 7000, group: lan} + - {id: "6", host: bbchain6, ip: 152.94.162.16, port: 7000, group: lan} + - {id: "7", host: bbchain7, ip: 152.94.162.17, port: 7000, group: lan} + - {id: "8", host: bbchain8, ip: 152.94.162.18, port: 7000, group: lan} + - {id: "9", host: bbchain9, ip: 152.94.162.19, port: 7000, group: lan} + - {id: "10", host: bbchain10, ip: 152.94.162.20, port: 7000, group: lan} + - {id: "11", host: bbchain11, ip: 152.94.162.21, port: 7000, group: lan} + - {id: "12", host: bbchain12, ip: 152.94.162.22, port: 7000, group: lan} + - {id: "13", host: bbchain13, ip: 152.94.162.23, port: 7000, group: lan} + - {id: "14", host: bbchain14, ip: 152.94.162.24, port: 7000, group: lan} + - {id: "15", host: bbchain15, ip: 152.94.162.25, port: 7000, group: lan} + # WAN group (bbchain16–bbchain30) + - {id: "16", host: bbchain16, ip: 152.94.162.26, port: 7000, group: wan} + - {id: "17", host: bbchain17, ip: 152.94.162.27, port: 7000, group: wan} + - {id: "18", host: bbchain18, ip: 152.94.162.28, port: 7000, group: wan} + - {id: "19", host: bbchain19, ip: 152.94.162.29, port: 7000, group: wan} + - {id: "20", host: bbchain20, ip: 152.94.162.30, port: 7000, group: wan} + - {id: "21", host: bbchain21, ip: 152.94.162.31, port: 7000, group: wan} + - {id: "22", host: bbchain22, ip: 152.94.162.32, port: 7000, group: wan} + - {id: "23", host: bbchain23, ip: 152.94.162.33, port: 7000, group: wan} + - {id: "24", host: bbchain24, ip: 152.94.162.34, port: 7000, group: wan} + - {id: "25", host: bbchain25, ip: 152.94.162.35, port: 7000, group: wan} + - {id: "26", host: bbchain26, ip: 152.94.162.36, port: 7000, group: wan} + - {id: "27", host: bbchain27, ip: 152.94.162.37, port: 7000, group: wan} + - {id: "28", host: bbchain28, ip: 152.94.162.38, port: 7000, group: wan} + - {id: "29", host: bbchain29, ip: 152.94.162.39, port: 7000, group: wan} + - {id: "30", host: bbchain30, ip: 152.94.162.40, port: 7000, group: wan} diff --git a/netemu/example/setup_ssh.sh b/netemu/example/setup_ssh.sh new file mode 100644 index 0000000..765b916 --- /dev/null +++ b/netemu/example/setup_ssh.sh @@ -0,0 +1,26 @@ +#!/bin/sh +# Run on bbchain1 as hgogada. +# SSHes to bbchain2..bbchain30 and grants hgogada passwordless sudo on each node. +set -eu + +USER="hgogada" + +printf "Enter sudo password for %s on remote nodes: " "$USER" +stty -echo +read -r SUDO_PASS +stty echo +echo + +for i in $(seq 2 30); do + node="bbchain${i}" + printf "%s... " "$node" + if ssh -o BatchMode=yes "${USER}@${node}" \ + "echo '${SUDO_PASS}' | sudo -S sh -c 'echo \"${USER} ALL=(ALL) NOPASSWD:ALL\" > /etc/sudoers.d/${USER} && chmod 440 /etc/sudoers.d/${USER}'" \ + 2>/dev/null; then + echo "done" + else + echo "FAILED" + fi +done + +echo "all nodes processed" diff --git a/netemu/example/verify_ssh.sh b/netemu/example/verify_ssh.sh new file mode 100644 index 0000000..0c3b506 --- /dev/null +++ b/netemu/example/verify_ssh.sh @@ -0,0 +1,35 @@ +#!/bin/sh +# Run on bbchain1 as hgogada. +# Verifies passwordless SSH and passwordless sudo on bbchain2..bbchain30. +set -eu + +USER="hgogada" +TOTAL=0 +PASS=0 +FAIL=0 + +for i in $(seq 2 30); do + node="bbchain${i}" + TOTAL=$((TOTAL + 1)) + + # test passwordless SSH + if ! ssh -o BatchMode=yes -o ConnectTimeout=5 "${USER}@${node}" true 2>/dev/null; then + echo "FAIL ${node}: SSH login failed" + FAIL=$((FAIL + 1)) + continue + fi + + # test passwordless sudo + if ! ssh -o BatchMode=yes "${USER}@${node}" "sudo -n true" 2>/dev/null; then + echo "FAIL ${node}: passwordless sudo not working" + FAIL=$((FAIL + 1)) + continue + fi + + echo "OK ${node}" + PASS=$((PASS + 1)) +done + +echo "" +echo "results: ${PASS} ok, ${FAIL} failed out of ${TOTAL} nodes" +[ "$FAIL" -eq 0 ] diff --git a/netemu/marks.go b/netemu/marks.go new file mode 100644 index 0000000..2198935 --- /dev/null +++ b/netemu/marks.go @@ -0,0 +1,33 @@ +package netemu + +// indexMap maps a replica ID to its 0-based index in Config.Replicas. +type indexMap map[string]int + +func buildIndexMap(replicas []Replica) indexMap { + m := make(indexMap, len(replicas)) + for i, r := range replicas { + m[r.ID] = i + } + return m +} + +// flowMark returns a unique non-zero 32-bit integer for the ordered pair +// (fromIdx, toIdx) within a set of n replicas. +// +// The formula is: fromIdx*n + toIdx + 1 +// +// Properties: +// - Unique for all ordered pairs where fromIdx != toIdx. +// - Minimum value is 1 (avoids mark 0, which the kernel treats as "unset"). +// - Maximum value is n^2 - n + n - 1 + 1 = n^2 (for n=100: 10000), well +// within iptables' 32-bit mark space and tc's 16-bit classid minor field. +func flowMark(fromIdx, toIdx, n int) int { + return fromIdx*n + toIdx + 1 +} + +// defaultClassMinor returns the tc classid minor number used for the +// unconstrained catch-all HTB class. It is chosen to be larger than any +// flowMark value so it never collides. +func defaultClassMinor(n int) int { + return n*n + 1 +} diff --git a/netemu/netemu_test.go b/netemu/netemu_test.go new file mode 100644 index 0000000..f8bfdc6 --- /dev/null +++ b/netemu/netemu_test.go @@ -0,0 +1,327 @@ +package netemu + +import ( + "fmt" + "strings" + "testing" +) + +// minimalConfig returns a small valid Config used as a base for tests. +func minimalConfig() *Config { + return &Config{ + Interface: "eth0", + IFB: "ifb0", + Profiles: map[string]Profile{ + "lan": {Rate: "1000mbit", Delay: "2ms"}, + "wan": {Rate: "100mbit", Delay: "80ms", Loss: 0.1}, + }, + Replicas: []Replica{ + {ID: "r1", Host: "bbchain1", Port: 7001, Group: "a"}, + {ID: "r2", Host: "bbchain2", Port: 7002, Group: "a"}, + {ID: "r3", Host: "bbchain2", Port: 7003, Group: "b"}, + }, + LinkProfiles: []GroupLinkProfile{ + {FromGroup: "a", ToGroup: "a", Profile: "lan"}, + {FromGroup: "b", ToGroup: "b", Profile: "lan"}, + {FromGroup: "a", ToGroup: "b", Profile: "wan"}, + {FromGroup: "b", ToGroup: "a", Profile: "wan"}, + }, + } +} + +// --- Config validation --- + +func TestValidateOK(t *testing.T) { + if err := minimalConfig().validate(); err != nil { + t.Fatal(err) + } +} + +func TestValidateMissingInterface(t *testing.T) { + cfg := minimalConfig() + cfg.Interface = "" + if err := cfg.validate(); err == nil { + t.Fatal("expected error for missing interface") + } +} + +func TestValidateDuplicateReplicaID(t *testing.T) { + cfg := minimalConfig() + cfg.Replicas = append(cfg.Replicas, Replica{ID: "r1", Port: 7099, Group: "a"}) + if err := cfg.validate(); err == nil { + t.Fatal("expected error for duplicate replica id") + } +} + +func TestValidateMissingIP(t *testing.T) { + cfg := minimalConfig() + cfg.Replicas[0].Host = "" + if err := cfg.validate(); err == nil { + t.Fatal("expected error for missing host") + } +} + +func TestValidateInvalidPort(t *testing.T) { + cfg := minimalConfig() + cfg.Replicas[0].Port = 0 + if err := cfg.validate(); err == nil { + t.Fatal("expected error for port 0") + } +} + +func TestValidateUnknownLinkProfileProfile(t *testing.T) { + cfg := minimalConfig() + cfg.LinkProfiles = append(cfg.LinkProfiles, GroupLinkProfile{ + FromGroup: "a", ToGroup: "b", Profile: "nonexistent", + }) + if err := cfg.validate(); err == nil { + t.Fatal("expected error for unknown profile in link_profile") + } +} + +func TestValidateUnknownDefaultProfile(t *testing.T) { + cfg := minimalConfig() + cfg.DefaultProfile = "ghost" + if err := cfg.validate(); err == nil { + t.Fatal("expected error for unknown default_profile") + } +} + +func TestValidateLinkSelfLoop(t *testing.T) { + cfg := minimalConfig() + cfg.Links = []Link{{From: "r1", To: "r1", Rate: "100mbit", Delay: "10ms"}} + if err := cfg.validate(); err == nil { + t.Fatal("expected error for self-loop link") + } +} + +func TestValidateLinkUnknownReplica(t *testing.T) { + cfg := minimalConfig() + cfg.Links = []Link{{From: "r1", To: "r99", Rate: "100mbit", Delay: "10ms"}} + if err := cfg.validate(); err == nil { + t.Fatal("expected error for unknown replica in link") + } +} + +func TestValidateLinkUnknownProfile(t *testing.T) { + cfg := minimalConfig() + cfg.Links = []Link{{From: "r1", To: "r3", Profile: "ghost"}} + if err := cfg.validate(); err == nil { + t.Fatal("expected error for unknown profile in link") + } +} + +// --- Mark arithmetic --- + +func TestFlowMarkUnique(t *testing.T) { + n := 10 + seen := make(map[int]bool) + for from := range n { + for to := range n { + if from == to { + continue + } + m := flowMark(from, to, n) + if seen[m] { + t.Fatalf("flowMark(%d, %d, %d) = %d collides with earlier pair", from, to, n, m) + } + seen[m] = true + } + } +} + +func TestFlowMarkNonZero(t *testing.T) { + n := 5 + for from := range n { + for to := range n { + if from == to { + continue + } + if m := flowMark(from, to, n); m == 0 { + t.Fatalf("flowMark(%d, %d, %d) = 0, kernel treats 0 as unset", from, to, n) + } + } + } +} + +func TestDefaultClassMinorNoCollision(t *testing.T) { + for n := 1; n <= 50; n++ { + def := defaultClassMinor(n) + for from := range n { + for to := range n { + if from == to { + continue + } + if flowMark(from, to, n) == def { + t.Fatalf("n=%d: flowMark(%d,%d) collides with defaultClassMinor", n, from, to) + } + } + } + } +} + +// --- Profile resolution --- + +func TestResolveGroupLinkProfile(t *testing.T) { + cfg := minimalConfig() + p, ok := cfg.resolve("r1", "r2") // both group a → lan + if !ok { + t.Fatal("expected resolution") + } + if p.Rate != "1000mbit" { + t.Fatalf("expected lan rate 1000mbit, got %s", p.Rate) + } +} + +func TestResolveInterGroupProfile(t *testing.T) { + cfg := minimalConfig() + p, ok := cfg.resolve("r1", "r3") // a→b → wan + if !ok { + t.Fatal("expected resolution") + } + if p.Rate != "100mbit" { + t.Fatalf("expected wan rate 100mbit, got %s", p.Rate) + } +} + +func TestResolveExplicitLinkWins(t *testing.T) { + cfg := minimalConfig() + cfg.Links = []Link{{From: "r1", To: "r2", Rate: "500mbit", Delay: "5ms"}} + p, ok := cfg.resolve("r1", "r2") + if !ok { + t.Fatal("expected resolution") + } + if p.Rate != "500mbit" { + t.Fatalf("explicit link should win: expected 500mbit, got %s", p.Rate) + } +} + +func TestResolveLinkProfileOverridesGroup(t *testing.T) { + cfg := minimalConfig() + cfg.Links = []Link{{From: "r1", To: "r2", Profile: "wan"}} + p, ok := cfg.resolve("r1", "r2") + if !ok { + t.Fatal("expected resolution") + } + if p.Rate != "100mbit" { + t.Fatalf("link profile 'wan' should override group 'lan': got %s", p.Rate) + } +} + +func TestResolveLinkDirectFieldsOverrideProfile(t *testing.T) { + cfg := minimalConfig() + cfg.Links = []Link{{From: "r1", To: "r3", Profile: "wan", Rate: "50mbit", Delay: "150ms"}} + p, ok := cfg.resolve("r1", "r3") + if !ok { + t.Fatal("expected resolution") + } + if p.Rate != "50mbit" { + t.Fatalf("direct Rate should override profile rate: got %s", p.Rate) + } + if p.Delay != "150ms" { + t.Fatalf("direct Delay should override profile delay: got %s", p.Delay) + } +} + +func TestResolveDefaultProfile(t *testing.T) { + cfg := minimalConfig() + cfg.LinkProfiles = nil + cfg.DefaultProfile = "wan" + p, ok := cfg.resolve("r1", "r3") + if !ok { + t.Fatal("expected resolution via default_profile") + } + if p.Rate != "100mbit" { + t.Fatalf("expected wan rate, got %s", p.Rate) + } +} + +func TestResolveNoRuleReturnsNotFound(t *testing.T) { + cfg := minimalConfig() + cfg.LinkProfiles = nil + _, ok := cfg.resolve("r1", "r3") + if ok { + t.Fatal("expected no resolution when no rules match and no default") + } +} + +// --- Command generation --- + +func TestEgressSetupContainsTCAndIPTables(t *testing.T) { + cmds := egressSetupCmds(minimalConfig()) + + assertContainsSubstr(t, cmds, "tc qdisc add dev eth0 root handle 1: htb") + // r1→r2 (both group a, lan profile) + assertContainsSubstr(t, cmds, "iptables -t mangle -A OUTPUT -p tcp --sport 7001 --dst bbchain2 --dport 7002") + assertContainsSubstr(t, cmds, "htb rate 1000mbit") + // r1→r3 (a→b, wan profile) + assertContainsSubstr(t, cmds, "iptables -t mangle -A OUTPUT -p tcp --sport 7001 --dst bbchain2 --dport 7003") + assertContainsSubstr(t, cmds, "htb rate 100mbit") +} + +func TestEgressSetupNetemIncludesLoss(t *testing.T) { + cmds := egressSetupCmds(minimalConfig()) + // wan profile has Loss: 0.1 — a→b and b→a flows should have loss in netem. + assertContainsSubstr(t, cmds, "loss") +} + +func TestEgressSetupNoNetemLossForLan(t *testing.T) { + cfg := minimalConfig() + cmds := egressSetupCmds(cfg) + n := len(cfg.Replicas) + idx := buildIndexMap(cfg.Replicas) + mark := flowMark(idx["r1"], idx["r2"], n) + target := fmt.Sprintf("parent 1:%d", mark) + for _, cmd := range cmds { + if strings.Contains(cmd, "netem") && strings.Contains(cmd, target) { + if strings.Contains(cmd, "loss") { + t.Fatalf("lan netem cmd should not contain loss: %q", cmd) + } + return + } + } + t.Fatal("no netem command found for r1→r2") +} + +func TestEgressTeardownCommands(t *testing.T) { + cmds := egressTeardownCmds(minimalConfig()) + assertContainsSubstr(t, cmds, "tc qdisc del dev eth0 root") + assertContainsSubstr(t, cmds, "iptables -t mangle -F OUTPUT") +} + +func TestIngressSetupContainsIFBRedirect(t *testing.T) { + cmds := ingressSetupCmds(minimalConfig()) + assertContainsSubstr(t, cmds, "modprobe ifb") + assertContainsSubstr(t, cmds, "ip link add ifb0 type ifb") + assertContainsSubstr(t, cmds, "mirred egress redirect dev ifb0") + assertContainsSubstr(t, cmds, "iptables -t mangle -A PREROUTING") +} + +func TestIngressTeardownCommands(t *testing.T) { + cmds := ingressTeardownCmds(minimalConfig()) + assertContainsSubstr(t, cmds, "iptables -t mangle -F PREROUTING") + assertContainsSubstr(t, cmds, "ip link del ifb0") +} + +func TestNoSelfFlowCommands(t *testing.T) { + cmds := egressSetupCmds(minimalConfig()) + for _, cmd := range cmds { + if strings.Contains(cmd, "--sport 7001 --dport 7001") || + strings.Contains(cmd, "--sport 7002 --dport 7002") || + strings.Contains(cmd, "--sport 7003 --dport 7003") { + t.Fatalf("self-flow rule found: %q", cmd) + } + } +} + +// --- helpers --- + +func assertContainsSubstr(t *testing.T, cmds []string, sub string) { + t.Helper() + for _, c := range cmds { + if strings.Contains(c, sub) { + return + } + } + t.Fatalf("no command contains substring %q\n in: %v", sub, cmds) +} diff --git a/netemu/rules.go b/netemu/rules.go new file mode 100644 index 0000000..531b395 --- /dev/null +++ b/netemu/rules.go @@ -0,0 +1,137 @@ +package netemu + +import "fmt" + +// egressSetupCmds returns the ordered sequence of shell commands that install +// tc HTB + netem qdiscs and iptables MANGLE/OUTPUT marks for egress shaping. +// +// Structure on : +// +// root qdisc: htb default +// class 1: htb rate 100gbit (unconstrained catch-all) +// class 1: htb rate (one per shaped link) +// qdisc : netem delay loss % +// filter: fw handle -> classid 1: +// +// iptables stamps on packets in OUTPUT matching (sport, dport). +func egressSetupCmds(cfg *Config) []string { + n := len(cfg.Replicas) + iface := cfg.Interface + defMinor := defaultClassMinor(n) + idx := buildIndexMap(cfg.Replicas) + + cmds := []string{ + fmt.Sprintf("tc qdisc add dev %s root handle 1: htb default %d", iface, defMinor), + fmt.Sprintf("tc class add dev %s parent 1: classid 1:%d htb rate 100gbit", iface, defMinor), + } + + for _, from := range cfg.Replicas { + for _, to := range cfg.Replicas { + if from.ID == to.ID { + continue + } + p, ok := cfg.resolve(from.ID, to.ID) + if !ok { + continue + } + mark := flowMark(idx[from.ID], idx[to.ID], n) + cmds = append(cmds, + fmt.Sprintf("tc class add dev %s parent 1: classid 1:%d htb rate %s", + iface, mark, p.Rate), + netemCmd(iface, mark, p), + fmt.Sprintf("iptables -t mangle -A OUTPUT -p tcp --sport %d --dst %s --dport %d -j MARK --set-mark %d", + from.Port, to.dst(), to.Port, mark), + fmt.Sprintf("tc filter add dev %s parent 1: handle %d fw classid 1:%d", + iface, mark, mark), + ) + } + } + return cmds +} + +// egressTeardownCmds returns commands that remove the egress tc qdisc and +// flush iptables MANGLE/OUTPUT rules installed by egressSetupCmds. +// Errors from these commands are expected when rules don't exist and should +// be ignored by the caller. +func egressTeardownCmds(cfg *Config) []string { + return []string{ + fmt.Sprintf("tc qdisc del dev %s root", cfg.Interface), + "iptables -t mangle -F OUTPUT", + } +} + +// ingressSetupCmds returns commands that redirect ingress traffic through an +// IFB (Intermediate Functional Block) device and apply the same HTB + netem +// hierarchy there, controlled by iptables MANGLE/PREROUTING marks. +// +// For a packet arriving at this host sent by replica (sport=from.Port) +// destined to replica (dport=to.Port), the mark applied in PREROUTING +// matches the same (from→to) profile used on the sender's egress. This lets +// each host independently enforce its own receive-side cap. +func ingressSetupCmds(cfg *Config) []string { + n := len(cfg.Replicas) + iface := cfg.Interface + ifb := cfg.IFB + defMinor := defaultClassMinor(n) + idx := buildIndexMap(cfg.Replicas) + + cmds := []string{ + "modprobe ifb numifbs=1", + fmt.Sprintf("ip link add %s type ifb", ifb), + fmt.Sprintf("ip link set %s up", ifb), + // Redirect all ingress to the IFB device. + fmt.Sprintf("tc qdisc add dev %s ingress", iface), + fmt.Sprintf( + "tc filter add dev %s parent ffff: protocol ip u32 match u32 0 0 action mirred egress redirect dev %s", + iface, ifb), + // HTB root on IFB. + fmt.Sprintf("tc qdisc add dev %s root handle 1: htb default %d", ifb, defMinor), + fmt.Sprintf("tc class add dev %s parent 1: classid 1:%d htb rate 100gbit", ifb, defMinor), + } + + for _, from := range cfg.Replicas { + for _, to := range cfg.Replicas { + if from.ID == to.ID { + continue + } + p, ok := cfg.resolve(from.ID, to.ID) + if !ok { + continue + } + mark := flowMark(idx[from.ID], idx[to.ID], n) + cmds = append(cmds, + fmt.Sprintf("tc class add dev %s parent 1: classid 1:%d htb rate %s", + ifb, mark, p.Rate), + netemCmd(ifb, mark, p), + // In PREROUTING the packet's source port belongs to the sender + // and destination port belongs to the local replica. + fmt.Sprintf("iptables -t mangle -A PREROUTING -p tcp --sport %d --dst %s --dport %d -j MARK --set-mark %d", + from.Port, to.dst(), to.Port, mark), + fmt.Sprintf("tc filter add dev %s parent 1: handle %d fw classid 1:%d", + ifb, mark, mark), + ) + } + } + return cmds +} + +// ingressTeardownCmds returns commands that undo ingressSetupCmds. +func ingressTeardownCmds(cfg *Config) []string { + return []string{ + "iptables -t mangle -F PREROUTING", + fmt.Sprintf("tc qdisc del dev %s ingress", cfg.Interface), + fmt.Sprintf("ip link set %s down", cfg.IFB), + fmt.Sprintf("ip link del %s", cfg.IFB), + } +} + +// netemCmd builds the tc qdisc add command that attaches a netem discipline +// as a leaf under the HTB class identified by mark. +func netemCmd(dev string, mark int, p Profile) string { + cmd := fmt.Sprintf("tc qdisc add dev %s parent 1:%d handle %d: netem delay %s", + dev, mark, mark, p.Delay) + if p.Loss > 0 { + cmd += fmt.Sprintf(" loss %.4f%%", p.Loss) + } + return cmd +}