Under Development use with CAUTION

VGG Net on VGG dataset Multi GPU version

Lets provide the dataset details and protocol paths

In [1]:
DB_Base_Path='/net/kato/datasets/VGG_Face_Dataset/images/'
protocol_file='/net/kato/datasets/VGG_Face_Dataset/VGG_images.csv'

Importing requiered libraries

In [2]:
import cv2
import numpy as np
import csv
import random
import matplotlib.pyplot as plt
import cPickle
from multiprocessing import Process, Queue
from multiprocessing import pool
from multiprocessing.pool import ThreadPool
from functools import partial
%matplotlib inline

from caffe2.python import core,workspace,brew,optimizer,model_helper
from caffe2.proto import caffe2_pb2
from caffe2.python import data_parallel_model as dpm
Ignoring @/caffe2/caffe2/contrib/nccl:nccl_ops as it is not a valid file.
Ignoring @/caffe2/caffe2/contrib/gloo:gloo_ops as it is not a valid file.
Ignoring @/caffe2/caffe2/contrib/gloo:gloo_ops_gpu as it is not a valid file.

Lets Read the protocol file and split it into training, validation and test

In [3]:
protocol=csv.reader(open(protocol_file,'r'),delimiter=',')
next(protocol)
train=[]
val=[]
test=[]
for line in protocol:
    if line[2]=='1':
        train.append([line[0],int(line[1])])
    elif line[2]=='2':
        val.append([line[0],int(line[1])])
    else:
        test.append([line[0],int(line[1])])
In [4]:
gpus = range(4)
#device_option = caffe2_pb2.DeviceOption(device_type=caffe2_pb2.CUDA)

Defining the VGG Network

In [5]:
def VGG_Net(model,loss_scale):
#,no_of_ids=1014,is_test=0):
#    with core.DeviceScope(caffe2_pb2.DeviceOption(device_type=caffe2_pb2.CUDA)):
    if True:
