Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/relab/container v0.0.0-20260109140004-4adfae874bb5
github.com/relab/wrfs v0.0.0-20220416082020-a641cd350078
golang.org/x/crypto v0.50.0
gopkg.in/yaml.v3 v3.0.1
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,7 @@ golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI=
golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/term v0.42.0 h1:UiKe+zDFmJobeJ5ggPwOshJIVt6/Ft0rcfrXZDLWAWY=
golang.org/x/term v0.42.0/go.mod h1:Dq/D+snpsbazcBG5+F9Q1n2rXV8Ma+71xEjTRufARgY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
225 changes: 225 additions & 0 deletions netemu/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package netemu

import (
"fmt"
"os"

"gopkg.in/yaml.v3"
)

// Config is the top-level description of the network emulation to apply
// across a set of replicas. It is normally populated from YAML by LoadConfig.
//
// For an ordered replica pair the effective Profile is selected with the
// precedence Links > LinkProfiles > DefaultProfile.
type Config struct {
	// Interface names the NIC that egress shaping rules are applied on
	// (e.g. "eth0"). An empty value is rejected during validation.
	Interface string `yaml:"interface"`
	// IFB names the Intermediate Functional Block device used for ingress
	// shaping. It is created by SetupEmulation when ShapeIngress is true,
	// and must be non-empty in that case.
	IFB string `yaml:"ifb"`

	// Profiles holds the named, reusable templates that LinkProfiles, Links
	// and DefaultProfile refer to by name.
	Profiles map[string]Profile `yaml:"profiles"`

	// DefaultProfile is the name of the Profile used for every replica pair
	// that neither Links nor LinkProfiles cover. When empty, such pairs are
	// left unconstrained (default tc catch-all class).
	DefaultProfile string `yaml:"default_profile"`

	// Replicas lists the endpoints whose mutual traffic is shaped.
	Replicas []Replica `yaml:"replicas"`
	// LinkProfiles assigns profiles to group-to-group traffic.
	LinkProfiles []GroupLinkProfile `yaml:"link_profiles"`

	// Links holds per-pair overrides, which win over both LinkProfiles and
	// DefaultProfile. An entry either names a predefined profile through
	// Profile or sets Rate/Delay/Loss directly; directly set fields take
	// precedence over the named profile.
	Links []Link `yaml:"links"`
}

// Profile is a named, reusable set of shaping parameters. Validation requires
// every profile listed in Config.Profiles to have both Rate and Delay set.
type Profile struct {
	// Rate is the bandwidth limit, handed to tc verbatim
	// (e.g. "100mbit", "1gbit").
	Rate string `yaml:"rate"`
	// Delay is the added latency, handed to netem verbatim (e.g. "50ms").
	Delay string `yaml:"delay"`
	// Loss is the packet loss handed to netem, expressed as a percentage
	// (e.g. 0.1 means 0.1%).
	Loss float64 `yaml:"loss"`
}

// Replica is one process endpoint, identified by the host it runs on and the
// port it listens on. Several replicas may share a host as long as their
// ports differ.
type Replica struct {
	// ID is the unique name that Link entries use to refer to this replica.
	ID string `yaml:"id"`
	// Host is an IP address or a hostname resolvable from the controller.
	Host string `yaml:"host"`
	// IP, when set, is used instead of Host in iptables rules. It is
	// required when the node running iptables cannot resolve Host via DNS.
	IP string `yaml:"ip,omitempty"`
	// Port is the replica's listening port; validation accepts 1-65535.
	Port int `yaml:"port"`
	// Group is the group name matched by GroupLinkProfile entries.
	Group string `yaml:"group"`
}

// dst returns the destination to use in iptables --dst rules. The explicit IP
// field wins when it is set; otherwise Host (an address or a hostname) is
// returned unchanged.
func (r Replica) dst() string {
	addr := r.Host
	if r.IP != "" {
		addr = r.IP
	}
	return addr
}

// GroupLinkProfile applies a named Profile to every (from, to) replica pair
// whose sender belongs to FromGroup and whose receiver belongs to ToGroup.
// When several entries match the same pair, the first one in the list is used.
type GroupLinkProfile struct {
	// FromGroup is the group of the sending replica.
	FromGroup string `yaml:"from_group"`
	// ToGroup is the group of the receiving replica.
	ToGroup string `yaml:"to_group"`
	// Profile is the name of an entry in Config.Profiles.
	Profile string `yaml:"profile"`
}

