lz4_stream.cpp
Go to the documentation of this file.
1 /*********************************************************************
2 * Software License Agreement (BSD License)
3 *
4 * Copyright (c) 2014, Ben Charrow
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above
14 * copyright notice, this list of conditions and the following
15 * disclaimer in the documentation and/or other materials provided
16 * with the distribution.
17 * * Neither the name of Willow Garage, Inc. nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
33 ********************************************************************/
34 
35 #include "rosbag/chunked_file.h"
36 
37 #include <iostream>
38 #include <cstring>
39 #include "console_bridge/console.h"
40 
41 using std::string;
42 
43 namespace rosbag {
44 
46  : Stream(file), block_size_id_(6) {
48  buff_ = new char[buff_size_];
49  lz4s_.state = NULL;
50 }
51 
53  delete[] buff_;
54 }
55 
57  return compression::LZ4;
58 }
59 
61  if (lz4s_.state) {
62  throw BagException("cannot start writing to already opened lz4 stream");
63  }
64 
65  setCompressedIn(0);
66 
68  switch(ret) {
69  case ROSLZ4_OK: break;
70  case ROSLZ4_MEMORY_ERROR: throw BagIOException("ROSLZ4_MEMORY_ERROR: insufficient memory available"); break;
71  case ROSLZ4_PARAM_ERROR: throw BagIOException("ROSLZ4_PARAM_ERROR: bad block size"); break;
72  default: throw BagException("Unhandled return code");
73  }
76 }
77 
78 void LZ4Stream::write(void* ptr, size_t size) {
79  if (!lz4s_.state) {
80  throw BagException("cannot write to unopened lz4 stream");
81  }
82 
83  lz4s_.input_left = size;
84  lz4s_.input_next = (char*) ptr;
85 
88 }
89 
90 void LZ4Stream::writeStream(int action) {
91  int ret = ROSLZ4_OK;
92  while (lz4s_.input_left > 0 ||
93  (action == ROSLZ4_FINISH && ret != ROSLZ4_STREAM_END)) {
94  ret = roslz4_compress(&lz4s_, action);
95  switch(ret) {
96  case ROSLZ4_OK: break;
98  if (lz4s_.output_next - buff_ == buff_size_) {
99  throw BagIOException("ROSLZ4_OUTPUT_SMALL: output buffer is too small");
100  } else {
101  // There's data to be written in buff_; this will free up space
102  break;
103  }
104  case ROSLZ4_STREAM_END: break;
105  case ROSLZ4_PARAM_ERROR: throw BagIOException("ROSLZ4_PARAM_ERROR: bad block size"); break;
106  case ROSLZ4_ERROR: throw BagIOException("ROSLZ4_ERROR: compression error"); break;
107  default: throw BagException("Unhandled return code");
108  }
109 
110  // If output data is ready, write to disk
111  int to_write = lz4s_.output_next - buff_;
112  if (to_write > 0) {
113  if (fwrite(buff_, 1, to_write, getFilePointer()) != static_cast<size_t>(to_write)) {
114  throw BagException("Problem writing data to disk");
115  }
116  advanceOffset(to_write);
119  }
120  }
121 }
122 
124  if (!lz4s_.state) {
125  throw BagException("cannot close unopened lz4 stream");
126  }
127 
129  setCompressedIn(0);
131 }
132 
134  if (lz4s_.state) {
135  throw BagException("cannot start reading from already opened lz4 stream");
136  }
137 
138  int ret = roslz4_decompressStart(&lz4s_);
139  switch(ret) {
140  case ROSLZ4_OK: break;
141  case ROSLZ4_MEMORY_ERROR: throw BagException("ROSLZ4_MEMORY_ERROR: insufficient memory available"); break;
142  default: throw BagException("Unhandled return code");
143  }
144 
145  if (getUnusedLength() > buff_size_) {
146  throw BagException("Too many unused bytes to decompress");
147  }
148 
149  // getUnused() could be pointing to part of buff_, so don't use memcpy
150  memmove(buff_, getUnused(), getUnusedLength());
153  clearUnused();
154 }
155 
156 void LZ4Stream::read(void* ptr, size_t size) {
157  if (!lz4s_.state) {
158  throw BagException("cannot read from unopened lz4 stream");
159  }
160 
161  // Setup stream by filling buffer with data from file
162  int to_read = buff_size_ - lz4s_.input_left;
163  char *input_start = buff_ + lz4s_.input_left;
164  int nread = fread(input_start, 1, to_read, getFilePointer());
165  if (ferror(getFilePointer())) {
166  throw BagIOException("Problem reading from file");
167  }
169  lz4s_.input_left += nread;
170  lz4s_.output_next = (char*) ptr;
171  lz4s_.output_left = size;
172 
173  // Decompress. If reach end of stream, store unused data
174  int ret = roslz4_decompress(&lz4s_);
175  switch (ret) {
176  case ROSLZ4_OK: break;
177  case ROSLZ4_STREAM_END:
178  if (getUnused() || getUnusedLength() > 0) {
179  CONSOLE_BRIDGE_logError("unused data already available");
180  } else {
183  }
184  return;
185  case ROSLZ4_ERROR: throw BagException("ROSLZ4_ERROR: decompression error"); break;
186  case ROSLZ4_MEMORY_ERROR: throw BagException("ROSLZ4_MEMORY_ERROR: insufficient memory available"); break;
187  case ROSLZ4_OUTPUT_SMALL: throw BagException("ROSLZ4_OUTPUT_SMALL: output buffer is too small"); break;
188  case ROSLZ4_DATA_ERROR: throw BagException("ROSLZ4_DATA_ERROR: malformed data to decompress"); break;
189  default: throw BagException("Unhandled return code");
190  }
191  if (feof(getFilePointer())) {
192  throw BagIOException("Reached end of file before reaching end of stream");
193  }
194 
195  size_t total_out = lz4s_.output_next - (char*)ptr;
196  advanceOffset(total_out);
197 
198  // Shift input buffer if there's unconsumed data
199  if (lz4s_.input_left > 0) {
201  }
202 }
203 
205  if (!lz4s_.state) {
206  throw BagException("cannot close unopened lz4 stream");
207  }
208 
210 }
211 
212 void LZ4Stream::decompress(uint8_t* dest, unsigned int dest_len, uint8_t* source, unsigned int source_len) {
213  unsigned int actual_dest_len = dest_len;
214  int ret = roslz4_buffToBuffDecompress((char*)source, source_len,
215  (char*)dest, &actual_dest_len);
216  switch(ret) {
217  case ROSLZ4_OK: break;
218  case ROSLZ4_ERROR: throw BagException("ROSLZ4_ERROR: decompression error"); break;
219  case ROSLZ4_MEMORY_ERROR: throw BagException("ROSLZ4_MEMORY_ERROR: insufficient memory available"); break;
220  case ROSLZ4_OUTPUT_SMALL: throw BagException("ROSLZ4_OUTPUT_SMALL: output buffer is too small"); break;
221  case ROSLZ4_DATA_ERROR: throw BagException("ROSLZ4_DATA_ERROR: malformed data to decompress"); break;
222  default: throw BagException("Unhandled return code");
223  }
224  if (actual_dest_len != dest_len) {
225  throw BagException("Decompression size mismatch in LZ4 chunk");
226  }
227 }
228 
229 } // namespace rosbag
const int ROSLZ4_MEMORY_ERROR
uint64_t getCompressedIn()
Definition: stream.cpp:74
LZ4Stream(ChunkedFile *file)
Definition: lz4_stream.cpp:45
int getUnusedLength()
Definition: stream.cpp:78
ChunkedFile reads and writes files which contain interleaved chunks of compressed and uncompressed da...
Definition: chunked_file.h:51
void setUnusedLength(int nUnused)
Definition: stream.cpp:80
char * getUnused()
Definition: stream.cpp:77
Base class for rosbag exceptions.
Definition: exceptions.h:43
const int ROSLZ4_OUTPUT_SMALL
void roslz4_decompressEnd(roslz4_stream *str)
void read(void *ptr, size_t size)
Definition: lz4_stream.cpp:156
void write(void *ptr, size_t size)
Definition: lz4_stream.cpp:78
CompressionType getCompressionType() const
Definition: lz4_stream.cpp:56
void decompress(uint8_t *dest, unsigned int dest_len, uint8_t *source, unsigned int source_len)
Definition: lz4_stream.cpp:212
int roslz4_compress(roslz4_stream *str, int action)
void roslz4_compressEnd(roslz4_stream *str)
const int ROSLZ4_STREAM_END
roslz4_stream lz4s_
Definition: stream.h:193
void advanceOffset(uint64_t nbytes)
Definition: stream.cpp:76
const int ROSLZ4_PARAM_ERROR
Exception thrown when on IO problems.
Definition: exceptions.h:50
char * output_next
int roslz4_decompressStart(roslz4_stream *str)
char * input_next
FILE * getFilePointer()
Definition: stream.cpp:73
const int ROSLZ4_DATA_ERROR
const int ROSLZ4_OK
void setCompressedIn(uint64_t nbytes)
Definition: stream.cpp:75
void clearUnused()
Definition: stream.cpp:81
const int ROSLZ4_FINISH
char * buff_
Definition: stream.h:190
int block_size_id_
Definition: stream.h:192
const int ROSLZ4_ERROR
void setUnused(char *unused)
Definition: stream.cpp:79
#define CONSOLE_BRIDGE_logError(fmt,...)
void writeStream(int action)
Definition: lz4_stream.cpp:90
int roslz4_decompress(roslz4_stream *str)
const int ROSLZ4_RUN
int roslz4_compressStart(roslz4_stream *str, int block_size_id)
int roslz4_blockSizeFromIndex(int block_id)
int roslz4_buffToBuffDecompress(char *input, unsigned int input_size, char *output, unsigned int *output_size)


rosbag_storage
Author(s): Dirk Thomas
autogenerated on Mon Feb 28 2022 23:33:55