Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

398

399

400

401

402

403

404

405

406

407

408

409

410

411

412

413

414

415

416

417

418

419

420

421

422

423

424

425

426

427

428

429

430

431

432

433

434

435

436

437

438

439

440

441

442

443

444

445

446

447

448

449

450

451

452

453

# Copyright (C) 2009  Internet Systems Consortium. 

# 

# Permission to use, copy, modify, and distribute this software for any 

# purpose with or without fee is hereby granted, provided that the above 

# copyright notice and this permission notice appear in all copies. 

# 

# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM 

# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL 

# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL 

# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT, 

# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING 

# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, 

# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION 

# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 

 

"""Module Specifications 

 

   A module specification holds the information about what configuration 

   a module can have, and what commands it understands. It provides 

   functions to read it from a .spec file, and to validate a given 

   set of data against the specification 

""" 

 

import json 

import sys 

import time 

 

import isc.cc.data 

 

# File-like arguments are recognised by duck typing (the presence of a
# read() method), so no file type needs to be imported for type checks.

 

class ModuleSpecError(Exception):
    """Raised when a ModuleSpec fails to initialize, or when reading
       or parsing the specification file fails."""

 

def module_spec_from_file(spec_file, check = True):
    """Returns a ModuleSpec object defined by the file at spec_file.

       spec_file is either a file-like object (anything that has a
       read() method) or a str naming the file to open.

       If check is True, the contents are verified. If there is an error
       in those contents, a ModuleSpecError is raised.
       A ModuleSpecError is also raised if spec_file is of neither
       accepted kind, if the file cannot be read, if it is not valid
       JSON, or if the JSON document has no top-level 'module_spec'
       element."""
    try:
        if hasattr(spec_file, 'read'):
            json_str = spec_file.read()
        elif isinstance(spec_file, str):
            # The context manager closes the file even when read() fails.
            with open(spec_file) as opened:
                json_str = opened.read()
        else:
            raise ModuleSpecError("spec_file not a str or file-like object")
        module_spec = json.loads(json_str)
    except ValueError as ve:
        raise ModuleSpecError("JSON parse error: " + str(ve)) from ve
    except IOError as ioe:
        raise ModuleSpecError("JSON read error: " + str(ioe)) from ioe

    # Valid JSON need not be an object (it can be a list, number, ...);
    # report that as a specification error instead of a TypeError.
    if not isinstance(module_spec, dict) or 'module_spec' not in module_spec:
        raise ModuleSpecError("Data definition has no module_spec element")

    return ModuleSpec(module_spec['module_spec'], check)

 

class ModuleSpec:
    """Wrapper around one module specification dict, offering accessors
       for its parts and validation of configuration data, commands and
       statistics against it."""

    def __init__(self, module_spec, check = True):
        """Initializes a ModuleSpec object from the specification in
           the given module_spec (which must be a dict). If check is
           True, the contents are verified. Raises a ModuleSpecError
           if there is something wrong with the contents of the dict"""
        if type(module_spec) is not dict:
            raise ModuleSpecError("module_spec is of type " + str(type(module_spec)) + ", not dict")
        if check:
            _check(module_spec)
        self._module_spec = module_spec

    def validate_config(self, full, data, errors = None):
        """Check whether the given piece of data conforms to the
           'config_data' part of this specification. If so, it returns
           True. If not, it will return False. If errors is given, and
           is a list, a string describing each error found is appended
           to it. If 'full' is true, it also errors on non-optional
           missing values. Set this to False if you want to validate
           only a part of a configuration tree (like a list of
           non-default values). A specification without a
           'config_data' part never validates."""
        data_def = self.get_config_spec()
        if data_def is None:
            # no spec, always bad
            if errors is not None:
                errors.append("No config_data specification")
            return False
        return _validate_spec_list(data_def, full, data, errors)

    def validate_command(self, cmd_name, cmd_params, errors = None):
        '''Check whether the given command conforms to this command
           definition. If so, it returns True. If not, it will return
           False. If errors is given, and is a list, a string describing
           the error will be appended to it; this includes the cases
           where the specification has no commands at all or where
           cmd_name is not among them.

           cmd_name is the name of the command to be validated,
           cmd_params holds the parameters to be validated. cmd_params
           must be a map, with the format like:
           {param1_name: param1_value, param2_name: param2_value}
        '''
        cmd_spec = self.get_commands_spec()
        if not cmd_spec:
            if errors is not None:
                errors.append("No commands specification")
            return False

        for cmd in cmd_spec:
            if cmd['command_name'] == cmd_name:
                # Command arguments are always validated in 'full' mode.
                return _validate_spec_list(cmd['command_args'], True, cmd_params, errors)

        if errors is not None:
            errors.append("unknown command " + str(cmd_name))
        return False

    def validate_statistics(self, full, stat, errors = None):
        """Check whether the given piece of data conforms to the
           'statistics' part of this specification. If so, it returns
           True. If not, it will return False. If errors is given, and
           is a list, a string describing each error found is appended
           to it. If 'full' is true, it also errors on non-optional
           missing values. Set this to False if you want to validate
           only a part of a statistics tree (like a list of non-default
           values). Values of items that declare an 'item_format' are
           checked against that format as well. A specification
           without a 'statistics' part never validates."""
        stat_spec = self.get_statistics_spec()
        if stat_spec is None:
            # no spec, always bad
            if errors is not None:
                errors.append("No statistics specification")
            return False
        return _validate_spec_list(stat_spec, full, stat, errors)

    def get_module_name(self):
        """Returns a string containing the name of the module as
           specified by the specification given at __init__()"""
        return self._module_spec['module_name']

    def get_module_description(self):
        """Returns a string containing the description of the module as
           specified by the specification given at __init__().
           Returns an empty string if there is no description.
        """
        return self._module_spec.get('module_description', "")

    def get_full_spec(self):
        """Returns a dict representation of the full module specification"""
        return self._module_spec

    def get_config_spec(self):
        """Returns the configuration data part of the specification
           (the value stored under 'config_data'), or None if there is
           none."""
        return self._module_spec.get('config_data')

    def get_commands_spec(self):
        """Returns the commands part of the specification (the value
           stored under 'commands'), or None if there is none."""
        return self._module_spec.get('commands')

    def get_statistics_spec(self):
        """Returns the statistics part of the specification (the value
           stored under 'statistics'), or None if there is none."""
        return self._module_spec.get('statistics')

    def __str__(self):
        """Returns a string representation of the full specification"""
        return str(self._module_spec)

 

