interprocess/test/message_queue_test.cpp

377 lines
12 KiB
C++

//////////////////////////////////////////////////////////////////////////////
//
// (C) Copyright Ion Gaztanaga 2004-2012. Distributed under the Boost
// Software License, Version 1.0. (See accompanying file
// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// See http://www.boost.org/libs/interprocess for documentation.
//
//////////////////////////////////////////////////////////////////////////////
#include <boost/interprocess/detail/config_begin.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/interprocess/managed_external_buffer.hpp>
#include <boost/interprocess/managed_heap_memory.hpp>
#include <boost/interprocess/containers/map.hpp>
#include <boost/interprocess/containers/set.hpp>
#include <boost/interprocess/allocators/node_allocator.hpp>
#include <boost/interprocess/detail/os_thread_functions.hpp>
// intrusive/detail
#include <boost/intrusive/detail/minimal_pair_header.hpp>
#include <boost/intrusive/detail/minimal_less_equal_header.hpp>
#include <boost/move/unique_ptr.hpp>
#include <cstddef>
#include <memory>
#include <iostream>
#include <vector>
#include <stdexcept>
#include <limits>
#include "get_process_id_name.hpp"
////////////////////////////////////////////////////////////////////////////////
// //
// This example tests the process shared message queue. //
// //
////////////////////////////////////////////////////////////////////////////////
using namespace boost::interprocess;
//This test inserts messages with different priority and marks them with a
//time-stamp to check if receiver obtains highest priority messages first and
//messages with same priority are received in fifo order
bool test_priority_order()
{
message_queue::remove(test::get_process_id_name());
{
message_queue mq1
(open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)),
mq2
(open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t));
//We test that the queue is ordered by priority and in the
//same priority, is a FIFO
message_queue::size_type recvd = 0;
unsigned int priority = 0;
std::size_t tstamp;
unsigned int priority_prev;
std::size_t tstamp_prev;
//We will send 100 message with priority 0-9
//The message will contain the timestamp of the message
for(std::size_t i = 0; i < 100; ++i){
tstamp = i;
mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(i%10));
}
priority_prev = (std::numeric_limits<unsigned int>::max)();
tstamp_prev = 0;
//Receive all messages and test those are ordered
//by priority and by FIFO in the same priority
for(std::size_t i = 0; i < 100; ++i){
mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);
if(priority > priority_prev)
return false;
if(priority == priority_prev &&
tstamp <= tstamp_prev){
return false;
}
priority_prev = priority;
tstamp_prev = tstamp;
}
//Now retry it with different priority order
for(std::size_t i = 0; i < 100; ++i){
tstamp = i;
mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(9 - i%10));
}
priority_prev = (std::numeric_limits<unsigned int>::max)();
tstamp_prev = 0;
//Receive all messages and test those are ordered
//by priority and by FIFO in the same priority
for(std::size_t i = 0; i < 100; ++i){
mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);
if(priority > priority_prev)
return false;
if(priority == priority_prev &&
tstamp <= tstamp_prev){
return false;
}
priority_prev = priority;
tstamp_prev = tstamp;
}
}
message_queue::remove(test::get_process_id_name());
return true;
}
//[message_queue_test_test_serialize_db
//This test creates a in memory data-base using Interprocess machinery and
//serializes it through a message queue. Then rebuilds the data-base in
//another buffer and checks it against the original data-base
bool test_serialize_db()
{
//Typedef data to create a Interprocess map
typedef std::pair<const std::size_t, std::size_t> MyPair;
typedef std::less<std::size_t> MyLess;
typedef node_allocator<MyPair, managed_external_buffer::segment_manager>
node_allocator_t;
typedef map<std::size_t,
std::size_t,
std::less<std::size_t>,
node_allocator_t>
MyMap;
//Some constants
const std::size_t BufferSize = 65536;
const std::size_t MaxMsgSize = 100;
//Allocate a memory buffer to hold the destiny database using vector<char>
std::vector<char> buffer_destiny(BufferSize, 0);
message_queue::remove(test::get_process_id_name());
{
//Create the message-queues
message_queue mq1(create_only, test::get_process_id_name(), 1, MaxMsgSize);
//Open previously created message-queue simulating other process
message_queue mq2(open_only, test::get_process_id_name());
//A managed heap memory to create the origin database
managed_heap_memory db_origin(buffer_destiny.size());
//Construct the map in the first buffer
MyMap *map1 = db_origin.construct<MyMap>("MyMap")
(MyLess(),
db_origin.get_segment_manager());
if(!map1)
return false;
//Fill map1 until is full
try{
std::size_t i = 0;
while(1){
(*map1)[i] = i;
++i;
}
}
catch(boost::interprocess::bad_alloc &){}
//Data control data sending through the message queue
std::size_t sent = 0;
message_queue::size_type recvd = 0;
message_queue::size_type total_recvd = 0;
unsigned int priority;
//Send whole first buffer through the mq1, read it
//through mq2 to the second buffer
while(1){
//Send a fragment of buffer1 through mq1
std::size_t bytes_to_send = MaxMsgSize < (db_origin.get_size() - sent) ?
MaxMsgSize : (db_origin.get_size() - sent);
mq1.send( &static_cast<char*>(db_origin.get_address())[sent]
, bytes_to_send
, 0);
sent += bytes_to_send;
//Receive the fragment through mq2 to buffer_destiny
mq2.receive( &buffer_destiny[total_recvd]
, BufferSize - recvd
, recvd
, priority);
total_recvd += recvd;
//Check if we have received all the buffer
if(total_recvd == BufferSize){
break;
}
}
//The buffer will contain a copy of the original database
//so let's interpret the buffer with managed_external_buffer
managed_external_buffer db_destiny(open_only, &buffer_destiny[0], BufferSize);
//Let's find the map
std::pair<MyMap *, managed_external_buffer::size_type> ret = db_destiny.find<MyMap>("MyMap");
MyMap *map2 = ret.first;
//Check if we have found it
if(!map2){
return false;
}
//Check if it is a single variable (not an array)
if(ret.second != 1){
return false;
}
//Now let's compare size
if(map1->size() != map2->size()){
return false;
}
//Now let's compare all db values
MyMap::size_type num_elements = map1->size();
for(std::size_t i = 0; i < num_elements; ++i){
if((*map1)[i] != (*map2)[i]){
return false;
}
}
//Destroy maps from db-s
db_origin.destroy_ptr(map1);
db_destiny.destroy_ptr(map2);
}
message_queue::remove(test::get_process_id_name());
return true;
}
//]
static const int MsgSize = 10;
static const int NumMsg = 1000;
static char msgsend [10];
static char msgrecv [10];
static boost::interprocess::message_queue *pmessage_queue;
void receiver()
{
boost::interprocess::message_queue::size_type recvd_size;
unsigned int priority;
int nummsg = NumMsg;
while(nummsg--){
pmessage_queue->receive(msgrecv, MsgSize, recvd_size, priority);
}
}
bool test_buffer_overflow()
{
boost::interprocess::message_queue::remove(test::get_process_id_name());
{
boost::movelib::unique_ptr<boost::interprocess::message_queue>
ptr(new boost::interprocess::message_queue
(create_only, test::get_process_id_name(), 10, 10));
pmessage_queue = ptr.get();
//Launch the receiver thread
boost::interprocess::ipcdetail::OS_thread_t thread;
boost::interprocess::ipcdetail::thread_launch(thread, &receiver);
boost::interprocess::ipcdetail::thread_yield();
int nummsg = NumMsg;
while(nummsg--){
pmessage_queue->send(msgsend, MsgSize, 0);
}
boost::interprocess::ipcdetail::thread_join(thread);
}
boost::interprocess::message_queue::remove(test::get_process_id_name());
return true;
}
//////////////////////////////////////////////////////////////////////////////
//
// test_multi_sender_receiver is based on Alexander (aalutov's)
// testcase for ticket #9221. Many thanks.
//
//////////////////////////////////////////////////////////////////////////////
static boost::interprocess::message_queue *global_queue = 0;
//We'll send MULTI_NUM_MSG_PER_SENDER messages per sender
static const int MULTI_NUM_MSG_PER_SENDER = 10000;
//Message queue message capacity
static const int MULTI_QUEUE_SIZE = (MULTI_NUM_MSG_PER_SENDER - 1)/MULTI_NUM_MSG_PER_SENDER + 1;
//We'll launch MULTI_THREAD_COUNT senders and MULTI_THREAD_COUNT receivers
static const int MULTI_THREAD_COUNT = 10;
static void multisend()
{
char buff;
for (int i = 0; i < MULTI_NUM_MSG_PER_SENDER; i++) {
global_queue->send(&buff, 1, 0);
}
global_queue->send(&buff, 0, 0);
//std::cout<<"writer thread complete"<<std::endl;
}
static void multireceive()
{
char buff;
size_t size;
int received_msgs = 0;
unsigned int priority;
do {
global_queue->receive(&buff, 1, size, priority);
++received_msgs;
} while (size > 0);
--received_msgs;
//std::cout << "reader thread complete, read msgs: " << received_msgs << std::endl;
}
bool test_multi_sender_receiver()
{
bool ret = true;
//std::cout << "Testing multi-sender / multi-receiver " << std::endl;
try {
boost::interprocess::message_queue::remove(test::get_process_id_name());
boost::interprocess::message_queue mq
(boost::interprocess::open_or_create, test::get_process_id_name(), MULTI_QUEUE_SIZE, 1);
global_queue = &mq;
std::vector<boost::interprocess::ipcdetail::OS_thread_t> threads(MULTI_THREAD_COUNT*2);
//Launch senders receiver thread
for (int i = 0; i < MULTI_THREAD_COUNT; i++) {
boost::interprocess::ipcdetail::thread_launch
(threads[i], &multisend);
}
for (int i = 0; i < MULTI_THREAD_COUNT; i++) {
boost::interprocess::ipcdetail::thread_launch
(threads[MULTI_THREAD_COUNT+i], &multireceive);
}
for (int i = 0; i < MULTI_THREAD_COUNT*2; i++) {
boost::interprocess::ipcdetail::thread_join(threads[i]);
//std::cout << "Joined thread " << i << std::endl;
}
}
catch (std::exception &e) {
std::cout << "error " << e.what() << std::endl;
ret = false;
}
boost::interprocess::message_queue::remove(test::get_process_id_name());
return ret;
}
int main ()
{
if(!test_priority_order()){
return 1;
}
if(!test_serialize_db()){
return 1;
}
if(!test_buffer_overflow()){
return 1;
}
if(!test_multi_sender_receiver()){
return 1;
}
return 0;
}
#include <boost/interprocess/detail/config_end.hpp>