Go to the documentation of this file.00001 """Copyright 2012, System Insights, Inc.
00002
00003 Licensed under the Apache License, Version 2.0 (the "License");
00004 you may not use this file except in compliance with the License.
00005 You may obtain a copy of the License at
00006
00007 http://www.apache.org/licenses/LICENSE-2.0
00008
00009 Unless required by applicable law or agreed to in writing, software
00010 distributed under the License is distributed on an "AS IS" BASIS,
00011 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00012 See the License for the specific language governing permissions and
00013 limitations under the License."""
00014
00015 import socket, select, re
00016 import scanner
00017 import httplib
00018
00019 class LongPullException(Exception):
00020 pass
00021
00022 class LongPull:
00023 def __init__(self, response):
00024 self._response = response
00025 self._buffer = ''
00026
00027 def _read_all(self, size):
00028 globs = [self._buffer]
00029 amt = size - len(self._buffer)
00030
00031
00032 while amt > 0:
00033 ready, d1, d2 = select.select([self._socket], [], [])
00034 if len(ready) > 0:
00035 glob = self._socket.recv(amt)
00036 if not glob:
00037 raise LongPullException(''.join(globs), amt)
00038 amt -= len(glob)
00039 globs.append(glob)
00040
00041 return ''.join(globs)
00042
00043
00044 def _read_chunk(self):
00045
00046
00047
00048 text = ''
00049 chunk_size = None
00050
00051 if len(self._buffer) < 32:
00052 ready, d1, d2 = select.select([self._socket], [], [])
00053 if len(ready) == 0:
00054 raise httplib.NotConnected()
00055
00056 text = self._socket.recv(32)
00057 if len(text) == 0:
00058 self._socket.close()
00059 raise httplib.NotConnected()
00060
00061 self._buffer += text
00062 eol = self._buffer.find('\r\n')
00063 if eol >= 0:
00064 line = self._buffer[:eol]
00065 i = line.find(';')
00066 if i >= 0:
00067 line = line[:i]
00068 try:
00069 chunk_size = int(line, 16) + 2
00070 except ValueError:
00071
00072
00073 self._socket.close()
00074 raise httplib.IncompleteRead(line)
00075 self._buffer = self._buffer[(eol + 2):]
00076
00077 chunk = self._read_all(chunk_size)
00078 self._buffer = chunk[chunk_size:]
00079
00080 return chunk[:(chunk_size - 2)]
00081
00082
00083 def long_pull(self, callback, user_data = None):
00084 fileno = self._response.fileno()
00085 self._socket = socket.fromfd(fileno, socket.AF_INET, socket.SOCK_STREAM)
00086 self._socket.setblocking(False)
00087 content_type = dict(self._response.getheaders())['content-type']
00088 match = re.search('boundary=([0-9A-Fa-f]+)', content_type)
00089 if not match:
00090 raise LongPullException('Cannot find boundary in content-type')
00091
00092 boundary = '--' + match.group(1)
00093 boundary_pat = re.compile('^' + boundary)
00094 header = True
00095 length = len(boundary)
00096 document = scanner.Scanner('')
00097 while True:
00098 chunk = self._read_chunk()
00099 document.string += chunk
00100
00101 while document.rest_len() >= length:
00102 if header:
00103 if not document.check(boundary_pat):
00104 print "Framing error!"
00105 raise LongPullException('Framing error')
00106
00107 head = document.scan_until('\r\n\r\n')
00108 mime_headers = head.split('\r\n')
00109 values = dict([(v.lower().split(':')) for v in mime_headers[1:] if v.find(':') > 0])
00110 header = False
00111 try:
00112 length = int(values['content-length'])
00113 except ValueError:
00114 raise LongPullException('Cannot get length from mime header: ' + mime_headers)
00115
00116 else:
00117 rest = document.rest()
00118 body = rest[:length]
00119
00120 document.reset()
00121 document.string = rest[length:]
00122
00123 callback(body)
00124
00125 length = len(boundary)
00126 header = True
00127
00128 if __name__ == "__main__":
00129 conn = httplib.HTTPConnection('agent.mtconnect.org')
00130 conn.request("GET", "/sample?interval=1000&count=1000")
00131 response = conn.getresponse()
00132
00133 lp = LongPull(response)
00134 def callback(chunk):
00135 print chunk
00136
00137 lp.long_pull(callback)