mio/net/tcp/
stream.rs

1use std::fmt;
2use std::io::{self, IoSlice, IoSliceMut, Read, Write};
3use std::net::{self, Shutdown, SocketAddr};
4#[cfg(unix)]
5use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
6#[cfg(target_os = "wasi")]
7use std::os::wasi::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
8#[cfg(windows)]
9use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
10
11use crate::io_source::IoSource;
12#[cfg(not(target_os = "wasi"))]
13use crate::sys::tcp::{connect, new_for_addr};
14use crate::{event, Interest, Registry, Token};
15
16/// A non-blocking TCP stream between a local socket and a remote socket.
17///
18/// The socket will be closed when the value is dropped.
19///
20/// # Examples
21///
22#[cfg_attr(feature = "os-poll", doc = "```")]
23#[cfg_attr(not(feature = "os-poll"), doc = "```ignore")]
24/// # use std::net::{TcpListener, SocketAddr};
25/// # use std::error::Error;
26/// #
27/// # fn main() -> Result<(), Box<dyn Error>> {
28/// let address: SocketAddr = "127.0.0.1:0".parse()?;
29/// let listener = TcpListener::bind(address)?;
30/// use mio::{Events, Interest, Poll, Token};
31/// use mio::net::TcpStream;
32/// use std::time::Duration;
33///
34/// let mut stream = TcpStream::connect(listener.local_addr()?)?;
35///
36/// let mut poll = Poll::new()?;
37/// let mut events = Events::with_capacity(128);
38///
39/// // Register the socket with `Poll`
40/// poll.registry().register(&mut stream, Token(0), Interest::WRITABLE)?;
41///
42/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
43///
44/// // The socket might be ready at this point
45/// #     Ok(())
46/// # }
47/// ```
48pub struct TcpStream {
49    inner: IoSource<net::TcpStream>,
50}
51
52impl TcpStream {
53    /// Create a new TCP stream and issue a non-blocking connect to the
54    /// specified address.
55    ///
56    /// # Notes
57    ///
58    /// The returned `TcpStream` may not be connected (and thus usable), unlike
59    /// the API found in `std::net::TcpStream`. Because Mio issues a
60    /// *non-blocking* connect it will not block the thread and instead return
61    /// an unconnected `TcpStream`.
62    ///
63    /// Ensuring the returned stream is connected is surprisingly complex when
64    /// considering cross-platform support. Doing this properly should follow
65    /// the steps below, an example implementation can be found
66    /// [here](https://github.com/Thomasdezeeuw/heph/blob/0c4f1ab3eaf08bea1d65776528bfd6114c9f8374/src/net/tcp/stream.rs#L560-L622).
67    ///
68    ///  1. Call `TcpStream::connect`
69    ///  2. Register the returned stream with at least [write interest].
70    ///  3. Wait for a (writable) event.
71    ///  4. Check `TcpStream::peer_addr`. If it returns `libc::EINPROGRESS` or
72    ///     `ErrorKind::NotConnected` it means the stream is not yet connected,
73    ///     go back to step 3. If it returns an address it means the stream is
74    ///     connected, go to step 5. If another error is returned something
75    ///     went wrong.
76    ///  5. Now the stream can be used.
77    ///
78    /// This may return a `WouldBlock` in which case the socket connection
79    /// cannot be completed immediately, it usually means there are insufficient
80    /// entries in the routing cache.
81    ///
82    /// [write interest]: Interest::WRITABLE
83    #[cfg(not(target_os = "wasi"))]
84    pub fn connect(addr: SocketAddr) -> io::Result<TcpStream> {
85        let socket = new_for_addr(addr)?;
86        #[cfg(unix)]
87        let stream = unsafe { TcpStream::from_raw_fd(socket) };
88        #[cfg(windows)]
89        let stream = unsafe { TcpStream::from_raw_socket(socket as _) };
90        connect(&stream.inner, addr)?;
91        Ok(stream)
92    }
93
94    /// Creates a new `TcpStream` from a standard `net::TcpStream`.
95    ///
96    /// This function is intended to be used to wrap a TCP stream from the
97    /// standard library in the Mio equivalent. The conversion assumes nothing
98    /// about the underlying stream; it is left up to the user to set it in
99    /// non-blocking mode.
100    ///
101    /// # Note
102    ///
103    /// The TCP stream here will not have `connect` called on it, so it
104    /// should already be connected via some other means (be it manually, or
105    /// the standard library).
106    pub fn from_std(stream: net::TcpStream) -> TcpStream {
107        TcpStream {
108            inner: IoSource::new(stream),
109        }
110    }
111
112    /// Returns the socket address of the remote peer of this TCP connection.
113    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
114        self.inner.peer_addr()
115    }
116
117    /// Returns the socket address of the local half of this TCP connection.
118    pub fn local_addr(&self) -> io::Result<SocketAddr> {
119        self.inner.local_addr()
120    }
121
122    /// Shuts down the read, write, or both halves of this connection.
123    ///
124    /// This function will cause all pending and future I/O on the specified
125    /// portions to return immediately with an appropriate value (see the
126    /// documentation of `Shutdown`).
127    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
128        self.inner.shutdown(how)
129    }
130
131    /// Sets the value of the `TCP_NODELAY` option on this socket.
132    ///
133    /// If set, this option disables the Nagle algorithm. This means that
134    /// segments are always sent as soon as possible, even if there is only a
135    /// small amount of data. When not set, data is buffered until there is a
136    /// sufficient amount to send out, thereby avoiding the frequent sending of
137    /// small packets.
138    ///
139    /// # Notes
140    ///
141    /// On Windows make sure the stream is connected before calling this method,
142    /// by receiving an (writable) event. Trying to set `nodelay` on an
143    /// unconnected `TcpStream` is unspecified behavior.
144    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
145        self.inner.set_nodelay(nodelay)
146    }
147
148    /// Gets the value of the `TCP_NODELAY` option on this socket.
149    ///
150    /// For more information about this option, see [`set_nodelay`][link].
151    ///
152    /// [link]: #method.set_nodelay
153    ///
154    /// # Notes
155    ///
156    /// On Windows make sure the stream is connected before calling this method,
157    /// by receiving an (writable) event. Trying to get `nodelay` on an
158    /// unconnected `TcpStream` is unspecified behavior.
159    pub fn nodelay(&self) -> io::Result<bool> {
160        self.inner.nodelay()
161    }
162
163    /// Sets the value for the `IP_TTL` option on this socket.
164    ///
165    /// This value sets the time-to-live field that is used in every packet sent
166    /// from this socket.
167    ///
168    /// # Notes
169    ///
170    /// On Windows make sure the stream is connected before calling this method,
171    /// by receiving an (writable) event. Trying to set `ttl` on an
172    /// unconnected `TcpStream` is unspecified behavior.
173    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
174        self.inner.set_ttl(ttl)
175    }
176
177    /// Gets the value of the `IP_TTL` option for this socket.
178    ///
179    /// For more information about this option, see [`set_ttl`][link].
180    ///
181    /// # Notes
182    ///
183    /// On Windows make sure the stream is connected before calling this method,
184    /// by receiving an (writable) event. Trying to get `ttl` on an
185    /// unconnected `TcpStream` is unspecified behavior.
186    ///
187    /// [link]: #method.set_ttl
188    pub fn ttl(&self) -> io::Result<u32> {
189        self.inner.ttl()
190    }
191
192    /// Get the value of the `SO_ERROR` option on this socket.
193    ///
194    /// This will retrieve the stored error in the underlying socket, clearing
195    /// the field in the process. This can be useful for checking errors between
196    /// calls.
197    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
198        self.inner.take_error()
199    }
200
201    /// Receives data on the socket from the remote address to which it is
202    /// connected, without removing that data from the queue. On success,
203    /// returns the number of bytes peeked.
204    ///
205    /// Successive calls return the same data. This is accomplished by passing
206    /// `MSG_PEEK` as a flag to the underlying recv system call.
207    pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
208        self.inner.peek(buf)
209    }
210
211    /// Execute an I/O operation ensuring that the socket receives more events
212    /// if it hits a [`WouldBlock`] error.
213    ///
214    /// # Notes
215    ///
216    /// This method is required to be called for **all** I/O operations to
217    /// ensure the user will receive events once the socket is ready again after
218    /// returning a [`WouldBlock`] error.
219    ///
220    /// [`WouldBlock`]: io::ErrorKind::WouldBlock
221    ///
222    /// # Examples
223    ///
224    #[cfg_attr(unix, doc = "```no_run")]
225    #[cfg_attr(windows, doc = "```ignore")]
226    /// # use std::error::Error;
227    /// #
228    /// # fn main() -> Result<(), Box<dyn Error>> {
229    /// use std::io;
230    /// #[cfg(unix)]
231    /// use std::os::unix::io::AsRawFd;
232    /// #[cfg(windows)]
233    /// use std::os::windows::io::AsRawSocket;
234    /// use mio::net::TcpStream;
235    ///
236    /// let address = "127.0.0.1:8080".parse().unwrap();
237    /// let stream = TcpStream::connect(address)?;
238    ///
239    /// // Wait until the stream is readable...
240    ///
241    /// // Read from the stream using a direct libc call, of course the
242    /// // `io::Read` implementation would be easier to use.
243    /// let mut buf = [0; 512];
244    /// let n = stream.try_io(|| {
245    ///     let buf_ptr = &mut buf as *mut _ as *mut _;
246    ///     #[cfg(unix)]
247    ///     let res = unsafe { libc::recv(stream.as_raw_fd(), buf_ptr, buf.len(), 0) };
248    ///     #[cfg(windows)]
249    ///     let res = unsafe { libc::recvfrom(stream.as_raw_socket() as usize, buf_ptr, buf.len() as i32, 0, std::ptr::null_mut(), std::ptr::null_mut()) };
250    ///     if res != -1 {
251    ///         Ok(res as usize)
252    ///     } else {
253    ///         // If EAGAIN or EWOULDBLOCK is set by libc::recv, the closure
254    ///         // should return `WouldBlock` error.
255    ///         Err(io::Error::last_os_error())
256    ///     }
257    /// })?;
258    /// eprintln!("read {} bytes", n);
259    /// # Ok(())
260    /// # }
261    /// ```
262    pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
263    where
264        F: FnOnce() -> io::Result<T>,
265    {
266        self.inner.do_io(|_| f())
267    }
268}
269
270impl Read for TcpStream {
271    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
272        self.inner.do_io(|mut inner| inner.read(buf))
273    }
274
275    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
276        self.inner.do_io(|mut inner| inner.read_vectored(bufs))
277    }
278}
279
280impl<'a> Read for &'a TcpStream {
281    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
282        self.inner.do_io(|mut inner| inner.read(buf))
283    }
284
285    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
286        self.inner.do_io(|mut inner| inner.read_vectored(bufs))
287    }
288}
289
290impl Write for TcpStream {
291    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
292        self.inner.do_io(|mut inner| inner.write(buf))
293    }
294
295    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
296        self.inner.do_io(|mut inner| inner.write_vectored(bufs))
297    }
298
299    fn flush(&mut self) -> io::Result<()> {
300        self.inner.do_io(|mut inner| inner.flush())
301    }
302}
303
304impl<'a> Write for &'a TcpStream {
305    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
306        self.inner.do_io(|mut inner| inner.write(buf))
307    }
308
309    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
310        self.inner.do_io(|mut inner| inner.write_vectored(bufs))
311    }
312
313    fn flush(&mut self) -> io::Result<()> {
314        self.inner.do_io(|mut inner| inner.flush())
315    }
316}
317
318impl event::Source for TcpStream {
319    fn register(
320        &mut self,
321        registry: &Registry,
322        token: Token,
323        interests: Interest,
324    ) -> io::Result<()> {
325        self.inner.register(registry, token, interests)
326    }
327
328    fn reregister(
329        &mut self,
330        registry: &Registry,
331        token: Token,
332        interests: Interest,
333    ) -> io::Result<()> {
334        self.inner.reregister(registry, token, interests)
335    }
336
337    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
338        self.inner.deregister(registry)
339    }
340}
341
342impl fmt::Debug for TcpStream {
343    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344        self.inner.fmt(f)
345    }
346}
347
348#[cfg(unix)]
349impl IntoRawFd for TcpStream {
350    fn into_raw_fd(self) -> RawFd {
351        self.inner.into_inner().into_raw_fd()
352    }
353}
354
355#[cfg(unix)]
356impl AsRawFd for TcpStream {
357    fn as_raw_fd(&self) -> RawFd {
358        self.inner.as_raw_fd()
359    }
360}
361
362#[cfg(unix)]
363impl FromRawFd for TcpStream {
364    /// Converts a `RawFd` to a `TcpStream`.
365    ///
366    /// # Notes
367    ///
368    /// The caller is responsible for ensuring that the socket is in
369    /// non-blocking mode.
370    unsafe fn from_raw_fd(fd: RawFd) -> TcpStream {
371        TcpStream::from_std(FromRawFd::from_raw_fd(fd))
372    }
373}
374
#[cfg(windows)]
impl IntoRawSocket for TcpStream {
    /// Consumes the stream, unwraps the standard library stream held in
    /// `inner`, and returns its raw socket handle.
    fn into_raw_socket(self) -> RawSocket {
        let std_stream = self.inner.into_inner();
        std_stream.into_raw_socket()
    }
}
381
#[cfg(windows)]
impl AsRawSocket for TcpStream {
    /// Returns the raw socket handle of the wrapped stream without giving up
    /// ownership of it.
    fn as_raw_socket(&self) -> RawSocket {
        let TcpStream { inner } = self;
        inner.as_raw_socket()
    }
}
388
#[cfg(windows)]
impl FromRawSocket for TcpStream {
    /// Builds a `TcpStream` from a `RawSocket`.
    ///
    /// The handle is first turned into a `std::net::TcpStream` and then
    /// wrapped with [`TcpStream::from_std`].
    ///
    /// # Notes
    ///
    /// Putting the socket into non-blocking mode is the caller's
    /// responsibility; this conversion does not do it.
    unsafe fn from_raw_socket(socket: RawSocket) -> TcpStream {
        let std_stream = net::TcpStream::from_raw_socket(socket);
        TcpStream::from_std(std_stream)
    }
}
401
#[cfg(target_os = "wasi")]
impl IntoRawFd for TcpStream {
    /// Consumes the stream, unwraps the standard library stream held in
    /// `inner`, and returns its raw file descriptor.
    fn into_raw_fd(self) -> RawFd {
        let std_stream = self.inner.into_inner();
        std_stream.into_raw_fd()
    }
}
408
#[cfg(target_os = "wasi")]
impl AsRawFd for TcpStream {
    /// Returns the raw file descriptor of the wrapped stream without giving
    /// up ownership of it.
    fn as_raw_fd(&self) -> RawFd {
        let TcpStream { inner } = self;
        inner.as_raw_fd()
    }
}
415
#[cfg(target_os = "wasi")]
impl FromRawFd for TcpStream {
    /// Builds a `TcpStream` from a `RawFd`.
    ///
    /// The descriptor is first turned into a `std::net::TcpStream` and then
    /// wrapped with [`TcpStream::from_std`].
    ///
    /// # Notes
    ///
    /// Putting the socket into non-blocking mode is the caller's
    /// responsibility; this conversion does not do it.
    unsafe fn from_raw_fd(fd: RawFd) -> TcpStream {
        let std_stream = net::TcpStream::from_raw_fd(fd);
        TcpStream::from_std(std_stream)
    }
}