Coverage for src/flag_gems/ops/min.py: 58%

96 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2026-03-27 02:51 +0800

1import logging 

2import math 

3from collections import namedtuple 

4 

5import torch 

6import triton 

7import triton.language as tl 

8 

9from flag_gems import runtime 

10from flag_gems.runtime import torch_device_fn 

11from flag_gems.utils import dim_compress, libentry, libtuner 

12from flag_gems.utils import triton_lang_extension as tle 

13from flag_gems.utils.limits import get_dtype_max 

14 

15logger = logging.getLogger(__name__) 

16 

17 

@libentry()
@triton.jit
def min_kernel_1(
    inp,
    mid,
    M,
    BLOCK_SIZE: tl.constexpr,
):
    """First reduction stage: write one partial minimum per program into ``mid``.

    Program ``p`` (axis 0) covers the linear element range
    ``[p * BLOCK_SIZE, (p + 1) * BLOCK_SIZE)`` of ``inp`` and stores the
    minimum of that range at ``mid + p``.

    Args:
        inp: pointer to the input elements, addressed linearly.
        mid: pointer to the buffer receiving one partial result per program.
        M: number of valid input elements; lanes at or beyond ``M`` are masked.
        BLOCK_SIZE: compile-time number of elements handled by each program.
    """
    block_id = tle.program_id(0)
    lanes = block_id * BLOCK_SIZE + tl.arange(0, BLOCK_SIZE)
    in_bounds = lanes < M
    # Masked-off lanes take the value from get_dtype_max instead of reading
    # memory.
    fill = get_dtype_max(inp.type.element_ty)
    block_vals = tl.load(inp + lanes, mask=in_bounds, other=fill)
    block_min = tl.min(block_vals)
    tl.store(mid + block_id, block_min)

35 

36 

@libentry()
@triton.jit
def min_kernel_2(mid, out, mid_size, BLOCK_MID: tl.constexpr):
    """Second reduction stage: collapse the partial minima in ``mid`` into ``out``.

    The kernel does not read a program id; it loads up to ``BLOCK_MID``
    values starting at ``mid`` and stores their minimum at ``out``.

    Args:
        mid: pointer to the partial results.
        out: pointer to the single output element.
        mid_size: number of valid entries in ``mid``; later lanes are masked.
        BLOCK_MID: compile-time number of lanes loaded.
    """
    lanes = tl.arange(0, BLOCK_MID)
    valid = lanes < mid_size
    # Lanes past mid_size take the value from get_dtype_max instead of
    # reading memory.
    fill = get_dtype_max(mid.type.element_ty)
    partials = tl.load(mid + lanes, mask=valid, other=fill)
    result = tl.min(partials)
    tl.store(out, result)

47 

48 

def heur_block_n(args):
    """Return ``args["N"]`` rounded up via ``triton.next_power_of_2``."""
    n = args["N"]
    return triton.next_power_of_2(n)

51 

52 

@libentry()
@libtuner(
    configs=runtime.get_tuned_config("naive_reduction"),
    key=["M", "N"],
)
@triton.jit
def min_kernel(
    inp,
    out_value,
    out_index,
    M,
    N,
    BLOCK_M: tl.constexpr,
    BLOCK_N: tl.constexpr,
):
    """Compute per-row minimum and its column index over an ``M`` x ``N`` layout.

    ``inp`` is addressed as ``row * N + col``. Each program (axis 0) owns
    ``BLOCK_M`` rows and scans the columns in tiles of ``BLOCK_N``, keeping a
    running minimum and the column index where it was found. A tile only
    replaces the running result when its minimum is strictly smaller.

    Args:
        inp: pointer to the input elements.
        out_value: pointer receiving one minimum per row.
        out_index: pointer receiving one column index per row.
        M: number of rows.
        N: number of columns (length of the reduced axis).
        BLOCK_M: compile-time number of rows per program.
        BLOCK_N: compile-time number of columns per tile.
    """
    # Rows owned by this program.
    row_block = tle.program_id(0)
    rows = row_block * BLOCK_M + tl.arange(0, BLOCK_M)

    elem_ty = inp.type.element_ty
    # Triton-lang cannot express a function returning a tl.dtype, so the
    # accumulator type is chosen inline: bfloat16 accumulates in float32.
    acc_ty = tl.float32 if elem_ty is tl.bfloat16 else elem_ty
    fill = get_dtype_max(elem_ty)
    best_val = tl.full([BLOCK_M], dtype=acc_ty, value=fill)
    best_idx = tl.full([BLOCK_M], dtype=tl.int64, value=0)

    for col_start in range(0, N, BLOCK_N):
        cols = col_start + tl.arange(0, BLOCK_N)
        in_bounds = rows[:, None] < M and cols[None, :] < N
        tile_ptrs = inp + (rows[:, None] * N + cols[None, :])
        tile = tl.load(tile_ptrs, mask=in_bounds, other=fill)
        # If return_indices is not supported, a separate tl.argmin(tile, 1)
        # call would be needed instead.
        tile_min, tile_argmin = tl.min(tile, 1, return_indices=True)
        improved = tile_min < best_val
        best_val = tl.where(improved, tile_min, best_val)
        best_idx = tl.where(improved, col_start + tile_argmin, best_idx)

    # Only rows below M are written back.
    row_valid = rows < M
    tl.store(out_value + rows, best_val, mask=row_valid)
    tl.store(out_index + rows, best_idx, mask=row_valid)