// Link is an explicit override for a single ordered replica pair. It wins
// over any matching GroupLinkProfile and over DefaultProfile.
//
// A Link may name a predefined profile via Profile, set Rate/Delay/Loss
// directly, or both; fields set directly replace the corresponding values of
// the named profile. Because an unset Loss is indistinguishable from 0, a
// Link cannot lower a profile's non-zero loss back to 0.
type Link struct {
	// From is the ID of the sending replica.
	From string `yaml:"from"`
	// To is the ID of the receiving replica.
	To string `yaml:"to"`
	// Profile optionally names an entry in Config.Profiles to start from.
	Profile string `yaml:"profile"`
	// Rate, when non-empty, overrides the profile's rate.
	Rate string `yaml:"rate"`
	// Delay, when non-empty, overrides the profile's delay.
	Delay string `yaml:"delay"`
	// Loss, when non-zero, overrides the profile's loss percentage.
	Loss float64 `yaml:"loss"`
}

// LoadConfig reads the YAML file at path, decodes it into a Config and
// validates the result.
//
// It returns a non-nil error if the file cannot be read, is not valid YAML for
// Config, or fails validation. Read and parse errors are wrapped with %w, so
// callers can still inspect the cause with errors.Is / errors.As (for example
// errors.Is(err, fs.ErrNotExist)). On error the returned *Config is nil.
func LoadConfig(path string) (*Config, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		// os.ReadFile errors already mention the path, so it is not repeated.
		return nil, fmt.Errorf("netemu: reading config: %w", err)
	}
	var cfg Config
	if err := yaml.Unmarshal(data, &cfg); err != nil {
		return nil, fmt.Errorf("netemu: parsing config %q: %w", path, err)
	}
	// Validation errors already carry the "netemu:" prefix.
	if err := cfg.validate(); err != nil {
		return nil, err
	}
	return &cfg, nil
}

// resolve looks up the effective Profile for traffic from fromID to toID.
//
// Rules are consulted in order of precedence and the first hit is returned:
//  1. an explicit Link for exactly this (from, to) pair,
//  2. a GroupLinkProfile matching the groups of the two replicas,
//  3. the DefaultProfile, if one is configured.
//
// The boolean is false when nothing matches and DefaultProfile is unset, or
// when the matched group/default rule names a profile that does not exist.
func (c *Config) resolve(fromID, toID string) (Profile, bool) {
	// Explicit per-pair links take priority over everything else.
	for i := range c.Links {
		if link := c.Links[i]; link.From == fromID && link.To == toID {
			return c.mergeLink(link), true
		}
	}

	// Group-to-group rules: the first entry matching both groups decides.
	srcGroup, dstGroup := c.groupOf(fromID), c.groupOf(toID)
	for i := range c.LinkProfiles {
		rule := c.LinkProfiles[i]
		if rule.FromGroup != srcGroup || rule.ToGroup != dstGroup {
			continue
		}
		prof, found := c.Profiles[rule.Profile]
		return prof, found
	}

	// Fall back to the default profile, if any.
	if c.DefaultProfile == "" {
		return Profile{}, false
	}
	prof, found := c.Profiles[c.DefaultProfile]
	return prof, found
}

// mergeLink turns a Link entry into the Profile it stands for. It starts from
// the named profile, if the Link references one, and then overlays every
// directly set field (non-empty Rate, non-empty Delay, non-zero Loss), so
// direct fields always win over the named profile.
func (c *Config) mergeLink(l Link) Profile {
	var merged Profile
	if l.Profile != "" {
		merged = c.Profiles[l.Profile]
	}
	if rate := l.Rate; rate != "" {
		merged.Rate = rate
	}
	if delay := l.Delay; delay != "" {
		merged.Delay = delay
	}
	// A zero Loss is treated as "not set" and leaves the profile's loss alone.
	if loss := l.Loss; loss != 0 {
		merged.Loss = loss
	}
	return merged
}

// groupOf returns the Group of the replica whose ID equals id. It returns the
// empty string when no replica has that ID, which is indistinguishable from a
// replica whose Group is empty.
func (c *Config) groupOf(id string) string {
	for i := 0; i < len(c.Replicas); i++ {
		if c.Replicas[i].ID == id {
			return c.Replicas[i].Group
		}
	}
	return ""
}

