diff options
author | Dmitry Vyukov <dvyukov@google.com> | 2024-05-03 08:44:48 +0200 |
---|---|---|
committer | Dmitry Vyukov <dvyukov@google.com> | 2024-05-03 15:57:46 +0000 |
commit | 610f2a54d02f8cf4f2454c03bf679b602e6e59b6 (patch) | |
tree | dd1768a4e516fe9fb6945613ebf009eda82317f6 | |
parent | 3a81775029176dd4c693542e6715b985fa7ade4d (diff) | |
download | syzkaller-upstream-master.tar.gz |
pkg/rpctype: prepare for not using for target communicationupstream-master
Remove things that are only needed for target VM communication:
conditional compression, timeout scaling, traffic stats.
To minimize diffs when we switch target VM communication to flatrpc.
-rw-r--r-- | pkg/instance/instance.go | 8 | ||||
-rw-r--r-- | pkg/rpctype/rpc.go | 94 | ||||
-rw-r--r-- | syz-fuzzer/fuzzer.go | 17 | ||||
-rw-r--r-- | syz-fuzzer/testing.go | 4 | ||||
-rw-r--r-- | syz-hub/hub.go | 2 | ||||
-rw-r--r-- | syz-manager/hub.go | 4 | ||||
-rw-r--r-- | syz-manager/manager.go | 9 | ||||
-rw-r--r-- | syz-manager/rpc.go | 2 | ||||
-rw-r--r-- | syz-runner/runner.go | 3 | ||||
-rw-r--r-- | syz-verifier/rpcserver.go | 2 | ||||
-rw-r--r-- | tools/syz-hubtool/hubtool.go | 2 | ||||
-rw-r--r-- | tools/syz-runtest/runtest.go | 2 | ||||
-rw-r--r-- | vm/adb/adb.go | 2 | ||||
-rw-r--r-- | vm/bhyve/bhyve.go | 2 | ||||
-rw-r--r-- | vm/cuttlefish/cuttlefish.go | 2 | ||||
-rw-r--r-- | vm/gce/gce.go | 2 | ||||
-rw-r--r-- | vm/gvisor/gvisor.go | 2 | ||||
-rwxr-xr-x | vm/isolated/isolated.go | 2 | ||||
-rw-r--r-- | vm/proxyapp/init.go | 2 | ||||
-rw-r--r-- | vm/qemu/qemu.go | 2 | ||||
-rw-r--r-- | vm/starnix/starnix.go | 2 | ||||
-rw-r--r-- | vm/vm.go | 6 | ||||
-rw-r--r-- | vm/vm_test.go | 2 | ||||
-rw-r--r-- | vm/vmimpl/vmimpl.go | 12 | ||||
-rw-r--r-- | vm/vmm/vmm.go | 2 | ||||
-rw-r--r-- | vm/vmware/vmware.go | 2 |
26 files changed, 62 insertions, 129 deletions
diff --git a/pkg/instance/instance.go b/pkg/instance/instance.go index 3e59209e2..d47c093ec 100644 --- a/pkg/instance/instance.go +++ b/pkg/instance/instance.go @@ -456,10 +456,9 @@ func (inst *inst) testRepro() ([]byte, error) { } type OptionalFuzzerArgs struct { - Slowdown int - SandboxArg int - PprofPort int - NetCompression bool + Slowdown int + SandboxArg int + PprofPort int } type FuzzerCmdArgs struct { @@ -496,7 +495,6 @@ func FuzzerCmd(args *FuzzerCmdArgs) string { {Name: "slowdown", Value: fmt.Sprint(args.Optional.Slowdown)}, {Name: "sandbox_arg", Value: fmt.Sprint(args.Optional.SandboxArg)}, {Name: "pprof_port", Value: fmt.Sprint(args.Optional.PprofPort)}, - {Name: "net_compression", Value: fmt.Sprint(args.Optional.NetCompression)}, } optionalArg = " " + tool.OptionalFlags(flags) } diff --git a/pkg/rpctype/rpc.go b/pkg/rpctype/rpc.go index d195e993d..f6b8d1eff 100644 --- a/pkg/rpctype/rpc.go +++ b/pkg/rpctype/rpc.go @@ -13,18 +13,14 @@ import ( "time" "github.com/google/syzkaller/pkg/log" - "github.com/google/syzkaller/pkg/stats" ) type RPCServer struct { - ln net.Listener - s *rpc.Server - useCompression bool - statSent *stats.Val - statRecv *stats.Val + ln net.Listener + s *rpc.Server } -func NewRPCServer(addr, name string, receiver interface{}, useCompression bool) (*RPCServer, error) { +func NewRPCServer(addr, name string, receiver interface{}) (*RPCServer, error) { ln, err := net.Listen("tcp", addr) if err != nil { return nil, fmt.Errorf("failed to listen on %v: %w", addr, err) @@ -34,13 +30,8 @@ func NewRPCServer(addr, name string, receiver interface{}, useCompression bool) return nil, err } serv := &RPCServer{ - ln: ln, - s: s, - useCompression: useCompression, - statSent: stats.Create("go rpc sent", "Uncompressed outbound RPC traffic", - stats.Graph("traffic"), stats.Rate{}, stats.FormatMB), - statRecv: stats.Create("go rpc recv", "Uncompressed inbound RPC traffic", - stats.Graph("traffic"), stats.Rate{}, stats.FormatMB), + ln: ln, + s: s, } return serv, nil } @@ -53,7 +44,7 @@ func (serv *RPCServer) Serve() { continue } setupKeepAlive(conn, time.Minute) - go serv.s.ServeConn(maybeFlateConn(newCountedConn(serv, conn), serv.useCompression)) + go serv.s.ServeConn(newFlateConn(conn)) } } @@ -62,51 +53,35 @@ func (serv *RPCServer) Addr() net.Addr { } type RPCClient struct { - conn net.Conn - c *rpc.Client - timeScale time.Duration - useTimeouts bool - useCompression bool + conn net.Conn + c *rpc.Client } -func Dial(addr string, timeScale time.Duration) (net.Conn, error) { - if timeScale <= 0 { - return nil, fmt.Errorf("bad rpc time scale %v", timeScale) - } +func NewRPCClient(addr string) (*RPCClient, error) { var conn net.Conn var err error if addr == "stdin" { // This is used by vm/gvisor which passes us a unix socket connection in stdin. - return net.FileConn(os.Stdin) - } - if conn, err = net.DialTimeout("tcp", addr, time.Minute*timeScale); err != nil { - return nil, err + // TODO: remove this once we switch to flatrpc for target communication. + conn, err = net.FileConn(os.Stdin) + } else { + conn, err = net.DialTimeout("tcp", addr, 3*time.Minute) } - setupKeepAlive(conn, time.Minute*timeScale) - return conn, nil -} - -func NewRPCClient(addr string, timeScale time.Duration, useTimeouts, useCompression bool) (*RPCClient, error) { - conn, err := Dial(addr, timeScale) if err != nil { return nil, err } + setupKeepAlive(conn, time.Minute) cli := &RPCClient{ - conn: conn, - c: rpc.NewClient(maybeFlateConn(conn, useCompression)), - timeScale: timeScale, - useTimeouts: useTimeouts, - useCompression: useCompression, + conn: conn, + c: rpc.NewClient(newFlateConn(conn)), } return cli, nil } func (cli *RPCClient) Call(method string, args, reply interface{}) error { - if cli.useTimeouts { - // Note: SetDeadline is not implemented on fuchsia, so don't fail on error. - cli.conn.SetDeadline(time.Now().Add(3 * time.Minute * cli.timeScale)) - defer cli.conn.SetDeadline(time.Time{}) - } + // Note: SetDeadline is not implemented on fuchsia, so don't fail on error. + cli.conn.SetDeadline(time.Now().Add(10 * time.Minute)) + defer cli.conn.SetDeadline(time.Time{}) return cli.c.Call(method, args, reply) } @@ -130,10 +105,7 @@ type flateConn struct { c io.Closer } -func maybeFlateConn(conn io.ReadWriteCloser, useCompression bool) io.ReadWriteCloser { - if !useCompression { - return conn - } +func newFlateConn(conn io.ReadWriteCloser) io.ReadWriteCloser { w, err := flate.NewWriter(conn, 9) if err != nil { panic(err) @@ -173,29 +145,3 @@ func (fc *flateConn) Close() error { } return err0 } - -// countedConn wraps net.Conn to record the transferred bytes. -type countedConn struct { - io.ReadWriteCloser - server *RPCServer -} - -func newCountedConn(server *RPCServer, - conn io.ReadWriteCloser) io.ReadWriteCloser { - return &countedConn{ - ReadWriteCloser: conn, - server: server, - } -} - -func (cc countedConn) Read(p []byte) (n int, err error) { - n, err = cc.ReadWriteCloser.Read(p) - cc.server.statRecv.Add(n) - return -} - -func (cc countedConn) Write(b []byte) (n int, err error) { - n, err = cc.ReadWriteCloser.Write(b) - cc.server.statSent.Add(n) - return -} diff --git a/syz-fuzzer/fuzzer.go b/syz-fuzzer/fuzzer.go index c350f2c33..bdccf9d82 100644 --- a/syz-fuzzer/fuzzer.go +++ b/syz-fuzzer/fuzzer.go @@ -72,14 +72,13 @@ func main() { debug.SetGCPercent(50) var ( - flagName = flag.String("name", "test", "unique name for manager") - flagOS = flag.String("os", runtime.GOOS, "target OS") - flagArch = flag.String("arch", runtime.GOARCH, "target arch") - flagManager = flag.String("manager", "", "manager rpc address") - flagProcs = flag.Int("procs", 1, "number of parallel test processes") - flagTest = flag.Bool("test", false, "enable image testing mode") // used by syz-ci - flagPprofPort = flag.Int("pprof_port", 0, "HTTP port for the pprof endpoint (disabled if 0)") - flagNetCompression = flag.Bool("net_compression", false, "use network compression for RPC calls") + flagName = flag.String("name", "test", "unique name for manager") + flagOS = flag.String("os", runtime.GOOS, "target OS") + flagArch = flag.String("arch", runtime.GOARCH, "target arch") + flagManager = flag.String("manager", "", "manager rpc address") + flagProcs = flag.Int("procs", 1, "number of parallel test processes") + flagTest = flag.Bool("test", false, "enable image testing mode") // used by syz-ci + flagPprofPort = flag.Int("pprof_port", 0, "HTTP port for the pprof endpoint (disabled if 0)") ) defer tool.Init()() log.Logf(0, "fuzzer started") @@ -123,7 +122,7 @@ func main() { } log.Logf(0, "dialing manager at %v", *flagManager) - manager, err := rpctype.NewRPCClient(*flagManager, timeouts.Scale, false, *flagNetCompression) + manager, err := rpctype.NewRPCClient(*flagManager) if err != nil { log.SyzFatalf("failed to create an RPC client: %v ", err) } diff --git a/syz-fuzzer/testing.go b/syz-fuzzer/testing.go index a3f1b88bf..bd2eb33e1 100644 --- a/syz-fuzzer/testing.go +++ b/syz-fuzzer/testing.go @@ -6,6 +6,7 @@ package main import ( "fmt" "io" + "net" "strings" "time" @@ -30,7 +31,8 @@ type checkArgs struct { func testImage(hostAddr string, args *checkArgs) { log.Logf(0, "connecting to host at %v", hostAddr) - conn, err := rpctype.Dial(hostAddr, args.ipcConfig.Timeouts.Scale) + timeout := time.Minute * args.ipcConfig.Timeouts.Scale + conn, err := net.DialTimeout("tcp", hostAddr, timeout) if err != nil { log.SyzFatalf("failed to connect to host: %v", err) } diff --git a/syz-hub/hub.go b/syz-hub/hub.go index f7e7e3e48..187d4a1eb 100644 --- a/syz-hub/hub.go +++ b/syz-hub/hub.go @@ -61,7 +61,7 @@ func main() { hub.initHTTP(cfg.HTTP) - s, err := rpctype.NewRPCServer(cfg.RPC, "Hub", hub, true) + s, err := rpctype.NewRPCServer(cfg.RPC, "Hub", hub) if err != nil { log.Fatalf("failed to create rpc server: %v", err) } diff --git a/syz-manager/hub.go b/syz-manager/hub.go index 8de9f1f4d..dd0cbbcdb 100644 --- a/syz-manager/hub.go +++ b/syz-manager/hub.go @@ -129,7 +129,7 @@ func (hc *HubConnector) connect(corpus [][]byte) (*rpctype.RPCClient, error) { if err != nil { return nil, err } - hub, err := rpctype.NewRPCClient(hc.cfg.HubAddr, 1, true, true) + hub, err := rpctype.NewRPCClient(hc.cfg.HubAddr) if err != nil { return nil, err } @@ -162,7 +162,7 @@ func (hc *HubConnector) connect(corpus [][]byte) (*rpctype.RPCClient, error) { if err != nil { return nil, err } - hub, err = rpctype.NewRPCClient(hc.cfg.HubAddr, 1, true, true) + hub, err = rpctype.NewRPCClient(hc.cfg.HubAddr) if err != nil { return nil, err } diff --git a/syz-manager/manager.go b/syz-manager/manager.go index 469ac38c4..f5437f853 100644 --- a/syz-manager/manager.go +++ b/syz-manager/manager.go @@ -65,7 +65,6 @@ type Manager struct { vmStop chan bool checkFeatures *host.Features fresh bool - netCompression bool expertMode bool nextInstanceID atomic.Uint64 @@ -177,7 +176,6 @@ func RunManager(cfg *mgrconfig.Config) { memoryLeakFrames: make(map[string]bool), dataRaceFrames: make(map[string]bool), fresh: true, - netCompression: vm.UseNetCompression(cfg.Type), vmStop: make(chan bool), externalReproQueue: make(chan *Crash, 10), needMoreRepros: make(chan chan bool), @@ -811,10 +809,9 @@ func (mgr *Manager) runInstanceInner(index int, instanceName string, injectLog < Debug: *flagDebug, Test: false, Optional: &instance.OptionalFuzzerArgs{ - Slowdown: mgr.cfg.Timeouts.Slowdown, - SandboxArg: mgr.cfg.SandboxArg, - PprofPort: inst.PprofPort(), - NetCompression: mgr.netCompression, + Slowdown: mgr.cfg.Timeouts.Slowdown, + SandboxArg: mgr.cfg.SandboxArg, + PprofPort: inst.PprofPort(), }, } cmd := instance.FuzzerCmd(args) diff --git a/syz-manager/rpc.go b/syz-manager/rpc.go index 062d19e19..3d452fdb0 100644 --- a/syz-manager/rpc.go +++ b/syz-manager/rpc.go @@ -128,7 +128,7 @@ func startRPCServer(mgr *Manager) (*RPCServer, error) { "End-to-end fuzzer RPC Exchange call latency (us)", stats.Distribution{}), statCoverFiltered: stats.Create("filtered coverage", "", stats.NoGraph), } - s, err := rpctype.NewRPCServer(mgr.cfg.RPC, "Manager", serv, mgr.netCompression) + s, err := rpctype.NewRPCServer(mgr.cfg.RPC, "Manager", serv) if err != nil { return nil, err } diff --git a/syz-runner/runner.go b/syz-runner/runner.go index 3b3703944..329b340a3 100644 --- a/syz-runner/runner.go +++ b/syz-runner/runner.go @@ -45,8 +45,7 @@ func main() { log.Fatalf("failed to create default ipc config: %v", err) } - timeouts := config.Timeouts - vrf, err := rpctype.NewRPCClient(*flagAddr, timeouts.Scale, true, true) + vrf, err := rpctype.NewRPCClient(*flagAddr) if err != nil { log.Fatalf("failed to connect to verifier : %v", err) } diff --git a/syz-verifier/rpcserver.go b/syz-verifier/rpcserver.go index 8c0e6b705..f80fb7a46 100644 --- a/syz-verifier/rpcserver.go +++ b/syz-verifier/rpcserver.go @@ -33,7 +33,7 @@ func startRPCServer(vrf *Verifier) (*RPCServer, error) { notChecked: len(vrf.pools), } - s, err := rpctype.NewRPCServer(vrf.addr, "Verifier", srv, true) + s, err := rpctype.NewRPCServer(vrf.addr, "Verifier", srv) if err != nil { return nil, err } diff --git a/tools/syz-hubtool/hubtool.go b/tools/syz-hubtool/hubtool.go index a3317801e..dc59ef230 100644 --- a/tools/syz-hubtool/hubtool.go +++ b/tools/syz-hubtool/hubtool.go @@ -57,7 +57,7 @@ func main() { return } log.Printf("connecting to hub at %v...", *flagHubAddress) - conn, err := rpctype.NewRPCClient(*flagHubAddress, 1, true, true) + conn, err := rpctype.NewRPCClient(*flagHubAddress) if err != nil { log.Fatalf("failed to connect to hub: %v", err) } diff --git a/tools/syz-runtest/runtest.go b/tools/syz-runtest/runtest.go index 1de8f6a54..586e4c2cc 100644 --- a/tools/syz-runtest/runtest.go +++ b/tools/syz-runtest/runtest.go @@ -69,7 +69,7 @@ func main() { } mgr.checkFiles, mgr.checkProgs = mgr.checker.StartCheck() mgr.needCheckResults = len(mgr.checkProgs) - s, err := rpctype.NewRPCServer(cfg.RPC, "Manager", mgr, false) + s, err := rpctype.NewRPCServer(cfg.RPC, "Manager", mgr) if err != nil { log.Fatalf("failed to create rpc server: %v", err) } diff --git a/vm/adb/adb.go b/vm/adb/adb.go index de1bcd2dd..bd986346c 100644 --- a/vm/adb/adb.go +++ b/vm/adb/adb.go @@ -26,7 +26,7 @@ import ( ) func init() { - vmimpl.Register("adb", ctor, false, true) + vmimpl.Register("adb", ctor, false) } type Device struct { diff --git a/vm/bhyve/bhyve.go b/vm/bhyve/bhyve.go index 3e82cbc2d..c3b53e053 100644 --- a/vm/bhyve/bhyve.go +++ b/vm/bhyve/bhyve.go @@ -21,7 +21,7 @@ import ( ) func init() { - vmimpl.Register("bhyve", ctor, true, false) + vmimpl.Register("bhyve", ctor, true) } type Config struct { diff --git a/vm/cuttlefish/cuttlefish.go b/vm/cuttlefish/cuttlefish.go index d17bd78f2..94610ed4e 100644 --- a/vm/cuttlefish/cuttlefish.go +++ b/vm/cuttlefish/cuttlefish.go @@ -28,7 +28,7 @@ const ( ) func init() { - vmimpl.Register("cuttlefish", ctor, true, true) + vmimpl.Register("cuttlefish", ctor, true) } type Pool struct { diff --git a/vm/gce/gce.go b/vm/gce/gce.go index 6a292d4f6..16f3f996f 100644 --- a/vm/gce/gce.go +++ b/vm/gce/gce.go @@ -35,7 +35,7 @@ import ( ) func init() { - vmimpl.Register("gce", ctor, true, true) + vmimpl.Register("gce", ctor, true) } type Config struct { diff --git a/vm/gvisor/gvisor.go b/vm/gvisor/gvisor.go index f8194ab56..45c076b23 100644 --- a/vm/gvisor/gvisor.go +++ b/vm/gvisor/gvisor.go @@ -25,7 +25,7 @@ import ( ) func init() { - vmimpl.Register("gvisor", ctor, true, false) + vmimpl.Register("gvisor", ctor, true) } type Config struct { diff --git a/vm/isolated/isolated.go b/vm/isolated/isolated.go index 3383e91fa..51a995b72 100755 --- a/vm/isolated/isolated.go +++ b/vm/isolated/isolated.go @@ -23,7 +23,7 @@ import ( const pstoreConsoleFile = "/sys/fs/pstore/console-ramoops-0" func init() { - vmimpl.Register("isolated", ctor, false, true) + vmimpl.Register("isolated", ctor, false) } type Config struct { diff --git a/vm/proxyapp/init.go b/vm/proxyapp/init.go index 9c96e4855..467187f46 100644 --- a/vm/proxyapp/init.go +++ b/vm/proxyapp/init.go @@ -32,7 +32,7 @@ func init() { func(env *vmimpl.Env) (vmimpl.Pool, error) { return ctor(makeDefaultParams(), env) }, - false, true) + false) } // Package configuration VARs are mostly needed for tests. diff --git a/vm/qemu/qemu.go b/vm/qemu/qemu.go index 2dd016a2b..6f5cb4f56 100644 --- a/vm/qemu/qemu.go +++ b/vm/qemu/qemu.go @@ -27,7 +27,7 @@ import ( func init() { var _ vmimpl.Infoer = (*instance)(nil) - vmimpl.Register("qemu", ctor, true, false) + vmimpl.Register("qemu", ctor, true) } type Config struct { diff --git a/vm/starnix/starnix.go b/vm/starnix/starnix.go index 2f5a087d3..51f2f670f 100644 --- a/vm/starnix/starnix.go +++ b/vm/starnix/starnix.go @@ -22,7 +22,7 @@ import ( func init() { var _ vmimpl.Infoer = (*instance)(nil) - vmimpl.Register("starnix", ctor, true, false) + vmimpl.Register("starnix", ctor, true) } type Config struct { @@ -92,12 +92,6 @@ func AllowsOvercommit(typ string) bool { return vmimpl.Types[vmType(typ)].Overcommit } -// UseNetCompression says if it's beneficial to use network compression for this VM type. -// Local VMs (qemu) generally don't benefit from compression, while remote machines may benefit. -func UseNetCompression(typ string) bool { - return vmimpl.Types[vmType(typ)].NetCompression -} - // Create creates a VM pool that can be used to create individual VMs. func Create(cfg *mgrconfig.Config, debug bool) (*Pool, error) { typ, ok := vmimpl.Types[vmType(cfg.Type)] diff --git a/vm/vm_test.go b/vm/vm_test.go index 6acc00351..184f6137f 100644 --- a/vm/vm_test.go +++ b/vm/vm_test.go @@ -80,7 +80,7 @@ func init() { ctor := func(env *vmimpl.Env) (vmimpl.Pool, error) { return &testPool{}, nil } - vmimpl.Register("test", ctor, false, false) + vmimpl.Register("test", ctor, false) } type Test struct { diff --git a/vm/vmimpl/vmimpl.go b/vm/vmimpl/vmimpl.go index bd565c8aa..a9afdc1f1 100644 --- a/vm/vmimpl/vmimpl.go +++ b/vm/vmimpl/vmimpl.go @@ -129,18 +129,16 @@ func (err InfraError) InfraError() (string, []byte) { } // Register registers a new VM type within the package. -func Register(typ string, ctor ctorFunc, allowsOvercommit, netCompression bool) { +func Register(typ string, ctor ctorFunc, allowsOvercommit bool) { Types[typ] = Type{ - Ctor: ctor, - Overcommit: allowsOvercommit, - NetCompression: netCompression, + Ctor: ctor, + Overcommit: allowsOvercommit, } } type Type struct { - Ctor ctorFunc - Overcommit bool - NetCompression bool + Ctor ctorFunc + Overcommit bool } type ctorFunc func(env *Env) (Pool, error) diff --git a/vm/vmm/vmm.go b/vm/vmm/vmm.go index 67f12f4c7..71dfd39b4 100644 --- a/vm/vmm/vmm.go +++ b/vm/vmm/vmm.go @@ -25,7 +25,7 @@ import ( var vmctlStatusRegex = regexp.MustCompile(`^\s+([0-9]+)\b.*\brunning`) func init() { - vmimpl.Register("vmm", ctor, true, false) + vmimpl.Register("vmm", ctor, true) } type Config struct { diff --git a/vm/vmware/vmware.go b/vm/vmware/vmware.go index 6bccbaa6e..4c05bac12 100644 --- a/vm/vmware/vmware.go +++ b/vm/vmware/vmware.go @@ -22,7 +22,7 @@ import ( ) func init() { - vmimpl.Register("vmware", ctor, false, false) + vmimpl.Register("vmware", ctor, false) } type Config struct { |