#,cuda_gpu_id=0)) and caffe2_pb2.DeviceOption(device_type=caffe2_pb2.CUDA,cuda_gpu_id=1):
        #----- 3 x 224 x 224 --> 64 x 224 x 224 -----#
        conv1_1 = brew.conv(model, 'data', 'conv1_1', 3, 64, 3,pad=1,weight_init=('GaussianFill',{'mean':0.0, 'std':1e-2}))
        relu1_1 = brew.relu(model, conv1_1, 'relu1_1')
        #----- 64 x 224 x 224 --> 64 x 224 x 224 -----#
        conv1_2 = brew.conv(model, relu1_1, 'conv1_2', 64, 64, 3,pad=1,weight_init=('GaussianFill',{'mean':0.0, 'std':1e-2}))
        relu1_2 = brew.relu(model, conv1_2, 'relu1_2')
        #----- 64 x 224 x 224 --> 64 x 112 x 112 -----#
        pool1 = brew.max_pool(model, relu1_2, 'pool1', kernel=2, stride=2)

        #----- 64 x 112 x 112 --> 128 x 112 x 112 -----#
        conv2_1 = brew.conv(model, pool1, 'conv2_1', 64, 128, 3,pad=1,weight_init=('GaussianFill',{'mean':0.0, 'std':1e-2}))
        relu2_1 = brew.relu(model, conv2_1, 'relu2_1')
        #----- 128 x 112 x 112 --> 128 x 112 x 112 -----#
        conv2_2 = brew.conv(model, relu2_1, 'conv2_2', 128, 128, 3,pad=1,weight_init=('GaussianFill',{'mean':0.0, 'std':1e-2}))
        relu2_2 = brew.relu(model, conv2_2, 'relu2_2')
        #----- 128 x 112 x 112 --> 128 x 56 x 56 -----#
        pool2 = brew.max_pool(model, relu2_2, 'pool2', kernel=2, stride=2)

        #----- 128 x 56 x 56 --> 256 x 56 x 56 -----#
        conv3_1 = brew.conv(model, pool2, 'conv3_1', 128, 256, 3,pad=1,weight_init=('GaussianFill',{'mean':0.0, 'std':1e-2}))
        relu3_1 = brew.relu(model, conv3_1, 'relu3_1')
        #----- 256 x 56 x 56 --> 256 x 56 x 56 -----#
        conv3_2 = brew.conv(model, relu3_1, 'conv3_2', 256, 256, 3,pad=1,weight_init=('GaussianFill',{'mean':0.0, 'std':1e-2}))
        relu3_2 = brew.relu(model, conv3_2, 'relu3_2')
        #----- 256 x 56 x 56 --> 256 x 56 x 56 -----#
        conv3_3 = brew.conv(model, relu3_2, 'conv3_3', 256, 256, 3,pad=1,weight_init=('GaussianFill',{'mean':0.0, 'std':1e-2}))
        relu3_3 = brew.relu(model, conv3_3, 'relu3_3')
        #----- 256 x 56 x 56 --> 256 x 28 x 28 -----#
        pool3 = brew.max_pool(model, relu3_3, 'pool3', kernel=2, stride=2)

        #----- 256 x 28 x 28 --> 512 x 28 x 28 -----#
        conv4_1 = brew.conv(model, pool3, 'conv4_1', 256, 512, 3,pad=1,weight_init=('GaussianFill',{'mean':0.0, 'std':1e-2}))
        relu4_1 = brew.relu(model, conv4_1, 'relu4_1')
        #----- 512 x 28 x 28 --> 512 x 28 x 28 -----#
        conv4_2 = brew.conv(model, relu4_1, 'conv4_2', 512, 512, 3,pad=1,weight_init=('GaussianFill',{'mean':0.0, 'std':1e-2}))
        relu4_2 = brew.relu(model, conv4_2, 'relu4_2')
        #----- 512 x 28 x 28 --> 512 x 28 x 28 -----#
        conv4_3 = brew.conv(model, relu4_2, 'conv4_3', 512, 512, 3,pad=1,weight_init=('GaussianFill',{'mean':0.0, 'std':1e-2}))
        relu4_3 = brew.relu(model, conv4_3, 'relu4_3')
        #----- 512 x 28 x 28 --> 512 x 14 x 14 -----#
        pool4 = brew.max_pool(model, relu4_3, 'pool4', kernel=2, stride=2)

        #----- 512 x 14 x 14 --> 512 x 14 x 14 -----#
        conv5_1 = brew.conv(model, pool4, 'conv5_1', 512, 512, 3,pad=1,weight_init=('GaussianFill',{'mean':0.0, 'std':1e-2}))
        relu5_1 = brew.relu(model, conv5_1, 'relu5_1')
        #----- 512 x 14 x 14 --> 512 x 14 x 14 -----#
        conv5_2 = brew.conv(model, relu5_1, 'conv5_2', 512, 512, 3,pad=1,weight_init=('GaussianFill',{'mean':0.0, 'std':1e-2}))
        relu5_2 = brew.relu(model, conv5_2, 'relu5_2')
        #----- 512 x 14 x 14 --> 512 x 14 x 14 -----#
        conv5_3 = brew.conv(model, relu5_2, 'conv5_3', 512, 512, 3,pad=1,weight_init=('GaussianFill',{'mean':0.0, 'std':1e-2}))
        relu5_3 = brew.relu(model, conv5_3, 'relu5_3')
        #----- 512 x 14 x 14 --> 512 x 7 x 7 -----#
        pool5 = brew.max_pool(model, relu5_3, 'pool5', kernel=2, stride=2)

        fc6 = brew.fc(model, pool5, 'fc6', 25088, 4096)
#        fc6 = brew.fc(model, pool5, 'fc6', 25088, 4096)
        relu6 = brew.relu(model, fc6,'relu6')

        drop6 = brew.dropout(model, relu6, 'drop6', ratio=0.5, is_test=0)

        fc7 = brew.fc(model, drop6, 'fc7', 4096, 4096)
        relu7 = brew.relu(model, fc7,'relu7')
        drop7 = brew.dropout(model, relu7,'drop7',ratio=0.5,is_test=0)

        fc8 = brew.fc(model, drop7, 'fc8', 4096, 2622)
    #no_of_ids)
        softmax = brew.softmax(model, fc8, 'softmax')
        
        xent = model.LabelCrossEntropy([softmax, 'label'], 'xent')
        # compute the expected loss
        loss = model.AveragedLoss(xent, "loss")
        # track the accuracy of the model
        AddAccuracy(model, softmax)
        loss = model.Scale(loss, "loss", scale=loss_scale)       

    return [loss]

Lets Calculate the accuracy

In [6]:
def AddAccuracy(model, softmax):
    accuracy = model.Accuracy([softmax, 'label'], "accuracy")
    return accuracy

Lets define the training operators

In [7]:
def AddTrainingOperators(model):
    """
    opt = optimizer.build_sgd(model, base_learning_rate=1e-5, policy="step", stepsize=1, gamma=0.999, momentum=0.9)
#    model.AddWeightDecay(1e-4)
    """
