MNIST and FMNIST using KNN
old mate Yann LeCun decided to remove the MNIST archives from his site, along with the corresponding file info
data specification:
here are the original archives
and here is the file spec, after digging it out of the Wayback Machine.
TRAINING SET LABEL FILE (train-labels-idx1-ubyte):
[offset] [type] [value] [description]
0000 32 bit integer 0x00000801(2049) magic number (MSB first)
0004 32 bit integer 60000 number of items
0008 unsigned byte ?? label
0009 unsigned byte ?? label
........
xxxx unsigned byte ?? label
The label values are 0 to 9.
TRAINING SET IMAGE FILE (train-images-idx3-ubyte):
[offset] [type] [value] [description]
0000 32 bit integer 0x00000803(2051) magic number
0004 32 bit integer 60000 number of images
0008 32 bit integer 28 number of rows
0012 32 bit integer 28 number of columns
0016 unsigned byte ?? pixel
0017 unsigned byte ?? pixel
........
xxxx unsigned byte ?? pixel
Pixels are organized row-wise. Pixel values are 0 to 255. 0 means background (white), 255 means foreground (black).
TEST SET LABEL FILE (t10k-labels-idx1-ubyte):
[offset] [type] [value] [description]
0000 32 bit integer 0x00000801(2049) magic number (MSB first)
0004 32 bit integer 10000 number of items
0008 unsigned byte ?? label
0009 unsigned byte ?? label
........
xxxx unsigned byte ?? label
The label values are 0 to 9.
TEST SET IMAGE FILE (t10k-images-idx3-ubyte):
[offset] [type] [value] [description]
0000 32 bit integer 0x00000803(2051) magic number
0004 32 bit integer 10000 number of images
0008 32 bit integer 28 number of rows
0012 32 bit integer 28 number of columns
0016 unsigned byte ?? pixel
0017 unsigned byte ?? pixel
........
xxxx unsigned byte ?? pixel
Pixels are organized row-wise. Pixel values are 0 to 255. 0 means background (white), 255 means foreground (black).
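to sanity-check a downloaded file against this spec, here's a minimal sketch (assuming the data/mnist/ directory layout used in the code below) that reads the big-endian header fields and verifies the magic numbers:

def read_idx_header(path):
    # reads only the header fields described in the spec above (MSB first)
    with open(path, 'rb') as f:
        magic = int.from_bytes(f.read(4), 'big')    # 2049 for labels, 2051 for images
        count = int.from_bytes(f.read(4), 'big')    # number of items / images
        if magic == 2051:                           # image files also carry rows and columns
            rows = int.from_bytes(f.read(4), 'big')
            cols = int.from_bytes(f.read(4), 'big')
            return magic, count, rows, cols
        assert magic == 2049, f"unexpected magic number: {magic}"
        return magic, count

print(read_idx_header('data/mnist/train-images-idx3-ubyte'))   # (2051, 60000, 28, 28)
print(read_idx_header('data/mnist/train-labels-idx1-ubyte'))   # (2049, 60000)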
no imports
originally I followed this fella's YouTube video:
creating the following main.py file.
code
DATA_DIR = 'data/'
DATASET = 'mnist'
TEST_DATA_FILENAME = DATA_DIR + DATASET + '/t10k-images-idx3-ubyte'
TEST_LABELS_FILENAME = DATA_DIR + DATASET + '/t10k-labels-idx1-ubyte'
TRAIN_DATA_FILENAME = DATA_DIR + DATASET + '/train-images-idx3-ubyte'
TRAIN_LABELS_FILENAME = DATA_DIR + DATASET + '/train-labels-idx1-ubyte'
def bytes_to_int(byte_data):
return int.from_bytes(byte_data, 'big')
def read_images(filename, n_max_images=None):
images = []
with open(filename, 'rb') as f:
_ = f.read(4)
n_images = bytes_to_int(f.read(4))
if n_max_images:
n_images = n_max_images
n_rows = bytes_to_int(f.read(4))
n_cols = bytes_to_int(f.read(4))
for image_idx in range(n_images):
image = []
for row_idx in range(n_rows):
row = []
for col_idx in range(n_cols):
pixel = f.read(1)
row.append(pixel)
image.append(row)
images.append(image)
return images
def read_labels(filename, n_max_labels=None):
labels = []
with open(filename, 'rb') as f:
_ = f.read(4) # magic number
n_labels = bytes_to_int(f.read(4))
if n_max_labels:
n_labels = n_max_labels
for label_idx in range(n_labels):
label = bytes_to_int(f.read(1))
labels.append(label)
return labels
def flatten_list(subl):
return [pixel for sublist in subl for pixel in sublist]
def extract_features(X):
return [flatten_list(sample) for sample in X]
def dist(x, y):
return sum(
[
(bytes_to_int(x_i) - bytes_to_int(y_i)) ** 2
for x_i, y_i in zip(x, y)]
) ** 0.5
def get_training_distances_for_test_sample(X_train, test_sample):
return [dist(train_sample, test_sample) for train_sample in X_train]
def knn(X_train, y_train, X_test, k=3):
y_pred = []
for test_sample_idx, test_sample in enumerate(X_test):
training_distances = get_training_distances_for_test_sample(
X_train, test_sample
)
sorted_distance_indices = [
pair[0]
for pair in sorted(
enumerate(training_distances),
key=lambda x: x[1]
)
]
candidates = [
y_train[idx]
for idx in sorted_distance_indices[:k]
]
top_candidate = max(candidates, key=candidates.count)
y_pred.append(top_candidate)
return y_pred
def get_garment_from_label(label):
return [
'T-shirt/top',
'Trouser',
'Pullover',
'Dress',
'Coat',
'Sandal',
'Shirt',
'Sneaker',
'Bag',
'Ankle boot',
][label]
def main():
n_train = 10000
n_test = 100
k = 7
print(f"Dataset: {DATASET}")
print(f"n_train: {n_train}")
print(f"n_test: {n_test}")
print(f"k: {k}")
X_train = read_images(TRAIN_DATA_FILENAME, n_train)
y_train = read_labels(TRAIN_LABELS_FILENAME, n_train)
X_test = read_images(TEST_DATA_FILENAME, n_test)
y_test = read_labels(TEST_LABELS_FILENAME, n_test)
X_train = extract_features(X_train)
X_test = extract_features(X_test)
y_pred = knn(X_train, y_train, X_test, k)
accuracy = sum([
int(y_pred_i == y_test_i)
for y_pred_i, y_test_i in zip(y_pred, y_test)
]) / len(y_test)
if DATASET == 'fmnist':
garments_pred = [
get_garment_from_label(label)
for label in y_pred
]
print(f"Predicted garments: {garments_pred}")
else:
print(f"Predicted labels: {y_pred}")
print(f"Accuracy: {accuracy * 100}%")
if __name__ == '__main__':
main()
results
and this worked okay on a few samples:
dataset: mnist
n_train: 10000
n_test: 100
k: 7
Predicted labels: ...
Accuracy: 95.0%
but given that I wanted to run the algorithm on the entire dataset (and, with the help of UNSW's HPC Katana, I could), I quickly learned that the above code ran as a single process on a single CPU core.
using tmux on the HPC, over ~100 hours of compute I was able to produce:
dataset: mnist
n_train: 60000
n_test: 10000
k: 5
Predicted labels: ...
Accuracy: 96.93%
however, re-running this for fmnist was out of the question.
as such I turned to other methods:
multiprocessing
I was advised by the sysadmin of Katana to use either joblib or multiprocessing.
my solution with this approach was simply to add the relevant imports and refactor the main knn loop as:
import multiprocessing
from multiprocessing import Pool

def knn(X_train, y_train, X_test, k=3):
    with Pool(processes=multiprocessing.cpu_count()) as pool:
        work_items = [(test_sample_idx, test_sample, X_train, y_train, k) for test_sample_idx, test_sample in enumerate(X_test)]
        y_pred = pool.map(classify_one, work_items)
    return y_pred
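classify_one isn't shown in the snippet above; a minimal sketch of what it could look like, unpacking one work item and reusing the dist helper from main.py:

def classify_one(work_item):
    # each work item is (test_sample_idx, test_sample, X_train, y_train, k)
    test_sample_idx, test_sample, X_train, y_train, k = work_item
    training_distances = [dist(train_sample, test_sample) for train_sample in X_train]
    sorted_idxs = sorted(range(len(training_distances)), key=lambda i: training_distances[i])[:k]
    candidates = [y_train[i] for i in sorted_idxs]
    return max(candidates, key=candidates.count)   # majority vote, as before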
this approach ended up using the 6, 8, 36, etc. cores that I had available.
still, this gave only a 10x-20x speed-up, leaving runtimes of around 10 hours.
multithreading
I then thought of giving each of these 8 or so CPU cores 20 threads to manage.
Dataset: fmnist
n_train: 60000
n_test: 10000
k: 7
Accuracy: 85.41%
however, it is worth noting that each of these threads shares the memory address space of its parent process.
as such, due to the way Python handles the GIL (global interpreter lock), a computational speed-up from threading alone was not guaranteed.
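a quick sanity check (not from the original runs, just an illustration): CPU-bound pure-Python work doesn't get faster with threads, because only one thread can hold the GIL at a time.

import threading
import time

def busy(n):
    # CPU-bound pure-Python loop: it needs the GIL for every bytecode it executes
    s = 0
    for i in range(n):
        s += i * i
    return s

N = 10_000_000

t0 = time.perf_counter()
busy(N); busy(N)
print(f"sequential: {time.perf_counter() - t0:.2f}s")

t0 = time.perf_counter()
threads = [threading.Thread(target=busy, args=(N,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# roughly the same wall time as the sequential run, not ~2x faster
print(f"2 threads: {time.perf_counter() - t0:.2f}s")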
I let ChatGPT draft the code:
code
import os
import math
import multiprocessing
from multiprocessing import Process, Queue
import concurrent.futures
import threading
##################################################
# Same data loading functions as your snippet
##################################################
DATA_DIR = 'data/'
DATASET = 'fmnist' # or 'mnist' if you prefer
TEST_DATA_FILENAME = DATA_DIR + DATASET + '/t10k-images-idx3-ubyte'
TEST_LABELS_FILENAME = DATA_DIR + DATASET + '/t10k-labels-idx1-ubyte'
TRAIN_DATA_FILENAME = DATA_DIR + DATASET + '/train-images-idx3-ubyte'
TRAIN_LABELS_FILENAME = DATA_DIR + DATASET + '/train-labels-idx1-ubyte'
def bytes_to_int(byte_data):
return int.from_bytes(byte_data, 'big')
def read_images(filename, n_max_images=None):
images = []
with open(filename, 'rb') as f:
_ = f.read(4)
n_images = bytes_to_int(f.read(4))
if n_max_images:
n_images = n_max_images
n_rows = bytes_to_int(f.read(4))
n_cols = bytes_to_int(f.read(4))
for image_idx in range(n_images):
image = []
for row_idx in range(n_rows):
row = []
for col_idx in range(n_cols):
pixel = f.read(1)
row.append(pixel)
image.append(row)
images.append(image)
return images
def read_labels(filename, n_max_labels=None):
labels = []
with open(filename, 'rb') as f:
_ = f.read(4) # magic number
n_labels = bytes_to_int(f.read(4))
if n_max_labels:
n_labels = n_max_labels
for label_idx in range(n_labels):
label = bytes_to_int(f.read(1))
labels.append(label)
return labels
def flatten_list(subl):
return [pixel for sublist in subl for pixel in sublist]
def extract_features(X):
""" Convert 2D list-of-bytes into a 1D list-of-bytes for each image. """
return [flatten_list(sample) for sample in X]
def bytes_to_int_fast(b):
"""Slightly faster local version to avoid re-calling int.from_bytes in a loop."""
return b[0] # since each pixel is a single byte
##################################################
# Distance + kNN classification
##################################################
def dist(x, y):
"""
Euclidean distance. x, y are lists of single-byte values.
Each element is a 'bytes' object of length 1, e.g. b'\\x7f'.
"""
# For speed, we can interpret b[0] directly:
# (Note that for n_train=60k, n_test=10k, this is STILL huge in Python!)
s = 0
for x_i, y_i in zip(x, y):
diff = (x_i[0] - y_i[0])
s += diff * diff
return math.sqrt(s)
def classify_one_sample(test_sample_idx, test_sample, X_train, y_train, k):
"""
Compute kNN for a single test sample.
We'll print the test_sample_idx to show progress.
"""
print(f"PID {os.getpid()} - Thread {threading.current_thread().name} "
f"is processing test sample #{test_sample_idx}")
# Compute distance to all training samples
training_distances = [dist(train_sample, test_sample) for train_sample in X_train]
# Find k nearest
sorted_idxs = sorted(range(len(training_distances)), key=lambda i: training_distances[i])[:k]
neighbors = [y_train[i] for i in sorted_idxs]
# majority vote
return max(neighbors, key=neighbors.count)
##################################################
# Process-level worker
# Receives a CHUNK of X_test (and corresponding chunk indices)
# Then spawns threads for each test sample in that chunk
##################################################
def process_chunk(proc_id, start_idx, X_test_chunk, X_train, y_train, k, result_queue):
"""
This function runs in a separate process.
It uses a thread pool to handle each test sample in X_test_chunk.
"""
pid = os.getpid()
print(f"Process {proc_id} started (PID={pid}), handling {len(X_test_chunk)} test samples.")
# We'll store predictions in local list
local_preds = [None] * len(X_test_chunk)
# Let's define how many threads within this process
num_threads = 4
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = {}
for i, test_sample in enumerate(X_test_chunk):
global_test_index = start_idx + i # The absolute index in the full test set
future = executor.submit(
classify_one_sample,
global_test_index,
test_sample,
X_train,
y_train,
k
)
futures[future] = i # so we know which local index it corresponds to
# Collect results
for future in concurrent.futures.as_completed(futures):
local_i = futures[future]
pred_label = future.result()
local_preds[local_i] = pred_label
# Put back to main via a queue: (proc_id, local_preds)
result_queue.put((proc_id, local_preds))
print(f"Process {proc_id} finished (PID={pid}).")
##################################################
# Top-level function that spawns 24 processes
##################################################
def knn_parallel(X_train, y_train, X_test, k=7, num_processes=24):
"""
We split the test set into <num_processes> chunks.
Each chunk is handled by a separate process, which in turn
spawns threads to classify its chunk of test samples.
"""
total_test = len(X_test)
chunk_size = math.ceil(total_test / num_processes)
result_queue = multiprocessing.Queue()
processes = []
for proc_id in range(num_processes):
start_idx = proc_id * chunk_size
if start_idx >= total_test:
break
end_idx = min(start_idx + chunk_size, total_test)
X_test_chunk = X_test[start_idx:end_idx]
p = Process(
target=process_chunk,
args=(proc_id, start_idx, X_test_chunk, X_train, y_train, k, result_queue)
)
p.start()
processes.append(p)
# Collect partial results from the queue
results_by_proc = [None]*num_processes
for _ in processes:
proc_id, preds = result_queue.get()
results_by_proc[proc_id] = preds
# Wait for all processes
for p in processes:
p.join()
# Flatten predictions in the correct order
all_predictions = []
for chunk_preds in results_by_proc:
if chunk_preds is not None:
all_predictions.extend(chunk_preds)
return all_predictions
##################################################
# Put it all together in main()
##################################################
def get_garment_from_label(label):
return [
'T-shirt/top',
'Trouser',
'Pullover',
'Dress',
'Coat',
'Sandal',
'Shirt',
'Sneaker',
'Bag',
'Ankle boot',
][label]
def main():
# FULL dataset sizes
n_train = 60000
n_test = 10000
k = 7
print(f"Dataset: {DATASET}")
print(f"n_train: {n_train}")
print(f"n_test: {n_test}")
print(f"k: {k}")
# 1) Read data
X_train = read_images(TRAIN_DATA_FILENAME, n_train)
y_train = read_labels(TRAIN_LABELS_FILENAME, n_train)
X_test = read_images(TEST_DATA_FILENAME, n_test)
y_test = read_labels(TEST_LABELS_FILENAME, n_test)
# 2) Flatten images into 1D lists-of-bytes
X_train = extract_features(X_train)
X_test = extract_features(X_test)
# 3) Run parallel kNN
y_pred = knn_parallel(X_train, y_train, X_test, k=k, num_processes=24)
# 4) Compute accuracy
correct = sum(int(p == t) for p, t in zip(y_pred, y_test))
accuracy = correct / len(y_test)
# 5) Print results
if DATASET == 'fmnist':
garments_pred = [get_garment_from_label(label) for label in y_pred]
print(f"Predicted garments (first 50 shown): {garments_pred[:50]}")
else:
print(f"Predicted labels (first 50 shown): {y_pred[:50]}")
print(f"Accuracy: {accuracy * 100:.2f}%")
if __name__ == '__main__':
main()
vectorising
finally, I realised that we could sidestep all of this by leveraging the numpy
library and its underlying parallelism of array operations.
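the core idea in miniature (toy shapes, not the real data; the full code is further below): broadcasting lets numpy compute every pairwise squared distance in one vectorised shot.

import numpy as np

A = np.arange(6, dtype=np.float32).reshape(2, 3)    # pretend "test" set: 2 vectors of length 3
B = np.arange(12, dtype=np.float32).reshape(4, 3)   # pretend "train" set: 4 vectors of length 3

# (2, 1, 3) minus (1, 4, 3) broadcasts to (2, 4, 3); summing over the last axis
# yields the (2, 4) matrix of all pairwise squared distances, with the loops pushed into C.
diff = A[:, np.newaxis, :] - B[np.newaxis, :, :]
dist_sq = (diff ** 2).sum(axis=2)
print(dist_sq.shape)   # (2, 4)

with the full chunked version further below, the run on mnist looked like this: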
Dataset: mnist
n_train: 60000
n_test: 10000
k: 7
chunk_size: 1000
Shapes: (60000, 784) (60000,) (10000, 784) (10000,)
Processed 1000 / 10000 test samples
Processed 6000 / 10000 test samples
Accuracy: 96.94%
real 0m12.106s
user 0m20.951s
sys 0m2.137s
note that we have now gone from a 100hr inference time to a 12 second inference time!
analysis
furthermore, notice that the above run processes the test set in partitions (chunks of 1,000). the reason is the following calculation:
calculation
for each of the \(10,000\) test samples we compute a distance to each of the \(60,000\) training samples, a total of 600 million comparisons. each of these image comparisons compares one \(784 \times 1\) dimensional vector with another.
previously, when we were manually doing these loops, that meant \(\approx 4.704 \times 10^{11}\) (about 470 billion) element-wise calculations. furthermore, if we encode all of these differences as float32's of 4 bytes each, we suddenly require about 1.8 terabytes of RAM to hold them all at once.
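written out:
\[
10{,}000 \times 60{,}000 = 6 \times 10^{8} \ \text{comparisons}, \qquad
6 \times 10^{8} \times 784 = 4.704 \times 10^{11} \ \text{element-wise differences},
\]
\[
4.704 \times 10^{11} \times 4 \ \text{bytes} \approx 1.88 \times 10^{12} \ \text{bytes} \approx 1.8 \ \text{TB}.
\]
chunking the test set means only a proportional slice of this difference tensor ever has to exist in memory at once.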
code
using a chunk size of 1,000, we have this (ChatGPT o1) code:
import numpy as np
import math
DATA_DIR = 'data/'
DATASET = 'mnist' # or 'fmnist'
TEST_DATA_FILENAME = DATA_DIR + DATASET + '/t10k-images-idx3-ubyte'
TEST_LABELS_FILENAME = DATA_DIR + DATASET + '/t10k-labels-idx1-ubyte'
TRAIN_DATA_FILENAME = DATA_DIR + DATASET + '/train-images-idx3-ubyte'
TRAIN_LABELS_FILENAME = DATA_DIR + DATASET + '/train-labels-idx1-ubyte'
def bytes_to_int(byte_data):
return int.from_bytes(byte_data, 'big')
def read_images(filename, n_max_images=None):
"""Reads IDX image file and returns a list of images (2D lists of 1-byte pixels)."""
images = []
with open(filename, 'rb') as f:
_ = f.read(4)
n_images = bytes_to_int(f.read(4))
if n_max_images:
n_images = n_max_images
n_rows = bytes_to_int(f.read(4))
n_cols = bytes_to_int(f.read(4))
for _ in range(n_images):
image = []
for _ in range(n_rows):
row = []
for _ in range(n_cols):
pixel = f.read(1)
row.append(pixel)
image.append(row)
images.append(image)
return images
def read_labels(filename, n_max_labels=None):
"""Reads IDX label file and returns a list of integer labels."""
labels = []
with open(filename, 'rb') as f:
_ = f.read(4) # magic number
n_labels = bytes_to_int(f.read(4))
if n_max_labels:
n_labels = n_max_labels
for _ in range(n_labels):
label = bytes_to_int(f.read(1))
labels.append(label)
return labels
def flatten_list(image_2d):
"""Convert a 2D list of single-byte objects into a 1D list."""
return [pix[0] for row in image_2d for pix in row]
def get_garment_from_label(label):
return [
'T-shirt/top',
'Trouser',
'Pullover',
'Dress',
'Coat',
'Sandal',
'Shirt',
'Sneaker',
'Bag',
'Ankle boot',
][label]
def vectorized_knn(X_train, y_train, X_test, k=7, chunk_size=1000):
    """
    Vectorized kNN using NumPy, processing the test set in chunks.
    - X_train: shape (n_train, 784)
    - X_test: shape (n_test, 784)
    - Returns y_pred: shape (n_test,)
    """
    # Work in float32 so the squared differences cannot overflow uint8.
    # The final sqrt is skipped: it is monotonic, so the nearest neighbours are unchanged.
    X_train_f = X_train.astype(np.float32)
    X_test_f = X_test.astype(np.float32)
    n_test = X_test.shape[0]
    y_pred = np.empty(n_test, dtype=y_train.dtype)
    # Process the test set chunk_size samples at a time so only a
    # (chunk, n_train, 784) difference tensor is materialised -- the full
    # (n_test, n_train, 784) tensor would need ~1.8 TB (see the analysis above).
    # Reduce chunk_size if the per-chunk intermediate is still too large for your RAM.
    for start in range(0, n_test, chunk_size):
        end = min(start + chunk_size, n_test)
        # dist_sq[i, j] = sum((X_test[start + i] - X_train[j])^2), via broadcasting
        diff = X_test_f[start:end, np.newaxis, :] - X_train_f[np.newaxis, :, :]
        dist_sq = np.sum(diff ** 2, axis=2)  # shape (end - start, n_train)
        for i in range(end - start):
            # Indices of the k smallest distances (partial sort via argpartition)
            neighbors_idx = np.argpartition(dist_sq[i], k)[:k]
            neighbor_labels = y_train[neighbors_idx]
            # Majority vote among the k nearest neighbours
            vals, counts = np.unique(neighbor_labels, return_counts=True)
            y_pred[start + i] = vals[np.argmax(counts)]
        print(f"Processed {end} / {n_test} test samples")
    return y_pred
def main():
n_train = 60000
n_test = 10000
k = 7
print(f"Dataset: {DATASET}")
print(f"n_train: {n_train}")
print(f"n_test: {n_test}")
print(f"k: {k}")
# 1) Read the data from IDX files
# (Make sure you have data/mnist/ or data/fmnist/ with the IDX files)
raw_train = read_images(TRAIN_DATA_FILENAME, n_train)
y_train = np.array(read_labels(TRAIN_LABELS_FILENAME, n_train), dtype=np.int32)
raw_test = read_images(TEST_DATA_FILENAME, n_test)
y_test = np.array(read_labels(TEST_LABELS_FILENAME, n_test), dtype=np.int32)
# 2) Flatten each image from shape (28,28) to (784,)
# Then convert to a NumPy array
# shape -> (n_train, 784)
X_train = np.array([flatten_list(img) for img in raw_train], dtype=np.dtype('B'))
X_test = np.array([flatten_list(img) for img in raw_test], dtype=np.dtype('B'))
print("Finished loading data. Shapes:")
print("X_train:", X_train.shape, "y_train:", y_train.shape)
print("X_test:", X_test.shape, "y_test:", y_test.shape)
# 3) Run vectorized KNN (without parallelization)
y_pred = vectorized_knn(X_train, y_train, X_test, k)
# 4) Compute accuracy
accuracy = np.mean(y_pred == y_test)
# 5) Print results
print(f"Accuracy: {accuracy * 100:.2f}%")
if DATASET == 'fmnist':
# Optionally see how the first 10 predictions map to garment names
garments_pred = [get_garment_from_label(label) for label in y_pred[:10]]
print(f"First 10 predicted garments: {garments_pred}")
else:
print(f"First 10 predicted digits: {y_pred[:10]}")
if __name__ == '__main__':
main()
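as an aside (not something the original runs used): the per-chunk \((\text{chunk}, n_{\text{train}}, 784)\) intermediate can be avoided entirely via the identity \(\lVert x - y \rVert^2 = \lVert x \rVert^2 - 2\,x \cdot y + \lVert y \rVert^2\), which reduces each chunk's distance computation to a single matrix multiply. a minimal sketch, assuming the float32 arrays from the script above:

import numpy as np

def chunk_dist_sq(X_test_chunk_f, X_train_f, train_sq_norms):
    # squared distances without the 3-D difference tensor: only a
    # (chunk, n_train) matrix is materialised, and the heavy lifting
    # is one BLAS-backed matrix multiply.
    test_sq_norms = np.sum(X_test_chunk_f ** 2, axis=1, keepdims=True)   # (chunk, 1)
    cross = X_test_chunk_f @ X_train_f.T                                 # (chunk, n_train)
    return test_sq_norms - 2.0 * cross + train_sq_norms                  # broadcasts to (chunk, n_train)

# inside the chunk loop, with train_sq_norms = np.sum(X_train_f ** 2, axis=1) precomputed once:
# dist_sq = chunk_dist_sq(X_test_f[start:end], X_train_f, train_sq_norms)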
frameworks
using scikit-learn's KNeighborsClassifier, we have
code
import numpy as np
from sklearn.neighbors import KNeighborsClassifier
DATA_DIR = 'data/'
DATASET = 'mnist' # or 'fmnist'
TEST_DATA_FILENAME = DATA_DIR + DATASET + '/t10k-images-idx3-ubyte'
TEST_LABELS_FILENAME = DATA_DIR + DATASET + '/t10k-labels-idx1-ubyte'
TRAIN_DATA_FILENAME = DATA_DIR + DATASET + '/train-images-idx3-ubyte'
TRAIN_LABELS_FILENAME = DATA_DIR + DATASET + '/train-labels-idx1-ubyte'
def bytes_to_int(b):
return int.from_bytes(b, 'big')
def read_images(filename, n_max_images=None):
with open(filename, 'rb') as f:
_ = f.read(4)
n_images = bytes_to_int(f.read(4))
if n_max_images:
n_images = n_max_images
n_rows = bytes_to_int(f.read(4))
n_cols = bytes_to_int(f.read(4))
images = []
for _ in range(n_images):
img = [f.read(1) for _ in range(n_rows*n_cols)]
images.append(img)
return images
def read_labels(filename, n_max_labels=None):
with open(filename, 'rb') as f:
_ = f.read(4) # magic number
n_labels = bytes_to_int(f.read(4))
if n_max_labels:
n_labels = n_max_labels
labels = []
for _ in range(n_labels):
label = bytes_to_int(f.read(1))
labels.append(label)
return labels
def main():
n_train = 60000
n_test = 10000
k = 7
print(f"Dataset: {DATASET}")
print(f"n_train: {n_train}")
print(f"n_test: {n_test}")
print(f"k: {k}")
# 1) Read data
raw_train = read_images(TRAIN_DATA_FILENAME, n_train)
y_train = np.array(read_labels(TRAIN_LABELS_FILENAME, n_train), dtype=np.int32)
raw_test = read_images(TEST_DATA_FILENAME, n_test)
y_test = np.array(read_labels(TEST_LABELS_FILENAME, n_test), dtype=np.int32)
# 2) Convert each image to shape (784,) as uint8
X_train = np.zeros((n_train, 784), dtype=np.uint8)
for i, img in enumerate(raw_train):
X_train[i] = [p[0] for p in img]
del raw_train
X_test = np.zeros((n_test, 784), dtype=np.uint8)
for i, img in enumerate(raw_test):
X_test[i] = [p[0] for p in img]
del raw_test
print("Shapes:")
print("X_train:", X_train.shape, "y_train:", y_train.shape)
print("X_test:", X_test.shape, "y_test:", y_test.shape)
# 3) Use scikit-learn’s KNeighborsClassifier
knn = KNeighborsClassifier(n_neighbors=k, n_jobs=-1, algorithm="auto")
knn.fit(X_train, y_train)
# 4) Predict on test set
y_pred = knn.predict(X_test)
# 5) Evaluate accuracy
accuracy = (y_pred == y_test).mean() * 100
print(f"Accuracy: {accuracy:.2f}%")
if __name__ == "__main__":
main()
results
it's reassuring to see that scikit-learn reproduces our from-scratch results:
Dataset: mnist
n_train: 60000
n_test: 10000
k: 7
Shapes:
X_train: (60000, 784) y_train: (60000,)
X_test: (10000, 784) y_test: (10000,)
Accuracy: 96.94%
real 0m27.286s
user 1m10.678s
sys 0m7.199s
then for fmnist:
Dataset: fmnist
n_train: 60000
n_test: 10000
k: 7
Shapes:
X_train: (60000, 784) y_train: (60000,)
X_test: (10000, 784) y_test: (10000,)
Accuracy: 85.40%
real 0m26.802s
user 1m8.035s
sys 0m6.987s
as expected :)
which, interestingly, still loses to our numpy chunking approach on runtime (roughly 27 s vs 12 s)!