Skip to content

Commit 591aceb

Browse files
committed
Draft SerializeRecordBatch for CUDA
Change-Id: I8dd313ac4e1cc0c01fdbe760bcae325a55ec8818
1 parent 84e4525 commit 591aceb

11 files changed

Lines changed: 117 additions & 37 deletions

File tree

cpp/src/arrow/gpu/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ find_package(CUDA REQUIRED)
2727
include_directories(SYSTEM ${CUDA_INCLUDE_DIRS})
2828

2929
set(ARROW_GPU_SRCS
30+
cuda_arrow_ipc.cc
3031
cuda_context.cc
3132
cuda_memory.cc
3233
)
@@ -46,6 +47,7 @@ ADD_ARROW_LIB(arrow_gpu
4647

4748
install(FILES
4849
cuda_api.h
50+
cuda_arrow_ipc.h
4951
cuda_context.h
5052
cuda_memory.h
5153
DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/gpu")

cpp/src/arrow/gpu/cuda-test.cc

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,43 @@ TEST_F(TestCudaBuffer, CopyFromHost) {
7777
AssertCudaBufferEquals(*device_buffer, host_buffer->data(), kSize);
7878
}
7979

80+
// IPC only supported on Linux
81+
#if defined(__linux)
82+
83+
TEST_F(TestCudaBuffer, DISABLED_ExportForIpc) {
  // Round-trips a device buffer through the IPC path: export a handle,
  // serialize it, deserialize it, open it as a new CudaBuffer, and check that
  // the bytes read back match what was uploaded.
  //
  // Disabled: for this test to work, a second process needs to be spawned
  const int64_t kSize = 1000;
  std::shared_ptr<CudaBuffer> device_buffer;
  ASSERT_OK(context_->Allocate(kSize, &device_buffer));

  // Fill the device allocation with random bytes staged in host memory
  std::shared_ptr<PoolBuffer> host_buffer;
  ASSERT_OK(test::MakeRandomBytePoolBuffer(kSize, default_memory_pool(), &host_buffer));
  ASSERT_OK(device_buffer->CopyFromHost(0, host_buffer->data(), kSize));

  // Export for IPC and serialize
  std::unique_ptr<CudaIpcMemHandle> ipc_handle;
  ASSERT_OK(device_buffer->ExportForIpc(&ipc_handle));

  std::shared_ptr<Buffer> serialized_handle;
  ASSERT_OK(ipc_handle->Serialize(default_memory_pool(), &serialized_handle));

  // Deserialize IPC handle and open
  std::unique_ptr<CudaIpcMemHandle> ipc_handle2;
  ASSERT_OK(CudaIpcMemHandle::FromBuffer(serialized_handle->data(), &ipc_handle2));

  std::shared_ptr<CudaBuffer> ipc_buffer;
  ASSERT_OK(context_->OpenIpcBuffer(*ipc_handle2, &ipc_buffer));

  ASSERT_EQ(kSize, ipc_buffer->size());

  // Copy the opened buffer back to host memory before inspecting its contents
  std::shared_ptr<MutableBuffer> ipc_data;
  ASSERT_OK(AllocateBuffer(default_memory_pool(), kSize, &ipc_data));
  ASSERT_OK(ipc_buffer->CopyToHost(0, kSize, ipc_data->mutable_data()));

  // Compare the host-side copy, not ipc_buffer->data(): ipc_buffer is a
  // CudaBuffer, so its pointer must not be handed to memcmp on the host.
  ASSERT_EQ(0, std::memcmp(ipc_data->data(), host_buffer->data(), kSize));
}
114+
115+
#endif
116+
80117
class TestCudaBufferWriter : public TestCudaBufferBase {
81118
public:
82119
void SetUp() { TestCudaBufferBase::SetUp(); }

cpp/src/arrow/gpu/cuda_api.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#ifndef ARROW_GPU_CUDA_API_H
2020
#define ARROW_GPU_CUDA_API_H
2121

22+
#include "arrow/gpu/cuda_arrow_ipc.h"
2223
#include "arrow/gpu/cuda_context.h"
2324
#include "arrow/gpu/cuda_memory.h"
2425
#include "arrow/gpu/cuda_version.h"
Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
#ifndef ARROW_GPU_CUDA_MEMORY_H
19-
#define ARROW_GPU_CUDA_MEMORY_H
18+
#ifndef ARROW_GPU_CUDA_ARROW_IPC_H
19+
#define ARROW_GPU_CUDA_ARROW_IPC_H
2020

2121
#include <cstdint>
2222
#include <memory>
@@ -25,31 +25,25 @@
2525
#include "arrow/status.h"
2626
#include "arrow/util/visibility.h"
2727

28-
#include "arrow/cuda_memory.h"
29-
3028
namespace arrow {
29+
30+
class RecordBatch;
31+
3132
namespace gpu {
3233

33-
/// \brief Write record batch message to GPU device memory
34-
///
35-
///
36-
ARROW_EXPORT
37-
SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
38-
std::shared_ptr<CudaBuffer>* out);
34+
class CudaBuffer;
35+
class CudaContext;
3936

40-
/// \brief Write record batch to pre-allocated GPU device memory
41-
///
42-
/// \param[in] batch the record batch to write
43-
/// \param[in] out the CudaBufferWriter to write the output to
37+
/// \brief Write record batch message to GPU device memory
38+
/// \param[in] batch record batch to write
39+
/// \param[in] ctx CudaContext to allocate device memory from
40+
/// \param[out] out the returned device buffer which contains the record batch message
4441
/// \return Status
45-
///
46-
/// The CudaBufferWriter must have enough pre-allocated space to accommodate
47-
/// the record batch. You can use arrow::ipc::GetRecordBatchSize to compute
48-
/// this
4942
ARROW_EXPORT
50-
SerializeRecordBatch(const RecordBatch& batch, CudaBufferWriter* out);
43+
Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
44+
std::shared_ptr<CudaBuffer>* out);
5145

5246
} // namespace gpu
5347
} // namespace arrow
5448

55-
#endif // ARROW_GPU_CUDA_MEMORY_H
49+
#endif // ARROW_GPU_CUDA_ARROW_IPC_H

cpp/src/arrow/gpu/cuda_common.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ namespace gpu {
4040
if (ret != CUDA_SUCCESS) { \
4141
std::stringstream ss; \
4242
ss << "Cuda Driver API call in " << __FILE__ << " at line " << __LINE__ \
43-
<< " failed: " << #STMT; \
43+
<< " failed with code " << ret << ": " << #STMT; \
4444
return Status::IOError(ss.str()); \
4545
} \
4646
} while (0)

cpp/src/arrow/gpu/cuda_context.cc

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,14 @@ class CudaContext::CudaContextImpl {
8888
return Status::OK();
8989
}
9090

91+
// Produce a CUDA IPC memory handle for the given pointer.
//
// \param[in] data pointer handed to cuIpcGetMemHandle as a CUdeviceptr
// \param[out] handle receives a new CudaIpcMemHandle built from the driver handle
// \return Status, an IOError from CU_RETURN_NOT_OK if a driver call fails
Status ExportIpcBuffer(uint8_t* data, std::unique_ptr<CudaIpcMemHandle>* handle) {
  // Make this object's context current before calling into the driver
  CU_RETURN_NOT_OK(cuCtxSetCurrent(context_));

  CUipcMemHandle cu_handle;
  CU_RETURN_NOT_OK(cuIpcGetMemHandle(&cu_handle, reinterpret_cast<CUdeviceptr>(data)));

  // Hand ownership of the wrapper to the caller
  handle->reset(new CudaIpcMemHandle(&cu_handle));
  return Status::OK();
}
98+
9199
Status OpenIpcBuffer(const CudaIpcMemHandle& ipc_handle, uint8_t** out) {
92100
CU_RETURN_NOT_OK(cuCtxSetCurrent(context_));
93101
auto handle = reinterpret_cast<const CUipcMemHandle*>(ipc_handle.handle());
@@ -151,12 +159,17 @@ class CudaDeviceManager::CudaDeviceManagerImpl {
151159
return Status::OK();
152160
}
153161

162+
// Construct a fresh CudaContext and initialize it for the requested device.
//
// \param[in] device_number index into devices_
// \param[out] out receives the new context; it is assigned before Init runs,
//   so it is populated even when the returned Status is an error
// \return the Status of the context's Init call
Status CreateNewContext(int device_number, std::shared_ptr<CudaContext>* out) {
  out->reset(new CudaContext());
  CudaContext& created = **out;
  return created.impl_->Init(devices_[device_number]);
}
166+
154167
Status GetContext(int device_number, std::shared_ptr<CudaContext>* out) {
155168
auto it = contexts_.find(device_number);
156169
if (it == contexts_.end()) {
157-
auto ctx = std::shared_ptr<CudaContext>(new CudaContext());
158-
RETURN_NOT_OK(ctx->impl_->Init(devices_[device_number]));
159-
contexts_[device_number] = *out = ctx;
170+
std::shared_ptr<CudaContext> new_context;
171+
RETURN_NOT_OK(CreateNewContext(device_number, &new_context));
172+
contexts_[device_number] = *out = new_context;
160173
} else {
161174
*out = it->second;
162175
}
@@ -193,6 +206,11 @@ Status CudaDeviceManager::GetContext(int device_number,
193206
return impl_->GetContext(device_number, out);
194207
}
195208

209+
// Public entry point: forwards to the implementation object's CreateNewContext.
Status CudaDeviceManager::CreateNewContext(int device_number,
                                           std::shared_ptr<CudaContext>* out) {
  Status result = impl_->CreateNewContext(device_number, out);
  return result;
}
213+
196214
Status CudaDeviceManager::AllocateHost(int64_t nbytes,
197215
std::shared_ptr<CudaHostBuffer>* out) {
198216
uint8_t* data = nullptr;
@@ -221,6 +239,11 @@ Status CudaContext::Allocate(int64_t nbytes, std::shared_ptr<CudaBuffer>* out) {
221239
return Status::OK();
222240
}
223241

242+
// Forwards the IPC export request to the implementation object.
Status CudaContext::ExportIpcBuffer(uint8_t* data,
                                    std::unique_ptr<CudaIpcMemHandle>* handle) {
  Status result = impl_->ExportIpcBuffer(data, handle);
  return result;
}
246+
224247
Status CudaContext::CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes) {
225248
return impl_->CopyHostToDevice(dst, src, nbytes);
226249
}
@@ -229,6 +252,8 @@ Status CudaContext::CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t n
229252
return impl_->CopyDeviceToHost(dst, src, nbytes);
230253
}
231254

255+
// Forwards to the implementation object's Close and returns its Status.
Status CudaContext::Close() {
  Status result = impl_->Close();
  return result;
}
256+
232257
Status CudaContext::Free(uint8_t* device_ptr, int64_t nbytes) {
233258
return impl_->Free(device_ptr, nbytes);
234259
}

cpp/src/arrow/gpu/cuda_context.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,14 @@ class ARROW_EXPORT CudaDeviceManager {
3636
public:
3737
static Status GetInstance(CudaDeviceManager** manager);
3838

39-
/// \brief Get the CUDA driver context for a particular device
39+
/// \brief Get the shared CUDA driver context for a particular device
4040
Status GetContext(int gpu_number, std::shared_ptr<CudaContext>* ctx);
4141

42+
/// \brief Create a new context for a given device number
43+
///
44+
/// In general, code should use GetContext, which returns the shared context
45+
Status CreateNewContext(int gpu_number, std::shared_ptr<CudaContext>* ctx);
46+
4247
Status AllocateHost(int64_t nbytes, std::shared_ptr<CudaHostBuffer>* buffer);
4348

4449
Status FreeHost(uint8_t* data, int64_t nbytes);
@@ -63,7 +68,7 @@ class ARROW_EXPORT CudaContext : public std::enable_shared_from_this<CudaContext
6368
public:
6469
~CudaContext();
6570

66-
Status Destroy();
71+
Status Close();
6772

6873
/// \brief Allocate CUDA memory on GPU device for this context
6974
/// \param[in] nbytes number of bytes
@@ -83,6 +88,7 @@ class ARROW_EXPORT CudaContext : public std::enable_shared_from_this<CudaContext
8388
private:
8489
CudaContext();
8590

91+
Status ExportIpcBuffer(uint8_t* data, std::unique_ptr<CudaIpcMemHandle>* handle);
8692
Status CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes);
8793
Status CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t nbytes);
8894
Status Free(uint8_t* device_ptr, int64_t nbytes);

cpp/src/arrow/gpu/cuda_memory.cc

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,8 @@ Status CudaBuffer::ExportForIpc(std::unique_ptr<CudaIpcMemHandle>* handle) {
114114
if (is_ipc_) {
115115
return Status::Invalid("Buffer has already been exported for IPC");
116116
}
117-
CUipcMemHandle cu_handle;
118-
CU_RETURN_NOT_OK(
119-
cuIpcGetMemHandle(&cu_handle, reinterpret_cast<CUdeviceptr>(mutable_data_)));
120-
is_ipc_ = true;
121-
*handle = std::unique_ptr<CudaIpcMemHandle>(new CudaIpcMemHandle(&cu_handle));
117+
RETURN_NOT_OK(context_->ExportIpcBuffer(mutable_data_, handle));
118+
own_data_ = false;
122119
return Status::OK();
123120
}
124121

cpp/src/arrow/gpu/cuda_memory.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,15 +103,16 @@ class ARROW_EXPORT CudaIpcMemHandle {
103103
/// \return Status
104104
Status Serialize(MemoryPool* pool, std::shared_ptr<Buffer>* out) const;
105105

106-
const void* handle() const;
107-
108106
private:
109107
explicit CudaIpcMemHandle(const void* handle);
110108

111109
struct CudaIpcMemHandleImpl;
112110
std::unique_ptr<CudaIpcMemHandleImpl> impl_;
113111

112+
const void* handle() const;
113+
114114
friend CudaBuffer;
115+
friend CudaContext;
115116
};
116117

117118
/// \class CudaBufferReader

cpp/src/arrow/ipc/writer.cc

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -901,14 +901,19 @@ Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
901901
RETURN_NOT_OK(AllocateBuffer(pool, size, &buffer));
902902

903903
io::FixedSizeBufferWriter stream(buffer);
904-
int32_t metadata_length = 0;
905-
int64_t body_length = 0;
906-
RETURN_NOT_OK(WriteRecordBatch(batch, 0, &stream, &metadata_length, &body_length, pool,
907-
kMaxNestingDepth, true));
904+
RETURN_NOT_OK(SerializeRecordBatch(batch, pool, &stream));
908905
*out = buffer;
909906
return Status::OK();
910907
}
911908

909+
// Write a record batch to an output stream by delegating to WriteRecordBatch.
//
// \param[in] batch the record batch to write
// \param[in] pool memory pool forwarded to WriteRecordBatch
// \param[in] out the stream receiving the output
// \return the Status returned by WriteRecordBatch
Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
                            io::OutputStream* out) {
  // WriteRecordBatch reports both lengths through out-parameters; this
  // overload does not surface them to the caller.
  int64_t ignored_body_length = 0;
  int32_t ignored_metadata_length = 0;
  return WriteRecordBatch(batch, 0, out, &ignored_metadata_length,
                          &ignored_body_length, pool, kMaxNestingDepth, true);
}
916+
912917
Status SerializeSchema(const Schema& schema, MemoryPool* pool,
913918
std::shared_ptr<Buffer>* out) {
914919
std::shared_ptr<io::BufferOutputStream> stream;

0 commit comments

Comments
 (0)