2025-03-26 Update From: SLTechnology News&Howtos
This article walks through the code of the Deep SORT multi-object tracking algorithm. The editor found it practical and shares it here; hopefully you will take something away from it.
Deep SORT is a commonly used algorithm in multi-object tracking (Multi-Object Tracking, MOT), and it is a Detection Based Tracking method. It receives a lot of attention in industry, and many articles on Zhihu use Deep SORT for engineering deployment. The author draws on earlier blog posts and on personal practice (theory and code) to analyze the Deep SORT algorithm at the code level.
1. Main steps of MOT
The survey of deep-learning-based multi-object tracking, "DEEP LEARNING IN VIDEO MULTI-OBJECT TRACKING: A SURVEY", describes four main steps of the MOT problem.
The main steps of multi-object tracking are:
Detection: given the raw frames of the video, run an object detector such as Faster R-CNN, YOLOv3 or SSD to obtain the detection boxes.
Feature extraction: crop the target in each detection box and extract its features (appearance features or motion features).
Similarity calculation: compute the degree of matching between targets in two frames (the distance between observations of the same target is relatively small, the distance between different targets is relatively large).
Data association: assign a target ID to each object.
These are the four core steps, and detection matters most. According to the abstract of the SORT paper, changing the detector alone can improve tracking performance by up to 18.9%.
2. SORT
The predecessor of Deep SORT is SORT, whose full name is Simple Online and Realtime Tracking. In brief, SORT takes its detections from a Faster R-CNN based detector and combines a Kalman filter with the Hungarian algorithm, which greatly improves the speed of multi-object tracking while reaching SOTA accuracy.
The algorithm is widely used in practice, and its core is two algorithms: the Kalman filter and the Hungarian algorithm.
The Kalman filter works in two steps, prediction and update. The motion state of a target is modeled as a normally distributed state vector (8-dimensional in Deep SORT).
Prediction: as the target moves, the box position and velocity in the current frame are predicted from the box and velocity of the previous frame.
Update: the predicted value and the observed value, both normally distributed, are linearly weighted to obtain the current estimate of the state.
Hungarian algorithm: it solves an assignment problem. The similarity calculation in the main steps of MOT yields a similarity matrix between two consecutive frames, and the Hungarian algorithm solves this matrix to find the targets that truly match between the two frames. The sklearn library used to provide a function, linear_assignment, for this; recent scikit-learn releases removed it, and scipy.optimize.linear_sum_assignment solves the same problem.
In SORT the similarity matrix is built from the IOU between the boxes of the two frames, which is why SORT is so fast.
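As a minimal sketch of IOU-based matching, the example below builds an IOU table between predicted track boxes and detection boxes and picks the assignment with the highest total IOU. To stay dependency-free it brute-forces the assignment with itertools.permutations instead of running the Hungarian algorithm, which is only sensible for a handful of boxes. The helper names iou and match_by_iou, the (x1, y1, x2, y2) box format and the min_iou value are assumptions made for illustration; they are not taken from the SORT code.

```python
from itertools import permutations


def iou(box_a, box_b):
    """Intersection over union of two boxes in (x1, y1, x2, y2) format."""
    x1, y1 = max(box_a[0], box_b[0]), max(box_a[1], box_b[1])
    x2, y2 = min(box_a[2], box_b[2]), min(box_a[3], box_b[3])
    inter = max(0.0, x2 - x1) * max(0.0, y2 - y1)
    area_a = (box_a[2] - box_a[0]) * (box_a[3] - box_a[1])
    area_b = (box_b[2] - box_b[0]) * (box_b[3] - box_b[1])
    return inter / (area_a + area_b - inter)


def match_by_iou(tracks, detections, min_iou=0.3):
    """Pick the assignment with the highest total IoU (brute force).

    Assumes len(tracks) <= len(detections); pairs below min_iou are dropped.
    """
    scores = [[iou(t, d) for d in detections] for t in tracks]
    best, best_total = (), -1.0
    for perm in permutations(range(len(detections)), len(tracks)):
        total = sum(scores[i][j] for i, j in enumerate(perm))
        if total > best_total:
            best, best_total = perm, total
    return [(i, j) for i, j in enumerate(best) if scores[i][j] >= min_iou]


tracks = [(0, 0, 10, 10), (20, 20, 30, 30)]                        # predicted track boxes
detections = [(21, 21, 31, 31), (1, 1, 11, 11), (50, 50, 60, 60)]  # detector output
pairs = match_by_iou(tracks, detections)
matched = {j for _, j in pairs}
unmatched_detections = [j for j in range(len(detections)) if j not in matched]
print(pairs, unmatched_detections)
```

The third detection overlaps no track, so it ends up unmatched and would start a new track. For realistic numbers of boxes, the same table can be turned into a cost matrix (1 - IOU) and passed to scipy.optimize.linear_sum_assignment.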
The following figure is a flowchart of the core SORT algorithm:
SORT parsing diagram provided by Harlek
Detections are the boxes obtained from the object detector, and Tracks are trajectories. The core of the flow is the matching step together with the prediction and update steps of the Kalman filter.
The process is as follows: the detector produces the Detections while the Kalman filter predicts the Tracks for the current frame. Detections and Tracks are then matched by IOU, with three possible outcomes:
Unmatched Tracks: no Detection matches the Track. If the mismatch persists, the target ID is deleted.
Unmatched Detections: no Track matches the Detection, so a new track is assigned to the detection.
Matched Tracks: the Track and a Detection were matched.
The Kalman filter predicts the state of each target box in the next frame from the state of its Track.
The Kalman filter update revises the state of every matched Track using the observation (the matched Detection) and the predicted estimate.
3. Deep SORT
The most important feature of Deep SORT is the addition of appearance information: it borrows a model from the ReID field to extract features, which reduces the number of ID switches. The overall flow chart is as follows:
Picture from Zhihu Harlek
It can be seen that Deep SORT adds cascade matching (Matching Cascade) and confirmation of new trajectories (confirmed) to the SORT algorithm. The overall process is as follows:
The Kalman filter predicts the trajectories (Tracks).
The Hungarian algorithm matches the predicted Tracks with the Detections of the current frame (cascade matching and IOU matching).
The Kalman filter is updated.
The cascade matching in the above figure is expanded as follows:
Picture from Zhihu Harlek
The figure above explains cascade matching very clearly. It is divided into two parts by the dotted line:
The upper part computes the similarity. The appearance model (ReID) and the motion model (Mahalanobis distance) are used to obtain the cost matrix, together with a gating matrix that is used to cap excessively large values in the cost matrix.
The lower part is the data association step of cascade matching. Matching runs in a loop (max age iterations, 70 by default): tracks with missing age=0 up to missing age=70 are matched with the Detections in turn. Tracks that have not been lost are matched first, and tracks that have been lost for longer are matched later. This lets an occluded target be recovered when it reappears, which reduces the number of ID switches caused by occlusion.
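The priority order described above can be sketched in a few lines. This is an illustration only: positions are one-dimensional numbers, and a greedy nearest-neighbour pick stands in for the min-cost matching used in the real code. The function name cascade_match, the dictionary layout and max_dist are hypothetical.

```python
def cascade_match(tracks, detections, max_dist=5.0, cascade_depth=70):
    """Match tracks to detections level by level, most recently seen first.

    tracks: {track_id: (position, time_since_update)}
    detections: list of positions
    """
    unmatched = list(range(len(detections)))
    matches = []
    for level in range(cascade_depth):
        if not unmatched:  # no detections left
            break
        # Only tracks that have been missing for exactly `level` frames.
        level_tracks = [tid for tid, (_, age) in tracks.items()
                        if age == 1 + level]
        for tid in level_tracks:
            if not unmatched:
                break
            pos = tracks[tid][0]
            best = min(unmatched, key=lambda j: abs(detections[j] - pos))
            if abs(detections[best] - pos) <= max_dist:
                matches.append((tid, best))
                unmatched.remove(best)
    return matches, unmatched


# Both tracks are 0.5 away from the single detection, but track 1 was
# updated in the previous frame while track 2 has been lost for 5 frames.
tracks = {1: (10.0, 1), 2: (11.0, 5)}
detections = [10.5]
matches, unmatched = cascade_match(tracks, detections)
print(matches, unmatched)
```

Because the loop visits lower time_since_update values first, the recently seen track wins the detection even though the long-lost track is equally close.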
Matching Detections with Tracks leads to the following situations:
Detection matches a Track, that is, Matched Tracks. Targets under ordinary continuous tracking fall into this case: the target is present in both frames and can be matched.
Detection finds no matching Track, that is, Unmatched Detections. When a new target suddenly appears in the image, the Detection cannot find a matching target among the previous Tracks.
Track finds no matching Detection, that is, Unmatched Tracks. A continuously tracked target has left the image area, and the Track cannot match any of the current Detections.
The cases above leave out one special situation: one target occluding another. The Track of a target that has just been occluded matches no Detection, and the target temporarily disappears from the image. When the occluded target reappears, we should try to keep its previously assigned ID unchanged and reduce the number of ID switches, which is what cascade matching is for.
4. Deep SORT code parsing
The code provided with the paper is at: https://github.com/nwojke/deep_sort
deep_sort file structure in the Github repository
The figure above shows the core Deep SORT code in the Github repository, which does not include the Faster R-CNN detection part, so this article mainly explains the files in that part. The author has also commented the core code at https://github.com/pprp/deep_sort_yolov3_pytorch, where the object detector is replaced with the Ultralytics ("U version") implementation of YOLOv3 and the core deep_sort files are called.
4.1 Class Diagram
The following figure summarizes the calls between the classes (not very rigorous, but it roughly shows how the modules relate):
Deep SORT class diagram
DeepSort is the core class. It calls other modules, which fall into three groups:
ReID module: extracts appearance features; the original paper generates a 128-D embedding.
Track module: the Track class stores the state information of one Track and is the basic unit.
Tracker module: holds the core algorithm; the Kalman filter and the Hungarian algorithm are both run by calling this module.
The external interface of the DeepSort class is very simple:
self.deepsort = DeepSort(args.deepsort_checkpoint)  # instantiation
outputs = self.deepsort.update(bbox_xcycwh, cls_conf, im)  # update with the detector results
External callers only need these two steps.
The class diagram gives an overall picture of the modules; the following sections look at each of them in depth.
4.2 Core modules
Detection class
import numpy as np

class Detection(object):
    """
    This class represents a bounding box detection in a single image.
    """
    def __init__(self, tlwh, confidence, feature):
        # np.float was removed from recent NumPy releases; np.float64 is equivalent
        self.tlwh = np.asarray(tlwh, dtype=np.float64)
        self.confidence = float(confidence)
        self.feature = np.asarray(feature, dtype=np.float32)
    def to_tlbr(self):
        """Convert bounding box to format `(min x, min y, max x, max y)`, i.e.,
        `(top left, bottom right)`.
        """
        ret = self.tlwh.copy()
        ret[2:] += ret[:2]
        return ret
    def to_xyah(self):
        """Convert bounding box to format `(center x, center y, aspect ratio,
        height)`, where the aspect ratio is `width / height`.
        """
        ret = self.tlwh.copy()
        ret[:2] += ret[2:] / 2
        ret[2] /= ret[3]
        return ret
The Detection class holds one detection box obtained from the object detector: the top-left coordinates plus the width and height of the box, the confidence of the bbox, and the corresponding embedding produced by ReID. It also provides conversions between bbox formats:
tlwh: top-left coordinates + width and height
tlbr: top-left coordinates + bottom-right coordinates
xyah: center coordinates + aspect ratio + height
Track class
class Track:
    # information of one track: (x, y, a, h) and the corresponding velocities
    """
    A single target track with state space `(x, y, a, h)` and associated
    velocities, where `(x, y)` is the center of the bounding box, `a` is the
    aspect ratio and `h` is the height.
    """
    def __init__(self, mean, covariance, track_id, n_init, max_age,
                 feature=None):
        # max_age is the survival period, 70 frames by default
        self.mean = mean
        self.covariance = covariance
        self.track_id = track_id
        self.hits = 1
        # hits is compared with n_init
        # hits is incremented on every update (update only happens on a match)
        # hits is the number of matches; once it reaches n_init the track becomes confirmed
        self.age = 1  # not used; overlaps with time_since_update
        self.time_since_update = 0
        # incremented by 1 every time predict is called
        # reset to 0 every time update is called
        self.state = TrackState.Tentative
        self.features = []
        # each track keeps multiple features; every update appends the latest one
        if feature is not None:
            self.features.append(feature)
        self._n_init = n_init  # consecutive matches needed before the track is confirmed
        self._max_age = max_age  # upper limit on time_since_update before deletion
The Track class mainly stores track information. mean and covariance hold the position and velocity information of the box, and track_id is the ID assigned to the track. state is the status of the track, and there are three states:
Tentative: the uncertain state, assigned when a Track is initialized. The track becomes confirmed only after it has been matched in n_init consecutive frames. If a tentative track fails to match a detection, it moves to the deleted state.
Confirmed: the confirmed state, meaning the Track is genuinely being matched. If a confirmed Track goes unmatched for more than max_age consecutive frames, it moves to the deleted state.
Deleted: the deleted state, meaning the Track has expired.
State transition diagram
max_age is the lifetime of a Track and is compared with the time_since_update variable. time_since_update is incremented by 1 every time the track calls predict and reset to 0 every time update is called. If a track goes a long time without an update (no match), the counter keeps growing until time_since_update exceeds max_age (default 70), and the Track is then removed from the list in Tracker.
hits is the number of consecutive confirmations and is used for the transition from the tentative state to the confirmed state. Every Track update increments hits by 1. If hits >= n_init (default 3), that is, the trajectory has been matched in three consecutive frames, the state changes from tentative to confirmed.
Each track also has an important variable, the features list, which stores the features extracted by ReID at the track's position in different frames. Why keep a list instead of only the latest feature? To handle a target that reappears after being occluded, the new detection has to be compared with features from earlier frames. Too many features would seriously slow down the computation, so a budget parameter limits the length of the feature list: the latest budget features are kept and older ones are deleted.
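The state transitions described above can be condensed into a small state machine. This is a sketch written for this article rather than the repository's Track class: TrackLifecycle is a hypothetical name, and only the counters that drive the transitions are modelled.

```python
from enum import Enum


class TrackState(Enum):
    TENTATIVE = 1
    CONFIRMED = 2
    DELETED = 3


class TrackLifecycle:
    """Only the bookkeeping that drives the state transitions."""

    def __init__(self, n_init=3, max_age=70):
        self.state = TrackState.TENTATIVE
        self.hits = 1
        self.time_since_update = 0
        self.n_init = n_init
        self.max_age = max_age

    def predict(self):
        # Called once per frame, before matching.
        self.time_since_update += 1

    def update(self):
        # Called when the track was matched to a detection.
        self.hits += 1
        self.time_since_update = 0
        if self.state is TrackState.TENTATIVE and self.hits >= self.n_init:
            self.state = TrackState.CONFIRMED

    def mark_missed(self):
        # Called when the track found no detection in this frame.
        if self.state is TrackState.TENTATIVE:
            self.state = TrackState.DELETED
        elif self.time_since_update > self.max_age:
            self.state = TrackState.DELETED


track = TrackLifecycle(n_init=3, max_age=2)  # small max_age to keep the demo short
history = []
for matched in [True, True, False, False, False]:
    track.predict()
    if matched:
        track.update()
    else:
        track.mark_missed()
    history.append(track.state.name)
print(history)
```

With max_age=2 the confirmed track survives two missed frames and is deleted on the third, while a tentative track would be deleted on its first miss.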
ReID feature extraction part
ReID network is a module independent of target detection and tracker. Its function is to extract the feature from the corresponding bounding box and get a fixed-dimensional embedding as the representative of the bbox for use in calculating similarity.
import cv2
import numpy as np
import torch
import torchvision.transforms as transforms
# build_model is defined elsewhere in the author's repository

class Extractor(object):
    def __init__(self, model_name, model_path, use_cuda=True):
        self.net = build_model(name=model_name,
                               num_classes=96)
        self.device = "cuda" if torch.cuda.is_available(
        ) and use_cuda else "cpu"
        state_dict = torch.load(model_path)['net_dict']
        self.net.load_state_dict(state_dict)
        print("Loading weights from {}... Done!".format(model_path))
        self.net.to(self.device)
        self.size = (128, 128)
        self.norm = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize([0.3568, 0.3141, 0.2781],
                                 [0.1752, 0.1857, 0.1879])
        ])
    def _preprocess(self, im_crops):
        """
        TODO:
            1. to float with scale from 0 to 1
            2. resize to (64, 128) as Market1501 dataset did
            3. concatenate to a numpy array
            4. to torch Tensor
            5. normalize
        """
        def _resize(im, size):
            return cv2.resize(im.astype(np.float32) / 255., size)
        im_batch = torch.cat([
            self.norm(_resize(im, self.size)).unsqueeze(0) for im in im_crops
        ], dim=0).float()
        return im_batch
    def __call__(self, im_crops):
        im_batch = self._preprocess(im_crops)
        with torch.no_grad():
            im_batch = im_batch.to(self.device)
            features = self.net(im_batch)
        return features.cpu().numpy()
The model is trained in the usual ReID way. When using the Extractor class, pass in a list of image crops and it returns the corresponding feature for each crop.
NearestNeighborDistanceMetric class
Two functions that calculate distance are used in this class:
Computing the Euclidean distance
def _pdist(a, b):
    # computes pairwise squared distances
    # a: NxM, N objects, each with an M-dimensional embedding
    # b: LxM, L objects, each with an M-dimensional embedding
    # returns an NxL matrix; for example, dist[i][j] is the squared distance between a[i] and b[j]
    # for the derivation, see: https://blog.csdn.net/frankzd/article/details/80251042
    a, b = np.asarray(a), np.asarray(b)  # convert the inputs to ndarrays
    if len(a) == 0 or len(b) == 0:
        return np.zeros((len(a), len(b)))
    a2, b2 = np.square(a).sum(axis=1), np.square(
        b).sum(axis=1)  # sum of squares of each embedding
    # sum(N) + sum(L) - 2 x [NxM] x [MxL] = [NxL]
    r2 = -2. * np.dot(a, b.T) + a2[:, None] + b2[None, :]
    r2 = np.clip(r2, 0., float(np.inf))
    return r2
The figure is from a CSDN blog
Computing the cosine distance
def _cosine_distance(a, b, data_is_normalized=False):
    # cosine distance between a and b
    # a: [NxM] b: [LxM]
    # cosine distance = 1 - cosine similarity
    # https://blog.csdn.net/u013749540/article/details/51813922
    if not data_is_normalized:
        # normalize each row so that the dot product below is the cosine similarity
        a = np.asarray(a) / np.linalg.norm(a, axis=1, keepdims=True)
        # np.linalg.norm computes the norm of a vector; the default is the L2 norm,
        # which is the Euclidean length of the vector
        b = np.asarray(b) / np.linalg.norm(b, axis=1, keepdims=True)
    return 1. - np.dot(a, b.T)
Image source: CSDN blog
The above code corresponds to the formula. Note that cosine distance = 1-cosine similarity.
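A small worked example may make the relationship concrete. The sketch below uses plain Python lists instead of NumPy and toy 2-D embeddings; the function names are chosen for illustration, and nn_cosine_distance mirrors the idea of taking the smallest distance between a new feature and any stored sample of a track.

```python
import math


def cosine_distance(u, v):
    """1 - cosine similarity of two non-zero vectors."""
    dot = sum(x * y for x, y in zip(u, v))
    norm_u = math.sqrt(sum(x * x for x in u))
    norm_v = math.sqrt(sum(y * y for y in v))
    return 1.0 - dot / (norm_u * norm_v)


def nn_cosine_distance(samples, feature):
    """Smallest cosine distance between `feature` and any stored sample."""
    return min(cosine_distance(s, feature) for s in samples)


samples = [[1.0, 0.0], [0.0, 1.0]]   # features stored for one track
same_direction = nn_cosine_distance(samples, [0.0, 2.0])
diagonal = nn_cosine_distance(samples, [1.0, 1.0])
print(same_direction, diagonal)
```

A feature pointing in the same direction as a stored sample has distance 0 regardless of its length, while the diagonal vector sits at 1 - 1/sqrt(2) from both samples.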
Nearest neighbor distance metric class
class NearestNeighborDistanceMetric(object):
    # for each target, returns the distance to its nearest stored sample
    def __init__(self, metric, matching_threshold, budget=None):
        # defaults used by the caller: matching_threshold = 0.2, budget = 100
        if metric == "euclidean":
            # use the nearest neighbor Euclidean distance
            self._metric = _nn_euclidean_distance
        elif metric == "cosine":
            # use the nearest neighbor cosine distance
            self._metric = _nn_cosine_distance
        else:
            raise ValueError("Invalid metric; must be either 'euclidean' or 'cosine'")
        self.matching_threshold = matching_threshold
        # used in the cascade matching function
        self.budget = budget
        # budget limits the number of features kept per target
        self.samples = {}
        # samples is a dictionary {id -> feature list}
    def partial_fit(self, features, targets, active_targets):
        # purpose: partial fitting, updates the stored samples with new data
        # called from the feature set update step in tracker.update()
        for feature, target in zip(features, targets):
            self.samples.setdefault(target, []).append(feature)
            # add the new feature under its target and update the feature collection
            # target id: feature list
            if self.budget is not None:
                self.samples[target] = self.samples[target][-self.budget:]
            # apply the budget: keep at most budget features per target, dropping the oldest
        # keep only the active targets
        self.samples = {k: self.samples[k] for k in active_targets}
    def distance(self, features, targets):
        # purpose: compute the distance between features and targets and return a cost matrix
        # called in the matching phase, where distance is wrapped as gated_metric,
        # which combines appearance information (deep features obtained by ReID) with
        # motion information (Mahalanobis distance measures how close two distributions are)
        cost_matrix = np.zeros((len(targets), len(features)))
        for i, target in enumerate(targets):
            cost_matrix[i, :] = self._metric(self.samples[target], features)
        return cost_matrix
Tracker class
Tracker class is the core class. Tracker stores all the trajectory information and is responsible for initializing the trajectory of the first frame, Kalman filter prediction and update, responsible for cascade matching, IOU matching and other core work.
class Tracker:
    # a multi-target tracker that stores many tracks
    # responsible for calling the Kalman filter to predict the new state of each track,
    # for matching, and for initializing tracks in the first frame
    # when Tracker calls update or predict, each track also calls its own update or predict
    """
    This is the multi-target tracker.
    """
    def __init__(self, metric, max_iou_distance=0.7, max_age=70, n_init=3):
        # the caller leaves the following parameters at their defaults
        self.metric = metric
        # metric is a class that calculates distances (cosine or Euclidean)
        self.max_iou_distance = max_iou_distance
        # maximum IoU distance, used in IoU matching
        self.max_age = max_age
        # passed directly as the cascade_depth parameter of cascade matching
        self.n_init = n_init
        # number of updates required before a track is set to the confirmed state
        self.kf = kalman_filter.KalmanFilter()  # Kalman filter
        self.tracks = []  # list of tracks
        self._next_id = 1  # next track id to assign
    def predict(self):
        # iterate over every track and run its prediction
        """Propagate track state distributions one time step forward.
        This function should be called once every time step, before `update`.
        """
        for track in self.tracks:
            track.predict(self.kf)
Then let's take a look at the core update function and match function, which can be compared with the following flow chart:
Update function
def update(self, detections):
    # measurement update and track management
    """Perform measurement update and track management.
    Parameters
    ----------
    detections : List[deep_sort.detection.Detection]
        A list of detections at the current time step.
    """
    # Run matching cascade.
    matches, unmatched_tracks, unmatched_detections = \
        self._match(detections)
    # Update track set.
    # 1. matched pairs
    for track_idx, detection_idx in matches:
        # update the track with its matched detection
        self.tracks[track_idx].update(self.kf, detections[detection_idx])
    # 2. call mark_missed on every unmatched track
    # an unmatched track is deleted if it is still tentative,
    # or if it has gone too long without an update
    # max_age is the survival period, 70 frames by default
    for track_idx in unmatched_tracks:
        self.tracks[track_idx].mark_missed()
    # 3. start a new track for every unmatched detection
    for detection_idx in unmatched_detections:
        self._initiate_track(detections[detection_idx])
    # rebuild the tracks list, keeping the confirmed and tentative tracks
    self.tracks = [t for t in self.tracks if not t.is_deleted()]
    # Update distance metric.
    active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]
    # track ids of all confirmed tracks
    features, targets = [], []
    for track in self.tracks:
        if not track.is_confirmed():
            continue
        features += track.features  # append this track's features to the features list
        # record the track id for each feature
        targets += [track.track_id for _ in track.features]
        track.features = []
    # update the feature set held by the distance metric
    self.metric.partial_fit(np.asarray(features), np.asarray(targets),
                            active_targets)
Match function:
def _match(self, detections):
    # main job: run the matching and return the matched and unmatched parts
    def gated_metric(tracks, dets, track_indices, detection_indices):
        # purpose: cost function that computes the distance between tracks and detections
        # must be evaluated before the KM (Hungarian) algorithm
        # called as:
        # cost_matrix = distance_metric(tracks, detections,
        #                               track_indices, detection_indices)
        features = np.array([dets[i].feature for i in detection_indices])
        targets = np.array([tracks[i].track_id for i in track_indices])
        # 1. cost matrix from the nearest neighbor cosine distance
        cost_matrix = self.metric.distance(features, targets)
        # 2. gate the cost matrix using the Mahalanobis distance
        cost_matrix = linear_assignment.gate_cost_matrix(
            self.kf, cost_matrix, tracks, dets, track_indices,
            detection_indices)
        return cost_matrix
    # Split track set into confirmed and unconfirmed tracks.
    confirmed_tracks = [
        i for i, t in enumerate(self.tracks) if t.is_confirmed()
    ]
    unconfirmed_tracks = [
        i for i, t in enumerate(self.tracks) if not t.is_confirmed()
    ]
    # ---- Cascade matching ----
    # yields matched tracks, unmatched tracks and unmatched detections
    # gated_metric -> cosine distance
    # only confirmed tracks take part in cascade matching
    matches_a, unmatched_tracks_a, unmatched_detections = \
        linear_assignment.matching_cascade(
            gated_metric,
            self.metric.matching_threshold,
            self.max_age,
            self.tracks,
            detections,
            confirmed_tracks)
    # combine the unconfirmed tracks with the tracks that only just failed to match
    # into iou_track_candidates, which go through IoU matching
    iou_track_candidates = unconfirmed_tracks + [
        k for k in unmatched_tracks_a
        if self.tracks[k].time_since_update == 1  # just failed to match
    ]
    # the remaining unmatched tracks
    unmatched_tracks_a = [
        k for k in unmatched_tracks_a
        if self.tracks[k].time_since_update != 1  # unmatched for a longer time
    ]
    # ---- IoU matching ----
    # run IoU matching on the targets that cascade matching did not match
    # min_cost_matching is also the core of cascade matching,
    # but the metric used here is the IoU cost rather than the gated metric above
    matches_b, unmatched_tracks_b, unmatched_detections = \
        linear_assignment.min_cost_matching(
            iou_matching.iou_cost,
            self.max_iou_distance,
            self.tracks,
            detections,
            iou_track_candidates,
            unmatched_detections)
    matches = matches_a + matches_b  # combine the two sets of matches
    unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
    return matches, unmatched_tracks, unmatched_detections
The two functions above are easier to follow by reading the comments alongside the following flowchart.
Picture from Zhihu Harlek
Cascade matching
The following is the pseudocode for cascade matching given in the paper:
Pseudocode for cascade matching in the paper
The following code is the corresponding implementation of the pseudocode:
def matching_cascade(
        distance_metric, max_distance, cascade_depth, tracks, detections,
        track_indices=None, detection_indices=None):
    # 1. assign track_indices and detection_indices
    if track_indices is None:
        track_indices = list(range(len(tracks)))
    if detection_indices is None:
        detection_indices = list(range(len(detections)))
    unmatched_detections = detection_indices
    matches = []
    # cascade depth = max age, 70 by default
    for level in range(cascade_depth):
        if len(unmatched_detections) == 0:  # No detections left
            break
        track_indices_l = [
            k for k in track_indices
            if tracks[k].time_since_update == 1 + level
        ]
        if len(track_indices_l) == 0:  # Nothing to match at this level
            continue
        # 2. the core of cascade matching is this function
        matches_l, _, unmatched_detections = \
            min_cost_matching(  # max_distance=0.2
                distance_metric, max_distance, tracks, detections,
                track_indices_l, unmatched_detections)
        matches += matches_l
    unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
    return matches, unmatched_tracks, unmatched_detections
Gating matrix
The gating matrix restricts the cost matrix using the distance between the state distribution of the Kalman filter and the measured values.
The distances in the cost matrix are appearance similarities between Tracks and Detections. If a track is compared with two Detections whose appearance features are very similar, a wrong match is easy to make. Computing the Mahalanobis distance from each of the two Detections to the trajectory and limiting it with a threshold, gating_threshold, separates out the Detection that lies farther away in Mahalanobis distance, which reduces mismatches.
def gate_cost_matrix(
        kf, cost_matrix, tracks, detections, track_indices, detection_indices,
        gated_cost=INFTY_COST, only_position=False):
    # invalidate the infeasible entries of the cost matrix according to
    # the state distribution obtained from the Kalman filter
    gating_dim = 2 if only_position else 4
    gating_threshold = kalman_filter.chi2inv95[gating_dim]  # 9.4877
    measurements = np.asarray([detections[i].to_xyah()
                               for i in detection_indices])
    for row, track_idx in enumerate(track_indices):
        track = tracks[track_idx]
        gating_distance = kf.gating_distance(
            track.mean, track.covariance, measurements, only_position)
        cost_matrix[row, gating_distance >
                    gating_threshold] = gated_cost  # set to inf
    return cost_matrix
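To see the gating effect in numbers, here is a simplified sketch for a single track. It assumes a diagonal covariance so that the squared Mahalanobis distance reduces to a sum of squared, variance-scaled differences; the real code works with the full projected covariance. The helper names and the INFTY_COST value are assumptions for illustration, while 9.4877 is the threshold quoted in the code comment above.

```python
CHI2_95_4DOF = 9.4877   # threshold quoted in the code comment above
INFTY_COST = 1e5        # assumed stand-in for the "infinite" cost


def squared_mahalanobis_diag(mean, variances, measurement):
    """Squared Mahalanobis distance under a diagonal covariance."""
    return sum((z - mu) ** 2 / var
               for z, mu, var in zip(measurement, mean, variances))


def gate_row(costs, mean, variances, measurements,
             threshold=CHI2_95_4DOF, gated_cost=INFTY_COST):
    """Replace the cost of every detection that fails the gate."""
    return [gated_cost
            if squared_mahalanobis_diag(mean, variances, z) > threshold
            else cost
            for cost, z in zip(costs, measurements)]


# One track in (x, y, a, h) space and two detections that look alike.
track_mean = (100.0, 100.0, 0.5, 50.0)
track_var = (4.0, 4.0, 0.01, 4.0)
measurements = [(101.0, 99.0, 0.5, 51.0),    # close to the prediction
                (130.0, 100.0, 0.5, 50.0)]   # 30 px away in x
appearance_costs = [0.15, 0.12]              # second one looks slightly better
gated = gate_row(appearance_costs, track_mean, track_var, measurements)
print(gated)
```

The second detection has the lower appearance cost, but its squared Mahalanobis distance (225) is far above the threshold, so its entry is overwritten and the matcher can no longer select it.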
Kalman filter
In Deep SORT, the following states of Track need to be estimated:
Mean: an 8-dimensional vector (x, y, a, h, vx, vy, va, vh). (x, y) is the center coordinate of the box, a is the aspect ratio, h is the height, and the rest are the corresponding velocities. All velocities are initialized to 0.
Covariance: the degree of uncertainty of the target's position information, expressed as an 8x8 diagonal matrix. The larger a value in the matrix, the higher the uncertainty.
The following figure shows the main process of the Kalman filter:
DeepSORT: Deep Learning to Track Custom Objects in a Video
Predict: from the state at the current frame (time=t), the Kalman filter predicts the state at the next frame (time=t+1).
Measure: obtain the measurement. In DeepSORT the measurement is a Detection, that is, the detection box provided by the object detector.
Update: combine the predicted result with the measurement.
The following section is the main reference: https://zhuanlan.zhihu.com/p/90835266
For a deeper understanding of the Kalman filter, read the formulas and the code side by side.
Prediction uses two formulas.
The first formula: x' = Fx
where F is the state transition matrix, as shown in the following figure:
Image source: Zhihu @ seeking
The second formula: P' = FPF^T + Q
P is the covariance at the current frame (time=t), and Q is the motion estimation error of the Kalman filter, representing the degree of uncertainty.
def predict(self, mean, covariance):
    # produces the predicted estimate for time t
    # Q: noise covariance of the prediction step
    std_pos = [
        self._std_weight_position * mean[3],
        self._std_weight_position * mean[3],
        1e-2,
        self._std_weight_position * mean[3]]
    std_vel = [
        self._std_weight_velocity * mean[3],
        self._std_weight_velocity * mean[3],
        1e-5,
        self._std_weight_velocity * mean[3]]
    # np.r_ concatenates the two lists into one array
    # initialize the noise matrix Q
    motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))
    # x' = Fx
    mean = np.dot(self._motion_mat, mean)
    # P' = FPF^T + Q
    covariance = np.linalg.multi_dot((
        self._motion_mat, covariance, self._motion_mat.T)) + motion_cov
    return mean, covariance
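The two prediction formulas can be checked by hand on a reduced state. The sketch below assumes a 2-dimensional state (position, velocity) with a constant-velocity transition matrix, rather than the 8-dimensional state used by Deep SORT, and uses plain Python lists with small hypothetical helpers (matmul, transpose, predict_step) so the arithmetic stays visible. The noise values in Q are made up for the example.

```python
def matmul(a, b):
    """Product of two matrices given as lists of rows."""
    return [[sum(a[i][k] * b[k][j] for k in range(len(b)))
             for j in range(len(b[0]))] for i in range(len(a))]


def transpose(m):
    return [list(col) for col in zip(*m)]


def predict_step(mean, covariance, motion_mat, motion_cov):
    # x' = Fx
    new_mean = [sum(f * x for f, x in zip(row, mean)) for row in motion_mat]
    # P' = F P F^T + Q
    fpft = matmul(matmul(motion_mat, covariance), transpose(motion_mat))
    new_cov = [[p + q for p, q in zip(p_row, q_row)]
               for p_row, q_row in zip(fpft, motion_cov)]
    return new_mean, new_cov


dt = 1.0
F = [[1.0, dt],     # position += velocity * dt
     [0.0, 1.0]]    # velocity stays constant
Q = [[0.1, 0.0],
     [0.0, 0.1]]    # assumed process noise
mean, cov = predict_step([0.0, 1.0], [[1.0, 0.0], [0.0, 1.0]], F, Q)
print(mean, cov)
```

The position advances by one velocity step, and the position variance grows because the velocity uncertainty leaks into it through F, before Q is added on top.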
Update formulas
def project(self, mean, covariance):
    # R: noise covariance of the measurement
    std = [
        self._std_weight_position * mean[3],
        self._std_weight_position * mean[3],
        1e-1,
        self._std_weight_position * mean[3]]
    # initialize the noise matrix R
    innovation_cov = np.diag(np.square(std))
    # map the mean vector to the detection space, that is, Hx'
    mean = np.dot(self._update_mat, mean)
    # map the covariance matrix to the detection space, that is, HP'H^T
    covariance = np.linalg.multi_dot((
        self._update_mat, covariance, self._update_mat.T))
    return mean, covariance + innovation_cov
def update(self, mean, covariance, measurement):
    # combine the prediction and the observation into the latest estimate
    # map the mean and covariance to the detection space to get Hx' and S
    projected_mean, projected_cov = self.project(mean, covariance)
    # Cholesky decomposition
    chol_factor, lower = scipy.linalg.cho_factor(
        projected_cov, lower=True, check_finite=False)
    # compute the Kalman gain K
    kalman_gain = scipy.linalg.cho_solve(
        (chol_factor, lower), np.dot(covariance, self._update_mat.T).T,
        check_finite=False).T
    # y = z - Hx'
    innovation = measurement - projected_mean
    # x = x' + Ky
    new_mean = mean + np.dot(innovation, kalman_gain.T)
    # P = (I - KH)P'
    new_covariance = covariance - np.linalg.multi_dot((
        kalman_gain, projected_cov, kalman_gain.T))
    return new_mean, new_covariance
y = z - Hx': z is the mean vector of the Detection; it contains no velocity terms and its state is [cx, cy, a, h]. H is the measurement matrix, which maps the mean vector of the Track into the detection space. The resulting y is the error between the Detection and the Track mean.
S = HP'H^T + R: R is the noise matrix of the object detector, a 4x4 diagonal matrix. The values on the diagonal are the noise of the two center-point coordinates and of the width and height.
K = P'H^T S^-1: the Kalman gain, which weighs the estimation error.
x = x' + Ky: the updated mean vector x.
P = (I - KH)P': the updated covariance matrix.
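In one dimension, with H = 1, the update formulas collapse to a few lines of arithmetic, which makes the "linear weighting" of prediction and measurement easy to see. The function below is a sketch under that scalar assumption, not the 8-dimensional implementation; the name kalman_update_1d and the numbers are illustrative.

```python
def kalman_update_1d(pred_mean, pred_var, measurement, meas_var):
    """Scalar Kalman update with H = 1."""
    innovation = measurement - pred_mean       # y = z - Hx'
    innovation_var = pred_var + meas_var       # S = HP'H^T + R
    gain = pred_var / innovation_var           # K = P'H^T S^-1
    new_mean = pred_mean + gain * innovation   # x = x' + Ky
    new_var = (1.0 - gain) * pred_var          # P = (I - KH)P'
    return new_mean, new_var, gain


# Prediction and detector equally uncertain: the estimate lands halfway.
balanced = kalman_update_1d(10.0, 4.0, 12.0, 4.0)
# A much noisier detector: the estimate barely moves away from the prediction.
noisy = kalman_update_1d(10.0, 4.0, 12.0, 36.0)
print(balanced, noisy)
```

The gain K acts as the weight given to the measurement: 0.5 when both sources are equally reliable, and only 0.1 when the measurement noise is nine times the prediction variance.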
The author does not understand the Kalman filter in great depth and has not derived the formulas. For readers interested in this part, the following blog posts are recommended:
Kalman filter + python demo: https://zhuanlan.zhihu.com/p/113685503?utm_source=wechat_session&utm_medium=social&utm_oi=801414067897135104
Detailed explanation + derivation: https://blog.csdn.net/honyniu/article/details/88697520
5. Process analysis
This part mainly follows the flow chart below:
Deep SORT flow chart, summarized by Zhihu @Maodi
Thanks to Zhihu @Maodi for summarizing the flow chart. The explanation is very clear; if you only read the code, it is easy to get confused. For example, computing the cost matrix passes through three nested function calls before the actual computation happens. The figure above summarizes the overall process well. The author will follow this process and walk through the code:
First, look at how Deep SORT is called in the Detector class:
class Detector(object):
    def __init__(self, args):
        self.args = args
        if args.display:
            cv2.namedWindow("test", cv2.WINDOW_NORMAL)
            cv2.resizeWindow("test", args.display_width, args.display_height)
        device = torch.device(
            'cuda') if torch.cuda.is_available() else torch.device('cpu')
        self.vdo = cv2.VideoCapture()
        self.yolo3 = InferYOLOv3(args.yolo_cfg,
                                 args.img_size,
                                 args.yolo_weights,
                                 args.data_cfg,
                                 device,
                                 conf_thres=args.conf_thresh,
                                 nms_thres=args.nms_thresh)
        self.deepsort = DeepSort(args.deepsort_checkpoint)
After the DeepSort object is initialized, its update function is called with the positions and confidences of the detected boxes and the image:
outputs = self.deepsort.update(bbox_xcycwh, cls_conf, im)
Next, follow the update function of the DeepSort class:
class DeepSort(object):
    def __init__(self, model_path, max_dist=0.2):
        self.min_confidence = 0.3
        # Confidence threshold for the YOLOv3 detections; detections with
        # confidence below 0.3 are filtered out
        self.nms_max_overlap = 1.0
        # Non-maximum suppression threshold; 1.0 means no suppression
        # Extracts image embeddings and returns the features of a batch of crops
        self.extractor = Extractor("resnet18",
                                   model_path,
                                   use_cuda=True)
        max_cosine_distance = max_dist
        # Used in cascade matching: a pair whose distance exceeds this
        # threshold is ignored
        nn_budget = 100
        # Budget: maximum number of samples kept per class; when exceeded,
        # the oldest ones are deleted
        # The first argument can be 'cosine' or 'euclidean'
        metric = NearestNeighborDistanceMetric("cosine",
                                               max_cosine_distance,
                                               nn_budget)
        self.tracker = Tracker(metric)
    def update(self, bbox_xywh, confidences, ori_img):
        self.height, self.width = ori_img.shape[:2]
        # generate detections
        features = self._get_features(bbox_xywh, ori_img)
        # Crop the patch of each bbox from the original image and compute its embedding
        bbox_tlwh = self._xywh_to_tlwh(bbox_xywh)
        detections = [
            Detection(bbox_tlwh[i], conf, features[i])
            for i, conf in enumerate(confidences) if conf > self.min_confidence
        ]  # drop targets below min_confidence and build a list of Detection objects
        # A Detection stores one bbox result in an image
        # It needs: 1. bbox (tlwh form) 2. its confidence 3. its embedding
        # run non-maximum suppression
        boxes = np.array([d.tlwh for d in detections])
        scores = np.array([d.confidence for d in detections])
        # With the default nms_max_overlap=1.0 this step has no effect,
        # so in practice no non-maximum suppression is applied
        indices = non_max_suppression(boxes, self.nms_max_overlap, scores)
        detections = [detections[i] for i in indices]
        # update tracker
        # The tracker first predicts, then the detections are passed in
        # for the Kalman filter update
        self.tracker.predict()
        self.tracker.update(detections)
        # output bbox identities
        # store the results for visualization
        outputs = []
        for track in self.tracker.tracks:
            if not track.is_confirmed() or track.time_since_update > 1:
                continue
            box = track.to_tlwh()
            x1, y1, x2, y2 = self._tlwh_to_xyxy(box)
            track_id = track.track_id
            # np.int was removed in NumPy 1.24; the builtin int is equivalent
            outputs.append(np.array([x1, y1, x2, y2, track_id], dtype=int))
        if len(outputs) > 0:
            outputs = np.stack(outputs, axis=0)
        return np.array(outputs)
From here on, it is clearer to read the code side by side with the flow chart above. When DeepSort is initialized, it creates a core metric object, an instance of the NearestNeighborDistanceMetric class, which is used for matching and for updating the feature set.
To summarize the update process of DeepSort:
From the input parameters (bbox_xywh, conf, img), the ReID model extracts the appearance features of each bbox.
A list of detections is built. Each element is a Detection object, and boxes below the minimum confidence are dropped.
Non-maximum suppression is applied, but because nms_max_overlap defaults to 1.0, it has no effect in practice.
The Tracker makes a prediction and is then updated with the detections.
Finally, the tracks stored in the Tracker that are in the confirmed state are returned.
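The box handling in the first two steps can be sketched in a few lines. The function names below are hypothetical stand-ins for the private helpers that DeepSort.update calls (_xywh_to_tlwh, _tlwh_to_xyxy), whose bodies are not shown in this article. The sketch assumes that bbox_xywh holds center x, center y, width, height, and that output corners are clipped to the image.

```python
def xywh_to_tlwh(box):
    """(center x, center y, w, h) -> (top-left x, top-left y, w, h)."""
    cx, cy, w, h = box
    return (cx - w / 2.0, cy - h / 2.0, w, h)


def tlwh_to_xyxy(box, width, height):
    """(top-left x, top-left y, w, h) -> integer corners clipped to the image."""
    x, y, w, h = box
    x1 = max(int(x), 0)
    y1 = max(int(y), 0)
    x2 = min(int(x + w), width - 1)
    y2 = min(int(y + h), height - 1)
    return x1, y1, x2, y2


def filter_by_confidence(boxes_xywh, confidences, min_confidence=0.3):
    """Keep (tlwh box, confidence) pairs whose confidence exceeds the threshold."""
    return [
        (xywh_to_tlwh(box), conf)
        for box, conf in zip(boxes_xywh, confidences)
        if conf > min_confidence
    ]


# Two detections; the second one falls below min_confidence and is dropped.
kept = filter_by_confidence([(50, 40, 20, 10), (10, 10, 4, 4)], [0.9, 0.2])
```

Only the first box survives the filter, and it is stored in tlwh form, which is the form the Detection objects above expect.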
The core of the above is Tracker's predict and update functions, which are examined next.
Predict function of Tracker
Tracker is a multi-target tracker that stores many tracks. It is responsible for calling the Kalman filter to predict the new state of each track, for the matching work, and for initializing tracks on the first frame. When Tracker calls update or predict, each track also calls its own update or predict.
class Tracker:
    def __init__(self, metric, max_iou_distance=0.7, max_age=70, n_init=3):
        # When called here, the following parameters all take their defaults
        self.metric = metric
        self.max_iou_distance = max_iou_distance
        # Maximum IoU distance, used in IoU matching
        self.max_age = max_age
        # Directly sets the cascade_depth parameter of cascade matching
        self.n_init = n_init
        # A track needs n_init updates before its state is set to confirmed
        self.kf = kalman_filter.KalmanFilter()  # Kalman filter
        self.tracks = []  # stores the list of tracks
        self._next_id = 1  # next track id to assign

    def predict(self):
        # Iterate over every track and run its prediction
        """Propagate track state distributions one time step forward.

        This function should be called once every time step, before `update`.
        """
        for track in self.tracks:
            track.predict(self.kf)
predict mainly uses the Kalman filter to predict the state of every track in the track list.
Update function of Tracker
Tracker's update function is the core part.
    def update(self, detections):
        # Measurement update and track management
        """Perform measurement update and track management.

        Parameters
        ----------
        detections : List[deep_sort.detection.Detection]
            A list of detections at the current time step.
        """
        # Run matching cascade.
        matches, unmatched_tracks, unmatched_detections = \
            self._match(detections)
        # Update track set.
        # 1. For the matched results
        for track_idx, detection_idx in matches:
            # update the track with its matched detection
            self.tracks[track_idx].update(self.kf, detections[detection_idx])
        # 2. Call mark_missed on the unmatched tracks
        # An unmatched track is deleted if it is still tentative,
        # or if it has gone without an update for too long
        # max_age is the survival period, 70 frames by default
        for track_idx in unmatched_tracks:
            self.tracks[track_idx].mark_missed()
        # 3. Initialize a new track for each unmatched detection
        for detection_idx in unmatched_detections:
            self._initiate_track(detections[detection_idx])
        # Refresh the tracks list, keeping tracks marked Confirmed or Tentative
        self.tracks = [t for t in self.tracks if not t.is_deleted()]
        # Update distance metric.
        active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]
        # track ids of all confirmed tracks
        features, targets = [], []
        for track in self.tracks:
            if not track.is_confirmed():
                continue
            features += track.features  # append this track's features to the features list
            # record the track id that corresponds to each feature
            targets += [track.track_id for _ in track.features]
            track.features = []
        # update the feature set of the distance metric
        self.metric.partial_fit(
            np.asarray(features), np.asarray(targets), active_targets)
The comments in this part are already detailed. It is mainly post-processing code. Pay attention to how matches, unmatched detections, and unmatched tracks are handled, and to the final feature set update. It can be followed against the flow chart.
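The track life cycle that this post-processing relies on (Tentative, Confirmed, Deleted, governed by n_init and max_age) can be illustrated with a small state machine. This is a simplified sketch written for this walkthrough, not the Track class itself: ToyTrack and run are hypothetical names, the hit counter is assumed to start at 1 for the detection that created the track, and the Kalman state and appearance features are left out.

```python
from enum import Enum


class TrackState(Enum):
    TENTATIVE = 1
    CONFIRMED = 2
    DELETED = 3


class ToyTrack:
    """Life-cycle bookkeeping only; no Kalman state or appearance features."""

    def __init__(self, n_init=3, max_age=70):
        self.n_init = n_init
        self.max_age = max_age
        self.hits = 1  # assumption: the creating detection counts as a hit
        self.time_since_update = 0
        self.state = TrackState.TENTATIVE

    def predict(self):
        # Called once per frame, before matching.
        self.time_since_update += 1

    def update(self):
        # Called when a detection was matched to this track.
        self.hits += 1
        self.time_since_update = 0
        if self.state is TrackState.TENTATIVE and self.hits >= self.n_init:
            self.state = TrackState.CONFIRMED

    def mark_missed(self):
        # Called when no detection was matched to this track.
        if self.state is TrackState.TENTATIVE:
            self.state = TrackState.DELETED
        elif self.time_since_update > self.max_age:
            self.state = TrackState.DELETED


def run(events, **kwargs):
    """Feed a sequence of 'hit' / 'miss' frames to a fresh track."""
    track = ToyTrack(**kwargs)
    for event in events:
        track.predict()
        if event == "hit":
            track.update()
        else:
            track.mark_missed()
    return track.state
```

Under these assumptions, a tentative track is dropped on its first miss, while a confirmed track survives misses until time_since_update exceeds max_age. For example, run(["hit", "hit", "miss"], max_age=1) is still confirmed, and one more miss deletes it.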
The core of Tracker's update function is the _match function, which describes how matching is done:
    def _match(self, detections):
        # Main job: run the matching and find the matched and unmatched parts
        def gated_metric(tracks, dets, track_indices, detection_indices):
            # Purpose: compute the distance between tracks and detections (the cost function)
            # It must be computed before the KM algorithm is applied
            # Called as:
            # cost_matrix = distance_metric(tracks, detections,
            #                               track_indices, detection_indices)
            features = np.array([dets[i].feature for i in detection_indices])
            targets = np.array([tracks[i].track_id for i in track_indices])
            # 1. Compute the cost matrix from the nearest-neighbor cosine distance
            cost_matrix = self.metric.distance(features, targets)
            # 2. Compute the Mahalanobis distance and obtain the gated cost matrix
            cost_matrix = linear_assignment.gate_cost_matrix(
                self.kf, cost_matrix, tracks, dets, track_indices,
                detection_indices)
            return cost_matrix

        # Split track set into confirmed and unconfirmed tracks.
        confirmed_tracks = [
            i for i, t in enumerate(self.tracks) if t.is_confirmed()
        ]
        unconfirmed_tracks = [
            i for i, t in enumerate(self.tracks) if not t.is_confirmed()
        ]
        # ---------- Cascade matching ----------
        # Yields matched tracks, unmatched tracks, and unmatched detections
        # gated_metric -> cosine distance
        # Cascade matching is applied only to confirmed tracks
        matches_a, unmatched_tracks_a, unmatched_detections = \
            linear_assignment.matching_cascade(
                gated_metric,
                self.metric.matching_threshold,
                self.max_age,
                self.tracks,
                detections,
                confirmed_tracks)
        # Combine all unconfirmed tracks and the tracks that just missed a
        # match into iou_track_candidates, then match them by IoU
        iou_track_candidates = unconfirmed_tracks + [
            k for k in unmatched_tracks_a
            if self.tracks[k].time_since_update == 1  # just missed a match
        ]
        # not matched
        unmatched_tracks_a = [
            k for k in unmatched_tracks_a
            if self.tracks[k].time_since_update != 1  # unmatched for a long time
        ]
        # ---------- IoU matching ----------
        # Perform IoU matching on the targets that cascade matching failed to match
        # Although min_cost_matching is also the core of cascade matching,
        # the metric used here is iou_cost, which differs from the one above
        matches_b, unmatched_tracks_b, unmatched_detections = \
            linear_assignment.min_cost_matching(
                iou_matching.iou_cost,
                self.max_iou_distance,
                self.tracks,
                detections,
                iou_track_candidates,
                unmatched_detections)
        matches = matches_a + matches_b  # combine the two sets of matches
        unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
        return matches, unmatched_tracks, unmatched_detections
It is much easier to follow when compared with the figure below:
Picture from Zhihu Harlek
As you can see, the core of the matching function is cascade matching + IoU matching. First, look at cascade matching.
The call is here:
        matches_a, unmatched_tracks_a, unmatched_detections = \
            linear_assignment.matching_cascade(
                gated_metric,
                self.metric.matching_threshold,
                self.max_age,
                self.tracks,
                detections,
                confirmed_tracks)
The matching_cascade function, expanded:
def matching_cascade(
        distance_metric, max_distance, cascade_depth, tracks, detections,
        track_indices=None, detection_indices=None):
    # Cascade matching
    # 1. Assign track_indices and detection_indices
    if track_indices is None:
        track_indices = list(range(len(tracks)))
    if detection_indices is None:
        detection_indices = list(range(len(detections)))
    unmatched_detections = detection_indices
    matches = []
    # cascade depth = max_age, 70 by default
    for level in range(cascade_depth):
        if len(unmatched_detections) == 0:  # No detections left
            break
        track_indices_l = [
            k for k in track_indices
            if tracks[k].time_since_update == 1 + level
        ]
        if len(track_indices_l) == 0:  # Nothing to match at this level
            continue
        # 2. The core of cascade matching is this function
        matches_l, _, unmatched_detections = \
            min_cost_matching(  # max_distance=0.2
                distance_metric, max_distance, tracks, detections,
                track_indices_l, unmatched_detections)
        matches += matches_l
    unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
    return matches, unmatched_tracks, unmatched_detections
As you can see, this is consistent with the pseudocode mentioned in the first half of the article. This part of the code relies on another core function, min_cost_matching, which accepts different distance_metric functions and is used in both cascade matching and IoU matching.
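The effect of the level-by-level loop is easiest to see on a toy example. The sketch below keeps the same loop structure as matching_cascade but replaces min_cost_matching with a simple greedy matcher so that it runs on plain lists. greedy_match and cascade are hypothetical names, and the cost values are made up for illustration.

```python
def greedy_match(cost, track_ids, det_ids, max_distance):
    """Stand-in for min_cost_matching: repeatedly take the cheapest admissible pair."""
    pairs = sorted(
        (cost[t][d], t, d) for t in track_ids for d in det_ids
        if cost[t][d] <= max_distance
    )
    matches, used_tracks, used_dets = [], set(), set()
    for _, t, d in pairs:
        if t not in used_tracks and d not in used_dets:
            matches.append((t, d))
            used_tracks.add(t)
            used_dets.add(d)
    return matches, [d for d in det_ids if d not in used_dets]


def cascade(cost, time_since_update, n_detections, cascade_depth, max_distance):
    """Match tracks level by level, most recently updated tracks first."""
    unmatched_dets = list(range(n_detections))
    matches = []
    for level in range(cascade_depth):
        if not unmatched_dets:
            break
        level_tracks = [
            t for t, age in enumerate(time_since_update) if age == 1 + level
        ]
        if not level_tracks:
            continue
        level_matches, unmatched_dets = greedy_match(
            cost, level_tracks, unmatched_dets, max_distance)
        matches += level_matches
    matched_tracks = {t for t, _ in matches}
    unmatched_tracks = [
        t for t in range(len(time_since_update)) if t not in matched_tracks
    ]
    return matches, unmatched_tracks, unmatched_dets


# Track 0 has been missed for 3 frames; track 1 was updated in the last frame.
# Both are close to detection 0, and track 0 is even slightly closer.
cost = [[0.10, 0.90],
        [0.12, 0.90]]
result = cascade(cost, time_since_update=[3, 1], n_detections=2,
                 cascade_depth=70, max_distance=0.2)
```

Although track 0 has the lower cost to detection 0, track 1 is processed first because it was updated more recently, so it takes the detection. This priority for recently seen tracks is the point of the cascade.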
The min_cost_matching function:
def min_cost_matching(
        distance_metric, max_distance, tracks, detections, track_indices=None,
        detection_indices=None):
    if track_indices is None:
        track_indices = np.arange(len(tracks))
    if detection_indices is None:
        detection_indices = np.arange(len(detections))
    if len(detection_indices) == 0 or len(track_indices) == 0:
        return [], track_indices, detection_indices  # Nothing to match.
    # -----------------------------------------
    # gated_metric -->
    #     1. cosine distance
    #     2. Mahalanobis distance
    # yields the cost matrix
    # -----------------------------------------
    # iou_cost -->
    #     only computes the IoU distance between tracks and detections
    # -----------------------------------------
    cost_matrix = distance_metric(
        tracks, detections, track_indices, detection_indices)
    # -----------------------------------------
    # For gated_metric, max_distance caps the distance.
    # The cap is actually set by the max_dist parameter of the DeepSort class
    # default max_dist=0.2; the smaller the distance, the better
    # -----------------------------------------
    # For iou_cost, max_distance corresponds to max_iou_distance in Tracker
    # default max_iou_distance=0.7
    # note that the cost is 1 - IoU, so the smaller the better
    # -----------------------------------------
    cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5
    # Hungarian algorithm (KM algorithm); linear_assignment must return
    # (row_indices, col_indices), as scipy.optimize.linear_sum_assignment does
    row_indices, col_indices = linear_assignment(cost_matrix)
    matches, unmatched_tracks, unmatched_detections = [], [], []
    # These loops filter the assignment into matched and unmatched results
    for col, detection_idx in enumerate(detection_indices):
        if col not in col_indices:
            unmatched_detections.append(detection_idx)
    for row, track_idx in enumerate(track_indices):
        if row not in row_indices:
            unmatched_tracks.append(track_idx)
    for row, col in zip(row_indices, col_indices):
        track_idx = track_indices[row]
        detection_idx = detection_indices[col]
        if cost_matrix[row, col] > max_distance:
            unmatched_tracks.append(track_idx)
            unmatched_detections.append(detection_idx)
        else:
            matches.append((track_idx, detection_idx))
    # Return the matches, the unmatched tracks, and the unmatched detections
    return matches, unmatched_tracks, unmatched_detections
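The clamp-then-assign-then-filter pattern in min_cost_matching can be tried on a tiny cost matrix. To stay dependency-free, the sketch below swaps the Hungarian algorithm for an exhaustive search over permutations, which gives the same minimum-cost assignment but is only usable for very small matrices. min_cost_assignment and match_with_threshold are hypothetical names, and the costs are made up.

```python
from itertools import permutations


def min_cost_assignment(cost):
    """Exhaustive minimum-cost assignment; a brute-force stand-in for Hungarian."""
    n_rows, n_cols = len(cost), len(cost[0])
    if n_rows <= n_cols:
        candidates = (list(zip(range(n_rows), cols))
                      for cols in permutations(range(n_cols), n_rows))
    else:
        candidates = (list(zip(rows, range(n_cols)))
                      for rows in permutations(range(n_rows), n_cols))
    return min(candidates, key=lambda pairs: sum(cost[r][c] for r, c in pairs))


def match_with_threshold(cost, max_distance):
    # Clamp every cost above the threshold to max_distance + 1e-5, as the
    # function above does, so a hopeless pair cannot dominate the total.
    clamped = [
        [c if c <= max_distance else max_distance + 1e-5 for c in row]
        for row in cost
    ]
    pairs = min_cost_assignment(clamped)
    # Assigned pairs that are still above the threshold count as unmatched.
    matches = [(r, c) for r, c in pairs if clamped[r][c] <= max_distance]
    matched_rows = {r for r, _ in matches}
    matched_cols = {c for _, c in matches}
    unmatched_tracks = [r for r in range(len(cost)) if r not in matched_rows]
    unmatched_dets = [c for c in range(len(cost[0])) if c not in matched_cols]
    return matches, unmatched_tracks, unmatched_dets


# Rows are tracks, columns are detections.
cost = [[0.05, 0.60, 0.90],
        [0.70, 0.10, 0.80]]
result = match_with_threshold(cost, max_distance=0.2)
```

Here both tracks find a detection within the threshold and detection 2 is left over, so it would go on to start a new track.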
The comments mention two kinds of distance_metric:
The first is gated_metric, the distance_metric passed in for cascade matching. At its core it computes the appearance-feature distance used in cascade matching.
        def gated_metric(tracks, dets, track_indices, detection_indices):
            # Purpose: compute the distance between tracks and detections (the cost function)
            # It must be computed before the KM algorithm is applied
            # Called as:
            # cost_matrix = distance_metric(tracks, detections,
            #                               track_indices, detection_indices)
            features = np.array([dets[i].feature for i in detection_indices])
            targets = np.array([tracks[i].track_id for i in track_indices])
            # 1. Compute the cost matrix from the nearest-neighbor cosine distance
            cost_matrix = self.metric.distance(features, targets)
            # 2. Compute the Mahalanobis distance and obtain the gated cost matrix
            cost_matrix = linear_assignment.gate_cost_matrix(
                self.kf, cost_matrix, tracks, dets, track_indices,
                detection_indices)
            return cost_matrix
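The second step of gated_metric, gating by Mahalanobis distance, can be sketched as follows. The body of gate_cost_matrix is not shown in this article, so this is an illustration of the idea rather than that function: it assumes a diagonal covariance for simplicity, and it assumes the gate is the 0.95 chi-square quantile for 4 degrees of freedom (9.4877) with 1e5 as the cost that marks an inadmissible pair. The helper names and numbers are made up.

```python
INFTY_COST = 1e5        # assumed marker value for an inadmissible pair
CHI2_95_4DOF = 9.4877   # 0.95 quantile of chi-square with 4 degrees of freedom


def squared_mahalanobis_diag(measurement, mean, variances):
    """Squared Mahalanobis distance for a diagonal covariance (a simplification)."""
    return sum((z - m) ** 2 / v for z, m, v in zip(measurement, mean, variances))


def gate_row(cost_row, track_mean, track_var, measurements,
             threshold=CHI2_95_4DOF, gated_cost=INFTY_COST):
    """Replace the appearance cost of motion-implausible detections by gated_cost."""
    gated = []
    for cost, z in zip(cost_row, measurements):
        d2 = squared_mahalanobis_diag(z, track_mean, track_var)
        gated.append(gated_cost if d2 > threshold else cost)
    return gated


track_mean = [100.0, 50.0, 0.5, 80.0]   # projected [cx, cy, a, h]
track_var = [16.0, 16.0, 0.01, 16.0]    # diagonal of S, made up for the example
measurements = [
    [102.0, 51.0, 0.5, 82.0],   # close to the prediction
    [160.0, 50.0, 0.5, 80.0],   # 60 pixels away from the prediction
]
gated = gate_row([0.15, 0.05], track_mean, track_var, measurements)
```

The second detection looks more similar in appearance (cost 0.05), but it is too far from where the Kalman filter expects the target, so its cost is replaced by the gate value and it can no longer be matched to this track.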
It can be understood together with the figure below (the upper part of the figure corresponds to the gated_metric function):
Picture from Zhihu Harlek
The second is iou_matching.iou_cost, which is used in IoU matching:
        # Although min_cost_matching is also the core of cascade matching,
        # the metric used here is iou_cost, which differs from the one above
        matches_b, unmatched_tracks_b, unmatched_detections = \
            linear_assignment.min_cost_matching(
                iou_matching.iou_cost,
                self.max_iou_distance,
                self.tracks,
                detections,
                iou_track_candidates,
                unmatched_detections)
iou_cost is easy to understand: it computes the IoU distance matrix between tracks and detections.
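As a quick illustration of the 1 - IoU cost, here is a minimal sketch for boxes in tlwh form. The iou helper that iou_cost calls is not shown in this article, so the two functions below are written for this example only, and the boxes are made up.

```python
def iou(box_a, box_b):
    """IoU of two (top-left x, top-left y, w, h) boxes."""
    ax1, ay1, aw, ah = box_a
    bx1, by1, bw, bh = box_b
    ax2, ay2 = ax1 + aw, ay1 + ah
    bx2, by2 = bx1 + bw, by1 + bh
    inter_w = max(0.0, min(ax2, bx2) - max(ax1, bx1))
    inter_h = max(0.0, min(ay2, by2) - max(ay1, by1))
    inter = inter_w * inter_h
    union = aw * ah + bw * bh - inter
    return inter / union if union > 0 else 0.0


def iou_cost_row(track_box, detection_boxes):
    """One row of the cost matrix: 1 - IoU against every detection."""
    return [1.0 - iou(track_box, det) for det in detection_boxes]


row = iou_cost_row(
    (0, 0, 10, 10),
    [(0, 0, 10, 10),    # identical box
     (5, 0, 10, 10),    # half-shifted box, IoU = 1/3
     (20, 20, 5, 5)],   # disjoint box
)
```

With max_iou_distance=0.7, the half-shifted box (cost about 0.667) is still an admissible match, while the disjoint box (cost 1.0) is not.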
def iou_cost(tracks, detections, track_indices=None,
             detection_indices=None):
    # Compute the IoU distance matrix between tracks and detections
    if track_indices is None:
        track_indices = np.arange(len(tracks))
    if detection_indices is None:
        detection_indices = np.arange(len(detections))
    cost_matrix = np.zeros((len(track_indices), len(detection_indices)))
    for row, track_idx in enumerate(track_indices):
        if tracks[track_idx].time_since_update > 1:
            cost_matrix[row, :] = linear_assignment.INFTY_COST
            continue
        bbox = tracks[track_idx].to_tlwh()
        candidates = np.asarray(
            [detections[i].tlwh for i in detection_indices])
        cost_matrix[row, :] = 1. - iou(bbox, candidates)
    return cost_matrix
The above is how to parse the code of the Deep SORT multi-target tracking algorithm. It covers knowledge points that may come up in daily work, and hopefully this article helps you learn more.