#    brew.add_weight_decay(model, 1e-4)
    ITER = brew.iter(model, "iter")
#    ITER = model.Iter("iter")
    LR = model.LearningRate(ITER, "LR", base_lr=0.01, policy="step", stepsize=1, gamma=0.999, momentum=0.9 )
    ONE = model.param_init_net.ConstantFill([], "ONE", shape=[1], value=1.0)
    for param in model.GetParams():
        param_grad = model.param_to_grad[param]
        param_momentum = model.param_init_net.ConstantFill(
            [param], param + '_momentum', value=0.0
        )
        model.net.MomentumSGDUpdate(
            [param_grad, param_momentum, LR, param],
            [param_grad, param_momentum, param],
            momentum=0.9,
            nesterov=1,
        )
    return

Image Prefetching

GPU's are a vital resource so to save them from starvation we prefetch the images for processing.
We run the below function as a process. The major time consuming part in image prefetching is the disk response time to provide the images, therefore we use a thread pool to prefetch the images.
We start number of threads equal to the batch size, so that each image for the batch is requested simultaneously to disk.

In [8]:
def prefetch_image(data,q):
    random.shuffle(data)
    p = pool.ThreadPool(Expected_Batch_Size*3)
    p.map(partial(read_img,q),data)
    p.close()
    p.join()
    q.put(['END','END'])
    print "Image Prefetch Process Ends ..........."
    return

For each image we take a random crop of the size of network input

In [9]:
Network_Height=224
Network_Width=224
In [10]:
def read_img(prefetched_images,data):
    # Read image using opencv. Read in BGR by default.
    img=cv2.imread(DB_Base_Path+data[0])
    # swapping the axes to go from HWC to CHW
    img = img.swapaxes(1, 2).swapaxes(0, 1)
    start_h=random.randint(0,img.shape[1]-Network_Height)
    start_w=random.randint(0,img.shape[2]-Network_Width)
    img = img[:,start_h:start_h+Network_Height,start_w:start_w+Network_Width]/256
    subj=data[1]-1
    # Write them into a multiprocessing queue
    prefetched_images.put([img,subj])

Lets group them into batches, but first lets define the Batch size

In [11]:
Expected_Batch_Size=128
Batch_Size=32

Now, lets group them

In [12]:
def prefetch_batch (prefetched_images,prefetched_batches):
    data,subj=prefetched_images.get()
    count=0
    batch_img=[]
    batch_labels=[]
    while data!='END':
        batch_img.append(data)
        batch_labels.append(subj)
        count+=1
        if count%Batch_Size == 0:
            prefetched_batches.put([np.array(batch_img).astype(np.float32),np.array(batch_labels).astype(np.int32)])
            batch_img=[]
            batch_labels=[]
        data,subj=prefetched_images.get()
    prefetched_batches.put([np.array(batch_img).astype(np.float32),np.array(batch_labels).astype(np.int32)])
    prefetched_batches.put(['END','END'])
    prefetched_images.close()
    prefetched_batches.close()
    print "Batch Prefetch Process Ends ....................."
    return

Lets start prefetching
We use multiprocessing queues with a limit to avoid using too much memory as GPU computation may be slower

In [13]:
def start_prefetching(prefetch_list):
    prefetched_images = Queue(Batch_Size*25)
    prefetched_batches = Queue(51)
    image_prefetcher = Process(target=prefetch_image, args=(prefetch_list, prefetched_images))
    batch_prefetcher = Process(target=prefetch_batch, args=(prefetched_images,prefetched_batches))
    image_prefetcher.start()
    batch_prefetcher.start()
    return prefetched_batches

Defining the snapshots location and interval

In [14]:
Snapshot_location='snapshots/'
Snapshot_interval=3
root_folder='root'
No_Of_Epochs=21

Lets save a snapshot

In [15]:
def save_snapshot(model,iter_no):
    d={}
    for blob in model.GetParams():
        d[blob]=workspace.FetchBlob(blob)
    cPickle.dump(d,open(Snapshot_location+str(iter_no),'w'))
In [16]:
def add_image_input_ops(training_model):
    pass
In [17]:
workspace.ResetWorkspace()
training_model = model_helper.ModelHelper(name="training_net")

val_model = model_helper.ModelHelper(name="val_net", init_params=False)
In [18]:
in_blobs=[]
for de in gpus:
    in_blobs.append('gpu_'+str(de)+'/data')
    in_blobs.append('gpu_'+str(de)+'/label')
