mio/sys/unix/
pipe.rs

1//! Unix pipe.
2//!
3//! See the [`new`] function for documentation.
4
5use std::io;
6use std::os::unix::io::RawFd;
7
8pub(crate) fn new_raw() -> io::Result<[RawFd; 2]> {
9    let mut fds: [RawFd; 2] = [-1, -1];
10
11    #[cfg(any(
12        target_os = "android",
13        target_os = "dragonfly",
14        target_os = "freebsd",
15        target_os = "linux",
16        target_os = "netbsd",
17        target_os = "openbsd",
18        target_os = "illumos",
19        target_os = "redox",
20        target_os = "solaris",
21        target_os = "vita",
22    ))]
23    unsafe {
24        if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 {
25            return Err(io::Error::last_os_error());
26        }
27    }
28
29    #[cfg(any(
30        target_os = "aix",
31        target_os = "ios",
32        target_os = "macos",
33        target_os = "tvos",
34        target_os = "watchos",
35        target_os = "espidf",
36    ))]
37    unsafe {
38        // For platforms that don't have `pipe2(2)` we need to manually set the
39        // correct flags on the file descriptor.
40        if libc::pipe(fds.as_mut_ptr()) != 0 {
41            return Err(io::Error::last_os_error());
42        }
43
44        for fd in &fds {
45            if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0
46                || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0
47            {
48                let err = io::Error::last_os_error();
49                // Don't leak file descriptors. Can't handle closing error though.
50                let _ = libc::close(fds[0]);
51                let _ = libc::close(fds[1]);
52                return Err(err);
53            }
54        }
55    }
56
57    #[cfg(not(any(
58        target_os = "aix",
59        target_os = "android",
60        target_os = "dragonfly",
61        target_os = "freebsd",
62        target_os = "illumos",
63        target_os = "ios",
64        target_os = "linux",
65        target_os = "macos",
66        target_os = "netbsd",
67        target_os = "openbsd",
68        target_os = "redox",
69        target_os = "tvos",
70        target_os = "watchos",
71        target_os = "espidf",
72        target_os = "solaris",
73        target_os = "vita",
74    )))]
75    compile_error!("unsupported target for `mio::unix::pipe`");
76
77    Ok(fds)
78}
79
80cfg_os_ext! {
81use std::fs::File;
82use std::io::{IoSlice, IoSliceMut, Read, Write};
83use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
84use std::process::{ChildStderr, ChildStdin, ChildStdout};
85
86use crate::io_source::IoSource;
87use crate::{event, Interest, Registry, Token};
88
89/// Create a new non-blocking Unix pipe.
90///
/// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as
/// an inter-process or inter-thread communication channel.
93///
/// This channel may be created before forking the process and then one end used
/// in each process, e.g. the parent process has the sending end to send
/// commands to the child process.
97///
98/// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html
99///
100/// # Events
101///
/// The [`Sender`] can be registered with [`WRITABLE`] interest to receive
/// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is
/// written to the `Sender` the `Receiver` will receive a [readable event].
105///
106/// In addition to those events, events will also be generated if the other side
107/// is dropped. To check if the `Sender` is dropped you'll need to check
108/// [`is_read_closed`] on events for the `Receiver`, if it returns true the
109/// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it
110/// returns true the `Receiver` was dropped. Also see the second example below.
111///
112/// [`WRITABLE`]: Interest::WRITABLE
113/// [writable events]: event::Event::is_writable
114/// [`READABLE`]: Interest::READABLE
115/// [readable event]: event::Event::is_readable
116/// [`is_read_closed`]: event::Event::is_read_closed
117/// [`is_write_closed`]: event::Event::is_write_closed
118///
119/// # Deregistering
120///
121/// Both `Sender` and `Receiver` will deregister themselves when dropped,
122/// **iff** the file descriptors are not duplicated (via [`dup(2)`]).
123///
124/// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html
125///
126/// # Examples
127///
/// Simple example that writes data into the sending end and reads it from the
/// receiving end.
130///
131/// ```
132/// use std::io::{self, Read, Write};
133///
134/// use mio::{Poll, Events, Interest, Token};
135/// use mio::unix::pipe;
136///
137/// // Unique tokens for the two ends of the channel.
138/// const PIPE_RECV: Token = Token(0);
139/// const PIPE_SEND: Token = Token(1);
140///
141/// # fn main() -> io::Result<()> {
142/// // Create our `Poll` instance and the `Events` container.
143/// let mut poll = Poll::new()?;
144/// let mut events = Events::with_capacity(8);
145///
146/// // Create a new pipe.
147/// let (mut sender, mut receiver) = pipe::new()?;
148///
149/// // Register both ends of the channel.
150/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
151/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
152///
153/// const MSG: &[u8; 11] = b"Hello world";
154///
155/// loop {
156///     poll.poll(&mut events, None)?;
157///
158///     for event in events.iter() {
159///         match event.token() {
160///             PIPE_SEND => sender.write(MSG)
161///                 .and_then(|n| if n != MSG.len() {
162///                         // We'll consider a short write an error in this
163///                         // example. NOTE: we can't use `write_all` with
164///                         // non-blocking I/O.
165///                         Err(io::ErrorKind::WriteZero.into())
166///                     } else {
167///                         Ok(())
168///                     })?,
169///             PIPE_RECV => {
170///                 let mut buf = [0; 11];
171///                 let n = receiver.read(&mut buf)?;
172///                 println!("received: {:?}", &buf[0..n]);
173///                 assert_eq!(n, MSG.len());
174///                 assert_eq!(&buf, &*MSG);
175///                 return Ok(());
176///             },
177///             _ => unreachable!(),
178///         }
179///     }
180/// }
181/// # }
182/// ```
183///
184/// Example that receives an event once the `Sender` is dropped.
185///
186/// ```
187/// # use std::io;
188/// #
189/// # use mio::{Poll, Events, Interest, Token};
190/// # use mio::unix::pipe;
191/// #
192/// # const PIPE_RECV: Token = Token(0);
193/// # const PIPE_SEND: Token = Token(1);
194/// #
195/// # fn main() -> io::Result<()> {
196/// // Same setup as in the example above.
197/// let mut poll = Poll::new()?;
198/// let mut events = Events::with_capacity(8);
199///
200/// let (mut sender, mut receiver) = pipe::new()?;
201///
202/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
203/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
204///
205/// // Drop the sender.
206/// drop(sender);
207///
208/// poll.poll(&mut events, None)?;
209///
210/// for event in events.iter() {
211///     match event.token() {
212///         PIPE_RECV if event.is_read_closed() => {
213///             // Detected that the sender was dropped.
214///             println!("Sender dropped!");
215///             return Ok(());
216///         },
217///         _ => unreachable!(),
218///     }
219/// }
220/// # unreachable!();
221/// # }
222/// ```
223pub fn new() -> io::Result<(Sender, Receiver)> {
224    let fds = new_raw()?;
225    // SAFETY: `new_raw` initialised the `fds` above.
226    let r = unsafe { Receiver::from_raw_fd(fds[0]) };
227    let w = unsafe { Sender::from_raw_fd(fds[1]) };
228    Ok((w, r))
229}
230
/// Sending end of a Unix pipe.
///
/// See [`new`] for documentation, including examples.
#[derive(Debug)]
pub struct Sender {
    // The pipe's file descriptor, held as a `File` inside an `IoSource`. All
    // I/O and registration calls on `Sender` are forwarded to this field.
    inner: IoSource<File>,
}
238
239impl Sender {
240    /// Set the `Sender` into or out of non-blocking mode.
241    pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
242        set_nonblocking(self.inner.as_raw_fd(), nonblocking)
243    }
244
245    /// Execute an I/O operation ensuring that the socket receives more events
246    /// if it hits a [`WouldBlock`] error.
247    ///
248    /// # Notes
249    ///
250    /// This method is required to be called for **all** I/O operations to
251    /// ensure the user will receive events once the socket is ready again after
252    /// returning a [`WouldBlock`] error.
253    ///
254    /// [`WouldBlock`]: io::ErrorKind::WouldBlock
255    ///
256    /// # Examples
257    ///
258    /// ```
259    /// # use std::error::Error;
260    /// #
261    /// # fn main() -> Result<(), Box<dyn Error>> {
262    /// use std::io;
263    /// use std::os::unix::io::AsRawFd;
264    /// use mio::unix::pipe;
265    ///
266    /// let (sender, receiver) = pipe::new()?;
267    ///
268    /// // Wait until the sender is writable...
269    ///
270    /// // Write to the sender using a direct libc call, of course the
271    /// // `io::Write` implementation would be easier to use.
272    /// let buf = b"hello";
273    /// let n = sender.try_io(|| {
274    ///     let buf_ptr = &buf as *const _ as *const _;
275    ///     let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
276    ///     if res != -1 {
277    ///         Ok(res as usize)
278    ///     } else {
279    ///         // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
280    ///         // should return `WouldBlock` error.
281    ///         Err(io::Error::last_os_error())
282    ///     }
283    /// })?;
284    /// eprintln!("write {} bytes", n);
285    ///
286    /// // Wait until the receiver is readable...
287    ///
288    /// // Read from the receiver using a direct libc call, of course the
289    /// // `io::Read` implementation would be easier to use.
290    /// let mut buf = [0; 512];
291    /// let n = receiver.try_io(|| {
292    ///     let buf_ptr = &mut buf as *mut _ as *mut _;
293    ///     let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
294    ///     if res != -1 {
295    ///         Ok(res as usize)
296    ///     } else {
297    ///         // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
298    ///         // should return `WouldBlock` error.
299    ///         Err(io::Error::last_os_error())
300    ///     }
301    /// })?;
302    /// eprintln!("read {} bytes", n);
303    /// # Ok(())
304    /// # }
305    /// ```
306    pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
307    where
308        F: FnOnce() -> io::Result<T>,
309    {
310        self.inner.do_io(|_| f())
311    }
312}
313
// Makes `Sender` usable as an event source. Every method forwards its
// arguments unchanged to the wrapped `IoSource`.
impl event::Source for Sender {
    // Forwards the registration request to `inner`.
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        self.inner.register(registry, token, interests)
    }

    // Forwards the re-registration request to `inner`.
    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        self.inner.reregister(registry, token, interests)
    }

    // Forwards the deregistration request to `inner`.
    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
        self.inner.deregister(registry)
    }
}
337
338impl Write for Sender {
339    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
340        self.inner.do_io(|mut sender| sender.write(buf))
341    }
342
343    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
344        self.inner.do_io(|mut sender| sender.write_vectored(bufs))
345    }
346
347    fn flush(&mut self) -> io::Result<()> {
348        self.inner.do_io(|mut sender| sender.flush())
349    }
350}
351
352impl Write for &Sender {
353    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
354        self.inner.do_io(|mut sender| sender.write(buf))
355    }
356
357    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
358        self.inner.do_io(|mut sender| sender.write_vectored(bufs))
359    }
360
361    fn flush(&mut self) -> io::Result<()> {
362        self.inner.do_io(|mut sender| sender.flush())
363    }
364}
365
366/// # Notes
367///
368/// The underlying pipe is **not** set to non-blocking.
369impl From<ChildStdin> for Sender {
370    fn from(stdin: ChildStdin) -> Sender {
371        // Safety: `ChildStdin` is guaranteed to be a valid file descriptor.
372        unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) }
373    }
374}
375
376impl FromRawFd for Sender {
377    unsafe fn from_raw_fd(fd: RawFd) -> Sender {
378        Sender {
379            inner: IoSource::new(File::from_raw_fd(fd)),
380        }
381    }
382}
383
impl AsRawFd for Sender {
    // Returns the raw file descriptor reported by the wrapped `IoSource`.
    fn as_raw_fd(&self) -> RawFd {
        self.inner.as_raw_fd()
    }
}
389
390impl IntoRawFd for Sender {
391    fn into_raw_fd(self) -> RawFd {
392        self.inner.into_inner().into_raw_fd()
393    }
394}
395
/// Receiving end of a Unix pipe.
///
/// See [`new`] for documentation, including examples.
#[derive(Debug)]
pub struct Receiver {
    // The pipe's file descriptor, held as a `File` inside an `IoSource`. All
    // I/O and registration calls on `Receiver` are forwarded to this field.
    inner: IoSource<File>,
}
403
404impl Receiver {
405    /// Set the `Receiver` into or out of non-blocking mode.
406    pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
407        set_nonblocking(self.inner.as_raw_fd(), nonblocking)
408    }
409
410    /// Execute an I/O operation ensuring that the socket receives more events
411    /// if it hits a [`WouldBlock`] error.
412    ///
413    /// # Notes
414    ///
415    /// This method is required to be called for **all** I/O operations to
416    /// ensure the user will receive events once the socket is ready again after
417    /// returning a [`WouldBlock`] error.
418    ///
419    /// [`WouldBlock`]: io::ErrorKind::WouldBlock
420    ///
421    /// # Examples
422    ///
423    /// ```
424    /// # use std::error::Error;
425    /// #
426    /// # fn main() -> Result<(), Box<dyn Error>> {
427    /// use std::io;
428    /// use std::os::unix::io::AsRawFd;
429    /// use mio::unix::pipe;
430    ///
431    /// let (sender, receiver) = pipe::new()?;
432    ///
433    /// // Wait until the sender is writable...
434    ///
435    /// // Write to the sender using a direct libc call, of course the
436    /// // `io::Write` implementation would be easier to use.
437    /// let buf = b"hello";
438    /// let n = sender.try_io(|| {
439    ///     let buf_ptr = &buf as *const _ as *const _;
440    ///     let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
441    ///     if res != -1 {
442    ///         Ok(res as usize)
443    ///     } else {
444    ///         // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
445    ///         // should return `WouldBlock` error.
446    ///         Err(io::Error::last_os_error())
447    ///     }
448    /// })?;
449    /// eprintln!("write {} bytes", n);
450    ///
451    /// // Wait until the receiver is readable...
452    ///
453    /// // Read from the receiver using a direct libc call, of course the
454    /// // `io::Read` implementation would be easier to use.
455    /// let mut buf = [0; 512];
456    /// let n = receiver.try_io(|| {
457    ///     let buf_ptr = &mut buf as *mut _ as *mut _;
458    ///     let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
459    ///     if res != -1 {
460    ///         Ok(res as usize)
461    ///     } else {
462    ///         // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
463    ///         // should return `WouldBlock` error.
464    ///         Err(io::Error::last_os_error())
465    ///     }
466    /// })?;
467    /// eprintln!("read {} bytes", n);
468    /// # Ok(())
469    /// # }
470    /// ```
471    pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
472    where
473        F: FnOnce() -> io::Result<T>,
474    {
475        self.inner.do_io(|_| f())
476    }
477}
478
// Makes `Receiver` usable as an event source. Every method forwards its
// arguments unchanged to the wrapped `IoSource`.
impl event::Source for Receiver {
    // Forwards the registration request to `inner`.
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        self.inner.register(registry, token, interests)
    }

    // Forwards the re-registration request to `inner`.
    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        self.inner.reregister(registry, token, interests)
    }

    // Forwards the deregistration request to `inner`.
    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
        self.inner.deregister(registry)
    }
}
502
503impl Read for Receiver {
504    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
505        self.inner.do_io(|mut sender| sender.read(buf))
506    }
507
508    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
509        self.inner.do_io(|mut sender| sender.read_vectored(bufs))
510    }
511}
512
513impl Read for &Receiver {
514    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
515        self.inner.do_io(|mut sender| sender.read(buf))
516    }
517
518    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
519        self.inner.do_io(|mut sender| sender.read_vectored(bufs))
520    }
521}
522
523/// # Notes
524///
525/// The underlying pipe is **not** set to non-blocking.
526impl From<ChildStdout> for Receiver {
527    fn from(stdout: ChildStdout) -> Receiver {
528        // Safety: `ChildStdout` is guaranteed to be a valid file descriptor.
529        unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) }
530    }
531}
532
533/// # Notes
534///
535/// The underlying pipe is **not** set to non-blocking.
536impl From<ChildStderr> for Receiver {
537    fn from(stderr: ChildStderr) -> Receiver {
538        // Safety: `ChildStderr` is guaranteed to be a valid file descriptor.
539        unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) }
540    }
541}
542
543impl FromRawFd for Receiver {
544    unsafe fn from_raw_fd(fd: RawFd) -> Receiver {
545        Receiver {
546            inner: IoSource::new(File::from_raw_fd(fd)),
547        }
548    }
549}
550
impl AsRawFd for Receiver {
    // Returns the raw file descriptor reported by the wrapped `IoSource`.
    fn as_raw_fd(&self) -> RawFd {
        self.inner.as_raw_fd()
    }
}
556
557impl IntoRawFd for Receiver {
558    fn into_raw_fd(self) -> RawFd {
559        self.inner.into_inner().into_raw_fd()
560    }
561}
562
563#[cfg(not(any(target_os = "illumos", target_os = "solaris", target_os = "vita")))]
564fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
565    let value = nonblocking as libc::c_int;
566    if unsafe { libc::ioctl(fd, libc::FIONBIO, &value) } == -1 {
567        Err(io::Error::last_os_error())
568    } else {
569        Ok(())
570    }
571}
572
// Switches `fd` into (`true`) or out of (`false`) non-blocking mode by
// updating `O_NONBLOCK` in the file status flags via `fcntl`. Returns the last
// OS error if reading or writing the flags fails.
#[cfg(any(target_os = "illumos", target_os = "solaris", target_os = "vita"))]
fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
    let current = unsafe { libc::fcntl(fd, libc::F_GETFL) };
    if current < 0 {
        return Err(io::Error::last_os_error());
    }

    let wanted = if nonblocking {
        current | libc::O_NONBLOCK
    } else {
        current & !libc::O_NONBLOCK
    };

    // Only issue `F_SETFL` when the flags actually need to change.
    if wanted != current && unsafe { libc::fcntl(fd, libc::F_SETFL, wanted) } < 0 {
        return Err(io::Error::last_os_error());
    }

    Ok(())
}
594} // `cfg_os_ext!`.