diff --git a/coolamqp/framing/compilation/utilities.py b/coolamqp/framing/compilation/utilities.py index a8ae76554557a025efb3353a8fd4a1ff0dd2d030..8c333b32db655dfaed005fa87954bcba4ee53bf2 100644 --- a/coolamqp/framing/compilation/utilities.py +++ b/coolamqp/framing/compilation/utilities.py @@ -2,189 +2,16 @@ from __future__ import absolute_import, division, print_function import math -import copy -import six - -from coolamqp.framing.base import BASIC_TYPES, DYNAMIC_BASIC_TYPES - -# docs may be None - -class _Required(object): - pass - -def nop(x): - return x - -class _Field(object): - - def set(self, obj, elem): - obj.__dict__[self.field_name] = self.find(elem) - - def __init__(self, field_name): - self.field_name = field_name - - def find(self, elem): - raise NotImplementedError('abstract') - -_boolint = lambda x: bool(int(x)) - - -class _ComputedField(_Field): - def __init__(self, field_name, find_fun): - super(_ComputedField, self).__init__(field_name) - self.find = find_fun +import six -class _ValueField(_Field): - def __init__(self, xml_names, field_name, field_type=nop, - default=_Required): - if not isinstance(xml_names, tuple): - xml_names = (xml_names, ) - self.xml_names = xml_names - assert field_type is not None - self.field_name = field_name - self.field_type = field_type - self.default = default +from coolamqp.framing.base import BASIC_TYPES - def find(self, elem): - return self.field_type(self._find(elem)) +from .xml_tags import * - def _find(self, elem): - xmln = [xmln for xmln in self.xml_names if xmln in elem.attrib] +# docs may be None - if xmln: - xmln = xmln[0] - if xmln in elem.attrib: - return elem.attrib[xmln] - else: - if self.default is _Required: - raise TypeError('Did not find field %s in elem tag %s, looked for names %s' % (self.field_name, elem.tag, self.xml_names)) - else: - return self.default - -class _SimpleField(_ValueField): - def __init__(self, name, field_type=nop, default=_Required): - super(_SimpleField, self).__init__(name, name, field_type, default) - - -def get_docs(elem, label=False): - """Parse an XML element. Return documentation""" - for kid in elem.getchildren(): - - if kid.tag == 'rule': - return get_docs(kid) - - s = kid.text.strip().split('\n') - return u'\n'.join([u.strip() for u in s if len(u.strip()) > 0]) - - if label: - return elem.attrib.get('label', None) - -_name = _SimpleField('name', unicode) -_docs = _ComputedField('docs', lambda elem: get_docs(elem)) - -class BaseObject(object): - - FIELDS = [] - # tuples of (xml name, field name, type, (optional) default value) - - def __init__(self, elem): - for ft in self.FIELDS: - ft.set(self, elem) - - @classmethod - def findall(cls, xml): - return [cls(p) for p in xml.findall(cls.NAME)] - - def _replace(self, **kwargs): - c = copy.copy(self) - c.__dict__.update(**kwargs) - return c - -class Constant(BaseObject): - NAME = 'constant' - FIELDS = [ - _name, - _SimpleField('value', int), - _ValueField('class', 'kind', default=''), - _docs, - ] - -class Field(BaseObject): - NAME = 'field' - FIELDS = [ - _name, - _ValueField(('domain', 'type'), 'type', str), - _SimpleField('label', default=None), - _SimpleField('reserved', lambda x: bool(int(x)), default=0), - _ComputedField('basic_type', lambda elem: elem.attrib.get('type', '') == elem.attrib.get('name', '')), - _docs - ] - -class Class(BaseObject): - NAME = 'class' - FIELDS = [ - _name, - _SimpleField('index', int), - _ComputedField('docs', lambda elem: get_docs(elem, label=True)), - _ComputedField('methods', lambda elem: sorted( - [Method(me) for me in elem.getchildren() if me.tag == 'method'], - key=lambda m: (m.name.strip('-')[0], -len(m.response)))), - _ComputedField('properties', lambda elem: [Field(e) for e in elem.getchildren() if - e.tag == 'field']) - ] - - -class Domain(BaseObject): - NAME = 'domain' - FIELDS = [ - _name, - _SimpleField('type'), - _ComputedField('elementary', lambda a: a.attrib['type'] == a.attrib['name']) - ] - - -def _get_tagchild(elem, tag): - return [e for e in elem.getchildren() if e.tag == tag] - -class Method(BaseObject): - NAME = 'method' - FIELDS = [ - _name, - _SimpleField('synchronous', _boolint, default=False), - _SimpleField('index', int), - _SimpleField('label', default=None), - _docs, - _ComputedField('fields', lambda elem: [Field(fie) for fie in _get_tagchild(elem, 'field')]), - _ComputedField('response', lambda elem: [e.attrib['name'] for e in elem.findall('response')]), - _ComputedField('sent_by_client', lambda elem: any(e.attrib.get('name', '') == 'server' for e in - _get_tagchild(elem, 'chassis'))), - _ComputedField('sent_by_server', lambda elem: any(e.attrib.get('name', '') == 'client' for e in - _get_tagchild(elem, 'chassis'))), - _ComputedField('constant', lambda elem: all(Field(fie).reserved for fie in _get_tagchild(elem, 'field'))), - ] - - - def get_static_body(self): # only arguments part - body = [] - bits = 0 - for field in self.fields: - - if bits > 0 and field.basic_type != 'bit': - body.append(b'\x00' * math.ceil(bits / 8)) - bits = 0 - if field.basic_type == 'bit': - bits += 1 - else: - body.append(eval(BASIC_TYPES[field.basic_type][2])) - return b''.join(body) - - def is_static(self, domain_to_type=None): # is size constant? - for field in self.fields: - if field.basic_type in DYNAMIC_BASIC_TYPES: - return False - return True def get_size(fields): # assume all fields have static length diff --git a/coolamqp/framing/compilation/xml_fields.py b/coolamqp/framing/compilation/xml_fields.py new file mode 100644 index 0000000000000000000000000000000000000000..7b395a0eca8800843df7ee29a489f4eef4bf9efe --- /dev/null +++ b/coolamqp/framing/compilation/xml_fields.py @@ -0,0 +1,84 @@ +# coding=UTF-8 +from __future__ import print_function, absolute_import, division +import six +import logging + + +class _Required(object): + pass + +def nop(x): + return x + +__all__ = [ + '_name', '_docs', '_ComputedField', '_ValueField', '_SimpleField', + 'get_docs' +] + +class _Field(object): + + def set(self, obj, elem): + obj.__dict__[self.field_name] = self.find(elem) + + def __init__(self, field_name): + self.field_name = field_name + + def find(self, elem): + raise NotImplementedError('abstract') + + +class _ComputedField(_Field): + def __init__(self, field_name, find_fun): + super(_ComputedField, self).__init__(field_name) + self.find = find_fun + + +class _ValueField(_Field): + def __init__(self, xml_names, field_name, field_type=nop, + default=_Required): + if not isinstance(xml_names, tuple): + xml_names = (xml_names, ) + self.xml_names = xml_names + assert field_type is not None + self.field_name = field_name + self.field_type = field_type + self.default = default + + def find(self, elem): + return self.field_type(self._find(elem)) + + def _find(self, elem): + xmln = [xmln for xmln in self.xml_names if xmln in elem.attrib] + + if xmln: + xmln = xmln[0] + if xmln in elem.attrib: + return elem.attrib[xmln] + else: + if self.default is _Required: + raise TypeError('Did not find field %s in elem tag %s, looked for names %s' % (self.field_name, elem.tag, self.xml_names)) + else: + return self.default + +class _SimpleField(_ValueField): + def __init__(self, name, field_type=nop, default=_Required): + super(_SimpleField, self).__init__(name, name, field_type, default) + + +def get_docs(elem, label=False): + """Parse an XML element. Return documentation""" + for kid in elem.getchildren(): + + if kid.tag == 'rule': + return get_docs(kid) + + s = kid.text.strip().split('\n') + return u'\n'.join([u.strip() for u in s if len(u.strip()) > 0]) + + if label: + return elem.attrib.get('label', None) + +_name = _SimpleField('name', unicode) +_docs = _ComputedField('docs', lambda elem: get_docs(elem)) + + diff --git a/coolamqp/framing/compilation/xml_tags.py b/coolamqp/framing/compilation/xml_tags.py new file mode 100644 index 0000000000000000000000000000000000000000..7c7faa80daa8cbd5a92929534ebba3fc78cebe34 --- /dev/null +++ b/coolamqp/framing/compilation/xml_tags.py @@ -0,0 +1,121 @@ +# coding=UTF-8 +from __future__ import print_function, absolute_import, division +import six +import logging +import copy + +from coolamqp.framing.base import BASIC_TYPES, DYNAMIC_BASIC_TYPES +from .xml_fields import * + +logger = logging.getLogger(__name__) + + +def _boolint(x): + return bool(int(x)) + +__all__ = [ + 'Domain', 'Method', 'Class', 'Field', 'Constant' +] + +class BaseObject(object): + + FIELDS = [] + # tuples of (xml name, field name, type, (optional) default value) + + def __init__(self, elem): + for ft in self.FIELDS: + ft.set(self, elem) + + @classmethod + def findall(cls, xml): + return [cls(p) for p in xml.findall(cls.NAME)] + + def _replace(self, **kwargs): + c = copy.copy(self) + c.__dict__.update(**kwargs) + return c + +class Constant(BaseObject): + NAME = 'constant' + FIELDS = [ + _name, + _SimpleField('value', int), + _ValueField('class', 'kind', default=''), + _docs, + ] + +class Field(BaseObject): + NAME = 'field' + FIELDS = [ + _name, + _ValueField(('domain', 'type'), 'type', str), + _SimpleField('label', default=None), + _SimpleField('reserved', lambda x: bool(int(x)), default=0), + _ComputedField('basic_type', lambda elem: elem.attrib.get('type', '') == elem.attrib.get('name', '')), + _docs + ] + +class Class(BaseObject): + NAME = 'class' + FIELDS = [ + _name, + _SimpleField('index', int), + _ComputedField('docs', lambda elem: get_docs(elem, label=True)), + _ComputedField('methods', lambda elem: sorted( + [Method(me) for me in elem.getchildren() if me.tag == 'method'], + key=lambda m: (m.name.strip('-')[0], -len(m.response)))), + _ComputedField('properties', lambda elem: [Field(e) for e in elem.getchildren() if + e.tag == 'field']) + ] + + +class Domain(BaseObject): + NAME = 'domain' + FIELDS = [ + _name, + _SimpleField('type'), + _ComputedField('elementary', lambda a: a.attrib['type'] == a.attrib['name']) + ] + + +def _get_tagchild(elem, tag): + return [e for e in elem.getchildren() if e.tag == tag] + +class Method(BaseObject): + NAME = 'method' + FIELDS = [ + _name, + _SimpleField('synchronous', _boolint, default=False), + _SimpleField('index', int), + _SimpleField('label', default=None), + _docs, + _ComputedField('fields', lambda elem: [Field(fie) for fie in _get_tagchild(elem, 'field')]), + _ComputedField('response', lambda elem: [e.attrib['name'] for e in elem.findall('response')]), + _ComputedField('sent_by_client', lambda elem: any(e.attrib.get('name', '') == 'server' for e in + _get_tagchild(elem, 'chassis'))), + _ComputedField('sent_by_server', lambda elem: any(e.attrib.get('name', '') == 'client' for e in + _get_tagchild(elem, 'chassis'))), + _ComputedField('constant', lambda elem: all(Field(fie).reserved for fie in _get_tagchild(elem, 'field'))), + ] + + + def get_static_body(self): # only arguments part + body = [] + bits = 0 + for field in self.fields: + + if bits > 0 and field.basic_type != 'bit': + body.append(b'\x00' * math.ceil(bits / 8)) + bits = 0 + + if field.basic_type == 'bit': + bits += 1 + else: + body.append(eval(BASIC_TYPES[field.basic_type][2])) + return b''.join(body) + + def is_static(self, domain_to_type=None): # is size constant? + for field in self.fields: + if field.basic_type in DYNAMIC_BASIC_TYPES: + return False + return True