def _check(module_spec):
    """Checks the full specification. module_spec must be a dict that
       contains at least a "module_name" and optionally a
       "module_description" (string), a "config_data", a "commands"
       and a "statistics" element; the latter three are handed to
       their dedicated check functions. Raises a ModuleSpecError if
       there is a problem."""
    # Exact type comparison on purpose: this matches the other checks
    # in this file, which do not accept subclasses.
    if type(module_spec) is not dict:
        raise ModuleSpecError("data specification not a dict")
    if "module_name" not in module_spec:
        raise ModuleSpecError("no module_name in module_spec")
    if "module_description" in module_spec:
        if type(module_spec["module_description"]) is not str:
            raise ModuleSpecError("module_description is not a string")
    if "config_data" in module_spec:
        _check_config_spec(module_spec["config_data"])
    if "commands" in module_spec:
        _check_command_spec(module_spec["commands"])
    if "statistics" in module_spec:
        _check_statistics_spec(module_spec["statistics"])

 

def _check_config_spec(config_data):
    """Checks a list that contains the configuration part of the
       specification. Raises a ModuleSpecError if there is a
       problem.

       The configuration part is a list of items, each represented by
       a dict holding things like "item_name"; depending on its type
       an item can have specific subitems."""
    if type(config_data) is not list:
        raise ModuleSpecError(
            "config_data is of type {}, not a list of items".format(
                type(config_data)))
    for entry in config_data:
        _check_item_spec(entry)

 

def _check_command_spec(commands):
    """Checks the list that contains a set of commands. Every command
       must be a dict with a string "command_name", a list
       "command_args" whose elements are item specification dicts,
       and optionally a string "command_description". Raises a
       ModuleSpecError if there is an error"""
    if type(commands) is not list:
        raise ModuleSpecError("commands is not a list of commands")

    for entry in commands:
        if type(entry) is not dict:
            raise ModuleSpecError("command in commands list is not a dict")
        if "command_name" not in entry:
            raise ModuleSpecError("no command_name in command item")

        name = entry["command_name"]
        if type(name) is not str:
            raise ModuleSpecError("command_name not a string: " + str(type(name)))

        if "command_description" in entry and \
           type(entry["command_description"]) is not str:
            raise ModuleSpecError("command_description not a string in " + name)

        if "command_args" not in entry:
            raise ModuleSpecError("command_args missing in " + name)

        arguments = entry["command_args"]
        if type(arguments) is not list:
            raise ModuleSpecError("command_args is not a list in " + name)
        for argument in arguments:
            if type(argument) is not dict:
                raise ModuleSpecError("command argument not a dict in " + name)
            _check_item_spec(argument)

 

