00001
00002
00003 from __future__ import with_statement
00004
00005 from twisted.internet import reactor, interfaces
00006 from zope.interface import implements
00007 import scapy.all as scapy
00008 import event
00009 import socket
00010 import sys
00011
00012
00013
00014 import os
00015 import struct
# Locate the already-imported scapy module whose attach_filter will be
# replaced: prefer scapy.arch.linux and fall back to scapy.arch when that
# submodule has not been loaded.
try:
    scapy_arch_linux_module = sys.modules['scapy.arch.linux']
except KeyError:
    # Only a missing sys.modules entry is expected here; anything else
    # (e.g. KeyboardInterrupt) must propagate.
    scapy_arch_linux_module = sys.modules['scapy.arch']

# Handle on the implementation that is about to be replaced.
_orig_attach_filter = scapy_arch_linux_module.attach_filter

# Maps a filter expression to its compiled form: (instruction count, packed
# BPF program).
_filter_cache = {}
00022
def _attach_filter_replacement(s, filter):
    """Attach a BPF filter to socket *s*, caching compiled filters.

    Replacement for the scapy module's ``attach_filter``.  The first time a
    filter expression is seen it is compiled by running tcpdump with
    ``-ddd``; the result is stored in ``_filter_cache`` so later calls with
    the same expression do not run tcpdump again.

    Arguments:
        s: socket object; only its ``setsockopt`` method is used.
        filter: filter expression string handed to tcpdump.

    Returns:
        None.  Nothing is attached when the module's ``TCPDUMP`` flag is
        false or when tcpdump could not be started.

    Raises:
        scapy.Scapy_Exception: tcpdump reported a failure or produced no
            output.
    """
    if not scapy_arch_linux_module.TCPDUMP:
        return

    # NOTE(review): the cache is keyed on the expression only, although the
    # tcpdump command also depends on scapy.conf.iface.
    if filter not in _filter_cache:
        # NOTE(security): the expression is interpolated into a shell
        # command line; do not pass filter strings from untrusted sources.
        command = "%s -i %s -ddd -s 1600 '%s'" % (
            scapy.conf.prog.tcpdump, scapy.conf.iface, filter)
        try:
            f = os.popen(command)
        except OSError:
            # sys.exc_info() keeps this valid on every Python version the
            # file may run under.
            scapy.log_interactive.warning(
                "Failed to execute tcpdump: (%s)", sys.exc_info()[1])
            return
        lines = f.readlines()
        # close() returns a non-zero status when the command failed; an
        # empty output would otherwise surface as an IndexError below.
        if f.close() or not lines:
            raise scapy.Scapy_Exception("Filter parse error")
        nb = int(lines[0])
        # First line is the instruction count, each following line holds
        # the four integer fields of one instruction.
        bpf = "".join([struct.pack("HBBI", *map(int, l.split()))
                       for l in lines[1:]])
        _filter_cache[filter] = (nb, bpf)

    nb, bpf = _filter_cache[filter]

    # The socket option takes the instruction count plus a raw pointer to
    # the program.  id(bpf) is the address of the string object (CPython);
    # the constant presumably skips the object header to reach the character
    # data -- TODO confirm for the interpreter build in use.  The cache entry
    # keeps the string referenced after this call returns.
    if scapy.arch.X86_64:
        bpfh = struct.pack("HL", nb, id(bpf)+36)
    else:
        bpfh = struct.pack("HI", nb, id(bpf)+20)
    s.setsockopt(scapy_arch_linux_module.SOL_SOCKET,
                 scapy_arch_linux_module.SO_ATTACH_FILTER, bpfh)
00052
# Monkey-patch: from here on the module's attach_filter attribute refers to
# the caching replacement defined in this file.
scapy_arch_linux_module.attach_filter = _attach_filter_replacement
00054
00055
class AlreadyClosed(Exception):
    """Signals an operation attempted on something that is already closed.

    NOTE(review): intent inferred from the name; confirm against callers.
    """
