Skip to content
Snippets Groups Projects
Commit 602ee724 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

Union descriptor added

parent 61faf12d
No related branches found
Tags v1.0.3
No related merge requests found
# v2.1.3
* bugfix in MergingSource
* argument rename in MemoryErrorExceptionHandler `killpg` -> `kill_pg`
* added the Union descriptor
# v2.1.2
......
......@@ -9,7 +9,8 @@ __all__ = [
'Descriptor',
'Integer', 'Float', 'String', 'Boolean',
'IPv4',
'List', 'Dict', 'create_key',
'List', 'Dict', 'Union',
'create_key',
'must_be_type',
'must_be_one_of',
'CheckerCondition',
......@@ -64,12 +65,15 @@ class Descriptor(object):
for checker in self.__class__.CHECKERS:
self.add_checker(checker)
self.MY_EXCEPTIONS = tuple(self.MY_EXCEPTIONS)
def __call__(self, value: ConfigDictValue) -> tp.Any:
"""
raises ConfigurationSchemaError: on invalid schema
"""
self.pre_checkers(value)
print(repr(self.MY_EXCEPTIONS))
try:
value = self.BASIC_MAKER(value)
except self.MY_EXCEPTIONS:
......@@ -140,14 +144,13 @@ class IPv4(Regexp):
class List(Descriptor):
CHECKERS = [must_be_type(list, tuple)]
BASIC_MAKER = list
MY_EXCEPTIONS = []
def __init__(self, type_desciptor: tp.Optional[Descriptor] = None):
super(List, self).__init__()
super().__init__()
self.type_descriptor = type_desciptor or Descriptor()
def __call__(self, value: ConfigDictValue) -> tp.List:
value = super(List, self).__call__(value)
value = super().__call__(value)
return [self.type_descriptor(p) for p in value]
......@@ -166,7 +169,6 @@ def create_key(descriptor: Descriptor, name: str, optional: bool = False,
class Dict(Descriptor):
BASIC_MAKER = dict
MY_EXCEPTIONS = []
CHECKERS = [must_be_type(dict)]
UnknownKeyHandlerType = tp.Callable[[str, ConfigDictValue], tp.Any]
......@@ -174,15 +176,18 @@ class Dict(Descriptor):
def __init__(self, keys: tp.List[DictDescriptorKey],
unknown_key_mapper: UnknownKeyHandlerType = lambda str,
data: data):
super(Dict, self).__init__()
super().__init__()
self.keys = {item.name: item for item in keys} # tp.Dict[str, DictDescriptorKey]
self.unknown_key_mapper = unknown_key_mapper # Dict.UnknownKeyHandlerType
def __call__(self, value: ConfigDictValue) -> dict:
value = copy.copy(value)
assert isinstance(value, dict), 'value is an instance of %s' % (repr(value), )
value = super(Dict, self).__call__(value)
assert isinstance(value, dict), 'value is an instance of %s' % (repr(value), )
if not isinstance(value, dict):
raise ConfigurationValidationError('value passed was not a dict: %s' % (value, ))
value = super().__call__(value)
if not isinstance(value, dict):
raise ConfigurationValidationError('value received from descriptor was not a dict: %s' % (value, ))
output = {}
......@@ -203,8 +208,29 @@ class Dict(Descriptor):
return output
class Union(Descriptor):
"""
The type of one of the child descriptors. If posed as such:
Union(List(), Dict())
then value can be either a list or a dict
"""
def __init__(self, *descriptors: tp.List[Descriptor]):
super().__init__()
self.descriptors = descriptors
def __call__(self, value: tp.Any) -> tp.Any:
for descriptor in self.descriptors:
try:
return descriptor(value)
except ConfigurationValidationError:
continue
raise ConfigurationValidationError('Could not match value %s to a descriptor' % (value, ))
BASE_LOOKUP_TABLE = {'int': Integer, 'float': Float, 'str': String, 'ipv4': IPv4, 'list': List, 'dict': Dict,
'any': Descriptor, 'bool': Boolean}
'any': Descriptor, 'bool': Boolean, 'union': Union}
def _get_descriptor_for(key: str, value: tp.Any) -> Descriptor:
......
import unittest
import tempfile
import os
import tempfile
import unittest
from satella.configuration.schema import *
from satella.configuration.sources import DirectorySource
from satella.exceptions import ConfigurationValidationError
class TestSchema(unittest.TestCase):
def test_union(self):
ps = Union(List(), Dict(keys=[create_key(String(), 'a')]))
self.assertEquals(ps([1, 2, 3]), [1, 2, 3])
self.assertEquals(ps({'a': 'b'}), {'a': 'b'})
self.assertRaises(ConfigurationValidationError, lambda: ps(3))
def test_descriptor_from_schema(self):
schema = {
"key_s": "str",
......@@ -96,7 +104,6 @@ class TestSchema(unittest.TestCase):
unknown_key='None')
self.assertEqual(s(D1), D2)
def test_schema_x(self):
dir = tempfile.mkdtemp()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment