# ==============================================================================
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE.md file in the project root
# for full license information.
# ==============================================================================
from operator import mul
from functools import reduce
import numpy as np
import cntk
import cntk.io.transforms as xforms
from cntk import ops, io, learners, internal
from cntk import Trainer
from cntk.layers.blocks import BlockFunction
class BlockApiSetup(object):
    '''
    Implement some special requirement ops
    '''

    @staticmethod
    def convolution(output, kernel, stride, pad, kernel_init, bias_init, group, dilation, name):
        '''
        Implement convolution ops

        Args:
            output (int): the output channel size
            kernel (list): the kernel size of filter, with format [width, height]
            stride (list): the stride of convolution, with format [w_stride, h_stride]
            pad (bool): auto padding or not
            kernel_init (`np.array`): the tensor saving initialize values of filter
            bias_init (`np.array`): the tensor saving initialize values of bias
            group (int): the group size in the convolution
            dilation (list): the dilation of convolution, with format [w_dilation, h_dilation]
            name (str): the name of ops

        Return:
            :func:`~cntk.ops.as_block`: the function contains convolution ops
        '''
        def _conv_ops(weights, data):
            # Channel axis keeps an inferred stride; spatial axes use the
            # configured strides. Padding is never applied on the channel axis.
            return ops.convolution(weights, data,
                                   strides=(cntk.InferredDimension, ) + ops.sanitize_shape(stride),
                                   auto_padding=[False, pad, pad])

        def _weights_parameter(output_channels, init, group_name):
            # Effective spatial extent of the kernel once dilation is applied.
            dilation_kernel = [(k - 1) * d + 1 for k, d in zip(kernel, dilation)]
            # Expand the kernel to simulate dilation: insert zero rows/columns
            # between the original filter taps so a plain (dense) convolution
            # behaves like a dilated one.
            used_init = init.copy()
            if dilation_kernel != kernel:
                for axis in range(len(dilation)):
                    # Positions of the original taps inside the dilated kernel ...
                    kernel_sequence = [x * dilation[axis] for x in range(kernel[axis])]
                    # ... so the remaining positions are the ones to zero-fill.
                    insert_lines = list(set(range(dilation_kernel[axis])) ^ set(kernel_sequence))
                    # np.insert interprets indices against the *original* array,
                    # shifting later insertions; compensate for that shift.
                    for index in range(len(insert_lines)):
                        insert_lines[index] -= index
                    used_init = np.insert(used_init, insert_lines, 0,
                                          axis=len(init.shape) - axis - 1)
            return ops.parameter(shape=((output_channels, cntk.InferredDimension) +
                                        ops.sanitize_shape(dilation_kernel)),
                                 init=used_init, name=group_name)

        if group == 1:
            w = _weights_parameter(output, kernel_init, '.'.join((name, 'W')))
        else:
            # Grouped convolution: split the filters into `group` chunks and
            # apply each chunk to its own slice of the input channels.
            sub_output_channels = int(output / group)
            groups_kernel_init = np.split(kernel_init, group)
            groups_kernel = [_weights_parameter(sub_output_channels, groups_kernel_init[i],
                                                '.'.join((name, str(i), 'W'))) for i in range(0, group)]
            sub_input_channels = groups_kernel[0].shape[1]
        if bias_init is not None:
            b = ops.parameter(shape=(output, ), init=bias_init, name='.'.join((name, 'b')))

        @BlockFunction('Convolution', name)
        def _convolution(x):
            if group == 1:
                apply_x = _conv_ops(w, x)
            else:
                groups_data = [ops.slice(x, axis=0, begin_index=i * sub_input_channels,
                                         end_index=(i + 1) * sub_input_channels) for i in range(0, group)]
                apply_sub = [_conv_ops(group_kernel, group_data)
                             for group_kernel, group_data in zip(groups_kernel, groups_data)]
                # Concatenate the per-group results back along the channel axis.
                apply_x = ops.splice(*apply_sub, axis=0)
            if bias_init is not None:
                apply_x += b
            return apply_x
        return _convolution

    @staticmethod
    def linear(output_shape, input_shape, scale_init, bias_init, name):
        '''
        Implement linear ops, also known as full connection in Caffe

        Args:
            output_shape (tuple): the output channel size
            input_shape (tuple): the input channel size
            scale_init (`np.array`): the tensor saving initialize values of scale
            bias_init (`np.array`): the tensor saving initialize values of bias
            name (str): the name of ops

        Return:
            :func:`~cntk.ops.as_block`: the function contains linear ops
        '''
        sc = ops.parameter(shape=input_shape + output_shape, init=scale_init, name='.'.join((name, 'sc')))
        b = ops.parameter(shape=output_shape, init=bias_init, name='.'.join((name, 'b')))

        @BlockFunction('linear', name)
        def _linear(x):
            apply_x = ops.times(x, sc)
            apply_x += b
            return apply_x
        return _linear

    @staticmethod
    def lrn(k, n, alpha, beta, name):
        '''
        Implement LRN ops

        Args:
            k (int): the factor k in LRN
            n (int): the normalization radius
            alpha (float): alpha factor in LRN
            beta (float): beta factor in LRN
            name (str): the name of ops

        Return:
            :func:`~cntk.ops.as_block`: the function contains lrn ops
        '''
        @BlockFunction('lrn', name)
        def _lrn(x):
            # NOTE(review): the built-in op is given n - 1 as its radius
            # argument — presumably converting Caffe's window definition to
            # CNTK's; confirm against the caller before changing.
            return cntk.local_response_normalization(x, int(n - 1), k, alpha, beta)
        return _lrn
