tap2pcap.py
Go to the documentation of this file.
1 """Tool to convert Envoy tap trace format to PCAP.
2 
3 Uses od and text2pcap (part of Wireshark) utilities to translate the Envoy
4 tap trace proto format to a PCAP file suitable for consuming in Wireshark
5 and other tools in the PCAP ecosystem. The TCP stream in the output PCAP is
6 synthesized based on the known IP/port/timestamps that Envoy produces in its
7 tap files; it is not a literal wire tap.
8 
9 Usage:
10 
11 bazel run @envoy_api//tools:tap2pcap <tap .pb/.pb_text> <pcap path>
12 
13 Known issues:
14 - IPv6 PCAP generation has malformed TCP packets. This appears to be a text2pcap
15 issue.
16 
17 TODO(htuch):
18 - Figure out IPv6 PCAP issue above, or file a bug once the root cause is clear.
19 """
20 from __future__ import print_function
21 
22 import datetime
23 import io
24 import socket
25 import subprocess as sp
26 import sys
27 import time
28 
29 from google.protobuf import text_format
30 
31 from envoy.data.tap.v2alpha import wrapper_pb2
32 
33 
def dump_event(direction, timestamp, data):
    """Render one tap event as a text2pcap-compatible text record.

    The record consists of a direction line, a timestamp line and a hex dump
    of the payload as produced by ``od -Ax -tx1 -v``.

    Args:
        direction: Direction marker written verbatim on the first line
            (callers in this file pass 'I' for reads and 'O' for writes).
        timestamp: Object exposing ``ToDatetime()`` that returns a naive UTC
            ``datetime`` (e.g. a protobuf Timestamp).
        data: Raw payload bytes to be hex-dumped.

    Returns:
        The record as a ``str``.
    """
    dump = io.StringIO()
    dump.write('%s\n' % direction)
    # Adjust to local timezone. Convert through an aware datetime so that the
    # UTC offset in effect *at the event time* is applied; unconditionally
    # subtracting time.altzone is off by the DST delta whenever daylight
    # saving time is not active.
    utc_dt = timestamp.ToDatetime().replace(tzinfo=datetime.timezone.utc)
    adjusted_dt = utc_dt.astimezone()
    # Always emit the fractional part: str(datetime) drops it when the
    # microsecond field is zero, which would no longer match the
    # '%Y-%m-%d %H:%M:%S.' format handed to text2pcap.
    dump.write('%s\n' % adjusted_dt.strftime('%Y-%m-%d %H:%M:%S.%f'))
    od = sp.Popen(['od', '-Ax', '-tx1', '-v'], stdout=sp.PIPE, stdin=sp.PIPE, stderr=sp.PIPE)
    packet_dump = od.communicate(data)[0]
    dump.write(packet_dump.decode())
    return dump.getvalue()
44 
45 
def tap2pcap(tap_path, pcap_path):
    """Convert an Envoy tap trace file into a PCAP file via text2pcap.

    Args:
        tap_path: Path to the tap trace. Files ending in '.pb_text' are parsed
            as text-format protos; anything else is parsed as a binary proto.
        pcap_path: Path of the PCAP file that text2pcap should write.
    """
    wrapper = wrapper_pb2.TraceWrapper()
    if tap_path.endswith('.pb_text'):
        with open(tap_path, 'r') as f:
            text_format.Merge(f.read(), wrapper)
    else:
        # Binary protos must be read as bytes; text mode would hand a decoded
        # str to ParseFromString (or fail outright on non-UTF-8 content).
        with open(tap_path, 'rb') as f:
            wrapper.ParseFromString(f.read())

    trace = wrapper.socket_buffered_trace
    local_socket = trace.connection.local_address.socket_address
    remote_socket = trace.connection.remote_address.socket_address
    local_address = local_socket.address
    local_port = local_socket.port_value
    remote_address = remote_socket.address
    remote_port = remote_socket.port_value

    # One text record per read ('I') or write ('O') event; events carrying
    # neither field are skipped.
    dumps = []
    for event in trace.events:
        if event.HasField('read'):
            dumps.append(dump_event('I', event.timestamp, event.read.data.as_bytes))
        elif event.HasField('write'):
            dumps.append(dump_event('O', event.timestamp, event.write.data.as_bytes))

    # Pick the IP version flag for text2pcap by checking whether the local
    # address parses as an IPv6 literal.
    ipv6 = False
    try:
        socket.inet_pton(socket.AF_INET6, local_address)
        ipv6 = True
    except socket.error:
        pass

    text2pcap_args = [
        'text2pcap', '-D', '-t', '%Y-%m-%d %H:%M:%S.', '-6' if ipv6 else '-4',
        '%s,%s' % (remote_address, local_address), '-T',
        '%d,%d' % (remote_port, local_port), '-', pcap_path
    ]
    # The concatenated records are fed to text2pcap on stdin ('-').
    text2pcap = sp.Popen(text2pcap_args, stdout=sp.PIPE, stdin=sp.PIPE)
    text2pcap.communicate('\n'.join(dumps).encode())
82 
83 
if __name__ == '__main__':
    # Script entry point: expects exactly two positional arguments, the tap
    # trace path and the output PCAP path.
    cli_args = sys.argv[1:]
    if len(cli_args) != 2:
        print('Usage: %s <tap .pb/.pb_text> <pcap path>' % sys.argv[0])
        sys.exit(1)
    tap_file, pcap_file = cli_args
    tap2pcap(tap_file, pcap_file)
tap2pcap
Definition: tap2pcap.py:1
google::protobuf
Definition: bloaty/third_party/protobuf/benchmarks/util/data_proto2_to_proto3_util.h:12
grpc._common.encode
def encode(s)
Definition: grpc/_common.py:68
tap2pcap.dump_event
def dump_event(direction, timestamp, data)
Definition: tap2pcap.py:34
open
#define open
Definition: test-fs.c:46
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
tap2pcap.tap2pcap
def tap2pcap(tap_path, pcap_path)
Definition: tap2pcap.py:46


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:25