This is the last post in our series on using Python for network analysis. In the previous posts, we used Scapy, Plotly, Pandas and more to analyze and visualize network data. All of those examples used data from packet capture (PCAP) files. In this post, we’ll cover how to observe (sniff) packets in real time and plot them using Matplotlib.

Requirements

This was tested on macOS (OS X), and should also work on most Linux machines. You will need to install Scapy and Matplotlib:

#> pip3 install scapy matplotlib
 

Imports

To run in real time, you will import Scapy and Matplotlib. We’ll also import argparse to read the interface as a command-line option, and getuid from os to check whether we are root and can sniff on the specified interface:

from scapy.all import *
 
import matplotlib.pyplot as plt
 
import argparse
 
from os import getuid
 

Arguments

The user will supply the interface, and we will allow another option to only capture X packets before exiting:

parser = argparse.ArgumentParser(description='Live Traffic Examiner')
 
parser.add_argument('interface', help="Network interface to listen on i.e. en0", type=str)
 
parser.add_argument('--count', help="Capture X packets and exit", type=int)
 
args=parser.parse_args()
 
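As a quick illustration of how these arguments parse, the sketch below rebuilds the same parser and feeds it an explicit argument list instead of reading the real command line. The helper name build_parser and the sample values en0 and 200 are assumptions made for this example only.

```python
import argparse


def build_parser():
    # Same two arguments as above: a required interface and an optional count.
    parser = argparse.ArgumentParser(description='Live Traffic Examiner')
    parser.add_argument('interface', help="Network interface", type=str)
    parser.add_argument('--count', help="Capture X packets and exit", type=int)
    return parser


# Passing a list to parse_args() avoids touching the real command line.
args = build_parser().parse_args(['en0', '--count', '200'])
print(args.interface, args.count)

# When --count is omitted, argparse leaves it as None.
no_limit = build_parser().parse_args(['en0'])
print(no_limit.count)
```

Because a missing --count comes back as None, a simple truthiness test such as if args.count: is enough to tell a limited capture from an unlimited one.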

User Check

As mentioned above, we want to see if the user can access the interface. Usually, only root can do that, so when we are not root we’ll try to sniff a single packet to see if it works:

#Check to see if we are root, otherwise Scapy might not be able to listen

if getuid() != 0 :
 
   print("Warning: Not running as root, packet listening may not work.")
 
   try:
 
       print("--Trying to listen on {}".format(args.interface))
 
       sniff(iface=args.interface,count=1)
 
       print("--Success!")
 
   except Exception:
 
       print("--Failed!\nError: Unable to sniff packets, try using sudo.")
 
       quit()
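One detail worth knowing about this kind of check: a bare except also catches KeyboardInterrupt and SystemExit, while except Exception does not. The sketch below shows one possible way to structure the check around that. It is not the script’s actual code: can_sniff is a hypothetical helper, and it takes the sniff function as a parameter so the logic can be tried without a network interface. In the real script you would pass Scapy’s sniff.

```python
def can_sniff(interface, sniff_func):
    """Return True if sniff_func can capture one packet on interface.

    sniff_func is expected to accept iface= and count= keyword arguments,
    as Scapy's sniff() does.
    """
    try:
        sniff_func(iface=interface, count=1)
    except Exception as err:  # KeyboardInterrupt/SystemExit still propagate
        print("--Failed! ({}: {})".format(type(err).__name__, err))
        return False
    return True


# Stand-ins for Scapy's sniff(), used here only to exercise both paths.
def fake_sniff_ok(iface, count):
    return []


def fake_sniff_denied(iface, count):
    raise PermissionError("Operation not permitted")


print(can_sniff("en0", fake_sniff_ok))
print(can_sniff("en0", fake_sniff_denied))
```

Returning a boolean instead of calling quit() inside the helper leaves the decision about exiting to the caller.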
 

Building the Plot

To start building the plot, we will add a title and axis labels. We will also call ion(), which turns on interactive mode so the plot can update while the script keeps running:

#Interactive Mode
 
plt.ion()
 
#Labels
 
plt.ylabel("Bytes")
 
plt.xlabel("Count")
 
plt.title("Real Time Network Traffic")
 
plt.tight_layout()
 
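Calling plt.plot() repeatedly adds a new line to the axes on each call. A possible alternative, sketched here under the assumption that a single continuously updated line is what you want, is to create the line once and replace its data as packets arrive. The helper names make_live_line and update_live_line are hypothetical and not part of the script in this post.

```python
import matplotlib.pyplot as plt


def make_live_line():
    """Create a labelled figure with one empty line to update later."""
    fig, ax = plt.subplots()
    ax.set_ylabel("Bytes")
    ax.set_xlabel("Count")
    ax.set_title("Real Time Network Traffic")
    (line,) = ax.plot([], [])
    fig.tight_layout()
    return line


def update_live_line(line, y_values):
    """Replace the line's data and rescale the axes to fit it."""
    line.set_data(list(range(len(y_values))), list(y_values))
    ax = line.axes
    ax.relim()            # recompute data limits from the new data
    ax.autoscale_view()   # apply those limits to the visible range


# Usage inside a capture loop (not run here):
#     plt.ion()
#     line = make_live_line()
#     update_live_line(line, yData)
#     plt.pause(0.1)
```

With this approach the axes hold one line no matter how many packets have been plotted, which may keep redraws cheaper on long captures.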

Loop

The script will have a loop that listens for one packet at a time. Here, we’ll set up an empty list to hold the byte data and a counter for the number of packets plotted:

#Empty list to hold bytes
 
yData=[]
 
i=0
 
#Listen indefinitely, or until we reach count
 
while True:
 
   #Listen for 1 packet
 
   for pkt in sniff(iface=args.interface,count=1):

The important thing in the above code is count=1, which tells Scapy to return after sniffing a single packet. Now that we have a single packet, the code enters a try/except block. The try/except lets the user press CTRL-C to exit; without it, the program will not exit easily:

       try:
 
           if IP in pkt:
 
               yData.append(pkt[IP].len)
 
               plt.plot(yData)
 
               #Pause and draw
 
               plt.pause(0.1)
 
               i+=1
 
               if args.count:
 
                   if i >= args.count:
 
                       quit()

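The code above appends the size of the packet (the IP header’s total length, pkt[IP].len) to a list, plots the list, and draws the update with plt.pause. It then increments the counter and exits once --count packets have been plotted. The matching except clause follows: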

       except KeyboardInterrupt:

           print("Captured {} packets on interface {} ".format(i, args.interface))

           quit()

Last, the except clause catches CTRL-C, prints the number of packets captured, and exits.
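The value being plotted, pkt[IP].len, is the IPv4 header’s Total Length field: the size of the IP header plus its payload in bytes, not counting the link-layer (for example, Ethernet) header. As a rough illustration of where that number lives, the sketch below packs a minimal 20-byte IPv4 header by hand and reads the field back with the standard struct module. The function names, addresses, and sizes are made-up sample values, and the checksum is left at zero.

```python
import socket
import struct


def build_ipv4_header(total_length, src="192.0.2.1", dst="192.0.2.2"):
    """Pack a minimal IPv4 header (no options, checksum left at 0)."""
    version_ihl = (4 << 4) | 5      # IPv4, header length of five 32-bit words
    return struct.pack(
        "!BBHHHBBH4s4s",
        version_ihl,
        0,                # DSCP/ECN
        total_length,     # Total Length: header + payload, in bytes
        0,                # identification
        0,                # flags + fragment offset
        64,               # TTL
        6,                # protocol (6 = TCP)
        0,                # header checksum (not computed in this sketch)
        socket.inet_aton(src),
        socket.inet_aton(dst),
    )


def ipv4_total_length(header):
    """Read the Total Length field from bytes 2-3 of an IPv4 header."""
    (length,) = struct.unpack("!H", header[2:4])
    return length


header = build_ipv4_header(total_length=1500)
print(len(header), ipv4_total_length(header))
```

Scapy does this parsing for you; the point here is only that the y-axis values are IP-level sizes, so they will be slightly smaller than the frame sizes a tool like Wireshark reports.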

Usage

#> ./realtime.py  -h
 
usage: realtime.py [-h] [--count COUNT] interface
 
Live Traffic Examiner
 
positional arguments:
 
 interface      Network interface to listen on i.e. en0
 
optional arguments:
 
 -h, --help     show this help message and exit
 
 --count COUNT  Capture X packets and exit

#> sudo ./realtime.py en0 --count 200
 
Password:
 
Capturing 200 packets on interface en0

For your reference, here is the full code:

from scapy.all import *
 
import matplotlib.pyplot as plt
 
import argparse
 
from os import getuid
 
parser = argparse.ArgumentParser(description='Live Traffic Examiner')
 
parser.add_argument('interface', help="Network interface to listen on i.e. en0", type=str)
 
parser.add_argument('--count', help="Capture X packets and exit", type=int)
 
args=parser.parse_args()
 
#Check to see if we are root, otherwise Scapy might not be able to listen
 
if getuid() != 0 :
 
   print("Warning: Not running as root, packet listening may not work.")
 
   try:
 
       print("--Trying to listen on {}".format(args.interface))
 
       sniff(iface=args.interface,count=1)
 
       print("--Success!")
 
   except Exception:
 
       print("--Failed!\nError: Unable to sniff packets, try again using sudo.")
 
       quit()
 
if args.count:
 
   print("Capturing {} packets on interface {} ".format(args.count, args.interface))
 
else:
 
   print("Capturing unlimited packets on interface {} \n--Press CTRL-C to exit".format(args.interface))
 
#Interactive Mode
 
plt.ion()
 
#Labels
 
plt.ylabel("Bytes")
 
plt.xlabel("Count")
 
plt.title("Real Time Network Traffic")
 
plt.tight_layout()
 
#Empty list to hold bytes
 
yData=[]
 
i=0
 
#Listen indefinitely, or until we reach count
 
while True:
 
   #Listen for 1 packet
 
   for pkt in sniff(iface=args.interface,count=1):
 
       try:
 
           if IP in pkt:
 
               yData.append(pkt[IP].len)
 
               plt.plot(yData)
 
               #Pause and draw
 
               plt.pause(0.1)
 
               i+=1
 
               if args.count:
 
                   if i >= args.count:
 
                       quit()
 
       except KeyboardInterrupt:
 
           print("Captured {} packets on interface {} ".format(i, args.interface))
 
           quit()
 

Conclusion

I hope this series on analyzing network traffic with Python has helped you. You can take most of the code in the examples and customize it to your specific needs. I have a feeling that once you master this you’ll use Wireshark less and less. As always, feel free to let us know if you have any questions: support@automox.com.
