lz4_stream.cpp
Go to the documentation of this file.
1 /*********************************************************************
2 * Software License Agreement (BSD License)
3 *
4 * Copyright (c) 2014, Ben Charrow
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above
14 * copyright notice, this list of conditions and the following
15 * disclaimer in the documentation and/or other materials provided
16 * with the distribution.
17 * * Neither the name of Willow Garage, Inc. nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
33 ********************************************************************/
34 
35 #include "rosbag/chunked_file.h"
36 
37 #include <iostream>
38 #include <cstring>
39 #include "console_bridge/console.h"
40 
41 // Remove this include when no longer supporting platforms with libconsole-bridge-dev < 0.3.0,
42 // in particular Debian Jessie: https://packages.debian.org/jessie/libconsole-bridge-dev
44 
45 using std::string;
46 
47 namespace rosbag {
48 
50  : Stream(file), block_size_id_(6) {
52  buff_ = new char[buff_size_];
53  lz4s_.state = NULL;
54 }
55 
57  delete[] buff_;
58 }
59 
61  return compression::LZ4;
62 }
63 
65  if (lz4s_.state) {
66  throw BagException("cannot start writing to already opened lz4 stream");
67  }
68 
69  setCompressedIn(0);
70 
72  switch(ret) {
73  case ROSLZ4_OK: break;
74  case ROSLZ4_MEMORY_ERROR: throw BagIOException("ROSLZ4_MEMORY_ERROR: insufficient memory available"); break;
75  case ROSLZ4_PARAM_ERROR: throw BagIOException("ROSLZ4_PARAM_ERROR: bad block size"); break;
76  default: throw BagException("Unhandled return code");
77  }
80 }
81 
82 void LZ4Stream::write(void* ptr, size_t size) {
83  if (!lz4s_.state) {
84  throw BagException("cannot write to unopened lz4 stream");
85  }
86 
87  lz4s_.input_left = size;
88  lz4s_.input_next = (char*) ptr;
89 
92 }
93 
94 void LZ4Stream::writeStream(int action) {
95  int ret = ROSLZ4_OK;
96  while (lz4s_.input_left > 0 ||
97  (action == ROSLZ4_FINISH && ret != ROSLZ4_STREAM_END)) {
98  ret = roslz4_compress(&lz4s_, action);
99  switch(ret) {
100  case ROSLZ4_OK: break;
101  case ROSLZ4_OUTPUT_SMALL:
102  if (lz4s_.output_next - buff_ == buff_size_) {
103  throw BagIOException("ROSLZ4_OUTPUT_SMALL: output buffer is too small");
104  } else {
105  // There's data to be written in buff_; this will free up space
106  break;
107  }
108  case ROSLZ4_STREAM_END: break;
109  case ROSLZ4_PARAM_ERROR: throw BagIOException("ROSLZ4_PARAM_ERROR: bad block size"); break;
110  case ROSLZ4_ERROR: throw BagIOException("ROSLZ4_ERROR: compression error"); break;
111  default: throw BagException("Unhandled return code");
112  }
113 
114  // If output data is ready, write to disk
115  int to_write = lz4s_.output_next - buff_;
116  if (to_write > 0) {
117  if (fwrite(buff_, 1, to_write, getFilePointer()) != static_cast<size_t>(to_write)) {
118  throw BagException("Problem writing data to disk");
119  }
120  advanceOffset(to_write);
123  }
124  }
125 }
126 
128  if (!lz4s_.state) {
129  throw BagException("cannot close unopened lz4 stream");
130  }
131 
133  setCompressedIn(0);
135 }
136 
138  if (lz4s_.state) {
139  throw BagException("cannot start reading from already opened lz4 stream");
140  }
141 
142  int ret = roslz4_decompressStart(&lz4s_);
143  switch(ret) {
144  case ROSLZ4_OK: break;
145  case ROSLZ4_MEMORY_ERROR: throw BagException("ROSLZ4_MEMORY_ERROR: insufficient memory available"); break;
146  default: throw BagException("Unhandled return code");
147  }
148 
149  if (getUnusedLength() > buff_size_) {
150  throw BagException("Too many unused bytes to decompress");
151  }
152 
153  // getUnused() could be pointing to part of buff_, so don't use memcpy
154  memmove(buff_, getUnused(), getUnusedLength());
157  clearUnused();
158 }
159 
160 void LZ4Stream::read(void* ptr, size_t size) {
161  if (!lz4s_.state) {
162  throw BagException("cannot read from unopened lz4 stream");
163  }
164 
165  // Setup stream by filling buffer with data from file
166  int to_read = buff_size_ - lz4s_.input_left;
167  char *input_start = buff_ + lz4s_.input_left;
168  int nread = fread(input_start, 1, to_read, getFilePointer());
169  if (ferror(getFilePointer())) {
170  throw BagIOException("Problem reading from file");
171  }
173  lz4s_.input_left += nread;
174  lz4s_.output_next = (char*) ptr;
175  lz4s_.output_left = size;
176 
177  // Decompress. If reach end of stream, store unused data
178  int ret = roslz4_decompress(&lz4s_);
179  switch (ret) {
180  case ROSLZ4_OK: break;
181  case ROSLZ4_STREAM_END:
182  if (getUnused() || getUnusedLength() > 0)
183  CONSOLE_BRIDGE_logError("unused data already available");
184  else {
187  }
188  return;
189  case ROSLZ4_ERROR: throw BagException("ROSLZ4_ERROR: decompression error"); break;
190  case ROSLZ4_MEMORY_ERROR: throw BagException("ROSLZ4_MEMORY_ERROR: insufficient memory available"); break;
191  case ROSLZ4_OUTPUT_SMALL: throw BagException("ROSLZ4_OUTPUT_SMALL: output buffer is too small"); break;
192  case ROSLZ4_DATA_ERROR: throw BagException("ROSLZ4_DATA_ERROR: malformed data to decompress"); break;
193  default: throw BagException("Unhandled return code");
194  }
195  if (feof(getFilePointer())) {
196  throw BagIOException("Reached end of file before reaching end of stream");
197  }
198 
199  size_t total_out = lz4s_.output_next - (char*)ptr;
200  advanceOffset(total_out);
201 
202  // Shift input buffer if there's unconsumed data
203  if (lz4s_.input_left > 0) {
205  }
206 }
207 
209  if (!lz4s_.state) {
210  throw BagException("cannot close unopened lz4 stream");
211  }
212 
214 }
215 
216 void LZ4Stream::decompress(uint8_t* dest, unsigned int dest_len, uint8_t* source, unsigned int source_len) {
217  unsigned int actual_dest_len = dest_len;
218  int ret = roslz4_buffToBuffDecompress((char*)source, source_len,
219  (char*)dest, &actual_dest_len);
220  switch(ret) {
221  case ROSLZ4_OK: break;
222  case ROSLZ4_ERROR: throw BagException("ROSLZ4_ERROR: decompression error"); break;
223  case ROSLZ4_MEMORY_ERROR: throw BagException("ROSLZ4_MEMORY_ERROR: insufficient memory available"); break;
224  case ROSLZ4_OUTPUT_SMALL: throw BagException("ROSLZ4_OUTPUT_SMALL: output buffer is too small"); break;
225  case ROSLZ4_DATA_ERROR: throw BagException("ROSLZ4_DATA_ERROR: malformed data to decompress"); break;
226  default: throw BagException("Unhandled return code");
227  }
228  if (actual_dest_len != dest_len) {
229  throw BagException("Decompression size mismatch in LZ4 chunk");
230  }
231 }
232 
233 } // namespace rosbag
const int ROSLZ4_MEMORY_ERROR
uint64_t getCompressedIn()
Definition: stream.cpp:74
LZ4Stream(ChunkedFile *file)
Definition: lz4_stream.cpp:49
int getUnusedLength()
Definition: stream.cpp:78
ChunkedFile reads and writes files which contain interleaved chunks of compressed and uncompressed da...
Definition: chunked_file.h:51
#define CONSOLE_BRIDGE_logError(fmt,...)
void setUnusedLength(int nUnused)
Definition: stream.cpp:80
char * getUnused()
Definition: stream.cpp:77
Base class for rosbag exceptions.
Definition: exceptions.h:43
const int ROSLZ4_OUTPUT_SMALL
void roslz4_decompressEnd(roslz4_stream *str)
void read(void *ptr, size_t size)
Definition: lz4_stream.cpp:160
void write(void *ptr, size_t size)
Definition: lz4_stream.cpp:82
void decompress(uint8_t *dest, unsigned int dest_len, uint8_t *source, unsigned int source_len)
Definition: lz4_stream.cpp:216
int roslz4_compress(roslz4_stream *str, int action)
void roslz4_compressEnd(roslz4_stream *str)
const int ROSLZ4_STREAM_END
roslz4_stream lz4s_
Definition: stream.h:193
void advanceOffset(uint64_t nbytes)
Definition: stream.cpp:76
const int ROSLZ4_PARAM_ERROR
Exception thrown when on IO problems.
Definition: exceptions.h:50
char * output_next
int roslz4_decompressStart(roslz4_stream *str)
char * input_next
FILE * getFilePointer()
Definition: stream.cpp:73
const int ROSLZ4_DATA_ERROR
const int ROSLZ4_OK
void setCompressedIn(uint64_t nbytes)
Definition: stream.cpp:75
void clearUnused()
Definition: stream.cpp:81
const int ROSLZ4_FINISH
char * buff_
Definition: stream.h:190
int block_size_id_
Definition: stream.h:192
Definition: bag.h:68
const int ROSLZ4_ERROR
void setUnused(char *unused)
Definition: stream.cpp:79
void writeStream(int action)
Definition: lz4_stream.cpp:94
int roslz4_decompress(roslz4_stream *str)
const int ROSLZ4_RUN
int roslz4_compressStart(roslz4_stream *str, int block_size_id)
CompressionType getCompressionType() const
Definition: lz4_stream.cpp:60
int roslz4_blockSizeFromIndex(int block_id)
int roslz4_buffToBuffDecompress(char *input, unsigned int input_size, char *output, unsigned int *output_size)


rosbag_storage
Author(s): Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:19