Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

# Copyright (C) 2010  Internet Systems Consortium. 

# 

# Permission to use, copy, modify, and distribute this software for any 

# purpose with or without fee is hereby granted, provided that the above 

# copyright notice and this permission notice appear in all copies. 

# 

# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM 

# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL 

# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL 

# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT, 

# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING 

# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, 

# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION 

# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 

 

# 

# Helper functions for data elements as used in cc-channel and 

# configuration. There is no python equivalent for the cpp Element 

# class, since data elements are represented by native python types 

# (int, real, bool, string, list and dict respectively) 

# 

 

import json 

import re 

 

class DataNotFoundError(Exception): 

    """Raised if an identifier does not exist according to a spec file, 

       or if an item is addressed that is not in the current (or default) 

       config (such as a nonexistent list or map element)""" 

    pass 

 

class DataAlreadyPresentError(Exception): 

    """Raised if there is an attemt to add an element to a list or a 

       map that is already present in that list or map (i.e. if 'add' 

       is used when it should be 'set')""" 

    pass 

 

class DataTypeError(Exception): 

    """Raised if there is an attempt to set an element that is of a 

       different type than the type specified in the specification.""" 

    pass 

 

def remove_identical(a, b): 

    """Removes the values from dict a that are the same as in dict b. 

       Raises a DataTypeError is a or b is not a dict""" 

    to_remove = [] 

    if type(a) != dict or type(b) != dict: 

        raise DataTypeError("Not a dict in remove_identical()") 

    duplicate_keys = [key for key in a.keys() if key in b and a[key] == b[key]] 

    for id in duplicate_keys: 

        del(a[id]) 

 

def merge(orig, new): 

    """Merges the contents of new into orig, think recursive update() 

       orig and new must both be dicts. If an element value is None in 

       new it will be removed in orig.""" 

    if type(orig) != dict or type(new) != dict: 

        raise DataTypeError("Not a dict in merge()") 

    orig.update(new) 

    remove_null_items(orig) 

 

def remove_null_items(d): 

    """Recursively removes all (key,value) pairs from d where the 

       value is None""" 

    null_keys = [] 

    for key in d.keys(): 

        if type(d[key]) == dict: 

            remove_null_items(d[key]) 

        elif d[key] is None: 

            null_keys.append(key) 

    for k in null_keys: 

        del d[k] 

 

def _concat_identifier(id_parts): 

    """Concatenates the given identifier parts into a string, 

       delimited with the '/' character. 

    """ 

    return '/'.join(id_parts) 

 

def split_identifier(identifier): 

    """Splits the given identifier into a list of identifier parts, 

       as delimited by the '/' character. 

       Raises a DataTypeError if identifier is not a string.""" 

85    if type(identifier) != str: 

        raise DataTypeError("identifier is not a string") 

    id_parts = identifier.split('/') 

    id_parts[:] = (value for value in id_parts if value != "") 

    return id_parts 

 

def identifier_has_list_index(identifier): 

    """Returns True if the given identifier string has at least one 

       list index (with [I], where I is a number""" 

    return (type(identifier) == str and 

            re.search("\[\d+\]", identifier) is not None) 

 

 

def split_identifier_list_indices(identifier): 

    """Finds list indexes in the given identifier, which are of the 

       format [integer]. 

       Identifier must be a string. 

       This will only give the list index for the last 'part' of the 

       given identifier (as delimited by the '/' sign). 

       Raises a DataTypeError if the identifier is not a string, 

       or if the format is bad. 

       Returns a tuple, where the first element is the string part of 

       the identifier, and the second element is a list of (nested) list 

       indices. 

       Examples: 

       'a/b/c' will return ('a/b/c', None) 

       'a/b/c[1]' will return ('a/b/c', [1]) 

       'a/b/c[1][2][3]' will return ('a/b/c', [1, 2, 3]) 

       'a[0]/b[1]/c[2]' will return ('a[0]/b[1]/c', [2]) 

    """ 

    if type(identifier) != str: 

        raise DataTypeError("identifier in " 

                            "split_identifier_list_indices() " 

                            "not a string: " + str(identifier)) 

 

    # We only work on the final 'part' of the identifier 

    id_parts = split_identifier(identifier) 

    id_str = id_parts[-1] 

 

    i = id_str.find('[') 

    if i < 0: 

        if id_str.find(']') >= 0: 

            raise DataTypeError("Bad format in identifier (] but no [): " + str(identifier)) 

        return identifier, None 

 

    # keep the non-index part of that to replace later 

    id = id_str[:i] 

    indices = [] 

    while i >= 0: 

        e = id_str.find(']') 

        if e < i + 1: 

            raise DataTypeError("Bad format in identifier (] before [): " + str(identifier)) 

        try: 

            indices.append(int(id_str[i+1:e])) 

        except ValueError: 

            raise DataTypeError("List index in " + identifier + " not an integer") 

        id_str = id_str[e + 1:] 

        i = id_str.find('[') 

        if i > 0: 

            raise DataTypeError("Bad format in identifier ([ within []): " + str(identifier)) 

    if id.find(']') >= 0 or len(id_str) > 0: 

        raise DataTypeError("Bad format in identifier (extra ]): " + str(identifier)) 

 

    # we replace the final part of the original identifier with 

    # the stripped string 

    id_parts[-1] = id 

    id = _concat_identifier(id_parts) 

    return id, indices 

 

