Add new @Retry() decorator (#7854)

Signed-off-by: Glenn Jocher <glenn.jocher@ultralytics.com>
This commit is contained in:
Glenn Jocher 2024-01-27 20:07:31 +01:00 committed by GitHub
parent 5f00fbd227
commit 1435f0e9de
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 372 additions and 269 deletions

View File

@ -31,6 +31,10 @@ keywords: Ultralytics, Utils, utilitarian functions, colorstr, yaml_save, set_lo
<br><br> <br><br>
## ::: ultralytics.utils.Retry
<br><br>
## ::: ultralytics.utils.SettingsManager ## ::: ultralytics.utils.SettingsManager
<br><br> <br><br>

View File

@ -5,7 +5,7 @@ from pathlib import Path
import pytest import pytest
TMP = Path(__file__).resolve().parent / 'tmp' # temp directory for test files TMP = Path(__file__).resolve().parent / "tmp" # temp directory for test files
def pytest_addoption(parser): def pytest_addoption(parser):
@ -15,7 +15,7 @@ def pytest_addoption(parser):
Args: Args:
parser (pytest.config.Parser): The pytest parser object. parser (pytest.config.Parser): The pytest parser object.
""" """
parser.addoption('--slow', action='store_true', default=False, help='Run slow tests') parser.addoption("--slow", action="store_true", default=False, help="Run slow tests")
def pytest_configure(config): def pytest_configure(config):
@ -25,7 +25,7 @@ def pytest_configure(config):
Args: Args:
config (pytest.config.Config): The pytest config object. config (pytest.config.Config): The pytest config object.
""" """
config.addinivalue_line('markers', 'slow: mark test as slow to run') config.addinivalue_line("markers", "slow: mark test as slow to run")
def pytest_runtest_setup(item): def pytest_runtest_setup(item):
@ -35,8 +35,8 @@ def pytest_runtest_setup(item):
Args: Args:
item (pytest.Item): The test item object. item (pytest.Item): The test item object.
""" """
if 'slow' in item.keywords and not item.config.getoption('--slow'): if "slow" in item.keywords and not item.config.getoption("--slow"):
pytest.skip('skip slow tests unless --slow is set') pytest.skip("skip slow tests unless --slow is set")
def pytest_collection_modifyitems(config, items): def pytest_collection_modifyitems(config, items):
@ -47,9 +47,9 @@ def pytest_collection_modifyitems(config, items):
config (pytest.config.Config): The pytest config object. config (pytest.config.Config): The pytest config object.
items (list): List of test items to be executed. items (list): List of test items to be executed.
""" """
if not config.getoption('--slow'): if not config.getoption("--slow"):
# Remove the item entirely from the list of test items if it's marked as 'slow' # Remove the item entirely from the list of test items if it's marked as 'slow'
items[:] = [item for item in items if 'slow' not in item.keywords] items[:] = [item for item in items if "slow" not in item.keywords]
def pytest_sessionstart(session): def pytest_sessionstart(session):
@ -84,11 +84,11 @@ def pytest_terminal_summary(terminalreporter, exitstatus, config):
from ultralytics.utils import WEIGHTS_DIR from ultralytics.utils import WEIGHTS_DIR
# Remove files # Remove files
models = [path for x in ['*.onnx', '*.torchscript'] for path in WEIGHTS_DIR.rglob(x)] models = [path for x in ["*.onnx", "*.torchscript"] for path in WEIGHTS_DIR.rglob(x)]
for file in ['bus.jpg', 'yolov8n.onnx', 'yolov8n.torchscript'] + models: for file in ["bus.jpg", "yolov8n.onnx", "yolov8n.torchscript"] + models:
Path(file).unlink(missing_ok=True) Path(file).unlink(missing_ok=True)
# Remove directories # Remove directories
models = [path for x in ['*.mlpackage', '*_openvino_model'] for path in WEIGHTS_DIR.rglob(x)] models = [path for x in ["*.mlpackage", "*_openvino_model"] for path in WEIGHTS_DIR.rglob(x)]
for directory in [TMP.parents[1] / '.pytest_cache', TMP] + models: for directory in [TMP.parents[1] / ".pytest_cache", TMP] + models:
shutil.rmtree(directory, ignore_errors=True) shutil.rmtree(directory, ignore_errors=True)

View File

@ -10,17 +10,19 @@ from ultralytics.utils.checks import cuda_device_count, cuda_is_available
CUDA_IS_AVAILABLE = cuda_is_available() CUDA_IS_AVAILABLE = cuda_is_available()
CUDA_DEVICE_COUNT = cuda_device_count() CUDA_DEVICE_COUNT = cuda_device_count()
TASK_ARGS = [ TASK_ARGS = [
('detect', 'yolov8n', 'coco8.yaml'), ("detect", "yolov8n", "coco8.yaml"),
('segment', 'yolov8n-seg', 'coco8-seg.yaml'), ("segment", "yolov8n-seg", "coco8-seg.yaml"),
('classify', 'yolov8n-cls', 'imagenet10'), ("classify", "yolov8n-cls", "imagenet10"),
('pose', 'yolov8n-pose', 'coco8-pose.yaml'), ("pose", "yolov8n-pose", "coco8-pose.yaml"),
('obb', 'yolov8n-obb', 'dota8.yaml'), ] # (task, model, data) ("obb", "yolov8n-obb", "dota8.yaml"),
] # (task, model, data)
EXPORT_ARGS = [ EXPORT_ARGS = [
('yolov8n', 'torchscript'), ("yolov8n", "torchscript"),
('yolov8n-seg', 'torchscript'), ("yolov8n-seg", "torchscript"),
('yolov8n-cls', 'torchscript'), ("yolov8n-cls", "torchscript"),
('yolov8n-pose', 'torchscript'), ("yolov8n-pose", "torchscript"),
('yolov8n-obb', 'torchscript'), ] # (model, format) ("yolov8n-obb", "torchscript"),
] # (model, format)
def run(cmd): def run(cmd):
@ -30,50 +32,50 @@ def run(cmd):
def test_special_modes(): def test_special_modes():
"""Test various special command modes of YOLO.""" """Test various special command modes of YOLO."""
run('yolo help') run("yolo help")
run('yolo checks') run("yolo checks")
run('yolo version') run("yolo version")
run('yolo settings reset') run("yolo settings reset")
run('yolo cfg') run("yolo cfg")
@pytest.mark.parametrize('task,model,data', TASK_ARGS) @pytest.mark.parametrize("task,model,data", TASK_ARGS)
def test_train(task, model, data): def test_train(task, model, data):
"""Test YOLO training for a given task, model, and data.""" """Test YOLO training for a given task, model, and data."""
run(f'yolo train {task} model={model}.yaml data={data} imgsz=32 epochs=1 cache=disk') run(f"yolo train {task} model={model}.yaml data={data} imgsz=32 epochs=1 cache=disk")
@pytest.mark.parametrize('task,model,data', TASK_ARGS) @pytest.mark.parametrize("task,model,data", TASK_ARGS)
def test_val(task, model, data): def test_val(task, model, data):
"""Test YOLO validation for a given task, model, and data.""" """Test YOLO validation for a given task, model, and data."""
run(f'yolo val {task} model={WEIGHTS_DIR / model}.pt data={data} imgsz=32 save_txt save_json') run(f"yolo val {task} model={WEIGHTS_DIR / model}.pt data={data} imgsz=32 save_txt save_json")
@pytest.mark.parametrize('task,model,data', TASK_ARGS) @pytest.mark.parametrize("task,model,data", TASK_ARGS)
def test_predict(task, model, data): def test_predict(task, model, data):
"""Test YOLO prediction on sample assets for a given task and model.""" """Test YOLO prediction on sample assets for a given task and model."""
run(f'yolo predict model={WEIGHTS_DIR / model}.pt source={ASSETS} imgsz=32 save save_crop save_txt') run(f"yolo predict model={WEIGHTS_DIR / model}.pt source={ASSETS} imgsz=32 save save_crop save_txt")
@pytest.mark.parametrize('model,format', EXPORT_ARGS) @pytest.mark.parametrize("model,format", EXPORT_ARGS)
def test_export(model, format): def test_export(model, format):
"""Test exporting a YOLO model to different formats.""" """Test exporting a YOLO model to different formats."""
run(f'yolo export model={WEIGHTS_DIR / model}.pt format={format} imgsz=32') run(f"yolo export model={WEIGHTS_DIR / model}.pt format={format} imgsz=32")
def test_rtdetr(task='detect', model='yolov8n-rtdetr.yaml', data='coco8.yaml'): def test_rtdetr(task="detect", model="yolov8n-rtdetr.yaml", data="coco8.yaml"):
"""Test the RTDETR functionality with the Ultralytics framework.""" """Test the RTDETR functionality with the Ultralytics framework."""
# Warning: MUST use imgsz=640 # Warning: MUST use imgsz=640
run(f'yolo train {task} model={model} data={data} --imgsz= 640 epochs =1, cache = disk') # add coma, spaces to args run(f"yolo train {task} model={model} data={data} --imgsz= 640 epochs =1, cache = disk") # add coma, spaces to args
run(f"yolo predict {task} model={model} source={ASSETS / 'bus.jpg'} imgsz=640 save save_crop save_txt") run(f"yolo predict {task} model={model} source={ASSETS / 'bus.jpg'} imgsz=640 save save_crop save_txt")
def test_fastsam(task='segment', model=WEIGHTS_DIR / 'FastSAM-s.pt', data='coco8-seg.yaml'): def test_fastsam(task="segment", model=WEIGHTS_DIR / "FastSAM-s.pt", data="coco8-seg.yaml"):
"""Test FastSAM segmentation functionality within Ultralytics.""" """Test FastSAM segmentation functionality within Ultralytics."""
source = ASSETS / 'bus.jpg' source = ASSETS / "bus.jpg"
run(f'yolo segment val {task} model={model} data={data} imgsz=32') run(f"yolo segment val {task} model={model} data={data} imgsz=32")
run(f'yolo segment predict model={model} source={source} imgsz=32 save save_crop save_txt') run(f"yolo segment predict model={model} source={source} imgsz=32 save save_crop save_txt")
from ultralytics import FastSAM from ultralytics import FastSAM
from ultralytics.models.fastsam import FastSAMPrompt from ultralytics.models.fastsam import FastSAMPrompt
@ -83,26 +85,26 @@ def test_fastsam(task='segment', model=WEIGHTS_DIR / 'FastSAM-s.pt', data='coco8
sam_model = FastSAM(model) # or FastSAM-x.pt sam_model = FastSAM(model) # or FastSAM-x.pt
# Run inference on an image # Run inference on an image
everything_results = sam_model(source, device='cpu', retina_masks=True, imgsz=1024, conf=0.4, iou=0.9) everything_results = sam_model(source, device="cpu", retina_masks=True, imgsz=1024, conf=0.4, iou=0.9)
# Remove small regions # Remove small regions
new_masks, _ = Predictor.remove_small_regions(everything_results[0].masks.data, min_area=20) new_masks, _ = Predictor.remove_small_regions(everything_results[0].masks.data, min_area=20)
# Everything prompt # Everything prompt
prompt_process = FastSAMPrompt(source, everything_results, device='cpu') prompt_process = FastSAMPrompt(source, everything_results, device="cpu")
ann = prompt_process.everything_prompt() ann = prompt_process.everything_prompt()
# Bbox default shape [0,0,0,0] -> [x1,y1,x2,y2] # Bbox default shape [0,0,0,0] -> [x1,y1,x2,y2]
ann = prompt_process.box_prompt(bbox=[200, 200, 300, 300]) ann = prompt_process.box_prompt(bbox=[200, 200, 300, 300])
# Text prompt # Text prompt
ann = prompt_process.text_prompt(text='a photo of a dog') ann = prompt_process.text_prompt(text="a photo of a dog")
# Point prompt # Point prompt
# Points default [[0,0]] [[x1,y1],[x2,y2]] # Points default [[0,0]] [[x1,y1],[x2,y2]]
# Point_label default [0] [1,0] 0:background, 1:foreground # Point_label default [0] [1,0] 0:background, 1:foreground
ann = prompt_process.point_prompt(points=[[200, 200]], pointlabel=[1]) ann = prompt_process.point_prompt(points=[[200, 200]], pointlabel=[1])
prompt_process.plot(annotations=ann, output='./') prompt_process.plot(annotations=ann, output="./")
def test_mobilesam(): def test_mobilesam():
@ -110,10 +112,10 @@ def test_mobilesam():
from ultralytics import SAM from ultralytics import SAM
# Load the model # Load the model
model = SAM(WEIGHTS_DIR / 'mobile_sam.pt') model = SAM(WEIGHTS_DIR / "mobile_sam.pt")
# Source # Source
source = ASSETS / 'zidane.jpg' source = ASSETS / "zidane.jpg"
# Predict a segment based on a point prompt # Predict a segment based on a point prompt
model.predict(source, points=[900, 370], labels=[1]) model.predict(source, points=[900, 370], labels=[1])
@ -127,10 +129,10 @@ def test_mobilesam():
# Slow Tests ----------------------------------------------------------------------------------------------------------- # Slow Tests -----------------------------------------------------------------------------------------------------------
@pytest.mark.slow @pytest.mark.slow
@pytest.mark.parametrize('task,model,data', TASK_ARGS) @pytest.mark.parametrize("task,model,data", TASK_ARGS)
@pytest.mark.skipif(not CUDA_IS_AVAILABLE, reason='CUDA is not available') @pytest.mark.skipif(not CUDA_IS_AVAILABLE, reason="CUDA is not available")
@pytest.mark.skipif(CUDA_DEVICE_COUNT < 2, reason='DDP is not available') @pytest.mark.skipif(CUDA_DEVICE_COUNT < 2, reason="DDP is not available")
def test_train_gpu(task, model, data): def test_train_gpu(task, model, data):
"""Test YOLO training on GPU(s) for various tasks and models.""" """Test YOLO training on GPU(s) for various tasks and models."""
run(f'yolo train {task} model={model}.yaml data={data} imgsz=32 epochs=1 device=0') # single GPU run(f"yolo train {task} model={model}.yaml data={data} imgsz=32 epochs=1 device=0") # single GPU
run(f'yolo train {task} model={model}.pt data={data} imgsz=32 epochs=1 device=0,1') # multi GPU run(f"yolo train {task} model={model}.pt data={data} imgsz=32 epochs=1 device=0,1") # multi GPU

View File

@ -9,9 +9,9 @@ from ultralytics.utils import ASSETS, WEIGHTS_DIR, checks
CUDA_IS_AVAILABLE = checks.cuda_is_available() CUDA_IS_AVAILABLE = checks.cuda_is_available()
CUDA_DEVICE_COUNT = checks.cuda_device_count() CUDA_DEVICE_COUNT = checks.cuda_device_count()
MODEL = WEIGHTS_DIR / 'path with spaces' / 'yolov8n.pt' # test spaces in path MODEL = WEIGHTS_DIR / "path with spaces" / "yolov8n.pt" # test spaces in path
DATA = 'coco8.yaml' DATA = "coco8.yaml"
BUS = ASSETS / 'bus.jpg' BUS = ASSETS / "bus.jpg"
def test_checks(): def test_checks():
@ -20,7 +20,7 @@ def test_checks():
assert torch.cuda.device_count() == CUDA_DEVICE_COUNT assert torch.cuda.device_count() == CUDA_DEVICE_COUNT
@pytest.mark.skipif(not CUDA_IS_AVAILABLE, reason='CUDA is not available') @pytest.mark.skipif(not CUDA_IS_AVAILABLE, reason="CUDA is not available")
def test_train(): def test_train():
"""Test model training on a minimal dataset.""" """Test model training on a minimal dataset."""
device = 0 if CUDA_DEVICE_COUNT == 1 else [0, 1] device = 0 if CUDA_DEVICE_COUNT == 1 else [0, 1]
@ -28,32 +28,32 @@ def test_train():
@pytest.mark.slow @pytest.mark.slow
@pytest.mark.skipif(not CUDA_IS_AVAILABLE, reason='CUDA is not available') @pytest.mark.skipif(not CUDA_IS_AVAILABLE, reason="CUDA is not available")
def test_predict_multiple_devices(): def test_predict_multiple_devices():
"""Validate model prediction on multiple devices.""" """Validate model prediction on multiple devices."""
model = YOLO('yolov8n.pt') model = YOLO("yolov8n.pt")
model = model.cpu() model = model.cpu()
assert str(model.device) == 'cpu' assert str(model.device) == "cpu"
_ = model(BUS) # CPU inference _ = model(BUS) # CPU inference
assert str(model.device) == 'cpu' assert str(model.device) == "cpu"
model = model.to('cuda:0') model = model.to("cuda:0")
assert str(model.device) == 'cuda:0' assert str(model.device) == "cuda:0"
_ = model(BUS) # CUDA inference _ = model(BUS) # CUDA inference
assert str(model.device) == 'cuda:0' assert str(model.device) == "cuda:0"
model = model.cpu() model = model.cpu()
assert str(model.device) == 'cpu' assert str(model.device) == "cpu"
_ = model(BUS) # CPU inference _ = model(BUS) # CPU inference
assert str(model.device) == 'cpu' assert str(model.device) == "cpu"
model = model.cuda() model = model.cuda()
assert str(model.device) == 'cuda:0' assert str(model.device) == "cuda:0"
_ = model(BUS) # CUDA inference _ = model(BUS) # CUDA inference
assert str(model.device) == 'cuda:0' assert str(model.device) == "cuda:0"
@pytest.mark.skipif(not CUDA_IS_AVAILABLE, reason='CUDA is not available') @pytest.mark.skipif(not CUDA_IS_AVAILABLE, reason="CUDA is not available")
def test_autobatch(): def test_autobatch():
"""Check batch size for YOLO model using autobatch.""" """Check batch size for YOLO model using autobatch."""
from ultralytics.utils.autobatch import check_train_batch_size from ultralytics.utils.autobatch import check_train_batch_size
@ -62,24 +62,24 @@ def test_autobatch():
@pytest.mark.slow @pytest.mark.slow
@pytest.mark.skipif(not CUDA_IS_AVAILABLE, reason='CUDA is not available') @pytest.mark.skipif(not CUDA_IS_AVAILABLE, reason="CUDA is not available")
def test_utils_benchmarks(): def test_utils_benchmarks():
"""Profile YOLO models for performance benchmarks.""" """Profile YOLO models for performance benchmarks."""
from ultralytics.utils.benchmarks import ProfileModels from ultralytics.utils.benchmarks import ProfileModels
# Pre-export a dynamic engine model to use dynamic inference # Pre-export a dynamic engine model to use dynamic inference
YOLO(MODEL).export(format='engine', imgsz=32, dynamic=True, batch=1) YOLO(MODEL).export(format="engine", imgsz=32, dynamic=True, batch=1)
ProfileModels([MODEL], imgsz=32, half=False, min_time=1, num_timed_runs=3, num_warmup_runs=1).profile() ProfileModels([MODEL], imgsz=32, half=False, min_time=1, num_timed_runs=3, num_warmup_runs=1).profile()
@pytest.mark.skipif(not CUDA_IS_AVAILABLE, reason='CUDA is not available') @pytest.mark.skipif(not CUDA_IS_AVAILABLE, reason="CUDA is not available")
def test_predict_sam(): def test_predict_sam():
"""Test SAM model prediction with various prompts.""" """Test SAM model prediction with various prompts."""
from ultralytics import SAM from ultralytics import SAM
from ultralytics.models.sam import Predictor as SAMPredictor from ultralytics.models.sam import Predictor as SAMPredictor
# Load a model # Load a model
model = SAM(WEIGHTS_DIR / 'sam_b.pt') model = SAM(WEIGHTS_DIR / "sam_b.pt")
# Display model information (optional) # Display model information (optional)
model.info() model.info()
@ -91,14 +91,14 @@ def test_predict_sam():
model(BUS, bboxes=[439, 437, 524, 709], device=0) model(BUS, bboxes=[439, 437, 524, 709], device=0)
# Run inference with points prompt # Run inference with points prompt
model(ASSETS / 'zidane.jpg', points=[900, 370], labels=[1], device=0) model(ASSETS / "zidane.jpg", points=[900, 370], labels=[1], device=0)
# Create SAMPredictor # Create SAMPredictor
overrides = dict(conf=0.25, task='segment', mode='predict', imgsz=1024, model=WEIGHTS_DIR / 'mobile_sam.pt') overrides = dict(conf=0.25, task="segment", mode="predict", imgsz=1024, model=WEIGHTS_DIR / "mobile_sam.pt")
predictor = SAMPredictor(overrides=overrides) predictor = SAMPredictor(overrides=overrides)
# Set image # Set image
predictor.set_image(ASSETS / 'zidane.jpg') # set with image file predictor.set_image(ASSETS / "zidane.jpg") # set with image file
# predictor(bboxes=[439, 437, 524, 709]) # predictor(bboxes=[439, 437, 524, 709])
# predictor(points=[900, 370], labels=[1]) # predictor(points=[900, 370], labels=[1])

View File

@ -6,123 +6,123 @@ from ultralytics.engine.exporter import Exporter
from ultralytics.models.yolo import classify, detect, segment from ultralytics.models.yolo import classify, detect, segment
from ultralytics.utils import ASSETS, DEFAULT_CFG, WEIGHTS_DIR from ultralytics.utils import ASSETS, DEFAULT_CFG, WEIGHTS_DIR
CFG_DET = 'yolov8n.yaml' CFG_DET = "yolov8n.yaml"
CFG_SEG = 'yolov8n-seg.yaml' CFG_SEG = "yolov8n-seg.yaml"
CFG_CLS = 'yolov8n-cls.yaml' # or 'squeezenet1_0' CFG_CLS = "yolov8n-cls.yaml" # or 'squeezenet1_0'
CFG = get_cfg(DEFAULT_CFG) CFG = get_cfg(DEFAULT_CFG)
MODEL = WEIGHTS_DIR / 'yolov8n' MODEL = WEIGHTS_DIR / "yolov8n"
def test_func(*args): # noqa def test_func(*args): # noqa
"""Test function callback.""" """Test function callback."""
print('callback test passed') print("callback test passed")
def test_export(): def test_export():
"""Test model exporting functionality.""" """Test model exporting functionality."""
exporter = Exporter() exporter = Exporter()
exporter.add_callback('on_export_start', test_func) exporter.add_callback("on_export_start", test_func)
assert test_func in exporter.callbacks['on_export_start'], 'callback test failed' assert test_func in exporter.callbacks["on_export_start"], "callback test failed"
f = exporter(model=YOLO(CFG_DET).model) f = exporter(model=YOLO(CFG_DET).model)
YOLO(f)(ASSETS) # exported model inference YOLO(f)(ASSETS) # exported model inference
def test_detect(): def test_detect():
"""Test object detection functionality.""" """Test object detection functionality."""
overrides = {'data': 'coco8.yaml', 'model': CFG_DET, 'imgsz': 32, 'epochs': 1, 'save': False} overrides = {"data": "coco8.yaml", "model": CFG_DET, "imgsz": 32, "epochs": 1, "save": False}
CFG.data = 'coco8.yaml' CFG.data = "coco8.yaml"
CFG.imgsz = 32 CFG.imgsz = 32
# Trainer # Trainer
trainer = detect.DetectionTrainer(overrides=overrides) trainer = detect.DetectionTrainer(overrides=overrides)
trainer.add_callback('on_train_start', test_func) trainer.add_callback("on_train_start", test_func)
assert test_func in trainer.callbacks['on_train_start'], 'callback test failed' assert test_func in trainer.callbacks["on_train_start"], "callback test failed"
trainer.train() trainer.train()
# Validator # Validator
val = detect.DetectionValidator(args=CFG) val = detect.DetectionValidator(args=CFG)
val.add_callback('on_val_start', test_func) val.add_callback("on_val_start", test_func)
assert test_func in val.callbacks['on_val_start'], 'callback test failed' assert test_func in val.callbacks["on_val_start"], "callback test failed"
val(model=trainer.best) # validate best.pt val(model=trainer.best) # validate best.pt
# Predictor # Predictor
pred = detect.DetectionPredictor(overrides={'imgsz': [64, 64]}) pred = detect.DetectionPredictor(overrides={"imgsz": [64, 64]})
pred.add_callback('on_predict_start', test_func) pred.add_callback("on_predict_start", test_func)
assert test_func in pred.callbacks['on_predict_start'], 'callback test failed' assert test_func in pred.callbacks["on_predict_start"], "callback test failed"
result = pred(source=ASSETS, model=f'{MODEL}.pt') result = pred(source=ASSETS, model=f"{MODEL}.pt")
assert len(result), 'predictor test failed' assert len(result), "predictor test failed"
overrides['resume'] = trainer.last overrides["resume"] = trainer.last
trainer = detect.DetectionTrainer(overrides=overrides) trainer = detect.DetectionTrainer(overrides=overrides)
try: try:
trainer.train() trainer.train()
except Exception as e: except Exception as e:
print(f'Expected exception caught: {e}') print(f"Expected exception caught: {e}")
return return
Exception('Resume test failed!') Exception("Resume test failed!")
def test_segment(): def test_segment():
"""Test image segmentation functionality.""" """Test image segmentation functionality."""
overrides = {'data': 'coco8-seg.yaml', 'model': CFG_SEG, 'imgsz': 32, 'epochs': 1, 'save': False} overrides = {"data": "coco8-seg.yaml", "model": CFG_SEG, "imgsz": 32, "epochs": 1, "save": False}
CFG.data = 'coco8-seg.yaml' CFG.data = "coco8-seg.yaml"
CFG.imgsz = 32 CFG.imgsz = 32
# YOLO(CFG_SEG).train(**overrides) # works # YOLO(CFG_SEG).train(**overrides) # works
# Trainer # Trainer
trainer = segment.SegmentationTrainer(overrides=overrides) trainer = segment.SegmentationTrainer(overrides=overrides)
trainer.add_callback('on_train_start', test_func) trainer.add_callback("on_train_start", test_func)
assert test_func in trainer.callbacks['on_train_start'], 'callback test failed' assert test_func in trainer.callbacks["on_train_start"], "callback test failed"
trainer.train() trainer.train()
# Validator # Validator
val = segment.SegmentationValidator(args=CFG) val = segment.SegmentationValidator(args=CFG)
val.add_callback('on_val_start', test_func) val.add_callback("on_val_start", test_func)
assert test_func in val.callbacks['on_val_start'], 'callback test failed' assert test_func in val.callbacks["on_val_start"], "callback test failed"
val(model=trainer.best) # validate best.pt val(model=trainer.best) # validate best.pt
# Predictor # Predictor
pred = segment.SegmentationPredictor(overrides={'imgsz': [64, 64]}) pred = segment.SegmentationPredictor(overrides={"imgsz": [64, 64]})
pred.add_callback('on_predict_start', test_func) pred.add_callback("on_predict_start", test_func)
assert test_func in pred.callbacks['on_predict_start'], 'callback test failed' assert test_func in pred.callbacks["on_predict_start"], "callback test failed"
result = pred(source=ASSETS, model=f'{MODEL}-seg.pt') result = pred(source=ASSETS, model=f"{MODEL}-seg.pt")
assert len(result), 'predictor test failed' assert len(result), "predictor test failed"
# Test resume # Test resume
overrides['resume'] = trainer.last overrides["resume"] = trainer.last
trainer = segment.SegmentationTrainer(overrides=overrides) trainer = segment.SegmentationTrainer(overrides=overrides)
try: try:
trainer.train() trainer.train()
except Exception as e: except Exception as e:
print(f'Expected exception caught: {e}') print(f"Expected exception caught: {e}")
return return
Exception('Resume test failed!') Exception("Resume test failed!")
def test_classify(): def test_classify():
"""Test image classification functionality.""" """Test image classification functionality."""
overrides = {'data': 'imagenet10', 'model': CFG_CLS, 'imgsz': 32, 'epochs': 1, 'save': False} overrides = {"data": "imagenet10", "model": CFG_CLS, "imgsz": 32, "epochs": 1, "save": False}
CFG.data = 'imagenet10' CFG.data = "imagenet10"
CFG.imgsz = 32 CFG.imgsz = 32
# YOLO(CFG_SEG).train(**overrides) # works # YOLO(CFG_SEG).train(**overrides) # works
# Trainer # Trainer
trainer = classify.ClassificationTrainer(overrides=overrides) trainer = classify.ClassificationTrainer(overrides=overrides)
trainer.add_callback('on_train_start', test_func) trainer.add_callback("on_train_start", test_func)
assert test_func in trainer.callbacks['on_train_start'], 'callback test failed' assert test_func in trainer.callbacks["on_train_start"], "callback test failed"
trainer.train() trainer.train()
# Validator # Validator
val = classify.ClassificationValidator(args=CFG) val = classify.ClassificationValidator(args=CFG)
val.add_callback('on_val_start', test_func) val.add_callback("on_val_start", test_func)
assert test_func in val.callbacks['on_val_start'], 'callback test failed' assert test_func in val.callbacks["on_val_start"], "callback test failed"
val(model=trainer.best) val(model=trainer.best)
# Predictor # Predictor
pred = classify.ClassificationPredictor(overrides={'imgsz': [64, 64]}) pred = classify.ClassificationPredictor(overrides={"imgsz": [64, 64]})
pred.add_callback('on_predict_start', test_func) pred.add_callback("on_predict_start", test_func)
assert test_func in pred.callbacks['on_predict_start'], 'callback test failed' assert test_func in pred.callbacks["on_predict_start"], "callback test failed"
result = pred(source=ASSETS, model=trainer.best) result = pred(source=ASSETS, model=trainer.best)
assert len(result), 'predictor test failed' assert len(result), "predictor test failed"

View File

@ -12,7 +12,7 @@ def test_similarity():
exp.create_embeddings_table() exp.create_embeddings_table()
similar = exp.get_similar(idx=1) similar = exp.get_similar(idx=1)
assert len(similar) == 25 assert len(similar) == 25
similar = exp.get_similar(img=ASSETS / 'zidane.jpg') similar = exp.get_similar(img=ASSETS / "zidane.jpg")
assert len(similar) == 25 assert len(similar) == 25
similar = exp.get_similar(idx=[1, 2], limit=10) similar = exp.get_similar(idx=[1, 2], limit=10)
assert len(similar) == 10 assert len(similar) == 10
@ -24,9 +24,9 @@ def test_similarity():
def test_det(): def test_det():
"""Test detection functionalities and ensure the embedding table has bounding boxes.""" """Test detection functionalities and ensure the embedding table has bounding boxes."""
exp = Explorer(data='coco8.yaml', model='yolov8n.pt') exp = Explorer(data="coco8.yaml", model="yolov8n.pt")
exp.create_embeddings_table(force=True) exp.create_embeddings_table(force=True)
assert len(exp.table.head()['bboxes']) > 0 assert len(exp.table.head()["bboxes"]) > 0
similar = exp.get_similar(idx=[1, 2], limit=10) similar = exp.get_similar(idx=[1, 2], limit=10)
assert len(similar) > 0 assert len(similar) > 0
# This is a loose test, just checks errors not correctness # This is a loose test, just checks errors not correctness
@ -36,9 +36,9 @@ def test_det():
def test_seg(): def test_seg():
"""Test segmentation functionalities and verify the embedding table includes masks.""" """Test segmentation functionalities and verify the embedding table includes masks."""
exp = Explorer(data='coco8-seg.yaml', model='yolov8n-seg.pt') exp = Explorer(data="coco8-seg.yaml", model="yolov8n-seg.pt")
exp.create_embeddings_table(force=True) exp.create_embeddings_table(force=True)
assert len(exp.table.head()['masks']) > 0 assert len(exp.table.head()["masks"]) > 0
similar = exp.get_similar(idx=[1, 2], limit=10) similar = exp.get_similar(idx=[1, 2], limit=10)
assert len(similar) > 0 assert len(similar) > 0
similar = exp.plot_similar(idx=[1, 2], limit=10) similar = exp.plot_similar(idx=[1, 2], limit=10)
@ -47,9 +47,9 @@ def test_seg():
def test_pose(): def test_pose():
"""Test pose estimation functionalities and check the embedding table for keypoints.""" """Test pose estimation functionalities and check the embedding table for keypoints."""
exp = Explorer(data='coco8-pose.yaml', model='yolov8n-pose.pt') exp = Explorer(data="coco8-pose.yaml", model="yolov8n-pose.pt")
exp.create_embeddings_table(force=True) exp.create_embeddings_table(force=True)
assert len(exp.table.head()['keypoints']) > 0 assert len(exp.table.head()["keypoints"]) > 0
similar = exp.get_similar(idx=[1, 2], limit=10) similar = exp.get_similar(idx=[1, 2], limit=10)
assert len(similar) > 0 assert len(similar) > 0
similar = exp.plot_similar(idx=[1, 2], limit=10) similar = exp.plot_similar(idx=[1, 2], limit=10)

View File

@ -9,67 +9,67 @@ from ultralytics import YOLO, download
from ultralytics.utils import ASSETS, DATASETS_DIR, ROOT, SETTINGS, WEIGHTS_DIR from ultralytics.utils import ASSETS, DATASETS_DIR, ROOT, SETTINGS, WEIGHTS_DIR
from ultralytics.utils.checks import check_requirements from ultralytics.utils.checks import check_requirements
MODEL = WEIGHTS_DIR / 'path with spaces' / 'yolov8n.pt' # test spaces in path MODEL = WEIGHTS_DIR / "path with spaces" / "yolov8n.pt" # test spaces in path
CFG = 'yolov8n.yaml' CFG = "yolov8n.yaml"
SOURCE = ASSETS / 'bus.jpg' SOURCE = ASSETS / "bus.jpg"
TMP = (ROOT / '../tests/tmp').resolve() # temp directory for test files TMP = (ROOT / "../tests/tmp").resolve() # temp directory for test files
@pytest.mark.skipif(not check_requirements('ray', install=False), reason='ray[tune] not installed') @pytest.mark.skipif(not check_requirements("ray", install=False), reason="ray[tune] not installed")
def test_model_ray_tune(): def test_model_ray_tune():
"""Tune YOLO model with Ray optimization library.""" """Tune YOLO model with Ray optimization library."""
YOLO('yolov8n-cls.yaml').tune(use_ray=True, YOLO("yolov8n-cls.yaml").tune(
data='imagenet10', use_ray=True, data="imagenet10", grace_period=1, iterations=1, imgsz=32, epochs=1, plots=False, device="cpu"
grace_period=1, )
iterations=1,
imgsz=32,
epochs=1,
plots=False,
device='cpu')
@pytest.mark.skipif(not check_requirements('mlflow', install=False), reason='mlflow not installed') @pytest.mark.skipif(not check_requirements("mlflow", install=False), reason="mlflow not installed")
def test_mlflow(): def test_mlflow():
"""Test training with MLflow tracking enabled.""" """Test training with MLflow tracking enabled."""
SETTINGS['mlflow'] = True SETTINGS["mlflow"] = True
YOLO('yolov8n-cls.yaml').train(data='imagenet10', imgsz=32, epochs=3, plots=False, device='cpu') YOLO("yolov8n-cls.yaml").train(data="imagenet10", imgsz=32, epochs=3, plots=False, device="cpu")
@pytest.mark.skipif(not check_requirements('tritonclient', install=False), reason='tritonclient[all] not installed') @pytest.mark.skipif(not check_requirements("tritonclient", install=False), reason="tritonclient[all] not installed")
def test_triton(): def test_triton():
"""Test NVIDIA Triton Server functionalities.""" """Test NVIDIA Triton Server functionalities."""
check_requirements('tritonclient[all]') check_requirements("tritonclient[all]")
import subprocess import subprocess
import time import time
from tritonclient.http import InferenceServerClient # noqa from tritonclient.http import InferenceServerClient # noqa
# Create variables # Create variables
model_name = 'yolo' model_name = "yolo"
triton_repo_path = TMP / 'triton_repo' triton_repo_path = TMP / "triton_repo"
triton_model_path = triton_repo_path / model_name triton_model_path = triton_repo_path / model_name
# Export model to ONNX # Export model to ONNX
f = YOLO(MODEL).export(format='onnx', dynamic=True) f = YOLO(MODEL).export(format="onnx", dynamic=True)
# Prepare Triton repo # Prepare Triton repo
(triton_model_path / '1').mkdir(parents=True, exist_ok=True) (triton_model_path / "1").mkdir(parents=True, exist_ok=True)
Path(f).rename(triton_model_path / '1' / 'model.onnx') Path(f).rename(triton_model_path / "1" / "model.onnx")
(triton_model_path / 'config.pbtxt').touch() (triton_model_path / "config.pbtxt").touch()
# Define image https://catalog.ngc.nvidia.com/orgs/nvidia/containers/tritonserver # Define image https://catalog.ngc.nvidia.com/orgs/nvidia/containers/tritonserver
tag = 'nvcr.io/nvidia/tritonserver:23.09-py3' # 6.4 GB tag = "nvcr.io/nvidia/tritonserver:23.09-py3" # 6.4 GB
# Pull the image # Pull the image
subprocess.call(f'docker pull {tag}', shell=True) subprocess.call(f"docker pull {tag}", shell=True)
# Run the Triton server and capture the container ID # Run the Triton server and capture the container ID
container_id = subprocess.check_output( container_id = (
f'docker run -d --rm -v {triton_repo_path}:/models -p 8000:8000 {tag} tritonserver --model-repository=/models', subprocess.check_output(
shell=True).decode('utf-8').strip() f"docker run -d --rm -v {triton_repo_path}:/models -p 8000:8000 {tag} tritonserver --model-repository=/models",
shell=True,
)
.decode("utf-8")
.strip()
)
# Wait for the Triton server to start # Wait for the Triton server to start
triton_client = InferenceServerClient(url='localhost:8000', verbose=False, ssl=False) triton_client = InferenceServerClient(url="localhost:8000", verbose=False, ssl=False)
# Wait until model is ready # Wait until model is ready
for _ in range(10): for _ in range(10):
@ -79,13 +79,13 @@ def test_triton():
time.sleep(1) time.sleep(1)
# Check Triton inference # Check Triton inference
YOLO(f'http://localhost:8000/{model_name}', 'detect')(SOURCE) # exported model inference YOLO(f"http://localhost:8000/{model_name}", "detect")(SOURCE) # exported model inference
# Kill and remove the container at the end of the test # Kill and remove the container at the end of the test
subprocess.call(f'docker kill {container_id}', shell=True) subprocess.call(f"docker kill {container_id}", shell=True)
@pytest.mark.skipif(not check_requirements('pycocotools', install=False), reason='pycocotools not installed') @pytest.mark.skipif(not check_requirements("pycocotools", install=False), reason="pycocotools not installed")
def test_pycocotools(): def test_pycocotools():
"""Validate model predictions using pycocotools.""" """Validate model predictions using pycocotools."""
from ultralytics.models.yolo.detect import DetectionValidator from ultralytics.models.yolo.detect import DetectionValidator
@ -93,25 +93,25 @@ def test_pycocotools():
from ultralytics.models.yolo.segment import SegmentationValidator from ultralytics.models.yolo.segment import SegmentationValidator
# Download annotations after each dataset downloads first # Download annotations after each dataset downloads first
url = 'https://github.com/ultralytics/assets/releases/download/v8.1.0/' url = "https://github.com/ultralytics/assets/releases/download/v8.1.0/"
args = {'model': 'yolov8n.pt', 'data': 'coco8.yaml', 'save_json': True, 'imgsz': 64} args = {"model": "yolov8n.pt", "data": "coco8.yaml", "save_json": True, "imgsz": 64}
validator = DetectionValidator(args=args) validator = DetectionValidator(args=args)
validator() validator()
validator.is_coco = True validator.is_coco = True
download(f'{url}instances_val2017.json', dir=DATASETS_DIR / 'coco8/annotations') download(f"{url}instances_val2017.json", dir=DATASETS_DIR / "coco8/annotations")
_ = validator.eval_json(validator.stats) _ = validator.eval_json(validator.stats)
args = {'model': 'yolov8n-seg.pt', 'data': 'coco8-seg.yaml', 'save_json': True, 'imgsz': 64} args = {"model": "yolov8n-seg.pt", "data": "coco8-seg.yaml", "save_json": True, "imgsz": 64}
validator = SegmentationValidator(args=args) validator = SegmentationValidator(args=args)
validator() validator()
validator.is_coco = True validator.is_coco = True
download(f'{url}instances_val2017.json', dir=DATASETS_DIR / 'coco8-seg/annotations') download(f"{url}instances_val2017.json", dir=DATASETS_DIR / "coco8-seg/annotations")
_ = validator.eval_json(validator.stats) _ = validator.eval_json(validator.stats)
args = {'model': 'yolov8n-pose.pt', 'data': 'coco8-pose.yaml', 'save_json': True, 'imgsz': 64} args = {"model": "yolov8n-pose.pt", "data": "coco8-pose.yaml", "save_json": True, "imgsz": 64}
validator = PoseValidator(args=args) validator = PoseValidator(args=args)
validator() validator()
validator.is_coco = True validator.is_coco = True
download(f'{url}person_keypoints_val2017.json', dir=DATASETS_DIR / 'coco8-pose/annotations') download(f"{url}person_keypoints_val2017.json", dir=DATASETS_DIR / "coco8-pose/annotations")
_ = validator.eval_json(validator.stats) _ = validator.eval_json(validator.stats)

View File

@ -14,15 +14,27 @@ from torchvision.transforms import ToTensor
from ultralytics import RTDETR, YOLO from ultralytics import RTDETR, YOLO
from ultralytics.cfg import TASK2DATA from ultralytics.cfg import TASK2DATA
from ultralytics.data.build import load_inference_source from ultralytics.data.build import load_inference_source
from ultralytics.utils import (ASSETS, DEFAULT_CFG, DEFAULT_CFG_PATH, LINUX, MACOS, ONLINE, ROOT, WEIGHTS_DIR, WINDOWS, from ultralytics.utils import (
checks, is_dir_writeable) ASSETS,
DEFAULT_CFG,
DEFAULT_CFG_PATH,
LINUX,
MACOS,
ONLINE,
ROOT,
WEIGHTS_DIR,
WINDOWS,
Retry,
checks,
is_dir_writeable,
)
from ultralytics.utils.downloads import download from ultralytics.utils.downloads import download
from ultralytics.utils.torch_utils import TORCH_1_9 from ultralytics.utils.torch_utils import TORCH_1_9
MODEL = WEIGHTS_DIR / 'path with spaces' / 'yolov8n.pt' # test spaces in path MODEL = WEIGHTS_DIR / "path with spaces" / "yolov8n.pt" # test spaces in path
CFG = 'yolov8n.yaml' CFG = "yolov8n.yaml"
SOURCE = ASSETS / 'bus.jpg' SOURCE = ASSETS / "bus.jpg"
TMP = (ROOT / '../tests/tmp').resolve() # temp directory for test files TMP = (ROOT / "../tests/tmp").resolve() # temp directory for test files
IS_TMP_WRITEABLE = is_dir_writeable(TMP) IS_TMP_WRITEABLE = is_dir_writeable(TMP)
@ -40,9 +52,9 @@ def test_model_methods():
model.info(verbose=True, detailed=True) model.info(verbose=True, detailed=True)
model = model.reset_weights() model = model.reset_weights()
model = model.load(MODEL) model = model.load(MODEL)
model.to('cpu') model.to("cpu")
model.fuse() model.fuse()
model.clear_callback('on_train_start') model.clear_callback("on_train_start")
model.reset_callbacks() model.reset_callbacks()
# Model properties # Model properties
@ -61,23 +73,23 @@ def test_model_profile():
_ = model.predict(im, profile=True) _ = model.predict(im, profile=True)
@pytest.mark.skipif(not IS_TMP_WRITEABLE, reason='directory is not writeable') @pytest.mark.skipif(not IS_TMP_WRITEABLE, reason="directory is not writeable")
def test_predict_txt(): def test_predict_txt():
"""Test YOLO predictions with sources (file, dir, glob, recursive glob) specified in a text file.""" """Test YOLO predictions with sources (file, dir, glob, recursive glob) specified in a text file."""
txt_file = TMP / 'sources.txt' txt_file = TMP / "sources.txt"
with open(txt_file, 'w') as f: with open(txt_file, "w") as f:
for x in [ASSETS / 'bus.jpg', ASSETS, ASSETS / '*', ASSETS / '**/*.jpg']: for x in [ASSETS / "bus.jpg", ASSETS, ASSETS / "*", ASSETS / "**/*.jpg"]:
f.write(f'{x}\n') f.write(f"{x}\n")
_ = YOLO(MODEL)(source=txt_file, imgsz=32) _ = YOLO(MODEL)(source=txt_file, imgsz=32)
def test_predict_img(): def test_predict_img():
"""Test YOLO prediction on various types of image sources.""" """Test YOLO prediction on various types of image sources."""
model = YOLO(MODEL) model = YOLO(MODEL)
seg_model = YOLO(WEIGHTS_DIR / 'yolov8n-seg.pt') seg_model = YOLO(WEIGHTS_DIR / "yolov8n-seg.pt")
cls_model = YOLO(WEIGHTS_DIR / 'yolov8n-cls.pt') cls_model = YOLO(WEIGHTS_DIR / "yolov8n-cls.pt")
pose_model = YOLO(WEIGHTS_DIR / 'yolov8n-pose.pt') pose_model = YOLO(WEIGHTS_DIR / "yolov8n-pose.pt")
obb_model = YOLO(WEIGHTS_DIR / 'yolov8n-obb.pt') obb_model = YOLO(WEIGHTS_DIR / "yolov8n-obb.pt")
im = cv2.imread(str(SOURCE)) im = cv2.imread(str(SOURCE))
assert len(model(source=Image.open(SOURCE), save=True, verbose=True, imgsz=32)) == 1 # PIL assert len(model(source=Image.open(SOURCE), save=True, verbose=True, imgsz=32)) == 1 # PIL
assert len(model(source=im, save=True, save_txt=True, imgsz=32)) == 1 # ndarray assert len(model(source=im, save=True, save_txt=True, imgsz=32)) == 1 # ndarray
@ -87,10 +99,11 @@ def test_predict_img():
batch = [ batch = [
str(SOURCE), # filename str(SOURCE), # filename
Path(SOURCE), # Path Path(SOURCE), # Path
'https://ultralytics.com/images/zidane.jpg' if ONLINE else SOURCE, # URI "https://ultralytics.com/images/zidane.jpg" if ONLINE else SOURCE, # URI
cv2.imread(str(SOURCE)), # OpenCV cv2.imread(str(SOURCE)), # OpenCV
Image.open(SOURCE), # PIL Image.open(SOURCE), # PIL
np.zeros((320, 640, 3))] # numpy np.zeros((320, 640, 3)),
] # numpy
assert len(model(batch, imgsz=32)) == len(batch) # multiple sources in a batch assert len(model(batch, imgsz=32)) == len(batch) # multiple sources in a batch
# Test tensor inference # Test tensor inference
@ -113,16 +126,16 @@ def test_predict_img():
def test_predict_grey_and_4ch(): def test_predict_grey_and_4ch():
"""Test YOLO prediction on SOURCE converted to greyscale and 4-channel images.""" """Test YOLO prediction on SOURCE converted to greyscale and 4-channel images."""
im = Image.open(SOURCE) im = Image.open(SOURCE)
directory = TMP / 'im4' directory = TMP / "im4"
directory.mkdir(parents=True, exist_ok=True) directory.mkdir(parents=True, exist_ok=True)
source_greyscale = directory / 'greyscale.jpg' source_greyscale = directory / "greyscale.jpg"
source_rgba = directory / '4ch.png' source_rgba = directory / "4ch.png"
source_non_utf = directory / 'non_UTF_测试文件_tést_image.jpg' source_non_utf = directory / "non_UTF_测试文件_tést_image.jpg"
source_spaces = directory / 'image with spaces.jpg' source_spaces = directory / "image with spaces.jpg"
im.convert('L').save(source_greyscale) # greyscale im.convert("L").save(source_greyscale) # greyscale
im.convert('RGBA').save(source_rgba) # 4-ch PNG with alpha im.convert("RGBA").save(source_rgba) # 4-ch PNG with alpha
im.save(source_non_utf) # non-UTF characters in filename im.save(source_non_utf) # non-UTF characters in filename
im.save(source_spaces) # spaces in filename im.save(source_spaces) # spaces in filename
@ -136,7 +149,8 @@ def test_predict_grey_and_4ch():
@pytest.mark.slow @pytest.mark.slow
@pytest.mark.skipif(not ONLINE, reason='environment is offline') @pytest.mark.skipif(not ONLINE, reason="environment is offline")
@Retry(times=3, delay=10)
def test_youtube(): def test_youtube():
""" """
Test YouTube inference. Test YouTube inference.
@ -144,11 +158,11 @@ def test_youtube():
Marked --slow to reduce YouTube API rate limits risk. Marked --slow to reduce YouTube API rate limits risk.
""" """
model = YOLO(MODEL) model = YOLO(MODEL)
model.predict('https://youtu.be/G17sBkb38XQ', imgsz=96, save=True) model.predict("https://youtu.be/G17sBkb38XQ", imgsz=96, save=True)
@pytest.mark.skipif(not ONLINE, reason='environment is offline') @pytest.mark.skipif(not ONLINE, reason="environment is offline")
@pytest.mark.skipif(not IS_TMP_WRITEABLE, reason='directory is not writeable') @pytest.mark.skipif(not IS_TMP_WRITEABLE, reason="directory is not writeable")
def test_track_stream(): def test_track_stream():
""" """
Test streaming tracking (short 10 frame video) with non-default ByteTrack tracker. Test streaming tracking (short 10 frame video) with non-default ByteTrack tracker.
@ -157,56 +171,56 @@ def test_track_stream():
""" """
import yaml import yaml
video_url = 'https://ultralytics.com/assets/decelera_portrait_min.mov' video_url = "https://ultralytics.com/assets/decelera_portrait_min.mov"
model = YOLO(MODEL) model = YOLO(MODEL)
model.track(video_url, imgsz=160, tracker='bytetrack.yaml') model.track(video_url, imgsz=160, tracker="bytetrack.yaml")
model.track(video_url, imgsz=160, tracker='botsort.yaml', save_frames=True) # test frame saving also model.track(video_url, imgsz=160, tracker="botsort.yaml", save_frames=True) # test frame saving also
# Test Global Motion Compensation (GMC) methods # Test Global Motion Compensation (GMC) methods
for gmc in 'orb', 'sift', 'ecc': for gmc in "orb", "sift", "ecc":
with open(ROOT / 'cfg/trackers/botsort.yaml', encoding='utf-8') as f: with open(ROOT / "cfg/trackers/botsort.yaml", encoding="utf-8") as f:
data = yaml.safe_load(f) data = yaml.safe_load(f)
tracker = TMP / f'botsort-{gmc}.yaml' tracker = TMP / f"botsort-{gmc}.yaml"
data['gmc_method'] = gmc data["gmc_method"] = gmc
with open(tracker, 'w', encoding='utf-8') as f: with open(tracker, "w", encoding="utf-8") as f:
yaml.safe_dump(data, f) yaml.safe_dump(data, f)
model.track(video_url, imgsz=160, tracker=tracker) model.track(video_url, imgsz=160, tracker=tracker)
def test_val(): def test_val():
"""Test the validation mode of the YOLO model.""" """Test the validation mode of the YOLO model."""
YOLO(MODEL).val(data='coco8.yaml', imgsz=32, save_hybrid=True) YOLO(MODEL).val(data="coco8.yaml", imgsz=32, save_hybrid=True)
def test_train_scratch(): def test_train_scratch():
"""Test training the YOLO model from scratch.""" """Test training the YOLO model from scratch."""
model = YOLO(CFG) model = YOLO(CFG)
model.train(data='coco8.yaml', epochs=2, imgsz=32, cache='disk', batch=-1, close_mosaic=1, name='model') model.train(data="coco8.yaml", epochs=2, imgsz=32, cache="disk", batch=-1, close_mosaic=1, name="model")
model(SOURCE) model(SOURCE)
def test_train_pretrained(): def test_train_pretrained():
"""Test training the YOLO model from a pre-trained state.""" """Test training the YOLO model from a pre-trained state."""
model = YOLO(WEIGHTS_DIR / 'yolov8n-seg.pt') model = YOLO(WEIGHTS_DIR / "yolov8n-seg.pt")
model.train(data='coco8-seg.yaml', epochs=1, imgsz=32, cache='ram', copy_paste=0.5, mixup=0.5, name=0) model.train(data="coco8-seg.yaml", epochs=1, imgsz=32, cache="ram", copy_paste=0.5, mixup=0.5, name=0)
model(SOURCE) model(SOURCE)
def test_export_torchscript(): def test_export_torchscript():
"""Test exporting the YOLO model to TorchScript format.""" """Test exporting the YOLO model to TorchScript format."""
f = YOLO(MODEL).export(format='torchscript', optimize=False) f = YOLO(MODEL).export(format="torchscript", optimize=False)
YOLO(f)(SOURCE) # exported model inference YOLO(f)(SOURCE) # exported model inference
def test_export_onnx(): def test_export_onnx():
"""Test exporting the YOLO model to ONNX format.""" """Test exporting the YOLO model to ONNX format."""
f = YOLO(MODEL).export(format='onnx', dynamic=True) f = YOLO(MODEL).export(format="onnx", dynamic=True)
YOLO(f)(SOURCE) # exported model inference YOLO(f)(SOURCE) # exported model inference
def test_export_openvino(): def test_export_openvino():
"""Test exporting the YOLO model to OpenVINO format.""" """Test exporting the YOLO model to OpenVINO format."""
f = YOLO(MODEL).export(format='openvino') f = YOLO(MODEL).export(format="openvino")
YOLO(f)(SOURCE) # exported model inference YOLO(f)(SOURCE) # exported model inference
@ -214,10 +228,10 @@ def test_export_coreml():
"""Test exporting the YOLO model to CoreML format.""" """Test exporting the YOLO model to CoreML format."""
if not WINDOWS: # RuntimeError: BlobWriter not loaded with coremltools 7.0 on windows if not WINDOWS: # RuntimeError: BlobWriter not loaded with coremltools 7.0 on windows
if MACOS: if MACOS:
f = YOLO(MODEL).export(format='coreml') f = YOLO(MODEL).export(format="coreml")
YOLO(f)(SOURCE) # model prediction only supported on macOS for nms=False models YOLO(f)(SOURCE) # model prediction only supported on macOS for nms=False models
else: else:
YOLO(MODEL).export(format='coreml', nms=True) YOLO(MODEL).export(format="coreml", nms=True)
def test_export_tflite(enabled=False): def test_export_tflite(enabled=False):
@ -228,7 +242,7 @@ def test_export_tflite(enabled=False):
""" """
if enabled and LINUX: if enabled and LINUX:
model = YOLO(MODEL) model = YOLO(MODEL)
f = model.export(format='tflite') f = model.export(format="tflite")
YOLO(f)(SOURCE) YOLO(f)(SOURCE)
@ -240,7 +254,7 @@ def test_export_pb(enabled=False):
""" """
if enabled and LINUX: if enabled and LINUX:
model = YOLO(MODEL) model = YOLO(MODEL)
f = model.export(format='pb') f = model.export(format="pb")
YOLO(f)(SOURCE) YOLO(f)(SOURCE)
@ -251,20 +265,20 @@ def test_export_paddle(enabled=False):
Note Paddle protobuf requirements conflicting with onnx protobuf requirements. Note Paddle protobuf requirements conflicting with onnx protobuf requirements.
""" """
if enabled: if enabled:
YOLO(MODEL).export(format='paddle') YOLO(MODEL).export(format="paddle")
@pytest.mark.slow @pytest.mark.slow
def test_export_ncnn(): def test_export_ncnn():
"""Test exporting the YOLO model to NCNN format.""" """Test exporting the YOLO model to NCNN format."""
f = YOLO(MODEL).export(format='ncnn') f = YOLO(MODEL).export(format="ncnn")
YOLO(f)(SOURCE) # exported model inference YOLO(f)(SOURCE) # exported model inference
def test_all_model_yamls(): def test_all_model_yamls():
"""Test YOLO model creation for all available YAML configurations.""" """Test YOLO model creation for all available YAML configurations."""
for m in (ROOT / 'cfg' / 'models').rglob('*.yaml'): for m in (ROOT / "cfg" / "models").rglob("*.yaml"):
if 'rtdetr' in m.name: if "rtdetr" in m.name:
if TORCH_1_9: # torch<=1.8 issue - TypeError: __init__() got an unexpected keyword argument 'batch_first' if TORCH_1_9: # torch<=1.8 issue - TypeError: __init__() got an unexpected keyword argument 'batch_first'
_ = RTDETR(m.name)(SOURCE, imgsz=640) # must be 640 _ = RTDETR(m.name)(SOURCE, imgsz=640) # must be 640
else: else:
@ -274,10 +288,10 @@ def test_all_model_yamls():
def test_workflow(): def test_workflow():
"""Test the complete workflow including training, validation, prediction, and exporting.""" """Test the complete workflow including training, validation, prediction, and exporting."""
model = YOLO(MODEL) model = YOLO(MODEL)
model.train(data='coco8.yaml', epochs=1, imgsz=32, optimizer='SGD') model.train(data="coco8.yaml", epochs=1, imgsz=32, optimizer="SGD")
model.val(imgsz=32) model.val(imgsz=32)
model.predict(SOURCE, imgsz=32) model.predict(SOURCE, imgsz=32)
model.export(format='onnx') # export a model to ONNX format model.export(format="onnx") # export a model to ONNX format
def test_predict_callback_and_setup(): def test_predict_callback_and_setup():
@ -291,34 +305,34 @@ def test_predict_callback_and_setup():
predictor.results = zip(predictor.results, im0s, bs) # results is List[batch_size] predictor.results = zip(predictor.results, im0s, bs) # results is List[batch_size]
model = YOLO(MODEL) model = YOLO(MODEL)
model.add_callback('on_predict_batch_end', on_predict_batch_end) model.add_callback("on_predict_batch_end", on_predict_batch_end)
dataset = load_inference_source(source=SOURCE) dataset = load_inference_source(source=SOURCE)
bs = dataset.bs # noqa access predictor properties bs = dataset.bs # noqa access predictor properties
results = model.predict(dataset, stream=True, imgsz=160) # source already setup results = model.predict(dataset, stream=True, imgsz=160) # source already setup
for r, im0, bs in results: for r, im0, bs in results:
print('test_callback', im0.shape) print("test_callback", im0.shape)
print('test_callback', bs) print("test_callback", bs)
boxes = r.boxes # Boxes object for bbox outputs boxes = r.boxes # Boxes object for bbox outputs
print(boxes) print(boxes)
def test_results(): def test_results():
"""Test various result formats for the YOLO model.""" """Test various result formats for the YOLO model."""
for m in 'yolov8n-pose.pt', 'yolov8n-seg.pt', 'yolov8n.pt', 'yolov8n-cls.pt': for m in "yolov8n-pose.pt", "yolov8n-seg.pt", "yolov8n.pt", "yolov8n-cls.pt":
results = YOLO(WEIGHTS_DIR / m)([SOURCE, SOURCE], imgsz=160) results = YOLO(WEIGHTS_DIR / m)([SOURCE, SOURCE], imgsz=160)
for r in results: for r in results:
r = r.cpu().numpy() r = r.cpu().numpy()
r = r.to(device='cpu', dtype=torch.float32) r = r.to(device="cpu", dtype=torch.float32)
r.save_txt(txt_file=TMP / 'runs/tests/label.txt', save_conf=True) r.save_txt(txt_file=TMP / "runs/tests/label.txt", save_conf=True)
r.save_crop(save_dir=TMP / 'runs/tests/crops/') r.save_crop(save_dir=TMP / "runs/tests/crops/")
r.tojson(normalize=True) r.tojson(normalize=True)
r.plot(pil=True) r.plot(pil=True)
r.plot(conf=True, boxes=True) r.plot(conf=True, boxes=True)
print(r, len(r), r.path) print(r, len(r), r.path)
@pytest.mark.skipif(not ONLINE, reason='environment is offline') @pytest.mark.skipif(not ONLINE, reason="environment is offline")
def test_data_utils(): def test_data_utils():
"""Test utility functions in ultralytics/data/utils.py.""" """Test utility functions in ultralytics/data/utils.py."""
from ultralytics.data.utils import HUBDatasetStats, autosplit from ultralytics.data.utils import HUBDatasetStats, autosplit
@ -327,25 +341,25 @@ def test_data_utils():
# from ultralytics.utils.files import WorkingDirectory # from ultralytics.utils.files import WorkingDirectory
# with WorkingDirectory(ROOT.parent / 'tests'): # with WorkingDirectory(ROOT.parent / 'tests'):
for task in 'detect', 'segment', 'pose', 'classify': for task in "detect", "segment", "pose", "classify":
file = Path(TASK2DATA[task]).with_suffix('.zip') # i.e. coco8.zip file = Path(TASK2DATA[task]).with_suffix(".zip") # i.e. coco8.zip
download(f'https://github.com/ultralytics/hub/raw/main/example_datasets/{file}', unzip=False, dir=TMP) download(f"https://github.com/ultralytics/hub/raw/main/example_datasets/{file}", unzip=False, dir=TMP)
stats = HUBDatasetStats(TMP / file, task=task) stats = HUBDatasetStats(TMP / file, task=task)
stats.get_json(save=True) stats.get_json(save=True)
stats.process_images() stats.process_images()
autosplit(TMP / 'coco8') autosplit(TMP / "coco8")
zip_directory(TMP / 'coco8/images/val') # zip zip_directory(TMP / "coco8/images/val") # zip
@pytest.mark.skipif(not ONLINE, reason='environment is offline') @pytest.mark.skipif(not ONLINE, reason="environment is offline")
def test_data_converter(): def test_data_converter():
"""Test dataset converters.""" """Test dataset converters."""
from ultralytics.data.converter import coco80_to_coco91_class, convert_coco from ultralytics.data.converter import coco80_to_coco91_class, convert_coco
file = 'instances_val2017.json' file = "instances_val2017.json"
download(f'https://github.com/ultralytics/yolov5/releases/download/v1.0/{file}', dir=TMP) download(f"https://github.com/ultralytics/yolov5/releases/download/v1.0/{file}", dir=TMP)
convert_coco(labels_dir=TMP, save_dir=TMP / 'yolo_labels', use_segments=True, use_keypoints=False, cls91to80=True) convert_coco(labels_dir=TMP, save_dir=TMP / "yolo_labels", use_segments=True, use_keypoints=False, cls91to80=True)
coco80_to_coco91_class() coco80_to_coco91_class()
@ -353,10 +367,12 @@ def test_data_annotator():
"""Test automatic data annotation.""" """Test automatic data annotation."""
from ultralytics.data.annotator import auto_annotate from ultralytics.data.annotator import auto_annotate
auto_annotate(ASSETS, auto_annotate(
det_model=WEIGHTS_DIR / 'yolov8n.pt', ASSETS,
sam_model=WEIGHTS_DIR / 'mobile_sam.pt', det_model=WEIGHTS_DIR / "yolov8n.pt",
output_dir=TMP / 'auto_annotate_labels') sam_model=WEIGHTS_DIR / "mobile_sam.pt",
output_dir=TMP / "auto_annotate_labels",
)
def test_events(): def test_events():
@ -366,7 +382,7 @@ def test_events():
events = Events() events = Events()
events.enabled = True events.enabled = True
cfg = copy(DEFAULT_CFG) # does not require deepcopy cfg = copy(DEFAULT_CFG) # does not require deepcopy
cfg.mode = 'test' cfg.mode = "test"
events(cfg) events(cfg)
@ -375,10 +391,10 @@ def test_cfg_init():
from ultralytics.cfg import check_dict_alignment, copy_default_cfg, smart_value from ultralytics.cfg import check_dict_alignment, copy_default_cfg, smart_value
with contextlib.suppress(SyntaxError): with contextlib.suppress(SyntaxError):
check_dict_alignment({'a': 1}, {'b': 2}) check_dict_alignment({"a": 1}, {"b": 2})
copy_default_cfg() copy_default_cfg()
(Path.cwd() / DEFAULT_CFG_PATH.name.replace('.yaml', '_copy.yaml')).unlink(missing_ok=False) (Path.cwd() / DEFAULT_CFG_PATH.name.replace(".yaml", "_copy.yaml")).unlink(missing_ok=False)
[smart_value(x) for x in ['none', 'true', 'false']] [smart_value(x) for x in ["none", "true", "false"]]
def test_utils_init(): def test_utils_init():
@ -393,12 +409,12 @@ def test_utils_init():
def test_utils_checks(): def test_utils_checks():
"""Test various utility checks.""" """Test various utility checks."""
checks.check_yolov5u_filename('yolov5n.pt') checks.check_yolov5u_filename("yolov5n.pt")
checks.git_describe(ROOT) checks.git_describe(ROOT)
checks.check_requirements() # check requirements.txt checks.check_requirements() # check requirements.txt
checks.check_imgsz([600, 600], max_dim=1) checks.check_imgsz([600, 600], max_dim=1)
checks.check_imshow() checks.check_imshow()
checks.check_version('ultralytics', '8.0.0') checks.check_version("ultralytics", "8.0.0")
checks.print_args() checks.print_args()
# checks.check_imshow(warn=True) # checks.check_imshow(warn=True)
@ -407,7 +423,7 @@ def test_utils_benchmarks():
"""Test model benchmarking.""" """Test model benchmarking."""
from ultralytics.utils.benchmarks import ProfileModels from ultralytics.utils.benchmarks import ProfileModels
ProfileModels(['yolov8n.yaml'], imgsz=32, min_time=1, num_timed_runs=3, num_warmup_runs=1).profile() ProfileModels(["yolov8n.yaml"], imgsz=32, min_time=1, num_timed_runs=3, num_warmup_runs=1).profile()
def test_utils_torchutils(): def test_utils_torchutils():
@ -423,18 +439,29 @@ def test_utils_torchutils():
time_sync() time_sync()
@pytest.mark.skipif(not ONLINE, reason='environment is offline') @pytest.mark.skipif(not ONLINE, reason="environment is offline")
def test_utils_downloads(): def test_utils_downloads():
"""Test file download utilities.""" """Test file download utilities."""
from ultralytics.utils.downloads import get_google_drive_file_info from ultralytics.utils.downloads import get_google_drive_file_info
get_google_drive_file_info('https://drive.google.com/file/d/1cqT-cJgANNrhIHCrEufUYhQ4RqiWG_lJ/view?usp=drive_link') get_google_drive_file_info("https://drive.google.com/file/d/1cqT-cJgANNrhIHCrEufUYhQ4RqiWG_lJ/view?usp=drive_link")
def test_utils_ops(): def test_utils_ops():
"""Test various operations utilities.""" """Test various operations utilities."""
from ultralytics.utils.ops import (ltwh2xywh, ltwh2xyxy, make_divisible, xywh2ltwh, xywh2xyxy, xywhn2xyxy, from ultralytics.utils.ops import (
xywhr2xyxyxyxy, xyxy2ltwh, xyxy2xywh, xyxy2xywhn, xyxyxyxy2xywhr) ltwh2xywh,
ltwh2xyxy,
make_divisible,
xywh2ltwh,
xywh2xyxy,
xywhn2xyxy,
xywhr2xyxyxyxy,
xyxy2ltwh,
xyxy2xywh,
xyxy2xywhn,
xyxyxyxy2xywhr,
)
make_divisible(17, torch.tensor([8])) make_divisible(17, torch.tensor([8]))
@ -455,9 +482,9 @@ def test_utils_files():
file_age(SOURCE) file_age(SOURCE)
file_date(SOURCE) file_date(SOURCE)
get_latest_run(ROOT / 'runs') get_latest_run(ROOT / "runs")
path = TMP / 'path/with spaces' path = TMP / "path/with spaces"
path.mkdir(parents=True, exist_ok=True) path.mkdir(parents=True, exist_ok=True)
with spaces_in_path(path) as new_path: with spaces_in_path(path) as new_path:
print(new_path) print(new_path)
@ -471,9 +498,9 @@ def test_utils_patches_torch_save():
mock = MagicMock(side_effect=RuntimeError) mock = MagicMock(side_effect=RuntimeError)
with patch('ultralytics.utils.patches._torch_save', new=mock): with patch("ultralytics.utils.patches._torch_save", new=mock):
with pytest.raises(RuntimeError): with pytest.raises(RuntimeError):
torch_save(torch.zeros(1), TMP / 'test.pt') torch_save(torch.zeros(1), TMP / "test.pt")
assert mock.call_count == 4, "torch_save was not attempted the expected number of times" assert mock.call_count == 4, "torch_save was not attempted the expected number of times"
@ -512,7 +539,7 @@ def test_nn_modules_block():
BottleneckCSP(c1, c2)(x) BottleneckCSP(c1, c2)(x)
@pytest.mark.skipif(not ONLINE, reason='environment is offline') @pytest.mark.skipif(not ONLINE, reason="environment is offline")
def test_hub(): def test_hub():
"""Test Ultralytics HUB functionalities.""" """Test Ultralytics HUB functionalities."""
from ultralytics.hub import export_fmts_hub, logout from ultralytics.hub import export_fmts_hub, logout
@ -520,7 +547,7 @@ def test_hub():
export_fmts_hub() export_fmts_hub()
logout() logout()
smart_request('GET', 'https://github.com', progress=True) smart_request("GET", "https://github.com", progress=True)
@pytest.fixture @pytest.fixture
@ -529,12 +556,13 @@ def image():
@pytest.mark.parametrize( @pytest.mark.parametrize(
'auto_augment, erasing, force_color_jitter', "auto_augment, erasing, force_color_jitter",
[ [
(None, 0.0, False), (None, 0.0, False),
('randaugment', 0.5, True), ("randaugment", 0.5, True),
('augmix', 0.2, False), ("augmix", 0.2, False),
('autoaugment', 0.0, True), ], ("autoaugment", 0.0, True),
],
) )
def test_classify_transforms_train(image, auto_augment, erasing, force_color_jitter): def test_classify_transforms_train(image, auto_augment, erasing, force_color_jitter):
import torchvision.transforms as T import torchvision.transforms as T
@ -566,17 +594,17 @@ def test_classify_transforms_train(image, auto_augment, erasing, force_color_jit
@pytest.mark.slow @pytest.mark.slow
@pytest.mark.skipif(not ONLINE, reason='environment is offline') @pytest.mark.skipif(not ONLINE, reason="environment is offline")
def test_model_tune(): def test_model_tune():
"""Tune YOLO model for performance.""" """Tune YOLO model for performance."""
YOLO('yolov8n-pose.pt').tune(data='coco8-pose.yaml', plots=False, imgsz=32, epochs=1, iterations=2, device='cpu') YOLO("yolov8n-pose.pt").tune(data="coco8-pose.yaml", plots=False, imgsz=32, epochs=1, iterations=2, device="cpu")
YOLO('yolov8n-cls.pt').tune(data='imagenet10', plots=False, imgsz=32, epochs=1, iterations=2, device='cpu') YOLO("yolov8n-cls.pt").tune(data="imagenet10", plots=False, imgsz=32, epochs=1, iterations=2, device="cpu")
def test_model_embeddings(): def test_model_embeddings():
"""Test YOLO model embeddings.""" """Test YOLO model embeddings."""
model_detect = YOLO(MODEL) model_detect = YOLO(MODEL)
model_segment = YOLO(WEIGHTS_DIR / 'yolov8n-seg.pt') model_segment = YOLO(WEIGHTS_DIR / "yolov8n-seg.pt")
for batch in [SOURCE], [SOURCE, SOURCE]: # test batch size 1 and 2 for batch in [SOURCE], [SOURCE, SOURCE]: # test batch size 1 and 2
assert len(model_detect.embed(source=batch, imgsz=32)) == len(batch) assert len(model_detect.embed(source=batch, imgsz=32)) == len(batch)

View File

@ -9,6 +9,7 @@ import re
import subprocess import subprocess
import sys import sys
import threading import threading
import time
import urllib import urllib
import uuid import uuid
from pathlib import Path from pathlib import Path
@ -721,9 +722,19 @@ def remove_colorstr(input_string):
class TryExcept(contextlib.ContextDecorator): class TryExcept(contextlib.ContextDecorator):
""" """
YOLOv8 TryExcept class. Ultralytics TryExcept class. Use as @TryExcept() decorator or 'with TryExcept():' context manager.
Use as @TryExcept() decorator or 'with TryExcept():' context manager. Examples:
As a decorator:
>>> @TryExcept(msg="Error occurred in func", verbose=True)
>>> def func():
>>> # Function logic here
>>> pass
As a context manager:
>>> with TryExcept(msg="Error occurred in block", verbose=True):
>>> # Code block here
>>> pass
""" """
def __init__(self, msg="", verbose=True): def __init__(self, msg="", verbose=True):
@ -742,6 +753,64 @@ class TryExcept(contextlib.ContextDecorator):
return True return True
class Retry(contextlib.ContextDecorator):
"""
Retry class for function execution with exponential backoff.
Can be used as a decorator or a context manager to retry a function or block of code on exceptions, up to a
specified number of times with an exponentially increasing delay between retries.
Examples:
Example usage as a decorator:
>>> @Retry(times=3, delay=2)
>>> def test_func():
>>> # Replace with function logic that may raise exceptions
>>> return True
Example usage as a context manager:
>>> with Retry(times=3, delay=2):
>>> # Replace with code block that may raise exceptions
>>> pass
"""
def __init__(self, times=3, delay=2):
"""Initialize Retry class with specified number of retries and delay."""
self.times = times
self.delay = delay
self._attempts = 0
def __call__(self, func):
"""Decorator implementation for Retry with exponential backoff."""
def wrapped_func(*args, **kwargs):
self._attempts = 0
while self._attempts < self.times:
try:
return func(*args, **kwargs)
except Exception as e:
self._attempts += 1
print(f"Retry {self._attempts}/{self.times} failed: {e}")
if self._attempts >= self.times:
raise e
time.sleep(self.delay * (2**self._attempts)) # exponential backoff delay
return wrapped_func
def __enter__(self):
"""Enter the runtime context related to this object."""
self._attempts = 0
def __exit__(self, exc_type, exc_value, traceback):
"""Exit the runtime context related to this object with exponential backoff."""
if exc_type is not None:
self._attempts += 1
if self._attempts < self.times:
print(f"Retry {self._attempts}/{self.times} failed: {exc_value}")
time.sleep(self.delay * (2**self._attempts)) # exponential backoff delay
return True # Suppresses the exception and retries
return False # Re-raises the exception if retries are exhausted
def threaded(func): def threaded(func):
""" """
Multi-threads a target function by default and returns the thread or function result. Multi-threads a target function by default and returns the thread or function result.