An Illustrative Introduction to Dynamic Time Warping

Reading Time (min)
7 min read
Tags
Dynamic Programming, Getting Started, Pattern Recognition, Python, Time Series Analysis
Last Updated
Feb 25, 2021

This blog post was originally published on the Towards Data Science blog.

Dynamic Time Warping (DTW) is a way to compare two sequences, usually temporal ones, that do not sync up perfectly. It is a method to calculate the optimal matching between two sequences. DTW is useful in many domains such as speech recognition, data mining, and financial markets. In data mining, it is commonly used to measure the distance between two time series.

In this post, we will go over the mathematics behind DTW. Then, two illustrative examples are provided to better understand the concept. If you are not interested in the math behind it, feel free to jump to the examples.

Formulation

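Let’s assume we have two sequences like the following:

$$X = x_1, x_2, \ldots, x_i, \ldots, x_n$$

$$Y = y_1, y_2, \ldots, y_j, \ldots, y_m$$

The sequences $X$ and $Y$ can be arranged to form an $n$-by-$m$ grid, where each point $(i, j)$ is the alignment between $x_i$ and $y_j$.

A warping path $W$ maps the elements of $X$ and $Y$ to minimize the distance between them. $W$ is a sequence of grid points $(i, j)$. We will see an example of the warping path later.

Warping Path and DTW distance

The optimal path to $(i_k, j_k)$ can be computed by:

$$D_{\min}(i_k, j_k) = \min_{(i_{k-1},\, j_{k-1})} D_{\min}(i_{k-1}, j_{k-1}) + d(i_k, j_k)$$

where $d(i_k, j_k)$ is the Euclidean distance between $x_{i_k}$ and $y_{j_k}$, and the minimum is taken over the allowed predecessors of $(i_k, j_k)$.

Overall path cost:

$$D = \sum_{k} d(i_k, j_k)$$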
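
As a minimal sketch of this recurrence, the function below computes the optimal accumulated cost top-down with memoization. It assumes one-dimensional sequences and uses the absolute difference as the local distance. The name `dtw_cost` is hypothetical and not part of any library.

```python
from functools import lru_cache


def dtw_cost(x, y):
    """Optimal accumulated cost between x and y (hypothetical helper).

    Assumption: 1-D sequences, local distance d = |x[i] - y[j]|.
    """

    @lru_cache(maxsize=None)
    def d_min(i, j):
        local = abs(x[i] - y[j])
        if i == 0 and j == 0:
            return local
        # Collect the accumulated cost of every allowed predecessor
        candidates = []
        if i > 0:
            candidates.append(d_min(i - 1, j))
        if j > 0:
            candidates.append(d_min(i, j - 1))
        if i > 0 and j > 0:
            candidates.append(d_min(i - 1, j - 1))
        return min(candidates) + local

    return d_min(len(x) - 1, len(y) - 1)


print(dtw_cost([1, 2, 3], [1, 2, 2, 3]))  # 0
```

The recursion is only meant for short sequences. For long ones, a bottom-up table such as the accumulated cost matrix in the Implementation section avoids deep recursion.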

Restrictions on the Warping function

The warping path is found using a dynamic programming approach to align two sequences. Going through all possible paths is “combinatorically explosive”[1]. Therefore, for efficiency purposes, it’s important to limit the number of possible warping paths, and hence the following constraints are outlined:

  • Boundary Condition: This constraint ensures that the warping path begins with the start points of both signals and terminates with their endpoints.
  • Monotonicity condition: This constraint preserves the time-order of points (not going back in time).
  • Continuity (step size) condition: This constraint limits the path transitions to adjacent points in time (not jumping in time).
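
The three conditions above can be sketched as a small validity check. This is an illustration only: `is_valid_warping_path` is a hypothetical helper, and it assumes a path given as zero-based `(i, j)` index pairs on an `n`-by-`m` grid.

```python
def is_valid_warping_path(path, n, m):
    """Check boundary, monotonicity, and continuity (hypothetical helper)."""
    if not path:
        return False

    # Boundary condition: start at the first points, end at the last points
    if tuple(path[0]) != (0, 0) or tuple(path[-1]) != (n - 1, m - 1):
        return False

    for (i0, j0), (i1, j1) in zip(path, path[1:]):
        di, dj = i1 - i0, j1 - j0
        if di < 0 or dj < 0:      # monotonicity: never go back in time
            return False
        if di > 1 or dj > 1:      # continuity: never jump over a point
            return False
        if di == 0 and dj == 0:   # every step must advance the path
            return False
    return True


print(is_valid_warping_path([(0, 0), (1, 1), (1, 2), (2, 3)], n=3, m=4))  # True
print(is_valid_warping_path([(0, 0), (2, 2), (2, 3)], n=3, m=4))          # False
```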

In addition to the above three constraints, there are other less frequent conditions for an allowable warping path:

  • Warping window condition: Allowable points can be restricted to fall within a warping window of a given width (a positive integer).
  • Slope condition: The warping path can be constrained by restricting the slope, and consequently avoiding extreme movements in one direction.
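
To illustrate the warping window condition, here is a hedged sketch that only fills grid cells with `|i - j| <= window`. Interpreting the window as a band around the diagonal is an assumption on my part, and `dtw_with_window` is a hypothetical name. The local distance is the absolute difference.

```python
import math


def dtw_with_window(x, y, window):
    """DTW cost restricted to cells with |i - j| <= window (hypothetical helper)."""
    n, m = len(x), len(y)
    cost = [[math.inf] * m for _ in range(n)]

    for i in range(n):
        # Only visit the cells of row i that fall inside the band
        for j in range(max(0, i - window), min(m, i + window + 1)):
            local = abs(x[i] - y[j])
            if i == 0 and j == 0:
                cost[i][j] = local
                continue
            best = math.inf
            if i > 0:
                best = min(best, cost[i - 1][j])
            if j > 0:
                best = min(best, cost[i][j - 1])
            if i > 0 and j > 0:
                best = min(best, cost[i - 1][j - 1])
            cost[i][j] = best + local

    # math.inf means the end point is unreachable inside the band
    return cost[n - 1][m - 1]


print(dtw_with_window([1, 2, 3], [2, 2, 5], window=0))  # 3
```

With `window=0` and equal lengths, the path is forced onto the diagonal, so the result reduces to a point-by-point sum of differences. If the window is narrower than the difference in lengths, the end point cannot be reached and the function returns `math.inf`.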

An acceptable warping path has combinations of chess king moves that are:

  • Horizontal moves: $(i, j) \rightarrow (i, j+1)$
  • Vertical moves: $(i, j) \rightarrow (i+1, j)$
  • Diagonal moves: $(i, j) \rightarrow (i+1, j+1)$
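
Even with only these three moves, the number of admissible paths grows very quickly, which is why they are not enumerated one by one. The sketch below counts them with a small dynamic program. `count_warping_paths` is a hypothetical helper and assumes paths run from the first to the last cell of an `n`-by-`m` grid.

```python
def count_warping_paths(n, m):
    """Count king-move paths from cell (0, 0) to cell (n-1, m-1)."""
    counts = [[0] * m for _ in range(n)]
    for i in range(n):
        for j in range(m):
            if i == 0 or j == 0:
                # Along an edge there is only one way to arrive
                counts[i][j] = 1
            else:
                # Arrive by a vertical, horizontal, or diagonal move
                counts[i][j] = (
                    counts[i - 1][j] + counts[i][j - 1] + counts[i - 1][j - 1]
                )
    return counts[n - 1][m - 1]


print(count_warping_paths(5, 7))    # 1289
print(count_warping_paths(50, 40))  # a very large number
```

A 5-by-7 grid already admits 1289 paths, while dynamic programming only needs to fill its 35 cells once.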

Implementation

Let’s import all the Python packages we need.

import pandas as pd
import numpy as np

# Plotting Packages
import matplotlib.pyplot as plt
import seaborn as sbn

import matplotlib as mpl
mpl.rcParams['figure.dpi'] = 300
savefig_options = dict(format="png", dpi=300, bbox_inches="tight")


# Computation packages
from scipy.spatial.distance import euclidean
from fastdtw import fastdtw

Let’s define a method to compute the accumulated cost matrix for the warp path. The local cost between every pair of points is the squared difference of their values (the squared Euclidean distance for one-dimensional points). The methods to compute the distance matrix and the accumulated cost matrix are defined below:

def compute_euclidean_distance_matrix(x, y) -> np.ndarray:
    """Calculate distance matrix

    This method calculates the pairwise squared Euclidean distance between
    the points of two sequences. The sequences can have different lengths.
    Rows index y and columns index x.
    """
    dist = np.zeros((len(y), len(x)))
    for i in range(len(y)):
        for j in range(len(x)):
            dist[i,j] = (x[j]-y[i])**2
    return dist


def compute_accumulated_cost_matrix(x, y) -> np.ndarray:
    """Compute accumulated cost matrix for warp path using the squared Euclidean distance
    """
    distances = compute_euclidean_distance_matrix(x, y)

    # Initialization
    cost = np.zeros((len(y), len(x)))
    cost[0,0] = distances[0,0]
    
    for i in range(1, len(y)):
        cost[i, 0] = distances[i, 0] + cost[i-1, 0]  
        
    for j in range(1, len(x)):
        cost[0, j] = distances[0, j] + cost[0, j-1]  

    # Accumulated warp path cost
    for i in range(1, len(y)):
        for j in range(1, len(x)):
            cost[i, j] = min(
                cost[i-1, j],    # insertion
                cost[i, j-1],    # deletion
                cost[i-1, j-1]   # match
            ) + distances[i, j] 
            
    return cost

Example 1

In this example, we have two sequences x and y with different lengths.

# Create two sequences
x = [3, 1, 2, 2, 1]
y = [2, 0, 0, 3, 3, 1, 0]

We cannot calculate the Euclidean distance between x and y since they don’t have equal lengths. The plotting code below illustrates the problem.
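
To make the length mismatch concrete, here is a small sketch using the standard library's `math.dist`, which is not used in the original post. It rejects points of different dimensions, so the only way to get a number is to drop the extra points of y, much like the figure pairs x with only the first points of y.

```python
import math

x = (3, 1, 2, 2, 1)
y = (2, 0, 0, 3, 3, 1, 0)

try:
    math.dist(x, y)
    mismatch_rejected = False
except ValueError as err:
    # Raised because x has 5 coordinates and y has 7
    mismatch_rejected = True
    print("Euclidean distance failed:", err)

# Truncating y to the length of x silently ignores its last two points
truncated = math.dist(x, y[:len(x)])
print(round(truncated, 3))  # 3.317
```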

Example 1: Euclidean distance between x and y (is it possible? 🤔 )
fig, ax = plt.subplots(figsize=(14, 10))

# Remove the border and axes ticks
fig.patch.set_visible(False)
ax.axis('off')

xx = [(i, x[i]) for i in np.arange(0, len(x))]
yy = [(j, y[j]) for j in np.arange(0, len(y))]

for i, j in zip(xx, yy[:-2]):
    ax.plot([i[0], j[0]], [i[1], j[1]], '--k', linewidth=4)

ax.plot(x, '-ro', label='x', linewidth=4, markersize=20, markerfacecolor='lightcoral', markeredgecolor='lightcoral')
ax.plot(y, '-bo', label='y', linewidth=4, markersize=20, markerfacecolor='skyblue', markeredgecolor='skyblue')
ax.set_title("Euclidean Distance!??", fontsize=28, fontweight="bold")

fig.savefig("ex1_euclidean_distance.png", **savefig_options)

Compute DTW distance and warp path

Many Python packages calculate the DTW distance given just the sequences and the type of distance (usually Euclidean by default). Here, we use FastDTW, a popular Python implementation of an approximate DTW algorithm with lower time and memory complexity[2].

dtw_distance, warp_path = fastdtw(x, y, dist=euclidean)

Note that we are using SciPy’s distance function euclidean that we imported earlier. For a better understanding of the warp path, let’s first compute the accumulated cost matrix and then visualize the path on a grid. The following code will plot a heatmap of the accumulated cost matrix.

cost_matrix = compute_accumulated_cost_matrix(x, y)
fig, ax = plt.subplots(figsize=(12, 8))
ax = sbn.heatmap(cost_matrix, annot=True, square=True, linewidths=0.1, cmap="YlGnBu", ax=ax)
ax.invert_yaxis()

# Get the warp path in x and y directions
path_x = [p[0] for p in warp_path]
path_y = [p[1] for p in warp_path]

# Align the path from the center of each cell
# (loop variables are named px/py so they do not shadow the sequences x and y)
path_xx = [px + 0.5 for px in path_x]
path_yy = [py + 0.5 for py in path_y]

ax.plot(path_xx, path_yy, color='blue', linewidth=3, alpha=0.2)

fig.savefig("ex1_heatmap.png", **savefig_options)
Example 1: Accumulated cost matrix and warping path

The color bar shows the accumulated cost of each point in the grid. As can be seen, the warp path (blue line) follows the low-cost cells of the grid. Let’s see the DTW distance and the warping path by printing these two variables.

print("DTW distance: ", dtw_distance)
print("Warp path: ", warp_path)

>>> DTW distance:  6.0
>>> Warp path:  [(0, 0), (1, 1), (1, 2), (2, 3), (3, 4), (4, 5), (4, 6)]
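
As a quick sanity check of the printed result, the sketch below recomputes the cost along the reported warp path by summing the absolute differences of the matched points. The path is copied from the output above rather than recomputed, so no DTW package is needed.

```python
x = [3, 1, 2, 2, 1]
y = [2, 0, 0, 3, 3, 1, 0]

# Warp path as printed above: pairs of (index in x, index in y)
warp_path = [(0, 0), (1, 1), (1, 2), (2, 3), (3, 4), (4, 5), (4, 6)]

# Sum of local distances along the path
path_cost = sum(abs(x[i] - y[j]) for i, j in warp_path)

# Each move connects two consecutive grid points
n_moves = len(warp_path) - 1

print(path_cost, n_moves)  # 6 6
```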

The warping path starts at point (0, 0) and ends at (4, 6) in 6 moves. Let’s also calculate the accumulated cost matrix using the functions we defined earlier and compare the values with the heatmap.

cost_matrix = compute_accumulated_cost_matrix(x, y)
print(np.flipud(cost_matrix)) # Flipping the cost matrix for easier comparison with heatmap values!

>>> [[32. 12. 10. 10.  6.]  
     [23. 11.  6.  6.  5.]   
     [19. 11.  5.  5.  9.]  
     [19.  7.  4.  5.  8.]  
     [19.  3.  6. 10.  4.]  
     [10.  2.  6.  6.  3.]  
     [ 1.  2.  2.  2.  3.]]

The cost matrix printed above matches the values shown in the heatmap.
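
A warp path can also be read directly off an accumulated cost matrix by walking back from the last cell to the first one. The following is a minimal, dependency-free sketch of that idea, not the procedure FastDTW uses internally. `accumulated_cost` and `backtrack` are hypothetical helpers; the first one mirrors the squared-difference cost used in this post, with rows indexing y and columns indexing x.

```python
def accumulated_cost(x, y):
    """Accumulated cost matrix as nested lists (rows: y, columns: x)."""
    rows, cols = len(y), len(x)
    cost = [[0.0] * cols for _ in range(rows)]
    for i in range(rows):
        for j in range(cols):
            local = (x[j] - y[i]) ** 2
            if i == 0 and j == 0:
                prev = 0.0
            elif i == 0:
                prev = cost[0][j - 1]
            elif j == 0:
                prev = cost[i - 1][0]
            else:
                prev = min(cost[i - 1][j], cost[i][j - 1], cost[i - 1][j - 1])
            cost[i][j] = prev + local
    return cost


def backtrack(cost):
    """Walk from the last cell back to (0, 0); returns (x index, y index) pairs."""
    i, j = len(cost) - 1, len(cost[0]) - 1
    path = [(j, i)]
    while (i, j) != (0, 0):
        if i == 0:
            j -= 1
        elif j == 0:
            i -= 1
        else:
            # Cheapest predecessor; on ties the diagonal (listed first) wins
            candidates = [
                (cost[i - 1][j - 1], i - 1, j - 1),
                (cost[i - 1][j], i - 1, j),
                (cost[i][j - 1], i, j - 1),
            ]
            _, i, j = min(candidates, key=lambda c: c[0])
        path.append((j, i))
    path.reverse()
    return path


x = [3, 1, 2, 2, 1]
y = [2, 0, 0, 3, 3, 1, 0]
cost = accumulated_cost(x, y)
path = backtrack(cost)
print(cost[-1][-1])       # 6.0
print(path[0], path[-1])  # (0, 0) (4, 6)
```

Because of ties between equally cheap predecessors, the recovered path may differ from the one FastDTW reports while having the same total cost.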

Now let’s plot the two sequences and connect the mapping points. The code to plot the DTW distance between x and y is given below.

fig, ax = plt.subplots(figsize=(14, 10))

# Remove the border and axes ticks
fig.patch.set_visible(False)
ax.axis('off')

for [map_x, map_y] in warp_path:
    ax.plot([map_x, map_y], [x[map_x], y[map_y]], '--k', linewidth=4)

ax.plot(x, '-ro', label='x', linewidth=4, markersize=20, markerfacecolor='lightcoral', markeredgecolor='lightcoral')
ax.plot(y, '-bo', label='y', linewidth=4, markersize=20, markerfacecolor='skyblue', markeredgecolor='skyblue')
ax.set_title("DTW Distance", fontsize=28, fontweight="bold")

fig.savefig("ex1_dtw_distance.png", **savefig_options)
Example 1: DTW distance between x and y

Example 2

In this example, we will use two sinusoidal signals and see how they will be matched by calculating the DTW distance between them.

time1 = np.linspace(start=0, stop=1, num=50)
time2 = time1[0:40]

x1 = 3 * np.sin(np.pi * time1) + 1.5 * np.sin(4*np.pi * time1)
x2 = 3 * np.sin(np.pi * time2 + 0.5) + 1.5 * np.sin(4*np.pi * time2 + 0.5)

Just like Example 1, let’s calculate the DTW distance and the warp path for the x1 and x2 signals using the FastDTW package.

distance, warp_path = fastdtw(x1, x2, dist=euclidean)
fig, ax = plt.subplots(figsize=(16, 12))

# Remove the border and axes ticks
fig.patch.set_visible(False)
ax.axis('off')

for [map_x, map_y] in warp_path:
    ax.plot([map_x, map_y], [x1[map_x], x2[map_y]], '-k')

ax.plot(x1, color='blue', marker='o', markersize=10, linewidth=5)
ax.plot(x2, color='red', marker='o', markersize=10, linewidth=5)
ax.tick_params(axis="both", which="major", labelsize=18)

fig.savefig("ex2_dtw_distance.png", **savefig_options)
Example 2: DTW distance between x1 and x2

As can be seen in the figure above, DTW is particularly powerful when the signals have similar patterns. The extrema (maximum and minimum points) of the two signals are correctly mapped. Moreover, unlike Euclidean distance, we may see many-to-one mappings when the DTW distance is used, particularly if the two signals have different lengths.
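
Many-to-one mappings are easy to spot in a warp path by grouping the matched indices. The sketch below uses the short warp path printed in Example 1 for readability; the same grouping would apply to the path of the sinusoidal signals.

```python
from collections import defaultdict

# Warp path from Example 1: pairs of (index in x, index in y)
warp_path = [(0, 0), (1, 1), (1, 2), (2, 3), (3, 4), (4, 5), (4, 6)]

# Collect every y index matched to each x index
matches = defaultdict(list)
for i, j in warp_path:
    matches[i].append(j)

# Keep only the x points that are matched to more than one y point
one_to_many = {i: js for i, js in matches.items() if len(js) > 1}
print(one_to_many)  # {1: [1, 2], 4: [5, 6]}
```

Here the second and the last points of x each absorb two points of y, which is how the shorter sequence gets stretched to cover the longer one.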

You may spot an issue with dynamic time warping from the figure above. Can you guess what it is?

The issue is around the head and tail of the time series, which do not match properly. This is because the DTW algorithm cannot provide warping invariance at the endpoints. In short, a small difference at the sequence endpoints will tend to contribute disproportionately to the estimated similarity[3].

Conclusion

DTW is an algorithm to find an optimal alignment between two sequences and a useful distance measure to have in our toolbox. This technique is useful when we are working with two non-linear sequences, particularly if one sequence is a non-linearly stretched/shrunk version of the other. The warping path is a combination of “chess king” moves that starts from the heads of the two sequences and ends at their tails.

You can find the Jupyter notebook for this blog post on GitHub.

Thanks for reading 🙏

If you liked this post, you can join my mailing list to receive similar posts. You can follow me on LinkedIn, GitHub, Twitter and Medium.

And finally, you can find my knowledge forest 🌲 (raw digital notes) at notes.ealizadeh.com.

References

[1] Donald J. Berndt and James Clifford, Using Dynamic Time Warping to Find Patterns in Time Series, 3rd International Conference on Knowledge Discovery and Data Mining

[2] Salvador, S. and P. Chan, FastDTW: Toward accurate dynamic time warping in linear time and space (2007), Intelligent Data Analysis

[3] Diego Furtado Silva, et al. On the effect of endpoints on dynamic time warping (2016), SIGKDD Workshop on Mining and Learning from Time Series

Copyright © 2021 Esmaeil Alizadeh - All Rights Reserved