long_pull.py
Go to the documentation of this file.
00001 """Copyright 2012, System Insights, Inc.
00002 
00003    Licensed under the Apache License, Version 2.0 (the "License");
00004    you may not use this file except in compliance with the License.
00005    You may obtain a copy of the License at
00006 
00007        http://www.apache.org/licenses/LICENSE-2.0
00008 
00009    Unless required by applicable law or agreed to in writing, software
00010    distributed under the License is distributed on an "AS IS" BASIS,
00011    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00012    See the License for the specific language governing permissions and
00013    limitations under the License."""
00014     
00015 import socket, select, re
00016 import scanner
00017 import httplib
00018 
00019 class LongPullException(Exception):
00020     pass
00021 
00022 class LongPull:
00023     def __init__(self, response):
00024         self._response = response
00025         self._buffer = ''
00026 
00027     def _read_all(self, size):
00028         globs = [self._buffer]
00029         amt = size - len(self._buffer)
00030 
00031 
00032         while amt > 0:
00033             ready, d1, d2 = select.select([self._socket], [], [])
00034             if len(ready) > 0:
00035                 glob = self._socket.recv(amt)
00036                 if not glob:
00037                     raise LongPullException(''.join(globs), amt)
00038                 amt -= len(glob)
00039                 globs.append(glob)
00040 
00041         return ''.join(globs)
00042 
00043 
00044     def _read_chunk(self):
00045         # This should always occur on a chunk boundry, so we should never
00046         # need to store a lot of text in the buffer.
00047 
00048         text = ''
00049         chunk_size = None
00050 
00051         if len(self._buffer) < 32:
00052             ready, d1, d2 = select.select([self._socket], [], [])
00053             if len(ready) == 0:
00054                 raise httplib.NotConnected()
00055 
00056             text = self._socket.recv(32)
00057             if len(text) == 0:
00058                 self._socket.close()
00059                 raise httplib.NotConnected()
00060 
00061         self._buffer += text
00062         eol = self._buffer.find('\r\n')
00063         if eol >= 0:
00064             line = self._buffer[:eol]
00065             i = line.find(';')
00066             if i >= 0:
00067                 line = line[:i] # strip chunk-extensions
00068             try:
00069                 chunk_size = int(line, 16) + 2
00070             except ValueError:
00071                 # close the connection as protocol synchronisation is
00072                 # probably lost
00073                 self._socket.close()
00074                 raise httplib.IncompleteRead(line)
00075             self._buffer = self._buffer[(eol + 2):]
00076 
00077         chunk = self._read_all(chunk_size)
00078         self._buffer = chunk[chunk_size:]
00079 
00080         return chunk[:(chunk_size - 2)]
00081 
00082 
00083     def long_pull(self, callback, user_data = None):
00084         fileno = self._response.fileno()
00085         self._socket = socket.fromfd(fileno, socket.AF_INET, socket.SOCK_STREAM)
00086         self._socket.setblocking(False)
00087         content_type = dict(self._response.getheaders())['content-type']
00088         match = re.search('boundary=([0-9A-Fa-f]+)', content_type)
00089         if not match:
00090             raise LongPullException('Cannot find boundary in content-type')
00091 
00092         boundary = '--' + match.group(1)
00093         boundary_pat = re.compile('^' + boundary)
00094         header = True
00095         length = len(boundary)
00096         document = scanner.Scanner('')
00097         while True:
00098             chunk = self._read_chunk()
00099             document.string += chunk
00100 
00101             while document.rest_len() >= length:
00102                 if header:
00103                     if not document.check(boundary_pat):
00104                         print "Framing error!"
00105                         raise LongPullException('Framing error')
00106 
00107                     head = document.scan_until('\r\n\r\n')
00108                     mime_headers = head.split('\r\n')
00109                     values = dict([(v.lower().split(':')) for v in mime_headers[1:] if v.find(':') > 0])
00110                     header = False
00111                     try:
00112                         length = int(values['content-length'])
00113                     except ValueError:
00114                         raise LongPullException('Cannot get length from mime header: ' + mime_headers)
00115 
00116                 else:
00117                     rest = document.rest()
00118                     body = rest[:length]
00119 
00120                     document.reset()
00121                     document.string = rest[length:]
00122 
00123                     callback(body)
00124 
00125                     length = len(boundary)
00126                     header = True
00127 
00128 if __name__ == "__main__":
00129     conn = httplib.HTTPConnection('agent.mtconnect.org')
00130     conn.request("GET", "/sample?interval=1000&count=1000")
00131     response = conn.getresponse()
00132 
00133     lp = LongPull(response)
00134     def callback(chunk):
00135         print chunk
00136 
00137     lp.long_pull(callback)


mtconnect_ros_bridge
Author(s): Stephen L. Wiedmann
autogenerated on Mon Jan 6 2014 11:30:45