$search
00001 #! /usr/bin/env python 00002 00003 from __future__ import with_statement 00004 00005 from twisted.internet import reactor, interfaces 00006 from zope.interface import implements 00007 import scapy.all as scapy 00008 import event 00009 import socket 00010 import sys 00011 00012 # Following code is an optimisation because scapy uses tcpdump to compile bpf filters. The code was pulled from 00013 # scapy and plopped down here. 00014 import os 00015 import struct 00016 try: 00017 scapy_arch_linux_module = sys.modules['scapy.arch.linux'] 00018 except: 00019 scapy_arch_linux_module = sys.modules['scapy.arch'] 00020 _orig_attach_filter = scapy_arch_linux_module.attach_filter 00021 _filter_cache = {} 00022 00023 def _attach_filter_replacement(s, filter): 00024 if not scapy_arch_linux_module.TCPDUMP: 00025 return 00026 00027 try: 00028 nb, bpf = _filter_cache[filter] 00029 except KeyError: 00030 try: 00031 f = os.popen("%s -i %s -ddd -s 1600 '%s'" % (scapy.conf.prog.tcpdump,scapy.conf.iface,filter)) 00032 except OSError,msg: 00033 log_interactive.warning("Failed to execute tcpdump: (%s)") 00034 return 00035 lines = f.readlines() 00036 if f.close(): 00037 raise Scapy_Exception("Filter parse error") 00038 nb = int(lines[0]) 00039 bpf = "" 00040 for l in lines[1:]: 00041 bpf += struct.pack("HBBI",*map(long,l.split())) 00042 00043 _filter_cache[filter] = (nb, bpf) 00044 00045 # XXX. Argl! We need to give the kernel a pointer on the BPF, 00046 # python object header seems to be 20 bytes. 36 bytes for x86 64bits arch. 00047 if scapy.arch.X86_64: 00048 bpfh = struct.pack("HL", nb, id(bpf)+36) 00049 else: 00050 bpfh = struct.pack("HI", nb, id(bpf)+20) 00051 s.setsockopt(scapy_arch_linux_module.SOL_SOCKET, scapy_arch_linux_module.SO_ATTACH_FILTER, bpfh) 00052 00053 scapy_arch_linux_module.attach_filter = _attach_filter_replacement 00054 # End of the horrid optimization 00055 00056 class AlreadyClosed(Exception): 00057 pass 00058 00059 class L2Port: 00060 implements(interfaces.IListeningPort) 00061 00062 def __init__(self, proto, iface = 'any', filter = None, max_size = 9000, reactor = reactor): 00063 self._protocol = proto 00064 self._socket = scapy.L2Socket(iface, filter = filter).ins 00065 self._socket.setblocking(False) 00066 self._max_size = 9000 00067 self._protocol.makeConnection(self) 00068 self._reactor = reactor 00069 00070 def fileno(self): 00071 if self._socket: 00072 return self._socket.fileno() 00073 else: 00074 return -1 00075 00076 def doRead(self): 00077 try: 00078 data = self._socket.recv(self._max_size) 00079 except socket.error, e: 00080 if e.errno == 100: 00081 pass # This happens if the interface goes down. 00082 else: 00083 self._protocol.dataReceived(data) 00084 00085 def startListening(self): 00086 self._reactor.addReader(self) 00087 00088 def stopListening(self): 00089 self._reactor.removeReader(self) 00090 self.connectionLost() 00091 00092 def send(self, data): 00093 self._socket.send(data) 00094 00095 def connectionLost(self, reason=None): 00096 self._socket = None 00097 00098 def logPrefix(self): 00099 return "L2Port" 00100 00101 if __name__ == "__main__": 00102 import unittest 00103 import async_helpers 00104 from async_helpers import unittest_with_reactor, async_test 00105 from twisted.internet.defer import Deferred 00106 from twisted.internet.protocol import Protocol 00107 import random 00108 00109 def tst_icmp_pkt(): 00110 return str( 00111 scapy.Ether()/ 00112 scapy.IP(src='127.0.0.1', dst='127.0.0.1')/ 00113 scapy.ICMP(type='echo-request', seq=1, id=random.randint(0, 0xFFFF)) 00114 ) 00115 00116 class L2PortTest(unittest.TestCase): 00117 @async_test 00118 def test_basic(self): 00119 deferred = Deferred() 00120 packet = tst_icmp_pkt() 00121 class TstProto(Protocol): 00122 def dataReceived(self, data): 00123 if data == packet: 00124 deferred.callback(None) 00125 00126 proto = TstProto() 00127 port = reactor.listenWith(L2Port, proto, iface = 'lo', filter='icmp') 00128 port.send(packet) 00129 yield deferred 00130 00131 @async_test 00132 def test_as_event_stream(self): 00133 es = async_helpers.ReadDescrEventStream(L2Port, iface = 'lo', filter='icmp') 00134 pkt = tst_icmp_pkt() 00135 es.port.send(pkt) 00136 while True: 00137 yield async_helpers.select(es) 00138 inpkt = es.recv() 00139 if inpkt == pkt: 00140 break 00141 00142 def run_ros_tests(): 00143 rostest.unitrun('multi_interface_roam', 'pcap_descriptor', L2PortTest) 00144 00145 unittest_with_reactor(run_ros_tests)