Go to the documentation of this file.00001 """Copyright 2012, System Insights, Inc.
00002
00003 Licensed under the Apache License, Version 2.0 (the "License");
00004 you may not use this file except in compliance with the License.
00005 You may obtain a copy of the License at
00006
00007 http://www.apache.org/licenses/LICENSE-2.0
00008
00009 Unless required by applicable law or agreed to in writing, software
00010 distributed under the License is distributed on an "AS IS" BASIS,
00011 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00012 See the License for the specific language governing permissions and
00013 limitations under the License."""
00014
00015 from SocketServer import ThreadingMixIn, TCPServer, BaseRequestHandler
00016 import threading
00017 import socket
00018 from datetime import datetime
00019 import re
00020
00021
00022 class Adapter(ThreadingMixIn, TCPServer):
00023 allow_reuse_address = True
00024
00025 def __init__(self, address, heartbeat_interval = 10000, family = socket.AF_INET):
00026 self.address_family = family
00027 TCPServer.__init__(self, address, BaseRequestHandler, False)
00028 self._clients = dict()
00029 self._lock = threading.RLock()
00030 self._data_items = []
00031 self._running = False
00032 self._heartbeat_interval = heartbeat_interval
00033 self._ping_pat = re.compile('\\* PING')
00034
00035 def add_data_item(self, item):
00036 self._data_items.append(item)
00037
00038 def start(self):
00039 self.server_bind()
00040 self._running = True
00041 self.server_activate()
00042 self._server_thread = threading.Thread(target = self.serve_forever)
00043 self._server_thread.setDaemon(True)
00044 print "Server started, waiting for connections on " + str(self.server_address)
00045 self._server_thread.start()
00046
00047 def stop(self):
00048 self.shutdown()
00049 for client in self._clients.values():
00050 client.shutdown(socket.SHUT_RDWR)
00051 self._server_thread.join(5.0)
00052
00053 def wait_until_stopped(self):
00054 self._server_thread.join()
00055
00056 def finish_request(self, request, client_address):
00057 print "Connected to " + str(client_address)
00058 self._lock.acquire()
00059 self._clients[client_address] = request
00060 self._lock.release()
00061
00062
00063 request.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
00064
00065 self.send_initial(client_address)
00066 self.heartbeat(request)
00067
00068 self.remove_client(client_address)
00069
00070 def heartbeat(self, client):
00071 try:
00072 client.settimeout(None)
00073 while self._running:
00074 line = client.recv(256)
00075 if self._ping_pat.match(line):
00076 if not client.gettimeout():
00077 client.settimeout(self._heartbeat_interval / 500.0)
00078 try:
00079 self._lock.acquire()
00080 client.send("* PONG " + str(self._heartbeat_interval) + "\n")
00081 finally:
00082 self._lock.release()
00083 else:
00084 break
00085 except:
00086 print "Exception in heartbeat thread"
00087
00088 print "Headbeat thread stopped"
00089
00090
00091 def remove_client(self, client_address):
00092 print "Removing " + str(client_address)
00093 try:
00094 self._lock.acquire()
00095 if client_address in self._clients:
00096 socket = self._clients[client_address]
00097 del self._clients[client_address]
00098 socket.shutdown(socket.SHUT_RDWR)
00099 except:
00100 print "Exception closing socket for " + str(client_address)
00101 finally:
00102 self._lock.release()
00103
00104 def begin(self):
00105 for di in self._data_items:
00106 di.begin()
00107
00108 def complete(self):
00109 for di in self._data_items:
00110 di.complete()
00111
00112 def sweep(self):
00113 for di in self._data_items:
00114 di.sweep()
00115
00116 def unavailable(self):
00117 for di in self._data_items:
00118 di.unavailable()
00119
00120 def send_initial(self, client_address):
00121 self.send_changed([client_address], True)
00122
00123 def format_time(self):
00124 time = datetime.utcnow()
00125 return time.strftime("%Y-%m-%dT%H:%M:%S.%f") + 'Z'
00126
00127 def send_changed(self, clients, force = False):
00128 text = ''
00129 time = self.format_time()
00130 separate = [item for item in self._data_items if item.separate_line()]
00131 combined = [item for item in self._data_items if not item.separate_line()]
00132 for item in combined:
00133 if force or item.changed():
00134 text += ''.join(item.values(force))
00135
00136 if len(text) > 0:
00137 self.send(time, text, clients)
00138
00139 for item in separate:
00140 if force or item.changed():
00141 for line in item.values(force):
00142 self.send(time, line, clients)
00143
00144 def format_line(self, time, text):
00145 return time + text + "\n"
00146
00147 def send_to_client(self, client, line):
00148 try:
00149 try:
00150 self._lock.acquire()
00151 socket = self._clients[client]
00152 finally:
00153 self._lock.release()
00154 if socket:
00155 socket.send(line)
00156 except Exception, ex:
00157 print "Exception occurred in send_to_client, removing client" + str(ex)
00158 self.remove_client(client)
00159
00160
00161 def send(self, time, text, clients):
00162 line = self.format_line(time, text)
00163 for client in clients:
00164 self.send_to_client(client, line)
00165
00166 def gather(self, function):
00167 self.begin()
00168
00169 function()
00170
00171 self.complete()
00172 self.send_changed(self._clients.keys())
00173 self.sweep()
00174
00175 def begin_gather(self):
00176 self.begin()
00177
00178 def complete_gather(self):
00179 self.complete()
00180 self.send_changed(self._clients.keys())
00181 self.sweep()