
Signal Processing and Machine Learning Laboratory (EE69210)
Experiment 2: Lossless Compression

Samarth Adatia
Roll Number: 21EE38012
Indian Institute of Technology, Kharagpur

February 11, 2025

1 Huffman Coding

3.1 Dataset 1

3.1.3.1 Generation of Dataset


import random
import string

def create_random_text(length):
    pool = string.ascii_lowercase + string.digits
    return "".join(random.choices(pool, k=length))

def save_text_to_file(filename, text):
    with open(filename, "w") as f:
        f.write(text)

if __name__ == "__main__":
    N = [100]

    for length in N:
        generated_text = create_random_text(length)
        save_text_to_file("string.txt", generated_text)

3.1.3.2 Encoder
def compute_symbol_frequency(text, symbols):
    """Returns the relative frequency of each symbol in the given text."""
    frequency_map = {}
    for symbol in symbols:
        frequency_map[symbol] = text.count(symbol) / len(text) if len(text) > 0 else 0
    return frequency_map

def read_file_content(filename):
    try:
        with open(filename, "r") as f:
            return f.read()
    except FileNotFoundError:
        return ""

if __name__ == "__main__":
    text_length = 500
    iterations = 2

    file_content = read_file_content("string.txt")

import heapq

class TreeNode:
    """Represents a node in the Huffman Tree."""

    def __init__(self, character, frequency):
        self.character = character
        self.frequency = frequency
        self.left_child = None
        self.right_child = None

    def __lt__(self, other):
        """Defines how nodes are compared based on frequency."""
        return self.frequency < other.frequency

def construct_huffman_tree(frequency_map):
    """Builds the Huffman Tree from the given frequency dictionary."""

    # Create a heap with TreeNode objects
    priority_queue = [TreeNode(char, freq) for char, freq in frequency_map.items()]

    # Convert list into a heap
    heapq.heapify(priority_queue)

    while len(priority_queue) > 1:
        # Extract two nodes with the smallest frequency
        first = heapq.heappop(priority_queue)
        second = heapq.heappop(priority_queue)

        # Create an internal node combining their frequencies
        internal_node = TreeNode(None, first.frequency + second.frequency)
        internal_node.left_child = first
        internal_node.right_child = second

        # Insert the new node back into the priority queue
        heapq.heappush(priority_queue, internal_node)

    # The last node is the root of the Huffman tree
    return priority_queue[0] if priority_queue else None

# In a notebook cell: !pip install bitarray
from bitarray import bitarray

def serialize_huffman_tree(node):
    """Encodes the tree structure as a bit sequence using a pre-order traversal."""
    if not node:
        return bitarray()

    encoded_bits = bitarray()

    if node.character is not None:
        encoded_bits.append(1)  # Mark as a leaf node
        encoded_bits.frombytes(node.character.encode('latin-1'))
    else:
        encoded_bits.append(0)  # Internal node
        encoded_bits.extend(serialize_huffman_tree(node.left_child))
        encoded_bits.extend(serialize_huffman_tree(node.right_child))

    return encoded_bits

def deserialize_huffman_tree(bit_sequence, position):
    """Rebuilds the tree from the bit sequence, returning (node, next position)."""
    if bit_sequence[position]:
        position += 1
        char = bit_sequence[position:position + 8].tobytes().decode('latin-1')
        position += 8
        return TreeNode(char, 0), position

    position += 1
    left_child, position = deserialize_huffman_tree(bit_sequence, position)
    right_child, position = deserialize_huffman_tree(bit_sequence, position)

    parent_node = TreeNode(None, 0)
    parent_node.left_child = left_child
    parent_node.right_child = right_child

    return parent_node, position

def generate_huffman_codes(node, current_path, code_map):
    """Assigns '0' to left branches and '1' to right branches while traversing the tree."""
    if node is None:
        return

    if node.character is not None:
        code_map[node.character] = current_path
        return

    generate_huffman_codes(node.left_child, current_path + '0', code_map)
    generate_huffman_codes(node.right_child, current_path + '1', code_map)

def obtain_huffman_mappings(root):
    huffman_map = {}
    generate_huffman_codes(root, "", huffman_map)
    return huffman_map

import struct
from bitarray import bitarray

def compress_text(input_text, output_file="compressed_data.huf"):
    character_list = list(input_text)

    # Extract unique characters from text
    unique_chars = list(set(character_list))

    # Compute and sort character probabilities
    # (frequency of each unique symbol within the input text)
    frequency_distribution = compute_symbol_frequency(input_text, unique_chars)
    sorted_frequencies = dict(sorted(frequency_distribution.items(),
                                     key=lambda item: item[1], reverse=True))

    # Construct Huffman tree
    huffman_tree = construct_huffman_tree(sorted_frequencies)

    # Encode the Huffman tree structure
    tree_encoding = serialize_huffman_tree(huffman_tree)

    # Calculate tree encoding length
    tree_length = len(tree_encoding)

    # Generate Huffman codes for characters
    huffman_mappings = obtain_huffman_mappings(huffman_tree)

    # Convert input text to encoded bitstream
    encoded_bitstream = ''.join(huffman_mappings[char] for char in character_list)

    with open(output_file, "wb") as f:
        # Store the length of the Huffman tree encoding
        f.write(struct.pack("H", tree_length))

        # Save the Huffman tree encoding
        tree_encoding.tofile(f)

        # Convert bitstream string into bitarray
        bit_representation = bitarray(encoded_bitstream)

        # Compute padding to align to 8-bit boundary
        padding_required = (8 - len(bit_representation) % 8) % 8
        bit_representation.extend('0' * padding_required)  # Add padding bits

        # Store padding information
        f.write(struct.pack("B", padding_required))

        # Write the bitstream
        bit_representation.tofile(f)

    return

# Usage
compress_text(file_content)

3.1.3.4 Decoder
def decompress_text(input_file="compressed_data.huf"):

    with open(input_file, "rb") as f:
        # Read the length of the Huffman tree encoding
        tree_size = struct.unpack("H", f.read(2))[0]

        # Read and reconstruct the Huffman tree
        tree_bits = bitarray()
        tree_bits.frombytes(f.read((tree_size + 7) // 8))  # Read bytes and trim extra bits
        tree_bits = tree_bits[:tree_size]

        # Read the number of padding bits added to the encoded bitstream
        extra_bits = struct.unpack("B", f.read(1))[0]

        # Read the encoded text bitstream
        encoded_stream = bitarray()
        encoded_stream.fromfile(f)

        # Remove padding bits from the end
        if extra_bits > 0:
            encoded_stream = encoded_stream[:-extra_bits]

    # Reconstruct the Huffman tree from the encoded tree structure
    reconstructed_tree, _ = deserialize_huffman_tree(tree_bits, 0)

    # Generate Huffman codes
    huffman_mappings = obtain_huffman_mappings(reconstructed_tree)

    # Create a reverse lookup dictionary for decoding
    reverse_lookup = {code: char for char, code in huffman_mappings.items()}

    # Decode the bitstream into text
    decoded_output = []
    current_sequence = ''

    for bit in encoded_stream:
        current_sequence += str(int(bit))

        # If current sequence matches a symbol, append it and reset
        if current_sequence in reverse_lookup:
            decoded_output.append(reverse_lookup[current_sequence])
            current_sequence = ''

    # Convert list to string
    return ''.join(decoded_output)

# Example usage
decompressed_text = decompress_text('compressed_data.huf')

def calculate_mse(original_text, reconstructed_text):
    error_values = [(ord(char1) - ord(char2)) ** 2
                    for char1, char2 in zip(original_text, reconstructed_text)]
    return sum(error_values) / len(error_values)

def compute_compression_ratio(original_file, compressed_file):
    original_bytes = os.path.getsize(original_file)
    compressed_bytes = os.path.getsize(compressed_file)

    return original_bytes / compressed_bytes if compressed_bytes != 0 else float('inf')

# Additional imports for the experiment loop and plots
import os
import time
import numpy as np
import matplotlib.pyplot as plt

# Experiment settings
string_lengths = np.arange(10, 1000, 100)
mse_values = []
compression_ratios = {length: [] for length in string_lengths}
compression_times = {length: [] for length in string_lengths}
decompression_times = {length: [] for length in string_lengths}

for length in string_lengths:
    for _ in range(10):
        # Generate random text
        generated_text = create_random_text(length)

        # Save to file
        with open("input_text.txt", "w") as f:
            f.write(generated_text)

        # Compression timing
        start_time = time.time()
        compress_text(generated_text, "compressed_data.huf")
        compression_times[length].append(time.time() - start_time)

        # Decompression timing
        start_time = time.time()
        reconstructed_text = decompress_text("compressed_data.huf")
        decompression_times[length].append(time.time() - start_time)

        # Compute MSE
        mse_values.append(calculate_mse(generated_text, reconstructed_text))

        # Compute Compression Ratio
        compression_ratios[length].append(
            compute_compression_ratio("input_text.txt", "compressed_data.huf"))

# Create box plot for Compression Ratio
plt.figure(figsize=(8, 4))
plt.boxplot(compression_ratios.values(), notch=True, labels=string_lengths)
plt.xlabel('Text Length (N)')
plt.ylabel('Compression Ratio')
plt.title('Compression Ratio vs Text Length')
plt.show()

3.1.3.5 Find MSE

print(mse_values)

All MSE values are 0, as expected, since Huffman coding is lossless.
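This follows directly from the definition used in calculate_mse: for an original sequence x_1, ..., x_N and its reconstruction \hat{x}_1, ..., \hat{x}_N,

    MSE = \frac{1}{N} \sum_{i=1}^{N} (x_i - \hat{x}_i)^2,

and since Huffman decoding recovers every symbol exactly, \hat{x}_i = x_i for all i and every term of the sum is zero.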

3.1.3.6 Time to Encode and Decode


# Plot Compression and Decompression Time Distributions
plt.figure(figsize=(15, 6))

# Compression Time Plot
plt.subplot(1, 2, 1)
plt.boxplot(compression_times.values(), labels=string_lengths)
plt.xlabel('Text Length (N)')
plt.ylabel('Compression Time (s)')
plt.title('Compression Time vs Text Length')

# Decompression Time Plot
plt.subplot(1, 2, 2)
plt.boxplot(decompression_times.values(), labels=string_lengths)
plt.xlabel('Text Length (N)')
plt.ylabel('Decompression Time (s)')
plt.title('Decompression Time vs Text Length')

plt.tight_layout()
plt.show()

3.2 Dataset 2

3.2.2.1 Store Images


1 from sklearn.datasets import fetch olivetti faces
2 from numpy.random import RandomState
3
4 rng = RandomState(0)
5
6 faces, = fetch olivetti faces(return X y=True,
7 shuffle=True, random state=rng)
8 n samples, n features = faces.shape
9

8
10 # Dataset
11 X = faces
12 def quantize(img):
13
14 # Normalize pixel values to [0, 255]
15 img = 255 * (img − img.min()) / (img.max() − img.min())
16
17 # Convert to 8−bit unsigned integer
18 img = img.astype(np.uint8) # Converts to bmp format
19
20 # Convert to string by mapping each pixel value to ASCII character
21 img string = ''.join([chr(int(x)) for x in img.flatten()])
22
23 return img, img string
24 img, img string = quantize(X[0])
25 # Save the image array as a BMP file
26 plt.imsave('image.bmp', img.reshape(64, 64), cmap='gray')

3.2.2/3/4 Use Encoder, Decoder and Find Compression Factor
def encode_image(image, output_file="compressed_image.huf"):
    """
    Encodes an image using Huffman compression and saves it to a file.

    :param image: The input image (NumPy array).
    :param output_file: The filename where the compressed image will be stored.
    """

    # Convert image array to a flattened list of pixel values
    flattened_pixels = image.flatten().tolist()

    # Convert pixel values (0-255) to a string representation
    pixel_string = ''.join(chr(pixel) for pixel in flattened_pixels)

    # Extract unique symbols
    unique_symbols = list(set(pixel_string))

    # Compute symbol probabilities (frequency of each unique symbol in the pixel string)
    symbol_frequencies = compute_symbol_frequency(pixel_string, unique_symbols)
    sorted_frequencies = dict(sorted(symbol_frequencies.items(),
                                     key=lambda x: x[1], reverse=True))

    # Construct Huffman tree
    huffman_tree = construct_huffman_tree(sorted_frequencies)

    # Encode tree structure
    tree_bits = serialize_huffman_tree(huffman_tree)
    tree_length = len(tree_bits)

    # Generate Huffman codes
    huffman_mappings = obtain_huffman_mappings(huffman_tree)

    # Convert pixel string to a Huffman-encoded bitstream
    encoded_stream = ''.join(huffman_mappings[symbol] for symbol in pixel_string)

    with open(output_file, "wb") as f:
        # Store tree structure length
        f.write(struct.pack("H", tree_length))

        # Convert tree structure to a bitarray
        tree_bitarray = bitarray(tree_bits)

        # Compute padding for tree bitarray
        tree_padding = (8 - len(tree_bitarray) % 8) % 8
        tree_bitarray.extend('0' * tree_padding)

        # Write tree padding info
        f.write(struct.pack("B", tree_padding))

        # Save tree structure
        tree_bitarray.tofile(f)

        # Convert Huffman bitstream to bitarray
        encoded_bitarray = bitarray(encoded_stream)

        # Compute padding for encoded bitstream
        stream_padding = (8 - len(encoded_bitarray) % 8) % 8
        encoded_bitarray.extend('0' * stream_padding)

        # Write bitstream padding
        f.write(struct.pack("B", stream_padding))

        # Write encoded bitstream
        encoded_bitarray.tofile(f)

    return

def decode_image(input_file="compressed_image.huf"):
    """
    Decodes a Huffman-compressed image from a file.

    :param input_file: The filename containing the compressed Huffman-encoded image.
    :return: A NumPy array representing the decompressed image.
    """

    with open(input_file, "rb") as f:
        # Read Huffman tree length
        tree_size = struct.unpack("H", f.read(2))[0]

        # Read tree padding
        tree_padding = struct.unpack("B", f.read(1))[0]

        # Read and reconstruct Huffman tree bitarray
        tree_bits = bitarray()
        tree_bits.frombytes(f.read((tree_size + tree_padding + 7) // 8))  # Read full bytes
        tree_bits = tree_bits[:tree_size]  # Remove padding bits

        # Read bitstream padding
        stream_padding = struct.unpack("B", f.read(1))[0]

        # Read encoded bitstream
        encoded_stream = bitarray()
        encoded_stream.fromfile(f)
        if stream_padding > 0:
            encoded_stream = encoded_stream[:-stream_padding]

    # Reconstruct Huffman tree
    huffman_tree, _ = deserialize_huffman_tree(tree_bits, 0)

    # Generate Huffman codes
    huffman_mappings = obtain_huffman_mappings(huffman_tree)

    # Create a reverse lookup dictionary for decoding
    decoding_dict = {code: char for char, code in huffman_mappings.items()}

    # Decode bitstream into pixel values
    decoded_output = []
    current_sequence = ''

    for bit in encoded_stream:
        current_sequence += str(int(bit))

        # If current sequence matches a symbol, append it and reset
        if current_sequence in decoding_dict:
            decoded_output.append(decoding_dict[current_sequence])
            current_sequence = ''

    # Convert decoded characters back to NumPy array (pixel values 0-255)
    image_array = np.array([ord(symbol) for symbol in decoded_output], dtype=np.uint8)

    return image_array

encode_image(img, output_file='compressed_img.huf')
recon_img = decode_image('compressed_img.huf')

3.2.5 Measure MSE

plt.figure(figsize=(8, 6))

print(f"MSE: {np.mean((img - recon_img)**2)}")

plt.subplot(1, 2, 1)
plt.imshow(img.reshape(64, 64), cmap='gray')
plt.title('Original Image')

plt.subplot(1, 2, 2)
plt.imshow(recon_img.reshape(64, 64), cmap='gray')
plt.title('Reconstructed Image')

The MSE is 0, confirming that the reconstructed image is identical to the quantized original.
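As an extra sanity check (not part of the experiment code above), the round trip can be verified directly. This is a minimal sketch assuming img and recon_img are the quantized and decoded uint8 arrays from the previous cells:

# Hypothetical check: the decoded pixel array should equal the quantized input exactly
assert np.array_equal(img.flatten(), recon_img), "Huffman round trip should be lossless"

# Cast to float before differencing so uint8 wrap-around cannot mask an error
mse_check = np.mean((img.astype(np.float64) - recon_img.astype(np.float64)) ** 2)
print(f"MSE (float64): {mse_check}")  # expected: 0.0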

3.2.6 Time to Encode and Decode


MSE = []
CF = []

def compression_factor(original_image, compressed_image):
    """Calculates the compression factor between two files."""
    original_size = os.path.getsize(original_image)
    compressed_size = os.path.getsize(compressed_image)

    return original_size / compressed_size if compressed_size != 0 else float('inf')

t_compress = []
t_decompress = []

for x in X:
    img, img_string = quantize(x)
    plt.imsave('image.bmp', img.reshape(64, 64), cmap='gray')

    t1 = time.time()
    encode_image(img, output_file='compressed_img.huf')
    t2 = time.time()
    t_compress.append(t2 - t1)

    t1 = time.time()
    recon_img = decode_image('compressed_img.huf')
    t2 = time.time()
    t_decompress.append(t2 - t1)

    MSE.append(np.mean((img - recon_img)**2))
    CF.append(compression_factor('image.bmp', 'compressed_img.huf'))

plt.figure(figsize=(6, 4))
plt.boxplot(CF, notch=True, patch_artist=True)
plt.ylabel('Compression Factor')
plt.title('Distribution of Compression Factor')
plt.show()

3.2.7 Compression Factor vs Time


plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.boxplot(t_compress, notch=True, patch_artist=True)
plt.ylabel('Time (seconds)')
plt.title('Distribution of Compression Time')

plt.subplot(1, 2, 2)
plt.boxplot(t_decompress, notch=True, patch_artist=True)
plt.ylabel('Time (seconds)')
plt.title('Distribution of Decompression Time')

plt.show()
