old mate yann lecun decided to remove the mnist archives from his site, along with the corresponding file info

data specification:

here are the original archives

and here is the file spec after digging it out of the wayback machine.

TRAINING SET LABEL FILE (train-labels-idx1-ubyte):
[offset] [type]          [value]          [description]
0000     32 bit integer  0x00000801(2049) magic number (MSB first)
0004     32 bit integer  60000            number of items
0008     unsigned byte   ??               label
0009     unsigned byte   ??               label
........
xxxx     unsigned byte   ??               label
The label values are 0 to 9.

TRAINING SET IMAGE FILE (train-images-idx3-ubyte):
[offset] [type]          [value]          [description]
0000     32 bit integer  0x00000803(2051) magic number
0004     32 bit integer  60000            number of images
0008     32 bit integer  28               number of rows
0012     32 bit integer  28               number of columns
0016     unsigned byte   ??               pixel
0017     unsigned byte   ??               pixel
........
xxxx     unsigned byte   ??               pixel
Pixels are organized row-wise. Pixel values are 0 to 255. 0 means background (white), 255 means foreground (black).

TEST SET LABEL FILE (t10k-labels-idx1-ubyte):
[offset] [type]          [value]          [description]
0000     32 bit integer  0x00000801(2049) magic number (MSB first)
0004     32 bit integer  10000            number of items
0008     unsigned byte   ??               label
0009     unsigned byte   ??               label
........
xxxx     unsigned byte   ??               label
The label values are 0 to 9.

TEST SET IMAGE FILE (t10k-images-idx3-ubyte):
[offset] [type]          [value]          [description]
0000     32 bit integer  0x00000803(2051) magic number
0004     32 bit integer  10000            number of images
0008     32 bit integer  28               number of rows
0012     32 bit integer  28               number of columns
0016     unsigned byte   ??               pixel
0017     unsigned byte   ??               pixel
........
xxxx     unsigned byte   ??               pixel
Pixels are organized row-wise. Pixel values are 0 to 255. 0 means background (white), 255 means foreground (black).
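
as a quick sanity check that your downloads match this spec, you can read the header fields directly (a minimal sketch; the data/mnist/ path is my assumption):

with open('data/mnist/train-images-idx3-ubyte', 'rb') as f:
    magic = int.from_bytes(f.read(4), 'big')     # expect 2051 (0x00000803)
    n_images = int.from_bytes(f.read(4), 'big')  # expect 60000
    n_rows = int.from_bytes(f.read(4), 'big')    # expect 28
    n_cols = int.from_bytes(f.read(4), 'big')    # expect 28
print(magic, n_images, n_rows, n_cols)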

no imports

originally I followed this fella's youtube video:

creating the following main.py file.

code

DATA_DIR = 'data/'
DATASET = 'mnist'
TEST_DATA_FILENAME = DATA_DIR + DATASET + '/t10k-images-idx3-ubyte'
TEST_LABELS_FILENAME = DATA_DIR + DATASET + '/t10k-labels-idx1-ubyte'
TRAIN_DATA_FILENAME = DATA_DIR + DATASET + '/train-images-idx3-ubyte'
TRAIN_LABELS_FILENAME = DATA_DIR + DATASET + '/train-labels-idx1-ubyte'


def bytes_to_int(byte_data):
    return int.from_bytes(byte_data, 'big')


def read_images(filename, n_max_images=None):
    images = []
    with open(filename, 'rb') as f:
        _ = f.read(4)  # magic number
        n_images = bytes_to_int(f.read(4))
        if n_max_images:
            n_images = n_max_images
        n_rows = bytes_to_int(f.read(4))
        n_cols = bytes_to_int(f.read(4))
        for image_idx in range(n_images):
            image = []
            for row_idx in range(n_rows):
                row = []
                for col_idx in range(n_cols):
                    pixel = f.read(1)
                    row.append(pixel)
                image.append(row)
            images.append(image)
        return images

    
def read_labels(filename, n_max_labels=None):
    labels = []
    with open(filename, 'rb') as f:
        _ = f.read(4)  # magic number
        n_labels = bytes_to_int(f.read(4))
        if n_max_labels:
            n_labels = n_max_labels
        for label_idx in range(n_labels):
            label = bytes_to_int(f.read(1))
            labels.append(label)
    return labels


def flatten_list(subl):
    return [pixel for sublist in subl for pixel in sublist]


def extract_features(X):
    return [flatten_list(sample) for sample in X]


def dist(x, y):
    # Euclidean distance; x and y are lists of 1-byte `bytes` objects
    return sum(
        (bytes_to_int(x_i) - bytes_to_int(y_i)) ** 2
        for x_i, y_i in zip(x, y)
    ) ** 0.5


def get_training_distances_for_test_sample(X_train, test_sample):
    return [dist(train_sample, test_sample) for train_sample in X_train]


def knn(X_train, y_train, X_test, k=3):
    y_pred = []
    for test_sample_idx, test_sample in enumerate(X_test):
        training_distances = get_training_distances_for_test_sample(
            X_train, test_sample
        )
        sorted_distance_indices = [
            pair[0]
            for pair in sorted(
                    enumerate(training_distances),
                    key=lambda x: x[1]
            )
        ]
        candidates = [
            y_train[idx]
            for idx in sorted_distance_indices[:k]
        ]
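        # majority vote: pick the most common label among the k nearest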
        top_candidate = max(candidates, key=candidates.count)
        y_pred.append(top_candidate)
    return y_pred


def get_garment_from_label(label):
    return [
        'T-shirt/top',
        'Trouser',
        'Pullover',
        'Dress',
        'Coat',
        'Sandal',
        'Shirt',
        'Sneaker',
        'Bag',
        'Ankle boot',
    ][label]


def main():
    n_train = 10000
    n_test = 100
    k = 7
    print(f"Dataset: {DATASET}")
    print(f"n_train: {n_train}")
    print(f"n_test: {n_test}")
    print(f"k: {k}")
    X_train = read_images(TRAIN_DATA_FILENAME, n_train)
    y_train = read_labels(TRAIN_LABELS_FILENAME, n_train)
    X_test = read_images(TEST_DATA_FILENAME, n_test)
    y_test = read_labels(TEST_LABELS_FILENAME, n_test)
    X_train = extract_features(X_train)
    X_test = extract_features(X_test)

    y_pred = knn(X_train, y_train, X_test, k=k)

    accuracy = sum([
        int(y_pred_i == y_test_i)
        for y_pred_i, y_test_i in zip(y_pred, y_test)
    ]) / len(y_test)

    if DATASET == 'fmnist':
        garments_pred = [
            get_garment_from_label(label)
            for label in y_pred
        ]
        print(f"Predicted garments: {garments_pred}")
    else:
        print(f"Predicted labels: {y_pred}")

    print(f"Accuracy: {accuracy * 100}%")

    
if __name__ == '__main__':
    main()

results

and this worked okay on a few samples:

Dataset: mnist
n_train: 10000
n_test: 100
k: 7
Predicted labels: ...
Accuracy: 95.0%

but given that I wanted to run the algorithm on the entire dataset (and, with the help of UNSW's HPC Katana, I could), I quickly learned that the above code ran as a single process on a single CPU core.

using tmux on the HPC, after ~100 hrs of compute I was able to produce:

Dataset: mnist
n_train: 60000
n_test: 10000
k: 5
Predicted labels: ...
Accuracy: 96.93%

however, re-running this for fmnist was out of the question.

as such I turned to other methods:

multiprocessing

I was advised by the sysadmin of Katana to use either joblib or multiprocessing.

my solution was to add the relevant imports and refactor the main knn loop around a per-sample classify_one helper (sketched below):

import multiprocessing
from multiprocessing import Pool

def knn(X_train, y_train, X_test, k=3):
    with Pool(processes=multiprocessing.cpu_count()) as pool:
        work_items = [
            (test_sample_idx, test_sample, X_train, y_train, k)
            for test_sample_idx, test_sample in enumerate(X_test)
        ]
        y_pred = pool.map(classify_one, work_items)
    return y_pred
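
for completeness, a minimal classify_one might look like this (a sketch; pool.map hands it one work-item tuple, and it reuses dist and the majority vote from the original loop):

def classify_one(work_item):
    # unpack the tuple built in knn() above
    test_sample_idx, test_sample, X_train, y_train, k = work_item
    distances = [dist(train_sample, test_sample) for train_sample in X_train]
    nearest = sorted(range(len(distances)), key=lambda i: distances[i])[:k]
    candidates = [y_train[i] for i in nearest]
    return max(candidates, key=candidates.count)  # majority vote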

this approach ended up using the 6, 8, 36, etc. cores that I had available.

still only a 10x-20x speed-up, leaving runtimes of around 10 hours.
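
for reference, the joblib route the sysadmin also suggested would look roughly like this (a hedged sketch; I stuck with multiprocessing and never ran it):

from joblib import Parallel, delayed

def knn_joblib(X_train, y_train, X_test, k=3):
    # one delayed call per test sample, spread over all available cores
    return Parallel(n_jobs=-1)(
        delayed(classify_one)((i, test_sample, X_train, y_train, k))
        for i, test_sample in enumerate(X_test)
    )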

multithreading

I then thought of giving each of these 8 or so CPU cores 20 threads to manage, which produced:

Dataset: fmnist
n_train: 60000
n_test: 10000
k: 7
Accuracy: 85.41%

however, it is worth noting that each of these threads shares the memory address space of its parent process.

as such, because python's GIL lets only one thread execute python bytecode at a time, and the distance loop is exactly that kind of CPU-bound pure-python work, a computational speed-up was not guaranteed.

I let chatgpt draft the code:

code

import os
import math
import multiprocessing
from multiprocessing import Process, Queue
import concurrent.futures
import threading

##################################################
# Same data loading functions as your snippet
##################################################

DATA_DIR = 'data/'
DATASET = 'fmnist'  # or 'mnist' if you prefer
TEST_DATA_FILENAME = DATA_DIR + DATASET + '/t10k-images-idx3-ubyte'
TEST_LABELS_FILENAME = DATA_DIR + DATASET + '/t10k-labels-idx1-ubyte'
TRAIN_DATA_FILENAME = DATA_DIR + DATASET + '/train-images-idx3-ubyte'
TRAIN_LABELS_FILENAME = DATA_DIR + DATASET + '/train-labels-idx1-ubyte'


def bytes_to_int(byte_data):
    return int.from_bytes(byte_data, 'big')

def read_images(filename, n_max_images=None):
    images = []
    with open(filename, 'rb') as f:
        _ = f.read(4)  # magic number
        n_images = bytes_to_int(f.read(4))
        if n_max_images:
            n_images = n_max_images
        n_rows = bytes_to_int(f.read(4))
        n_cols = bytes_to_int(f.read(4))
        for image_idx in range(n_images):
            image = []
            for row_idx in range(n_rows):
                row = []
                for col_idx in range(n_cols):
                    pixel = f.read(1)
                    row.append(pixel)
                image.append(row)
            images.append(image)
    return images

def read_labels(filename, n_max_labels=None):
    labels = []
    with open(filename, 'rb') as f:
        _ = f.read(4)  # magic number
        n_labels = bytes_to_int(f.read(4))
        if n_max_labels:
            n_labels = n_max_labels
        for label_idx in range(n_labels):
            label = bytes_to_int(f.read(1))
            labels.append(label)
    return labels

def flatten_list(subl):
    return [pixel for sublist in subl for pixel in sublist]

def extract_features(X):
    """ Convert 2D list-of-bytes into a 1D list-of-bytes for each image. """
    return [flatten_list(sample) for sample in X]

def bytes_to_int_fast(b):
    """Slightly faster local version to avoid re-calling int.from_bytes in a loop."""
    return b[0]  # since each pixel is a single byte

##################################################
# Distance + kNN classification
##################################################

def dist(x, y):
    """
    Euclidean distance. x, y are lists of single-byte values.
    Each element is a 'bytes' object of length 1, e.g. b'\\x7f'.
    """
    # For speed, we can interpret b[0] directly:
    # (Note that for n_train=60k, n_test=10k, this is STILL huge in Python!)
    s = 0
    for x_i, y_i in zip(x, y):
        diff = (x_i[0] - y_i[0])
        s += diff * diff
    return math.sqrt(s)

def classify_one_sample(test_sample_idx, test_sample, X_train, y_train, k):
    """
    Compute kNN for a single test sample.
    We'll print the test_sample_idx to show progress.
    """
    print(f"PID {os.getpid()} - Thread {threading.current_thread().name} "
          f"is processing test sample #{test_sample_idx}")

    # Compute distance to all training samples
    training_distances = [dist(train_sample, test_sample) for train_sample in X_train]
    
    # Find k nearest
    sorted_idxs = sorted(range(len(training_distances)), key=lambda i: training_distances[i])[:k]
    neighbors = [y_train[i] for i in sorted_idxs]
    # majority vote
    return max(neighbors, key=neighbors.count)

##################################################
# Process-level worker
# Receives a CHUNK of X_test (and corresponding chunk indices)
# Then spawns threads for each test sample in that chunk
##################################################

def process_chunk(proc_id, start_idx, X_test_chunk, X_train, y_train, k, result_queue):
    """
    This function runs in a separate process.
    It uses a thread pool to handle each test sample in X_test_chunk.
    """
    pid = os.getpid()
    print(f"Process {proc_id} started (PID={pid}), handling {len(X_test_chunk)} test samples.")

    # We'll store predictions in local list
    local_preds = [None] * len(X_test_chunk)

    # Let's define how many threads within this process
    num_threads = 4
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = {}
        for i, test_sample in enumerate(X_test_chunk):
            global_test_index = start_idx + i  # The absolute index in the full test set
            future = executor.submit(
                classify_one_sample,
                global_test_index,
                test_sample,
                X_train,
                y_train,
                k
            )
            futures[future] = i  # so we know which local index it corresponds to

        # Collect results
        for future in concurrent.futures.as_completed(futures):
            local_i = futures[future]
            pred_label = future.result()
            local_preds[local_i] = pred_label

    # Put back to main via a queue: (proc_id, local_preds)
    result_queue.put((proc_id, local_preds))
    print(f"Process {proc_id} finished (PID={pid}).")

##################################################
# Top-level function that spawns 24 processes
##################################################

def knn_parallel(X_train, y_train, X_test, k=7, num_processes=24):
    """
    We split the test set into <num_processes> chunks.
    Each chunk is handled by a separate process, which in turn
    spawns threads to classify its chunk of test samples.
    """
    total_test = len(X_test)
    chunk_size = math.ceil(total_test / num_processes)

    result_queue = multiprocessing.Queue()
    processes = []

    for proc_id in range(num_processes):
        start_idx = proc_id * chunk_size
        if start_idx >= total_test:
            break
        end_idx = min(start_idx + chunk_size, total_test)
        X_test_chunk = X_test[start_idx:end_idx]
        
        p = Process(
            target=process_chunk,
            args=(proc_id, start_idx, X_test_chunk, X_train, y_train, k, result_queue)
        )
        p.start()
        processes.append(p)

    # Collect partial results from the queue
    results_by_proc = [None]*num_processes
    for _ in processes:
        proc_id, preds = result_queue.get()
        results_by_proc[proc_id] = preds

    # Wait for all processes
    for p in processes:
        p.join()

    # Flatten predictions in the correct order
    all_predictions = []
    for chunk_preds in results_by_proc:
        if chunk_preds is not None:
            all_predictions.extend(chunk_preds)

    return all_predictions

##################################################
# Put it all together in main()
##################################################

def get_garment_from_label(label):
    return [
        'T-shirt/top',
        'Trouser',
        'Pullover',
        'Dress',
        'Coat',
        'Sandal',
        'Shirt',
        'Sneaker',
        'Bag',
        'Ankle boot',
    ][label]

def main():
    # FULL dataset sizes
    n_train = 60000
    n_test = 10000
    k = 7
    print(f"Dataset: {DATASET}")
    print(f"n_train: {n_train}")
    print(f"n_test: {n_test}")
    print(f"k: {k}")

    # 1) Read data
    X_train = read_images(TRAIN_DATA_FILENAME, n_train)
    y_train = read_labels(TRAIN_LABELS_FILENAME, n_train)
    X_test = read_images(TEST_DATA_FILENAME, n_test)
    y_test = read_labels(TEST_LABELS_FILENAME, n_test)

    # 2) Flatten images into 1D lists-of-bytes
    X_train = extract_features(X_train)
    X_test = extract_features(X_test)

    # 3) Run parallel kNN
    y_pred = knn_parallel(X_train, y_train, X_test, k=k, num_processes=24)

    # 4) Compute accuracy
    correct = sum(int(p == t) for p, t in zip(y_pred, y_test))
    accuracy = correct / len(y_test)

    # 5) Print results
    if DATASET == 'fmnist':
        garments_pred = [get_garment_from_label(label) for label in y_pred]
        print(f"Predicted garments (first 50 shown): {garments_pred[:50]}")
    else:
        print(f"Predicted labels (first 50 shown): {y_pred[:50]}")

    print(f"Accuracy: {accuracy * 100:.2f}%")

if __name__ == '__main__':
    main()

vectorising

finally, I realised that we could sidestep all of this by leveraging the numpy library and its fast, C-level array operations.
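
the idea in miniature: the per-pair dist() above loops over 784 pixels in python, whereas numpy pushes that whole loop into compiled code (a tiny illustrative sketch with random stand-in images):

import numpy as np

# two stand-ins for flattened 28x28 images, just for illustration
a = np.random.randint(0, 256, 784).astype(np.float32)
b = np.random.randint(0, 256, 784).astype(np.float32)

d = np.sqrt(((a - b) ** 2).sum())  # the 784-iteration python loop now happens in C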

Dataset: mnist
n_train: 60000
n_test: 10000
k: 7
chunk_size: 1000
Shapes: (60000, 784) (60000,) (10000, 784) (10000,)
Processed 1000 / 10000 test samples
Processed 6000 / 10000 test samples
Accuracy: 96.94%

real    0m12.106s
user    0m20.951s
sys     0m2.137s

note that we have now gone from a 100hr inference time to a 12 second inference time!

analysis

note also that the above run processes the test set in partitions (chunks). the reason is the following back-of-the-envelope calculation:

calculation

for each of the \(10,000\) test samples we compute a distance to each of the \(60,000\) training samples: a total of 600 million comparisons. each comparison is between one \(784 \times 1\) dimensional vector and another.

previously, when we were doing these loops manually, that meant \(\approx 470\) billion element-wise calculations. furthermore, if we tried to hold all of these differences at once as float32's of 4 bytes each, we would suddenly require about 1.88 terabytes of RAM.
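
spelling the numbers out, along with the identity that lets us avoid materialising the difference tensor at all:

\[
10{,}000 \times 60{,}000 \times 784 \approx 4.704 \times 10^{11} \ \text{entries}, \qquad
4.704 \times 10^{11} \times 4\ \text{bytes} \approx 1.88\ \text{TB}.
\]

\[
\lVert a - b \rVert^2 = \lVert a \rVert^2 - 2\, a \cdot b + \lVert b \rVert^2
\]

with this expansion, a chunk of 1,000 test samples only ever needs a \(1{,}000 \times 60{,}000\) matrix of squared distances (\(\approx 240\) MB as float32), and the \(a \cdot b\) term becomes a single matrix multiply.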

code

using a chunk size of 1,000 and the norm expansion above, we have this (chatgpt o1) code:

import numpy as np

DATA_DIR = 'data/'
DATASET = 'mnist'  # or 'fmnist'
TEST_DATA_FILENAME = DATA_DIR + DATASET + '/t10k-images-idx3-ubyte'
TEST_LABELS_FILENAME = DATA_DIR + DATASET + '/t10k-labels-idx1-ubyte'
TRAIN_DATA_FILENAME = DATA_DIR + DATASET + '/train-images-idx3-ubyte'
TRAIN_LABELS_FILENAME = DATA_DIR + DATASET + '/train-labels-idx1-ubyte'


def bytes_to_int(byte_data):
    return int.from_bytes(byte_data, 'big')


def read_images(filename, n_max_images=None):
    """Reads IDX image file and returns a list of images (2D lists of 1-byte pixels)."""
    images = []
    with open(filename, 'rb') as f:
        _ = f.read(4)  # magic number
        n_images = bytes_to_int(f.read(4))
        if n_max_images:
            n_images = n_max_images
        n_rows = bytes_to_int(f.read(4))
        n_cols = bytes_to_int(f.read(4))
        for _ in range(n_images):
            image = []
            for _ in range(n_rows):
                row = []
                for _ in range(n_cols):
                    pixel = f.read(1)
                    row.append(pixel)
                image.append(row)
            images.append(image)
    return images


def read_labels(filename, n_max_labels=None):
    """Reads IDX label file and returns a list of integer labels."""
    labels = []
    with open(filename, 'rb') as f:
        _ = f.read(4)  # magic number
        n_labels = bytes_to_int(f.read(4))
        if n_max_labels:
            n_labels = n_max_labels
        for _ in range(n_labels):
            label = bytes_to_int(f.read(1))
            labels.append(label)
    return labels


def flatten_list(image_2d):
    """Convert a 2D list of single-byte objects into a 1D list."""
    return [pix[0] for row in image_2d for pix in row]


def get_garment_from_label(label):
    return [
        'T-shirt/top',
        'Trouser',
        'Pullover',
        'Dress',
        'Coat',
        'Sandal',
        'Shirt',
        'Sneaker',
        'Bag',
        'Ankle boot',
    ][label]


def vectorized_knn(X_train, y_train, X_test, k=7, chunk_size=1000):
    """
    Vectorized kNN using NumPy, processing the test set in chunks.
    - X_train: shape (n_train, 784)
    - X_test: shape (n_test, 784)
    - Returns y_pred: shape (n_test,)

    Broadcasting the full (n_test, n_train, 784) difference tensor would
    need ~1.88 TB of RAM (see the calculation above), so we expand
    ||a - b||^2 = ||a||^2 - 2 a.b + ||b||^2 and only ever hold one
    (chunk_size, n_train) matrix of squared distances at a time.
    """
    # Upcast once so the arithmetic below can't overflow uint8.
    X_train_f = X_train.astype(np.float32)
    X_test_f = X_test.astype(np.float32)

    # Squared norm of every training sample, computed once: shape (n_train,)
    train_sq = np.sum(X_train_f ** 2, axis=1)

    n_test = X_test_f.shape[0]
    y_pred = np.empty(n_test, dtype=y_train.dtype)

    for start in range(0, n_test, chunk_size):
        end = min(start + chunk_size, n_test)
        chunk = X_test_f[start:end]               # shape (m, 784)
        test_sq = np.sum(chunk ** 2, axis=1)      # shape (m,)

        # Squared distances, shape (m, n_train). We skip the sqrt:
        # it's monotonic, so the nearest neighbours are unchanged.
        dist_sq = test_sq[:, np.newaxis] - 2.0 * (chunk @ X_train_f.T) + train_sq[np.newaxis, :]

        # np.argpartition gets the k smallest per row without a full sort.
        neighbors_idx = np.argpartition(dist_sq, k, axis=1)[:, :k]
        for i in range(end - start):
            neighbor_labels = y_train[neighbors_idx[i]]
            vals, counts = np.unique(neighbor_labels, return_counts=True)
            y_pred[start + i] = vals[np.argmax(counts)]  # majority vote

        print(f"Processed {end} / {n_test} test samples")

    return y_pred


def main():
    n_train = 60000
    n_test = 10000
    k = 7
    chunk_size = 1000

    print(f"Dataset: {DATASET}")
    print(f"n_train: {n_train}")
    print(f"n_test: {n_test}")
    print(f"k: {k}")
    print(f"chunk_size: {chunk_size}")

    # 1) Read the data from IDX files
    #    (Make sure you have data/mnist/ or data/fmnist/ with the IDX files)
    raw_train = read_images(TRAIN_DATA_FILENAME, n_train)
    y_train = np.array(read_labels(TRAIN_LABELS_FILENAME, n_train), dtype=np.int32)
    raw_test = read_images(TEST_DATA_FILENAME, n_test)
    y_test = np.array(read_labels(TEST_LABELS_FILENAME, n_test), dtype=np.int32)
    
    # 2) Flatten each image from shape (28,28) to (784,)
    #    Then convert to a NumPy array
    #    shape -> (n_train, 784)
    X_train = np.array([flatten_list(img) for img in raw_train], dtype=np.uint8)
    X_test = np.array([flatten_list(img) for img in raw_test], dtype=np.uint8)
    
    print("Finished loading data. Shapes:")
    print("X_train:", X_train.shape, "y_train:", y_train.shape)
    print("X_test:", X_test.shape, "y_test:", y_test.shape)

    # 3) Run chunked, vectorized kNN
    y_pred = vectorized_knn(X_train, y_train, X_test, k=k, chunk_size=chunk_size)

    # 4) Compute accuracy
    accuracy = np.mean(y_pred == y_test)
    
    # 5) Print results
    print(f"Accuracy: {accuracy * 100:.2f}%")
    
    if DATASET == 'fmnist':
        # Optionally see how the first 10 predictions map to garment names
        garments_pred = [get_garment_from_label(label) for label in y_pred[:10]]
        print(f"First 10 predicted garments: {garments_pred}")
    else:
        print(f"First 10 predicted digits: {y_pred[:10]}")


if __name__ == '__main__':
    main()

frameworks

using scikit-learn's KNeighborsClassifier we have

code

import numpy as np
from sklearn.neighbors import KNeighborsClassifier

DATA_DIR = 'data/'
DATASET = 'mnist'  # or 'fmnist'
TEST_DATA_FILENAME = DATA_DIR + DATASET + '/t10k-images-idx3-ubyte'
TEST_LABELS_FILENAME = DATA_DIR + DATASET + '/t10k-labels-idx1-ubyte'
TRAIN_DATA_FILENAME = DATA_DIR + DATASET + '/train-images-idx3-ubyte'
TRAIN_LABELS_FILENAME = DATA_DIR + DATASET + '/train-labels-idx1-ubyte'

def bytes_to_int(b):
    return int.from_bytes(b, 'big')

def read_images(filename, n_max_images=None):
    with open(filename, 'rb') as f:
        _ = f.read(4)  # magic number
        n_images = bytes_to_int(f.read(4))
        if n_max_images:
            n_images = n_max_images
        n_rows = bytes_to_int(f.read(4))
        n_cols = bytes_to_int(f.read(4))
        
        images = []
        for _ in range(n_images):
            img = [f.read(1) for _ in range(n_rows*n_cols)]
            images.append(img)
    return images

def read_labels(filename, n_max_labels=None):
    with open(filename, 'rb') as f:
        _ = f.read(4)  # magic number
        n_labels = bytes_to_int(f.read(4))
        if n_max_labels:
            n_labels = n_max_labels
        labels = []
        for _ in range(n_labels):
            label = bytes_to_int(f.read(1))
            labels.append(label)
    return labels

def main():
    n_train = 60000
    n_test = 10000
    k = 7

    print(f"Dataset: {DATASET}")
    print(f"n_train: {n_train}")
    print(f"n_test: {n_test}")
    print(f"k: {k}")

    # 1) Read data
    raw_train = read_images(TRAIN_DATA_FILENAME, n_train)
    y_train = np.array(read_labels(TRAIN_LABELS_FILENAME, n_train), dtype=np.int32)
    raw_test = read_images(TEST_DATA_FILENAME, n_test)
    y_test = np.array(read_labels(TEST_LABELS_FILENAME, n_test), dtype=np.int32)

    # 2) Convert each image to shape (784,) as uint8
    X_train = np.zeros((n_train, 784), dtype=np.uint8)
    for i, img in enumerate(raw_train):
        X_train[i] = [p[0] for p in img]
    del raw_train

    X_test = np.zeros((n_test, 784), dtype=np.uint8)
    for i, img in enumerate(raw_test):
        X_test[i] = [p[0] for p in img]
    del raw_test

    print("Shapes:")
    print("X_train:", X_train.shape, "y_train:", y_train.shape)
    print("X_test:", X_test.shape, "y_test:", y_test.shape)

    # 3) Use scikit-learn’s KNeighborsClassifier
    knn = KNeighborsClassifier(n_neighbors=k, n_jobs=-1, algorithm="auto")
    knn.fit(X_train, y_train)

    # 4) Predict on test set
    y_pred = knn.predict(X_test)

    # 5) Evaluate accuracy
    accuracy = (y_pred == y_test).mean() * 100
    print(f"Accuracy: {accuracy:.2f}%")

if __name__ == "__main__":
    main()

results

it's reassuring to see that scikit-learn reproduces our from-scratch results across the dataset:

Dataset: mnist
n_train: 60000
n_test: 10000
k: 7
Shapes:
X_train: (60000, 784) y_train: (60000,)
X_test: (10000, 784) y_test: (10000,)
Accuracy: 96.94%

real    0m27.286s
user    1m10.678s
sys     0m7.199s

then for fmnist:

Dataset: fmnist
n_train: 60000
n_test: 10000
k: 7
Shapes:
X_train: (60000, 784) y_train: (60000,)
X_test: (10000, 784) y_test: (10000,)
Accuracy: 85.40%

real    0m26.802s
user    1m8.035s
sys     0m6.987s

as expected :)

interestingly though, scikit-learn loses to our numpy chunking approach on runtime (roughly 27 s vs 12 s)!