In [19]:
dpm.Parallelize_GPU(
    training_model,
    input_builder_fun=add_image_input_ops,
    forward_pass_builder_fun=VGG_Net,
    param_update_builder_fun=AddTrainingOperators,
    devices=gpus,
    optimize_gradient_memory=True
)
dpm.Parallelize_GPU(
    val_model,
    input_builder_fun=add_image_input_ops,
    forward_pass_builder_fun=VGG_Net,
    param_update_builder_fun=None,
    devices=gpus,
    optimize_gradient_memory=True
)
No handlers could be found for logger "data_parallel_model"
In [20]:
workspace.RunNetOnce(training_model.param_init_net)
workspace.RunNetOnce(val_model.param_init_net)
Out[20]:
True
In [21]:
workspace.CreateNet(training_model.net,overwrite=True,input_blobs=in_blobs)
workspace.CreateNet(val_model.net,overwrite=True,input_blobs=in_blobs)
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
 in ()
----> 1 workspace.CreateNet(training_model.net,overwrite=True,input_blobs=in_blobs)
      2 workspace.CreateNet(val_model.net,overwrite=True,input_blobs=in_blobs)

/scratch/adhamija/caffe2_installation/caffe2/python/workspace.pyc in CreateNet(net, overwrite, input_blobs)
    145     for input_blob in input_blobs:
    146         C.create_blob(input_blob)
--> 147     return C.create_net(StringifyProto(net), overwrite)
    148 
    149 

RuntimeError: [enforce fail at operator.cc:26] blob != nullptr. op Conv: Encountered a non-existing input blob: gpu_2/data 
In [22]:
def run_one_epoch(model,prefetched_batches):
    loss=[]
    accuracy=[]
    d,l=prefetched_batches.get()
    while (d!='END'):
        for device in model._devices:
            if d=='END':
                break
            workspace.FeedBlob("gpu_"+str(device)+"/data", d, caffe2_pb2.DeviceOption(device_type=caffe2_pb2.CUDA,cuda_gpu_id=device))
            workspace.FeedBlob("gpu_"+str(device)+"/label", l, caffe2_pb2.DeviceOption(device_type=caffe2_pb2.CUDA,cuda_gpu_id=device))
            d,l=prefetched_batches.get()
        workspace.RunNet(model.net, num_iter=1)
        for device in model._devices:
            loss.append(workspace.FetchBlob("gpu_"+str(device)+"/loss"))
            accuracy.append(workspace.FetchBlob("gpu_"+str(device)+"/accuracy"))
            np.save('training_loss',loss)
            np.save('training_accuracy',accuracy)
        print loss,accuracy
#        exit()
In [23]:
print workspace.Blobs()
for e in range(No_Of_Epochs):
    prefetched_batches=start_prefetching(train)
    run_one_epoch(training_model,prefetched_batches)
    prefetched_batches=start_prefetching(val)
#    snap={}
[]
/net/home/store/home/adhamija/anaconda2/lib/python2.7/site-packages/ipykernel/__main__.py:6: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
STARTING
INserting
INsertED
/net/home/store/home/adhamija/anaconda2/lib/python2.7/site-packages/ipykernel/__main__.py:6: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
/net/home/store/home/adhamija/anaconda2/lib/python2.7/site-packages/ipykernel/__main__.py:8: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
INserting
INsertED
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
 in ()
      2 for e in range(No_Of_Epochs):
      3     prefetched_batches=start_prefetching(train)
----> 4     run_one_epoch(training_model,prefetched_batches)
      5     prefetched_batches=start_prefetching(val)
      6 #    snap={}

 in run_one_epoch(model, prefetched_batches)
     16         workspace.RunNet(model.net, num_iter=1)
     17         for device in model._devices:
---> 18             loss.append(workspace.FetchBlob("gpu_"+str(device)+"/loss"))
     19             accuracy.append(workspace.FetchBlob("gpu_"+str(device)+"/accuracy"))
     20         print loss,accuracy

/scratch/adhamija/caffe2_installation/caffe2/python/workspace.pyc in FetchBlob(name)
    302       Fetched blob (numpy array or string) if successful
    303     """
--> 304     return C.fetch_blob(StringifyBlobName(name))
    305 
    306 

RuntimeError: [enforce fail at pybind_state.cc:152] ws->HasBlob(name). Can't find blob: gpu_0/loss 

- Akshay Raj Dhamija