Train models using the declarative and imperative APIs

CNTK gives the user several ways to train a model:

* A high level, declarative style API using the Function.train method (or training_session). Given a criterion function, the user simply calls its train method, providing configuration parameters for the different aspects of training, such as data sources, checkpointing, cross validation and progress printing. The corresponding test method can be used for evaluation. This API simplifies the implementation of routine training tasks and eliminates boilerplate code.
* The low level Trainer.train_minibatch and test_minibatch methods. In this case the user writes the minibatch loop explicitly and has full control over all aspects of training. This is more flexible than the first option, but also more error prone and requires a deeper understanding of the concepts, especially in a distributed environment.

This document is organized as follows: first we give an example of what a typical imperative loop looks like and what its caveats are in a distributed environment. Then we show what the declarative API looks like and how it simplifies development and eliminates potential errors. If you are only interested in using the high level API (in general you should always prefer it over an explicit minibatch loop), please jump directly to the Function.train section below.

We start with some imports we need for the rest of this manual:

In [29]:
from __future__ import print_function
import os
import cntk
import cntk.ops
import cntk.io
import cntk.train
import cntk.tests.test_utils
from cntk.layers import Dense, Sequential
from cntk.io import StreamDef, StreamDefs, MinibatchSource, CTFDeserializer
from cntk.logging import ProgressPrinter

cntk.tests.test_utils.set_device_from_pytest_env() # (only needed for our build system)
cntk.cntk_py.set_fixed_random_seed(1) # fix the random seed so that the examples are repeatable

1. Example script with an explicit loop

Many scripts in CNTK have a very similar structure:

- they create a network
- instantiate a trainer and a learner with appropriate hyper-parameters
- load training and testing data with minibatch sources
- run the main training loop, fetching data from the train minibatch source and feeding it to the trainer for N samples/sweeps
- at the end, perform the eval loop using data from the test minibatch source

As an example of such a script we take the toy task of learning the XOR operation with a simple feed forward network. We will try to learn the following function:

x    y    result
0    0    0
0    1    1
1    0    1
1    1    0

The network will have two dense layers: we use tanh as the activation for the first layer and no activation for the second. The sample script is presented below:

In [30]:
# Let's prepare data in the CTF format. It exactly matches
# the table above
INPUT_DATA = r'''|features 0 0      |label 0
|features 1 0       |label 1
|features 0 1       |label 1
|features 1 1       |label 0
'''

# Write the data to a temporary file
input_file = 'input.ctf.tmp'
with open(input_file, 'w') as f:
    f.write(INPUT_DATA)

# Create a network
features = cntk.input_variable(2)
label = cntk.input_variable(1)

# Define our input data streams
streams = StreamDefs(
    features = StreamDef(field='features', shape=2),
    label = StreamDef(field='label', shape=1))

model = Sequential([
    Dense(2, activation=cntk.ops.tanh),
    Dense(1)])

z = model(features)
loss = cntk.squared_error(z, label)

# Create a learner and a trainer and a progress writer to
# output current progress
learner = cntk.sgd(model.parameters, cntk.learning_parameter_schedule_per_sample(0.1))
trainer = cntk.train.Trainer(z, (loss, loss), learner, ProgressPrinter(freq=10))

# Now let's create a minibatch source for our input file
mb_source = MinibatchSource(CTFDeserializer(input_file, streams))
input_map = { features : mb_source['features'], label : mb_source['label'] }

# Run a manual training minibatch loop
minibatch_size = 4
max_samples = 800
train = True
while train and trainer.total_number_of_samples_seen < max_samples:
    data = mb_source.next_minibatch(minibatch_size, input_map)
    train = trainer.train_minibatch(data)

# Run a manual evaluation loop using the same data file for evaluation
test_mb_source = MinibatchSource(CTFDeserializer(input_file, streams), randomize=False, max_samples=100)
test_input_map = { features : test_mb_source['features'], label : test_mb_source['label'] }
total_samples = 0
error = 0.
data = test_mb_source.next_minibatch(32, test_input_map)
while data:
    total_samples += data[label].number_of_samples
    error += trainer.test_minibatch(data) * data[label].number_of_samples
    data = test_mb_source.next_minibatch(32, test_input_map)

print("Metric= %f" % (error / total_samples))
Learning rate per sample: 0.1
 Minibatch[   1-  10]: loss = 0.352425 * 40, metric = 35.24% * 40;
 Minibatch[  11-  20]: loss = 0.207848 * 40, metric = 20.78% * 40;
 Minibatch[  21-  30]: loss = 0.191173 * 40, metric = 19.12% * 40;
 Minibatch[  31-  40]: loss = 0.176530 * 40, metric = 17.65% * 40;
 Minibatch[  41-  50]: loss = 0.161325 * 40, metric = 16.13% * 40;
 Minibatch[  51-  60]: loss = 0.143685 * 40, metric = 14.37% * 40;
 Minibatch[  61-  70]: loss = 0.118660 * 40, metric = 11.87% * 40;
 Minibatch[  71-  80]: loss = 0.082769 * 40, metric = 8.28% * 40;
 Minibatch[  81-  90]: loss = 0.046990 * 40, metric = 4.70% * 40;
 Minibatch[  91- 100]: loss = 0.048029 * 40, metric = 4.80% * 40;
 Minibatch[ 101- 110]: loss = 0.518075 * 40, metric = 51.81% * 40;
 Minibatch[ 111- 120]: loss = 0.022979 * 40, metric = 2.30% * 40;
 Minibatch[ 121- 130]: loss = 0.018714 * 40, metric = 1.87% * 40;
 Minibatch[ 131- 140]: loss = 0.193923 * 40, metric = 19.39% * 40;
 Minibatch[ 141- 150]: loss = 0.030105 * 40, metric = 3.01% * 40;
 Minibatch[ 151- 160]: loss = 0.007559 * 40, metric = 0.76% * 40;
 Minibatch[ 161- 170]: loss = 0.006812 * 40, metric = 0.68% * 40;
 Minibatch[ 171- 180]: loss = 0.009554 * 40, metric = 0.96% * 40;
 Minibatch[ 181- 190]: loss = 0.012426 * 40, metric = 1.24% * 40;
 Minibatch[ 191- 200]: loss = 0.012157 * 40, metric = 1.22% * 40;
Metric= 0.010537

As can be seen above, the actual model is specified in just two lines; the rest is boilerplate code to iterate over the data and feed it manually for training and evaluation. A manual loop gives the user complete flexibility in how to feed the data, but it also requires taking several not so obvious things into account.

1.1 Failover and recovery

For the small sample above recovery is not important, but when training spans days or weeks it is not safe to assume that the machine stays online the whole time and that there are no hardware or software glitches. If the machine reboots, goes down, or the script hits a bug, the user would have to rerun the same experiment from the beginning, which is highly undesirable. To avoid this, CNTK allows the user to take checkpoints and restore from them in the event of a failure.

One way to save the model state in CNTK is the save method on the Function class. It is worth mentioning that this method only saves the model state; there are other stateful entities in the script as well, including:

* minibatch sources
* the trainer
* the learners
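
Saving just the model state could look like the following sketch (the file name here is arbitrary):

# A minimal sketch: persist only the model with Function.save and bring it
# back with cntk.load_model.
z.save('xor_model.tmp')
restored_z = cntk.load_model('xor_model.tmp')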

In order to save the complete state of the script, the user has to manually save the current state of the minibatch source and the trainer. The minibatch source provides a get_checkpoint_state method; its result can be passed to the trainer's save_checkpoint method, which takes care of saving the state to disk or exchanging the state in the case of distributed training. There are corresponding restore_from_checkpoint methods on the trainer and the minibatch source. To recover from an error, on start up the user restores the state using the trainer and sets the current position of the minibatch source.

With the above in mind, let’s rewrite our loop as follows:

In [31]:
# Run a manual training minibatch loop with checkpointing

# Same as before
mb_source = MinibatchSource(CTFDeserializer(input_file, streams))
input_map = { features : mb_source['features'], label : mb_source['label'] }

model = Sequential([
    Dense(2, activation=cntk.ops.tanh),
    Dense(1)])

z = model(features)
loss = cntk.squared_error(z, label)

learner = cntk.sgd(model.parameters, cntk.learning_parameter_schedule_per_sample(0.1))
trainer = cntk.train.Trainer(z, (loss, loss), learner, ProgressPrinter(freq=10))

# Try to restore if the checkpoint exists
checkpoint = 'manual_loop_checkpointed.tmp'
# Please comment out the two lines below if you want to restore from the checkpoint
if os.path.exists(checkpoint):
    os.remove(checkpoint)

if os.path.exists(checkpoint):
    print("Trying to restore from checkpoint")
    mb_source_state = trainer.restore_from_checkpoint(checkpoint)
    mb_source.restore_from_checkpoint(mb_source_state)
    print("Restore has finished successfully")
else:
    print("No restore file found")

checkpoint_frequency = 100
last_checkpoint = 0
train = True
while train and trainer.total_number_of_samples_seen < max_samples:
    data = mb_source.next_minibatch(minibatch_size, input_map)
    train = trainer.train_minibatch(data)
    if trainer.total_number_of_samples_seen // checkpoint_frequency != last_checkpoint:
        mb_source_state = mb_source.get_checkpoint_state()
        trainer.save_checkpoint(checkpoint, mb_source_state)
        last_checkpoint = trainer.total_number_of_samples_seen // checkpoint_frequency

No restore file found
Learning rate per sample: 0.1
 Minibatch[   1-  10]: loss = 0.352425 * 40, metric = 35.24% * 40;
 Minibatch[  11-  20]: loss = 0.207848 * 40, metric = 20.78% * 40;
 Minibatch[  21-  30]: loss = 0.191173 * 40, metric = 19.12% * 40;
 Minibatch[  31-  40]: loss = 0.176530 * 40, metric = 17.65% * 40;
 Minibatch[  41-  50]: loss = 0.161325 * 40, metric = 16.13% * 40;
 Minibatch[  51-  60]: loss = 0.143685 * 40, metric = 14.37% * 40;
 Minibatch[  61-  70]: loss = 0.118660 * 40, metric = 11.87% * 40;
 Minibatch[  71-  80]: loss = 0.082769 * 40, metric = 8.28% * 40;
 Minibatch[  81-  90]: loss = 0.046990 * 40, metric = 4.70% * 40;
 Minibatch[  91- 100]: loss = 0.048029 * 40, metric = 4.80% * 40;
 Minibatch[ 101- 110]: loss = 0.518075 * 40, metric = 51.81% * 40;
 Minibatch[ 111- 120]: loss = 0.022979 * 40, metric = 2.30% * 40;
 Minibatch[ 121- 130]: loss = 0.018714 * 40, metric = 1.87% * 40;
 Minibatch[ 131- 140]: loss = 0.193923 * 40, metric = 19.39% * 40;
 Minibatch[ 141- 150]: loss = 0.030105 * 40, metric = 3.01% * 40;
 Minibatch[ 151- 160]: loss = 0.007559 * 40, metric = 0.76% * 40;
 Minibatch[ 161- 170]: loss = 0.006812 * 40, metric = 0.68% * 40;
 Minibatch[ 171- 180]: loss = 0.009554 * 40, metric = 0.96% * 40;
 Minibatch[ 181- 190]: loss = 0.012426 * 40, metric = 1.24% * 40;
 Minibatch[ 191- 200]: loss = 0.012157 * 40, metric = 1.22% * 40;

At the beginning we check whether the checkpoint file exists and restore from it if it does. After that we start the training. Our loop is driven by the total number of samples the trainer has seen. This information is included in the checkpoint, so in case of failure the training resumes at the saved position (this becomes even more important for distributed training).

Based on the checkpointing frequency, the script above retrieves the current state of the minibatch source and creates a checkpoint using the trainer. If the script iterates over the same data many times, saving the state of the minibatch source is not that important, but for huge workloads you probably do not want to start seeing the same data from the beginning after a restart.

At some point the user will want to parallelize the script to decrease training time. Let's see how this can be done in the next section.

1.2 Distributed manual loop

In order to make training distributed, CNTK provides a set of distributed learners that encapsulate distributed training algorithms (1BitSGD, BlockMomentum, data parallel SGD) and use MPI to exchange state. From the script perspective almost everything stays the same. The only differences are that the user needs to wrap the learner in the corresponding distributed learner and make sure each worker picks up only its part of the data from the minibatch source, based on the current worker rank (the script should also be run with mpiexec):

In [32]:
# Run a manual training minibatch loop with distributed learner
checkpoint = 'manual_loop_distributed.tmp'
# Please comment out the two lines below if you want to restore from the checkpoint
if os.path.exists(checkpoint):
    os.remove(checkpoint)

model = Sequential([
    Dense(2, activation=cntk.ops.tanh),
    Dense(1)])

z = model(features)
loss = cntk.squared_error(z, label)

mb_source = MinibatchSource(CTFDeserializer(input_file, streams))
input_map = { features : mb_source['features'], label : mb_source['label'] }

# Make sure the learner is distributed
distributed_learner = cntk.distributed.data_parallel_distributed_learner(
    cntk.sgd(model.parameters, cntk.learning_parameter_schedule_per_sample(0.1)))
trainer = cntk.train.Trainer(z, (loss, loss), distributed_learner, ProgressPrinter(freq=10))

if os.path.exists(checkpoint):
    print("Trying to restore from checkpoint")
    mb_source_state = trainer.restore_from_checkpoint(checkpoint)
    mb_source.restore_from_checkpoint(mb_source_state)
    print("Restore has finished successfully")
else:
    print("No restore file found")

last_checkpoint = 0
train = True
partition = cntk.distributed.Communicator.rank()
num_partitions = cntk.distributed.Communicator.num_workers()
while train and trainer.total_number_of_samples_seen < max_samples:
    # Make sure each worker gets its own data only
    data = mb_source.next_minibatch(minibatch_size_in_samples = minibatch_size,
                                    input_map = input_map, device = cntk.use_default_device(),
                                    num_data_partitions=num_partitions, partition_index=partition)
    train = trainer.train_minibatch(data)
    if trainer.total_number_of_samples_seen // checkpoint_frequency != last_checkpoint:
        mb_source_state = mb_source.get_checkpoint_state()
        trainer.save_checkpoint(checkpoint, mb_source_state)
        last_checkpoint = trainer.total_number_of_samples_seen // checkpoint_frequency

# When you use distributed learners, please call finalize MPI at the end of your script,
# see the next cell.
# cntk.distributed.Communicator.finalize()
No restore file found
 Minibatch[   1-  10]: loss = 0.352425 * 40, metric = 35.24% * 40;
 Minibatch[  11-  20]: loss = 0.207848 * 40, metric = 20.78% * 40;
 Minibatch[  21-  30]: loss = 0.191173 * 40, metric = 19.12% * 40;
 Minibatch[  31-  40]: loss = 0.176530 * 40, metric = 17.65% * 40;
 Minibatch[  41-  50]: loss = 0.161325 * 40, metric = 16.13% * 40;
 Minibatch[  51-  60]: loss = 0.143685 * 40, metric = 14.37% * 40;
 Minibatch[  61-  70]: loss = 0.118660 * 40, metric = 11.87% * 40;
 Minibatch[  71-  80]: loss = 0.082769 * 40, metric = 8.28% * 40;
 Minibatch[  81-  90]: loss = 0.046990 * 40, metric = 4.70% * 40;
 Minibatch[  91- 100]: loss = 0.048029 * 40, metric = 4.80% * 40;
 Minibatch[ 101- 110]: loss = 0.518075 * 40, metric = 51.81% * 40;
 Minibatch[ 111- 120]: loss = 0.022979 * 40, metric = 2.30% * 40;
 Minibatch[ 121- 130]: loss = 0.018714 * 40, metric = 1.87% * 40;
 Minibatch[ 131- 140]: loss = 0.193923 * 40, metric = 19.39% * 40;
 Minibatch[ 141- 150]: loss = 0.030105 * 40, metric = 3.01% * 40;
 Minibatch[ 151- 160]: loss = 0.007559 * 40, metric = 0.76% * 40;
 Minibatch[ 161- 170]: loss = 0.006812 * 40, metric = 0.68% * 40;
 Minibatch[ 171- 180]: loss = 0.009554 * 40, metric = 0.96% * 40;
 Minibatch[ 181- 190]: loss = 0.012426 * 40, metric = 1.24% * 40;
 Minibatch[ 191- 200]: loss = 0.012157 * 40, metric = 1.22% * 40;

In order for distribution to work properly, the minibatch loop should exit on all workers at the same time. Some workers can have more data than others, so the exit condition of the loop should be based on the return value of the trainer (when a particular worker has no more work to do, this is communicated by passing an empty minibatch to train_minibatch).
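
Concretely, the loop can be driven by the trainer's return value alone; the sketch below reuses the names defined in the cell above:

# A sketch of the exit condition discussed above: every worker keeps calling
# train_minibatch until it returns False. When this worker's partition runs
# out of samples, 'data' is empty, and passing it along still lets this worker
# participate in the synchronization with the other workers.
train = True
while train and trainer.total_number_of_samples_seen < max_samples:
    data = mb_source.next_minibatch(minibatch_size_in_samples=minibatch_size,
                                    input_map=input_map,
                                    num_data_partitions=num_partitions,
                                    partition_index=partition)
    train = trainer.train_minibatch(data)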

As noted before, the decisions inside the loop are based on Trainer.total_number_of_samples_seen. Some operations (e.g. train_minibatch, checkpointing, cross validation, when done in a distributed fashion) require synchronization, and to stay in sync all workers rely on a global state: the global number of samples seen by the trainer.

Even though writing a manual training loop gives the user all the flexibility, it can also be error prone and requires a lot of boilerplate code to make everything work. When this flexibility is not required, it is better to use a higher level abstraction.

2. Using Function.train

Instead of writing the training loop manually and taking care of checkpointing and distribution herself, the user can delegate these aspects to the training session, exposed through the Function.train/test methods. It automatically takes care of the following:

1. checkpointing
2. validation
3. testing/evaluation

All that is needed from the user is to provide the corresponding configuration parameters. In addition to the higher level of abstraction, the training session is implemented in C++, so it is generally faster than a loop written in Python:

In [33]:
checkpoint = 'training_session.tmp'
# Please comment out the two lines below if you want to restore from the checkpoint
if os.path.exists(checkpoint):
    os.remove(checkpoint)

# As before
mb_source = MinibatchSource(CTFDeserializer(input_file, streams))
test_mb_source = MinibatchSource(CTFDeserializer(input_file, streams), randomize=False, max_samples=100)

model_factory = Sequential([
    Dense(2, activation=cntk.ops.tanh),
    Dense(1)])
model = model_factory(features)

# Criterion function
@cntk.Function
def criterion_factory(f, l):
    z = model_factory(f)
    loss = cntk.squared_error(z, l)
    return (loss, loss)

criterion = criterion_factory(features, label)
learner = cntk.distributed.data_parallel_distributed_learner(cntk.sgd(model.parameters,
                                                                      cntk.learning_parameter_schedule_per_sample(0.1)))
progress_writer = cntk.logging.ProgressPrinter(freq=10)
checkpoint_config = cntk.CheckpointConfig(filename=checkpoint, frequency=checkpoint_frequency)
test_config = cntk.TestConfig(test_mb_source)

# Actual training
progress = criterion.train(mb_source, minibatch_size=minibatch_size,
                           model_inputs_to_streams={ features : mb_source['features'], label : mb_source['label'] },
                           max_samples=max_samples, parameter_learners=[learner],
                           callbacks=[progress_writer, checkpoint_config, test_config])

# When you use distributed learners, please call finalize MPI at the end of your script
# cntk.distributed.Communicator.finalize()
 Minibatch[   1-  10]: loss = 0.352425 * 40, metric = 35.24% * 40;
 Minibatch[  11-  20]: loss = 0.207848 * 40, metric = 20.78% * 40;
 Minibatch[  21-  30]: loss = 0.191173 * 40, metric = 19.12% * 40;
 Minibatch[  31-  40]: loss = 0.176530 * 40, metric = 17.65% * 40;
 Minibatch[  41-  50]: loss = 0.161325 * 40, metric = 16.13% * 40;
 Minibatch[  51-  60]: loss = 0.143685 * 40, metric = 14.37% * 40;
 Minibatch[  61-  70]: loss = 0.118660 * 40, metric = 11.87% * 40;
 Minibatch[  71-  80]: loss = 0.082769 * 40, metric = 8.28% * 40;
 Minibatch[  81-  90]: loss = 0.046990 * 40, metric = 4.70% * 40;
 Minibatch[  91- 100]: loss = 0.048029 * 40, metric = 4.80% * 40;
 Minibatch[ 101- 110]: loss = 0.518075 * 40, metric = 51.81% * 40;
 Minibatch[ 111- 120]: loss = 0.022979 * 40, metric = 2.30% * 40;
 Minibatch[ 121- 130]: loss = 0.018714 * 40, metric = 1.87% * 40;
 Minibatch[ 131- 140]: loss = 0.193923 * 40, metric = 19.39% * 40;
 Minibatch[ 141- 150]: loss = 0.030105 * 40, metric = 3.01% * 40;
 Minibatch[ 151- 160]: loss = 0.007559 * 40, metric = 0.76% * 40;
 Minibatch[ 161- 170]: loss = 0.006812 * 40, metric = 0.68% * 40;
 Minibatch[ 171- 180]: loss = 0.009554 * 40, metric = 0.96% * 40;
 Minibatch[ 181- 190]: loss = 0.012426 * 40, metric = 1.24% * 40;
 Minibatch[ 191- 200]: loss = 0.012157 * 40, metric = 1.22% * 40;
Finished Epoch[1]: loss = 0.118087 * 800, metric = 11.81% * 800 1.091s (733.3 samples/s);
Finished Evaluation [1]: Minibatch[1-4]: metric = 1.05% * 100;

Let’s see how to configure different aspects of the train method.

Progress tracking

In order to report progress, provide an instance of a ProgressWriter. It has its own set of parameters to control how often the loss value is printed. If you need custom logic for retrieving the current status, consider implementing your own ProgressWriter.
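
For example, a printer that reports every 10 minibatches and also mirrors the output to a log file could be configured as in the sketch below; the tag and log_to_file parameter names are recalled from the ProgressPrinter API rather than used elsewhere in this manual:

# A small sketch, assuming ProgressPrinter accepts 'tag' and 'log_to_file'
# in addition to 'freq': print every 10 minibatches and duplicate the output
# into a log file.
progress_writer = cntk.logging.ProgressPrinter(freq=10, tag='Training',
                                               log_to_file='training_progress.log')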

Checkpointing

The checkpoint configuration specifies how often to save a checkpoint to the given file; the checkpointing frequency is specified in samples. When it is given, the train method takes care of saving and restoring the state across the trainer/learners/minibatch source and of propagating this information among distributed workers. If you need to preserve all checkpoints taken during training, set preserve_all to True.
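
As a sketch, a configuration that checkpoints every 100 samples, restores automatically on restart and keeps all intermediate checkpoint files could look as follows; the restore and preserve_all keyword names are assumptions about the CheckpointConfig signature rather than parameters used above:

# A sketch of a checkpoint configuration; 'restore' and 'preserve_all' are
# assumed keyword names, 'filename' and 'frequency' are used in the cell above.
checkpoint_config = cntk.CheckpointConfig(filename='training_session.tmp',
                                          frequency=100,
                                          restore=True,
                                          preserve_all=True)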

Validation

When a cross validation configuration is given, the training session runs validation on the specified minibatch source with the specified frequency and reports the average metric error. The user can also provide a cross validation callback, which is called with the specified frequency. It is then up to the user to perform validation inside the callback and return True if training should continue, or False otherwise.
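
A sketch of such a configuration is shown below; the callback signature (index, average_error, num_samples, num_minibatches) and the CrossValidationConfig keyword names are assumptions about the API, not something demonstrated in the cells above:

# A sketch of validation with a user callback. The callback returns True to
# continue training and False to stop early; its exact signature is assumed here.
def cv_callback(index, average_error, num_samples, num_minibatches):
    print("Cross validation %d: metric = %.2f%% on %d samples"
          % (index, average_error * 100, num_samples))
    return average_error > 0.001  # stop once the error is small enough

cv_config = cntk.CrossValidationConfig(minibatch_source=test_mb_source,
                                       frequency=100,
                                       callback=cv_callback)

The resulting config would then be passed in the callbacks list of the train call, next to the progress writer, checkpoint and test configs.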

Testing

If a test configuration is given, after training completes the train method runs evaluation on the specified minibatch source. If you need to run only evaluation, without training, consider using the Function.test method instead.
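
For reference, an evaluation-only pass over the test source could be sketched like this, reusing the criterion and sources defined above (the exact content of the returned object is not asserted here):

# A sketch of evaluation without training: Function.test iterates over the
# test minibatch source once and reports the metric via the progress writer.
test_progress = criterion.test(test_mb_source,
                               minibatch_size=32,
                               model_inputs_to_streams={ features : test_mb_source['features'],
                                                         label : test_mb_source['label'] },
                               callbacks=[progress_writer])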

For more advanced scenarios of using Function.train, please see Tutorial 200.