Spatial Pyramid Pooling Layer

The paper Spatial Pyramid Pooling in Deep Convolutional Networks for Visual Recognition introduces spatial pyramid pooling (SPP). By decoupling the convolutional layers from the fully-connected layers, SPP removes the fixed-size constraint on the input image and can effectively improve recognition accuracy on sub-windows.

On Input Size

Conventional CNN architectures require a fixed input image size, a constraint imposed by the fully-connected layers, which need fixed-length inputs. To satisfy this constraint, two preprocessing approaches are commonly used:

  1. Cropping: cannot cover the entire image, causing loss of information
  2. Warping: introduces geometric distortion of the image

The Spatial Pyramid Pooling Layer

An SPP layer (spatial pyramid pooling layer) is added on top of the last convolutional layer. It pools the features into a fixed-length vector, which is then fed into the fully-connected layers. In other words, the information "aggregation" is performed deeper in the network (between the convolutional and fully-connected layers), which avoids the need to crop or warp the input at the start.

Implementation

The figure above shows a 3-level pyramid pooling operation (1x1, 2x2, 4x4). In practice, the input feature map is pooled with windows of different sizes, and the results are concatenated into a fixed-length feature vector:

\([N, C, H, W] \rightarrow [N, C, \text{Bins}]\)

Note 1: Spatial pyramid pooling is applied to each feature map (channel) independently, so it preserves the number of channels and places no constraint on the spatial size of the input feature maps.

Note 2: The spatial pyramid pooling layer fixes the number of spatial bins, which guarantees a fixed output vector length.
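
For example, with the default 3-level pyramid (1x1, 2x2, 4x4 bins) each channel is pooled into \(1 + 4 + 16 = 21\) bins, so a conv5 output with 256 channels always yields a \(256 \times 21\)-dimensional representation, no matter the values of \(H\) and \(W\).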

Computation

Suppose the last convolutional layer outputs a feature map of size \(a\times a\), and a pyramid level has \(n\times n\) bins.

Pooling is then implemented with a sliding window of size \(win=\left \lceil a/n \right \rceil\) and stride \(str=\left \lfloor a/n \right \rfloor\).

The symbol \(\left \lfloor \cdot \right \rfloor\) denotes rounding down (floor) and \(\left \lceil \cdot \right \rceil\) denotes rounding up (ceiling).

For a 3-level pyramid pooling (3x3, 2x2, 1x1), the Caffe implementation is shown in the figure; a minimal PyTorch sketch of the same computation follows below.
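
As a minimal sketch of this window/stride formula (assuming a square \(a\times a\) feature map; spp_level is an illustrative helper, not code from the paper):

import math
import torch
import torch.nn.functional as F

def spp_level(feature_map, n):
    """Pool an (N, C, a, a) feature map into n x n bins using the
    paper's formula: win = ceil(a/n), str = floor(a/n)."""
    a = feature_map.size(2)
    win = math.ceil(a / n)       # window size, rounded up
    stride = math.floor(a / n)   # stride, rounded down
    return F.max_pool2d(feature_map, kernel_size=win, stride=stride)

x = torch.randn(1, 256, 13, 13)   # e.g. a conv5 output of AlexNet
for n in (3, 2, 1):               # 3-level pyramid: 3x3, 2x2, 1x1
    print(spp_level(x, n).shape)  # (1, 256, 3, 3), (1, 256, 2, 2), (1, 256, 1, 1)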

Properties

The SPP layer has the following properties:

  1. Regardless of the input size, SPP generates a fixed-length output, which the sliding-window pooling used in earlier deep networks cannot do
  2. SPP uses multi-level spatial bins, whereas sliding-window pooling uses only a single window size. Multi-level pooling has been shown to be robust to object deformation
  3. Thanks to this flexibility in input size, SPP can pool features extracted at variable scales

PyTorch Implementation

In PyTorch, AdaptiveMaxPool2d directly produces a fixed-size output. A spatial pyramid pooling layer can be implemented as follows:

from math import sqrt

import torch
import torch.nn as nn


class SpatialPyramidPooling(nn.Module):
    """Generate fixed length representation regardless of image dimensions
    Based on the paper "Spatial Pyramid Pooling in Deep Convolutional Networks
    for Visual Recognition" (https://arxiv.org/pdf/1406.4729.pdf)

    :param [int] num_pools: Number of pools to split each input feature map into.
        Each element must be a perfect square in order to equally divide the
        pools across the feature map. Default corresponds to the original
        paper's implementation
    :param str mode: Specifies the type of pooling, either max or avg
    """

    # Default is a 3-level pyramid (1x1, 2x2, 4x4), i.e. 21 bins per channel
    def __init__(self, num_pools=(1, 4, 16), mode='max'):
        super(SpatialPyramidPooling, self).__init__()
        self.name = 'SpatialPyramidPooling'
        if mode == 'max':
            pool_func = nn.AdaptiveMaxPool2d
        elif mode == 'avg':
            pool_func = nn.AdaptiveAvgPool2d
        else:
            raise NotImplementedError(f"Unknown pooling mode '{mode}', expected 'max' or 'avg'")
        # Use a ModuleList so the pooling layers are registered as submodules
        self.pools = nn.ModuleList()
        for p in num_pools:
            side_length = sqrt(p)
            if not side_length.is_integer():
                raise ValueError(f'Bin size {p} is not a perfect square')
            self.pools.append(pool_func(int(side_length)))

    def forward(self, feature_maps):
        """Pool feature maps at different bin levels and concatenate

        :param torch.tensor feature_maps: Arbitrarily shaped spatial and
            channel dimensions extracted from any generic convolutional
            architecture. Shape ``(N, C, H, W)``
        :return torch.tensor pooled: Concatenation of all pools with shape
            ``(N, C, sum(num_pools))``
        """
        assert feature_maps.dim() == 4, 'Expected 4D input of (N, C, H, W)'
        batch_size = feature_maps.size(0)
        channels = feature_maps.size(1)
        pooled = []
        for p in self.pools:
            pooled.append(p(feature_maps).view(batch_size, channels, -1))
        return torch.cat(pooled, dim=2)
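
A quick sanity check (the input shapes here are arbitrary examples) shows that feature maps of different spatial sizes all pool to the same fixed length:

spp = SpatialPyramidPooling(num_pools=(1, 4, 16), mode='max')
for size in (13, 20, 32):                # arbitrary spatial sizes
    x = torch.randn(2, 256, size, size)
    print(spp(x).shape)                  # torch.Size([2, 256, 21]) every time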

AlexNet_SPP

Adding a spatial pyramid pooling layer (a 4-level pyramid) on top of the AlexNet architecture can be implemented as follows:

class AlexNet_SPP(nn.Module):

    def __init__(self, num_classes=1000):
        super(AlexNet_SPP, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(64, 192, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            # nn.MaxPool2d(kernel_size=3, stride=2),
        )
        self.spp = SpatialPyramidPooling(num_pools=(1, 4, 9, 36), mode='max')
        # self.avgpool = nn.AdaptiveAvgPool2d((6, 6))
        self.classifier = nn.Sequential(
            nn.Dropout(),
            nn.Linear(256 * 50, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes),
        )

    def forward(self, x):
        x = self.features(x)
        # x = self.avgpool(x)
        x = self.spp(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x
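
The 4-level pyramid (1, 4, 9, 36) produces 1 + 4 + 9 + 36 = 50 bins per channel, which is why the first fully-connected layer takes 256 * 50 inputs. As a hedged sanity check (the resolutions below are chosen arbitrarily), the network now accepts variable input sizes:

model = AlexNet_SPP(num_classes=20)
for size in (227, 180, 300):            # arbitrary input resolutions
    x = torch.randn(1, 3, size, size)
    print(model(x).shape)               # torch.Size([1, 20]) for every size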

Testing

Training Parameters

  • Loss function: cross-entropy loss
  • Optimizer: Adam with learning rate 1e-3
  • Scheduler: step decay, multiplying the learning rate by a factor of 0.1 every 15 epochs
  • Epochs: 50
  • Batch size: 128
  • Input images: resized to (227, 227), random horizontal flip, normalized

Training Code

Reference: SPP-net/py/classifier/

# -*- coding: utf-8 -*-

"""
@date: 2020/3/26 2:33 PM
@file: classifier.py
@author: zj
@description:
"""

import os
import time
import copy
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
import torchvision

import models.alexnet_spp as alexnet_spp
import utils.util as util

data_root_dir = '../data/train_val/'
model_dir = '../data/models/'


def load_data(root_dir):
    transform = transforms.Compose([
        transforms.Resize((227, 227)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])

    data_loaders = {}
    dataset_sizes = {}
    for phase in ['train', 'val']:
        phase_dir = os.path.join(root_dir, phase)

        data_set = ImageFolder(phase_dir, transform=transform)
        data_loader = DataLoader(data_set, batch_size=128, shuffle=True, num_workers=8)

        data_loaders[phase] = data_loader
        dataset_sizes[phase] = len(data_set)

    return data_loaders, dataset_sizes


def train_model(model, criterion, optimizer, scheduler, dataset_sizes, data_loaders, num_epochs=25, device=None):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    loss_dict = {'train': [], 'val': []}
    acc_dict = {'train': [], 'val': []}
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()  # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in data_loaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            if phase == 'train':
                scheduler.step()

            dataset_size = dataset_sizes[phase]

            epoch_loss = running_loss / dataset_size
            epoch_acc = running_corrects.double() / dataset_size
            loss_dict[phase].append(epoch_loss)
            acc_dict[phase].append(epoch_acc)

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc))

            # deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, loss_dict, acc_dict


if __name__ == '__main__':
    data_loaders, data_sizes = load_data(data_root_dir)
    print(data_sizes)

    res_loss = dict()
    res_acc = dict()
    for name in ['alexnet_spp', 'alexnet']:
        if name == 'alexnet_spp':
            model = alexnet_spp.AlexNet_SPP(num_classes=20)
        else:
            model = torchvision.models.AlexNet(num_classes=20)

        device = util.get_device()
        model = model.to(device)

        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=1e-3)
        lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=15, gamma=0.1)

        best_model, loss_dict, acc_dict = train_model(model, criterion, optimizer, lr_scheduler, data_sizes,
                                                      data_loaders, num_epochs=50, device=device)

        # save the best model weights
        util.check_dir(model_dir)
        torch.save(best_model.state_dict(), os.path.join(model_dir, '%s.pth' % name))

        res_loss[name] = loss_dict
        res_acc[name] = acc_dict

        print('train %s done' % name)
        print()

    util.save_png('loss', res_loss)
    util.save_png('acc', res_acc)

Training Results

The results after 50 epochs are as follows:

{'train': 6301, 'val': 6307}
Epoch 0/49
----------
train Loss: 2.7173 Acc: 0.3292
val Loss: 2.4345 Acc: 0.3697
...
...
Epoch 49/49
----------
train Loss: 1.0477 Acc: 0.6720
val Loss: 1.6393 Acc: 0.5413

Training complete in 7m 31s
Best val Acc: 0.543206
train alexnet_spp done

Epoch 0/49
----------
train Loss: 2.7808 Acc: 0.3119
val Loss: 2.4566 Acc: 0.3697
...
...
Epoch 49/49
----------
train Loss: 1.0700 Acc: 0.6639
val Loss: 1.6357 Acc: 0.5365

Training complete in 7m 7s
Best val Acc: 0.540193
train alexnet done

Summary

Adding a spatial pyramid pooling layer effectively decouples feature extraction from the classifier, and in the experiment above it also slightly improves classification accuracy (best val acc 0.5432 for AlexNet_SPP vs 0.5402 for plain AlexNet).
