Source code for cntk.contrib.netopt.test.factorization_test

import numpy as np
import pytest
import cntk as C
import cntk.contrib.netopt.factorization as nc
C.cntk_py.set_fixed_random_seed(1)
C.cntk_py.force_deterministic_algorithms()

# create a dense network for the tests
def _create_model_dense(features, num_hidden_layers, hidden_layers_dim, num_output_classes):
    with C.layers.default_options(init=C.layers.glorot_uniform(), activation=C.sigmoid):
        h = features
        for _ in range(num_hidden_layers):
            h = C.layers.Dense(hidden_layers_dim)(h)
        last_layer = C.layers.Dense(num_output_classes, activation=None)

        return last_layer(h)


# no size reduction, only the factorization.
def _get_rank_same_size(W):
    return int(len(W) * 1)


# reduce the size to len * 0.8
def _get_rank_reduced_size(W):
    return int(len(W) * 0.8)


# filter dense blocks that have the same height and width.
def _filter(model):
    W = model.W.value
    return len(W) == len(W[0])


# Helper function to generate a random data sample
def _generate_random_data_sample(sample_size, feature_dim, num_classes):
    Y = np.random.randint(size=(sample_size, 1), low=0, high=num_classes)
    X = (np.random.randn(sample_size, feature_dim)+3) * (Y+1)
    X = X.astype(np.float32)
    class_ind = [Y==class_number for class_number in range(num_classes)]
    Y = np.asarray(np.hstack(class_ind), dtype=np.float32)
    return X, Y
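

# A quick sanity sketch of the helper's contract (the concrete sizes below
# are arbitrary illustrations, not used by the tests): X is a float32
# feature matrix and Y the matching one-hot float32 label matrix.
def _sanity_check_random_data_sample():
    X, Y = _generate_random_data_sample(4, 2, 2)
    assert X.shape == (4, 2) and X.dtype == np.float32
    assert Y.shape == (4, 2)
    assert np.allclose(Y.sum(axis=1), 1)  # exactly one hot entry per row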


def test_svd_factorization():
    # W and its svd factorizations (U and sV)
    W = np.array([[1, 0, 0, 0, 2],
                  [0, 0, 3, 0, 0],
                  [0, 0, 0, 0, 0],
                  [0, 0, 0, 2, 0]])

    U = np.array([[0, 1, 0, 0],
                  [1, 0, 0, 0],
                  [0, 0, 0, 1],
                  [0, 0, 1, 0]])

    sV = np.array([[0, 0, 3, 0, 0],
                   [1, 0, 0, 0, 2],
                   [0, 0, 0, 2, 0],
                   [0, 0, 0, 0, 0]])

    # call svd factorization with W's length
    W1, W2 = nc.svd_subprojection(W, len(W))

    assert np.array_equal(W1, U)
    assert np.allclose(sV, W2)
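

# For reference, a minimal numpy sketch of the factorization exercised above
# (an assumption about svd_subprojection's contract, not its implementation):
# W is approximated by W1 @ W2, where W1 holds the first k left-singular
# vectors and W2 the corresponding rows of diag(s) @ Vt.
def _svd_subprojection_sketch(W, k):
    u, s, vt = np.linalg.svd(W, full_matrices=False)
    return u[:, :k], np.dot(np.diag(s[:k]), vt[:k, :])
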
def test_factor_dense():
    input_dim = 2
    num_output_classes = 2
    hidden_layer_dim = 50

    input = C.input_variable(input_dim)
    z = _create_model_dense(
        input, input_dim, hidden_layer_dim, num_output_classes)

    newz = nc.factor_dense(z, projection_function=_get_rank_same_size,
                           filter_function=_filter)
    newblocks = C.logging.graph.depth_first_search(
        newz,
        lambda x: type(x) == C.Function and x.root_function.is_block,
        depth=0)
    assert newblocks[1].op_name == "DenseFactored"
    block_root = C.as_composite(newblocks[1].block_root)
    # no reduction, same size but factored.
    assert block_root.W1.value.shape == (50, 50)

    newz = nc.factor_dense(z, projection_function=_get_rank_reduced_size,
                           filter_function=_filter)
    newblocks = C.logging.graph.depth_first_search(
        newz,
        lambda x: type(x) == C.Function and x.root_function.is_block,
        depth=0)
    assert newblocks[1].op_name == "DenseFactored"
    block_root = C.as_composite(newblocks[1].block_root)
    # the reduction has taken place now.
    assert block_root.W1.value.shape == (50, 40)
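

# Shape arithmetic behind the (50, 40) assertion above (a short sketch,
# assuming the factored block stores the product W ~ W1 @ W2):
# hidden_layer_dim is 50 and _get_rank_reduced_size yields rank
# k = int(50 * 0.8) = 40, so W1 has shape (50, 40) and W2 has shape
# (40, 50), replacing the original (50, 50) weight matrix.
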
def _percentage_match(labels, predictions):
    match_count = 0
    for idx, lbl in enumerate(labels):
        if np.argmax(lbl) == np.argmax(predictions[idx]):
            match_count += 1
    return match_count / len(labels) * 100 if len(labels) != 0 else 0

def test_factor_dense_for_prediction():
    input_dim = 2
    num_output_classes = 2
    hidden_layer_dim = 50
    num_minibatches_to_train = 2000
    minibatch_size = 25
    learning_rate = 0.5

    input = C.input_variable(input_dim)
    label = C.input_variable(num_output_classes)

    z = _create_model_dense(
        input, input_dim, hidden_layer_dim, num_output_classes)

    loss = C.cross_entropy_with_softmax(z, label)
    eval_error = C.classification_error(z, label)

    # Instantiate the trainer object to drive the model training
    lr_schedule = C.learning_rate_schedule(learning_rate, C.UnitType.minibatch)
    learner = C.sgd(z.parameters, lr_schedule)
    trainer = C.Trainer(z, (loss, eval_error), [learner])

    # Run the trainer and perform model training
    for i in range(0, int(num_minibatches_to_train)):
        features, labels = _generate_random_data_sample(
            minibatch_size, input_dim, num_output_classes)
        # Map the model's input variables to actual minibatch data for training
        trainer.train_minibatch({input: features, label: labels})

    # generate some data to predict
    features, labels = _generate_random_data_sample(10, 2, 2)

    # factor the model.
    newz = nc.factor_dense(z, projection_function=_get_rank_reduced_size,
                           filter_function=_filter)

    original_out = C.softmax(z)
    factored_out = C.softmax(newz)

    original_labels_probs = original_out.eval({input: features})
    predicted_label_probs = factored_out.eval({input: features})

    original_prediction_percentage = _percentage_match(
        labels, original_labels_probs)

    # The reduced model should match at least 50% of the original model's
    # predictions. For the test, we reduced the training minibatches, thus
    # the match is lower.
    assert (original_prediction_percentage * 0.5
            <= _percentage_match(labels, predicted_label_probs))