diff options
Diffstat (limited to 'src/vhost_user/slave_fs_cache.rs')
-rw-r--r-- | src/vhost_user/slave_fs_cache.rs | 210 |
1 files changed, 171 insertions, 39 deletions
diff --git a/src/vhost_user/slave_fs_cache.rs b/src/vhost_user/slave_fs_cache.rs index 1804c7a..a9c4ed2 100644 --- a/src/vhost_user/slave_fs_cache.rs +++ b/src/vhost_user/slave_fs_cache.rs @@ -1,61 +1,59 @@ -// Copyright (C) 2020 Alibaba Cloud Computing. All rights reserved. +// Copyright (C) 2020 Alibaba Cloud. All rights reserved. // SPDX-License-Identifier: Apache-2.0 -use super::connection::Endpoint; -use super::message::*; -use super::{Error, HandlerResult, Result, VhostUserMasterReqHandler}; use std::io; use std::mem; use std::os::unix::io::RawFd; use std::os::unix::net::UnixStream; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, MutexGuard}; + +use super::connection::Endpoint; +use super::message::*; +use super::{Error, HandlerResult, Result, VhostUserMasterReqHandler}; struct SlaveFsCacheReqInternal { sock: Endpoint<SlaveReq>, -} -/// A vhost-user slave endpoint which sends fs cache requests to the master -#[derive(Clone)] -pub struct SlaveFsCacheReq { - // underlying Unix domain socket for communication - node: Arc<Mutex<SlaveFsCacheReqInternal>>, + // Protocol feature VHOST_USER_PROTOCOL_F_REPLY_ACK has been negotiated. + reply_ack_negotiated: bool, // whether the endpoint has encountered any failure error: Option<i32>, } -impl SlaveFsCacheReq { - fn new(ep: Endpoint<SlaveReq>) -> Self { - SlaveFsCacheReq { - node: Arc::new(Mutex::new(SlaveFsCacheReqInternal { sock: ep })), - error: None, +impl SlaveFsCacheReqInternal { + fn check_state(&self) -> Result<u64> { + match self.error { + Some(e) => Err(Error::SocketBroken(std::io::Error::from_raw_os_error(e))), + None => Ok(0), } } - /// Create a new instance. - pub fn from_stream(sock: UnixStream) -> Self { - Self::new(Endpoint::<SlaveReq>::from_stream(sock)) - } - fn send_message( &mut self, - flags: SlaveReq, + request: SlaveReq, fs: &VhostUserFSSlaveMsg, fds: Option<&[RawFd]>, ) -> Result<u64> { self.check_state()?; let len = mem::size_of::<VhostUserFSSlaveMsg>(); - let mut hdr = VhostUserMsgHeader::new(flags, 0, len as u32); - hdr.set_need_reply(true); - self.node.lock().unwrap().sock.send_message(&hdr, fs, fds)?; + let mut hdr = VhostUserMsgHeader::new(request, 0, len as u32); + if self.reply_ack_negotiated { + hdr.set_need_reply(true); + } + self.sock.send_message(&hdr, fs, fds)?; self.wait_for_ack(&hdr) } fn wait_for_ack(&mut self, hdr: &VhostUserMsgHeader<SlaveReq>) -> Result<u64> { self.check_state()?; - let (reply, body, rfds) = self.node.lock().unwrap().sock.recv_body::<VhostUserU64>()?; + if !self.reply_ack_negotiated { + return Ok(0); + } + + let (reply, body, rfds) = self.sock.recv_body::<VhostUserU64>()?; if !reply.is_reply_for(&hdr) || rfds.is_some() || !body.is_valid() { Endpoint::<SlaveReq>::close_rfds(rfds); return Err(Error::InvalidMessage); @@ -63,32 +61,166 @@ impl SlaveFsCacheReq { if body.value != 0 { return Err(Error::MasterInternalError); } - Ok(0) + + Ok(body.value) } +} - fn check_state(&self) -> Result<u64> { - match self.error { - Some(e) => Err(Error::SocketBroken(std::io::Error::from_raw_os_error(e))), - None => Ok(0), +/// Request proxy to send vhost-user-fs slave requests to the master through the slave +/// communication channel. +/// +/// The [SlaveFsCacheReq] acts as a message proxy to forward vhost-user-fs slave requests to the +/// master through the vhost-user slave communication channel. The forwarded messages will be +/// handled by the [MasterReqHandler] server. +/// +/// [SlaveFsCacheReq]: struct.SlaveFsCacheReq.html +/// [MasterReqHandler]: struct.MasterReqHandler.html +#[derive(Clone)] +pub struct SlaveFsCacheReq { + // underlying Unix domain socket for communication + node: Arc<Mutex<SlaveFsCacheReqInternal>>, +} + +impl SlaveFsCacheReq { + fn new(ep: Endpoint<SlaveReq>) -> Self { + SlaveFsCacheReq { + node: Arc::new(Mutex::new(SlaveFsCacheReqInternal { + sock: ep, + reply_ack_negotiated: false, + error: None, + })), } } + fn node(&self) -> MutexGuard<SlaveFsCacheReqInternal> { + self.node.lock().unwrap() + } + + fn send_message( + &self, + request: SlaveReq, + fs: &VhostUserFSSlaveMsg, + fds: Option<&[RawFd]>, + ) -> io::Result<u64> { + self.node() + .send_message(request, fs, fds) + .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e))) + } + + /// Create a new instance from a `UnixStream` object. + pub fn from_stream(sock: UnixStream) -> Self { + Self::new(Endpoint::<SlaveReq>::from_stream(sock)) + } + + /// Set the negotiation state of the `VHOST_USER_PROTOCOL_F_REPLY_ACK` protocol feature. + /// + /// When the `VHOST_USER_PROTOCOL_F_REPLY_ACK` protocol feature has been negotiated, + /// the "REPLY_ACK" flag will be set in the message header for every slave to master request + /// message. + pub fn set_reply_ack_flag(&self, enable: bool) { + self.node().reply_ack_negotiated = enable; + } + /// Mark endpoint as failed with specified error code. - pub fn set_failed(&mut self, error: i32) { - self.error = Some(error); + pub fn set_failed(&self, error: i32) { + self.node().error = Some(error); } } impl VhostUserMasterReqHandler for SlaveFsCacheReq { - /// Handle virtio-fs map file requests from the slave. - fn fs_slave_map(&mut self, fs: &VhostUserFSSlaveMsg, fd: RawFd) -> HandlerResult<u64> { + /// Forward vhost-user-fs map file requests to the slave. + fn fs_slave_map(&self, fs: &VhostUserFSSlaveMsg, fd: RawFd) -> HandlerResult<u64> { self.send_message(SlaveReq::FS_MAP, fs, Some(&[fd])) - .or_else(|e| Err(io::Error::new(io::ErrorKind::Other, format!("{}", e)))) } - /// Handle virtio-fs unmap file requests from the slave. - fn fs_slave_unmap(&mut self, fs: &VhostUserFSSlaveMsg) -> HandlerResult<u64> { + /// Forward vhost-user-fs unmap file requests to the master. + fn fs_slave_unmap(&self, fs: &VhostUserFSSlaveMsg) -> HandlerResult<u64> { self.send_message(SlaveReq::FS_UNMAP, fs, None) - .or_else(|e| Err(io::Error::new(io::ErrorKind::Other, format!("{}", e)))) + } +} + +#[cfg(test)] +mod tests { + use std::os::unix::io::AsRawFd; + + use super::*; + + #[test] + fn test_slave_fs_cache_req_set_failed() { + let (p1, _p2) = UnixStream::pair().unwrap(); + let fs_cache = SlaveFsCacheReq::from_stream(p1); + + assert!(fs_cache.node().error.is_none()); + fs_cache.set_failed(libc::EAGAIN); + assert_eq!(fs_cache.node().error, Some(libc::EAGAIN)); + } + + #[test] + fn test_slave_fs_cache_send_failure() { + let (p1, p2) = UnixStream::pair().unwrap(); + let fd = p2.as_raw_fd(); + let fs_cache = SlaveFsCacheReq::from_stream(p1); + + fs_cache.set_failed(libc::ECONNRESET); + fs_cache + .fs_slave_map(&VhostUserFSSlaveMsg::default(), fd) + .unwrap_err(); + fs_cache + .fs_slave_unmap(&VhostUserFSSlaveMsg::default()) + .unwrap_err(); + fs_cache.node().error = None; + + drop(p2); + fs_cache + .fs_slave_map(&VhostUserFSSlaveMsg::default(), fd) + .unwrap_err(); + fs_cache + .fs_slave_unmap(&VhostUserFSSlaveMsg::default()) + .unwrap_err(); + } + + #[test] + fn test_slave_fs_cache_recv_negative() { + let (p1, p2) = UnixStream::pair().unwrap(); + let fd = p2.as_raw_fd(); + let fs_cache = SlaveFsCacheReq::from_stream(p1); + let mut master = Endpoint::<SlaveReq>::from_stream(p2); + + let len = mem::size_of::<VhostUserFSSlaveMsg>(); + let mut hdr = VhostUserMsgHeader::new( + SlaveReq::FS_MAP, + VhostUserHeaderFlag::REPLY.bits(), + len as u32, + ); + let body = VhostUserU64::new(0); + + master.send_message(&hdr, &body, Some(&[fd])).unwrap(); + fs_cache + .fs_slave_map(&VhostUserFSSlaveMsg::default(), fd) + .unwrap(); + + fs_cache.set_reply_ack_flag(true); + fs_cache + .fs_slave_map(&VhostUserFSSlaveMsg::default(), fd) + .unwrap_err(); + + hdr.set_code(SlaveReq::FS_UNMAP); + master.send_message(&hdr, &body, None).unwrap(); + fs_cache + .fs_slave_map(&VhostUserFSSlaveMsg::default(), fd) + .unwrap_err(); + hdr.set_code(SlaveReq::FS_MAP); + + let body = VhostUserU64::new(1); + master.send_message(&hdr, &body, None).unwrap(); + fs_cache + .fs_slave_map(&VhostUserFSSlaveMsg::default(), fd) + .unwrap_err(); + + let body = VhostUserU64::new(0); + master.send_message(&hdr, &body, None).unwrap(); + fs_cache + .fs_slave_map(&VhostUserFSSlaveMsg::default(), fd) + .unwrap(); } } |