def _check_item_spec(config_item):
    """Checks the dict that defines one config item
       (i.e. containing "item_name", "item_type", etc.).

       Verified, in this order: "item_name" and "item_type" are
       present strings and the type is a known one; "item_optional"
       is a present boolean, and non-optional items carry an
       "item_default"; the default (if any) has the Python type that
       corresponds to the item type; "list" and "map" items carry a
       valid "list_item_spec" / "map_item_spec" (checked
       recursively); and a default that comes with an "item_format"
       matches that format.
       Raises a ModuleSpecError if there is an error"""
    if type(config_item) is not dict:
        raise ModuleSpecError("item spec not a dict")
    if "item_name" not in config_item:
        raise ModuleSpecError("no item_name in config item")
    item_name = config_item["item_name"]
    if type(item_name) is not str:
        raise ModuleSpecError("item_name is not a string: " + str(item_name))

    if "item_type" not in config_item:
        raise ModuleSpecError("no item_type in config item")
    item_type = config_item["item_type"]
    if type(item_type) is not str:
        raise ModuleSpecError("item_type in " + item_name + " is not a string: " + str(type(item_type)))
    if item_type not in ["integer", "real", "boolean", "string", "list", "map", "named_set", "any"]:
        raise ModuleSpecError("unknown item_type in " + item_name + ": " + item_type)

    if "item_optional" not in config_item:
        raise ModuleSpecError("item_optional not in item " + item_name)
    if type(config_item["item_optional"]) is not bool:
        raise ModuleSpecError("item_optional in " + item_name + " is not a boolean")
    if not config_item["item_optional"] and "item_default" not in config_item:
        raise ModuleSpecError("no default value for non-optional item " + item_name)

    if "item_default" in config_item:
        # Exact type comparison is intentional: with isinstance() a bool
        # default would be accepted for an "integer" item.
        # "named_set" and "any" defaults are not type-checked.
        default_types = {"integer": int, "real": float, "boolean": bool,
                         "string": str, "list": list, "map": dict}
        expected = default_types.get(item_type)
        if expected is not None and \
           type(config_item["item_default"]) is not expected:
            raise ModuleSpecError("Wrong type for item_default in " + item_name)

    # TODO: once we have check_type, run the item default through that with the list|map_item_spec
    if item_type == "list":
        if "list_item_spec" not in config_item:
            raise ModuleSpecError("no list_item_spec in list item " + item_name)
        if type(config_item["list_item_spec"]) is not dict:
            raise ModuleSpecError("list_item_spec in " + item_name + " is not a dict")
        _check_item_spec(config_item["list_item_spec"])

    if item_type == "map":
        if "map_item_spec" not in config_item:
            raise ModuleSpecError("no map_item_spec in map item " + item_name)
        if type(config_item["map_item_spec"]) is not list:
            raise ModuleSpecError("map_item_spec in " + item_name + " is not a list")
        for map_item in config_item["map_item_spec"]:
            if type(map_item) is not dict:
                raise ModuleSpecError("map_item_spec element is not a dict")
            _check_item_spec(map_item)

    if 'item_format' in config_item and 'item_default' in config_item:
        item_format = config_item["item_format"]
        item_default = config_item["item_default"]
        if not _check_format(item_default, item_format):
            raise ModuleSpecError(
                "Wrong format for " + str(item_default) + " in " + str(item_name))

 

def _check_statistics_spec(statistics):
    """Checks a list that contains the statistics part of the
       specification. Raises a ModuleSpecError if there is a
       problem.

       The statistics part is a list of items represented by dicts,
       checked like any other item specification; in addition each
       of them must have an 'item_title' and an 'item_description'."""
    if type(statistics) is not list:
        raise ModuleSpecError("statistics is of type " + str(type(statistics))
                              + ", not a list of items")
    required_keys = ('item_title', 'item_description')
    for entry in statistics:
        _check_item_spec(entry)
        # Report the first required key that is absent, if any.
        absent = next((key for key in required_keys if key not in entry), None)
        if absent is not None:
            raise ModuleSpecError("no " + absent + " in statistics item")

 

def _check_format(value, format_name):
    """Tell whether value is written in the format called format_name.

       Known format names are 'date-time', 'date' and 'time'. The
       value is parsed with the matching time pattern and rendered
       again; only a value that survives this round trip unchanged
       is accepted. Returns False for unknown format names and for
       values that cannot be parsed."""
    # TODO: should be added other format types if necessary
    known_patterns = (('date-time', "%Y-%m-%dT%H:%M:%SZ"),
                      ('date', "%Y-%m-%d"),
                      ('time', "%H:%M:%S"))
    pattern = next((candidate for name, candidate in known_patterns
                    if format_name == name), None)
    if pattern is None:
        return False
    try:
        # reverse check
        parsed = time.strptime(value, pattern)
        return value == time.strftime(pattern, parsed)
    except (ValueError, TypeError):
        return False

 

