4ef74379c2
In the VxWorks kernel, the scheduler does not enable "round-robin" rescheduling between tasks of the same priority by default, so the test case "stack_unbounded_stress_test" hangs forever. Here are the details: the test case creates 4 tasks as "Reader" and then 4 tasks as "Writer"; all 8 tasks have the same priority, 220. If the target has 2 cores, the first 2 reader tasks occupy the cores with "while (1)" and never yield, so the other 2 reader tasks and the 4 writer tasks never get a chance to run. The test case therefore loops forever, and the exp script catches this and prints "RTP Execution Timeout; rebooting". To restate: the test case creates 4 tasks as "Reader" and then 4 tasks as "Writer", and all 8 tasks have the same priority, 220, in the VxWorks kernel. If the target has 2 cores, the first 2 reader tasks occupy the cores with "while (1)" and never yield, so the other 2 reader tasks and the 4 writer tasks never get a chance to run, and the test case loops forever without ever ending. Why modify spsc_queue_stress_test.cpp: in a VxWorks user-land task (Wind River calls it an RTP), the maximum number of objects (the internal data structure an RTP uses) in one RTP is limited to 65535. When the test is selected to run, the C++ constructor creates 1<<16 mutexes via "new spsc_queue_tester", and after test1->run() has finished, the C++ destructor deletes those 1<<16 mutexes by default. This causes the problem, because VxWorks supports only ((1<<16)-1) objects within an RTP.
223 lines
5.4 KiB
C++
223 lines
5.4 KiB
C++
// Copyright (C) 2011-2013 Tim Blechmann
|
|
//
|
|
// Distributed under the Boost Software License, Version 1.0. (See
|
|
// accompanying file LICENSE_1_0.txt or copy at
|
|
// http://www.boost.org/LICENSE_1_0.txt)
|
|
|
|
#include <boost/lockfree/spsc_queue.hpp>
|
|
#include <boost/thread.hpp>
|
|
|
|
#define BOOST_TEST_MAIN
|
|
#ifdef BOOST_LOCKFREE_INCLUDE_TESTS
|
|
#include <boost/test/included/unit_test.hpp>
|
|
#else
|
|
#include <boost/test/unit_test.hpp>
|
|
#endif
|
|
|
|
#include <iostream>
|
|
#include <memory>
|
|
|
|
#include "test_helpers.hpp"
|
|
#include "test_common.hpp"
|
|
|
|
using namespace boost;
|
|
using namespace boost::lockfree;
|
|
using namespace std;
|
|
|
|
#ifndef BOOST_LOCKFREE_STRESS_TEST
|
|
static const boost::uint32_t nodes_per_thread = 100000;
|
|
#else
|
|
static const boost::uint32_t nodes_per_thread = 100000000;
|
|
#endif
|
|
|
|
struct spsc_queue_tester
|
|
{
|
|
spsc_queue<int, capacity<128> > sf;
|
|
|
|
boost::lockfree::detail::atomic<long> spsc_queue_cnt, received_nodes;
|
|
|
|
// In VxWorks one RTP just supports 65535 objects
|
|
#ifndef __VXWORKS__
|
|
static_hashed_set<int, 1<<16 > working_set;
|
|
#else
|
|
static_hashed_set<int, 1<<15 > working_set;
|
|
#endif
|
|
|
|
spsc_queue_tester(void):
|
|
spsc_queue_cnt(0), received_nodes(0)
|
|
{}
|
|
|
|
void add(void)
|
|
{
|
|
for (boost::uint32_t i = 0; i != nodes_per_thread; ++i) {
|
|
int id = generate_id<int>();
|
|
working_set.insert(id);
|
|
|
|
while (sf.push(id) == false)
|
|
{}
|
|
|
|
++spsc_queue_cnt;
|
|
}
|
|
running = false;
|
|
}
|
|
|
|
bool get_element(void)
|
|
{
|
|
int data;
|
|
bool success = sf.pop(data);
|
|
|
|
if (success) {
|
|
++received_nodes;
|
|
--spsc_queue_cnt;
|
|
bool erased = working_set.erase(data);
|
|
assert(erased);
|
|
return true;
|
|
} else
|
|
return false;
|
|
}
|
|
|
|
boost::lockfree::detail::atomic<bool> running;
|
|
|
|
void get(void)
|
|
{
|
|
for(;;) {
|
|
bool success = get_element();
|
|
if (!running && !success)
|
|
break;
|
|
}
|
|
|
|
while ( get_element() );
|
|
}
|
|
|
|
void run(void)
|
|
{
|
|
running = true;
|
|
|
|
BOOST_REQUIRE(sf.empty());
|
|
|
|
boost::thread reader(boost::bind(&spsc_queue_tester::get, this));
|
|
boost::thread writer(boost::bind(&spsc_queue_tester::add, this));
|
|
cout << "reader and writer threads created" << endl;
|
|
|
|
writer.join();
|
|
cout << "writer threads joined. waiting for readers to finish" << endl;
|
|
|
|
reader.join();
|
|
|
|
BOOST_REQUIRE_EQUAL(received_nodes, nodes_per_thread);
|
|
BOOST_REQUIRE_EQUAL(spsc_queue_cnt, 0);
|
|
BOOST_REQUIRE(sf.empty());
|
|
BOOST_REQUIRE(working_set.count_nodes() == 0);
|
|
}
|
|
};
|
|
|
|
// Runs the single-element push/pop stress test once.
BOOST_AUTO_TEST_CASE( spsc_queue_test_caching )
{
    // The tester lives on the heap and is owned by a shared_ptr until the
    // test case returns.
    boost::shared_ptr<spsc_queue_tester> tester(new spsc_queue_tester);
    tester->run();
}
|
|
|
|
struct spsc_queue_tester_buffering
|
|
{
|
|
spsc_queue<int, capacity<128> > sf;
|
|
|
|
boost::lockfree::detail::atomic<long> spsc_queue_cnt;
|
|
|
|
// In VxWorks one RTP just supports 65535 objects
|
|
#ifndef __VXWORKS__
|
|
static_hashed_set<int, 1<<16 > working_set;
|
|
#else
|
|
static_hashed_set<int, 1<<15 > working_set;
|
|
#endif
|
|
|
|
boost::lockfree::detail::atomic<size_t> received_nodes;
|
|
|
|
spsc_queue_tester_buffering(void):
|
|
spsc_queue_cnt(0), received_nodes(0)
|
|
{}
|
|
|
|
static const size_t buf_size = 5;
|
|
|
|
void add(void)
|
|
{
|
|
boost::array<int, buf_size> input_buffer;
|
|
for (boost::uint32_t i = 0; i != nodes_per_thread; i+=buf_size) {
|
|
for (size_t i = 0; i != buf_size; ++i) {
|
|
int id = generate_id<int>();
|
|
working_set.insert(id);
|
|
input_buffer[i] = id;
|
|
}
|
|
|
|
size_t pushed = 0;
|
|
|
|
do {
|
|
pushed += sf.push(input_buffer.c_array() + pushed,
|
|
input_buffer.size() - pushed);
|
|
} while (pushed != buf_size);
|
|
|
|
spsc_queue_cnt+=buf_size;
|
|
}
|
|
running = false;
|
|
}
|
|
|
|
bool get_elements(void)
|
|
{
|
|
boost::array<int, buf_size> output_buffer;
|
|
|
|
size_t popd = sf.pop(output_buffer.c_array(), output_buffer.size());
|
|
|
|
if (popd) {
|
|
received_nodes += popd;
|
|
spsc_queue_cnt -= popd;
|
|
|
|
for (size_t i = 0; i != popd; ++i) {
|
|
bool erased = working_set.erase(output_buffer[i]);
|
|
assert(erased);
|
|
}
|
|
|
|
return true;
|
|
} else
|
|
return false;
|
|
}
|
|
|
|
boost::lockfree::detail::atomic<bool> running;
|
|
|
|
void get(void)
|
|
{
|
|
for(;;) {
|
|
bool success = get_elements();
|
|
if (!running && !success)
|
|
break;
|
|
}
|
|
|
|
while ( get_elements() );
|
|
}
|
|
|
|
void run(void)
|
|
{
|
|
running = true;
|
|
|
|
boost::thread reader(boost::bind(&spsc_queue_tester_buffering::get, this));
|
|
boost::thread writer(boost::bind(&spsc_queue_tester_buffering::add, this));
|
|
cout << "reader and writer threads created" << endl;
|
|
|
|
writer.join();
|
|
cout << "writer threads joined. waiting for readers to finish" << endl;
|
|
|
|
reader.join();
|
|
|
|
BOOST_REQUIRE_EQUAL(received_nodes, nodes_per_thread);
|
|
BOOST_REQUIRE_EQUAL(spsc_queue_cnt, 0);
|
|
BOOST_REQUIRE(sf.empty());
|
|
BOOST_REQUIRE(working_set.count_nodes() == 0);
|
|
}
|
|
};
|
|
|
|
|
|
// Runs the bulk (array) push/pop stress test once.
BOOST_AUTO_TEST_CASE( spsc_queue_test_buffering )
{
    // The tester lives on the heap and is owned by a shared_ptr until the
    // test case returns.
    boost::shared_ptr<spsc_queue_tester_buffering> tester(new spsc_queue_tester_buffering);
    tester->run();
}
|
|
|