# ==============================================================================
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE.md file in the project root
# for full license information.
# ==============================================================================
'''
Basic building blocks that are semantically not layers (not used in a layered fashion),
e.g. the LSTM block.
'''
from __future__ import division
import warnings
import numpy as np
from cntk import placeholder, combine, alias
from cntk.variables import Constant, Parameter
from cntk.ops import times, slice, sigmoid, tanh, softplus
#from .typing import Signature
from cntk.internal import _as_tuple
from cntk.initializer import glorot_uniform
from _cntk_py import InferredDimension
from cntk.default_options import get_default_override, default_override_or
from cntk.ops.functions import Function
_INFERRED = (InferredDimension,) # as a tuple, makes life easier
# call this for all untested branches
def UntestedBranchError(name):
raise NotImplementedError("Untested code branch: " + name)
# create the complete initializer for a given 'init' parameter, to pass to parameter()
# This is called from Parameter() and every place that injects rank parameters.
# It does a few things:
# - maps init_default_override_or_glorot_uniform to default --TODO: we should have a global setting for that
# - creates a new initializer object from an existing one, while updating members
# TODO: remove default resolution, only make this a conversion; then rename
def _initializer_for(init, rank_params=None):
if init is None:
raise ValueError("init parameter cannot be None")
# scalar constant: that's it, nothing further to do here
if np.isscalar(init):
# BUGBUG: this is sometimes required when dimensions are unknown; shouldn't.
from _cntk_py import constant_initializer
return constant_initializer(init)
#return init # TODO: change to this once this works, e.g. for layers.BatchNormalization()
# implant additional rank parameters
if rank_params:
from cntk.initializer import initializer_with_rank
init = initializer_with_rank(init, **rank_params)
return init
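# Illustrative sketch (not part of the module logic): a scalar init is wrapped into a
# constant initializer, while rank parameters are merged into an existing initializer:
#   _initializer_for(0.1)                                               # -> constant_initializer(0.1)
#   _initializer_for(glorot_uniform(), dict(output_rank=-1, filter_rank=2))
# The dict keys above are only examples of rank parameters a caller might inject.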
# helper to get the initial_state or the default
def _get_initial_state_or_default(initial_state):
# if initial_state is a tuple (multiple state vars), then apply this recursively to all
if isinstance(initial_state, tuple):
return tuple(_get_initial_state_or_default(s) for s in initial_state)
# if initial state is given and a numeric constant, then turn it into a Constant() object
elif initial_state is None:
return Constant(0) # note: don't pass None to past_value, because that would default to float32 --TODO: still the case?
elif np.isscalar(initial_state):
return Constant(initial_state, shape=(1))
else:
return initial_state # already in good shape: return as is
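# Illustrative sketch of the mapping above (for documentation only):
#   _get_initial_state_or_default(None)       # -> Constant(0)
#   _get_initial_state_or_default(0.1)        # -> Constant(0.1, shape=(1))
#   _get_initial_state_or_default((h0, c0))   # -> tuple, each element mapped recursively
#   _get_initial_state_or_default(some_var)   # -> returned unchanged
# (h0, c0, and some_var stand for caller-provided state variables.)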
from cntk.ops.functions import BlockFunction # (deprecated)
def _inject_name(f, name):
'''
Call this at the end of any layer or block that takes an optional name argument.
'''
if name:
if not isinstance(f, Function):
f = Function(f)
if len(f.outputs) == 1:
f = alias(f, name=name)
else:
f = combine(list(f.outputs), name=name) # BUGBUG: Does this actually name things?
return f
def ForwardDeclaration(name='forward_declaration'):
'''
Helper for recurrent network declarations.
Returns a placeholder variable with an added method ``resolve_to()`` to be called
at the end to close the loop.
This is used for explicit graph building with recurrent connections.
Example:
>>> # create a graph with a recurrent loop to compute the length of an input sequence
>>> from cntk.layers.typing import *
>>> x = C.input_variable(**Sequence[Tensor[2]])
>>> ones_like_input = C.sequence.broadcast_as(1, x) # sequence of scalar ones of same length as input
>>> out_fwd = ForwardDeclaration() # placeholder for the state variables
>>> out = C.sequence.past_value(out_fwd, initial_state=0) + ones_like_input
>>> out_fwd.resolve_to(out)
>>> length = C.sequence.last(out)
>>> x0 = np.reshape(np.arange(6,dtype=np.float32),(1,3,2))
>>> x0
array([[[ 0., 1.],
[ 2., 3.],
[ 4., 5.]]], dtype=float32)
>>> length(x0)
array([ 3.], dtype=float32)
Returns:
:class:`~cntk.variables.Variable`: a placeholder variable with a method ``resolve_to()`` that resolves it to another variable
'''
var_fwd = placeholder(name=name)
def resolve_to(var):
#from cntk import cntk_py
#if isinstance(var, cntk_py.Function):
# var.replace_placeholders({var_fwd: var.output}) # resolves var_fwd := var
#else:
# TODO: ^^ should no longer be needed; delete once confirmed
var.owner.replace_placeholders({var_fwd: var}) # resolves var_fwd := var
var_fwd.resolve_to = resolve_to
return var_fwd
@Function
def identity(keep):
'''
identity()
Identity function.
This is useful to pass to layers that accept, e.g., a non-linearity,
but you wish to have none.
Example:
>>> linear_layer = Dense(500, activation=identity)
'''
# Note: We cannot use alias() here since parameter-shape inference cannot be done through alias().
return combine([keep])
def Stabilizer(steepness=4, enable_self_stabilization=default_override_or(True), name=''):
'''
Stabilizer(steepness=4, enable_self_stabilization=True, name='')
Layer factory function to create a `Droppo self-stabilizer <https://www.microsoft.com/en-us/research/wp-content/uploads/2016/11/SelfLR.pdf>`_.
It multiplies its input with a scalar that is learned.
This takes `enable_self_stabilization` as a flag that allows the stabilizer to be disabled. Useful if this is set through a global default.
Note:
Some other layers (specifically, recurrent units like :func:`~cntk.layers.blocks.LSTM`) also have the option to
use the ``Stabilizer()`` layer internally. That is enabled by passing `enable_self_stabilization=True`
to those layers. In conjunction with those, the rule is that an explicit ``Stabilizer()`` must be
inserted by the user for the main data input, whereas the recurrent layer will own the stabilizer(s)
for the internal recurrent connection(s).
Note:
Unlike the original paper, which proposed a linear or exponential scalar,
CNTK uses a sharpened Softplus: ``1/steepness * ln(1 + e^{steepness * beta})``.
The Softplus behaves linearly for weights around and above 1 (like the linear scalar) while guaranteeing
positiveness (like the exponential variant), but is also more robust because it avoids exploding gradients.
Example:
>>> # recurrent model with self-stabilization
>>> from cntk.layers import *
>>> with default_options(enable_self_stabilization=True): # enable stabilizers by default for LSTM()
... model = Sequential([
... Embedding(300),
... Stabilizer(), # stabilizer for main data input of recurrence
... Recurrence(LSTM(512)), # LSTM owns its own stabilizers for the recurrent connections
... Stabilizer(),
... Dense(10)
... ])
Args:
steepness (`int`, defaults to 4): sharpness of the sharpened Softplus that computes the learned scaling factor
enable_self_stabilization (bool, defaults to `True`): a flag that allows the stabilizer to be disabled. Useful if this is set through a global default
name (str, defaults to ''): the name of the Function instance in the network
Returns:
:class:`~cntk.ops.functions.Function`:
A function
'''
enable_self_stabilization = get_default_override(Stabilizer, enable_self_stabilization=enable_self_stabilization)
if not enable_self_stabilization: # disabled (typically through global option; otherwise one would not call this in the first place)
return identity
# parameters bound to this Function
init_param = np.log(np.exp(steepness) -1) / steepness # initialize so that factor is initially 1 (has no effect)
param = Parameter((), init=init_param, name='alpha')
beta = softplus(param, steepness=steepness)
# expression
@BlockFunction('Stabilizer', name)
def stabilize(x):
return beta * x
return stabilize
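# Sanity check of the initialization above (illustrative only): with
# init_param = log(exp(steepness) - 1) / steepness, the sharpened Softplus
# 1/steepness * log(1 + exp(steepness * init_param)) evaluates to 1, so the
# stabilizer is initially a no-op:
#   s = 4; a0 = np.log(np.exp(s) - 1) / s
#   assert abs(np.log1p(np.exp(s * a0)) / s - 1.0) < 1e-6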
def _RecurrentBlock(type, shape, cell_shape, activation, use_peepholes,
init, init_bias,
enable_self_stabilization,
name=''):
'''
Helper to create a recurrent block of type 'LSTM', 'GRU', or 'RNNStep'.
'''
has_projection = cell_shape is not None
shape = _as_tuple(shape)
cell_shape = _as_tuple(cell_shape) if cell_shape is not None else shape
if len(shape) != 1 or len(cell_shape) != 1:
raise ValueError("%s: shape and cell_shape must be vectors (rank-1 tensors)" % type)
# otherwise we'd need to fix slicing and Param initializers
stack_axis = -1 # for efficient computation, we stack multiple variables (along the fastest-changing one, to match BS)
# determine stacking dimensions
cell_shape_list = list(cell_shape)
stacked_dim = cell_shape_list[stack_axis]
cell_shape_list[stack_axis] = stacked_dim * {
'RNNStep': 1,
'GRU': 3,
'LSTM': 4
}[type]
cell_shape_stacked = tuple(cell_shape_list) # stacked dims for W and b (stack_axis duplicated 4x for LSTM, 3x for GRU, 1x for RNNStep)
cell_shape_list[stack_axis] = stacked_dim * {
'RNNStep': 1,
'GRU': 2,
'LSTM': 4
}[type]
cell_shape_stacked_H = tuple(cell_shape_list) # stacked dims for the hidden-to-hidden projection H (4x for LSTM, 2x for GRU, 1x for RNNStep)
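# e.g. for an LSTM with shape=(512,) and no cell projection:
#   cell_shape_stacked = (2048,) and cell_shape_stacked_H = (2048,)
# for a GRU of the same size:
#   cell_shape_stacked = (1536,) and cell_shape_stacked_H = (1024,)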
# parameters
b = Parameter( cell_shape_stacked, init=init_bias, name='b') # bias
W = Parameter(_INFERRED + cell_shape_stacked, init=init, name='W') # input
H = Parameter(shape + cell_shape_stacked_H, init=init, name='H') # hidden-to-hidden
H1 = Parameter(shape + cell_shape, init=init, name='H1') if type == 'GRU' else None # hidden-to-hidden
Ci = Parameter( cell_shape, init=init, name='Ci') if use_peepholes else None # cell-to-hidden {note: applied elementwise}
Cf = Parameter( cell_shape, init=init, name='Cf') if use_peepholes else None # cell-to-hidden {note: applied elementwise}
Co = Parameter( cell_shape, init=init, name='Co') if use_peepholes else None # cell-to-hidden {note: applied elementwise}
Wmr = Parameter(cell_shape + shape, init=init, name='P') if has_projection else None # final projection
# each use of a stabilizer layer must get its own instance
Sdh = Stabilizer(enable_self_stabilization=enable_self_stabilization, name='dh_stabilizer')
Sdc = Stabilizer(enable_self_stabilization=enable_self_stabilization, name='dc_stabilizer')
Sct = Stabilizer(enable_self_stabilization=enable_self_stabilization, name='c_stabilizer')
Sht = Stabilizer(enable_self_stabilization=enable_self_stabilization, name='P_stabilizer')
# define the model function itself
# general interface for Recurrence():
# (all previous outputs delayed, input) --> (outputs and state)
# where
# - the first output is the main output, e.g. 'h' for LSTM
# - the remaining outputs, if any, are additional state
# - if for some reason output != state, then output is still fed back and should just be ignored by the recurrent block
# LSTM model function
# in this case:
# (dh, dc, x) --> (h, c)
def lstm(dh, dc, x):
dhs = Sdh(dh) # previous values, stabilized
dcs = Sdc(dc)
# note: input does not get a stabilizer here, user is meant to do that outside
# projected contribution from input(s), hidden, and bias
proj4 = b + times(x, W) + times(dhs, H)
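# (illustration) with shape=(m,) and no cell projection, proj4 has shape (4*m,);
# the slices below pick out the input-gate, candidate, forget-gate, and output-gate
# contributions, in that order along stack_axis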
it_proj = slice (proj4, stack_axis, 0*stacked_dim, 1*stacked_dim) # split along stack_axis
bit_proj = slice (proj4, stack_axis, 1*stacked_dim, 2*stacked_dim)
ft_proj = slice (proj4, stack_axis, 2*stacked_dim, 3*stacked_dim)
ot_proj = slice (proj4, stack_axis, 3*stacked_dim, 4*stacked_dim)
# helper to inject peephole connection if requested
def peep(x, c, C):
return x + C * c if use_peepholes else x
it = sigmoid (peep (it_proj, dcs, Ci)) # input gate(t)
# TODO: should both activations be replaced?
bit = it * activation (bit_proj) # applied to tanh of input network
ft = sigmoid (peep (ft_proj, dcs, Cf)) # forget-me-not gate(t)
bft = ft * dc # applied to cell(t-1)
ct = bft + bit # c(t) is sum of both
ot = sigmoid (peep (ot_proj, Sct(ct), Co)) # output gate(t)
ht = ot * activation (ct) # applied to tanh(cell(t))
c = ct # cell value
h = times(Sht(ht), Wmr) if has_projection else \
ht
# returns the new state as a tuple with names but order matters
#return (Function.NamedOutput(h=h), Function.NamedOutput(c=c))
return (h, c)
# GRU model function
# in this case:
# (dh, x) --> (h)
# e.g. https://en.wikipedia.org/wiki/Gated_recurrent_unit
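# (illustration) gru() below computes, in conventional notation,
#   z(t) = sigmoid(W_z x(t) + U_z h(t-1) + b_z)               # update gate
#   r(t) = sigmoid(W_r x(t) + U_r h(t-1) + b_r)               # reset gate
#   c(t) = activation(W_c x(t) + U_c (r(t) .* h(t-1)) + b_c)  # candidate
#   h(t) = (1 - z(t)) .* c(t) + z(t) .* h(t-1)
# where [W_z W_r W_c] are stacked into W, [U_z U_r] into H, and U_c is kept separately as H1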
def gru(dh, x):
dhs = Sdh(dh) # previous value, stabilized
# note: input does not get a stabilizer here, user is meant to do that outside
# projected contribution from input(s), hidden, and bias
projx3 = b + times(x, W)
projh2 = times(dhs, H)
zt_proj = slice (projx3, stack_axis, 0*stacked_dim, 1*stacked_dim) + slice (projh2, stack_axis, 0*stacked_dim, 1*stacked_dim)
rt_proj = slice (projx3, stack_axis, 1*stacked_dim, 2*stacked_dim) + slice (projh2, stack_axis, 1*stacked_dim, 2*stacked_dim)
ct_proj = slice (projx3, stack_axis, 2*stacked_dim, 3*stacked_dim)
zt = sigmoid (zt_proj) # update gate z(t)
rt = sigmoid (rt_proj) # reset gate r(t)
rs = dhs * rt # "cell" c
ct = activation (ct_proj + times(rs, H1))
ht = (1 - zt) * ct + zt * dhs # hidden state ht / output
# for comparison: CUDNN_GRU
# i(t) = sigmoid(W_i x(t) + R_i h(t-1) + b_Wi + b_Ru)
# r(t) = sigmoid(W_r x(t) + R_r h(t-1) + b_Wr + b_Rr) --same up to here
# h'(t) = tanh(W_h x(t) + r(t) .* (R_h h(t-1)) + b_Wh + b_Rh) --r applied after projection? Would make life easier!
# h(t) = (1 - i(t) .* h'(t)) + i(t) .* h(t-1) --TODO: need to confirm bracketing with NVIDIA
h = times(Sht(ht), Wmr) if has_projection else \
ht
# returns the new state as a tuple with names but order matters
#return Function.NamedOutput(h=h)
return h
def rnn_step(dh, x):
dhs = Sdh(dh) # previous value, stabilized
ht = activation (times(x, W) + times(dhs, H) + b)
h = times(Sht(ht), Wmr) if has_projection else \
ht
#return Function.NamedOutput(h=h)
return h
function = {
'RNNStep': rnn_step,
'GRU': gru,
'LSTM': lstm
}[type]
# return the corresponding lambda as a CNTK Function
return BlockFunction(type, name)(function)
def LSTM(shape, cell_shape=None, activation=default_override_or(tanh), use_peepholes=default_override_or(False),
init=default_override_or(glorot_uniform()), init_bias=default_override_or(0),
enable_self_stabilization=default_override_or(False),
name=''):
'''
LSTM(shape, cell_shape=None, activation=tanh, use_peepholes=False, init=glorot_uniform(), init_bias=0, enable_self_stabilization=False, name='')
Layer factory function to create an LSTM block for use inside a recurrence.
The LSTM block implements one step of the recurrence and is stateless. It accepts the previous state as its first two arguments,
and outputs its new state as a two-valued tuple ``(h,c)``.
Example:
>>> # a typical recurrent LSTM layer
>>> from cntk.layers import *
>>> lstm_layer = Recurrence(LSTM(500))
Args:
shape (`int` or `tuple` of `ints`): vector or tensor dimension of the output of this layer
cell_shape (tuple, defaults to `None`): if given, then the output state is first computed at `cell_shape`
and linearly projected to `shape`
activation (:class:`~cntk.ops.functions.Function`, defaults to :func:`~cntk.ops.tanh`): function to apply at the end, e.g. `relu`
use_peepholes (bool, defaults to `False`): if `True`, then add elementwise peephole connections from the cell state to the gates
init (scalar or NumPy array or :mod:`cntk.initializer`, defaults to `glorot_uniform`): initial value of weights `W`
init_bias (scalar or NumPy array or :mod:`cntk.initializer`, defaults to 0): initial value of weights `b`
enable_self_stabilization (bool, defaults to `False`): if `True` then add a :func:`~cntk.layers.blocks.Stabilizer`
to all state-related projections (but not the data input)
name (str, defaults to ''): the name of the Function instance in the network
Returns:
:class:`~cntk.ops.functions.Function`:
A function ``(prev_h, prev_c, input) -> (h, c)`` that implements one step of a recurrent LSTM layer.
'''
activation = get_default_override(LSTM, activation=activation)
use_peepholes = get_default_override(LSTM, use_peepholes=use_peepholes)
init = get_default_override(LSTM, init=init)
init_bias = get_default_override(LSTM, init_bias=init_bias)
enable_self_stabilization = get_default_override(LSTM, enable_self_stabilization=enable_self_stabilization)
return _RecurrentBlock('LSTM', shape, cell_shape, activation=activation, use_peepholes=use_peepholes,
init=init, init_bias=init_bias,
enable_self_stabilization=enable_self_stabilization, name=name)
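# Hedged usage sketch (the names below are placeholders, not part of this module):
# besides being wrapped in Recurrence(), an LSTM block can be applied to a single
# step by feeding the previous state explicitly:
#   step = LSTM(500)
#   h, c = step(prev_h, prev_c, x_t).outputs   # prev_h, prev_c, x_t supplied by the caller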
def RNNStep(shape, cell_shape=None, activation=default_override_or(sigmoid),
init=default_override_or(glorot_uniform()), init_bias=default_override_or(0),
enable_self_stabilization=default_override_or(False),
name=''):
'''
RNNStep(shape, cell_shape=None, activation=sigmoid, init=glorot_uniform(), init_bias=0, enable_self_stabilization=False, name='')
Layer factory function to create a plain RNN block for use inside a recurrence.
The RNN block implements one step of the recurrence and is stateless. It accepts the previous state as its first argument,
and outputs its new state.
Example:
>>> # a plain relu RNN layer
>>> from cntk.layers import *
>>> relu_rnn_layer = Recurrence(RNNStep(500, activation=C.relu))
Args:
shape (`int` or `tuple` of `ints`): vector or tensor dimension of the output of this layer
cell_shape (tuple, defaults to `None`): if given, then the output state is first computed at `cell_shape`
and linearly projected to `shape`
activation (:class:`~cntk.ops.functions.Function`, defaults to :func:`~cntk.ops.sigmoid`): function to apply at the end, e.g. `relu`
init (scalar or NumPy array or :mod:`cntk.initializer`, defaults to `glorot_uniform`): initial value of weights `W`
init_bias (scalar or NumPy array or :mod:`cntk.initializer`, defaults to 0): initial value of weights `b`
enable_self_stabilization (bool, defaults to `False`): if `True` then add a :func:`~cntk.layers.blocks.Stabilizer`
to all state-related projections (but not the data input)
name (str, defaults to ''): the name of the Function instance in the network
Returns:
:class:`~cntk.ops.functions.Function`:
A function ``(prev_h, input) -> h`` where ``h = activation(input @ W + prev_h @ R + b)``
'''
activation = get_default_override(RNNStep, activation=activation)
init = get_default_override(RNNStep, init=init)
init_bias = get_default_override(RNNStep, init_bias=init_bias)
enable_self_stabilization = get_default_override(RNNStep, enable_self_stabilization=enable_self_stabilization)
return _RecurrentBlock('RNNStep', shape, cell_shape, activation=activation, use_peepholes=False,
init=init, init_bias=init_bias,
enable_self_stabilization=enable_self_stabilization, name=name)
# Old name of this, deprecated
def RNNUnit(shape, cell_shape=None, activation=default_override_or(sigmoid),
init=default_override_or(glorot_uniform()), init_bias=default_override_or(0),
enable_self_stabilization=default_override_or(False),
name=''):
'''
RNNUnit(shape, cell_shape=None, activation=sigmoid, init=glorot_uniform(), init_bias=0, enable_self_stabilization=False, name='')
This is a deprecated name for :func:`~cntk.layers.blocks.RNNStep`. Use that name instead.
'''
activation = get_default_override(RNNUnit, activation=activation)
init = get_default_override(RNNUnit, init=init)
init_bias = get_default_override(RNNUnit, init_bias=init_bias)
enable_self_stabilization = get_default_override(RNNUnit, enable_self_stabilization=enable_self_stabilization)
warnings.warn('This name will be removed in future versions. Please use '
'RNNStep(...) instead, which is identical except for its name', DeprecationWarning)
return _RecurrentBlock('RNNStep', shape, cell_shape, activation=activation, use_peepholes=False,
init=init, init_bias=init_bias,
enable_self_stabilization=enable_self_stabilization, name=name)
def GRU(shape, cell_shape=None, activation=default_override_or(tanh),
init=default_override_or(glorot_uniform()), init_bias=default_override_or(0),
enable_self_stabilization=default_override_or(False),
name=''):
'''
GRU(shape, cell_shape=None, activation=tanh, init=glorot_uniform(), init_bias=0, enable_self_stabilization=False, name='')
Layer factory function to create a GRU block for use inside a recurrence.
The GRU block implements one step of the recurrence and is stateless. It accepts the previous state as its first argument,
and outputs its new state.
Example:
>>> # a gated recurrent layer
>>> from cntk.layers import *
>>> gru_layer = Recurrence(GRU(500))
Args:
shape (`int` or `tuple` of `ints`): vector or tensor dimension of the output of this layer
cell_shape (tuple, defaults to `None`): if given, then the output state is first computed at `cell_shape`
and linearly projected to `shape`
activation (:class:`~cntk.ops.functions.Function`, defaults to :func:`~cntk.ops.tanh`): function to apply at the end, e.g. `relu`
init (scalar or NumPy array or :mod:`cntk.initializer`, defaults to `glorot_uniform`): initial value of weights `W`
init_bias (scalar or NumPy array or :mod:`cntk.initializer`, defaults to 0): initial value of weights `b`
enable_self_stabilization (bool, defaults to `False`): if `True` then add a :func:`~cntk.layers.blocks.Stabilizer`
to all state-related projections (but not the data input)
name (str, defaults to ''): the name of the Function instance in the network
Returns:
:class:`~cntk.ops.functions.Function`:
A function ``(prev_h, input) -> h`` that implements one step of a recurrent GRU layer.
'''
activation = get_default_override(GRU, activation=activation)
init = get_default_override(GRU, init=init)
init_bias = get_default_override(GRU, init_bias=init_bias)
enable_self_stabilization = get_default_override(GRU, enable_self_stabilization=enable_self_stabilization)
return _RecurrentBlock('GRU', shape, cell_shape, activation=activation, use_peepholes=False,
init=init, init_bias=init_bias,
enable_self_stabilization=enable_self_stabilization, name=name)