// validate checks the configuration for internal consistency and returns the
// first problem found, or nil if the configuration is usable.
//
// It verifies that:
//   - Interface is set;
//   - every replica has a unique non-empty ID, a non-empty Host and a Port in
//     the range 1-65535;
//   - every profile has a Rate, a Delay and a Loss percentage within [0, 100];
//   - LinkProfiles and DefaultProfile only reference known profiles;
//   - every Link connects two distinct, known replicas, references a known
//     profile (if any) and has a Loss percentage within [0, 100].
//
// Profiles are visited in map order, so when several profiles are invalid the
// one that gets reported is not deterministic.
func (c *Config) validate() error {
	if c.Interface == "" {
		return fmt.Errorf("netemu: interface is required")
	}

	ids := make(map[string]bool, len(c.Replicas))
	for _, r := range c.Replicas {
		if r.ID == "" {
			return fmt.Errorf("netemu: replica has empty id")
		}
		if ids[r.ID] {
			return fmt.Errorf("netemu: duplicate replica id %q", r.ID)
		}
		if r.Host == "" {
			return fmt.Errorf("netemu: replica %q has empty host", r.ID)
		}
		if r.Port <= 0 || r.Port > 65535 {
			return fmt.Errorf("netemu: replica %q has invalid port %d", r.ID, r.Port)
		}
		ids[r.ID] = true
	}

	for name, p := range c.Profiles {
		if p.Rate == "" {
			return fmt.Errorf("netemu: profile %q is missing rate", name)
		}
		if p.Delay == "" {
			return fmt.Errorf("netemu: profile %q is missing delay", name)
		}
		if !validLossPercent(p.Loss) {
			return fmt.Errorf("netemu: profile %q has invalid loss %v (must be between 0 and 100)", name, p.Loss)
		}
	}

	for _, glp := range c.LinkProfiles {
		if _, ok := c.Profiles[glp.Profile]; !ok {
			return fmt.Errorf("netemu: link_profile references unknown profile %q", glp.Profile)
		}
	}

	if c.DefaultProfile != "" {
		if _, ok := c.Profiles[c.DefaultProfile]; !ok {
			return fmt.Errorf("netemu: default_profile references unknown profile %q", c.DefaultProfile)
		}
	}

	for _, l := range c.Links {
		if !ids[l.From] {
			return fmt.Errorf("netemu: link references unknown replica %q", l.From)
		}
		if !ids[l.To] {
			return fmt.Errorf("netemu: link references unknown replica %q", l.To)
		}
		if l.From == l.To {
			return fmt.Errorf("netemu: link from replica to itself: %q", l.From)
		}
		if l.Profile != "" {
			if _, ok := c.Profiles[l.Profile]; !ok {
				return fmt.Errorf("netemu: link references unknown profile %q", l.Profile)
			}
		}
		if !validLossPercent(l.Loss) {
			return fmt.Errorf("netemu: link %q -> %q has invalid loss %v (must be between 0 and 100)", l.From, l.To, l.Loss)
		}
	}

	return nil
}

// validLossPercent reports whether loss is a usable packet-loss percentage,
// i.e. a number within [0, 100]. The comparison form also rejects NaN.
func validLossPercent(loss float64) bool {
	return loss >= 0 && loss <= 100
}
122 changes: 122 additions & 0 deletions netemu/emulate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Package netemu provides Iago actions for per-flow bandwidth emulation using
// Linux tc (traffic control) and iptables. It shapes traffic between replicas
// identified by TCP port, supporting asymmetric bandwidth and latency via a
// declarative YAML configuration.
package netemu

import (
"bytes"
"context"
"fmt"
"io"
"strings"

"github.com/relab/iago"
)

// SetupEmulation is the Iago action that puts the emulation described by
// Config in place on a host: tc HTB qdiscs, netem leaf disciplines and
// iptables MANGLE rules enforcing the per-flow bandwidth and latency.
//
// The target host needs CAP_NET_ADMIN (run as root, or set Sudo). Pair it
// with TeardownEmulation to remove the rules once the experiment is over.
type SetupEmulation struct {
	// Config is the emulation to install.
	Config *Config
	// ShapeIngress additionally installs a matching rule set on an IFB
	// device so that incoming traffic is shaped as well. When false, only
	// egress traffic is shaped. Requires Config.IFB to be set.
	ShapeIngress bool
	// Sudo runs the privileged commands through "sudo". Enable it when the
	// SSH user is not root but has passwordless sudo.
	Sudo bool
}

