Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "threadedfilebuf.h"
00029
00030 #include <cstring>
00031
00032 using namespace std;
00033
00034 namespace pangolin
00035 {
00036
00037 threadedfilebuf::threadedfilebuf( const std::string& filename, unsigned int buffer_size_bytes )
00038 : mem_buffer(0), mem_size(0), mem_start(0), mem_end(0)
00039 {
00040 file.open(filename.c_str(), ios::out | ios::binary );
00041
00042 mem_max_size = buffer_size_bytes;
00043 mem_buffer = new char[mem_max_size];
00044
00045 write_thread = boost::thread(boost::ref(*this));
00046 }
00047
00048 threadedfilebuf::~threadedfilebuf()
00049 {
00050 if( write_thread.joinable() )
00051 {
00052 write_thread.interrupt();
00053 write_thread.join();
00054 }
00055
00056 if( mem_buffer) delete mem_buffer;
00057 file.close();
00058 }
00059
00060 std::streamsize threadedfilebuf::xsputn(const char* data, std::streamsize num_bytes)
00061 {
00062 if( num_bytes > mem_max_size )
00063 throw exception();
00064
00065 {
00066 boost::unique_lock<boost::mutex> lock(update_mutex);
00067
00068
00069 while( mem_size + num_bytes > mem_max_size )
00070 {
00071 cond_dequeued.wait(lock);
00072 }
00073
00074
00075 const int array_a_size =
00076 (mem_start <= mem_end) ? (mem_max_size - mem_end) : (mem_start - mem_end);
00077
00078 if( num_bytes <= array_a_size )
00079 {
00080
00081 memcpy(mem_buffer + mem_end, data, num_bytes);
00082 mem_end += num_bytes;
00083 mem_size += num_bytes;
00084 }
00085 else
00086 {
00087 const int array_b_size = num_bytes - array_a_size;
00088 memcpy(mem_buffer + mem_end, data, array_a_size);
00089 memcpy(mem_buffer, data+array_a_size, array_b_size);
00090 mem_end = array_b_size;
00091 mem_size += num_bytes;
00092 }
00093
00094 if(mem_end == mem_max_size)
00095 mem_end = 0;
00096 }
00097
00098 cond_queued.notify_one();
00099
00100 return num_bytes;
00101 }
00102
00103 void threadedfilebuf::operator()()
00104 {
00105 int data_to_write = 0;
00106
00107 while(true)
00108 {
00109 {
00110 boost::unique_lock<boost::mutex> lock(update_mutex);
00111
00112 while( mem_size == 0 )
00113 cond_queued.wait(lock);
00114
00115 data_to_write =
00116 (mem_start < mem_end) ?
00117 mem_end - mem_start :
00118 mem_max_size - mem_start;
00119 }
00120
00121 std::streamsize bytes_written =
00122 file.sputn(mem_buffer + mem_start, data_to_write );
00123
00124 if( bytes_written != data_to_write)
00125 throw std::exception();
00126
00127 {
00128 boost::unique_lock<boost::mutex> lock(update_mutex);
00129
00130 mem_size -= data_to_write;
00131 mem_start += data_to_write;
00132
00133 if(mem_start == mem_max_size)
00134 mem_start = 0;
00135 }
00136
00137 cond_dequeued.notify_all();
00138 }
00139 }
00140
00141 }