97 

98 

def min(inp):
    """Return the minimum over all elements of ``inp`` as a 0-dim tensor.

    The reduction runs in two kernel launches: ``min_kernel_1`` writes one
    partial minimum per block into a temporary buffer, and ``min_kernel_2``
    reduces that buffer to a single value. The block size is the next power
    of two at or above ``sqrt(numel)``.

    Args:
        inp: input tensor.

    Returns:
        A 0-dim tensor with the same dtype and device as ``inp``.

    Raises:
        RuntimeError: if ``inp`` has no elements (previously this surfaced as
            a ``ZeroDivisionError`` from the block-count computation).
    """
    logger.debug("GEMS MIN")
    M = inp.numel()
    if M == 0:
        raise RuntimeError(
            "min(): Expected reduction dim to be specified for "
            "input.numel() == 0. Specify the reduction dim with the 'dim' argument."
        )

    # min_kernel_1 addresses the input as ``inp + linear_offset``; that is only
    # correct for densely packed memory, so strided views are materialized.
    inp = inp.contiguous()

    block_size = triton.next_power_of_2(math.ceil(math.sqrt(M)))
    mid_size = triton.cdiv(M, block_size)
    block_mid = triton.next_power_of_2(mid_size)

    dtype = inp.dtype
    mid = torch.empty((mid_size,), dtype=dtype, device=inp.device)
    out = torch.empty([], dtype=dtype, device=inp.device)

    with torch_device_fn.device(inp.device):
        min_kernel_1[(mid_size, 1, 1)](inp, mid, M, block_size)
        min_kernel_2[(1, 1, 1)](mid, out, mid_size, block_mid)
    return out

114 

115 

def min_dim(inp, dim=None, keepdim=False):
    """Reduce ``inp`` along ``dim``, returning minimum values and their indices.

    The input is passed through ``dim_compress(inp, dim)`` and then handed to
    ``min_kernel`` as ``M`` rows of ``N`` elements, where ``N`` is the size of
    the reduced dimension and ``M = numel // N``.

    Args:
        inp: input tensor.
        dim: dimension to reduce; negative values count from the end. Must
            satisfy ``-inp.ndim <= dim < inp.ndim``.
        keepdim: when false, the reduced dimension is squeezed out of both
            outputs; when true it is kept with size 1.

    Returns:
        A namedtuple ``(values, indices)``; ``values`` has the input dtype and
        ``indices`` is ``torch.int64``.
    """
    logger.debug("GEMS MIN DIM")
    assert -inp.ndim <= dim < inp.ndim, "Invalid dim"
    dim = dim % inp.ndim

    # Output shape: the input shape with the reduced dimension set to 1.
    out_shape = list(inp.shape)
    N = out_shape[dim]
    out_shape[dim] = 1

    inp = dim_compress(inp, dim)
    M = inp.numel() // N

    out_value = torch.empty(out_shape, dtype=inp.dtype, device=inp.device)
    out_index = torch.empty(out_shape, dtype=torch.int64, device=inp.device)
    if not keepdim:
        out_value = out_value.squeeze(dim)
        out_index = out_index.squeeze(dim)

    def grid(meta):
        # One program per BLOCK_M rows.
        return (triton.cdiv(M, meta["BLOCK_M"]),)

    with torch_device_fn.device(inp.device):
        min_kernel[grid](inp, out_value, out_index, M, N)

    result_type = namedtuple("min", ["values", "indices"])
    return result_type(values=out_value, indices=out_index)

138 return out