l2socket.py
Go to the documentation of this file.
00001 #! /usr/bin/env python
00002 
00003 from __future__ import with_statement
00004 
00005 from twisted.internet import reactor, interfaces
00006 from zope.interface import implements
00007 import scapy.all as scapy
00008 import event
00009 import socket
00010 import sys
00011 
00012 # Following code is an optimisation because scapy uses tcpdump to compile bpf filters. The code was pulled from 
00013 # scapy and plopped down here.
00014 import os
00015 import struct
00016 try:
00017     scapy_arch_linux_module = sys.modules['scapy.arch.linux']
00018 except:
00019     scapy_arch_linux_module = sys.modules['scapy.arch']
00020 _orig_attach_filter = scapy_arch_linux_module.attach_filter
00021 _filter_cache = {}
00022 
00023 def _attach_filter_replacement(s, filter):
00024     if not scapy_arch_linux_module.TCPDUMP:
00025         return
00026 
00027     try:
00028         nb, bpf = _filter_cache[filter]
00029     except KeyError:
00030         try:
00031             f = os.popen("%s -i %s -ddd -s 1600 '%s'" % (scapy.conf.prog.tcpdump,scapy.conf.iface,filter))
00032         except OSError,msg:
00033             log_interactive.warning("Failed to execute tcpdump: (%s)")
00034             return
00035         lines = f.readlines()
00036         if f.close():
00037             raise Scapy_Exception("Filter parse error")
00038         nb = int(lines[0])
00039         bpf = ""
00040         for l in lines[1:]:
00041             bpf += struct.pack("HBBI",*map(long,l.split()))
00042 
00043         _filter_cache[filter] = (nb, bpf)
00044     
00045     # XXX. Argl! We need to give the kernel a pointer on the BPF,
00046     # python object header seems to be 20 bytes. 36 bytes for x86 64bits arch.
00047     if scapy.arch.X86_64:
00048         bpfh = struct.pack("HL", nb, id(bpf)+36)
00049     else:
00050         bpfh = struct.pack("HI", nb, id(bpf)+20)  
00051     s.setsockopt(scapy_arch_linux_module.SOL_SOCKET, scapy_arch_linux_module.SO_ATTACH_FILTER, bpfh)
00052 
00053 scapy_arch_linux_module.attach_filter = _attach_filter_replacement
00054 # End of the horrid optimization
00055 
00056 class AlreadyClosed(Exception):
00057     pass
00058 
00059 class L2Port:
00060     implements(interfaces.IListeningPort)
00061 
00062     def __init__(self, proto, iface = 'any', filter = None, max_size = 9000, reactor = reactor):
00063         self._protocol = proto
00064         self._socket = scapy.L2Socket(iface, filter = filter).ins
00065         self._socket.setblocking(False)
00066         self._max_size = 9000
00067         self._protocol.makeConnection(self)
00068         self._reactor = reactor
00069 
00070     def fileno(self):
00071         if self._socket:
00072             return self._socket.fileno()
00073         else:
00074             return -1
00075 
00076     def doRead(self):
00077         try:
00078             data = self._socket.recv(self._max_size)
00079         except socket.error, e:
00080             if e.errno == 100:
00081                 pass # This happens if the interface goes down.
00082         else:
00083             self._protocol.dataReceived(data)
00084 
00085     def startListening(self):    
00086         self._reactor.addReader(self)
00087 
00088     def stopListening(self):
00089         self._reactor.removeReader(self)
00090         self.connectionLost()
00091 
00092     def send(self, data):
00093         self._socket.send(data)
00094     
00095     def connectionLost(self, reason=None):
00096         self._socket = None
00097 
00098     def logPrefix(self):
00099         return "L2Port"
00100 
00101 if __name__ == "__main__":
00102     import unittest
00103     import async_helpers
00104     from async_helpers import unittest_with_reactor, async_test
00105     from twisted.internet.defer import Deferred
00106     from twisted.internet.protocol import Protocol
00107     import random
00108     
00109     def tst_icmp_pkt():
00110         return str(
00111                 scapy.Ether()/
00112                 scapy.IP(src='127.0.0.1', dst='127.0.0.1')/
00113                 scapy.ICMP(type='echo-request', seq=1, id=random.randint(0, 0xFFFF))
00114                 )
00115 
00116     class L2PortTest(unittest.TestCase):
00117         @async_test
00118         def test_basic(self):
00119             deferred = Deferred()
00120             packet = tst_icmp_pkt()
00121             class TstProto(Protocol):
00122                 def dataReceived(self, data):
00123                     if data == packet:
00124                         deferred.callback(None)
00125 
00126             proto = TstProto()
00127             port = reactor.listenWith(L2Port, proto, iface = 'lo', filter='icmp')
00128             port.send(packet)
00129             yield deferred
00130              
00131         @async_test
00132         def test_as_event_stream(self):
00133             es = async_helpers.ReadDescrEventStream(L2Port, iface = 'lo', filter='icmp')
00134             pkt = tst_icmp_pkt()
00135             es.port.send(pkt)
00136             while True:
00137                 yield async_helpers.select(es)
00138                 inpkt = es.recv()
00139                 if inpkt == pkt:
00140                     break
00141 
00142     def run_ros_tests():
00143         rostest.unitrun('multi_interface_roam', 'pcap_descriptor', L2PortTest)
00144     
00145     unittest_with_reactor(run_ros_tests)


multi_interface_roam
Author(s): Blaise Gassend
autogenerated on Thu Jan 2 2014 11:26:15