class ApiSetup(object):
    '''
    Setup CNTK ops with given parameters
    '''

    @staticmethod
    def convolution(cntk_layer, inputs):
        '''
        Setup convolution op with given parameters

        Args:
            cntk_layer (:class:`~cntk.contrib.crosstalkcaffe.unimodel.cntkmodel.CntkLayersDefinition`):
                the layer definition of convolution op
            inputs (list): a list contains all :class:`~cntk.ops.functions.Function` or
                :class:`~cntk.input`

        Return:
            :func:`~cntk.ops.functions.Function`: instanced cntk convolution op
        '''
        sanitize_input = internal.sanitize_input(inputs[0])
        params = cntk_layer.parameters
        output_channel = params.output
        kernel_size = params.kernel
        # Filter shape is (out_channels, in_channels_per_group, *kernel).
        kernel_shape = (output_channel, int(sanitize_input.shape[0] / params.group)) + tuple(kernel_size)
        kernel_init = None
        if cntk_layer.parameter_tensor:
            kernel_data_tensor = cntk_layer.parameter_tensor[0]
            kernel_init = np.asarray(kernel_data_tensor.data, dtype=np.float32)
            kernel_init = np.reshape(kernel_init, newshape=kernel_shape)
        # Bias is broadcast over the two spatial axes: (out_channels, 1, 1).
        bias_shape = (output_channel, ) + (1,) * 2
        bias_init = None
        if params.need_bias:
            if cntk_layer.parameter_tensor:
                bias_data_tensor = cntk_layer.parameter_tensor[1]
                bias_init = np.asarray(bias_data_tensor.data, dtype=np.float32)
                bias_init = np.reshape(bias_init, bias_shape)
        return BlockApiSetup.convolution(output_channel, kernel_size, stride=params.stride, pad=params.auto_pad,
                                         kernel_init=kernel_init, bias_init=bias_init,
                                         group=params.group, dilation=params.dilation,
                                         name=cntk_layer.op_name)(sanitize_input)

    @staticmethod
    def batch_norm(cntk_layer, inputs):
        '''
        Setup batch normalization op with given parameters

        Args:
            cntk_layer (:class:`~cntk.contrib.crosstalkcaffe.unimodel.cntkmodel.CntkLayersDefinition`):
                the layer definition of batch normalization op
            inputs (list): a list contains all :class:`~cntk.ops.functions.Function` or
                :class:`~cntk.input`

        Return:
            :func:`~cntk.ops.functions.Function`: instanced cntk batch normalization op
        '''
        sanitize_input = internal.sanitize_input(inputs[0])
        # All four parameters are per-channel vectors.
        parameter_tensor = (sanitize_input.shape[0], )
        scale_init = 1
        bias_init = 0
        mean_init = 1
        var_init = 0
        if cntk_layer.parameter_tensor:
            if len(cntk_layer.parameter_tensor) < 3:
                raise AssertionError('At least three tensors (saved_mean, saved_variance and scale) are needed')
            mean_tensor = cntk_layer.parameter_tensor[0]
            variance_tensor = cntk_layer.parameter_tensor[1]
            # Stored mean/variance are pre-multiplied by the scale factor;
            # divide it back out (guard against a zero scale).
            global_scale = cntk_layer.parameter_tensor[2].data[0]
            moving_average_factor = 1 / global_scale if global_scale != 0 else 0
            mean_init = np.asarray(mean_tensor.data, dtype=np.float32) * moving_average_factor
            var_init = np.asarray(variance_tensor.data, dtype=np.float32) * moving_average_factor
            # Five tensors means learned scale/bias are also present.
            if len(cntk_layer.parameter_tensor) == 5:
                scale_tensor = cntk_layer.parameter_tensor[3]
                bias_tensor = cntk_layer.parameter_tensor[4]
                scale_init = np.asarray(scale_tensor.data, dtype=np.float32)
                bias_init = np.asarray(bias_tensor.data, dtype=np.float32)
        scale_parameters = ops.parameter(parameter_tensor, init=scale_init, name='.'.join((cntk_layer.op_name, 'scale')))
        bias_parameters = ops.parameter(parameter_tensor, init=bias_init, name='.'.join((cntk_layer.op_name, 'bias')))
        mean_parameters = ops.parameter(parameter_tensor, init=mean_init, name='.'.join((cntk_layer.op_name, 'mean')))
        var_parameters = ops.parameter(parameter_tensor, init=var_init, name='.'.join((cntk_layer.op_name, 'var')))
        epsilon = cntk_layer.parameters.epsilon
        return ops.batch_normalization(sanitize_input, scale_parameters, bias_parameters, mean_parameters,
                                       var_parameters, True, use_cudnn_engine=False, epsilon=epsilon,
                                       running_count=ops.constant(0),
                                       name=cntk_layer.op_name)

    @staticmethod
    def pooling(cntk_layer, inputs):
        '''
        Setup pooling op with given parameters

        Args:
            cntk_layer (:class:`~cntk.contrib.crosstalkcaffe.unimodel.cntkmodel.CntkLayersDefinition`):
                the layer definition of pooling op
            inputs (list): a list contains all :class:`~cntk.ops.functions.Function` or
                :class:`~cntk.input`

        Return:
            :func:`~cntk.ops.functions.Function`: instanced cntk pooling op
        '''
        sanitize_input = internal.sanitize_input(inputs[0])
        pooling_type = ops.PoolingType_Average if cntk_layer.parameters.pooling_type else ops.PoolingType_Max
        # ceil_out_dim=True matches Caffe's ceil-mode output size computation.
        return ops.pooling(sanitize_input, pooling_type, tuple(cntk_layer.parameters.kernel),
                           strides=tuple(cntk_layer.parameters.stride),
                           auto_padding=[cntk_layer.parameters.auto_pad],
                           ceil_out_dim=True,
                           name=cntk_layer.op_name)

    @staticmethod
    def relu(cntk_layer, inputs):
        '''
        Setup ReLU op with given parameters

        Args:
            cntk_layer (:class:`~cntk.contrib.crosstalkcaffe.unimodel.cntkmodel.CntkLayersDefinition`):
                the layer definition of ReLU op
            inputs (list): a list contains all :class:`~cntk.ops.functions.Function` or
                :class:`~cntk.input`

        Return:
            :func:`~cntk.ops.functions.Function`: instanced cntk ReLU op
        '''
        sanitize_input = internal.sanitize_input(inputs[0])
        return ops.relu(sanitize_input, name=cntk_layer.op_name)

    @staticmethod
    def dense(cntk_layer, inputs):
        '''
        Setup dense op with given parameters

        Args:
            cntk_layer (:class:`~cntk.contrib.crosstalkcaffe.unimodel.cntkmodel.CntkLayersDefinition`):
                the layer definition of dense op
            inputs (list): a list contains all :class:`~cntk.ops.functions.Function` or
                :class:`~cntk.input`

        Return:
            :func:`~cntk.ops.functions.Function`: instanced cntk dense op
        '''
        sanitize_input = internal.sanitize_input(inputs[0])
        input_channel = sanitize_input.shape
        output_channel = cntk_layer.parameters.num_output
        flattened_channel = reduce(mul, list(input_channel))
        scale_shape = input_channel + (output_channel, )
        bias_shape = (output_channel, )
        if cntk_layer.parameter_tensor:
            if len(cntk_layer.parameter_tensor) != 2:
                raise AssertionError('dense layer receives two inputs (scale/bias)')
            scale_tensor = cntk_layer.parameter_tensor[0]
            bias_tensor = cntk_layer.parameter_tensor[1]
            scale_init = np.asarray(scale_tensor.data, np.float32)
            if cntk_layer.parameters.transpose:
                # Caffe stores the weight as (output, input); CNTK's times op
                # expects (input..., output), so flatten, transpose, reshape.
                scale_init = np.reshape(scale_init, (output_channel, flattened_channel))
                scale_init = np.transpose(scale_init).copy()
                scale_init = np.reshape(scale_init, scale_shape)
            else:
                scale_init = np.reshape(scale_init, scale_shape)
            bias_init = np.asarray(bias_tensor.data, np.float32)
        return BlockApiSetup.linear(bias_shape, scale_shape, scale_init, bias_init, cntk_layer.op_name)(sanitize_input)

    @staticmethod
    def plus(cntk_layer, inputs):
        '''
        Setup plus op with given parameters

        Args:
            cntk_layer (:class:`~cntk.contrib.crosstalkcaffe.unimodel.cntkmodel.CntkLayersDefinition`):
                the layer definition of plus op
            inputs (list): a list contains all :class:`~cntk.ops.functions.Function` or
                :class:`~cntk.input`

        Return:
            :func:`~cntk.ops.functions.Function`: instanced cntk plus op
        '''
        sanitize_left = internal.sanitize_input(inputs[0])
        sanitize_right = internal.sanitize_input(inputs[1])
        return ops.plus(sanitize_left, sanitize_right, name=cntk_layer.op_name)

    @staticmethod
    def dropout(cntk_layer, inputs):
        '''
        Setup dropout op with given parameters

        Args:
            cntk_layer (:class:`~cntk.contrib.crosstalkcaffe.unimodel.cntkmodel.CntkLayersDefinition`):
                the layer definition of dropout op
            inputs (list): a list contains all :class:`~cntk.ops.functions.Function` or
                :class:`~cntk.input`

        Return:
            :func:`~cntk.ops.functions.Function`: instanced cntk dropout op
        '''
        sanitize_output = internal.sanitize_input(inputs[0])
        # NOTE(review): the layer's dropout ratio is not forwarded here, so the
        # op falls back to CNTK's default rate — confirm this is intended.
        return ops.dropout(sanitize_output, name=cntk_layer.op_name)

    @staticmethod
    def lrn(cntk_layer, inputs):
        '''
        Setup lrn op with given parameters

        Args:
            cntk_layer (:class:`~cntk.contrib.crosstalkcaffe.unimodel.cntkmodel.CntkLayersDefinition`):
                the layer definition of lrn op
            inputs (list): a list contains all :class:`~cntk.ops.functions.Function` or
                :class:`~cntk.input`

        Return:
            :func:`~cntk.ops.functions.Function`: instanced cntk lrn op
        '''
        sanitize_output = internal.sanitize_input(inputs[0])
        params = cntk_layer.parameters
        return BlockApiSetup.lrn(params.k, params.kernel_size, params.alpha,
                                 params.beta, cntk_layer.op_name)(sanitize_output)

    @staticmethod
    def splice(cntk_layer, inputs):
        '''
        Setup splice op with given parameters

        Args:
            cntk_layer (:class:`~cntk.contrib.crosstalkcaffe.unimodel.cntkmodel.CntkLayersDefinition`):
                the layer definition of splice op
            inputs (list): a list contains all :class:`~cntk.ops.functions.Function` or
                :class:`~cntk.input`

        Return:
            :func:`~cntk.ops.functions.Function`: instanced cntk splice op
        '''
        # Concatenate all inputs along the channel axis.
        return ops.splice(*inputs, axis=0, name=cntk_layer.op_name)

    @staticmethod
    def softmax(cntk_layer, inputs):
        '''
        Setup softmax op with given parameters

        Args:
            cntk_layer (:class:`~cntk.contrib.crosstalkcaffe.unimodel.cntkmodel.CntkLayersDefinition`):
                the layer definition of softmax op
            inputs (list): a list contains all :class:`~cntk.ops.functions.Function` or
                :class:`~cntk.input`

        Return:
            :func:`~cntk.ops.functions.Function`: instanced cntk softmax op
        '''
        sanitize_output = internal.sanitize_input(inputs[0])
        return ops.softmax(sanitize_output, name=cntk_layer.op_name)
