Causal CNN
Franceschi, Jean-Yves, Aymeric Dieuleveut, and Martin Jaggi. "Unsupervised scalable representation learning for multivariate time series." Advances in neural information processing systems 32 (2019).
References:
- https://github.com/White-Link/UnsupervisedScalableRepresentationLearningTimeSeries
- https://arxiv.org/pdf/1901.10738.pdf
import torch
1. Chomp1d
class Chomp1d(torch.nn.Module):
    """
    Removes the last `chomp_size` elements of a time series.
    -- Input  : (B, C, L)
    -- Output : (B, C, L - chomp_size)
    """
    def __init__(self, chomp_size):
        super(Chomp1d, self).__init__()
        self.chomp_size = chomp_size

    def forward(self, x):
        # NB : assumes chomp_size > 0 ( x[:, :, :-0] would be empty )
        return x[:, :, :-self.chomp_size]
chomp = Chomp1d(chomp_size = 30)
B = 64
C = 32
L = 100
long_ts = torch.rand((B,C,L))
short_ts = chomp(long_ts)
print(long_ts.shape)
print(short_ts.shape)
torch.Size([64, 32, 100])
torch.Size([64, 32, 70])
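Why the chomp is needed: torch.nn.Conv1d pads both ends of the series, so a convolution with padding = (kernel_size - 1) * dilation produces an output that is exactly `padding` steps too long; chomping the tail restores the original length and leaves only the left padding, which is what makes the convolution causal. A minimal sketch (kernel size and dilation below are illustrative):
kernel_size, dilation = 3, 2
padding = (kernel_size - 1) * dilation                 # = 4
conv = torch.nn.Conv1d(32, 32, kernel_size, padding=padding, dilation=dilation)
x = torch.rand(64, 32, 100)
y = conv(x)                                            # (64, 32, 104) : too long
y_causal = Chomp1d(padding)(y)                         # (64, 32, 100) : back to L
print(y.shape, y_causal.shape)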
2. Squeeze
class SqueezeChannels(torch.nn.Module):
    """
    Removes a size-1 dimension of a tensor ( here : the pooled time axis ).
    """
    def __init__(self, squeeze_dim = 2):
        super(SqueezeChannels, self).__init__()
        self.squeeze_dim = squeeze_dim

    def forward(self, x):
        return x.squeeze(self.squeeze_dim)
squeeze = SqueezeChannels(squeeze_dim = 2)
before_squeeze = torch.rand(10,20,1,2)
after_squeeze = squeeze(before_squeeze)
print(before_squeeze.shape)
print(after_squeeze.shape)
torch.Size([10, 20, 1, 2])
torch.Size([10, 20, 2])
squeeze = SqueezeChannels(squeeze_dim = 3)
before_squeeze = torch.rand(10,20,2,1)
after_squeeze = squeeze(before_squeeze)
print(before_squeeze.shape)
print(after_squeeze.shape)
torch.Size([10, 20, 2, 1])
torch.Size([10, 20, 2])
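In the encoder of section 5, SqueezeChannels comes right after torch.nn.AdaptiveMaxPool1d(1): pooling collapses the time axis to length 1 and squeezing dimension 2 drops it. A minimal sketch of that chain (shapes are illustrative):
pool = torch.nn.AdaptiveMaxPool1d(1)
squeeze = SqueezeChannels(squeeze_dim = 2)
x = torch.rand(64, 128, 100)        # (B, C, L)
pooled = pool(x)                    # (B, C, 1)
flat = squeeze(pooled)              # (B, C)
print(pooled.shape, flat.shape)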
3. CauCNN block
class CausalConvolutionBlock(torch.nn.Module):
    """
    Causal convolution block
    = 2 causal convolutions (with leaky ReLU) + parallel residual connection.
    -- Input  : (B, C_in, L)
    -- Output : (B, C_out, L)
    """
    def __init__(self, in_channels, out_channels, kernel_size, dilation,
                 final=False):
        super(CausalConvolutionBlock, self).__init__()

        # Left padding ( = for "causality" )
        padding = (kernel_size - 1) * dilation

        #================================================================#
        # [ 1st causal convolution ]
        # NB : torch.nn.utils.weight_norm is deprecated in recent PyTorch
        # versions in favor of torch.nn.utils.parametrizations.weight_norm.
        conv1 = torch.nn.Conv1d(in_channels, out_channels, kernel_size,
                                padding=padding, dilation=dilation)
        conv1 = torch.nn.utils.weight_norm(conv1)
        chomp1 = Chomp1d(padding)   # Chomp ( no cheating )
        relu1 = torch.nn.LeakyReLU()
        #================================================================#

        #================================================================#
        # [ 2nd causal convolution ]
        conv2 = torch.nn.Conv1d(out_channels, out_channels, kernel_size,
                                padding=padding, dilation=dilation)
        conv2 = torch.nn.utils.weight_norm(conv2)
        chomp2 = Chomp1d(padding)
        relu2 = torch.nn.LeakyReLU()
        #================================================================#

        #================================================================#
        # [ Causal network ]
        self.causal = torch.nn.Sequential(conv1, chomp1, relu1,
                                          conv2, chomp2, relu2)

        # --- Residual connection ( 1x1 convolution if channel sizes differ )
        self.upordownsample = torch.nn.Conv1d(
            in_channels, out_channels, 1
        ) if in_channels != out_channels else None

        # --- Final activation function
        self.relu = torch.nn.LeakyReLU() if final else None

    def forward(self, x):
        out_causal = self.causal(x)
        res = x if self.upordownsample is None else self.upordownsample(x)
        if self.relu is None:
            return out_causal + res
        else:
            return self.relu(out_causal + res)
cau_cnn_block = CausalConvolutionBlock(in_channels=8, out_channels=32,
kernel_size=3, dilation=2, final=False)
B = 64
C_in = 8
L = 100
input = torch.randn((B, C_in, L))
output = cau_cnn_block(input)
print(input.shape)
print(output.shape)
torch.Size([64, 8, 100])
torch.Size([64, 32, 100])
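A quick causality sanity check (a sketch, not taken from the reference code): perturbing the input at future time steps should leave the outputs at earlier time steps unchanged, since left padding + chomp makes every convolution causal and the residual 1x1 convolution is pointwise.
block = CausalConvolutionBlock(in_channels=8, out_channels=32,
                               kernel_size=3, dilation=2)
x = torch.randn(1, 8, 100)
x_perturbed = x.clone()
x_perturbed[:, :, 50:] += 10.0                 # change only the "future" ( t >= 50 )
with torch.no_grad():
    y, y_perturbed = block(x), block(x_perturbed)
print(torch.allclose(y[:, :, :50], y_perturbed[:, :, :50]))   # expected : True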
4. CauCNN
class CausalCNN(torch.nn.Module):
    """
    Causal CNN
    = sequence of causal convolution blocks.
    -- Input  : (B, C_in, L)
    -- Output : (B, C_out, L)
    """
    def __init__(self, in_channels, channels, depth, out_channels,
                 kernel_size):
        super(CausalCNN, self).__init__()

        layers = []          # List of causal convolution blocks
        dilation_size = 1    # Initial dilation size

        for i in range(depth):
            in_channels_block = in_channels if i == 0 else channels
            layers += [CausalConvolutionBlock(
                in_channels_block, channels, kernel_size, dilation_size
            )]
            dilation_size *= 2   # Doubles the dilation size at each step

        # Last layer
        layers += [CausalConvolutionBlock(
            channels, out_channels, kernel_size, dilation_size
        )]

        self.network = torch.nn.Sequential(*layers)

    def forward(self, x):
        return self.network(x)
cau_cnn = CausalCNN(in_channels=8, channels=32, depth=4,
                    out_channels=128, kernel_size=3)
B = 64
C_in = 8
L = 100
input = torch.randn((B, C_in, L))
output = cau_cnn(input)
print(input.shape)
print(output.shape)
torch.Size([64, 8, 100])
torch.Size([64, 128, 100])
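Each block contains two convolutions with the same dilation, so a block with kernel size k and dilation d widens the receptive field by 2(k - 1)d. With dilations 1, 2, ..., 2^depth over the depth + 1 blocks, the receptive field of the last output is 1 + 2(k - 1)(2^(depth + 1) - 1). A small sketch of that computation, derived from the construction above:
def causal_cnn_receptive_field(kernel_size, depth):
    # dilations are 1, 2, ..., 2**depth ( depth + 1 blocks in total )
    return 1 + 2 * (kernel_size - 1) * (2 ** (depth + 1) - 1)

print(causal_cnn_receptive_field(kernel_size=3, depth=4))   # 125 time steps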
5. CauCNN Encoder
class CausalCNNEncoder(torch.nn.Module):
    """
    Encoder of a TS using a causal CNN
    - (1) causal_cnn
    ----- (B, C_in, L) -> (B, reduced_size, L)
    - (2) adaptive max pooling ( collapses the time axis to a fixed size )
    ----- (B, reduced_size, L) -> (B, reduced_size, 1)
    - (3) squeeze
    ----- (B, reduced_size, 1) -> (B, reduced_size)
    - (4) linear
    ----- (B, reduced_size) -> (B, C_out)
    """
    def __init__(self, in_channels, mid_channels, depth, reduced_size,
                 out_channels, kernel_size):
        super(CausalCNNEncoder, self).__init__()
        causal_cnn = CausalCNN(in_channels, mid_channels, depth,
                               reduced_size, kernel_size)
        reduce_size = torch.nn.AdaptiveMaxPool1d(1)
        squeeze = SqueezeChannels(squeeze_dim = 2)   # Time dimension
        linear = torch.nn.Linear(reduced_size, out_channels)
        self.network = torch.nn.Sequential(causal_cnn, reduce_size,
                                           squeeze, linear)

    def forward(self, x):
        return self.network(x)
cau_cnn_encoder = CausalCNNEncoder(in_channels=8, mid_channels=32, depth=4,
reduced_size = 128, out_channels=20, kernel_size=3)
B = 64
C_in = 8
L = 100
input = torch.randn((B, C_in, L))
output = cau_cnn_encoder(input)
print(input.shape)
print(output.shape)
torch.Size([64, 8, 100])
torch.Size([64, 20])
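Because the adaptive max pooling collapses the time axis, the encoder maps series of different lengths to embeddings of the same size, which is what makes it usable on variable-length time series. A quick sketch with the encoder built above:
for L in (50, 100, 500):
    x = torch.randn(4, 8, L)
    print(L, cau_cnn_encoder(x).shape)     # always torch.Size([4, 20])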
+ \(\alpha\)) LSTM
class LSTMEncoder(torch.nn.Module):
    """
    Encoder of a TS using an LSTM ( 1D TS : C_in = 1 )
    - Input  : (B, C_in, L)
    ---------- permute -> (L, B, C_in), LSTM -> (L, B, hidden_dim)
    - Output : (B, output_dim)   ( last time step, then linear )
    """
    def __init__(self, input_dim=1, hidden_dim=256, output_dim=160):
        super(LSTMEncoder, self).__init__()
        self.lstm = torch.nn.LSTM(input_size=input_dim, hidden_size=hidden_dim,
                                  num_layers=2)
        self.linear = torch.nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        print(x.shape)                 # (B, C_in, L)
        x = x.permute(2, 0, 1)         # (L, B, C_in) : LSTM expects time first
        print(x.shape)
        out, _ = self.lstm(x)          # (L, B, hidden_dim)
        print(out.shape)
        last = out[-1]                 # (B, hidden_dim) : last time step
        print(last.shape)
        out = self.linear(last)        # (B, output_dim)
        print(out.shape)
        return out
lstm_enc = LSTMEncoder(input_dim=1, hidden_dim=256, output_dim=160)
B = 64
C_in = 1
L = 100
input = torch.randn((B, C_in, L))
output = lstm_enc(input)
torch.Size([64, 1, 100])
torch.Size([100, 64, 1])
torch.Size([100, 64, 256])
torch.Size([64, 256])
torch.Size([64, 160])
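A possible cleaner variant (a sketch, assumed rather than taken from the reference code): the same encoder without the debug prints, using batch_first=True so only the time and channel axes need to be swapped.
class LSTMEncoderBatchFirst(torch.nn.Module):
    def __init__(self, input_dim=1, hidden_dim=256, output_dim=160):
        super().__init__()
        self.lstm = torch.nn.LSTM(input_size=input_dim, hidden_size=hidden_dim,
                                  num_layers=2, batch_first=True)
        self.linear = torch.nn.Linear(hidden_dim, output_dim)

    def forward(self, x):                        # x : (B, C_in, L)
        out, _ = self.lstm(x.transpose(1, 2))    # (B, L, hidden_dim)
        return self.linear(out[:, -1, :])        # (B, output_dim)

print(LSTMEncoderBatchFirst()(torch.randn(64, 1, 100)).shape)   # torch.Size([64, 160])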