00058
class L2Port:
    """Reactor-driven port that exchanges raw frames through a scapy L2Socket.

    The port opens a scapy ``L2Socket`` on the given interface, puts its
    input socket into non-blocking mode and hands every chunk read from it
    to the protocol's ``dataReceived``.
    """
    implements(interfaces.IListeningPort)

    def __init__(self, proto, iface = 'any', filter = None, max_size = 9000, reactor = reactor):
        """Open the socket and connect *proto* to this port.

        Arguments:
            proto: protocol object; receives ``makeConnection(self)`` now
                and ``dataReceived(data)`` for every read.
            iface: interface name passed to ``scapy.L2Socket``.
            filter: optional filter expression passed to ``scapy.L2Socket``.
            max_size: maximum number of bytes requested per ``recv`` call.
            reactor: reactor used by startListening/stopListening.
        """
        self._protocol = proto
        # Set before makeConnection() so the protocol may already call
        # startListening() from there.
        self._reactor = reactor
        self._max_size = max_size
        self._socket = scapy.L2Socket(iface, filter = filter).ins
        self._socket.setblocking(False)
        self._protocol.makeConnection(self)

    def fileno(self):
        """Return the socket's file descriptor, or -1 once the port is closed."""
        if self._socket is None:
            return -1
        return self._socket.fileno()

    def doRead(self):
        """Read one chunk from the socket and pass it to the protocol."""
        if self._socket is None:
            # Closed port: nothing left to read.
            return
        try:
            data = self._socket.recv(self._max_size)
        except socket.error:
            # Best effort: every socket error is ignored and no data is
            # delivered for this call (the socket is non-blocking, so a
            # read may legitimately have nothing to return).
            return
        self._protocol.dataReceived(data)

    def startListening(self):
        """Register this port with the reactor as a reader."""
        self._reactor.addReader(self)

    def stopListening(self):
        """Unregister from the reactor and close the port."""
        self._reactor.removeReader(self)
        self.connectionLost()

    def send(self, data):
        """Write *data* to the socket.

        Raises:
            AlreadyClosed: the port has been closed.
        """
        if self._socket is None:
            raise AlreadyClosed("L2Port has been closed")
        self._socket.send(data)

    def connectionLost(self, reason=None):
        """Close and forget the socket; safe to call more than once."""
        sock, self._socket = self._socket, None
        if sock is not None:
            sock.close()

    def logPrefix(self):
        """Return the prefix used for log messages from this port."""
        return "L2Port"
00100
# Self-test section: runs only when the file is executed as a script.
if __name__ == "__main__":
    import unittest
    import async_helpers
    from async_helpers import unittest_with_reactor, async_test
    from twisted.internet.defer import Deferred
    from twisted.internet.protocol import Protocol
    import random
    # run_ros_tests() below uses rostest, which was not imported anywhere.
    import rostest

    def tst_icmp_pkt():
        """Build the raw bytes of an ICMP echo request between 127.0.0.1
        addresses, with a random id so that each test recognises its own
        packet."""
        return str(
                scapy.Ether()/
                scapy.IP(src='127.0.0.1', dst='127.0.0.1')/
                scapy.ICMP(type='echo-request', seq=1, id=random.randint(0, 0xFFFF))
                )

    class L2PortTest(unittest.TestCase):
        @async_test
        def test_basic(self):
            """Send a packet on 'lo' and wait until the same bytes are read
            back through the protocol."""
            deferred = Deferred()
            packet = tst_icmp_pkt()
            class TstProto(Protocol):
                def dataReceived(self, data):
                    # Other ICMP traffic may show up; only our own packet
                    # completes the test.
                    if data == packet:
                        deferred.callback(None)

            proto = TstProto()
            port = reactor.listenWith(L2Port, proto, iface = 'lo', filter='icmp')
            port.send(packet)
            yield deferred

        @async_test
        def test_as_event_stream(self):
            """Same round trip, with the port wrapped in
            async_helpers.ReadDescrEventStream."""
            es = async_helpers.ReadDescrEventStream(L2Port, iface = 'lo', filter='icmp')
            pkt = tst_icmp_pkt()
            es.port.send(pkt)
            # Keep receiving until our own packet comes back.
            while True:
                yield async_helpers.select(es)
                inpkt = es.recv()
                if inpkt == pkt:
                    break

    def run_ros_tests():
        """Run L2PortTest through rostest."""
        rostest.unitrun('multi_interface_roam', 'pcap_descriptor', L2PortTest)

    unittest_with_reactor(run_ros_tests)