class CntkApiInstance(object):
    '''
    Instance CNTK ops and network
    '''

    def __init__(self, cntk_uni_model, global_conf):
        # op name -> instanced CNTK function
        self._functions = {}
        # combined output node of the instanced network
        self._output = None
        self._model_solver = global_conf.model_solver
        self._source_solver = global_conf.source_solver
        self._instance(cntk_uni_model)

    def _instance(self, cntk_uni_model):
        # NOTE(review): instance_input is not defined in this portion of the
        # file — presumably it registers the network's data inputs into
        # self._functions before the layers are instanced; confirm elsewhere.
        self.instance_input(cntk_uni_model.data_provider)
        self.instance_functions(cntk_uni_model.cntk_sorted_layers, cntk_uni_model.cntk_layers)

    def instance_functions(self, cntk_sorted_layers, cntk_layers):
        '''
        Instance all nodes into CNTK ops

        Args:
            cntk_sorted_layers (list): the list contains the name of instanced layers with
                traversal order
            cntk_layers (dict): the dict contains all layers definition

        Return:
            None
        '''
        # Functions not yet consumed by any later layer. Kept as an
        # insertion-ordered dict (keys only) instead of a set so the combined
        # output node has a deterministic ordering; a plain set's iteration
        # order is arbitrary and made the saved model's output order vary
        # between runs.
        unused_funcs = {}
        for cntk_sorted_layer in cntk_sorted_layers:
            cntk_layer = cntk_layers[cntk_sorted_layer]
            local_inputs = []
            for local_input in cntk_layer.inputs:
                input_func = self._functions[local_input]
                local_inputs.append(input_func)
                # A consumed function is no longer a network output candidate.
                unused_funcs.pop(input_func, None)
            # Dispatch to the matching ApiSetup.<op_type> builder.
            instanced = getattr(ApiSetup, cntk_layer.op_type.name)(cntk_layer, local_inputs)
            self._functions[cntk_layer.op_name] = instanced
            unused_funcs[instanced] = None
        # Everything still unused is a leaf, i.e. a network output.
        self._output = ops.combine(list(unused_funcs), name='outputs')

    def export_model(self):
        '''
        Save instanced CNTK model

        Args:
            None

        Return:
            None
        '''
        save_path = self._model_solver.cntk_model_path
        self._output.save(save_path)

    def get_model(self):
        '''
        Get instanced CNTK model

        Args:
            None

        Return:
            :func:`~cntk.ops.functions.Function`: the output node of CNTK
        '''
        return self._output

    def get_functions(self):
        '''
        Return the functions of CNTK network

        Args:
            None

        Return:
            dict: the instanced functions of CNTK, keyed by op name
        '''
        return self._functions