👉 This article is also published on Towards Data Science blog.
Dynamic Time Warping (DTW) is a way to compare two -usually temporal- sequences that do not sync up perfectly. It is a method to calculate the optimal matching between two sequences. DTW is useful in many domains such as speech recognition, data mining, financial markets, etc. It’s commonly used in data mining to measure the distance between two time-series.
In this post, we will go over the mathematics behind DTW. Then, two illustrative examples are provided to better understand the concept. If you are not interested in the math behind it, please jump to examples.
Formulation
Let’s assume we have two sequences like the following:
X=x_1,x_2,...,x_i,...,x_n \\ Y=y_1,y_2,...,y_j,...,y_m
The sequences X and Y can be arranged to form an n \text{-by-} m grid, where each point (i, j) is the alignment between x_i and y_j.
A warping path W maps the elements of X and Y to minimize the distance between them. W is a sequence of grid points (i,j). We will see an example of the warping path later.
Warping Path and DTW distance
The Optimal path to (i_k,j_k) can be computed by:
D_\text{min}(i_k,j_k) = \min_{i_{k−1},\;j_{k−1}} D_\text{min}(i_{k−1},j_{k−1}) + d(i_k,j_k∣i_{k−1},j_{k−1})
where d is the Euclidean distance d(i,j)=∣∣f_1(i)−f_2(j)∣∣
Overall path cost: D=\sum_k d(i_k,j_k)
Restrictions on the Warping function
The warping path is found using a dynamic programming approach to align two sequences. Going through all possible paths is “combinatorically explosive”see [1]. Therefore, for efficiency purposes, it’s important to limit the number of possible warping paths, and hence the following constraints are outlined:
- Boundary Condition: This constraint ensures that the warping path begins with the start points of both signals and terminates with their endpoints.
i_1=1, \; i_k=n \quad \text{and} \quad j_1=1, \; j_k=m
- Monotonicity condition: This constraint preserves the time-order of points (not going back in time).
i_{t-1} \le i_t \quad \text{and} \quad j_{t-1} \le j_t
- Continuity (step size) condition: This constraint limits the path transitions to adjacent points in time (not jumping in time).
i_t - i_{t-1} \le 1 \quad \text{and} \quad j_t - j_{t-1} \le 1
In addition to the above three constraints, there are other less frequent conditions for an allowable warping path:
- Warping window condition: Allowable points can be restricted to fall within a given warping window of width \omega (a positive integer).
|i_t - j_t| \le \omega
- Slope condition: The warping path can be constrained by restricting the slope, and consequently avoiding extreme movements in one direction.
An acceptable warping path has combinations of chess king moves that are:
- Horizontal moves: (i, j) \rightarrow (i, j+1)
- Vertical moves: (i, j) \rightarrow (i+1, j)
- Diagonal moves: (i, j) \rightarrow (i+1, j+1)
Implementation
Let’s import all python packages we need.
import pandas as pd
import numpy as np
# Plotting Packages
import matplotlib.pyplot as plt
import seaborn as sbn
import matplotlib as mpl
'figure.dpi'] = 300
mpl.rcParams[= dict(format="png", dpi=300, bbox_inches="tight")
savefig_options
# Computation packages
from scipy.spatial.distance import euclidean
from fastdtw import fastdtw
Let’s define a method to compute the accumulated cost matrix D for the warp path. The cost matrix uses the Euclidean distance to calculate the distance between every two points. The methods to compute the Euclidean distance matrix and accumulated cost matrix are defined below:
def compute_euclidean_distance_matrix(x, y) -> np.array:
"""Calculate distance matrix
This method calcualtes the pairwise Euclidean distance between two sequences.
The sequences can have different lengths.
"""
= np.zeros((len(y), len(x)))
dist for i in range(len(y)):
for j in range(len(x)):
= (x[j]-y[i])**2
dist[i,j] return dist
def compute_accumulated_cost_matrix(x, y) -> np.array:
"""Compute accumulated cost matrix for warp path using Euclidean distance
"""
= compute_euclidean_distance_matrix(x, y)
distances
# Initialization
= np.zeros((len(y), len(x)))
cost 0,0] = distances[0,0]
cost[
for i in range(1, len(y)):
0] = distances[i, 0] + cost[i-1, 0]
cost[i,
for j in range(1, len(x)):
0, j] = distances[0, j] + cost[0, j-1]
cost[
# Accumulated warp path cost
for i in range(1, len(y)):
for j in range(1, len(x)):
= min(
cost[i, j] -1, j], # insertion
cost[i-1], # deletion
cost[i, j-1, j-1] # match
cost[i+ distances[i, j]
)
return cost
Example 1
In this example, we have two sequences x and y with different lengths.
# Create two sequences
= [3, 1, 2, 2, 1]
x = [2, 0, 0, 3, 3, 1, 0] y
We cannot calculate the distance between x and y since they don’t have equal lengths (the code follows).
= plt.subplots(figsize=(14, 10))
fig, ax
# Remove the border and axes ticks
False)
fig.patch.set_visible('off')
ax.axis(
= [(i, x[i]) for i in np.arange(0, len(x))]
xx = [(j, y[j]) for j in np.arange(0, len(y))]
yy
for i, j in zip(xx, yy[:-2]):
0], j[0]], [i[1], j[1]], '--k', linewidth=4)
ax.plot([i[
'-ro', label='x', linewidth=4, markersize=20, markerfacecolor='lightcoral', markeredgecolor='lightcoral')
ax.plot(x, '-bo', label='y', linewidth=4, markersize=20, markerfacecolor='skyblue', markeredgecolor='skyblue')
ax.plot(y, "Euclidean Distance!??", fontsize=28, fontweight="bold")
ax.set_title(
"ex1_euclidean_distance.png", **savefig_options) fig.savefig(
Compute DTW distance and warp path
Many Python packages calculate the DTW by just providing the sequences and the type of distance (usually Euclidean by default). Here, we use a popular Python implementation of DTW that is FastDTW which is an approximate DTW algorithm with lower time and memory complexities see [2].
= fastdtw(x, y, dist=euclidean) dtw_distance, warp_path
Note that we are using SciPy’s distance function Euclidean that we imported earlier. For a better understanding of the warp path, let’s first compute the accumulated cost matrix and then visualize the path on a grid. The following code will plot a heatmap of the accumulated cost matrix.
= compute_accumulated_cost_matrix(x, y) cost_matrix
= plt.subplots(figsize=(12, 8))
fig, ax = sbn.heatmap(cost_matrix, annot=True, square=True, linewidths=0.1, cmap="YlGnBu", ax=ax)
ax
ax.invert_yaxis()
# Get the warp path in x and y directions
= [p[0] for p in warp_path]
path_x = [p[1] for p in warp_path]
path_y
# Align the path from the center of each cell
= [x+0.5 for x in path_x]
path_xx = [y+0.5 for y in path_y]
path_yy
='blue', linewidth=3, alpha=0.2)
ax.plot(path_xx, path_yy, color
"ex1_heatmap.png", **savefig_options) fig.savefig(
The color bar shows the cost of each point in the grid. As can be seen, the warp path (blue line) is going through the lowest cost on the grid. Let’s see the DTW distance and the warping path by printing these two variables.
print("DTW distance: ", dtw_distance)
print("Warp path: ", warp_path)
>>> DTW distance: 6.0
>>> Warp path: [(0, 0), (1, 1), (1, 2), (2, 3), (3, 4), (4, 5), (4, 6)]
The warping path starts at point (0, 0) and ends at (4, 6) by 6 moves. Let’s also calculate the accumulated cost most using the functions we defined earlier and compare the values with the heatmap.
= compute_accumulated_cost_matrix(x, y)
cost_matrix print(np.flipud(cost_matrix)) # Flipping the cost matrix for easier comparison with heatmap values!
>>> [[32. 12. 10. 10. 6.]
23. 11. 6. 6. 5.]
[19. 11. 5. 5. 9.]
[19. 7. 4. 5. 8.]
[19. 3. 6. 10. 4.]
[10. 2. 6. 6. 3.]
[1. 2. 2. 2. 3.]] [
The cost matrix is printed above has similar values to the heatmap.
Now let’s plot the two sequences and connect the mapping points. The code to plot the DTW distance between x and y is given below.
= plt.subplots(figsize=(14, 10))
fig, ax
# Remove the border and axes ticks
False)
fig.patch.set_visible('off')
ax.axis(
for [map_x, map_y] in warp_path:
'--k', linewidth=4)
ax.plot([map_x, map_y], [x[map_x], y[map_y]],
'-ro', label='x', linewidth=4, markersize=20, markerfacecolor='lightcoral', markeredgecolor='lightcoral')
ax.plot(x, '-bo', label='y', linewidth=4, markersize=20, markerfacecolor='skyblue', markeredgecolor='skyblue')
ax.plot(y, "DTW Distance", fontsize=28, fontweight="bold")
ax.set_title(
"ex1_dtw_distance.png", **savefig_options) fig.savefig(
Example 2
In this example, we will use two sinusoidal signals and see how they will be matched by calculating the DTW distance between them.
= np.linspace(start=0, stop=1, num=50)
time1 = time1[0:40]
time2
= 3 * np.sin(np.pi * time1) + 1.5 * np.sin(4*np.pi * time1)
x1 = 3 * np.sin(np.pi * time2 + 0.5) + 1.5 * np.sin(4*np.pi * time2 + 0.5) x2
Just like Example 1, let’s calculate the DTW distance and the warp path for x1 and x2 signals using FastDTW package.
= fastdtw(x1, x2, dist=euclidean) distance, warp_path
= plt.subplots(figsize=(16, 12))
fig, ax
# Remove the border and axes ticks
False)
fig.patch.set_visible('off')
ax.axis(
for [map_x, map_y] in warp_path:
'-k')
ax.plot([map_x, map_y], [x1[map_x], x2[map_y]],
='blue', marker='o', markersize=10, linewidth=5)
ax.plot(x1, color='red', marker='o', markersize=10, linewidth=5)
ax.plot(x2, color="both", which="major", labelsize=18)
ax.tick_params(axis
"ex2_dtw_distance.png", **savefig_options) fig.savefig(
As can be seen in above figure, the DTW distance between the two signals is particularly powerful when the signals have similar patterns. The extrema (maximum and minimum points) between the two signals are correctly mapped. Moreover, unlike Euclidean distance, we may see many-to-one mapping when DTW distance is used, particularly if the two signals have different lengths.
You may spot an issue with dynamic time warping from the figure above. Can you guess what it is?
The issue is around the head and tail of time-series that do not properly match. This is because the DTW algorithm cannot afford the warping invariance for at the endpoints. In short, the effect of this is that a small difference at the sequence endpoints will tend to contribute disproportionately to the estimated similarity see [3].
Conclusion
DTW is an algorithm to find an optimal alignment between two sequences and a useful distance metric to have in our toolbox. This technique is useful when we are working with two non-linear sequences, particularly if one sequence is a non-linear stretched/shrunk version of the other. The warping path is a combination of “chess king” moves that starts from the head of two sequences and ends with their tails.
Useful Links
References
Citation
@online{alizadeh2020,
author = {Alizadeh, Esmaeil},
title = {An {Illustrative} {Introduction} to {Dynamic} {Time}
{Warping}},
date = {2020-10-11},
url = {https://ealizadeh.com/blog/introduction-to-dynamic-time-warping/},
langid = {en}
}