Integrate lock-free task queue into TinyUSDZ core

Integrated the task queue implementation from sandbox into the main
TinyUSDZ source tree with proper naming conventions and testing.

Changes:
- Added src/task-queue.hh: Lock-free MPMC task queue implementation
- Added src/task-queue.cc: Placeholder for header-only implementation
- Added tests/unit/unit-task-queue.cc: Comprehensive unit tests
- Added tests/unit/unit-task-queue.h: Unit test declarations
- Updated CMakeLists.txt: Added task-queue source files
- Updated tests/unit/CMakeLists.txt: Added task-queue unit test
- Updated tests/unit/unit-main.cc: Registered 5 task queue tests

Features:
- Lock-free CAS-based implementation using __atomic builtins (GCC/Clang)
- Automatic fallback to std::mutex + std::atomic for other compilers
- Two variants: TaskQueue (C function pointers) and TaskQueueFunc (std::function)
- No C++ exceptions or RTTI required
- Thread-safe MPMC (multiple producer, multiple consumer)
- Fixed-size ring buffer with monotonic 64-bit counters
- Follows TinyUSDZ naming conventions (_member_vars)

Test Results:
 task_queue_basic_test - Basic push/pop operations
 task_queue_func_test - std::function lambda support
 task_queue_full_test - Queue capacity handling
 task_queue_multithreaded_test - 2P+2C, 1000 tasks
 task_queue_clear_test - Clear operation
 All existing unit tests still pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Syoyo Fujita
2025-10-23 03:22:30 +09:00
parent a4e7120e39
commit 4dbb0ca2d8
7 changed files with 661 additions and 0 deletions

View File

@@ -461,6 +461,8 @@ set(TINYUSDZ_SOURCES
${PROJECT_SOURCE_DIR}/src/parser-timing.cc
${PROJECT_SOURCE_DIR}/src/sha256.cc
${PROJECT_SOURCE_DIR}/src/typed-array.cc
${PROJECT_SOURCE_DIR}/src/task-queue.cc
${PROJECT_SOURCE_DIR}/src/task-queue.hh
)

13
src/task-queue.cc Normal file
View File

@@ -0,0 +1,13 @@
// SPDX-License-Identifier: Apache 2.0
// Copyright 2025-Present Light Transport Entertainment Inc.
//
// Lock-free task queue for multi-threaded task execution
//
#include "task-queue.hh"
namespace tinyusdz {
// Header-only implementation
// This file exists for consistency with TinyUSDZ source structure
} // namespace tinyusdz

406
src/task-queue.hh Normal file
View File

