Coverage for src/flag_gems/utils/code_utils.py: 60%

96 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2026-03-18 02:36 +0800

1# The code for IndentedBuffer is adapted from 

2# https://github.com/pytorch/pytorch/blob/ed48ea9997c2b04736096e4b6669543ab2e627d5/torch/_inductor/utils.py#L742 

3# The code for Namespace is adapted from 

4# https://github.com/pytorch/pytorch/blob/ed48ea9997c2b04736096e4b6669543ab2e627d5/torch/fx/graph.py#L115 

5 

6# License from pytorch(https://github.com/pytorch/pytorch) 

7 

8# From PyTorch: 

9 

10# Copyright (c) 2016- Facebook, Inc (Adam Paszke) 

11# Copyright (c) 2014- Facebook, Inc (Soumith Chintala) 

12# Copyright (c) 2011-2014 Idiap Research Institute (Ronan Collobert) 

13# Copyright (c) 2012-2014 Deepmind Technologies (Koray Kavukcuoglu) 

14# Copyright (c) 2011-2012 NEC Laboratories America (Koray Kavukcuoglu) 

15# Copyright (c) 2011-2013 NYU (Clement Farabet) 

16# Copyright (c) 2006-2010 NEC Laboratories America (Ronan Collobert, Leon Bottou, Iain Melvin, Jason Weston) 

17# Copyright (c) 2006 Idiap Research Institute (Samy Bengio) 

18# Copyright (c) 2001-2004 Idiap Research Institute (Ronan Collobert, Samy Bengio, Johnny Mariethoz) 

19 

20# From Caffe2: 

21 

22# Copyright (c) 2016-present, Facebook Inc. All rights reserved. 

23 

24# All contributions by Facebook: 

25# Copyright (c) 2016 Facebook Inc. 

26 

27# All contributions by Google: 

28# Copyright (c) 2015 Google Inc. 

29# All rights reserved. 

30 

31# All contributions by Yangqing Jia: 

32# Copyright (c) 2015 Yangqing Jia 

33# All rights reserved. 

34 

35# All contributions by Kakao Brain: 

36# Copyright 2019-2020 Kakao Brain 

37 

38# All contributions by Cruise LLC: 

39# Copyright (c) 2022 Cruise LLC. 

40# All rights reserved. 

41 

42# All contributions from Caffe: 

43# Copyright(c) 2013, 2014, 2015, the respective contributors 

44# All rights reserved. 

45 

46# All other contributions: 

47# Copyright(c) 2015, 2016 the respective contributors 

48# All rights reserved. 

49 

50# Caffe2 uses a copyright model similar to Caffe: each contributor holds 

51# copyright over their contributions to Caffe2. The project versioning records 

52# all such contribution and copyright details. If a contributor wants to further 

53# mark their specific copyright on a particular contribution, they should 

54# indicate their copyright solely in the commit message of the change when it is 

55# committed. 

56 

57# All rights reserved. 

58 

59import builtins 

60import contextlib 

61import keyword 

62import os 

63import re 

64import threading 

65import uuid 

66from collections import defaultdict 

67from io import StringIO 

68from pathlib import Path 

69from typing import Dict, Set 

70 

71 

class IndentedBuffer:
    """Collect lines of text, prefixing each with the current indentation.

    Indentation is tracked as a level count; each level contributes
    ``tabwidth`` spaces to the prefix of every non-blank line written.
    """

    tabwidth = 4

    def __init__(self, initial_indent=0):
        # Current indentation level (not a column count).
        self._indent = initial_indent
        # Already-prefixed lines, stored without trailing newlines.
        self._lines = []

    def getvalue(self) -> str:
        """Return every stored line, each terminated by a newline."""
        out = StringIO()
        for entry in self._lines:
            assert isinstance(entry, str)
            out.write(entry + "\n")
        return out.getvalue()

    def clear(self):
        """Drop all stored lines; the indentation level is left as is."""
        self._lines.clear()

    def __bool__(self):
        """Report whether at least one line has been written."""
        return len(self._lines) > 0

    def prefix(self):
        """Return the run of spaces matching the current indentation level."""
        width = self.tabwidth * self._indent
        return " " * width

    def newline(self):
        """Append an empty line."""
        # A whitespace-only argument is stored by writeline() as "".
        self.writeline("\n")

    def writeline(self, line):
        """Append one line; whitespace-only input is stored as an empty line."""
        entry = f"{self.prefix()}{line}" if line.strip() else ""
        self._lines.append(entry)

    def tpl(self, format_str, **kwargs):
        """Render ``format_str`` with ``kwargs`` and write the result line by line.

        The rendered text is stripped of surrounding whitespace first, and
        every tab inside a line is expanded to ``tabwidth`` spaces.
        """
        assert isinstance(format_str, str), "format_str must be string of type."
        rendered = format_str.format(**kwargs).strip()
        for raw in rendered.splitlines():
            self.writeline(raw.replace("\t", " " * self.tabwidth))

    def writelines(self, lines):
        """Write each item of ``lines`` through :meth:`writeline`."""
        for item in lines:
            self.writeline(item)

    def writemultiline(self, s):
        """Split ``s`` on line boundaries and write the pieces one by one."""
        pieces = s.splitlines()
        self.writelines(pieces)

    def indent(self, offset=1):
        """Return a context manager that shifts the indentation by ``offset``.

        The level changes when the ``with`` block is entered and is restored
        when it exits, including when the block raises.
        """

        @contextlib.contextmanager
        def _shifted():
            self._indent += offset
            try:
                yield
            finally:
                self._indent -= offset

        return _shifted()

