parsebyparts.cpp
Go to the documentation of this file.
1 // Example of parsing JSON to document by parts.
2 
3 // Using C++11 threads
4 // Temporarily disable for clang (older version) due to incompatibility with libstdc++
5 #if (__cplusplus >= 201103L || (defined(_MSC_VER) && _MSC_VER >= 1700)) && !defined(__clang__)
6 
7 #include "rapidjson/document.h"
8 #include "rapidjson/error/en.h"
9 #include "rapidjson/writer.h"
11 #include <condition_variable>
12 #include <iostream>
13 #include <mutex>
14 #include <thread>
15 
16 using namespace rapidjson;
17 
18 template<unsigned parseFlags = kParseDefaultFlags>
19 class AsyncDocumentParser {
20 public:
21  AsyncDocumentParser(Document& d)
22  : stream_(*this)
23  , d_(d)
24  , parseThread_()
25  , mutex_()
26  , notEmpty_()
27  , finish_()
28  , completed_()
29  {
30  // Create and execute thread after all member variables are initialized.
31  parseThread_ = std::thread(&AsyncDocumentParser::Parse, this);
32  }
33 
34  ~AsyncDocumentParser() {
35  if (!parseThread_.joinable())
36  return;
37 
38  {
39  std::unique_lock<std::mutex> lock(mutex_);
40 
41  // Wait until the buffer is read up (or parsing is completed)
42  while (!stream_.Empty() && !completed_)
43  finish_.wait(lock);
44 
45  // Automatically append '\0' as the terminator in the stream.
46  static const char terminator[] = "";
47  stream_.src_ = terminator;
48  stream_.end_ = terminator + 1;
49  notEmpty_.notify_one(); // unblock the AsyncStringStream
50  }
51 
52  parseThread_.join();
53  }
54 
55  void ParsePart(const char* buffer, size_t length) {
56  std::unique_lock<std::mutex> lock(mutex_);
57 
58  // Wait until the buffer is read up (or parsing is completed)
59  while (!stream_.Empty() && !completed_)
60  finish_.wait(lock);
61 
62  // Stop further parsing if the parsing process is completed.
63  if (completed_)
64  return;
65 
66  // Set the buffer to stream and unblock the AsyncStringStream
67  stream_.src_ = buffer;
68  stream_.end_ = buffer + length;
69  notEmpty_.notify_one();
70  }
71 
72 private:
73  void Parse() {
74  d_.ParseStream<parseFlags>(stream_);
75 
76  // The stream may not be fully read, notify finish anyway to unblock ParsePart()
77  std::unique_lock<std::mutex> lock(mutex_);
78  completed_ = true; // Parsing process is completed
79  finish_.notify_one(); // Unblock ParsePart() or destructor if they are waiting.
80  }
81 
82  struct AsyncStringStream {
83  typedef char Ch;
84 
85  AsyncStringStream(AsyncDocumentParser& parser) : parser_(parser), src_(), end_(), count_() {}
86 
87  char Peek() const {
88  std::unique_lock<std::mutex> lock(parser_.mutex_);
89 
90  // If nothing in stream, block to wait.
91  while (Empty())
92  parser_.notEmpty_.wait(lock);
93 
94  return *src_;
95  }
96 
97  char Take() {
98  std::unique_lock<std::mutex> lock(parser_.mutex_);
99 
100  // If nothing in stream, block to wait.
101  while (Empty())
102  parser_.notEmpty_.wait(lock);
103 
104  count_++;
105  char c = *src_++;
106 
107  // If all stream is read up, notify that the stream is finish.
108  if (Empty())
109  parser_.finish_.notify_one();
110 
111  return c;
112  }
113 
114  size_t Tell() const { return count_; }
115 
116  // Not implemented
117  char* PutBegin() { return 0; }
118  void Put(char) {}
119  void Flush() {}
120  size_t PutEnd(char*) { return 0; }
121 
122  bool Empty() const { return src_ == end_; }
123 
124  AsyncDocumentParser& parser_;
125  const char* src_;
126  const char* end_;
127  size_t count_;
128  };
129 
130  AsyncStringStream stream_;
131  Document& d_;
132  std::thread parseThread_;
133  std::mutex mutex_;
134  std::condition_variable notEmpty_;
135  std::condition_variable finish_;
136  bool completed_;
137 };
138 
139 int main() {
140  Document d;
141 
142  {
143  AsyncDocumentParser<> parser(d);
144 
145  const char json1[] = " { \"hello\" : \"world\", \"t\" : tr";
146  //const char json1[] = " { \"hello\" : \"world\", \"t\" : trX"; // Fot test parsing error
147  const char json2[] = "ue, \"f\" : false, \"n\": null, \"i\":123, \"pi\": 3.14";
148  const char json3[] = "16, \"a\":[1, 2, 3, 4] } ";
149 
150  parser.ParsePart(json1, sizeof(json1) - 1);
151  parser.ParsePart(json2, sizeof(json2) - 1);
152  parser.ParsePart(json3, sizeof(json3) - 1);
153  }
154 
155  if (d.HasParseError()) {
156  std::cout << "Error at offset " << d.GetErrorOffset() << ": " << GetParseError_En(d.GetParseError()) << std::endl;
157  return EXIT_FAILURE;
158  }
159 
160  // Stringify the JSON to cout
161  OStreamWrapper os(std::cout);
162  Writer<OStreamWrapper> writer(os);
163  d.Accept(writer);
164  std::cout << std::endl;
165 
166  return EXIT_SUCCESS;
167 }
168 
169 #else // Not supporting C++11
170 
171 #include <iostream>
172 int main() {
173  std::cout << "This example requires C++11 compiler" << std::endl;
174 }
175 
176 #endif
d
int main()
JSON writer.
Definition: fwd.h:95
ParseErrorCode GetParseError() const
Get the ParseErrorCode of last parsing.
Definition: document.h:2362
main RapidJSON namespace
size_t GetErrorOffset() const
Get the position of last parsing error in input, 0 otherwise.
Definition: document.h:2365
RAPIDJSON_NAMESPACE_BEGIN const RAPIDJSON_ERROR_CHARTYPE * GetParseError_En(ParseErrorCode parseErrorCode)
Maps error code of parsing into error message.
Definition: en.h:36
bool HasParseError() const
Whether a parse error has occured in the last parsing.
Definition: document.h:2359


choreo_rapidjson
Author(s):
autogenerated on Thu Jul 18 2019 03:59:09