blob: 36536674a12ea14749dd37a16e0530888e18c7d8 [file] [log] [blame]
#include <c10/cuda/CUDAFunctions.h>
#include <c10/cuda/CUDAGuard.h>
#include <c10/cuda/CUDAStream.h>
#include <c10/util/Exception.h>
#include <c10/util/irange.h>
#include <atomic>
#include <cstdint>
#include <mutex>
#include <vector>
#include <iostream>
namespace c10 {
namespace cuda {
namespace {
// Global stream state and constants
// Guards the one-time call to initGlobalStreamState().
static std::once_flag init_flag;
// Number of visible CUDA devices; stays -1 until initGlobalStreamState()
// assigns it from device_count().
static DeviceIndex num_gpus = -1;
// Each pool holds 2^kStreamsPerPoolBits streams, and the pool index occupies
// kStreamsPerPoolBits bits of a StreamId (see Note [StreamId assignment]).
static constexpr int kStreamsPerPoolBits = 5;
static constexpr int kStreamsPerPool = 1 << kStreamsPerPoolBits;
// Creation flags for pool streams. cudaStreamNonBlocking streams do not
// implicitly synchronize with the legacy default (NULL) stream.
static constexpr unsigned int kDefaultFlags = cudaStreamNonBlocking;
// Number of low-order StreamId bits that hold the StreamIdType tag.
static constexpr int kStreamTypeBits = 3;
// Note: lower numbers are higher priorities, zero is default priority
static constexpr int kHighPriority = -1;
static constexpr int kLowPriority = 0;
// Non-default streams
// Note: the number of CUDA devices is determined at run time, and the low and
// high priority pools of a device are lazily initialized when the first
// stream is requested for that device (see getStreamFromPool).
// device_flags tracks whether each device's pools have been initialized.
// The low and high priority counters track, per device, which stream in the
// pool is handed out next. Streams are returned in round-robin fashion; see
// the note in CUDAStream.h.
// The streams are "leaked": they are created but never destroyed. Global
// variables may be destroyed after the CUDA runtime has already shut down,
// and calling cudaStreamDestroy at that point could crash. It's likely an
// issue in CUDA, but to be safe - let's just "forget" the destruction.
static std::once_flag device_flags[C10_COMPILE_TIME_MAX_GPUS];
static std::atomic<uint32_t> low_priority_counters[C10_COMPILE_TIME_MAX_GPUS];
static std::atomic<uint32_t> high_priority_counters[C10_COMPILE_TIME_MAX_GPUS];
// Pool storage, indexed as [device_index][pool slot]. Entries are filled by
// initDeviceStreamState() for a device before any stream id referring to
// them is handed out by getStreamFromPool().
static cudaStream_t low_priority_streams[C10_COMPILE_TIME_MAX_GPUS]
[kStreamsPerPool];
static cudaStream_t high_priority_streams[C10_COMPILE_TIME_MAX_GPUS]
[kStreamsPerPool];
// Note [StreamId assignment]
// ~~~~~~~~~~~~~~~~~~~~~~~~~~
// How do we assign stream IDs?
//
// -- 56 bits -- -- 5 bits ----- -- 3 bits --
// zeros stream id index StreamIdType
//
// Where StreamIdType:
// 000 = default stream or externally allocated if id[63:3] != 0
// 001 = low priority stream
// 010 = high priority stream
//
// This is not really for efficiency; it's just easier to write the code
// to extract the index if we do this with bitmasks :)
//
// We are obligated to treat the stream ID 0 as the default stream, per the
// invariant specified in c10::Stream. However, all other numbers are entirely
// an internal implementation detail, we reserve the right to renumber streams
// however we like.
//
// Note that it is really important that the MSB is zero; StreamId is a
// *signed* integer, and (before C++20) converting an unsigned value that lies
// outside the range of the signed type is implementation-defined. You
// could work around this with something like
// https://stackoverflow.com/questions/13150449/efficient-unsigned-to-signed-cast-avoiding-implementation-defined-behavior
// but it seems a bit overkill for this.
//
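// Also, externally managed stream pointers (cudaStream_t) can be stored
// directly in the id field, so we rely on the pointer's alignment to tell them
// apart from pool streams. Assuming such pointers are at least 8-byte aligned,
// their low kStreamTypeBits (3) bits are always zero. An external stream is
// therefore identified by a non-zero id X with (X & 7) == 0, and
// StreamIdType::EXT is the tag reported for such ids; this file never encodes
// EXT into an id.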
// Tag stored in the low kStreamTypeBits bits of a StreamId.
// EXT is never written into an id: streamIdType() reports it for non-zero ids
// whose tag bits are all zero (external cudaStream_t pointers).
enum class StreamIdType : uint8_t {
  DEFAULT = 0, // the default stream (id == 0)
  LOW = 1,     // low priority pool stream
  HIGH = 2,    // high priority pool stream
  EXT = 3,     // externally allocated stream
};
std::ostream& operator<<(std::ostream& stream, StreamIdType s) {
switch (s) {
case StreamIdType::DEFAULT:
stream << "DEFAULT";
break;
case StreamIdType::LOW:
stream << "LOW";
break;
case StreamIdType::HIGH:
stream << "HIGH";
break;
case StreamIdType::EXT:
stream << "EXT";
break;
default:
stream << static_cast<uint8_t>(s);
break;
}
return stream;
}
// Decodes the StreamIdType tag from the low kStreamTypeBits bits of an id.
// StreamId is 64-bit, so the masking below is done entirely in StreamId.
// The result relies on the tag bits being non-negative; see
// Note [Hazard when concatenating signed integers].
static inline StreamIdType streamIdType(StreamId s) {
  const StreamId type_bits = s & ((StreamId{1} << kStreamTypeBits) - 1);
  // External streams store the cudaStream_t pointer itself as the id, so
  // their tag bits are zero just like the default stream's. A non-zero id
  // is what tells them apart.
  if (s != 0 && type_bits == 0) {
    return StreamIdType::EXT;
  }
  return static_cast<StreamIdType>(type_bits);
}
// Extracts the pool slot index stored just above the type tag bits.
static inline size_t streamIdIndex(StreamId s) {
  const StreamId index_mask = (StreamId{1} << kStreamsPerPoolBits) - 1;
  const StreamId without_type = s >> kStreamTypeBits;
  return static_cast<size_t>(without_type & index_mask);
}
// Packs a pool slot index and a type tag into a StreamId:
// the index goes above the low kStreamTypeBits bits, the tag into them.
StreamId makeStreamId(StreamIdType st, size_t si) {
  const StreamId index_part = static_cast<StreamId>(si) << kStreamTypeBits;
  const StreamId type_part = static_cast<StreamId>(st);
  return index_part | type_part;
}
// Thread-local current streams.
// Per-thread table of current stream ids indexed by device. It is allocated
// with num_gpus entries by initCUDAStreamsOnce(), each initialized to the
// default stream id.
static thread_local std::unique_ptr<StreamId[]> current_streams = nullptr;
// Populates global values.
// Warning: this function must only be called once!
static void initGlobalStreamState() {
num_gpus = device_count();
// Check if the number of GPUs matches the expected compile-time max number
// of GPUs.
TORCH_CHECK(
num_gpus <= C10_COMPILE_TIME_MAX_GPUS,
"Number of CUDA devices on the machine is larger than the compiled "
"max number of gpus expected (",
C10_COMPILE_TIME_MAX_GPUS,
"). Increase that and recompile.");
}
// Creates the low and high priority stream pools for device_index and
// resets its round-robin counters.
// Warning: only call once per device (guarded by device_flags)!
static void initDeviceStreamState(DeviceIndex device_index) {
  // Make device_index current while creating streams so that every stream is
  // associated with it; the guard restores the previous device on exit.
  CUDAGuard guard{device_index};
  cudaStream_t* const low_pool = low_priority_streams[device_index];
  cudaStream_t* const high_pool = high_priority_streams[device_index];
  for (int slot = 0; slot < kStreamsPerPool; ++slot) {
    C10_CUDA_CHECK(cudaStreamCreateWithPriority(
        &low_pool[slot], kDefaultFlags, kLowPriority));
    C10_CUDA_CHECK(cudaStreamCreateWithPriority(
        &high_pool[slot], kDefaultFlags, kHighPriority));
  }
  low_priority_counters[device_index].store(0);
  high_priority_counters[device_index].store(0);
}
// Init front-end to ensure initialization only occurs once
static void initCUDAStreamsOnce() {
// Inits default streams (once, globally)
std::call_once(init_flag, initGlobalStreamState);
if (current_streams) {
return;
}
// Inits current streams (thread local) to default streams
current_streams = std::make_unique<StreamId[]>(num_gpus);
for (const auto i : c10::irange(num_gpus)) {
current_streams[i] = makeStreamId(StreamIdType::DEFAULT, 0);
}
}
// Helper to verify the GPU index is valid, i.e. in [0, num_gpus).
// Callers are expected to have run initCUDAStreamsOnce() so num_gpus is set.
static inline void check_gpu(DeviceIndex device_index) {
  // Report the offending index and the device count. Cast to int so a
  // byte-sized DeviceIndex is printed as a number rather than a character.
  TORCH_INTERNAL_ASSERT(
      device_index >= 0 && device_index < num_gpus,
      "Invalid CUDA device index ",
      static_cast<int>(device_index),
      " (expected a value in [0, ",
      static_cast<int>(num_gpus),
      "))");
}
// Returns the next pool slot for a device by atomically advancing its
// counter and wrapping modulo the pool size.
// Note: Streams are returned round-robin (see note in CUDAStream.h)
static uint32_t get_idx(std::atomic<uint32_t>& counter) {
  return counter.fetch_add(1) % kStreamsPerPool;
}
// Wraps a (device, StreamId) pair into a CUDAStream without validation
// (UNSAFE/UNCHECKED constructors).
CUDAStream CUDAStreamForId(DeviceIndex device_index, StreamId stream_id) {
  const c10::Device device(DeviceType::CUDA, device_index);
  const Stream raw_stream(Stream::UNSAFE, device, stream_id);
  return CUDAStream(CUDAStream::UNCHECKED, raw_stream);
}
} // anonymous namespace
// Maps this stream's id back to the underlying cudaStream_t.
// See Note [StreamId assignment]
// DEFAULT -> nullptr, LOW/HIGH -> the pool entry for this device and slot,
// EXT -> the id reinterpreted as the stored pointer; anything else asserts.
// NOTE(review): device_index is not bounds-checked here; LOW/HIGH ids are
// presumably only produced by getStreamFromPool, which validates it.
cudaStream_t CUDAStream::stream() const {
  const c10::DeviceIndex device_index = stream_.device_index();
  const StreamId stream_id = stream_.id();
  const StreamIdType st = streamIdType(stream_id);
  const size_t si = streamIdIndex(stream_id);
  if (st == StreamIdType::LOW) {
    return low_priority_streams[device_index][si];
  }
  if (st == StreamIdType::HIGH) {
    return high_priority_streams[device_index][si];
  }
  if (st == StreamIdType::EXT) {
    return reinterpret_cast<cudaStream_t>(stream_id);
  }
  if (st == StreamIdType::DEFAULT) {
    TORCH_INTERNAL_ASSERT(
        si == 0,
        "Unrecognized stream ",
        stream_,
        " (I think this should be the default stream, but I got a non-zero index ",
        si,
        ").",
        " Did you manufacture the StreamId yourself? Don't do that; use the",
        " official API like c10::cuda::getStreamFromPool() to get a new stream.");
    return nullptr;
  }
  TORCH_INTERNAL_ASSERT(
      0,
      "Unrecognized stream ",
      stream_,
      " (I didn't recognize the stream type, ",
      st,
      ")");
}
// Returns a stream from the requested pool (high or low priority) of
// device_index, or of the current device when device_index == -1.
// Note: when called the first time on a device, this will create the
// stream pools for that device.
CUDAStream getStreamFromPool(
    const bool isHighPriority,
    DeviceIndex device_index) {
  initCUDAStreamsOnce();
  if (device_index == -1) {
    device_index = current_device();
  }
  check_gpu(device_index);
  // Lazily create this device's pools exactly once.
  std::call_once(
      device_flags[device_index], initDeviceStreamState, device_index);
  const StreamIdType pool_type =
      isHighPriority ? StreamIdType::HIGH : StreamIdType::LOW;
  std::atomic<uint32_t>& counter = isHighPriority
      ? high_priority_counters[device_index]
      : low_priority_counters[device_index];
  const uint32_t slot = get_idx(counter);
  return CUDAStreamForId(device_index, makeStreamId(pool_type, slot));
}
// Wraps an externally created cudaStream_t as a CUDAStream on device_index.
// The pointer value itself becomes the StreamId; streamIdType() later
// recognizes it as EXT. No validation of device_index is done here.
CUDAStream getStreamFromExternal(
    cudaStream_t ext_stream,
    DeviceIndex device_index) {
  const auto pointer_as_id = reinterpret_cast<int64_t>(ext_stream);
  return CUDAStreamForId(device_index, pointer_as_id);
}
// Returns the default stream of device_index (or of the current device when
// device_index == -1).
CUDAStream getDefaultCUDAStream(DeviceIndex device_index) {
  initCUDAStreamsOnce();
  const DeviceIndex device =
      (device_index == -1) ? current_device() : device_index;
  check_gpu(device);
  const StreamId default_id = makeStreamId(StreamIdType::DEFAULT, 0);
  return CUDAStreamForId(device, default_id);
}
// Returns the calling thread's current stream for device_index (or for the
// current device when device_index == -1).
CUDAStream getCurrentCUDAStream(DeviceIndex device_index) {
  initCUDAStreamsOnce();
  const DeviceIndex device =
      (device_index == -1) ? current_device() : device_index;
  check_gpu(device);
  const StreamId current_id = current_streams[device];
  return CUDAStreamForId(device, current_id);
}
// Makes `stream` the calling thread's current stream for the stream's device.
// The device index is validated before indexing current_streams: streams
// built via getStreamFromExternal carry an arbitrary, unchecked device index,
// and writing with an out-of-range index would corrupt the thread-local heap
// array (it only has num_gpus entries).
void setCurrentCUDAStream(CUDAStream stream) {
  initCUDAStreamsOnce();
  const DeviceIndex device_index = stream.device_index();
  check_gpu(device_index);
  current_streams[device_index] = stream.id();
}
// Prints a CUDAStream by printing the c10::Stream it wraps.
std::ostream& operator<<(std::ostream& stream, const CUDAStream& s) {
  const auto wrapped = s.unwrap();
  return stream << wrapped;
}
} // namespace cuda
} // namespace c10