Signal Processing and Machine Learning
Laboratory (EE69210)
Experiment : 2
Lossless Compression
Samarth Adatia
Roll Number: 21EE38012
Indian Institute of Technology, Kharagpur
February 11, 2025
1
1 Huffman Coding
3.1 Dataset 1
3.1.3.1 Generation of Dataset
1 import random
2 import string
3
4 def create random text(length):
5 pool = string.ascii lowercase + string.digits
6 return "".join(random.choices(pool, k=length))
7
8 def save text to file(filename, text):
9 with open(filename, "w") as f:
10 f.write(text)
11
12 if name == " main ":
13 N = [100]
14
15 for length in N:
16 generated text = create random text(length)
17 save text to file("string.txt", generated text)
3.1.3.2 Encoder
1 def compute symbol frequency(text, symbols):
2 frequency map = {}
3 for symbol in symbols:
4 frequency map[symbol] = text.count(symbol) / len(text) if len(text) > ...
0 else 0
5 return frequency map
6
7 def read file content(filename):
8 try:
9 with open(filename, "r") as f:
10 return f.read()
11 except FileNotFoundError:
12 return ""
13
14 if name == " m a i n ":
15 text length = 500
16 iterations = 2
17
18 file content = read file content("string.txt")
19 import heapq
20
21 class TreeNode:
22 """Represents a node in the Huffman Tree."""
23
24 def init (self, character, frequency):
2
25 self.character = character
26 self.frequency = frequency
27 self.left child = None
28 self.right child = None
29
30 def l t (self, other):
31 """Defines how nodes are compared based on frequency."""
32 return self.frequency < other.frequency
33
34 def construct huffman tree(frequency map):
35 """Builds the Huffman Tree from the given frequency dictionary."""
36
37 # Create a heap with TreeNode objects
38 priority queue = [TreeNode(char, freq) for char, freq in ...
frequency map.items()]
39
40 # Convert list into a heap
41 heapq.heapify(priority queue)
42
43 while len(priority queue) > 1:
44 # Extract two nodes with the smallest frequency
45 first = heapq.heappop(priority queue)
46 second = heapq.heappop(priority queue)
47
48 # Create an internal node combining their frequencies
49 internal node = TreeNode(None, first.frequency + second.frequency)
50 internal node.left child = first
51 internal node.right child = second
52
53 # Insert the new node back into the priority queue
54 heapq.heappush(priority queue, internal node)
55
56 # The last node is the root of the Huffman tree
57 return priority queue[0] if priority queue else None
58 !pip install bitarray
59 from bitarray import bitarray
60
61 def serialize huffman tree(node):
62
63 if not node:
64 return bitarray()
65
66 encoded bits = bitarray()
67
68 if node.character is not None:
69 encoded bits.append(1) # Mark as a leaf node
70 encoded bits.frombytes(node.character.encode('latin−1'))
71 else:
72 encoded bits.append(0) # Internal node
73 encoded bits.extend(serialize huffman tree(node.left child))
74 encoded bits.extend(serialize huffman tree(node.right child))
75
76 return encoded bits
77
78 def deserialize huffman tree(bit sequence, position):
79
80 if bit sequence[position]:
81 position += 1
82 char = bit sequence[position:position + 8].tobytes().decode('latin−1')
83 position += 8
3
84 return TreeNode(char, 0), position
85
86 position += 1
87 left child, position = deserialize huffman tree(bit sequence, position)
88 right child, position = deserialize huffman tree(bit sequence, position)
89
90 parent node = TreeNode(None, 0)
91 parent node.left child = left child
92 parent node.right child = right child
93
94 return parent node, position
95
96 def generate huffman codes(node, current path, code map):
97
98 if node is None:
99 return
100
101 if node.character is not None:
102 code map[node.character] = current path
103 return
104
105 generate huffman codes(node.left child, current path + '0', code map)
106 generate huffman codes(node.right child, current path + '1', code map)
107
108 def obtain huffman mappings(root):
109 huffman map = {}
110 generate huffman codes(root, "", huffman map)
111 return huffman map
112 import struct
113 from bitarray import bitarray
114
115 def compress text(input text, output file="compressed data.huf"):
116 character list = list(input text)
117
118 # Extract unique characters from text
119 unique chars = list(set(character list))
120
121 # Compute and sort character probabilities
122 frequency distribution = compute symbol frequency(unique chars, ...
character list)
123 sorted frequencies = dict(sorted(frequency distribution.items(), ...
key=lambda item: item[1], reverse=True))
124
125 # Construct Huffman tree
126 huffman tree = construct huffman tree(sorted frequencies)
127
128 # Encode the Huffman tree structure
129 tree encoding = serialize huffman tree(huffman tree)
130
131 # Calculate tree encoding length
132 tree length = len(tree encoding)
133
134 # Generate Huffman codes for characters
135 huffman mappings = obtain huffman mappings(huffman tree)
136
137 # Convert input text to encoded bitstream
138 encoded bitstream = ''.join(huffman mappings[char] for char in ...
character list)
139
140 with open(output file, "wb") as f:
4
141 # Store the length of the Huffman tree encoding
142 f.write(struct.pack("H", tree length))
143
144 # Save the Huffman tree encoding
145 tree encoding.tofile(f)
146
147 # Convert bitstream string into bitarray
148 bit representation = bitarray(encoded bitstream)
149
150 # Compute padding to align to 8−bit boundary
151 padding required = (8 − len(bit representation) % 8) % 8
152 bit representation.extend('0' * padding required) # Add padding bits
153
154 # Store padding information
155 f.write(struct.pack("B", padding required))
156
157 # Write the bitstream
158 bit representation.tofile(f)
159
160 return
161
162 # Usage
163 compress text(file content)
3.1.3.4 Decoder
1
2 def decompress text(input file="compressed data.huf"):
3
4 with open(input file, "rb") as f:
5 # Read the length of the Huffman tree encoding
6 tree size = struct.unpack("H", f.read(2))[0]
7
8 # Read and reconstruct the Huffman tree
9 tree bits = bitarray()
10 tree bits.frombytes(f.read((tree size + 7) // 8)) # Read bytes and ...
trim extra bits
11 tree bits = tree bits[:tree size]
12
13 # Read the number of padding bits added to the encoded bitstream
14 extra bits = struct.unpack("B", f.read(1))[0]
15
16 # Read the encoded text bitstream
17 encoded stream = bitarray()
18 encoded stream.fromfile(f)
19
20 # Remove padding bits from the end
21 if extra bits > 0:
22 encoded stream = encoded stream[:−extra bits]
23
24 # Reconstruct the Huffman tree from the encoded tree structure
25 reconstructed tree, = deserialize huffman tree(tree bits, 0)
26
27 # Generate Huffman codes
28 huffman mappings = obtain huffman mappings(reconstructed tree)
29
30 # Create a reverse lookup dictionary for decoding
5
31 reverse lookup = {code: char for char, code in huffman mappings.items()}
32
33 # Decode the bitstream into text
34 decoded output = []
35 current sequence = ''
36
37 for bit in encoded stream:
38 current sequence += str(int(bit))
39
40 # If current sequence matches a symbol, append it and reset
41 if current sequence in reverse lookup:
42 decoded output.append(reverse lookup[current sequence])
43 current sequence = ''
44
45 # Convert list to string
46 return ''.join(decoded output)
47
48 # Example usage
49 decompressed text = decompress text('compressed data.huf')
50
51 def calculate mse(original text, reconstructed text):
52 error values = [(ord(char1) − ord(char2)) ** 2 for char1, char2 in ...
zip(original text, reconstructed text)]
53 return sum(error values) / len(error values)
54
55 def compute compression ratio(original file, compressed file):
56
57 original bytes = os.path.getsize(original file)
58 compressed bytes = os.path.getsize(compressed file)
59
60 return original bytes / compressed bytes if compressed bytes != 0 else ...
float('inf')
61
62 # Experiment settings
63 string lengths = np.arange(10, 1000, 100)
64 mse values = []
65 compression ratios = {length: [] for length in string lengths}
66 compression times = {length: [] for length in string lengths}
67 decompression times = {length: [] for length in string lengths}
68
69 for length in string lengths:
70 for in range(10):
71 # Generate random text
72 generated text = create random text(length)
73
74 # Save to file
75 with open("input text.txt", "w") as f:
76 f.write(generated text)
77
78 # Compression timing
79 start time = time.time()
80 compress text(generated text, "compressed data.huf")
81 compression times[length].append(time.time() − start time)
82
83 # Decompression timing
84 start time = time.time()
85 reconstructed text = decompress text("compressed data.huf")
86 decompression times[length].append(time.time() − start time)
87
88 # Compute MSE
6
89 mse values.append(calculate mse(generated text, reconstructed text))
90
91 # Compute Compression Ratio
92 compression ratios[length].append(compute compression ratio("input text.txt", ...
"compressed data.huf"))
93
94 # Create box plot for Compression Ratio
95 plt.figure(figsize=(8, 4))
96 plt.boxplot(compression ratios.values(), notch=True, labels=string lengths)
97 plt.xlabel('Text Length (N)')
98 plt.ylabel('Compression Ratio')
99 plt.title('Compression Ratio vs Text Length')
100 plt.show()
3.1.3.5 Find MSE
1 print(mse values)
MSE Values are 0 since huffman coding is lossless
3.1.3.6 Time to Encode and Decode
1 # Plot Compression and Decompression Time Distributions
2 plt.figure(figsize=(15, 6))
3
4 # Compression Time Plot
5 plt.subplot(1, 2, 1)
6 plt.boxplot(compression times.values(), labels=string lengths)
7 plt.xlabel('Text Length (N)')
8 plt.ylabel('Compression Time (s)')
9 plt.title('Compression Time vs Text Length')
10
11 # Decompression Time Plot
12 plt.subplot(1, 2, 2)
13 plt.boxplot(decompression times.values(), labels=string lengths)
14 plt.xlabel('Text Length (N)')
15 plt.ylabel('Decompression Time (s)')
16 plt.title('Decompression Time vs Text Length')
17
18 plt.tight layout()
19 plt.show()
7
3.2 Dataset 2
3.2.2.1 Store Images
1 from sklearn.datasets import fetch olivetti faces
2 from numpy.random import RandomState
3
4 rng = RandomState(0)
5
6 faces, = fetch olivetti faces(return X y=True,
7 shuffle=True, random state=rng)
8 n samples, n features = faces.shape
9
8
10 # Dataset
11 X = faces
12 def quantize(img):
13
14 # Normalize pixel values to [0, 255]
15 img = 255 * (img − img.min()) / (img.max() − img.min())
16
17 # Convert to 8−bit unsigned integer
18 img = img.astype(np.uint8) # Converts to bmp format
19
20 # Convert to string by mapping each pixel value to ASCII character
21 img string = ''.join([chr(int(x)) for x in img.flatten()])
22
23 return img, img string
24 img, img string = quantize(X[0])
25 # Save the image array as a BMP file
26 plt.imsave('image.bmp', img.reshape(64, 64), cmap='gray')
3.2.2/3/4 Use Encoder, Decoder and find Compression Fac-
tor
1
2 def encode image(image, output file="compressed image.huf"):
3 """
4 Encodes an image using Huffman compression and saves it to a file.
5
6 :param image: The input image (NumPy array).
7 :param output file: The filename where the compressed image will be stored.
8 """
9
10 # Convert image array to a flattened list of pixel values
11 flattened pixels = image.flatten().tolist()
12
13 # Convert pixel values (0−255) to a string representation
14 pixel string = ''.join(chr(pixel) for pixel in flattened pixels)
15
16 # Extract unique symbols
17 unique symbols = list(set(pixel string))
18
19 # Compute symbol probabilities
20 symbol frequencies = compute symbol frequency(unique symbols, pixel string)
21 sorted frequencies = dict(sorted(symbol frequencies.items(), key=lambda ...
x: x[1], reverse=True))
22
23 # Construct Huffman tree
24 huffman tree = construct huffman tree(sorted frequencies)
25
26 # Encode tree structure
27 tree bits = serialize huffman tree(huffman tree)
28 tree length = len(tree bits)
29
30 # Generate Huffman codes
31 huffman mappings = obtain huffman mappings(huffman tree)
32
33 # Convert pixel string to a Huffman−encoded bitstream
34 encoded stream = ''.join(huffman mappings[symbol] for symbol in pixel string)
9
35
36 with open(output file, "wb") as f:
37 # Store tree structure length
38 f.write(struct.pack("H", tree length))
39
40 # Convert tree structure to a bitarray
41 tree bitarray = bitarray(tree bits)
42
43 # Compute padding for tree bitarray
44 tree padding = (8 − len(tree bitarray) % 8) % 8
45 tree bitarray.extend('0' * tree padding)
46
47 # Write tree padding info
48 f.write(struct.pack("B", tree padding))
49
50 # Save tree structure
51 tree bitarray.tofile(f)
52
53 # Convert Huffman bitstream to bitarray
54 encoded bitarray = bitarray(encoded stream)
55
56 # Compute padding for encoded bitstream
57 stream padding = (8 − len(encoded bitarray) % 8) % 8
58 encoded bitarray.extend('0' * stream padding)
59
60 # Write bitstream padding
61 f.write(struct.pack("B", stream padding))
62
63 # Write encoded bitstream
64 encoded bitarray.tofile(f)
65
66 return
67
68 def decode image(input file="compressed image.huf"):
69 """
70 Decodes a Huffman−compressed image from a file.
71
72 :param input file: The filename containing the compressed Huffman−encoded ...
image.
73 :return: A NumPy array representing the decompressed image.
74 """
75
76 with open(input file, "rb") as f:
77 # Read Huffman tree length
78 tree size = struct.unpack("H", f.read(2))[0]
79
80 # Read tree padding
81 tree padding = struct.unpack("B", f.read(1))[0]
82
83 # Read and reconstruct Huffman tree bitarray
84 tree bits = bitarray()
85 tree bits.frombytes(f.read((tree size + tree padding + 7) // 8)) # ...
Read full bytes
86 tree bits = tree bits[:tree size] # Remove padding bits
87
88 # Read bitstream padding
89 stream padding = struct.unpack("B", f.read(1))[0]
90
91 # Read encoded bitstream
92 encoded stream = bitarray()
10
93 encoded stream.fromfile(f)
94 if stream padding > 0:
95 encoded stream = encoded stream[:−stream padding]
96
97 # Reconstruct Huffman tree
98 huffman tree, = deserialize huffman tree(tree bits, 0)
99
100 # Generate Huffman codes
101 huffman mappings = obtain huffman mappings(huffman tree)
102
103 # Create a reverse lookup dictionary for decoding
104 decoding dict = {code: char for char, code in huffman mappings.items()}
105
106 # Decode bitstream into pixel values
107 decoded output = []
108 current sequence = ''
109
110 for bit in encoded stream:
111 current sequence += str(int(bit))
112
113 # If current sequence matches a symbol, append it and reset
114 if current sequence in decoding dict:
115 decoded output.append(decoding dict[current sequence])
116 current sequence = ''
117
118 # Convert decoded characters back to NumPy array (pixel values 0−255)
119 image array = np.array([ord(symbol) for symbol in decoded output], ...
dtype=np.uint8)
120
121 return image array
122 encode image(img, output file='compressed img.huf')
123 recon img = decode image('compressed img.huf')
3.2.5 Measure MSE
11
1 plt.figure(figsize=(8, 6))
2
3 print(f"MSE: {np.mean((img−recon img)**2)}")
4 plt.subplot(1, 2, 1)
5 plt.imshow(img.reshape(64, 64), cmap='gray')
6 plt.title('Original Image')
7
8 plt.subplot(1, 2, 2)
9 plt.imshow(recon img.reshape(64, 64), cmap='gray')
10 plt.title('Reconstructed Image')
MSE is 0
3.2.6 Time to Encode and Decode
1 MSE = []
2 CF = []
3 def compression factor(original image, compressed image):
4 """Calculates the compression factor between two files."""
5 original size = os.path.getsize(original image)
6 compressed size = os.path.getsize(compressed image)
7
8 return original size / compressed size if compressed size != 0 else ...
float('inf')
9 t compress = []
10 t decompress = []
11 for x in X:
12
13 img, img string = quantize(x)
14 plt.imsave('image.bmp', img.reshape(64, 64), cmap='gray')
15
16 t1 = time.time()
17 encode image(img, output file='compressed img.huf')
18 t2 = time.time()
19 t compress.append(t2 − t1)
20
21 t1 = time.time()
22 recon img = decode image('compressed img.huf')
23 t2 = time.time()
24 t decompress.append(t2 − t1)
25
26 MSE.append(np.mean((img − recon img)**2))
27 CF.append(compression factor('image.bmp', 'compressed img.huf'))
28 plt.figure(figsize=(6, 4))
29 plt.boxplot(CF, notch=True, patch artist=True)
30 plt.ylabel('Compression Factor')
31 plt.title('Distribution of Compression Factor')
32 plt.show()
3.2.7 Compression Factor vs Time
1 plt.figure(figsize=(12, 4))
2
12
3 plt.subplot(1, 2, 1)
4 plt.boxplot(t compress, notch=True, patch artist=True)
5 plt.ylabel('Time (seconds)')
6 plt.title('Distribution of Compression Time')
7
8 plt.subplot(1, 2, 2)
9 plt.boxplot(t decompress, notch=True, patch artist=True)
10 plt.ylabel('Time (seconds)')
11 plt.title('Distribution of Decompression Time')
12
13 plt.figure(figsize=(12, 4))
14
15 plt.subplot(1, 2, 1)
16 plt.boxplot(t compress, notch=True, patch artist=True)
17 plt.ylabel('Time (seconds)')
18 plt.title('Distribution of Compression Time')
19
20 plt.subplot(1, 2, 2)
21 plt.boxplot(t decompress, notch=True, patch artist=True)
22 plt.ylabel('Time (seconds)')
23 plt.title('Distribution of Decompression Time')
13
14