aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDmitry Vyukov <dvyukov@google.com>2024-05-03 08:44:48 +0200
committerDmitry Vyukov <dvyukov@google.com>2024-05-03 15:57:46 +0000
commit610f2a54d02f8cf4f2454c03bf679b602e6e59b6 (patch)
treedd1768a4e516fe9fb6945613ebf009eda82317f6
parent3a81775029176dd4c693542e6715b985fa7ade4d (diff)
downloadsyzkaller-upstream-master.tar.gz
pkg/rpctype: prepare for not using for target communicationupstream-master
Remove things that are only needed for target VM communication: conditional compression, timeout scaling, traffic stats. To minimize diffs when we switch target VM communication to flatrpc.
-rw-r--r--pkg/instance/instance.go8
-rw-r--r--pkg/rpctype/rpc.go94
-rw-r--r--syz-fuzzer/fuzzer.go17
-rw-r--r--syz-fuzzer/testing.go4
-rw-r--r--syz-hub/hub.go2
-rw-r--r--syz-manager/hub.go4
-rw-r--r--syz-manager/manager.go9
-rw-r--r--syz-manager/rpc.go2
-rw-r--r--syz-runner/runner.go3
-rw-r--r--syz-verifier/rpcserver.go2
-rw-r--r--tools/syz-hubtool/hubtool.go2
-rw-r--r--tools/syz-runtest/runtest.go2
-rw-r--r--vm/adb/adb.go2
-rw-r--r--vm/bhyve/bhyve.go2
-rw-r--r--vm/cuttlefish/cuttlefish.go2
-rw-r--r--vm/gce/gce.go2
-rw-r--r--vm/gvisor/gvisor.go2
-rwxr-xr-xvm/isolated/isolated.go2
-rw-r--r--vm/proxyapp/init.go2
-rw-r--r--vm/qemu/qemu.go2
-rw-r--r--vm/starnix/starnix.go2
-rw-r--r--vm/vm.go6
-rw-r--r--vm/vm_test.go2
-rw-r--r--vm/vmimpl/vmimpl.go12
-rw-r--r--vm/vmm/vmm.go2
-rw-r--r--vm/vmware/vmware.go2
26 files changed, 62 insertions, 129 deletions
diff --git a/pkg/instance/instance.go b/pkg/instance/instance.go
index 3e59209e2..d47c093ec 100644
--- a/pkg/instance/instance.go
+++ b/pkg/instance/instance.go
@@ -456,10 +456,9 @@ func (inst *inst) testRepro() ([]byte, error) {
}
type OptionalFuzzerArgs struct {
- Slowdown int
- SandboxArg int
- PprofPort int
- NetCompression bool
+ Slowdown int
+ SandboxArg int
+ PprofPort int
}
type FuzzerCmdArgs struct {
@@ -496,7 +495,6 @@ func FuzzerCmd(args *FuzzerCmdArgs) string {
{Name: "slowdown", Value: fmt.Sprint(args.Optional.Slowdown)},
{Name: "sandbox_arg", Value: fmt.Sprint(args.Optional.SandboxArg)},
{Name: "pprof_port", Value: fmt.Sprint(args.Optional.PprofPort)},
- {Name: "net_compression", Value: fmt.Sprint(args.Optional.NetCompression)},
}
optionalArg = " " + tool.OptionalFlags(flags)
}
diff --git a/pkg/rpctype/rpc.go b/pkg/rpctype/rpc.go
index d195e993d..f6b8d1eff 100644
--- a/pkg/rpctype/rpc.go
+++ b/pkg/rpctype/rpc.go
@@ -13,18 +13,14 @@ import (
"time"
"github.com/google/syzkaller/pkg/log"
- "github.com/google/syzkaller/pkg/stats"
)
type RPCServer struct {
- ln net.Listener
- s *rpc.Server
- useCompression bool
- statSent *stats.Val
- statRecv *stats.Val
+ ln net.Listener
+ s *rpc.Server
}
-func NewRPCServer(addr, name string, receiver interface{}, useCompression bool) (*RPCServer, error) {
+func NewRPCServer(addr, name string, receiver interface{}) (*RPCServer, error) {
ln, err := net.Listen("tcp", addr)
if err != nil {
return nil, fmt.Errorf("failed to listen on %v: %w", addr, err)
@@ -34,13 +30,8 @@ func NewRPCServer(addr, name string, receiver interface{}, useCompression bool)
return nil, err
}
serv := &RPCServer{
- ln: ln,
- s: s,
- useCompression: useCompression,
- statSent: stats.Create("go rpc sent", "Uncompressed outbound RPC traffic",
- stats.Graph("traffic"), stats.Rate{}, stats.FormatMB),
- statRecv: stats.Create("go rpc recv", "Uncompressed inbound RPC traffic",
- stats.Graph("traffic"), stats.Rate{}, stats.FormatMB),
+ ln: ln,
+ s: s,
}
return serv, nil
}
@@ -53,7 +44,7 @@ func (serv *RPCServer) Serve() {
continue
}
setupKeepAlive(conn, time.Minute)
- go serv.s.ServeConn(maybeFlateConn(newCountedConn(serv, conn), serv.useCompression))
+ go serv.s.ServeConn(newFlateConn(conn))
}
}
@@ -62,51 +53,35 @@ func (serv *RPCServer) Addr() net.Addr {
}
type RPCClient struct {
- conn net.Conn
- c *rpc.Client
- timeScale time.Duration
- useTimeouts bool
- useCompression bool
+ conn net.Conn
+ c *rpc.Client
}
-func Dial(addr string, timeScale time.Duration) (net.Conn, error) {
- if timeScale <= 0 {
- return nil, fmt.Errorf("bad rpc time scale %v", timeScale)
- }
+func NewRPCClient(addr string) (*RPCClient, error) {
var conn net.Conn
var err error
if addr == "stdin" {
// This is used by vm/gvisor which passes us a unix socket connection in stdin.
- return net.FileConn(os.Stdin)
- }
- if conn, err = net.DialTimeout("tcp", addr, time.Minute*timeScale); err != nil {
- return nil, err
+ // TODO: remove this once we switch to flatrpc for target communication.
+ conn, err = net.FileConn(os.Stdin)
+ } else {
+ conn, err = net.DialTimeout("tcp", addr, 3*time.Minute)
}
- setupKeepAlive(conn, time.Minute*timeScale)
- return conn, nil
-}
-
-func NewRPCClient(addr string, timeScale time.Duration, useTimeouts, useCompression bool) (*RPCClient, error) {
- conn, err := Dial(addr, timeScale)
if err != nil {
return nil, err
}
+ setupKeepAlive(conn, time.Minute)
cli := &RPCClient{
- conn: conn,
- c: rpc.NewClient(maybeFlateConn(conn, useCompression)),
- timeScale: timeScale,
- useTimeouts: useTimeouts,
- useCompression: useCompression,
+ conn: conn,
+ c: rpc.NewClient(newFlateConn(conn)),
}
return cli, nil
}
func (cli *RPCClient) Call(method string, args, reply interface{}) error {
- if cli.useTimeouts {
- // Note: SetDeadline is not implemented on fuchsia, so don't fail on error.
- cli.conn.SetDeadline(time.Now().Add(3 * time.Minute * cli.timeScale))
- defer cli.conn.SetDeadline(time.Time{})
- }
+ // Note: SetDeadline is not implemented on fuchsia, so don't fail on error.
+ cli.conn.SetDeadline(time.Now().Add(10 * time.Minute))
+ defer cli.conn.SetDeadline(time.Time{})
return cli.c.Call(method, args, reply)
}
@@ -130,10 +105,7 @@ type flateConn struct {
c io.Closer
}
-func maybeFlateConn(conn io.ReadWriteCloser, useCompression bool) io.ReadWriteCloser {
- if !useCompression {
- return conn
- }
+func newFlateConn(conn io.ReadWriteCloser) io.ReadWriteCloser {
w, err := flate.NewWriter(conn, 9)
if err != nil {
panic(err)
@@ -173,29 +145,3 @@ func (fc *flateConn) Close() error {
}
return err0
}
-
-// countedConn wraps net.Conn to record the transferred bytes.
-type countedConn struct {
- io.ReadWriteCloser
- server *RPCServer
-}
-
-func newCountedConn(server *RPCServer,
- conn io.ReadWriteCloser) io.ReadWriteCloser {
- return &countedConn{
- ReadWriteCloser: conn,
- server: server,
- }
-}
-
-func (cc countedConn) Read(p []byte) (n int, err error) {
- n, err = cc.ReadWriteCloser.Read(p)
- cc.server.statRecv.Add(n)
- return
-}
-
-func (cc countedConn) Write(b []byte) (n int, err error) {
- n, err = cc.ReadWriteCloser.Write(b)
- cc.server.statSent.Add(n)
- return
-}
diff --git a/syz-fuzzer/fuzzer.go b/syz-fuzzer/fuzzer.go
index c350f2c33..bdccf9d82 100644
--- a/syz-fuzzer/fuzzer.go
+++ b/syz-fuzzer/fuzzer.go
@@ -72,14 +72,13 @@ func main() {
debug.SetGCPercent(50)
var (
- flagName = flag.String("name", "test", "unique name for manager")
- flagOS = flag.String("os", runtime.GOOS, "target OS")
- flagArch = flag.String("arch", runtime.GOARCH, "target arch")
- flagManager = flag.String("manager", "", "manager rpc address")
- flagProcs = flag.Int("procs", 1, "number of parallel test processes")
- flagTest = flag.Bool("test", false, "enable image testing mode") // used by syz-ci
- flagPprofPort = flag.Int("pprof_port", 0, "HTTP port for the pprof endpoint (disabled if 0)")
- flagNetCompression = flag.Bool("net_compression", false, "use network compression for RPC calls")
+ flagName = flag.String("name", "test", "unique name for manager")
+ flagOS = flag.String("os", runtime.GOOS, "target OS")
+ flagArch = flag.String("arch", runtime.GOARCH, "target arch")
+ flagManager = flag.String("manager", "", "manager rpc address")
+ flagProcs = flag.Int("procs", 1, "number of parallel test processes")
+ flagTest = flag.Bool("test", false, "enable image testing mode") // used by syz-ci
+ flagPprofPort = flag.Int("pprof_port", 0, "HTTP port for the pprof endpoint (disabled if 0)")
)
defer tool.Init()()
log.Logf(0, "fuzzer started")
@@ -123,7 +122,7 @@ func main() {
}
log.Logf(0, "dialing manager at %v", *flagManager)
- manager, err := rpctype.NewRPCClient(*flagManager, timeouts.Scale, false, *flagNetCompression)
+ manager, err := rpctype.NewRPCClient(*flagManager)
if err != nil {
log.SyzFatalf("failed to create an RPC client: %v ", err)
}
diff --git a/syz-fuzzer/testing.go b/syz-fuzzer/testing.go
index a3f1b88bf..bd2eb33e1 100644
--- a/syz-fuzzer/testing.go
+++ b/syz-fuzzer/testing.go
@@ -6,6 +6,7 @@ package main
import (
"fmt"
"io"
+ "net"
"strings"
"time"
@@ -30,7 +31,8 @@ type checkArgs struct {
func testImage(hostAddr string, args *checkArgs) {
log.Logf(0, "connecting to host at %v", hostAddr)
- conn, err := rpctype.Dial(hostAddr, args.ipcConfig.Timeouts.Scale)
+ timeout := time.Minute * args.ipcConfig.Timeouts.Scale
+ conn, err := net.DialTimeout("tcp", hostAddr, timeout)
if err != nil {
log.SyzFatalf("failed to connect to host: %v", err)
}
diff --git a/syz-hub/hub.go b/syz-hub/hub.go
index f7e7e3e48..187d4a1eb 100644
--- a/syz-hub/hub.go
+++ b/syz-hub/hub.go
@@ -61,7 +61,7 @@ func main() {
hub.initHTTP(cfg.HTTP)
- s, err := rpctype.NewRPCServer(cfg.RPC, "Hub", hub, true)
+ s, err := rpctype.NewRPCServer(cfg.RPC, "Hub", hub)
if err != nil {
log.Fatalf("failed to create rpc server: %v", err)
}
diff --git a/syz-manager/hub.go b/syz-manager/hub.go
index 8de9f1f4d..dd0cbbcdb 100644
--- a/syz-manager/hub.go
+++ b/syz-manager/hub.go
@@ -129,7 +129,7 @@ func (hc *HubConnector) connect(corpus [][]byte) (*rpctype.RPCClient, error) {
if err != nil {
return nil, err
}
- hub, err := rpctype.NewRPCClient(hc.cfg.HubAddr, 1, true, true)
+ hub, err := rpctype.NewRPCClient(hc.cfg.HubAddr)
if err != nil {
return nil, err
}
@@ -162,7 +162,7 @@ func (hc *HubConnector) connect(corpus [][]byte) (*rpctype.RPCClient, error) {
if err != nil {
return nil, err
}
- hub, err = rpctype.NewRPCClient(hc.cfg.HubAddr, 1, true, true)
+ hub, err = rpctype.NewRPCClient(hc.cfg.HubAddr)
if err != nil {
return nil, err
}
diff --git a/syz-manager/manager.go b/syz-manager/manager.go
index 469ac38c4..f5437f853 100644
--- a/syz-manager/manager.go
+++ b/syz-manager/manager.go
@@ -65,7 +65,6 @@ type Manager struct {
vmStop chan bool
checkFeatures *host.Features
fresh bool
- netCompression bool
expertMode bool
nextInstanceID atomic.Uint64
@@ -177,7 +176,6 @@ func RunManager(cfg *mgrconfig.Config) {
memoryLeakFrames: make(map[string]bool),
dataRaceFrames: make(map[string]bool),
fresh: true,
- netCompression: vm.UseNetCompression(cfg.Type),
vmStop: make(chan bool),
externalReproQueue: make(chan *Crash, 10),
needMoreRepros: make(chan chan bool),
@@ -811,10 +809,9 @@ func (mgr *Manager) runInstanceInner(index int, instanceName string, injectLog <
Debug: *flagDebug,
Test: false,
Optional: &instance.OptionalFuzzerArgs{
- Slowdown: mgr.cfg.Timeouts.Slowdown,
- SandboxArg: mgr.cfg.SandboxArg,
- PprofPort: inst.PprofPort(),
- NetCompression: mgr.netCompression,
+ Slowdown: mgr.cfg.Timeouts.Slowdown,
+ SandboxArg: mgr.cfg.SandboxArg,
+ PprofPort: inst.PprofPort(),
},
}
cmd := instance.FuzzerCmd(args)
diff --git a/syz-manager/rpc.go b/syz-manager/rpc.go
index 062d19e19..3d452fdb0 100644
--- a/syz-manager/rpc.go
+++ b/syz-manager/rpc.go
@@ -128,7 +128,7 @@ func startRPCServer(mgr *Manager) (*RPCServer, error) {
"End-to-end fuzzer RPC Exchange call latency (us)", stats.Distribution{}),
statCoverFiltered: stats.Create("filtered coverage", "", stats.NoGraph),
}
- s, err := rpctype.NewRPCServer(mgr.cfg.RPC, "Manager", serv, mgr.netCompression)
+ s, err := rpctype.NewRPCServer(mgr.cfg.RPC, "Manager", serv)
if err != nil {
return nil, err
}
diff --git a/syz-runner/runner.go b/syz-runner/runner.go
index 3b3703944..329b340a3 100644
--- a/syz-runner/runner.go
+++ b/syz-runner/runner.go
@@ -45,8 +45,7 @@ func main() {
log.Fatalf("failed to create default ipc config: %v", err)
}
- timeouts := config.Timeouts
- vrf, err := rpctype.NewRPCClient(*flagAddr, timeouts.Scale, true, true)
+ vrf, err := rpctype.NewRPCClient(*flagAddr)
if err != nil {
log.Fatalf("failed to connect to verifier : %v", err)
}
diff --git a/syz-verifier/rpcserver.go b/syz-verifier/rpcserver.go
index 8c0e6b705..f80fb7a46 100644
--- a/syz-verifier/rpcserver.go
+++ b/syz-verifier/rpcserver.go
@@ -33,7 +33,7 @@ func startRPCServer(vrf *Verifier) (*RPCServer, error) {
notChecked: len(vrf.pools),
}
- s, err := rpctype.NewRPCServer(vrf.addr, "Verifier", srv, true)
+ s, err := rpctype.NewRPCServer(vrf.addr, "Verifier", srv)
if err != nil {
return nil, err
}
diff --git a/tools/syz-hubtool/hubtool.go b/tools/syz-hubtool/hubtool.go
index a3317801e..dc59ef230 100644
--- a/tools/syz-hubtool/hubtool.go
+++ b/tools/syz-hubtool/hubtool.go
@@ -57,7 +57,7 @@ func main() {
return
}
log.Printf("connecting to hub at %v...", *flagHubAddress)
- conn, err := rpctype.NewRPCClient(*flagHubAddress, 1, true, true)
+ conn, err := rpctype.NewRPCClient(*flagHubAddress)
if err != nil {
log.Fatalf("failed to connect to hub: %v", err)
}
diff --git a/tools/syz-runtest/runtest.go b/tools/syz-runtest/runtest.go
index 1de8f6a54..586e4c2cc 100644
--- a/tools/syz-runtest/runtest.go
+++ b/tools/syz-runtest/runtest.go
@@ -69,7 +69,7 @@ func main() {
}
mgr.checkFiles, mgr.checkProgs = mgr.checker.StartCheck()
mgr.needCheckResults = len(mgr.checkProgs)
- s, err := rpctype.NewRPCServer(cfg.RPC, "Manager", mgr, false)
+ s, err := rpctype.NewRPCServer(cfg.RPC, "Manager", mgr)
if err != nil {
log.Fatalf("failed to create rpc server: %v", err)
}
diff --git a/vm/adb/adb.go b/vm/adb/adb.go
index de1bcd2dd..bd986346c 100644
--- a/vm/adb/adb.go
+++ b/vm/adb/adb.go
@@ -26,7 +26,7 @@ import (
)
func init() {
- vmimpl.Register("adb", ctor, false, true)
+ vmimpl.Register("adb", ctor, false)
}
type Device struct {
diff --git a/vm/bhyve/bhyve.go b/vm/bhyve/bhyve.go
index 3e82cbc2d..c3b53e053 100644
--- a/vm/bhyve/bhyve.go
+++ b/vm/bhyve/bhyve.go
@@ -21,7 +21,7 @@ import (
)
func init() {
- vmimpl.Register("bhyve", ctor, true, false)
+ vmimpl.Register("bhyve", ctor, true)
}
type Config struct {
diff --git a/vm/cuttlefish/cuttlefish.go b/vm/cuttlefish/cuttlefish.go
index d17bd78f2..94610ed4e 100644
--- a/vm/cuttlefish/cuttlefish.go
+++ b/vm/cuttlefish/cuttlefish.go
@@ -28,7 +28,7 @@ const (
)
func init() {
- vmimpl.Register("cuttlefish", ctor, true, true)
+ vmimpl.Register("cuttlefish", ctor, true)
}
type Pool struct {
diff --git a/vm/gce/gce.go b/vm/gce/gce.go
index 6a292d4f6..16f3f996f 100644
--- a/vm/gce/gce.go
+++ b/vm/gce/gce.go
@@ -35,7 +35,7 @@ import (
)
func init() {
- vmimpl.Register("gce", ctor, true, true)
+ vmimpl.Register("gce", ctor, true)
}
type Config struct {
diff --git a/vm/gvisor/gvisor.go b/vm/gvisor/gvisor.go
index f8194ab56..45c076b23 100644
--- a/vm/gvisor/gvisor.go
+++ b/vm/gvisor/gvisor.go
@@ -25,7 +25,7 @@ import (
)
func init() {
- vmimpl.Register("gvisor", ctor, true, false)
+ vmimpl.Register("gvisor", ctor, true)
}
type Config struct {
diff --git a/vm/isolated/isolated.go b/vm/isolated/isolated.go
index 3383e91fa..51a995b72 100755
--- a/vm/isolated/isolated.go
+++ b/vm/isolated/isolated.go
@@ -23,7 +23,7 @@ import (
const pstoreConsoleFile = "/sys/fs/pstore/console-ramoops-0"
func init() {
- vmimpl.Register("isolated", ctor, false, true)
+ vmimpl.Register("isolated", ctor, false)
}
type Config struct {
diff --git a/vm/proxyapp/init.go b/vm/proxyapp/init.go
index 9c96e4855..467187f46 100644
--- a/vm/proxyapp/init.go
+++ b/vm/proxyapp/init.go
@@ -32,7 +32,7 @@ func init() {
func(env *vmimpl.Env) (vmimpl.Pool, error) {
return ctor(makeDefaultParams(), env)
},
- false, true)
+ false)
}
// Package configuration VARs are mostly needed for tests.
diff --git a/vm/qemu/qemu.go b/vm/qemu/qemu.go
index 2dd016a2b..6f5cb4f56 100644
--- a/vm/qemu/qemu.go
+++ b/vm/qemu/qemu.go
@@ -27,7 +27,7 @@ import (
func init() {
var _ vmimpl.Infoer = (*instance)(nil)
- vmimpl.Register("qemu", ctor, true, false)
+ vmimpl.Register("qemu", ctor, true)
}
type Config struct {
diff --git a/vm/starnix/starnix.go b/vm/starnix/starnix.go
index 2f5a087d3..51f2f670f 100644
--- a/vm/starnix/starnix.go
+++ b/vm/starnix/starnix.go
@@ -22,7 +22,7 @@ import (
func init() {
var _ vmimpl.Infoer = (*instance)(nil)
- vmimpl.Register("starnix", ctor, true, false)
+ vmimpl.Register("starnix", ctor, true)
}
type Config struct {
diff --git a/vm/vm.go b/vm/vm.go
index 351163f2e..4887ef65e 100644
--- a/vm/vm.go
+++ b/vm/vm.go
@@ -92,12 +92,6 @@ func AllowsOvercommit(typ string) bool {
return vmimpl.Types[vmType(typ)].Overcommit
}
-// UseNetCompression says if it's beneficial to use network compression for this VM type.
-// Local VMs (qemu) generally don't benefit from compression, while remote machines may benefit.
-func UseNetCompression(typ string) bool {
- return vmimpl.Types[vmType(typ)].NetCompression
-}
-
// Create creates a VM pool that can be used to create individual VMs.
func Create(cfg *mgrconfig.Config, debug bool) (*Pool, error) {
typ, ok := vmimpl.Types[vmType(cfg.Type)]
diff --git a/vm/vm_test.go b/vm/vm_test.go
index 6acc00351..184f6137f 100644
--- a/vm/vm_test.go
+++ b/vm/vm_test.go
@@ -80,7 +80,7 @@ func init() {
ctor := func(env *vmimpl.Env) (vmimpl.Pool, error) {
return &testPool{}, nil
}
- vmimpl.Register("test", ctor, false, false)
+ vmimpl.Register("test", ctor, false)
}
type Test struct {
diff --git a/vm/vmimpl/vmimpl.go b/vm/vmimpl/vmimpl.go
index bd565c8aa..a9afdc1f1 100644
--- a/vm/vmimpl/vmimpl.go
+++ b/vm/vmimpl/vmimpl.go
@@ -129,18 +129,16 @@ func (err InfraError) InfraError() (string, []byte) {
}
// Register registers a new VM type within the package.
-func Register(typ string, ctor ctorFunc, allowsOvercommit, netCompression bool) {
+func Register(typ string, ctor ctorFunc, allowsOvercommit bool) {
Types[typ] = Type{
- Ctor: ctor,
- Overcommit: allowsOvercommit,
- NetCompression: netCompression,
+ Ctor: ctor,
+ Overcommit: allowsOvercommit,
}
}
type Type struct {
- Ctor ctorFunc
- Overcommit bool
- NetCompression bool
+ Ctor ctorFunc
+ Overcommit bool
}
type ctorFunc func(env *Env) (Pool, error)
diff --git a/vm/vmm/vmm.go b/vm/vmm/vmm.go
index 67f12f4c7..71dfd39b4 100644
--- a/vm/vmm/vmm.go
+++ b/vm/vmm/vmm.go
@@ -25,7 +25,7 @@ import (
var vmctlStatusRegex = regexp.MustCompile(`^\s+([0-9]+)\b.*\brunning`)
func init() {
- vmimpl.Register("vmm", ctor, true, false)
+ vmimpl.Register("vmm", ctor, true)
}
type Config struct {
diff --git a/vm/vmware/vmware.go b/vm/vmware/vmware.go
index 6bccbaa6e..4c05bac12 100644
--- a/vm/vmware/vmware.go
+++ b/vm/vmware/vmware.go
@@ -22,7 +22,7 @@ import (
)
func init() {
- vmimpl.Register("vmware", ctor, false, false)
+ vmimpl.Register("vmware", ctor, false)
}
type Config struct {