How to parse the code of Deep SORT multi-target tracking algorithm

2025-03-26 Update From: SLTechnology News&Howtos

This article walks through the code of the Deep SORT multi-target tracking algorithm. The editor finds it practical and shares it here in the hope that you take something away from it.

Deep SORT is a commonly used algorithm in Multi-Object Tracking (MOT) and follows the detection-based tracking approach. It receives a lot of attention in industry, and many articles on Zhihu use Deep SORT for engineering deployment. Drawing on earlier blog posts and the author's own practice (theory and code), this article analyzes the Deep SORT algorithm at the code level.

1. Main steps of MOT

The survey of deep-learning-based multi-target tracking, "DEEP LEARNING IN VIDEO MULTI-OBJECT TRACKING: A SURVEY", describes four main steps of the MOT problem:

1. Given the original frames of the video, run a target detector such as Faster R-CNN, YOLOv3, or SSD to obtain the detection boxes.
2. Crop the target in each detection box and extract its features (appearance features or motion features).
3. Compute the similarity, that is, the degree of matching between targets in two frames (the distance between the same target is relatively small, and the distance between different targets is relatively large).
4. Perform data association and assign a target ID to each object.

Of these four steps, detection matters most. According to the abstract of the SORT paper, changing the detector alone can improve tracking performance by up to 18.9%.

2. SORT

The predecessor of Deep SORT is SORT, whose full name is Simple Online and Realtime Tracking. In brief, SORT takes its detections from a Faster R-CNN detector and combines a Kalman filter with the Hungarian algorithm, which greatly improves the speed of multi-target tracking while reaching SOTA accuracy.

SORT is widely used in practice, and its core consists of two algorithms: the Kalman filter and the Hungarian algorithm.

Kalman filter: the algorithm has two phases, prediction and update. The motion state of a target is modeled as an 8-dimensional normally distributed vector.

Prediction: as the target moves, the box position and velocity in the current frame are predicted from the box and velocity of the previous frame.

Update: the predicted value and the observed value, both normally distributed, are linearly weighted to obtain the current state estimate of the system.

Hungarian algorithm: it solves an assignment problem. The similarity computation in the main steps of MOT yields a similarity matrix between the previous and the current frame, and the Hungarian algorithm solves this matrix to find the targets that truly match across the two frames. The sklearn library provided a function, linear_assignment, for this step; it has since been removed from scikit-learn, and scipy.optimize.linear_sum_assignment solves the same problem.

In SORT, the similarity matrix is built from the IOU between the boxes of the two frames, which is why SORT runs so fast.
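The IOU-based association described above can be sketched in a few lines. This is a minimal illustration, not the SORT source: the helper names `iou` and `match_by_iou` are hypothetical, boxes are assumed to be `(x1, y1, x2, y2)` tuples, and `scipy.optimize.linear_sum_assignment` stands in for the Hungarian solver.

```python
import numpy as np
from scipy.optimize import linear_sum_assignment


def iou(box_a, box_b):
    """Intersection over union of two (x1, y1, x2, y2) boxes."""
    x1 = max(box_a[0], box_b[0])
    y1 = max(box_a[1], box_b[1])
    x2 = min(box_a[2], box_b[2])
    y2 = min(box_a[3], box_b[3])
    inter = max(0.0, x2 - x1) * max(0.0, y2 - y1)
    area_a = (box_a[2] - box_a[0]) * (box_a[3] - box_a[1])
    area_b = (box_b[2] - box_b[0]) * (box_b[3] - box_b[1])
    return inter / (area_a + area_b - inter)


def match_by_iou(tracks, detections):
    """Return (track_index, detection_index) pairs minimising total 1 - IOU."""
    cost = np.array([[1.0 - iou(t, d) for d in detections] for t in tracks])
    rows, cols = linear_sum_assignment(cost)
    return [(int(r), int(c)) for r, c in zip(rows, cols)]


# Two predicted track boxes and two detections listed in swapped order.
tracks = [(0, 0, 10, 10), (20, 20, 30, 30)]
detections = [(21, 21, 31, 31), (1, 1, 11, 11)]
pairs = match_by_iou(tracks, detections)
print(pairs)
```

The cost is `1 - IOU`, so a large overlap means a small cost, and the solver pairs each track with the detection it overlaps most.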

The following figure is a flowchart of the core SORT algorithm:

SORT parsing diagram provided by Harlek

Detections are the boxes produced by the target detector, and Tracks are the trajectories. The core of the algorithm is the matching step together with the Kalman filter prediction and update steps.

The process is as follows: the target detector produces the Detections, the Kalman filter predicts the Tracks for the current frame, and Detections and Tracks are then matched by IOU. The matching produces three kinds of results:

- Unmatched Tracks: the Track could not be matched to any Detection. If the mismatch persists, the target ID is deleted.
- Unmatched Detections: no Track matches the Detection, so a new Track is assigned to it.
- Matched Tracks: the Track and the Detection were matched.

The Kalman filter predicts the state of each box in the next frame from the state of its Track.

The Kalman filter update then revises the state of every matched Track from its observation (the matched Detection) and its estimate.

3. Deep SORT

The most important feature of Deep SORT is that it adds appearance information: it borrows a model from the ReID domain to extract features, which reduces the number of ID switches. The overall flow chart is as follows:

Picture from Zhihu Harlek

Compared with SORT, Deep SORT adds cascade matching (Matching Cascade) and the confirmation of new trajectories (confirmed). The overall process is as follows:

1. The Kalman filter predicts the trajectories (Tracks).
2. The Hungarian algorithm matches the predicted Tracks with the Detections in the current frame (cascade matching, then IOU matching).
3. The Kalman filter is updated.

The cascade matching in the above figure is expanded as follows:

Picture from Zhihu Harlek

The figure above shows how cascade matching works. It is divided into two parts by dotted lines:

The first part computes the similarity matrix. The appearance model (ReID) and the motion model (Mahalanobis distance) are used to obtain the cost matrix, and a gating matrix is used to limit excessively large values in the cost matrix.

The second part is the data association step of cascade matching. Matching runs in a loop (max age iterations, default 70): tracks from missing age = 0 up to missing age = 70 are matched against the Detections in turn, so tracks that have not been lost are matched first and tracks that have been lost for longer are matched later. This lets an occluded target be recovered and reduces the number of ID switches when an occluded target reappears.

Matching Detections and Tracks leads to the following cases:

- A Detection matches a Track: Matched Tracks. This is the usual case for a continuously tracked target that is present in both frames.
- A Detection finds no matching Track: Unmatched Detections. This happens when a new target suddenly appears in the image and has no counterpart among the existing Tracks.
- A Track finds no matching Detection: Unmatched Tracks. This happens when a continuously tracked target leaves the image area, so the Track cannot match any current Detection.

The cases above do not cover one special situation: one target occluding another. The Track of a target that has just been occluded matches no Detection, and the target temporarily disappears from the image. When the occluded target appears again, its previously assigned ID should be kept as far as possible to reduce the number of ID switches, and this is what cascade matching is for.

4. Deep SORT code parsing

The code released with the paper is available at: https://github.com/nwojke/deep_sort

Deep_sort file structure in the GitHub repository

The figure above shows the core Deep SORT code in the GitHub repository. It does not include the Faster R-CNN detection part, so the explanation focuses on these files. The author has also annotated the core code at https://github.com/pprp/deep_sort_yolov3_pytorch, where the target detector is replaced with the "U version" (Ultralytics) yolov3 and the core deep_sort files are called.

4.1 Class Diagram

The following figure summarizes how the classes call each other (it is not rigorous, but it roughly shows the relationship between the modules):

Deep Sort class diagram

DeepSort is the core class. It calls the other modules, which fall into three groups:

- ReID module: extracts appearance features. The original paper generates a 128-D embedding.
- Track module: the Track class stores the state of a single trajectory and is the basic unit.
- Tracker module: holds the core algorithm. The Kalman filter and the Hungarian algorithm are both invoked through this module.

The external interface of the DeepSort class is very simple:

    self.deepsort = DeepSort(args.deepsort_checkpoint)  # instantiation
    outputs = self.deepsort.update(bbox_xcycwh, cls_conf, im)  # update with the detection results

An external caller needs only these two steps.

The class diagram gives an overall picture of the modules. The following sections look at each module in more depth.

4.2 Core modules

Detection class

    import numpy as np

    class Detection(object):
        """
        This class represents a bounding box detection in a single image.
        """

        def __init__(self, tlwh, confidence, feature):
            # np.float has been removed from recent NumPy releases;
            # the builtin float is equivalent
            self.tlwh = np.asarray(tlwh, dtype=float)
            self.confidence = float(confidence)
            self.feature = np.asarray(feature, dtype=np.float32)

        def to_tlbr(self):
            """Convert bounding box to format `(min x, min y, max x, max y)`, i.e.,
            `(top left, bottom right)`.
            """
            ret = self.tlwh.copy()
            ret[2:] += ret[:2]
            return ret

        def to_xyah(self):
            """Convert bounding box to format `(center x, center y, aspect ratio,
            height)`, where the aspect ratio is `width / height`.
            """
            ret = self.tlwh.copy()
            ret[:2] += ret[2:] / 2
            ret[2] /= ret[3]
            return ret
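As a quick check of the two conversions in the Detection class, the same arithmetic can be run on one concrete box. The standalone helper names `tlwh_to_tlbr` and `tlwh_to_xyah` are hypothetical; they only repeat the method bodies outside the class.

```python
import numpy as np


def tlwh_to_tlbr(tlwh):
    """(top-left x, top-left y, width, height) -> (min x, min y, max x, max y)."""
    ret = np.asarray(tlwh, dtype=float).copy()
    ret[2:] += ret[:2]
    return ret


def tlwh_to_xyah(tlwh):
    """(top-left x, top-left y, width, height) -> (center x, center y, w / h, height)."""
    ret = np.asarray(tlwh, dtype=float).copy()
    ret[:2] += ret[2:] / 2
    ret[2] /= ret[3]
    return ret


box = [10, 20, 40, 80]       # top-left at (10, 20), 40 wide, 80 high
tlbr = tlwh_to_tlbr(box)     # bottom-right corner is (50, 100)
xyah = tlwh_to_xyah(box)     # center (30, 60), aspect ratio 0.5, height 80
print(tlbr, xyah)
```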

The Detection class holds one detection box produced by the target detector: the top-left coordinates plus the width and height of the box, the confidence of the bbox, and the corresponding embedding obtained from ReID. It also provides conversions between bbox formats:

- tlwh: top-left coordinates + width and height
- tlbr: top-left coordinates + bottom-right coordinates
- xyah: center coordinates + aspect ratio + height

Track class

    class Track:
        """
        A single target track with state space `(x, y, a, h)` and associated
        velocities, where `(x, y)` is the center of the bounding box, `a` is the
        aspect ratio and `h` is the height.
        """
        # the information of one track, including (x, y, a, h) & v

        def __init__(self, mean, covariance, track_id, n_init, max_age,
                     feature=None):
            # max_age is the survival period, default is 70 frames
            self.mean = mean
            self.covariance = covariance
            self.track_id = track_id
            self.hits = 1
            # hits is compared with n_init
            # hits is incremented on every update (update only runs on a match)
            # hits counts the matches; once it reaches n_init the track becomes confirmed
            self.age = 1  # not used, duplicates time_since_update
            self.time_since_update = 0
            # incremented by 1 every time predict is called
            # reset to 0 every time update is called

            self.state = TrackState.Tentative
            self.features = []
            # each track holds several features; every update appends the latest one
            if feature is not None:
                self.features.append(feature)

            # matches needed before confirmation; a miss before that deletes the track
            self._n_init = n_init
            self._max_age = max_age  # upper limit on time_since_update

The Track class mainly stores trajectory information. mean and covariance hold the position and velocity information of the box, and track_id is the ID assigned to the trajectory. state is the status of the trajectory, which has three values:

- Tentative: the uncertain state, assigned when a Track is initialized. The Track becomes confirmed only after it has been matched in n_init consecutive frames. If a tentative Track fails to match a Detection, it goes to the deleted state.
- Confirmed: the confirmed state, meaning the Track is being matched reliably. If a confirmed Track fails to match more than max_age times in a row, it goes to the deleted state.
- Deleted: the deleted state, meaning the Track has expired.

State transition diagram

max_age is the lifetime of a Track and is compared with the time_since_update variable. time_since_update is incremented by 1 every time the track calls predict and reset to 0 every time update is called. If a track goes without an update (no match) for a long time, the counter keeps growing until time_since_update exceeds max_age (default 70), and the Track is then removed from the list in Tracker.

hits is the number of confirmations and is used for the transition from the uncertain state to the confirmed state. Every time a Track is updated, hits is incremented by 1. Once hits reaches n_init (default 3), that is, the trajectory has been matched in three consecutive frames, the state changes from tentative to confirmed.

Each track also has an important variable, the features list, which stores the features that ReID extracted at the track's position in different frames. Why keep a list instead of only the latest feature? To handle a target that reappears after being occluded, the features of earlier frames must also be available for matching. Too many features would seriously slow down the computation, so a budget parameter limits the length of the list: only the latest budget features are kept and older ones are deleted.
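The life cycle described above can be condensed into a small state machine. The sketch below is hypothetical and is not the Track class from the repository: it drops the Kalman state, applies the feature budget inside the track for brevity (in the code discussed in this article the budget lives in NearestNeighborDistanceMetric), and assumes the confirmation test is `hits >= n_init`.

```python
from enum import Enum


class State(Enum):
    TENTATIVE = 1
    CONFIRMED = 2
    DELETED = 3


class MiniTrack:
    """Hypothetical stripped-down track that models only the life cycle."""

    def __init__(self, n_init=3, max_age=70, budget=100):
        self.hits = 1
        self.time_since_update = 0
        self.state = State.TENTATIVE
        self.features = []
        self.n_init = n_init
        self.max_age = max_age
        self.budget = budget

    def predict(self):
        # Called once per frame, before matching.
        self.time_since_update += 1

    def update(self, feature):
        # Called only when the track was matched to a detection.
        self.features.append(feature)
        self.features = self.features[-self.budget:]  # keep the newest ones
        self.hits += 1
        self.time_since_update = 0
        if self.state is State.TENTATIVE and self.hits >= self.n_init:
            self.state = State.CONFIRMED

    def mark_missed(self):
        # Called when the track found no detection in this frame.
        if self.state is State.TENTATIVE:
            self.state = State.DELETED
        elif self.time_since_update > self.max_age:
            self.state = State.DELETED


track = MiniTrack(n_init=3, max_age=2, budget=2)
for frame in range(2):           # two matched frames after initialisation
    track.predict()
    track.update(feature=frame)
state_after_hits = track.state   # hits is now 3, so the track is confirmed

for _ in range(3):               # three missed frames in a row
    track.predict()
    track.mark_missed()
print(state_after_hits, track.state)
```

With `max_age=2`, the track survives two missed frames and is deleted on the third, when `time_since_update` first exceeds `max_age`.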

ReID feature extraction part

The ReID network is a module independent of the target detector and the tracker. It extracts features from each bounding box and produces a fixed-dimensional embedding that represents the bbox when similarity is computed.

    import cv2
    import numpy as np
    import torch
    import torchvision.transforms as transforms

    class Extractor(object):
        def __init__(self, model_name, model_path, use_cuda=True):
            # build_model is provided by the project's ReID code
            self.net = build_model(name=model_name,
                                   num_classes=96)
            self.device = "cuda" if torch.cuda.is_available(
            ) and use_cuda else "cpu"
            state_dict = torch.load(model_path)['net_dict']
            self.net.load_state_dict(state_dict)
            print("Loading weights from {}... Done!".format(model_path))
            self.net.to(self.device)
            self.size = (128, 128)
            self.norm = transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize([0.3568, 0.3141, 0.2781],
                                     [0.1752, 0.1857, 0.1879])
            ])

        def _preprocess(self, im_crops):
            """
            TODO:
                1. to float with scale from 0 to 1
                2. resize to (64, 128) as Market1501 dataset did
                3. concatenate to a numpy array
                4. to torch Tensor
                5. normalize
            """
            def _resize(im, size):
                return cv2.resize(im.astype(np.float32) / 255., size)

            im_batch = torch.cat([
                self.norm(_resize(im, self.size)).unsqueeze(0) for im in im_crops
            ], dim=0).float()
            return im_batch

        def __call__(self, im_crops):
            im_batch = self._preprocess(im_crops)
            with torch.no_grad():
                im_batch = im_batch.to(self.device)
                features = self.net(im_batch)
            return features.cpu().numpy()

The model is trained in the usual ReID way. When the Extractor class is used, the input is a list of image crops and the output is the corresponding feature for each crop.

NearestNeighborDistanceMetric class

Two distance functions are used in this class:

Calculating the Euclidean distance

    def _pdist(a, b):
        # computes the pairwise squared distances
        # a: NxM, N objects, each with an M-dimensional embedding to compare
        # b: LxM, L objects, each with an M-dimensional embedding to compare
        # returns an NxL matrix; for example, dist[i][j] is the squared
        # distance between a[i] and b[j]
        # for the implementation, see: https://blog.csdn.net/frankzd/article/details/80251042
        a, b = np.asarray(a), np.asarray(b)  # convert the inputs to arrays
        if len(a) == 0 or len(b) == 0:
            return np.zeros((len(a), len(b)))
        a2, b2 = np.square(a).sum(axis=1), np.square(
            b).sum(axis=1)  # sum of squares of each embedding
        # sum(N) + sum(L) - 2 x [NxM] x [MxL] = [NxL]
        r2 = -2. * np.dot(a, b.T) + a2[:, None] + b2[None, :]
        r2 = np.clip(r2, 0., float(np.inf))
        return r2

Figure source: CSDN blog

Calculating the cosine distance

    def _cosine_distance(a, b, data_is_normalized=False):
        # cosine distance between a and b
        # a: [NxM] b: [LxM]
        # cosine distance = 1 - cosine similarity
        # https://blog.csdn.net/u013749540/article/details/51813922
        if not data_is_normalized:
            # normalize every row to unit length so that the dot product
            # below is the cosine similarity
            # np.linalg.norm computes the norm of a vector; the default is
            # the L2 norm, i.e. the Euclidean length of the vector
            a = np.asarray(a) / np.linalg.norm(a, axis=1, keepdims=True)
            b = np.asarray(b) / np.linalg.norm(b, axis=1, keepdims=True)
        return 1. - np.dot(a, b.T)

Figure source: CSDN blog

The code above corresponds to the formula in the figure. Note that cosine distance = 1 - cosine similarity.
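Since the formula figures are not reproduced here, the two distances can be checked numerically instead. The squared Euclidean distance relies on the expansion |a - b|^2 = |a|^2 + |b|^2 - 2 a·b, and the cosine distance is one minus the dot product of the unit-length vectors. The sketch below re-implements both under the hypothetical names `pairwise_sq_dist` and `cosine_dist` and compares the first against a naive double loop.

```python
import numpy as np


def pairwise_sq_dist(a, b):
    """Squared Euclidean distance between every row of a and every row of b."""
    a, b = np.asarray(a, dtype=float), np.asarray(b, dtype=float)
    a2 = np.square(a).sum(axis=1)
    b2 = np.square(b).sum(axis=1)
    r2 = -2.0 * a @ b.T + a2[:, None] + b2[None, :]
    return np.clip(r2, 0.0, None)  # remove tiny negatives caused by rounding


def cosine_dist(a, b):
    """1 - cosine similarity between every row of a and every row of b."""
    a, b = np.asarray(a, dtype=float), np.asarray(b, dtype=float)
    a = a / np.linalg.norm(a, axis=1, keepdims=True)
    b = b / np.linalg.norm(b, axis=1, keepdims=True)
    return 1.0 - a @ b.T


a = np.array([[1.0, 0.0], [0.0, 2.0]])
b = np.array([[1.0, 0.0], [3.0, 4.0], [0.0, -1.0]])

sq = pairwise_sq_dist(a, b)
cos = cosine_dist(a, b)
naive = np.array([[np.sum((x - y) ** 2) for y in b] for x in a])
print(np.allclose(sq, naive))
print(cos)
```

Identical directions give a cosine distance of 0, orthogonal vectors give 1, and opposite directions give 2, regardless of vector length.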

Nearest neighbor distance metric class

    class NearestNeighborDistanceMetric(object):
        # for each target, return the nearest distance
        def __init__(self, metric, matching_threshold, budget=None):
            # defaults: matching_threshold = 0.2, budget = 100
            if metric == "euclidean":
                # use the nearest neighbor Euclidean distance
                self._metric = _nn_euclidean_distance
            elif metric == "cosine":
                # use the nearest neighbor cosine distance
                self._metric = _nn_cosine_distance
            else:
                raise ValueError("Invalid metric; must be either 'euclidean' or 'cosine'")

            self.matching_threshold = matching_threshold
            # used in the cascade matching function
            self.budget = budget
            # budget limits the number of features kept per target
            self.samples = {}
            # samples is a dictionary {id -> feature list}

        def partial_fit(self, features, targets, active_targets):
            # function: partial fit, update the distance metric with new data
            # call: in the feature set update step of tracker.update()
            for feature, target in zip(features, targets):
                self.samples.setdefault(target, []).append(feature)
                # add the new feature under its target id
                # target id: feature list
                if self.budget is not None:
                    self.samples[target] = self.samples[target][-self.budget:]
                # apply the budget: keep only the newest `budget` features
                # per target and drop the older ones

            # keep only the active targets
            self.samples = {k: self.samples[k] for k in active_targets}

        def distance(self, features, targets):
            # function: compare features with targets and return a cost matrix
            # call: in the matching phase, distance is wrapped as gated_metric,
            #       which combines appearance information (deep features from ReID)
            #       with motion information (gating by Mahalanobis distance)
            cost_matrix = np.zeros((len(targets), len(features)))
            for i, target in enumerate(targets):
                cost_matrix[i, :] = self._metric(self.samples[target], features)
            return cost_matrix
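The `_metric` call in `distance` is what makes the metric a "nearest neighbor" one: for each detection, only the smallest distance to any stored feature of the track counts. The sketch below illustrates that idea under assumptions; `nn_cosine_distance`, the `gallery` dictionary, and the toy 2-D features are hypothetical and stand in for the repository's helper and for real ReID embeddings.

```python
import numpy as np


def nn_cosine_distance(samples, queries):
    """For each query, the smallest cosine distance to any stored sample."""
    samples = np.asarray(samples, dtype=float)
    queries = np.asarray(queries, dtype=float)
    samples = samples / np.linalg.norm(samples, axis=1, keepdims=True)
    queries = queries / np.linalg.norm(queries, axis=1, keepdims=True)
    distances = 1.0 - samples @ queries.T   # shape (num_samples, num_queries)
    return distances.min(axis=0)            # best sample per query


# Hypothetical gallery: track id -> list of stored appearance features.
gallery = {
    7: [[1.0, 0.0], [0.0, 1.0]],   # track 7 has been seen in two poses
    9: [[-1.0, 0.0]],
}
detections = [[0.0, 2.0], [-3.0, 0.0]]  # features of two new detections
track_ids = [7, 9]

# One row per track, one column per detection, as in distance().
cost = np.stack([nn_cosine_distance(gallery[t], detections) for t in track_ids])
print(cost)
```

Track 7 matches the first detection through its second stored feature, which shows why keeping several features per track helps after a change in appearance.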

Tracker class

Tracker is the core class. It stores all trajectories and is responsible for initializing the trajectories in the first frame, for the Kalman filter prediction and update, and for cascade matching, IOU matching, and the other core work.

    class Tracker:
        """
        This is the multi-target tracker.
        """
        # a multi-target tracker that holds many tracks
        # responsible for calling the Kalman filter to predict the new state
        # of each track + matching + initializing the first frame
        # when Tracker calls update or predict, each track also calls its
        # own update or predict

        def __init__(self, metric, max_iou_distance=0.7, max_age=70, n_init=3):
            # when called, the following parameters all use their defaults
            self.metric = metric
            # metric is an object that computes distances (cosine or Euclidean)
            self.max_iou_distance = max_iou_distance
            # maximum IOU distance, used in IOU matching
            self.max_age = max_age
            # passed directly as the cascade_depth parameter of cascade matching
            self.n_init = n_init
            # n_init is the number of updates required before a track is
            # set to the confirmed state

            self.kf = kalman_filter.KalmanFilter()  # Kalman filter
            self.tracks = []  # holds the list of tracks
            self._next_id = 1  # next track id to assign

        def predict(self):
            # iterate over every track and run its prediction
            """Propagate track state distributions one time step forward.

            This function should be called once every time step, before `update`.
            """
            for track in self.tracks:
                track.predict(self.kf)

Next come the core update function and match function, which can be read alongside the following flow chart:

Update function

    def update(self, detections):
        # measurement update and track management
        """Perform measurement update and track management.

        Parameters
        ----------
        detections : List[deep_sort.detection.Detection]
            A list of detections at the current time step.

        """
        # Run matching cascade.
        matches, unmatched_tracks, unmatched_detections = \
            self._match(detections)

        # Update track set.

        # 1. For the matched results
        for track_idx, detection_idx in matches:
            # update the track with its matched detection
            self.tracks[track_idx].update(self.kf, detections[detection_idx])

        # 2. Call mark_missed on every unmatched track
        # a tentative track is deleted at once; a confirmed track is deleted
        # once it has gone without an update for longer than max_age
        # max_age is the survival period, default is 70 frames
        for track_idx in unmatched_tracks:
            self.tracks[track_idx].mark_missed()

        # 3. Initialize a new track for every unmatched detection
        for detection_idx in unmatched_detections:
            self._initiate_track(detections[detection_idx])

        # refresh the tracks list, keeping the confirmed and tentative tracks
        self.tracks = [t for t in self.tracks if not t.is_deleted()]

        # Update distance metric.
        active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]
        # the track ids of all confirmed tracks
        features, targets = [], []
        for track in self.tracks:
            if not track.is_confirmed():
                continue
            features += track.features  # append the track's features to the features list
            # record the track id that each feature belongs to
            targets += [track.track_id for _ in track.features]
            track.features = []

        # update the feature set inside the distance metric
        self.metric.partial_fit(np.asarray(features), np.asarray(targets),
                                active_targets)

Match function:

    def _match(self, detections):
        # main job: run the matching and find the matched and unmatched parts
        def gated_metric(tracks, dets, track_indices, detection_indices):
            # function: computes the distance between tracks and detections (the cost function)
            #           it has to run before the KM (Hungarian) algorithm
            # call:
            # cost_matrix = distance_metric(tracks, detections,
            #                               track_indices, detection_indices)
            features = np.array([dets[i].feature for i in detection_indices])
            targets = np.array([tracks[i].track_id for i in track_indices])

            # 1. Compute the cost matrix with the nearest neighbor cosine distance
            cost_matrix = self.metric.distance(features, targets)

            # 2. Gate the cost matrix with the Mahalanobis distance
            cost_matrix = linear_assignment.gate_cost_matrix(
                self.kf, cost_matrix, tracks, dets, track_indices,
                detection_indices)
            return cost_matrix

        # Split track set into confirmed and unconfirmed tracks.
        # separate the tracks by state
        confirmed_tracks = [
            i for i, t in enumerate(self.tracks) if t.is_confirmed()
        ]
        unconfirmed_tracks = [
            i for i, t in enumerate(self.tracks) if not t.is_confirmed()
        ]

        # Cascade matching
        # yields the matched tracks, the unmatched tracks, and the unmatched detections
        # gated_metric -> cosine distance
        # only confirmed tracks take part in cascade matching
        matches_a, unmatched_tracks_a, unmatched_detections = \
            linear_assignment.matching_cascade(
                gated_metric,
                self.metric.matching_threshold,
                self.max_age,
                self.tracks,
                detections,
                confirmed_tracks)

        # combine all unconfirmed tracks and the tracks that just failed
        # to match into iou_track_candidates for IoU matching
        iou_track_candidates = unconfirmed_tracks + [
            k for k in unmatched_tracks_a
            if self.tracks[k].time_since_update == 1  # missed only in this frame
        ]
        # tracks that stay unmatched
        unmatched_tracks_a = [
            k for k in unmatched_tracks_a
            if self.tracks[k].time_since_update != 1  # unmatched for a longer time
        ]

        # IOU matching
        # run IoU matching on the targets that cascade matching did not match
        # min_cost_matching is also the core of cascade matching,
        # but the metric used here is the IoU cost, unlike above
        matches_b, unmatched_tracks_b, unmatched_detections = \
            linear_assignment.min_cost_matching(
                iou_matching.iou_cost,
                self.max_iou_distance,
                self.tracks,
                detections,
                iou_track_candidates,
                unmatched_detections)

        matches = matches_a + matches_b  # combine the two sets of matches
        unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
        return matches, unmatched_tracks, unmatched_detections

The two functions above are easier to follow when the comments are read together with the following flowchart.

Picture from Zhihu Harlek

Cascade matching

The following is the pseudocode for cascade matching given in the paper:

Pseudocode for cascade matching in the paper

The following code is the corresponding implementation of the pseudocode:

    # 1. Assign track_indices and detection_indices
    if track_indices is None:
        track_indices = list(range(len(tracks)))
    if detection_indices is None:
        detection_indices = list(range(len(detections)))

    unmatched_detections = detection_indices
    matches = []
    # cascade_depth = max_age, default is 70
    for level in range(cascade_depth):
        if len(unmatched_detections) == 0:  # No detections left
            break

        track_indices_l = [
            k for k in track_indices
            if tracks[k].time_since_update == 1 + level
        ]
        if len(track_indices_l) == 0:  # Nothing to match at this level
            continue

        # 2. The core of cascade matching is this function
        matches_l, _, unmatched_detections = \
            min_cost_matching(  # max_distance=0.2
                distance_metric, max_distance, tracks, detections,
                track_indices_l, unmatched_detections)
        matches += matches_l
    unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
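The article does not list `min_cost_matching` itself, so the following is a hedged sketch of what such a thresholded assignment step can look like, not the repository's implementation. The name `min_cost_matching_sketch` is hypothetical, it takes a ready-made cost matrix instead of a metric callback, and it uses `scipy.optimize.linear_sum_assignment` as the solver.

```python
import numpy as np
from scipy.optimize import linear_sum_assignment


def min_cost_matching_sketch(cost_matrix, max_distance):
    """Solve the assignment and reject pairs whose cost exceeds max_distance."""
    cost = np.asarray(cost_matrix, dtype=float).copy()
    # Clamp over-threshold entries so one very bad pair cannot dominate the sum.
    cost[cost > max_distance] = max_distance + 1e-5
    rows, cols = linear_sum_assignment(cost)

    matches = [(int(r), int(c)) for r, c in zip(rows, cols)
               if cost[r, c] <= max_distance]
    matched_rows = {r for r, _ in matches}
    matched_cols = {c for _, c in matches}
    unmatched_tracks = [r for r in range(cost.shape[0]) if r not in matched_rows]
    unmatched_detections = [c for c in range(cost.shape[1]) if c not in matched_cols]
    return matches, unmatched_tracks, unmatched_detections


# Rows are tracks, columns are detections; 0.2 is the threshold quoted above.
cost = [[0.10, 0.90, 0.80],
        [0.70, 0.15, 0.90],
        [0.90, 0.80, 0.60]]
matches, unmatched_tracks, unmatched_detections = min_cost_matching_sketch(cost, 0.2)
print(matches, unmatched_tracks, unmatched_detections)
```

The solver always returns a full assignment, so the threshold check afterwards is what turns a forced pairing (track 2 with detection 2 at cost 0.6) into an unmatched track and an unmatched detection.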

Gating matrix

The gating matrix restricts the cost matrix according to the distance between the state distribution of the Kalman filter and the measured values.

The distances in the cost matrix are the appearance similarity between Tracks and Detections. If a Track has to choose between two Detections with very similar appearance, mistakes are likely. In that case the Mahalanobis distance from each Detection to the trajectory is computed and limited with the threshold gating_threshold, so the Detection that lies farther away in Mahalanobis distance can be ruled out, which reduces mismatches.

    def gate_cost_matrix(
            kf, cost_matrix, tracks, detections, track_indices, detection_indices,
            gated_cost=INFTY_COST, only_position=False):
        # invalidate the infeasible entries of the cost matrix according to
        # the state distribution given by the Kalman filter
        gating_dim = 2 if only_position else 4
        gating_threshold = kalman_filter.chi2inv95[gating_dim]  # 9.4877
        measurements = np.asarray([detections[i].to_xyah()
                                   for i in detection_indices])
        for row, track_idx in enumerate(track_indices):
            track = tracks[track_idx]
            gating_distance = kf.gating_distance(
                track.mean, track.covariance, measurements, only_position)
            cost_matrix[row, gating_distance >
                        gating_threshold] = gated_cost  # set to inf
        return cost_matrix
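To make the gate concrete, the sketch below computes squared Mahalanobis distances for two candidate measurements and compares them with the 95% chi-square quantile for 4 degrees of freedom, which reproduces the 9.4877 in the comment above. It is an illustration under assumptions: `squared_mahalanobis` is a hypothetical helper, and the mean and covariance are assumed to be already projected into the 4-D measurement space (x, y, a, h).

```python
import numpy as np
from scipy.stats import chi2


def squared_mahalanobis(mean, covariance, measurements):
    """Squared Mahalanobis distance of each measurement row to the distribution."""
    diff = np.asarray(measurements, dtype=float) - mean
    solved = np.linalg.solve(covariance, diff.T)   # covariance^-1 @ diff^T
    return np.sum(diff.T * solved, axis=0)


threshold = chi2.ppf(0.95, df=4)   # 95% quantile, 4 degrees of freedom

# Assumed projected track state in measurement space.
mean = np.array([100.0, 50.0, 0.5, 80.0])
covariance = np.diag([4.0, 4.0, 0.01, 16.0])

measurements = np.array([
    [101.0, 51.0, 0.5, 82.0],   # close to the prediction
    [130.0, 50.0, 0.5, 80.0],   # 30 px away in x
])
d2 = squared_mahalanobis(mean, covariance, measurements)
gated = d2 > threshold          # True means the pair is ruled out
print(threshold, d2, gated)
```

The second measurement is gated out even if its appearance were identical to the track, which is the protection against look-alike targets described above.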

Kalman filter

In Deep SORT, the following states of a Track need to be estimated:

- Mean: an 8-dimensional vector (x, y, a, h, vx, vy, va, vh). (x, y) is the center coordinate of the box, a is the aspect ratio, h is the height, and the remaining four entries are the corresponding velocities, which are all initialized to 0.
- Covariance: the degree of uncertainty of the target's state, expressed as an 8x8 diagonal matrix. The larger a value in the matrix, the higher the uncertainty.

The following figure shows the main process of the Kalman filter:

Figure source: DeepSORT: Deep Learning to Track Custom Objects in a Video

1. The Kalman filter first takes the state of the current frame (time=t) and predicts the state of the next frame (time=t+1).
2. A measurement is obtained. In Deep SORT the measurement is the Detection, that is, the detection box provided by the target detector.
3. The prediction is updated with the measurement.

The following section mainly refers to: https://zhuanlan.zhihu.com/p/90835266

Readers who want a deeper understanding of the Kalman filter can read the algorithm and the code side by side.

Prediction uses two formulas.

The first formula:

    x' = F x

where F is the state transition matrix, as shown in the following figure:

Figure source: Zhihu @ seeking

The second formula:

    P' = F P F^T + Q

P is the covariance of the current frame (time=t), and Q is the motion estimation error of the Kalman filter, representing the degree of uncertainty.

    def predict(self, mean, covariance):
        # equivalent to obtaining the estimate at time t
        # Q: noise covariance of the prediction process
        std_pos = [
            self._std_weight_position * mean[3],
            self._std_weight_position * mean[3],
            1e-2,
            self._std_weight_position * mean[3]]
        std_vel = [
            self._std_weight_velocity * mean[3],
            self._std_weight_velocity * mean[3],
            1e-5,
            self._std_weight_velocity * mean[3]]

        # np.r_ concatenates the two lists into one vector
        # initialize the noise matrix Q
        motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))

        # x' = Fx
        mean = np.dot(self._motion_mat, mean)

        # P' = FPF^T + Q
        covariance = np.linalg.multi_dot((
            self._motion_mat, covariance, self._motion_mat.T)) + motion_cov

        return mean, covariance
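The predict step relies on `self._motion_mat`, the state transition matrix F, whose figure is not reproduced here. For a constant-velocity model over the 8-dimensional state, F is the identity with the time step dt placed where each position entry picks up its velocity. The sketch below builds such a matrix and applies the two prediction formulas; the variable names, dt = 1, and the placeholder Q are assumptions chosen for illustration.

```python
import numpy as np

ndim, dt = 4, 1.0

# F: identity plus dt in the upper-right block, so position += velocity * dt.
motion_mat = np.eye(2 * ndim)
for i in range(ndim):
    motion_mat[i, ndim + i] = dt

# H: keeps only (x, y, a, h) and drops the velocities.
update_mat = np.eye(ndim, 2 * ndim)

# State (x, y, a, h, vx, vy, va, vh) with a small made-up velocity.
mean = np.array([100.0, 50.0, 0.5, 80.0, 2.0, -1.0, 0.0, 0.5])
covariance = np.eye(2 * ndim)
motion_cov = 0.01 * np.eye(2 * ndim)   # placeholder Q, not the library's values

pred_mean = motion_mat @ mean                                   # x' = F x
pred_cov = motion_mat @ covariance @ motion_mat.T + motion_cov  # P' = F P F^T + Q
print(pred_mean)
print(pred_cov[0, 0], pred_cov[4, 4])
```

The predicted position variance (2.01) is larger than the velocity variance (1.01) because the position inherits the uncertainty of the velocity on every step, which is why a track that goes unmatched for many frames becomes steadily less certain.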

Update formulas

    def project(self, mean, covariance):
        # R: noise covariance of the measurement
        std = [
            self._std_weight_position * mean[3],
            self._std_weight_position * mean[3],
            1e-1,
            self._std_weight_position * mean[3]]

        # initialize the noise matrix R
        innovation_cov = np.diag(np.square(std))

        # map the mean vector to the detection space, i.e. Hx'
        mean = np.dot(self._update_mat, mean)

        # map the covariance matrix to the detection space, i.e. HP'H^T
        covariance = np.linalg.multi_dot((
            self._update_mat, covariance, self._update_mat.T))

        return mean, covariance + innovation_cov

    def update(self, mean, covariance, measurement):
        # combine the estimate and the observation into the latest result

        # map the mean and covariance to the detection space to get Hx' and S
        projected_mean, projected_cov = self.project(mean, covariance)

        # matrix decomposition (Cholesky)
        chol_factor, lower = scipy.linalg.cho_factor(
            projected_cov, lower=True, check_finite=False)

        # compute the Kalman gain K
        kalman_gain = scipy.linalg.cho_solve(
            (chol_factor, lower), np.dot(covariance, self._update_mat.T).T,
            check_finite=False).T

        # y = z - Hx'
        innovation = measurement - projected_mean

        # x = x' + Ky
        new_mean = mean + np.dot(innovation, kalman_gain.T)

        # P = (I - KH)P', computed here as P' - K S K^T
        new_covariance = covariance - np.linalg.multi_dot((
            kalman_gain, projected_cov, kalman_gain.T))
        return new_mean, new_covariance

In y = z - Hx', z is the measurement vector of the Detection. It contains no velocity terms, so its state is [cx, cy, a, h]. H is the measurement matrix, which maps the mean vector of the Track into detection space. The resulting y is the error between the Detection and the Track mean.

In S = HP'H^T + R, R is the noise matrix of the target detector, a 4x4 diagonal matrix. The values on its diagonal are the noise of the two center-point coordinates, of the aspect ratio a, and of the height h.

K = P'H^T S^-1 is the Kalman gain, which weighs how strongly the error y corrects the prediction.

x = x' + Ky is the updated mean vector.

P = (I - KH)P' is the updated covariance matrix.
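To make the update step concrete, here is a minimal sketch for a scalar state, where every matrix collapses to a single number. The function name `update_scalar` and the example values are hypothetical; the sketch only illustrates the standard Kalman update and is not the Deep SORT code itself.

```python
def update_scalar(x_pred, p_pred, z, r, h=1.0):
    """Scalar Kalman measurement update."""
    y = z - h * x_pred          # innovation: measurement minus projected mean
    s = h * p_pred * h + r      # innovation covariance S = HP'H^T + R
    k = p_pred * h / s          # Kalman gain K = P'H^T S^-1
    x_new = x_pred + k * y      # corrected mean
    p_new = p_pred - k * s * k  # equals (1 - k*h) * p_pred
    return x_new, p_new, k


x_new, p_new, k = update_scalar(x_pred=12.0, p_pred=4.0, z=14.0, r=4.0)
print(x_new, p_new, k)
```

With equal prediction and measurement variance the gain is 0.5, so the corrected mean lands halfway between the prediction and the measurement, and the variance is halved.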

The author does not understand the Kalman filter in great depth and gives no derivation of the formulas. For readers interested in this part, a few blogs are recommended:

Kalman filter + Python demo: https://zhuanlan.zhihu.com/p/113685503?utm_source=wechat_session&utm_medium=social&utm_oi=801414067897135104

Detailed explanation + derivation: https://blog.csdn.net/honyniu/article/details/88697520

5. Process analysis

The process part mainly follows the flow chart below:

Deep SORT flow chart summarized by Zhihu @Maodi

Thanks to Zhihu @Maodi for summarizing this flow chart; the explanation is very clear. If you only read the code, it is easy to get confused: for example, the cost matrix computation passes through three function calls in a row before it is actually evaluated. The figure summarizes the overall process very well. The author follows this process and walks through the code:

Analyze the Deep SORT call in the Detector class:

class Detector(object):
    def __init__(self, args):
        self.args = args
        if args.display:
            cv2.namedWindow("test", cv2.WINDOW_NORMAL)
            cv2.resizeWindow("test", args.display_width, args.display_height)
        device = torch.device(
            'cuda') if torch.cuda.is_available() else torch.device('cpu')
        self.vdo = cv2.VideoCapture()
        self.yolo3 = InferYOLOv3(args.yolo_cfg,
                                 args.img_size,
                                 args.yolo_weights,
                                 args.data_cfg,
                                 device,
                                 conf_thres=args.conf_thresh,
                                 nms_thres=args.nms_thresh)
        self.deepsort = DeepSort(args.deepsort_checkpoint)

After the DeepSORT object is initialized, update is called with the positions and confidences of the detected boxes and the image:

outputs = self.deepsort.update(bbox_xcycwh, cls_conf, im)

Follow the call into the update function of the DeepSort class:

class DeepSort(object):
    def __init__(self, model_path, max_dist=0.2):
        self.min_confidence = 0.3
        # confidence threshold applied to the YOLOv3 results;
        # detections with confidence below 0.3 are filtered out
        self.nms_max_overlap = 1.0
        # non-maximum suppression threshold; 1 means no suppression
        # extracts image embeddings and returns the features of a batch of images
        self.extractor = Extractor("resnet18",
                                   model_path,
                                   use_cuda=True)
        max_cosine_distance = max_dist
        # used in cascade matching; pairs farther apart than this threshold are ignored
        nn_budget = 100
        # budget: the maximum number of samples kept per class;
        # when exceeded, the oldest ones are deleted
        # the first parameter can be 'cosine' or 'euclidean'
        metric = NearestNeighborDistanceMetric("cosine",
                                               max_cosine_distance,
                                               nn_budget)
        self.tracker = Tracker(metric)

    def update(self, bbox_xywh, confidences, ori_img):
        self.height, self.width = ori_img.shape[:2]
        # generate detections
        # crop the patch of each bbox from the original image and compute its embedding
        features = self._get_features(bbox_xywh, ori_img)
        bbox_tlwh = self._xywh_to_tlwh(bbox_xywh)
        detections = [
            Detection(bbox_tlwh[i], conf, features[i])
            for i, conf in enumerate(confidences) if conf > self.min_confidence
        ]  # drop targets below min_confidence and build a list of Detection objects
        # a Detection stores one bbox result of the image
        # it needs: 1. bbox (tlwh form) 2. its confidence 3. its embedding

        # run non-maximum suppression
        boxes = np.array([d.tlwh for d in detections])
        scores = np.array([d.confidence for d in detections])
        # with the default nms_max_overlap=1.0 this call has no effect:
        # no box is actually suppressed
        indices = non_max_suppression(boxes, self.nms_max_overlap, scores)
        detections = [detections[i] for i in indices]

        # update tracker
        # the tracker first predicts, then takes the detections for the Kalman filter update
        self.tracker.predict()
        self.tracker.update(detections)

        # output bbox identities
        # store the results for output and visualization
        outputs = []
        for track in self.tracker.tracks:
            if not track.is_confirmed() or track.time_since_update > 1:
                continue
            box = track.to_tlwh()
            x1, y1, x2, y2 = self._tlwh_to_xyxy(box)
            track_id = track.track_id
            outputs.append(np.array([x1, y1, x2, y2, track_id], dtype=int))
        if len(outputs) > 0:
            outputs = np.stack(outputs, axis=0)
        return np.array(outputs)

Comparing the code with the flow chart above makes things clearer from here on. During Deep SORT initialization there is a core metric, the NearestNeighborDistanceMetric class, which is used for matching and for updating the feature set.
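As a rough illustration of what such a metric does, the sketch below keeps a bounded list of appearance features per track and returns the smallest cosine distance to any stored sample. The class name `FeatureGallery` and its layout are hypothetical simplifications, assumed only for this example; they are not the library's implementation.

```python
import math
from collections import defaultdict


def cosine_distance(a, b):
    """1 - cosine similarity of two feature vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm


class FeatureGallery:
    """Hypothetical per-track feature store with a sample budget."""

    def __init__(self, budget=None):
        self.budget = budget
        self.samples = defaultdict(list)

    def partial_fit(self, features, targets, active_targets):
        for feature, target in zip(features, targets):
            self.samples[target].append(feature)
            if self.budget is not None:
                # keep only the newest `budget` samples of this track
                self.samples[target] = self.samples[target][-self.budget:]
        # forget every track that is no longer active
        kept = {k: self.samples[k] for k in active_targets if k in self.samples}
        self.samples = defaultdict(list, kept)

    def distance(self, feature, target):
        # nearest-neighbor distance to the stored samples of one track
        return min(cosine_distance(feature, s) for s in self.samples[target])


gallery = FeatureGallery(budget=2)
gallery.partial_fit([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]], [7, 7, 7],
                    active_targets=[7])
nearest = gallery.distance([0.0, 1.0], 7)
print(len(gallery.samples[7]), nearest)
```

With a budget of 2, the oldest of the three samples is dropped, which mirrors the role of nn_budget in the constructor above.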

To summarize the update process of DeepSORT:

From the input parameters (bbox_xywh, conf, img), the ReID model extracts the appearance features of each bbox.

A list of detections is built whose elements are Detection objects; boxes below the minimum confidence are dropped here.

Non-maximum suppression is applied, but with the default nms_max_overlap=1.0 it has no actual effect.

The Tracker makes a prediction and is then updated with the detections.

Finally, the tracks stored in Tracker that are in the confirmed state are returned.
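The box handling in these steps can be sketched in a few lines. This is a minimal sketch assuming plain Python lists; the helper names `xywh_to_tlwh`, `tlwh_to_xyxy` and `build_detections` are hypothetical stand-ins for the private methods called in update, and clipping to the image bounds is an assumption of this example.

```python
def xywh_to_tlwh(box):
    """(center x, center y, width, height) -> (top-left x, top-left y, width, height)."""
    cx, cy, w, h = box
    return [cx - w / 2.0, cy - h / 2.0, w, h]


def tlwh_to_xyxy(box, img_w, img_h):
    """(top-left x, top-left y, width, height) -> integer corners clipped to the image."""
    x, y, w, h = box
    x1 = max(int(x), 0)
    y1 = max(int(y), 0)
    x2 = min(int(x + w), img_w - 1)
    y2 = min(int(y + h), img_h - 1)
    return x1, y1, x2, y2


def build_detections(boxes_xywh, confidences, min_confidence=0.3):
    """Keep only boxes whose confidence exceeds min_confidence."""
    return [(xywh_to_tlwh(b), c)
            for b, c in zip(boxes_xywh, confidences) if c > min_confidence]


dets = build_detections([[50, 40, 20, 10], [5, 5, 4, 4]], [0.9, 0.2])
corners = tlwh_to_xyxy(dets[0][0], img_w=100, img_h=80)
print(dets, corners)
```

The second box is discarded because its confidence of 0.2 is below the 0.3 threshold.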

The core of the above is the predict and update functions of Tracker, which are sorted out next.

Predict function of Tracker

Tracker is a multi-target tracker that stores many tracks. It is responsible for calling the Kalman filter to predict the new state of each track, for matching, and for initialization on the first frame. When Tracker calls update or predict, each track also calls its own update or predict.

class Tracker:
    def __init__(self, metric, max_iou_distance=0.7, max_age=70, n_init=3):
        # when called above, all of the following parameters keep their defaults
        self.metric = metric
        self.max_iou_distance = max_iou_distance
        # maximum IoU distance, used in IoU matching
        self.max_age = max_age
        # passed directly as the cascade_depth parameter of cascade matching
        self.n_init = n_init
        # number of updates required before a track's state is set to confirmed
        self.kf = kalman_filter.KalmanFilter()  # Kalman filter
        self.tracks = []  # stores the list of tracks
        self._next_id = 1  # next track id to assign

    def predict(self):
        # iterate over every track and make a prediction
        """Propagate track state distributions one time step forward.

        This function should be called once every time step, before `update`.
        """
        for track in self.tracks:
            track.predict(self.kf)

predict mainly uses the Kalman filter to predict the state of every track in the track list.

Update function of Tracker

The update function of Tracker is the core.

def update(self, detections):
    # measurement update and track management
    """Perform measurement update and track management.

    Parameters
    ----------
    detections : List[deep_sort.detection.Detection]
        A list of detections at the current time step.

    """
    # Run matching cascade.
    matches, unmatched_tracks, unmatched_detections = \
        self._match(detections)

    # Update track set.
    # 1. for the matched results
    for track_idx, detection_idx in matches:
        # update the track with its matched detection
        self.tracks[track_idx].update(self.kf, detections[detection_idx])
    # 2. call mark_missed on every unmatched track
    # an unmatched track is deleted if it is still tentative,
    # or if it has not been updated for too long
    # max_age is the survival period, 70 frames by default
    for track_idx in unmatched_tracks:
        self.tracks[track_idx].mark_missed()
    # 3. initialize a new track for every unmatched detection
    for detection_idx in unmatched_detections:
        self._initiate_track(detections[detection_idx])
    # get the latest track list, keeping the tracks marked confirmed or tentative
    self.tracks = [t for t in self.tracks if not t.is_deleted()]

    # Update distance metric.
    # get the track ids of all confirmed tracks
    active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]
    features, targets = [], []
    for track in self.tracks:
        if not track.is_confirmed():
            continue
        features += track.features  # append this track's features to the features list
        # record the track id that each feature belongs to
        targets += [track.track_id for _ in track.features]
        track.features = []
    # update the feature set of the distance metric
    self.metric.partial_fit(
        np.asarray(features), np.asarray(targets), active_targets)

The comments in this part are already detailed; it is mostly post-processing code. Pay attention to how matches, unmatched Detections, and unmatched Tracks are handled, and to the feature set update at the end. It can be followed against the flow chart.
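The track management described in the comments (tentative, confirmed, deleted) can be sketched as a small state machine. This is a minimal sketch with a hypothetical class `TrackState` that keeps only the counters involved; the Kalman state and features of a real track are left out, and the initial hit count of 1 is an assumption of this example.

```python
TENTATIVE, CONFIRMED, DELETED = "tentative", "confirmed", "deleted"


class TrackState:
    """Hypothetical track life cycle, without Kalman state or features."""

    def __init__(self, n_init=3, max_age=70):
        self.state = TENTATIVE
        self.hits = 1  # the detection that created the track counts as a hit
        self.time_since_update = 0
        self.n_init = n_init
        self.max_age = max_age

    def predict(self):
        # called once per frame, before matching
        self.time_since_update += 1

    def update(self):
        # called when a detection was matched to this track
        self.hits += 1
        self.time_since_update = 0
        if self.state == TENTATIVE and self.hits >= self.n_init:
            self.state = CONFIRMED

    def mark_missed(self):
        # called when no detection was matched to this track
        if self.state == TENTATIVE:
            self.state = DELETED
        elif self.time_since_update > self.max_age:
            self.state = DELETED


track = TrackState(n_init=3, max_age=2)
for _ in range(2):  # two matched frames in a row
    track.predict()
    track.update()
state_after_hits = track.state

misses = 0
while track.state != DELETED:  # then the track is never matched again
    track.predict()
    track.mark_missed()
    misses += 1
print(state_after_hits, misses)
```

A tentative track disappears after a single miss, while a confirmed track survives until time_since_update exceeds max_age.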

The core of Tracker's update function is the _match function, which describes how the matching is done:

def _match(self, detections):
    # main job: run the matching and find the matched and unmatched parts
    def gated_metric(tracks, dets, track_indices, detection_indices):
        # purpose: cost function that computes the distance between tracks and detections
        # it has to run before the KM (Hungarian) algorithm
        # it is called as:
        # cost_matrix = distance_metric(tracks, detections,
        #                               track_indices, detection_indices)
        features = np.array([dets[i].feature for i in detection_indices])
        targets = np.array([tracks[i].track_id for i in track_indices])
        # 1. compute the cost matrix from the nearest-neighbor cosine distance
        cost_matrix = self.metric.distance(features, targets)
        # 2. gate the cost matrix with the Mahalanobis distance
        cost_matrix = linear_assignment.gate_cost_matrix(
            self.kf, cost_matrix, tracks, dets, track_indices,
            detection_indices)
        return cost_matrix

    # Split track set into confirmed and unconfirmed tracks.
    # separate the tracks by state
    confirmed_tracks = [
        i for i, t in enumerate(self.tracks) if t.is_confirmed()
    ]
    unconfirmed_tracks = [
        i for i, t in enumerate(self.tracks) if not t.is_confirmed()
    ]

    # ---- Cascade matching ----
    # yields the matched tracks, unmatched tracks, and unmatched detections
    # gated_metric -> cosine distance
    # only tracks in the confirmed state take part in cascade matching
    matches_a, unmatched_tracks_a, unmatched_detections = \
        linear_assignment.matching_cascade(
            gated_metric,
            self.metric.matching_threshold,
            self.max_age,
            self.tracks,
            detections,
            confirmed_tracks)

    # merge all unconfirmed tracks and the tracks that only just failed to match
    # into iou_track_candidates, which go on to IoU matching
    iou_track_candidates = unconfirmed_tracks + [
        k for k in unmatched_tracks_a
        if self.tracks[k].time_since_update == 1  # only just failed to match
    ]
    # still unmatched
    unmatched_tracks_a = [
        k for k in unmatched_tracks_a
        if self.tracks[k].time_since_update != 1  # unmatched for a longer time
    ]

    # ---- IoU matching ----
    # run IoU matching on the targets that cascade matching did not match
    # min_cost_matching is also the core of cascade matching,
    # but the metric used here is iou_cost, which differs from the one above
    matches_b, unmatched_tracks_b, unmatched_detections = \
        linear_assignment.min_cost_matching(
            iou_matching.iou_cost,
            self.max_iou_distance,
            self.tracks,
            detections,
            iou_track_candidates,
            unmatched_detections)

    matches = matches_a + matches_b  # combine the matches of both stages
    unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
    return matches, unmatched_tracks, unmatched_detections

Reading the code alongside the following picture makes it much smoother:

Picture from Zhihu Harlek

As you can see, the core of the matching function is cascade matching plus IoU matching. First, take a look at cascade matching.

It is called here:

matches_a, unmatched_tracks_a, unmatched_detections = \
    linear_assignment.matching_cascade(
        gated_metric,
        self.metric.matching_threshold,
        self.max_age,
        self.tracks,
        detections,
        confirmed_tracks)

The cascade matching function expanded:

def matching_cascade(
        distance_metric, max_distance, cascade_depth, tracks, detections,
        track_indices=None, detection_indices=None):
    # cascade matching
    # 1. assign track_indices and detection_indices
    if track_indices is None:
        track_indices = list(range(len(tracks)))
    if detection_indices is None:
        detection_indices = list(range(len(detections)))

    unmatched_detections = detection_indices
    matches = []
    # cascade_depth = max_age, 70 by default
    for level in range(cascade_depth):
        if len(unmatched_detections) == 0:  # No detections left
            break
        track_indices_l = [
            k for k in track_indices
            if tracks[k].time_since_update == 1 + level
        ]
        if len(track_indices_l) == 0:  # Nothing to match at this level
            continue
        # 2. the core of cascade matching is this function
        matches_l, _, unmatched_detections = \
            min_cost_matching(  # max_distance=0.2
                distance_metric, max_distance, tracks, detections,
                track_indices_l, unmatched_detections)
        matches += matches_l
    unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
    return matches, unmatched_tracks, unmatched_detections

You can see that it is consistent with the pseudocode mentioned in the first half of the article. This part also contains the core function min_cost_matching, which accepts different distance_metric arguments and is used in both cascade matching and IoU matching.
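The priority that the cascade gives to recently updated tracks can be seen in a stripped-down version. The following is a minimal sketch: `matching_cascade_sketch` works on a plain dict of track ages, and `first_come_matcher` is a deliberately naive, hypothetical stand-in for min_cost_matching that pairs tracks and detections in order.

```python
def matching_cascade_sketch(ages, detections, cascade_depth, match_fn):
    """ages maps a track index to its time_since_update."""
    unmatched_detections = list(detections)
    matches = []
    for level in range(cascade_depth):
        if not unmatched_detections:  # no detections left
            break
        # tracks updated most recently are matched first
        level_tracks = [k for k, age in ages.items() if age == 1 + level]
        if not level_tracks:  # nothing to match at this level
            continue
        level_matches, unmatched_detections = match_fn(
            level_tracks, unmatched_detections)
        matches += level_matches
    unmatched_tracks = sorted(set(ages) - {k for k, _ in matches})
    return matches, unmatched_tracks, unmatched_detections


def first_come_matcher(track_ids, detection_ids):
    # hypothetical stand-in for min_cost_matching: pair items in order
    pairs = list(zip(track_ids, detection_ids))
    return pairs, detection_ids[len(pairs):]


matches, unmatched_tracks, unmatched_dets = matching_cascade_sketch(
    ages={0: 3, 1: 1, 2: 1, 3: 2},
    detections=[10, 11, 12],
    cascade_depth=70,
    match_fn=first_come_matcher)
print(matches, unmatched_tracks, unmatched_dets)
```

Track 0 has gone three frames without an update, so by the time its level is reached all detections are already taken and it stays unmatched.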

The min_cost_matching function:

def min_cost_matching(
        distance_metric, max_distance, tracks, detections, track_indices=None,
        detection_indices=None):
    if track_indices is None:
        track_indices = np.arange(len(tracks))
    if detection_indices is None:
        detection_indices = np.arange(len(detections))

    if len(detection_indices) == 0 or len(track_indices) == 0:
        return [], track_indices, detection_indices  # Nothing to match.

    # -----------------------------------------
    # gated_metric -->
    #       1. cosine distance
    #       2. Mahalanobis distance
    # gives the cost matrix
    # -----------------------------------------
    # iou_cost -->
    #       only computes the IoU distance between tracks and detections
    # -----------------------------------------
    cost_matrix = distance_metric(
        tracks, detections, track_indices, detection_indices)
    # -----------------------------------------
    # with gated_metric, this caps the distance;
    # the cap is actually set by the max_dist parameter of the DeepSort class
    # default max_dist=0.2, the smaller the distance the better
    # -----------------------------------------
    # with iou_cost, max_distance corresponds to max_iou_distance in Tracker
    # default max_iou_distance=0.7
    # note that the cost is 1 - iou, so again the smaller the better
    # -----------------------------------------
    cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5

    # Hungarian (KM) algorithm
    row_indices, col_indices = linear_assignment(cost_matrix)

    matches, unmatched_tracks, unmatched_detections = [], [], []
    # these loops filter the assignment into matched and unmatched results
    for col, detection_idx in enumerate(detection_indices):
        if col not in col_indices:
            unmatched_detections.append(detection_idx)
    for row, track_idx in enumerate(track_indices):
        if row not in row_indices:
            unmatched_tracks.append(track_idx)
    for row, col in zip(row_indices, col_indices):
        track_idx = track_indices[row]
        detection_idx = detection_indices[col]
        if cost_matrix[row, col] > max_distance:
            unmatched_tracks.append(track_idx)
            unmatched_detections.append(detection_idx)
        else:
            matches.append((track_idx, detection_idx))
    # returns the matches, the unmatched tracks, and the unmatched detections
    return matches, unmatched_tracks, unmatched_detections
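The capping and filtering steps of min_cost_matching can be exercised on a tiny cost matrix. This is a minimal sketch that replaces the Hungarian algorithm with a brute-force search over all assignments, which is only practical for a handful of rows and columns; the function name `min_cost_matching_sketch` is hypothetical and a non-empty cost matrix is assumed.

```python
from itertools import permutations


def min_cost_matching_sketch(cost, max_distance):
    """cost[r][c] is the distance between track r and detection c."""
    n_rows, n_cols = len(cost), len(cost[0])
    # cap every entry above the threshold, as the original code does
    capped = [[c if c <= max_distance else max_distance + 1e-5 for c in row]
              for row in cost]
    # brute force instead of the Hungarian algorithm: try every assignment
    if n_rows <= n_cols:
        candidates = (list(zip(range(n_rows), p))
                      for p in permutations(range(n_cols), n_rows))
    else:
        candidates = (list(zip(p, range(n_cols)))
                      for p in permutations(range(n_rows), n_cols))
    best = min(candidates,
               key=lambda pairs: sum(capped[r][c] for r, c in pairs))
    # assigned pairs that are still above the threshold count as unmatched
    matches = [(r, c) for r, c in best if capped[r][c] <= max_distance]
    matched_rows = {r for r, _ in matches}
    matched_cols = {c for _, c in matches}
    unmatched_rows = [r for r in range(n_rows) if r not in matched_rows]
    unmatched_cols = [c for c in range(n_cols) if c not in matched_cols]
    return matches, unmatched_rows, unmatched_cols


cost = [[0.10, 0.90],
        [0.80, 0.15],
        [0.50, 0.60]]
result = min_cost_matching_sketch(cost, max_distance=0.2)
print(result)
```

Capping keeps a single very large cost from dominating the assignment, and the final filter makes sure that a pair above max_distance is never reported as a match.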

The comments mention two distance_metric functions:

The first is gated_metric, the distance_metric passed in cascade matching. At its core it computes the distance between appearance features.

def gated_metric(tracks, dets, track_indices, detection_indices):
    # purpose: cost function that computes the distance between tracks and detections
    # it has to run before the KM (Hungarian) algorithm
    # it is called as:
    # cost_matrix = distance_metric(tracks, detections,
    #                               track_indices, detection_indices)
    features = np.array([dets[i].feature for i in detection_indices])
    targets = np.array([tracks[i].track_id for i in track_indices])
    # 1. compute the cost matrix from the nearest-neighbor cosine distance
    cost_matrix = self.metric.distance(features, targets)
    # 2. gate the cost matrix with the Mahalanobis distance
    cost_matrix = linear_assignment.gate_cost_matrix(
        self.kf, cost_matrix, tracks, dets, track_indices,
        detection_indices)
    return cost_matrix
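The gating step is not expanded in the article, so the following is only a rough sketch of the idea: a detection whose squared Mahalanobis distance to the projected track state is too large has its cost replaced by a very large value. Several things here are assumptions for illustration: the covariance is treated as diagonal, the threshold 9.4877 is the 95% quantile of the chi-square distribution with 4 degrees of freedom, the value of `INFTY_COST` is chosen for this example, and all function names are hypothetical.

```python
CHI2_95_4DOF = 9.4877  # 95% chi-square quantile for a 4-dimensional measurement
INFTY_COST = 1e5       # assumed "impossible match" cost for this sketch


def squared_mahalanobis_diag(measurement, projected_mean, projected_var):
    """Squared Mahalanobis distance for a diagonal covariance."""
    return sum((z - m) ** 2 / v
               for z, m, v in zip(measurement, projected_mean, projected_var))


def gate_row(cost_row, measurements, projected_mean, projected_var,
             threshold=CHI2_95_4DOF, gated_cost=INFTY_COST):
    """Invalidate the costs of detections that are implausibly far from one track."""
    gated = list(cost_row)
    for col, z in enumerate(measurements):
        d2 = squared_mahalanobis_diag(z, projected_mean, projected_var)
        if d2 > threshold:
            gated[col] = gated_cost
    return gated


track_mean = [100.0, 50.0, 0.5, 80.0]  # projected [cx, cy, a, h]
track_var = [4.0, 4.0, 0.01, 4.0]      # diagonal of the projected covariance
detections_xyah = [[101.0, 51.0, 0.5, 81.0],   # close to the prediction
                   [120.0, 50.0, 0.5, 80.0]]   # far away along x
gated = gate_row([0.10, 0.05], detections_xyah, track_mean, track_var)
print(gated)
```

The second detection has the smaller appearance cost, but its position is inconsistent with the predicted motion, so gating removes it from consideration.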

It helps to read this with the following figure (its upper part corresponds to the gated_metric function):

Picture from Zhihu Harlek

The second is iou_matching.iou_cost, used in IoU matching:

# min_cost_matching is also the core of cascade matching,
# but the metric used here is iou_cost, which differs from the one above
matches_b, unmatched_tracks_b, unmatched_detections = \
    linear_assignment.min_cost_matching(
        iou_matching.iou_cost,
        self.max_iou_distance,
        self.tracks,
        detections,
        iou_track_candidates,
        unmatched_detections)

iou_cost is easy to understand: it computes the IoU distance matrix between Tracks and Detections.

def iou_cost(tracks, detections, track_indices=None,
             detection_indices=None):
    # compute the IoU distance matrix between tracks and detections
    if track_indices is None:
        track_indices = np.arange(len(tracks))
    if detection_indices is None:
        detection_indices = np.arange(len(detections))

    cost_matrix = np.zeros((len(track_indices), len(detection_indices)))
    for row, track_idx in enumerate(track_indices):
        if tracks[track_idx].time_since_update > 1:
            cost_matrix[row, :] = linear_assignment.INFTY_COST
            continue
        bbox = tracks[track_idx].to_tlwh()
        candidates = np.asarray(
            [detections[i].tlwh for i in detection_indices])
        cost_matrix[row, :] = 1. - iou(bbox, candidates)
    return cost_matrix
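The iou helper called above is not shown in the article. A minimal sketch of an IoU distance for boxes in tlwh form, using plain Python lists and the hypothetical name `iou_cost_sketch`, could look like this:

```python
def iou(box_a, box_b):
    """Intersection over union of two boxes given as (top-left x, top-left y, w, h)."""
    ax, ay, aw, ah = box_a
    bx, by, bw, bh = box_b
    inter_w = max(0.0, min(ax + aw, bx + bw) - max(ax, bx))
    inter_h = max(0.0, min(ay + ah, by + bh) - max(ay, by))
    inter = inter_w * inter_h
    union = aw * ah + bw * bh - inter
    return inter / union if union > 0 else 0.0


def iou_cost_sketch(track_boxes, detection_boxes):
    """Cost matrix of 1 - IoU, one row per track and one column per detection."""
    return [[1.0 - iou(t, d) for d in detection_boxes] for t in track_boxes]


costs = iou_cost_sketch(
    [[0, 0, 10, 10]],
    [[0, 0, 10, 10], [5, 0, 10, 10], [20, 20, 5, 5]])
print(costs)
```

With the default max_iou_distance=0.7, the half-overlapping second box (cost of about 0.67) would still be accepted as a match, while the disjoint third box (cost 1.0) would be rejected.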
