Write a custom deserializer

In this manual we describe how to load data using CNTK custom deserializers. CNTK also provides other means of loading data (e.g. built-in deserializers, user-defined minibatch sources, or feeding NumPy data explicitly); for more details please have a look at the How to feed data manual.

So, why use custom deserializers in the first place? The main reason is that your data is in a custom format that CNTK does not support out of the box, and you would like to load it efficiently.

Custom deserializers give the user the following advantages:

1. Flexibility: the user can deserialize any format she wants.
2. Simplicity: writing a new deserializer is relatively easy.
3. Efficiency: the data will be prefetched automatically (when possible) and moved to GPU memory on a separate thread. In conjunction with the Function.train API this allows the main Python thread to concentrate fully on deserialization and IO prefetching.
4. Randomization: the data is re-randomized on each sweep.
5. Checkpointing: checkpoints are supported out of the box.
6. Distribution: CNTK knows how to distribute chunks to different workers in a distributed environment.

*Please note that due to CPython limitations only a single thread can execute Python code at any point in time, so if you perform heavy CPU work during deserialization it will still hurt performance, because it cannot be effectively parallelized. We recommend using the built-in deserializers for CPU-hungry workloads.*

We start with some imports we need for the rest of this manual:

In [18]:
from __future__ import print_function
from __future__ import division
import os
import sys
import io
import cntk
import cntk.ops
import cntk.io
import cntk.train
import pandas as pd
import numpy as np
import random
import math
from scipy import sparse as sp
import cntk.tests.test_utils
from cntk.io import MinibatchSource

cntk.tests.test_utils.set_device_from_pytest_env() # (only needed for our build system)
cntk.cntk_py.set_fixed_random_seed(1) # fix the random seed so that the examples are repeatable

1. Main concepts

The main responsibility of a deserializer is to take serialized data from external storage and create an in-memory representation of the sequences (or samples) that can be consumed by the network. The interface of a deserializer is simple and consists of three main methods:

- stream_infos: similarly to the built-in deserializers, this function returns a list of streams this deserializer provides. Each stream is described by its name, data type, format and shape.
- num_chunks: returns the number of data chunks. To make IO efficient the deserializer does not operate on single sequences; instead it operates on chunks. A chunk is just a set of sequences that can be read in an efficient manner (for example, in the case of a CSV file on disk it makes sense to read 32 or 64 MB in one go).
- get_chunk(chunk_id): given a chunk identifier (0 <= chunk_id < num_chunks), returns the samples/sequences of that chunk as arrays or CSR matrices.
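For orientation, here is a minimal do-nothing skeleton of this interface (a sketch only: the stream name 'features', its shape and the constant chunk contents are placeholders; a real implementation follows below):

import numpy as np
from cntk.io import UserDeserializer, StreamInformation

class SkeletonDeserializer(UserDeserializer):
    def __init__(self, num_chunks):
        super(SkeletonDeserializer, self).__init__()
        self._num_chunks = num_chunks

    # Describe the exposed streams: name, id, storage format, dtype, sample shape
    def stream_infos(self):
        return [StreamInformation('features', 0, 'dense', np.float32, (3,))]

    # How many chunks the data is split into
    def num_chunks(self):
        return self._num_chunks

    # Return the data of one chunk as a dictionary {stream name -> data}
    def get_chunk(self, chunk_id):
        return {'features': np.ones((10, 3), dtype=np.float32)}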

Let’s implement a simple custom deserializer that will hold all its data in memory as a single chunk:

In [19]:
from cntk.io import UserDeserializer

# This class will take a simple dictionary of {name => (data)} in the constructor
# and will expose a single chunk.
class FromData(UserDeserializer):
    def __init__(self, data_streams):
        super(FromData, self).__init__()
        if not data_streams:
            raise ValueError('at least one stream must be specified, in the form name=data')

        self._data = data_streams   # [name] -> numpy.array or scipy.sparse.csr_matrix or list of those
        self._streams = []          # meta information about exposed stream
        num_sequences = -1          # total number of sequences (can be of length 1 in sample mode)
                                    # must be the same across all streams

        # Infer the meta information about streams
        for name, value in data_streams.items():
            is_sequence = isinstance(value, list) # is list - single elements are considered sequences

            # Infer sparsity
            element = value[0] if is_sequence else value
            if isinstance(element, np.ndarray):
                is_sparse = False
            elif isinstance(element, sp.csr_matrix):
                is_sparse = True
            else:
                raise TypeError('data must be a numpy.array or scipy.sparse.csr_matrix, or a list of those')

            # Infer sample shape
            sample_shape = value[0].shape[1:] if is_sequence else value.shape[1:]

            # Check that the number of sequences across all streams is the same
            stream_num_sequences = len(value) if is_sequence else value.shape[0]
            if num_sequences == -1:
                if stream_num_sequences == 0:
                    raise ValueError('data is empty')
                num_sequences = stream_num_sequences
            elif stream_num_sequences != num_sequences:
                raise ValueError('all data items must have the same first dimension')

            self._streams.append(dict(name = name, shape = sample_shape, is_sparse = is_sparse))

    # Return meta information about streams
    def stream_infos(self):
        return [cntk.io.StreamInformation(stream['name'], index, ['dense', 'sparse'][stream['is_sparse']],
                                          np.float32, stream['shape'])
                for index, stream in enumerate(self._streams)]

    # We have a single chunk only
    def num_chunks(self):
        return 1

    # Actually return our chunk data as a dictionary name => data
    # where the data is a list of sequences or a csr_matrix/ndarray of samples
    def get_chunk(self, chunk_id):
        if chunk_id != 0:
            raise ValueError("Unexpected chunk id")
        return self._data

As can be seen above, the main work is done in the constructor, where, given the data, we infer the information about the exposed streams. The implementations of get_chunk and num_chunks are trivial in this case because we have a single chunk only.

A chunk is a dictionary whose keys are the stream names and whose values are either a list of sequences, or a single NumPy array/CSR matrix (in sample mode, when all sequences are of length 1).
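For illustration (the stream names x and y and all shapes below are arbitrary), a sample-mode chunk and a sequence-mode chunk could look like this:

import numpy as np
from scipy import sparse as sp

# Sample mode: all sequences have length 1, so each stream maps to a single
# ndarray/csr_matrix whose first dimension indexes the samples.
sample_mode_chunk = {
    'x': np.zeros((100, 3), dtype=np.float32),             # 100 dense samples of shape (3,)
    'y': sp.csr_matrix(np.eye(100, 10, dtype=np.float32))  # 100 sparse samples of shape (10,)
}

# Sequence mode: each stream maps to a list with one entry per sequence;
# the first dimension of each entry is the length of that sequence.
sequence_mode_chunk = {
    'x': [np.zeros((5, 3), dtype=np.float32),   # a sequence of 5 samples
          np.zeros((2, 3), dtype=np.float32)],  # a sequence of 2 samples
    'y': [sp.csr_matrix(np.eye(5, 10, dtype=np.float32)),
          sp.csr_matrix(np.eye(2, 10, dtype=np.float32))]
}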

Given the deserializer defined above, we can now simply create a minibatch source with or without randomization:

In [20]:
# Dense and sparse samples non randomized
print('Non randomized')
N = 5
X = np.arange(3*N).reshape(N,3).astype(np.float32) # 5 rows of 3 values
Y = sp.csr_matrix(np.array([[1, 0, 0],
                            [0, 2, 0],
                            [0, 0, 3],
                            [4, 0, 0],
                            [0, 5, 0]], dtype=np.float32))

mbs = MinibatchSource([FromData(dict(x=X, y=Y))], randomize=False)
mb = mbs.next_minibatch(3)
result = mb[mbs.streams['y']].data.asarray()
assert (result == np.array([[[ 1, 0, 0]],
                            [[ 0, 2, 0]],
                            [[ 0, 0, 3]]], dtype=np.float32)).all()
print('Sparse')
print(result)

result = mb[mbs.streams['x']].data.asarray()
assert (result == np.array([[[ 0.,  1.,  2.]],
                            [[ 3.,  4.,  5.]],
                            [[ 6.,  7.,  8.]]],dtype=np.float32)).all()
print('Dense')
print(result)

print('Randomized')
mbs1 = MinibatchSource([FromData(dict(x=X, y=Y))], randomize=True)
mb1 = mbs1.next_minibatch(3)
print('Sparse')
print(mb1[mbs1.streams['y']].data.asarray())
print('Dense')
print(mb1[mbs1.streams['x']].data.asarray())

Non randomized
Sparse
[[[ 1.  0.  0.]]

 [[ 0.  2.  0.]]

 [[ 0.  0.  3.]]]
Dense
[[[ 0.  1.  2.]]

 [[ 3.  4.  5.]]

 [[ 6.  7.  8.]]]
Randomized
Sparse
[[[ 0.  5.  0.]]

 [[ 1.  0.  0.]]

 [[ 4.  0.  0.]]]
Dense
[[[ 12.  13.  14.]]

 [[  0.   1.   2.]]

 [[  9.  10.  11.]]]

2. Processing big files

The example above, though simple, is only useful for data that fits in memory. Let’s see how we can implement a deserializer that allows us to ingest data that exceeds the available memory.

Let’s generate a CSV file with 200 thousand lines (you can adjust the number of rows as you see fit). Each line will have 150 features and a single label (151 columns in total):

x1       ...      x150     y
0        ...      0        0
1        ...      1        1
...      ...      ...      ...
199999   ...      199999   199999
In [21]:
import csv
filename = 'big_file.tmp'

with open(filename, 'w') as data:
    w = csv.writer(data, quoting=csv.QUOTE_ALL)
    for i in range(200000):
        w.writerow([float(i) for j in range(151)])
        if i % 20000 == 0:
            print('%d records generated' % i)
print("Input file is generated")

0 records generated
20000 records generated
40000 records generated
60000 records generated
80000 records generated
100000 records generated
120000 records generated
140000 records generated
160000 records generated
180000 records generated
Input file is generated

In order to consume this file, let’s write a CSV deserializer that cuts the given file into chunks of the specified size and parses a particular chunk using the pandas module:

In [22]:
class CSVDeserializer(UserDeserializer):
    def __init__(self, filename, streams, chunksize = 32*1024*1024):
        super(CSVDeserializer, self).__init__()
        self._chunksize = chunksize
        self._filename = filename

        # Create the information about streams
        # based on the user provided data
        self._streams = [cntk.io.StreamInformation(s['name'], i, 'dense', np.float32, s['shape'])
                         for i, s in enumerate(streams)]

        # Define the number of chunks based on the file size
        self._num_chunks = int(math.ceil(os.stat(filename).st_size/chunksize))

        # Based on the information provided by the user decide what column span
        # belongs to which stream
        self._offsets = [0]
        for i, s in enumerate(self._streams):
            self._offsets.append(s.sample_shape[0] + self._offsets[-1])

    def stream_infos(self):
        return self._streams

    def num_chunks(self):
        return self._num_chunks

    # Ok, let's actually get the work done
    def get_chunk(self, chunk_id):
        fin = open(self._filename, "rb")

        # Some constants
        endline = '\n' if sys.version_info < (3,) else ord('\n')
        _64KB = 64 * 1024

        # We would like to cut our chunk exactly on the line boundary.
        # So let's make sure if the chunk starts in the middle
        # of a row we move left to the beginning of this row
        offset = chunk_id * self._chunksize
        if offset != 0: # Need to find the beginning of the current row
            while offset > 0:
                offset -= _64KB # move left 64 KB
                fin.seek(offset)
                buf = fin.read(_64KB) # read the data
                index = buf.rfind(endline) # find the last \n and adapt the chunk offset
                if index != -1: # found, stop searching
                    offset += index
                    break
            if offset == 0:
                raise ValueError('A single row does not fit into the chunk, consider increasing the chunk size')

        # Now read the chunk data with the adapted offset
        fin.seek(offset)
        size = (chunk_id + 1) * self._chunksize - offset
        data = fin.read(size)
        fin.close()
        last_endline = data.rfind(endline) # Make sure we drop the last partial line
                                           # It will be consumed by the next chunk
        if last_endline == -1:
            raise ValueError('A single row does not fit into the chunk, consider increasing the chunk size')
        data = data[:last_endline + 1]

        # Parse the csv using pandas
        df = pd.read_csv(io.BytesIO(data), engine='c', dtype=np.float32, header=None)

        # Create a dictionary {name => data},
        # where data spans the number of columns specified by the user
        result = {}
        mat = df.values # same array as the deprecated df.as_matrix()
        for i, stream in enumerate(self._streams):
            result[stream.m_name] = np.ascontiguousarray(mat[:, self._offsets[i]:self._offsets[i + 1]])
        return result

Now let’s read through the data using the corresponding minibatch source:

In [24]:
import time

d = CSVDeserializer(filename=filename, streams=[dict(name='x', shape=(150,)), dict(name='y', shape=(1,))])
mbs = MinibatchSource([d], randomize=False, max_sweeps=1)

total_num_samples = 0
start = time.time()
while True:
    mb = mbs.next_minibatch(128)
    if not mb:
        break
    total_num_samples += mb[mbs.streams.x].number_of_samples
    if total_num_samples % 12800 == 0:
        sys.stdout.write('.')
end = time.time()
print()
print('Total number of samples %d, speed %f samples per second' % (total_num_samples, total_num_samples/(end-start)))
...............
Total number of samples 200000, speed 23691.068044 samples per second

3. Randomization

Randomization is enabled simply by instantiating the MinibatchSource with randomize=True. Randomization happens on each sweep and is two-fold: first all chunks are shuffled, then a window of chunks is created (its size is controlled by the randomization_window_in_chunks or randomization_window_in_samples parameters) and all sequences inside the window are shuffled in their turn.
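For example, the window size can be restricted when chunks are large relative to the available memory. Here is a sketch reusing the CSVDeserializer defined above (the window of 4 chunks is an arbitrary choice):

d = CSVDeserializer(filename=filename,
                    streams=[dict(name='x', shape=(150,)), dict(name='y', shape=(1,))])

# Shuffle the chunks globally, but shuffle sequences only within
# a rolling window of 4 chunks at a time.
mbs_rand = MinibatchSource([d], randomize=True,
                           randomization_window_in_chunks=4,
                           max_sweeps=1)
mb = mbs_rand.next_minibatch(128)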

4. Checkpointing

Checkpointing is done transparently for the deserializer. You can use get_checkpoint_state and restore_from_checkpoint on the MinibatchSource.
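A minimal sketch of how this could look, reusing the CSVDeserializer from above (in a real training setup the state would typically be persisted together with the model checkpoint):

mbs_ckp = MinibatchSource([CSVDeserializer(filename=filename,
                                           streams=[dict(name='x', shape=(150,)),
                                                    dict(name='y', shape=(1,))])],
                          randomize=True)

# Read a few minibatches, then capture the current position of the reader
for _ in range(10):
    mbs_ckp.next_minibatch(128)
state = mbs_ckp.get_checkpoint_state()

# ... later: rewind the source to the captured position and continue reading
mbs_ckp.restore_from_checkpoint(state)
mb = mbs_ckp.next_minibatch(128)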

5. Distribution

Distribution is done transparently for the deserializer. When randomization is enabled, data is distributed to workers based on the chunk id; in non-randomized mode it is based on the sequence position in the sweep.
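For example, in a distributed job each worker would typically request only its own partition of the data through the num_data_partitions and partition_index arguments of next_minibatch. In the sketch below the worker count and rank are plain placeholders; they would normally come from the distributed communicator (e.g. cntk.train.distributed.Communicator):

mbs_dist = MinibatchSource([CSVDeserializer(filename=filename,
                                            streams=[dict(name='x', shape=(150,)),
                                                     dict(name='y', shape=(1,))])],
                           randomize=True)

num_workers = 2   # placeholder, e.g. Communicator.num_workers()
worker_rank = 0   # placeholder, e.g. Communicator.rank()

# Each worker asks only for its own slice of the data
mb = mbs_dist.next_minibatch(128,
                             num_data_partitions=num_workers,
                             partition_index=worker_rank)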

6. Threading

get_chunk is executed on the prefetch thread. Please be aware that real multithreading is not possible in CPython:

> In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once (even though certain performance-oriented libraries might overcome this limitation). If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing or concurrent.futures.ProcessPoolExecutor. However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.
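If chunk parsing itself is CPU heavy and you still want to use a custom deserializer, one possible workaround (a sketch only, assuming Python 3 or the futures backport; not part of the CNTK API) is to push the parsing into a separate process from within get_chunk, so that the prefetch thread merely waits on the result and does not hold the GIL during the heavy work:

import concurrent.futures
import numpy as np
from cntk.io import UserDeserializer, StreamInformation

# Module-level helper so it can be pickled and executed in a worker process.
# The body is a placeholder for whatever CPU-heavy parsing you actually need.
def _parse_chunk(filename, chunk_id):
    return {'x': np.zeros((1000, 150), dtype=np.float32)}

class OffloadingDeserializer(UserDeserializer):
    def __init__(self, filename, num_chunks):
        super(OffloadingDeserializer, self).__init__()
        self._filename = filename
        self._num_chunks = num_chunks
        self._pool = concurrent.futures.ProcessPoolExecutor(max_workers=1)

    def stream_infos(self):
        return [StreamInformation('x', 0, 'dense', np.float32, (150,))]

    def num_chunks(self):
        return self._num_chunks

    def get_chunk(self, chunk_id):
        # The parsing runs in a worker process; while the prefetch thread
        # blocks on result(), the GIL is released for the main Python thread.
        return self._pool.submit(_parse_chunk, self._filename, chunk_id).result()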