@@ -0,0 +1,406 @@
// SPDX-License-Identifier: Apache 2.0
// Copyright 2025-Present Light Transport Entertainment Inc.
//
// Lock-free task queue for multi-threaded task execution
//
#pragma once
#include <atomic>
#include <mutex>
#include <functional>
#include <vector>
#include <cstdint>
#include <cstddef>
// Detect compiler support for lock-free atomics
#if defined(__GNUC__) || defined(__clang__)
#define TINYUSDZ_TASK_QUEUE_HAS_BUILTIN_ATOMICS 1
#elif defined(_MSC_VER) && (_MSC_VER >= 1900)
#define TINYUSDZ_TASK_QUEUE_HAS_BUILTIN_ATOMICS 1
#else
#define TINYUSDZ_TASK_QUEUE_HAS_BUILTIN_ATOMICS 0
#endif
namespace tinyusdz {
// C function pointer task type
typedef void (*TaskFuncPtr)(void* user_data);
// Task item for C function pointer version
struct TaskItem {
TaskFuncPtr func;
void* user_data;
TaskItem() : func(nullptr), user_data(nullptr) {}
TaskItem(TaskFuncPtr f, void* d) : func(f), user_data(d) {}
};
// Task item for std::function version
struct TaskItemFunc {
std::function<void()> func;
TaskItemFunc() : func(nullptr) {}
explicit TaskItemFunc(std::function<void()> f) : func(std::move(f)) {}
};
///
/// Lock-free task queue for C function pointers
/// Uses lock-free atomics when available, falls back to mutex otherwise
///
/// Example:
/// TaskQueue queue(1024);
/// queue.Push(my_task_func, my_data);
/// TaskItem task;
/// if (queue.Pop(task)) {
/// task.func(task.user_data);
/// }
///
class TaskQueue {
public:
explicit TaskQueue(size_t capacity = 1024)
: _capacity(capacity),
_write_pos(0),
_read_pos(0) {
_tasks.resize(_capacity);
}
~TaskQueue() = default;
// Disable copy
TaskQueue(const TaskQueue&) = delete;
TaskQueue& operator=(const TaskQueue&) = delete;
///
/// Push a task to the queue
/// @param[in] func Task function pointer
/// @param[in] user_data User data to pass to the task
/// @return true on success, false if queue is full
///
bool Push(TaskFuncPtr func, void* user_data) {
if (!func) {
return false;
}
#if TINYUSDZ_TASK_QUEUE_HAS_BUILTIN_ATOMICS
// Lock-free implementation with CAS
while (true) {
uint64_t current_write = __atomic_load_n(&_write_pos, __ATOMIC_ACQUIRE);
uint64_t current_read = __atomic_load_n(&_read_pos, __ATOMIC_ACQUIRE);
// Check if queue is full
if (current_write - current_read >= _capacity) {
return false;
}
// Try to claim this slot with CAS
uint64_t next_write = current_write + 1;
if (__atomic_compare_exchange_n(&_write_pos, &current_write, next_write,
false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) {
// Successfully claimed slot, now store the task
size_t index = current_write % _capacity;
_tasks[index] = TaskItem(func, user_data);
return true;
}
// CAS failed, retry
}
#else
// Mutex fallback
std::lock_guard<std::mutex> lock(_mutex);
uint64_t current_write = _write_pos.load(std::memory_order_acquire);
uint64_t next_write = current_write + 1;
uint64_t current_read = _read_pos.load(std::memory_order_acquire);
if (next_write - current_read > _capacity) {
return false;
}
size_t index = current_write % _capacity;
_tasks[index] = TaskItem(func, user_data);
_write_pos.store(next_write, std::memory_order_release);
return true;
#endif
}
///
/// Pop a task from the queue
/// @param[out] task Retrieved task item
/// @return true if a task was retrieved, false if queue is empty
///
bool Pop(TaskItem& task) {
#if TINYUSDZ_TASK_QUEUE_HAS_BUILTIN_ATOMICS
// Lock-free implementation with CAS
while (true) {
uint64_t current_read = __atomic_load_n(&_read_pos, __ATOMIC_ACQUIRE);
uint64_t current_write = __atomic_load_n(&_write_pos, __ATOMIC_ACQUIRE);
// Check if queue is empty
if (current_read >= current_write) {
return false;
}
// Try to claim this slot with CAS
uint64_t next_read = current_read + 1;
if (__atomic_compare_exchange_n(&_read_pos, &current_read, next_read,
false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) {
// Successfully claimed slot, now load the task
size_t index = current_read % _capacity;
task = _tasks[index];
return true;
}
// CAS failed, retry
}
#else
// Mutex fallback
std::lock_guard<std::mutex> lock(_mutex);
uint64_t current_read = _read_pos.load(std::memory_order_acquire);
uint64_t current_write = _write_pos.load(std::memory_order_acquire);
if (current_read >= current_write) {
return false;
}
size_t index = current_read % _capacity;
task = _tasks[index];
_read_pos.store(current_read + 1, std::memory_order_release);
return true;
#endif
}
///
/// Get current queue size (approximate in lock-free mode)
///
size_t Size() const {
#if TINYUSDZ_TASK_QUEUE_HAS_BUILTIN_ATOMICS
uint64_t w = __atomic_load_n(&_write_pos, __ATOMIC_ACQUIRE);
uint64_t r = __atomic_load_n(&_read_pos, __ATOMIC_ACQUIRE);
#else
uint64_t w = _write_pos.load(std::memory_order_acquire);
uint64_t r = _read_pos.load(std::memory_order_acquire);
#endif
return (w >= r) ? (w - r) : 0;
}
///
/// Check if queue is empty
///
bool Empty() const {
return Size() == 0;
}
///
/// Get queue capacity
///
size_t Capacity() const {
return _capacity;
}
///
/// Clear all pending tasks
///
void Clear() {
#if TINYUSDZ_TASK_QUEUE_HAS_BUILTIN_ATOMICS
uint64_t w = __atomic_load_n(&_write_pos, __ATOMIC_ACQUIRE);
__atomic_store_n(&_read_pos, w, __ATOMIC_RELEASE);
#else
std::lock_guard<std::mutex> lock(_mutex);
uint64_t w = _write_pos.load(std::memory_order_acquire);
_read_pos.store(w, std::memory_order_release);
#endif
}
private:
const size_t _capacity;
std::vector<TaskItem> _tasks;
#if TINYUSDZ_TASK_QUEUE_HAS_BUILTIN_ATOMICS
uint64_t _write_pos;
uint64_t _read_pos;
#else
std::atomic<uint64_t> _write_pos;
std::atomic<uint64_t> _read_pos;
std::mutex _mutex;
#endif
};
///
/// Task queue for std::function version
///
/// Example:
/// TaskQueueFunc queue(1024);
/// queue.Push([]() { std::cout << "Hello\n"; });
/// TaskItemFunc task;
/// if (queue.Pop(task)) {
/// task.func();
/// }
///
class TaskQueueFunc {
public:
explicit TaskQueueFunc(size_t capacity = 1024)
: _capacity(capacity),
_write_pos(0),
_read_pos(0) {
_tasks.resize(_capacity);
}
~TaskQueueFunc() = default;
// Disable copy
TaskQueueFunc(const TaskQueueFunc&) = delete;
TaskQueueFunc& operator=(const TaskQueueFunc&) = delete;
///
/// Push a task to the queue
/// @param[in] func Task function (lambda, function object, etc.)
/// @return true on success, false if queue is full
///
bool Push(std::function<void()> func) {
if (!func) {
return false;
}
#if TINYUSDZ_TASK_QUEUE_HAS_BUILTIN_ATOMICS
// Lock-free implementation with CAS
while (true) {
uint64_t current_write = __atomic_load_n(&_write_pos, __ATOMIC_ACQUIRE);
uint64_t current_read = __atomic_load_n(&_read_pos, __ATOMIC_ACQUIRE);
// Check if queue is full
if (current_write - current_read >= _capacity) {
return false;
}
// Try to claim this slot with CAS
uint64_t next_write = current_write + 1;
if (__atomic_compare_exchange_n(&_write_pos, &current_write, next_write,
false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) {
// Successfully claimed slot, now store the task
size_t index = current_write % _capacity;
_tasks[index] = TaskItemFunc(std::move(func));
return true;
}
// CAS failed, retry
}
#else
// Mutex fallback
std::lock_guard<std::mutex> lock(_mutex);
uint64_t current_write = _write_pos.load(std::memory_order_acquire);
uint64_t next_write = current_write + 1;
uint64_t current_read = _read_pos.load(std::memory_order_acquire);
if (next_write - current_read > _capacity) {
return false;
}
size_t index = current_write % _capacity;
_tasks[index] = TaskItemFunc(std::move(func));
_write_pos.store(next_write, std::memory_order_release);
return true;
#endif
}
///
/// Pop a task from the queue
/// @param[out] task Retrieved task item
/// @return true if a task was retrieved, false if queue is empty
///
bool Pop(TaskItemFunc& task) {
#if TINYUSDZ_TASK_QUEUE_HAS_BUILTIN_ATOMICS
// Lock-free implementation with CAS
while (true) {
uint64_t current_read = __atomic_load_n(&_read_pos, __ATOMIC_ACQUIRE);
uint64_t current_write = __atomic_load_n(&_write_pos, __ATOMIC_ACQUIRE);
// Check if queue is empty
if (current_read >= current_write) {
return false;
}
// Try to claim this slot with CAS
uint64_t next_read = current_read + 1;
if (__atomic_compare_exchange_n(&_read_pos, &current_read, next_read,
false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) {
// Successfully claimed slot, now load the task
size_t index = current_read % _capacity;
task = std::move(_tasks[index]);
return true;
}
// CAS failed, retry
}
#else
// Mutex fallback
std::lock_guard<std::mutex> lock(_mutex);
uint64_t current_read = _read_pos.load(std::memory_order_acquire);
uint64_t current_write = _write_pos.load(std::memory_order_acquire);
if (current_read >= current_write) {
return false;
}
size_t index = current_read % _capacity;
task = std::move(_tasks[index]);
_read_pos.store(current_read + 1, std::memory_order_release);
return true;
#endif
}
///
/// Get current queue size (approximate in lock-free mode)
///
size_t Size() const {
#if TINYUSDZ_TASK_QUEUE_HAS_BUILTIN_ATOMICS
uint64_t w = __atomic_load_n(&_write_pos, __ATOMIC_ACQUIRE);
uint64_t r = __atomic_load_n(&_read_pos, __ATOMIC_ACQUIRE);
#else
uint64_t w = _write_pos.load(std::memory_order_acquire);
uint64_t r = _read_pos.load(std::memory_order_acquire);
#endif
return (w >= r) ? (w - r) : 0;
}
///
/// Check if queue is empty
///
bool Empty() const {
return Size() == 0;
}
///
/// Get queue capacity
///
size_t Capacity() const {
return _capacity;
}
///
/// Clear all pending tasks
///
void Clear() {
#if TINYUSDZ_TASK_QUEUE_HAS_BUILTIN_ATOMICS
uint64_t w = __atomic_load_n(&_write_pos, __ATOMIC_ACQUIRE);
__atomic_store_n(&_read_pos, w, __ATOMIC_RELEASE);
#else
std::lock_guard<std::mutex> lock(_mutex);
uint64_t w = _write_pos.load(std::memory_order_acquire);
_read_pos.store(w, std::memory_order_release);
#endif
}
private:
const size_t _capacity;
std::vector<TaskItemFunc> _tasks;
#if TINYUSDZ_TASK_QUEUE_HAS_BUILTIN_ATOMICS
uint64_t _write_pos;
uint64_t _read_pos;
#else
std::atomic<uint64_t> _write_pos;
std::atomic<uint64_t> _read_pos;
std::mutex _mutex;
#endif
};
} // namespace tinyusdz

View File

@@ -15,6 +15,7 @@ set(TEST_SOURCES
unit-math.cc
unit-ioutil.cc
unit-timesamples.cc
unit-task-queue.cc
)
if (TINYUSDZ_WITH_PXR_COMPAT_API)

View File

@@ -16,6 +16,7 @@
#include "unit-strutil.h"
#include "unit-timesamples.h"
#include "unit-pprint.h"
#include "unit-task-queue.h"
#if defined(TINYUSDZ_WITH_PXR_COMPAT_API)
#include "unit-pxr-compat-api.h"
@@ -40,6 +41,11 @@ TEST_LIST = {
{ "tinystring_test", tinystring_test },
{ "parse_int_test", parse_int_test },
{ "timesamples_test", timesamples_test },
{ "task_queue_basic_test", task_queue_basic_test },
{ "task_queue_func_test", task_queue_func_test },
{ "task_queue_full_test", task_queue_full_test },
{ "task_queue_multithreaded_test", task_queue_multithreaded_test },
{ "task_queue_clear_test", task_queue_clear_test },
#if defined(TINYUSDZ_WITH_PXR_COMPAT_API)
{ "pxr_compat_api_test", pxr_compat_api_test },
#endif

View File

@@ -0,0 +1,226 @@
#ifdef _MSC_VER
#define NOMINMAX
#endif
#define TEST_NO_MAIN
#include "acutest.h"
#include "task-queue.hh"
#include "unit-common.hh"
#include <atomic>
#include <thread>
#include <vector>
using namespace tinyusdz;
using namespace tinyusdz_test;
// Test data structure
struct TestData {
int value;
std::atomic<int>* counter;
};
// Test task function
static void increment_task(void* user_data) {
TestData* data = static_cast<TestData*>(user_data);
if (data && data->counter) {
data->counter->fetch_add(data->value, std::memory_order_relaxed);
}
}
// Simple increment task
static void simple_increment(void* user_data) {
std::atomic<int>* counter = static_cast<std::atomic<int>*>(user_data);
if (counter) {
counter->fetch_add(1, std::memory_order_relaxed);
}
}
void task_queue_basic_test(void) {
TaskQueue queue(16);
std::atomic<int> counter(0);
// Test push and pop
TestData data1 = {10, &counter};
TestData data2 = {20, &counter};
TestData data3 = {30, &counter};
TEST_CHECK(queue.Push(increment_task, &data1) == true);
TEST_CHECK(queue.Push(increment_task, &data2) == true);
TEST_CHECK(queue.Push(increment_task, &data3) == true);
TEST_CHECK(queue.Size() == 3);
TEST_CHECK(queue.Empty() == false);
// Pop and execute tasks
TaskItem task;
int executed = 0;
while (queue.Pop(task)) {
if (task.func) {
task.func(task.user_data);
executed++;
}
}
TEST_CHECK(executed == 3);
TEST_CHECK(queue.Empty() == true);
TEST_CHECK(counter.load() == 60);
}
void task_queue_func_test(void) {
TaskQueueFunc queue(16);
std::atomic<int> counter(0);
// Push lambda tasks
TEST_CHECK(queue.Push([&counter]() {
counter.fetch_add(10, std::memory_order_relaxed);
}) == true);
TEST_CHECK(queue.Push([&counter]() {
counter.fetch_add(20, std::memory_order_relaxed);
}) == true);
TEST_CHECK(queue.Push([&counter]() {
counter.fetch_add(30, std::memory_order_relaxed);
}) == true);
// Capture by value
int value = 40;
TEST_CHECK(queue.Push([&counter, value]() {
counter.fetch_add(value, std::memory_order_relaxed);
}) == true);
TEST_CHECK(queue.Size() == 4);
// Pop and execute tasks
TaskItemFunc task;
int executed = 0;
while (queue.Pop(task)) {
if (task.func) {
task.func();
executed++;
}
}
TEST_CHECK(executed == 4);
TEST_CHECK(queue.Empty() == true);
TEST_CHECK(counter.load() == 100);
}
void task_queue_full_test(void) {
const size_t capacity = 8;
TaskQueue queue(capacity);
std::atomic<int> counter(0);
// Use stack allocation
std::vector<TestData> test_data(capacity + 10);
for (auto& td : test_data) {
td.value = 1;
td.counter = &counter;
}
// Fill the queue
size_t pushed = 0;
for (size_t i = 0; i < capacity + 10; i++) {
if (queue.Push(increment_task, &test_data[i])) {
pushed++;
}
}
TEST_CHECK(pushed <= capacity);
TEST_CHECK(queue.Size() == pushed);
// Pop all tasks to verify they work
TaskItem task;
size_t popped = 0;
while (queue.Pop(task)) {
if (task.func) {
task.func(task.user_data);
popped++;
}
}
TEST_CHECK(popped == pushed);
TEST_CHECK(queue.Empty() == true);
TEST_CHECK(counter.load() == static_cast<int>(pushed));
}
void task_queue_multithreaded_test(void) {
const int NUM_PRODUCERS = 2;
const int NUM_CONSUMERS = 2;
const int TASKS_PER_PRODUCER = 500;
TaskQueue queue(256);
std::atomic<int> counter(0);
std::atomic<bool> done(false);
// Producer threads
std::vector<std::thread> producers;
for (int i = 0; i < NUM_PRODUCERS; i++) {
producers.emplace_back([&queue, &counter]() {
for (int j = 0; j < TASKS_PER_PRODUCER; j++) {
while (!queue.Push(simple_increment, &counter)) {
std::this_thread::yield();
}
}
});
}
// Consumer threads
std::vector<std::thread> consumers;
for (int i = 0; i < NUM_CONSUMERS; i++) {
consumers.emplace_back([&queue, &done]() {
TaskItem task;
while (!done.load(std::memory_order_acquire) || !queue.Empty()) {
if (queue.Pop(task)) {
if (task.func) {
task.func(task.user_data);
}
} else {
std::this_thread::yield();
}
}
});
}
// Wait for producers to finish
for (auto& t : producers) {
t.join();
}
done.store(true, std::memory_order_release);
// Wait for consumers to finish
for (auto& t : consumers) {
t.join();
}
int expected = NUM_PRODUCERS * TASKS_PER_PRODUCER;
TEST_CHECK(counter.load() == expected);
TEST_CHECK(queue.Empty() == true);
}
void task_queue_clear_test(void) {
TaskQueue queue(16);
std::atomic<int> counter(0);
TestData data = {1, &counter};
// Add some tasks
for (int i = 0; i < 5; i++) {
TEST_CHECK(queue.Push(increment_task, &data) == true);
}
TEST_CHECK(queue.Size() == 5);
// Clear the queue
queue.Clear();
TEST_CHECK(queue.Empty() == true);
TEST_CHECK(queue.Size() == 0);
// Should be able to push again
TEST_CHECK(queue.Push(increment_task, &data) == true);
TEST_CHECK(queue.Size() == 1);
}

View File

@@ -0,0 +1,7 @@
#pragma once
void task_queue_basic_test(void);
void task_queue_func_test(void);
void task_queue_full_test(void);
void task_queue_multithreaded_test(void);
void task_queue_clear_test(void);