def _find_child_el(element, id): 

    """Finds the child of element with the given id. If the id contains 

       [i], where i is a number, and the child element is a list, the 

       i-th element of that list is returned instead of the list itself. 

       Raises a DataTypeError if the element is of wrong type, if id 

       is not a string, or if the id string contains a bad value. 

       Raises a DataNotFoundError if the element at id could not be 

       found. 

    """ 

    id, list_indices = split_identifier_list_indices(id) 

    if type(element) == dict and id in element.keys(): 

        result = element[id] 

    else: 

        raise DataNotFoundError(id + " in " + str(element)) 

    if type(result) == list and list_indices is not None: 

        for list_index in list_indices: 

            if list_index >= len(result): 

                raise DataNotFoundError("Element " + str(list_index) + " in " + str(result)) 

            result = result[list_index] 

    return result 

 

def find(element, identifier): 

    """Returns the subelement in the given data element, raises 

       DataNotFoundError if not found. 

       Returns the given element if the identifier is an empty string. 

       Raises a DataTypeError if identifier is not a string, or if 

       identifier is not empty, and element is not a dict. 

    """ 

    if type(identifier) != str: 

        raise DataTypeError("identifier in find() is not a str") 

    if identifier == "": 

        return element 

    if type(element) != dict: 

        raise DataTypeError("element in find() is not a dict") 

    id_parts = split_identifier(identifier) 

    cur_el = element 

    for id in id_parts: 

        cur_el = _find_child_el(cur_el, id) 

    return cur_el 

 

def set(element, identifier, value): 

    """Sets the value at the element specified by identifier to value. 

       If the value is None, it is removed from the dict. If element 

       is not a dict, or if the identifier points to something that is 

       not, a DataTypeError is raised. The element is updated inline, 

       so if the original needs to be kept, you must make a copy before 

       calling set(). The updated base element is returned (so that 

       el.set().set().set() is possible)""" 

    if type(element) != dict: 

        raise DataTypeError("element in set() is not a dict") 

    if type(identifier) != str: 

        raise DataTypeError("identifier in set() is not a str") 

    id_parts = split_identifier(identifier) 

    cur_el = element 

    for id in id_parts[:-1]: 

        try: 

            cur_el = _find_child_el(cur_el, id) 

        except DataNotFoundError: 

            if value is None: 

                # ok we are unsetting a value that wasn't set in 

                # the first place. Simply stop. 

                return 

            cur_el[id] = {} 

            cur_el = cur_el[id] 

 

    id, list_indices = split_identifier_list_indices(id_parts[-1]) 

    if list_indices is None: 

        # value can be an empty list or dict, so check for None eplicitely 

        if value is not None: 

            cur_el[id] = value 

        else: 

            del cur_el[id] 

    else: 

        cur_el = cur_el[id] 

        # in case of nested lists, we need to get to the next to last 

        for list_index in list_indices[:-1]: 

            if type(cur_el) != list: 

                raise DataTypeError("Element at " + identifier + " is not a list") 

            if len(cur_el) <= list_index: 

                raise DataNotFoundError("List index at " + identifier + " out of range") 

            cur_el = cur_el[list_index] 

        # value can be an empty list or dict, so check for None eplicitely 

        list_index = list_indices[-1] 

        if type(cur_el) != list: 

            raise DataTypeError("Element at " + identifier + " is not a list") 

        if len(cur_el) <= list_index: 

            raise DataNotFoundError("List index at " + identifier + " out of range") 

        if value is not None: 

            cur_el[list_index] = value 

        else: 

            del cur_el[list_index] 

    return element 

 

def unset(element, identifier): 

    """Removes the element at the given identifier if it exists. Raises 

       a DataTypeError if element is not a dict or if identifier is not 

       a string. Returns the base element.""" 

    # perhaps we can simply do with set none, and remove this whole 

    # function 

    return set(element, identifier, None) 

 

def find_no_exc(element, identifier): 

    """Returns the subelement in the given data element, returns None 

       if not found, or if an error occurred (i.e. this function should 

       never raise an exception)""" 

    try: 

        return find(element, identifier) 

    except DataNotFoundError: 

        return None 

    except DataTypeError: 

        return None 

 

def parse_value_str(value_str): 

    """Parses the given string to a native python object. If the 

       string cannot be parsed, it is returned. If it is not a string, 

       None is returned""" 

    if type(value_str) != str: 

        return None 

    try: 

        return json.loads(value_str) 

    except ValueError as ve: 

        # simply return the string itself 

        return value_str