mtconnect_adapter.py
Go to the documentation of this file.
00001 """Copyright 2012, System Insights, Inc.
00002 
00003    Licensed under the Apache License, Version 2.0 (the "License");
00004    you may not use this file except in compliance with the License.
00005    You may obtain a copy of the License at
00006 
00007        http://www.apache.org/licenses/LICENSE-2.0
00008 
00009    Unless required by applicable law or agreed to in writing, software
00010    distributed under the License is distributed on an "AS IS" BASIS,
00011    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00012    See the License for the specific language governing permissions and
00013    limitations under the License."""
00014     
00015 from SocketServer import ThreadingMixIn, TCPServer, BaseRequestHandler
00016 import threading
00017 import socket
00018 from datetime import datetime
00019 import re
00020 
00021 
class Adapter(ThreadingMixIn, TCPServer):
    """Threaded TCP server that pushes data-item values to connected clients.

    Each accepted connection is registered in ``_clients`` (keyed by the
    client address), receives an initial dump of every data item, and is
    then serviced by a heartbeat loop that answers ``* PING`` requests with
    ``* PONG <interval>``.

    Data items are objects registered through :meth:`add_data_item`. The
    adapter calls the following methods on them: ``begin()``,
    ``complete()``, ``sweep()``, ``unavailable()``, ``changed()``,
    ``values(force)`` and ``separate_line()``.

    Outgoing lines have the form ``<timestamp><text>\\n``.
    """

    allow_reuse_address = True

    def __init__(self, address, heartbeat_interval = 10000, family = socket.AF_INET):
        """Create the server socket without binding it; see :meth:`start`.

        address            -- address passed through to ``TCPServer``.
        heartbeat_interval -- value reported in ``* PONG`` replies; the
                              receive timeout is derived from it (see
                              :meth:`heartbeat`).
        family             -- socket address family.
        """
        # Must be set before TCPServer.__init__, which creates the socket.
        self.address_family = family
        # bind_and_activate=False: binding and listening happen in start().
        TCPServer.__init__(self, address, BaseRequestHandler, False)
        self._clients = dict()
        self._lock = threading.RLock()
        self._data_items = []
        self._running = False
        # None until start() is called; lets stop()/wait_until_stopped()
        # detect a server that was never started.
        self._server_thread = None
        self._heartbeat_interval = heartbeat_interval
        self._ping_pat = re.compile(r'\* PING')

    def add_data_item(self, item):
        """Register a data item to be included in subsequent sends."""
        self._data_items.append(item)

    def start(self):
        """Bind, listen and serve connections on a daemon thread."""
        self.server_bind()
        self._running = True
        self.server_activate()
        self._server_thread = threading.Thread(target = self.serve_forever)
        self._server_thread.daemon = True
        print("Server started, waiting for connections on " + str(self.server_address))
        self._server_thread.start()

    def stop(self):
        """Stop accepting connections and shut down every client socket.

        Does nothing if :meth:`start` was never called (``shutdown()`` would
        otherwise wait on a serve loop that does not exist).
        """
        if self._server_thread is None:
            return

        # Lets heartbeat loops exit on their next iteration.
        self._running = False
        self.shutdown()

        # Snapshot under the lock: handler threads mutate _clients.
        with self._lock:
            clients = list(self._clients.values())

        for client in clients:
            try:
                client.shutdown(socket.SHUT_RDWR)
            except socket.error:
                # Best effort: the peer may already have disconnected, and
                # one failure must not prevent shutting down the others.
                pass

        self._server_thread.join(5.0)

    def wait_until_stopped(self):
        """Block until the serving thread exits (no-op if never started)."""
        if self._server_thread is not None:
            self._server_thread.join()

    def finish_request(self, request, client_address):
        """Service one client connection for its whole lifetime.

        Overrides the ``TCPServer`` hook: instead of instantiating a request
        handler, registers the socket, sends the initial values and runs
        the heartbeat loop. The client is always unregistered on exit,
        even if one of those steps raises.
        """
        print("Connected to " + str(client_address))
        with self._lock:
            self._clients[client_address] = request

        try:
            # Turn Nagle's algorithm off so lines are sent immediately.
            request.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)

            self.send_initial(client_address)
            self.heartbeat(request)
        finally:
            self.remove_client(client_address)

    def heartbeat(self, client):
        """Answer ``* PING`` requests on ``client`` until it stops pinging.

        The loop ends when the received data does not start with
        ``* PING`` (including an empty read), when the adapter is stopped,
        or when a socket operation raises (for example a receive timeout).
        """
        try:
            client.settimeout(None)
            while self._running:
                line = client.recv(256)
                if not self._ping_pat.match(line):
                    break

                # After the first ping, expect further traffic within twice
                # the heartbeat interval (interval / 500.0 gives seconds if
                # the interval is in milliseconds).
                if not client.gettimeout():
                    client.settimeout(self._heartbeat_interval / 500.0)

                with self._lock:
                    # sendall: a plain send() may write only part of the reply.
                    client.sendall("* PONG " + str(self._heartbeat_interval) + "\n")
        except Exception:
            print("Exception in heartbeat thread")

        print("Headbeat thread stopped")

    def remove_client(self, client_address):
        """Unregister ``client_address`` and shut its socket down.

        Safe to call more than once; an unknown address is ignored.
        """
        print("Removing " + str(client_address))
        with self._lock:
            client = self._clients.pop(client_address, None)

        if client is None:
            return

        try:
            # The local must not be named `socket`: that would hide the
            # module and make socket.SHUT_RDWR an attribute error.
            client.shutdown(socket.SHUT_RDWR)
        except Exception:
            print("Exception closing socket for " + str(client_address))

    def begin(self):
        """Call ``begin()`` on every data item."""
        for di in self._data_items:
            di.begin()

    def complete(self):
        """Call ``complete()`` on every data item."""
        for di in self._data_items:
            di.complete()

    def sweep(self):
        """Call ``sweep()`` on every data item."""
        for di in self._data_items:
            di.sweep()

    def unavailable(self):
        """Call ``unavailable()`` on every data item."""
        for di in self._data_items:
            di.unavailable()

    def send_initial(self, client_address):
        """Send the values of all data items to a single client."""
        self.send_changed([client_address], True)

    def format_time(self):
        """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SS.ffffffZ``."""
        time = datetime.utcnow()
        return time.strftime("%Y-%m-%dT%H:%M:%S.%f") + 'Z'

    def send_changed(self, clients, force = False):
        """Send data-item values to ``clients``.

        clients -- iterable of client addresses (keys of ``_clients``).
        force   -- when true, send every item regardless of ``changed()``.

        Items whose ``separate_line()`` is false are concatenated into one
        line; each value of the remaining items is sent on its own line.
        All lines share one timestamp.
        """
        time = self.format_time()

        combined = []
        separate = []
        for item in self._data_items:
            if item.separate_line():
                separate.append(item)
            else:
                combined.append(item)

        parts = []
        for item in combined:
            if force or item.changed():
                parts.extend(item.values(force))
        text = ''.join(parts)

        if text:
            self.send(time, text, clients)

        for item in separate:
            if force or item.changed():
                for line in item.values(force):
                    self.send(time, line, clients)

    def format_line(self, time, text):
        """Return ``time`` and ``text`` joined and newline-terminated."""
        return time + text + "\n"

    def send_to_client(self, client, line):
        """Send ``line`` to the client registered under address ``client``.

        An address that is no longer registered is ignored. If sending
        fails, the client is removed.
        """
        with self._lock:
            connection = self._clients.get(client)

        if connection is None:
            return

        try:
            # sendall: a plain send() may transmit only part of the line.
            connection.sendall(line)
        except Exception as ex:
            print("Exception occurred in send_to_client, removing client" + str(ex))
            self.remove_client(client)

    def send(self, time, text, clients):
        """Format one line and send it to each address in ``clients``."""
        line = self.format_line(time, text)
        for client in clients:
            self.send_to_client(client, line)

    def _client_addresses(self):
        """Return a snapshot of the registered client addresses."""
        with self._lock:
            return list(self._clients)

    def gather(self, function):
        """Run ``function`` between begin_gather() and complete_gather()."""
        self.begin_gather()

        function()

        self.complete_gather()

    def begin_gather(self):
        """Start a gather cycle by calling ``begin()`` on every data item."""
        self.begin()

    def complete_gather(self):
        """Finish a gather cycle: complete, send changes to all clients, sweep."""
        self.complete()
        self.send_changed(self._client_addresses())
        self.sweep()


mtconnect_ros_bridge
Author(s): Stephen L. Wiedmann
autogenerated on Mon Jan 6 2014 11:30:45