Triplet Loss for Time Series
Franceschi, Jean-Yves, Aymeric Dieuleveut, and Martin Jaggi. "Unsupervised scalable representation learning for multivariate time series." Advances in neural information processing systems 32 (2019).
references :
1. Triplet Loss ( ver 1 )
length of positive sample = length of positive sample
Notation :
: batch sizeK
: number of negative samplesneg_samples
: negative samples- Length
: total length of TSlength
: maximum length of subseries- ` length_pos_neg` : length of pos/neg subseries
: length of anchor
- Indices
: START index of anchorstart_pos
: START index of positive sampleend_pos
: END index ~start_neg
: START index of negative sample
- Embedding vectors
: size = (B
: size = (B
: size = (B
) …….. make it \(K\) Times
class TripletLoss(torch.nn.modules.loss._Loss):
def __init__(self, compared_length, K, negative_penalty):
super(TripletLoss, self).__init__()
self.compared_length = compared_length
if self.compared_length is None:
self.compared_length = np.inf
self.K = K
self.negative_penalty = negative_penalty
def forward(self, batch, encoder, train, save_memory=False):
B = batch.size(0) # batch size
T = train.size(0) # total length
length = min(self.compared_length, train.size(2)) # MAX length of subseries
neg_samples = np.random.choice(T, size=(self.K, B))
neg_samples = torch.LongTensor(neg_samples)
length_pos_neg = np.random.randint(1, length+1) # length of subseries
length_anchor = np.random.randint(length_pos_neg, length + 1)
start_anchor = np.random.randint(0, length - length_anchor + 1, B)
# (1,B) pos & (K,B) neg
start_pos = np.random.randint(0, length_anchor - length_pos_neg + 1, B)
start_pos = start_anchor + start_pos # positive sample START INDEX
end_pos = start_pos + length_pos_neg # positive sample END INDEX
start_neg = np.random.randint(0, length - length_pos_neg + 1, (self.K, B)) # negative sample START INDEX
Z_anchor = encoder(
j: j + 1, :,
start_anchor[j]: start_anchor[j] + length_anchor
] for j in range(B)]))
Z_pos = encoder(
j: j + 1, :, end_pos[j] - length_pos_neg: end_pos[j]
] for j in range(B)]))
dim_Z = Z_anchor.size(1)
# Positive loss
loss = -torch.mean(torch.nn.functional.logsigmoid(torch.bmm(
Z_anchor.view(B, 1, dim_Z),
Z_pos.view(B, dim_Z, 1)
if save_memory:
loss = 0
del Z_pos
# Negative loss
multiplicative_ratio = self.negative_penalty / self.K
for i in range(self.K):
Z_neg = encoder([train[neg_samples[i, j]: neg_samples[i, j] + 1][
:, :,
start_neg[i, j]:
start_neg[i, j] + length_pos_neg
] for j in range(B)]))
loss += multiplicative_ratio * -torch.mean(
Z_anchor.view(B, 1, dim_Z),
Z_neg.view(B, dim_Z, 1))))
if save_memory and i != self.K - 1:
loss = 0
del Z_neg
return loss
2. Triplet Loss ( ver 2 )
length of positive sample \(\neq\) length of positive sample
- lengths among positive samples are also different
- lengths among negative ~
Notation :
: batch sizeK
: number of negative samplesneg_samples
: negative samples- Length
: total length of TSlength
: maximum length of subseries- ` lengths_pos` : lengths of pos subseries
- ` length_neg` : length of neg subseries
: length of anchor
- Indices
: START index of anchorstart_pos
: START index of positive sampleend_pos
: END index ~start_neg
: START index of negative sample
- Embedding vectors
: size = (B
: size = (B
: size = (B
) …….. make it \(K\) Times
class TripletLossVaryingLength(torch.nn.modules.loss._Loss):
def __init__(self, compared_length, K, negative_penalty):
super(TripletLossVaryingLength, self).__init__()
self.compared_length = compared_length
if self.compared_length is None:
self.compared_length = np.inf
self.K = K
self.negative_penalty = negative_penalty
def forward(self, batch, encoder, train, save_memory=False):
B = batch.size(0)
T = train.size(0)
max_length = train.size(2)
neg_samples = np.random.choice(T, size=(self.K, B))
neg_samples = torch.LongTensor(neg_samples)
with torch.no_grad():
lengths_batch = max_length - torch.sum(torch.isnan(batch[:, 0]), 1).data.cpu().numpy()
lengths_samples = np.empty((self.K, B), dtype=int)
for i in range(self.K):
lengths_samples[i] = max_length - torch.sum(
torch.isnan(train[neg_samples[i], 0]), 1
# lengths_anchor : (B)
# lengths_pos : (B)
# lengths_neg : (K,B)
lengths_pos = np.empty(B, dtype=int)
lengths_neg = np.empty((self.K, B), dtype=int)
for j in range(B):
lengths_pos[j] = np.random.randint(
1, min(self.compared_length, lengths_batch[j]) + 1)
for i in range(self.K):
lengths_neg[i, j] = np.random.randint(
1,min(self.compared_length, lengths_samples[i, j]) + 1)
length_anchor = np.array([np.random.randint(
lengths_pos[j], min(self.compared_length, lengths_batch[j]) + 1
) for j in range(B)])
# start_anchor : (B)
# start_pos : (B)
# start_neg : (K,B)
start_anchor = np.array([np.random.randint(
0, lengths_batch[j] - length_anchor[j] + 1
) for j in range(B)])
start_pos = np.array([np.random.randint(
0, high=length_anchor[j] - lengths_pos[j] + 1
) for j in range(B)])
end_pos = start_pos + lengths_pos
start_neg = np.array([[np.random.randint(
0, high=lengths_samples[i, j] - lengths_neg[i, j] + 1
) for j in range(B)] for i in range(self.K)])
# Z_anchor : (B, dim_Z)
# Z_pos : (B, dim_Z)
## Z_neg : BELOW
Z_anchor =[encoder(
j: j + 1, :,
start_anchor[j]: start_anchor[j] + length_anchor[j]
) for j in range(B)])
Z_pos =[encoder(
j: j + 1, :,
end_pos[j] - lengths_pos[j]: end_pos[j]
) for j in range(B)])
dim_Z = Z_anchor.size(1)
# Positive loss
loss = -torch.mean(torch.nn.functional.logsigmoid(torch.bmm(
Z_anchor.view(B, 1, dim_Z),
Z_pos.view(B, dim_Z, 1)
if save_memory:
loss = 0
del Z_pos
# Negative loss
multiplicative_ratio = self.negative_penalty / self.K
for i in range(self.K):
Z_neg =[encoder(
train[neg_samples[i, j]: neg_samples[i, j] + 1][
:, :,
start_neg[i, j]:
start_neg[i, j] + lengths_neg[i, j]]
) for j in range(B)])
loss += multiplicative_ratio * -torch.mean(
Z_anchor.view(B, 1, dim_Z),
Z_neg.view(B, dim_Z, 1))))
if save_memory and i != self.K - 1:
loss = 0
del Z_neg
return loss