def _validate_type(spec, value, errors):
    """Returns True if the value is of the correct type given the
       specification.

       spec is an item specification dict; only its 'item_type' is
       looked at. When the type does not match, False is returned
       and, if errors is not None, a message is appended to it.
       Item types without an entry in the table below (such as
       "any") accept every value."""
    # item_type -> (required Python type, wording for the error message).
    # Types are compared exactly rather than with isinstance() so that,
    # for instance, a bool is not accepted where an integer is required.
    expected_types = {
        "integer": (int, "an integer"),
        "real": (float, "a real"),
        "boolean": (bool, "a boolean"),
        "string": (str, "a string"),
        "list": (list, "a list"),
        "map": (dict, "a map"),
        "named_set": (dict, "a map"),
    }
    expectation = expected_types.get(spec['item_type'])
    if expectation is None:
        return True
    required_type, wording = expectation
    if type(value) is required_type:
        return True
    if errors is not None:
        errors.append(str(value) + " should be " + wording)
    return False

 

def _validate_format(spec, value, errors):
    """Returns True if the value has the format named by the
       'item_format' of the specification, and also when the
       specification has no 'item_format' at all. On a mismatch
       False is returned and, if errors is not None, a message is
       appended to it."""
    if "item_format" not in spec:
        return True
    wanted_format = spec['item_format']
    if _check_format(value, wanted_format):
        return True
    if errors is not None:
        errors.append("format type of " + str(value)
                      + " should be " + wanted_format)
    return False

 

def _validate_item(spec, full, data, errors):
    """Validates one value against its item specification.

       The type of data is checked first. List values then have
       every element checked for type and format against the
       'list_item_spec', and map elements are validated recursively.
       Dict values are validated against the 'map_item_spec' when
       the specification has one, and otherwise each of their values
       against the 'named_set_item_spec'. Any other value only gets
       its format checked. Items of type "any" have no
       sub-specification, so they too only get the format check,
       whatever kind of value they hold.
       'full' and 'errors' are passed on to the nested validations.
       Returns True if the value is valid, False at the first
       problem found."""
    if not _validate_type(spec, data, errors):
        return False
    if spec['item_type'] == "any":
        # Without this guard a list or dict value would be looked up in
        # a list_item_spec / named_set_item_spec that "any" items lack.
        return _validate_format(spec, data, errors)
    if type(data) == list:
        list_spec = spec['list_item_spec']
        for data_el in data:
            if not _validate_type(list_spec, data_el, errors):
                return False
            if not _validate_format(list_spec, data_el, errors):
                return False
            if list_spec['item_type'] == "map":
                if not _validate_item(list_spec, full, data_el, errors):
                    return False
    elif type(data) == dict:
        if 'map_item_spec' in spec:
            if not _validate_spec_list(spec['map_item_spec'], full, data, errors):
                return False
        else:
            named_set_spec = spec['named_set_item_spec']
            for data_el in data.values():
                if not _validate_type(named_set_spec, data_el, errors):
                    return False
                if not _validate_item(named_set_spec, full, data_el, errors):
                    return False
    elif not _validate_format(spec, data, errors):
        return False
    return True

 

def _validate_spec(spec, full, data, errors):
    """Validates the entry for one item specification within data.

       spec is the item specification dict ('item_name' and
       'item_optional' are read here), data is the collection of
       values being validated, looked up by item name; it may be
       None or empty. If the item is present its value is validated.
       If it is absent, that is an error only when 'full' is true
       and the item is not optional; in that case False is returned
       and, if errors is not None, a message is appended to it.
       Returns True otherwise."""
    item_name = spec['item_name']
    item_optional = spec['item_optional']

    if not data and item_optional:
        return True
    # data may be None (there is simply nothing to look the item up in);
    # treat that like an empty collection instead of failing on 'in'.
    if data is not None and item_name in data:
        return _validate_item(spec, full, data[item_name], errors)
    if full and not item_optional:
        if errors is not None:
            errors.append("non-optional item " + item_name + " missing")
        return False
    return True

 

def _validate_spec_list(module_spec, full, data, errors):
    """Validates data against a list of item specifications.

       Every specification in module_spec is checked against data,
       and every name in data (if data is not None) must belong to
       one of the specifications; the name "version" is always
       allowed. Validation does not stop at the first problem, so
       errors (if not None) can collect several messages. Returns
       True only if no problem at all was found."""
    all_valid = True

    # First pass: the items the specification knows about.
    for item_spec in module_spec:
        if not _validate_spec(item_spec, full, data, errors):
            all_valid = False

    if data is None:
        return all_valid

    # Second pass: names in the data the specification does not know.
    for name in data:
        is_known = any(item_spec["item_name"] == name
                       for item_spec in module_spec)
        if is_known or name == "version":
            continue
        if errors is not None:
            errors.append("unknown item " + name)
        all_valid = False
    return all_valid