130 

131 

class NameSpace:
    """Hand out unique identifier names that avoid keywords and builtins."""

    def __init__(self):
        # Every name returned by create_name() so far.
        self._used_names: Set[str] = set()
        # Last numeric suffix recorded for each base name.
        self._base_count: Dict[str, int] = defaultdict(int)

        self._illegal_char_regex = re.compile("[^0-9a-zA-Z_]+")
        self._name_suffix_regex = re.compile(r"(.*)_(\d+)$")

    def create_name(self, candidate: str) -> str:
        """Create a unique name.

        Arguments:
            candidate: used as the basis for the unique name, relevant to the user.
        """
        # Each run of characters outside [0-9a-zA-Z_] is replaced by one "_".
        cleaned = self._illegal_char_regex.sub("_", candidate) or "_unnamed"

        # Prefix an underscore so the name does not start with a digit.
        if cleaned[0].isdigit():
            cleaned = "_" + cleaned

        # Split a trailing "_<digits>" suffix off the base name, if present.
        suffix = self._name_suffix_regex.match(cleaned)
        if suffix is not None:
            base = suffix.group(1)
            num = int(suffix.group(2))
            name = f"{base}_{num}"
        else:
            base = name = cleaned
            num = None

        # With no suffix (or a zero one), continue from the recorded counter.
        if not num:
            num = self._base_count[base]

        # Bump the suffix until the name is neither taken nor reserved.
        while name in self._used_names or self._is_illegal_name(name):
            num += 1
            name = f"{base}_{num}"

        self._base_count[base] = num
        self._used_names.add(name)
        return name

    def _is_illegal_name(self, name: str) -> bool:
        """Return True when ``name`` is a Python keyword or a builtin's name."""
        # Keywords can never be identifiers; builtin names are rejected so
        # that generated names never shadow a builtin.
        return keyword.iskeyword(name) or name in builtins.__dict__

185 

186 

def write_atomic(
    path_: str,
    content: str,
    make_dirs: bool = False,
    encoding: str = "utf-8",
) -> None:
    """Write ``content`` to ``path_`` via a temporary file in the same directory.

    The text is first written to a uniquely named hidden file next to the
    destination, which is then renamed over the destination with
    ``Path.replace``, so the destination is only touched once the full
    content has been written.

    Arguments:
        path_: destination file path.
        content: text to write.
        make_dirs: when true, create missing parent directories first.
        encoding: text encoding used for the file.

    Raises:
        Whatever opening, writing or renaming raises (e.g. ``OSError``,
        ``UnicodeEncodeError``). The temporary file is removed before the
        exception propagates.
    """
    path = Path(path_)
    if make_dirs:
        path.parent.mkdir(parents=True, exist_ok=True)
    # pid + thread id + random uuid keep concurrent writers from colliding.
    tmp_path = (
        path.parent / f".{os.getpid()}.{threading.get_ident()}.{uuid.uuid4().hex}.tmp"
    )
    try:
        with tmp_path.open("wt", encoding=encoding) as f:
            f.write(content)
        tmp_path.replace(path)
    except BaseException:
        # Do not leave a stray temp file behind on failure; cleanup is
        # best effort so the original exception is the one that surfaces.
        with contextlib.suppress(OSError):
            tmp_path.unlink()
        raise

200 f.write(content) 

201 tmp_path.replace(path)