// Apply implements the iago action interface. It builds the egress setup
// commands (plus the ingress ones when ShapeIngress is set) and runs them on
// host as a single fail-fast script, so the first failing command aborts the
// setup.
//
// It returns an error, without running anything on the host, if Config is nil
// or if ShapeIngress is requested while Config.IFB is empty. Errors from the
// remote script are wrapped with the host name.
func (s SetupEmulation) Apply(ctx context.Context, host iago.Host) error {
	// Reject unusable settings up front instead of panicking on a nil
	// pointer or building commands that will be thrown away.
	if s.Config == nil {
		return fmt.Errorf("netemu: SetupEmulation.Config is nil")
	}
	if s.ShapeIngress && s.Config.IFB == "" {
		return fmt.Errorf("netemu: ShapeIngress is true but config.ifb is empty")
	}

	cmds := egressSetupCmds(s.Config)
	if s.ShapeIngress {
		cmds = append(cmds, ingressSetupCmds(s.Config)...)
	}
	if err := runScript(ctx, host, s.Sudo, true, cmds); err != nil {
		return fmt.Errorf("netemu setup on %s: %w", host.Name(), err)
	}
	return nil
}

// TeardownEmulation is the Iago action that undoes SetupEmulation by removing
// the tc qdiscs and iptables rules it installed.
//
// Failing teardown commands (for instance because the rules were never
// installed) are silently ignored. This makes TeardownEmulation safe to use
// as a deferred cleanup whether or not SetupEmulation succeeded.
type TeardownEmulation struct {
	// Config is the emulation whose rules should be removed.
	Config *Config
	// ShapeIngress also removes the ingress rule set.
	ShapeIngress bool
	// Sudo runs the privileged commands through "sudo".
	Sudo bool
}

// Apply implements the iago action interface. It runs the egress teardown
// commands (and the ingress ones when ShapeIngress is set) on host without
// fail-fast, and always returns nil.
func (t TeardownEmulation) Apply(ctx context.Context, host iago.Host) error {
	script := egressTeardownCmds(t.Config)
	if t.ShapeIngress {
		script = append(script, ingressTeardownCmds(t.Config)...)
	}
	// Best effort by design: del/flush on rules that do not exist exits
	// non-zero, so the script's error is deliberately discarded.
	_ = runScript(ctx, host, t.Sudo, false, script)
	return nil
}

// runScript pipes cmds, one per line, to a single remote sh session, reducing
// SSH round-trips from O(n²) to 1 for n replicas.
//
// When failFast is true the shell runs as "sh -e", so any failing command
// aborts the script. When sudo is true the shell is started through "sudo".
// ctx is handed to the runner's StartContext; how cancellation is enforced is
// up to the iago runner implementation.
//
// Once the shell has started, runScript always waits for it to exit. If the
// shell fails, its exit error is returned in preference to any error from
// feeding stdin (an early exit under "sh -e" typically breaks the pipe), and
// whatever the script wrote to stderr is appended to the returned error.
func runScript(ctx context.Context, host iago.Host, sudo, failFast bool, cmds []string) error {
	runner, err := host.NewCommand()
	if err != nil {
		return err
	}
	stderrPipe, err := runner.StderrPipe()
	if err != nil {
		return err
	}
	stdin, err := runner.StdinPipe()
	if err != nil {
		return err
	}

	shell := "sh"
	if failFast {
		shell = "sh -e"
	}
	if sudo {
		shell = "sudo " + shell
	}
	if err := runner.StartContext(ctx, shell); err != nil {
		return err
	}

	// Drain stderr concurrently so the remote shell never blocks on a full
	// pipe while we are still writing commands to its stdin.
	var stderrBuf bytes.Buffer
	stderrDone := make(chan struct{})
	go func() {
		defer close(stderrDone)
		io.Copy(&stderrBuf, stderrPipe) //nolint:errcheck
	}()

	// Feed the script. Stop at the first write error, but keep going with
	// the shutdown sequence so the command is always reaped.
	var feedErr error
	for _, cmd := range cmds {
		if _, feedErr = fmt.Fprintln(stdin, cmd); feedErr != nil {
			break
		}
	}
	// Closing stdin signals end-of-script to the shell.
	if closeErr := stdin.Close(); feedErr == nil {
		feedErr = closeErr
	}

	// All reads from the stderr pipe must finish before Wait is called.
	<-stderrDone
	runErr := runner.Wait()
	if runErr == nil {
		runErr = feedErr
	}
	if runErr == nil {
		return nil
	}
	if msg := strings.TrimSpace(stderrBuf.String()); msg != "" {
		return fmt.Errorf("%w: %s", runErr, msg)
	}
	return runErr
}
Loading