diff options
-rw-r--r-- | CHANGES.rst | 2 | ||||
-rw-r--r-- | pyasn1/__init__.py | 2 | ||||
-rw-r--r-- | pyasn1/codec/ber/encoder.py | 202 | ||||
-rw-r--r-- | pyasn1/codec/cer/encoder.py | 178 | ||||
-rw-r--r-- | pyasn1/codec/der/encoder.py | 38 | ||||
-rw-r--r-- | pyasn1/type/univ.py | 19 | ||||
-rw-r--r-- | tests/codec/ber/test_encoder.py | 297 |
7 files changed, 599 insertions, 139 deletions
diff --git a/CHANGES.rst b/CHANGES.rst index 3e65b8f..cc771a5 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -3,6 +3,8 @@ Revision 0.4.1, released XX-09-2017 ----------------------------------- - ANY DEFINED BY clause support implemented +- Encoders refactored to take either a value (as ASN.1 object) + or a Python value plus ASN.1 schema Revision 0.3.5, released 16-09-2017 ----------------------------------- diff --git a/pyasn1/__init__.py b/pyasn1/__init__.py index 26495d5..42f4704 100644 --- a/pyasn1/__init__.py +++ b/pyasn1/__init__.py @@ -1,7 +1,7 @@ import sys # http://www.python.org/dev/peps/pep-0396/ -__version__ = '0.3.5' +__version__ = '0.4.1' if sys.version_info[:2] < (2, 4): raise RuntimeError('PyASN1 requires Python 2.4 or later') diff --git a/pyasn1/codec/ber/encoder.py b/pyasn1/codec/ber/encoder.py index 2c5ba14..492df88 100644 --- a/pyasn1/codec/ber/encoder.py +++ b/pyasn1/codec/ber/encoder.py @@ -47,17 +47,20 @@ class AbstractItemEncoder(object): raise error.PyAsn1Error('Length octets overflow (%d)' % substrateLen) return (0x80 | substrateLen,) + substrate - def encodeValue(self, value, encodeFun, **options): + def encodeValue(self, value, asn1Spec, encodeFun, **options): raise error.PyAsn1Error('Not implemented') - def encode(self, value, encodeFun, **options): + def encode(self, value, asn1Spec=None, encodeFun=None, **options): - tagSet = value.tagSet + if asn1Spec is None: + tagSet = value.tagSet + else: + tagSet = asn1Spec.tagSet # untagged item? if not tagSet: substrate, isConstructed, isOctets = self.encodeValue( - value, encodeFun, **options + value, asn1Spec, encodeFun, **options ) return substrate @@ -70,7 +73,7 @@ class AbstractItemEncoder(object): # base tag? if not idx: substrate, isConstructed, isOctets = self.encodeValue( - value, encodeFun, **options + value, asn1Spec, encodeFun, **options ) if options.get('ifNotEmpty', False) and not substrate: @@ -95,14 +98,14 @@ class AbstractItemEncoder(object): class EndOfOctetsEncoder(AbstractItemEncoder): - def encodeValue(self, value, encodeFun, **options): + def encodeValue(self, value, asn1Spec, encodeFun, **options): return null, False, True class BooleanEncoder(AbstractItemEncoder): supportIndefLenMode = False - def encodeValue(self, value, encodeFun, **options): + def encodeValue(self, value, asn1Spec, encodeFun, **options): return value and (1,) or (0,), False, False @@ -110,7 +113,7 @@ class IntegerEncoder(AbstractItemEncoder): supportIndefLenMode = False supportCompactZero = False - def encodeValue(self, value, encodeFun, **options): + def encodeValue(self, value, asn1Spec, encodeFun, **options): if value == 0: # de-facto way to encode zero if self.supportCompactZero: @@ -122,7 +125,10 @@ class IntegerEncoder(AbstractItemEncoder): class BitStringEncoder(AbstractItemEncoder): - def encodeValue(self, value, encodeFun, **options): + def encodeValue(self, value, asn1Spec, encodeFun, **options): + if asn1Spec is not None: + value = asn1Spec.clone(value) + valueLength = len(value) if valueLength % 8: alignedValue = value << (8 - valueLength % 8) @@ -134,9 +140,11 @@ class BitStringEncoder(AbstractItemEncoder): substrate = alignedValue.asOctets() return int2oct(len(substrate) * 8 - valueLength) + substrate, False, True + tagSet = value.tagSet + # strip off explicit tags alignedValue = alignedValue.clone( - tagSet=tag.TagSet(value.tagSet.baseTag, value.tagSet.baseTag) + tagSet=tag.TagSet(tagSet.baseTag, tagSet.baseTag) ) stop = 0 @@ -144,20 +152,25 @@ class BitStringEncoder(AbstractItemEncoder): while stop < valueLength: start = stop stop = min(start + maxChunkSize * 8, valueLength) - substrate += encodeFun(alignedValue[start:stop], **options) + substrate += encodeFun(alignedValue[start:stop], asn1Spec, **options) return substrate, True, True class OctetStringEncoder(AbstractItemEncoder): - def encodeValue(self, value, encodeFun, **options): + def encodeValue(self, value, asn1Spec, encodeFun, **options): + if asn1Spec is not None: + value = asn1Spec.clone(value) + maxChunkSize = options.get('maxChunkSize', 0) if not maxChunkSize or len(value) <= maxChunkSize: return value.asOctets(), False, True else: + tagSet = value.tagSet + # will strip off explicit tags - baseTagSet = tag.TagSet(value.tagSet.baseTag, value.tagSet.baseTag) + baseTagSet = tag.TagSet(tagSet.baseTag, tagSet.baseTag) pos = 0 substrate = null @@ -166,7 +179,8 @@ class OctetStringEncoder(AbstractItemEncoder): tagSet=baseTagSet) if not chunk: break - substrate += encodeFun(chunk, **options) + + substrate += encodeFun(chunk, asn1Spec, **options) pos += maxChunkSize return substrate, True, True @@ -175,14 +189,17 @@ class OctetStringEncoder(AbstractItemEncoder): class NullEncoder(AbstractItemEncoder): supportIndefLenMode = False - def encodeValue(self, value, encodeFun, **options): + def encodeValue(self, value, asn1Spec, encodeFun, **options): return null, False, True class ObjectIdentifierEncoder(AbstractItemEncoder): supportIndefLenMode = False - def encodeValue(self, value, encodeFun, **options): + def encodeValue(self, value, asn1Spec, encodeFun, **options): + if asn1Spec is not None: + value = asn1Spec.clone(value) + oid = value.asTuple() # Build the first pair @@ -280,7 +297,10 @@ class RealEncoder(AbstractItemEncoder): encbase = encBase[i] return sign, m, encbase, e - def encodeValue(self, value, encodeFun, **options): + def encodeValue(self, value, asn1Spec, encodeFun, **options): + if asn1Spec is not None: + value = asn1Spec.clone(value) + if value.isPlusInf: return (0x40,), False, False if value.isMinusInf: @@ -351,53 +371,113 @@ class RealEncoder(AbstractItemEncoder): class SequenceEncoder(AbstractItemEncoder): - def encodeValue(self, value, encodeFun, **options): - value.verifySizeSpec() + omitEmptyOptionals = False + + # TODO: handling three flavors of input is too much -- split over codecs + + def encodeValue(self, value, asn1Spec, encodeFun, **options): - namedTypes = value.componentType substrate = null - idx = len(value) - while idx > 0: - idx -= 1 - if namedTypes: - namedType = namedTypes[idx] - if namedType.isOptional and not value[idx].isValue: + if asn1Spec is None: + # instance of ASN.1 schema + value.verifySizeSpec() + + namedTypes = value.componentType + + for idx, component in enumerate(value.values()): + if namedTypes: + namedType = namedTypes[idx] + + if namedType.isOptional and not component.isValue: + continue + + if namedType.isDefaulted and component == namedType.asn1Object: + continue + + if self.omitEmptyOptionals: + options.update(ifNotEmpty=namedType.isOptional) + + chunk = encodeFun(component, asn1Spec, **options) + + # wrap open type blob if needed + if namedTypes and namedType.openType: + wrapType = namedType.asn1Object + if wrapType.tagSet and not wrapType.isSameTypeWith(component): + chunk = encodeFun(chunk, wrapType, **options) + + substrate += chunk + + else: + # bare Python value + ASN.1 schema + for idx, namedType in enumerate(asn1Spec.componentType.namedTypes): + + try: + component = value[namedType.name] + + except KeyError: + raise error.PyAsn1Error('Component name "%s" not found in %r' % (namedType.name, value)) + + if namedType.isOptional and namedType.name not in value: continue - if namedType.isDefaulted and value[idx] == namedType.asn1Object: + + if namedType.isDefaulted and component == namedType.asn1Object: continue - chunk = encodeFun(value[idx], **options) + if self.omitEmptyOptionals: + options.update(ifNotEmpty=namedType.isOptional) - # wrap open type blob if needed - if namedTypes and namedType.openType: - asn1Spec = namedType.asn1Object - if asn1Spec.tagSet and not asn1Spec.isSameTypeWith(value[idx]): - chunk = encodeFun(asn1Spec.clone(chunk), **options) + chunk = encodeFun(component, asn1Spec[idx], **options) - substrate = chunk + substrate + # wrap open type blob if needed + if namedType.openType: + wrapType = namedType.asn1Object + if wrapType.tagSet and not wrapType.isSameTypeWith(component): + chunk = encodeFun(chunk, wrapType, **options) + + substrate += chunk return substrate, True, True class SequenceOfEncoder(AbstractItemEncoder): - def encodeValue(self, value, encodeFun, **options): - value.verifySizeSpec() + def encodeValue(self, value, asn1Spec, encodeFun, **options): + if asn1Spec is None: + value.verifySizeSpec() + else: + asn1Spec = asn1Spec.componentType + substrate = null - idx = len(value) - while idx > 0: - idx -= 1 - substrate = encodeFun(value[idx], **options) + substrate + + for idx, component in enumerate(value): + substrate += encodeFun(value[idx], asn1Spec, **options) + return substrate, True, True class ChoiceEncoder(AbstractItemEncoder): - def encodeValue(self, value, encodeFun, **options): - return encodeFun(value.getComponent(), **options), True, True + def encodeValue(self, value, asn1Spec, encodeFun, **options): + if asn1Spec is None: + component = value.getComponent() + else: + names = [namedType.name for namedType in asn1Spec.componentType.namedTypes + if namedType.name in value] + if len(names) != 1: + raise error.PyAsn1Error('%s components for Choice at %r' % (len(names) and 'Multiple ' or 'None ', value)) + + name = names[0] + + component = value[name] + asn1Spec = asn1Spec[name] + + return encodeFun(component, asn1Spec, **options), True, True class AnyEncoder(OctetStringEncoder): - def encodeValue(self, value, encodeFun, **options): + def encodeValue(self, value, asn1Spec, encodeFun, **options): + if asn1Spec is not None: + value = asn1Spec.clone(value) + return value.asOctets(), not options.get('defMode', True), True @@ -478,7 +558,16 @@ class Encoder(object): self.__tagMap = tagMap self.__typeMap = typeMap - def __call__(self, value, **options): + def __call__(self, value, asn1Spec=None, **options): + try: + if asn1Spec is None: + typeId = value.typeId + else: + typeId = asn1Spec.typeId + + except AttributeError: + raise error.PyAsn1Error('Value %r is not ASN.1 type instance ' + 'and "asn1Spec" not given' % (value,)) if debug.logger & debug.flagEncoder: logger = debug.logger @@ -486,7 +575,8 @@ class Encoder(object): logger = None if logger: - logger('encoder called in %sdef mode, chunk size %s for type %s, value:\n%s' % (not options.get('defMode', True) and 'in' or '', options.get('maxChunkSize', 0), value.prettyPrintType(), value.prettyPrint())) + logger('encoder called in %sdef mode, chunk size %s for ' + 'type %s, value:\n%s' % (not options.get('defMode', True) and 'in' or '', options.get('maxChunkSize', 0), asn1Spec is None and value.prettyPrintType() or asn1Spec.prettyPrintType(), value)) if self.fixedDefLengthMode is not None: options.update(defMode=self.fixedDefLengthMode) @@ -494,28 +584,36 @@ class Encoder(object): if self.fixedChunkSize is not None: options.update(maxChunkSize=self.fixedChunkSize) - tagSet = value.tagSet try: - concreteEncoder = self.__typeMap[value.typeId] + concreteEncoder = self.__typeMap[typeId] + + if logger: + logger('using value codec %s chosen by type ID %s' % (concreteEncoder.__class__.__name__, typeId)) except KeyError: + if asn1Spec is None: + tagSet = value.tagSet + else: + tagSet = asn1Spec.tagSet + # use base type for codec lookup to recover untagged types - baseTagSet = tag.TagSet(value.tagSet.baseTag, value.tagSet.baseTag) + baseTagSet = tag.TagSet(tagSet.baseTag, tagSet.baseTag) try: concreteEncoder = self.__tagMap[baseTagSet] except KeyError: - raise error.PyAsn1Error('No encoder for %s' % (value,)) + raise error.PyAsn1Error('No encoder for %r (%s)' % (value, tagSet)) - if logger: - logger('using value codec %s chosen by %s' % (concreteEncoder.__class__.__name__, tagSet)) + if logger: + logger('using value codec %s chosen by tagSet %s' % (concreteEncoder.__class__.__name__, tagSet)) - substrate = concreteEncoder.encode(value, self, **options) + substrate = concreteEncoder.encode(value, asn1Spec, self, **options) if logger: logger('codec %s built %s octets of substrate: %s\nencoder completed' % (concreteEncoder, len(substrate), debug.hexdump(substrate))) + return substrate #: Turns ASN.1 object into BER octet stream. diff --git a/pyasn1/codec/cer/encoder.py b/pyasn1/codec/cer/encoder.py index 40e7eba..57f532d 100644 --- a/pyasn1/codec/cer/encoder.py +++ b/pyasn1/codec/cer/encoder.py @@ -14,7 +14,7 @@ __all__ = ['encode'] class BooleanEncoder(encoder.IntegerEncoder): - def encodeValue(self, value, encodeFun, **options): + def encodeValue(self, value, asn1Spec, encodeFun, **options): if value == 0: substrate = (0,) else: @@ -38,7 +38,7 @@ class TimeEncoderMixIn(object): minLength = 12 maxLength = 19 - def encodeValue(self, value, encodeFun, **options): + def encodeValue(self, value, asn1Spec, encodeFun, **options): # Encoding constraints: # - minutes are mandatory, seconds are optional # - subseconds must NOT be zero @@ -46,6 +46,9 @@ class TimeEncoderMixIn(object): # - time in UTC (Z) # - only dot is allowed for fractions + if asn1Spec is not None: + value = asn1Spec.clone(value) + octets = value.asOctets() if not self.minLength < len(octets) < self.maxLength: @@ -63,7 +66,7 @@ class TimeEncoderMixIn(object): options.update(maxChunkSize=1000) return encoder.OctetStringEncoder.encodeValue( - self, value, encodeFun, **options + self, value, asn1Spec, encodeFun, **options ) @@ -77,97 +80,140 @@ class UTCTimeEncoder(TimeEncoderMixIn, encoder.OctetStringEncoder): maxLength = 14 -class SetOfEncoder(encoder.SequenceOfEncoder): +class SetEncoder(encoder.SequenceEncoder): @staticmethod - def _sortComponents(components): - # sort by tags regardless of the Choice value (static sort) - return sorted(components, key=lambda x: isinstance(x, univ.Choice) and x.minTagSet or x.tagSet) + def _componentSortKey(componentAndType): + """Sort SET components by tag + + Sort regardless of the Choice value (static sort) + """ + component, asn1Spec = componentAndType + + if asn1Spec is None: + asn1Spec = component + + if asn1Spec.typeId == univ.Choice.typeId: + if asn1Spec.tagSet: + return asn1Spec.tagSet + else: + return asn1Spec.componentType.minTagSet + else: + return asn1Spec.tagSet + + def encodeValue(self, value, asn1Spec, encodeFun, **options): - def encodeValue(self, value, encodeFun, **options): - value.verifySizeSpec() substrate = null - idx = len(value) - if value.typeId == univ.Set.typeId: + + comps = [] + compsMap = {} + + if asn1Spec is None: + # instance of ASN.1 schema + value.verifySizeSpec() + namedTypes = value.componentType - comps = [] - compsMap = {} - while idx > 0: - idx -= 1 + + for idx, component in enumerate(value.values()): if namedTypes: - if namedTypes[idx].isOptional and not value[idx].isValue: - continue - if namedTypes[idx].isDefaulted and value[idx] == namedTypes[idx].asn1Object: - continue + namedType = namedTypes[idx] - comps.append(value[idx]) - compsMap[id(value[idx])] = namedTypes and namedTypes[idx].isOptional + if namedType.isOptional and not component.isValue: + continue - for comp in self._sortComponents(comps): - options.update(ifNotEmpty=compsMap[id(comp)]) - substrate += encodeFun(comp, **options) - else: - components = [encodeFun(x, **options) for x in value] - - # sort by serialized and padded components - if len(components) > 1: - zero = str2octs('\x00') - maxLen = max(map(len, components)) - paddedComponents = [ - (x.ljust(maxLen, zero), x) for x in components - ] - paddedComponents.sort(key=lambda x: x[0]) + if namedType.isDefaulted and component == namedType.asn1Object: + continue - components = [x[1] for x in paddedComponents] + compsMap[id(component)] = namedType - substrate = null.join(components) + else: + compsMap[id(component)] = None - return substrate, True, True + comps.append((component, asn1Spec)) + else: + # bare Python value + ASN.1 schema + for idx, namedType in enumerate(asn1Spec.componentType.namedTypes): -class SequenceEncoder(encoder.SequenceEncoder): - def encodeValue(self, value, encodeFun, **options): - value.verifySizeSpec() + try: + component = value[namedType.name] - namedTypes = value.componentType - substrate = null + except KeyError: + raise error.PyAsn1Error('Component name "%s" not found in %r' % (namedType.name, value)) - idx = len(value) - while idx > 0: - idx -= 1 - if namedTypes: - namedType = namedTypes[idx] - if namedType.isOptional and not value[idx].isValue: + if namedType.isOptional and namedType.name not in value: continue - if namedType.isDefaulted and value[idx] == namedType.asn1Object: + + if namedType.isDefaulted and component == namedType.asn1Object: continue - options.update(ifNotEmpty=namedTypes and namedType.isOptional) + compsMap[id(component)] = namedType + comps.append((component, asn1Spec[idx])) + + for comp, compType in sorted(comps, key=self._componentSortKey): + namedType = compsMap[id(comp)] - chunk = encodeFun(value[idx], **options) + if namedType: + options.update(ifNotEmpty=namedType.isOptional) + + chunk = encodeFun(comp, compType, **options) # wrap open type blob if needed - if namedTypes and namedType.openType: - asn1Spec = namedType.asn1Object - if asn1Spec.tagSet and not asn1Spec.isSameTypeWith(value[idx]): - chunk = encodeFun(asn1Spec.clone(chunk), **options) + if namedType and namedType.openType: + wrapType = namedType.asn1Object + if wrapType.tagSet and not wrapType.isSameTypeWith(comp): + chunk = encodeFun(chunk, wrapType, **options) - substrate = chunk + substrate + substrate += chunk return substrate, True, True +class SetOfEncoder(encoder.SequenceOfEncoder): + def encodeValue(self, value, asn1Spec, encodeFun, **options): + if asn1Spec is None: + value.verifySizeSpec() + else: + asn1Spec = asn1Spec.componentType + + components = [encodeFun(x, asn1Spec, **options) + for x in value] + + # sort by serialized and padded components + if len(components) > 1: + zero = str2octs('\x00') + maxLen = max(map(len, components)) + paddedComponents = [ + (x.ljust(maxLen, zero), x) for x in components + ] + paddedComponents.sort(key=lambda x: x[0]) + + components = [x[1] for x in paddedComponents] + + substrate = null.join(components) + + return substrate, True, True + + +class SequenceEncoder(encoder.SequenceEncoder): + omitEmptyOptionals = True + + class SequenceOfEncoder(encoder.SequenceOfEncoder): - def encodeValue(self, value, encodeFun, **options): + def encodeValue(self, value, asn1Spec, encodeFun, **options): + + if options.get('ifNotEmpty', False) and not len(value): + return null, True, True + + if asn1Spec is None: + value.verifySizeSpec() + else: + asn1Spec = asn1Spec.componentType + substrate = null - idx = len(value) - if options.get('ifNotEmpty', False) and not idx: - return substrate, True, True + for idx, component in enumerate(value): + substrate += encodeFun(value[idx], asn1Spec, **options) - value.verifySizeSpec() - while idx > 0: - idx -= 1 - substrate = encodeFun(value[idx], **options) + substrate return substrate, True, True @@ -189,7 +235,7 @@ typeMap.update({ useful.GeneralizedTime.typeId: GeneralizedTimeEncoder(), useful.UTCTime.typeId: UTCTimeEncoder(), # Sequence & Set have same tags as SequenceOf & SetOf - univ.Set.typeId: SetOfEncoder(), + univ.Set.typeId: SetEncoder(), univ.SetOf.typeId: SetOfEncoder(), univ.Sequence.typeId: SequenceEncoder(), univ.SequenceOf.typeId: SequenceOfEncoder() diff --git a/pyasn1/codec/der/encoder.py b/pyasn1/codec/der/encoder.py index d2992a9..ca0b660 100644 --- a/pyasn1/codec/der/encoder.py +++ b/pyasn1/codec/der/encoder.py @@ -6,27 +6,51 @@ # from pyasn1.type import univ from pyasn1.codec.cer import encoder +from pyasn1 import error __all__ = ['encode'] -class SetOfEncoder(encoder.SetOfEncoder): +class SetEncoder(encoder.SetEncoder): @staticmethod - def _sortComponents(components): - # sort by tags depending on the actual Choice value (dynamic sort) - return sorted(components, key=lambda x: isinstance(x, univ.Choice) and x.getComponent().tagSet or x.tagSet) + def _componentSortKey(componentAndType): + """Sort SET components by tag + + Sort depending on the actual Choice value (dynamic sort) + """ + component, asn1Spec = componentAndType + + if asn1Spec is None: + compType = component + else: + compType = asn1Spec + + if compType.typeId == univ.Choice.typeId: + if asn1Spec is None: + return component.getComponent().tagSet + else: + # TODO: move out of sorting key function + names = [namedType.name for namedType in asn1Spec.componentType.namedTypes + if namedType.name in component] + if len(names) != 1: + raise error.PyAsn1Error( + '%s components for Choice at %r' % (len(names) and 'Multiple ' or 'None ', component)) + + return asn1Spec[names[0]].tagSet + + else: + return compType.tagSet tagMap = encoder.tagMap.copy() tagMap.update({ # Set & SetOf have same tags - univ.SetOf.tagSet: SetOfEncoder() + univ.Set.tagSet: SetEncoder() }) typeMap = encoder.typeMap.copy() typeMap.update({ # Set & SetOf have same tags - univ.Set.typeId: SetOfEncoder(), - univ.SetOf.typeId: SetOfEncoder() + univ.Set.typeId: SetEncoder() }) diff --git a/pyasn1/type/univ.py b/pyasn1/type/univ.py index 1e7c8ce..3db6ab9 100644 --- a/pyasn1/type/univ.py +++ b/pyasn1/type/univ.py @@ -1498,11 +1498,7 @@ class Real(base.AbstractSimpleAsn1Item): if self.isInf: return self.prettyOut(self._value) else: - try: - return str(float(self)) - - except OverflowError: - return '<overflow>' + return str(float(self)) @property def isPlusInf(self): @@ -1534,7 +1530,11 @@ class Real(base.AbstractSimpleAsn1Item): return self._value in self._inf def __str__(self): - return str(float(self)) + try: + return str(float(self)) + + except OverflowError: + return '<overflow>' def __add__(self, value): return self.clone(float(self) + value) @@ -2711,13 +2711,6 @@ class Choice(Set): return self @property - def minTagSet(self): - if self.tagSet: - return self.tagSet - else: - return self.componentType.minTagSet - - @property def effectiveTagSet(self): """Return a :class:`~pyasn1.type.tag.TagSet` object of the currently initialized component or self (if |ASN.1| is tagged).""" if self.tagSet: diff --git a/tests/codec/ber/test_encoder.py b/tests/codec/ber/test_encoder.py index 265e866..094ca85 100644 --- a/tests/codec/ber/test_encoder.py +++ b/tests/codec/ber/test_encoder.py @@ -62,6 +62,22 @@ class IntegerEncoderTestCase(BaseTestCase): ) == ints2octs((2, 9, 255, 0, 0, 0, 0, 0, 0, 0, 1)) +class IntegerEncoderWithSchemaTestCase(BaseTestCase): + def testPosInt(self): + assert encoder.encode(12, asn1Spec=univ.Integer()) == ints2octs((2, 1, 12)) + + def testNegInt(self): + assert encoder.encode(-12, asn1Spec=univ.Integer()) == ints2octs((2, 1, 244)) + + def testZero(self): + assert encoder.encode(0, asn1Spec=univ.Integer()) == ints2octs((2, 1, 0)) + + def testPosLong(self): + assert encoder.encode( + 0xffffffffffffffff, asn1Spec=univ.Integer() + ) == ints2octs((2, 9, 0, 255, 255, 255, 255, 255, 255, 255, 255)) + + class BooleanEncoderTestCase(BaseTestCase): def testTrue(self): assert encoder.encode(univ.Boolean(1)) == ints2octs((1, 1, 1)) @@ -70,6 +86,14 @@ class BooleanEncoderTestCase(BaseTestCase): assert encoder.encode(univ.Boolean(0)) == ints2octs((1, 1, 0)) +class BooleanEncoderWithSchemaTestCase(BaseTestCase): + def testTrue(self): + assert encoder.encode(True, asn1Spec=univ.Boolean()) == ints2octs((1, 1, 1)) + + def testFalse(self): + assert encoder.encode(False, asn1Spec=univ.Boolean()) == ints2octs((1, 1, 0)) + + class BitStringEncoderTestCase(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) @@ -97,6 +121,34 @@ class BitStringEncoderTestCase(BaseTestCase): assert encoder.encode(univ.BitString([])) == ints2octs((3, 1, 0)) +class BitStringEncoderWithSchemaTestCase(BaseTestCase): + def setUp(self): + BaseTestCase.setUp(self) + self.b = (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1) + self.s = univ.BitString() + + def testDefMode(self): + assert encoder.encode(self.b, asn1Spec=self.s) == ints2octs((3, 3, 1, 169, 138)) + + def testIndefMode(self): + assert encoder.encode( + self.b, asn1Spec=self.s, defMode=False + ) == ints2octs((3, 3, 1, 169, 138)) + + def testDefModeChunked(self): + assert encoder.encode( + self.b, asn1Spec=self.s, maxChunkSize=1 + ) == ints2octs((35, 8, 3, 2, 0, 169, 3, 2, 1, 138)) + + def testIndefModeChunked(self): + assert encoder.encode( + self.b, asn1Spec=self.s, defMode=False, maxChunkSize=1 + ) == ints2octs((35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)) + + def testEmptyValue(self): + assert encoder.encode([], asn1Spec=self.s) == ints2octs((3, 1, 0)) + + class OctetStringEncoderTestCase(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) @@ -124,6 +176,34 @@ class OctetStringEncoderTestCase(BaseTestCase): 32, 4, 3, 102, 111, 120, 0, 0)) +class OctetStringEncoderWithSchemaTestCase(BaseTestCase): + def setUp(self): + BaseTestCase.setUp(self) + self.s = univ.OctetString() + self.o = 'Quick brown fox' + + def testDefMode(self): + assert encoder.encode(self.o, asn1Spec=self.s) == ints2octs( + (4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)) + + def testIndefMode(self): + assert encoder.encode( + self.o, asn1Spec=self.s, defMode=False + ) == ints2octs((4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)) + + def testDefModeChunked(self): + assert encoder.encode( + self.o, asn1Spec=self.s, maxChunkSize=4 + ) == ints2octs((36, 23, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, + 110, 32, 4, 3, 102, 111, 120)) + + def testIndefModeChunked(self): + assert encoder.encode( + self.o, asn1Spec=self.s, defMode=False, maxChunkSize=4 + ) == ints2octs((36, 128, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, + 32, 4, 3, 102, 111, 120, 0, 0)) + + class ExpTaggedOctetStringEncoderTestCase(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) @@ -158,6 +238,11 @@ class NullEncoderTestCase(BaseTestCase): assert encoder.encode(univ.Null('')) == ints2octs((5, 0)) +class NullEncoderWithSchemaTestCase(BaseTestCase): + def testNull(self): + assert encoder.encode(None, univ.Null()) == ints2octs((5, 0)) + + class ObjectIdentifierEncoderTestCase(BaseTestCase): def testOne(self): assert encoder.encode( @@ -266,6 +351,13 @@ class ObjectIdentifierEncoderTestCase(BaseTestCase): 0xB8, 0xCB, 0xE2, 0xB6, 0x47)) +class ObjectIdentifierWithSchemaEncoderTestCase(BaseTestCase): + def testOne(self): + assert encoder.encode( + (1, 3, 6, 0, 0xffffe), asn1Spec=univ.ObjectIdentifier() + ) == ints2octs((6, 6, 43, 6, 0, 191, 255, 126)) + + class RealEncoderTestCase(BaseTestCase): def testChar(self): assert encoder.encode( @@ -330,6 +422,13 @@ class RealEncoderTestCase(BaseTestCase): assert encoder.encode(univ.Real(0)) == ints2octs((9, 0)) +class RealEncoderWithSchemaTestCase(BaseTestCase): + def testChar(self): + assert encoder.encode( + (123, 10, 11), asn1Spec=univ.Real() + ) == ints2octs((9, 7, 3, 49, 50, 51, 69, 49, 49)) + + if sys.version_info[0:2] > (2, 5): class UniversalStringEncoderTestCase(BaseTestCase): def testEncoding(self): @@ -385,6 +484,36 @@ class SequenceOfEncoderWithSchemaTestCase(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) self.s = univ.SequenceOf(componentType=univ.OctetString()) + self.v = ['quick brown'] + + def testEmpty(self): + assert encoder.encode([], asn1Spec=self.s) == ints2octs((48, 0)) + + def testDefMode(self): + assert encoder.encode( + self.v, asn1Spec=self.s + ) == ints2octs((48, 13, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110)) + + def testIndefMode(self): + assert encoder.encode( + self.v, asn1Spec=self.s, defMode=False + ) == ints2octs((48, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0)) + + def testDefModeChunked(self): + assert encoder.encode( + self.v, asn1Spec=self.s, defMode=True, maxChunkSize=4 + ) == ints2octs((48, 19, 36, 17, 4, 4, 113, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 3, 111, 119, 110)) + + def testIndefModeChunked(self): + assert encoder.encode( + self.v, asn1Spec=self.s, defMode=False, maxChunkSize=4 + ) == ints2octs((48, 128, 36, 128, 4, 4, 113, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 3, 111, 119, 110, 0, 0, 0, 0)) + + +class SequenceOfEncoderWithComponentsSchemaTestCase(BaseTestCase): + def setUp(self): + BaseTestCase.setUp(self) + self.s = univ.SequenceOf(componentType=univ.OctetString()) def __init(self): self.s.clear() @@ -449,6 +578,38 @@ class SetOfEncoderWithSchemaTestCase(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) self.s = univ.SetOf(componentType=univ.OctetString()) + self.v = ['quick brown'] + + def testEmpty(self): + s = univ.SetOf() + assert encoder.encode([], asn1Spec=self.s) == ints2octs((49, 0)) + + def testDefMode(self): + assert encoder.encode( + self.v, asn1Spec=self.s + ) == ints2octs((49, 13, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110)) + + def testIndefMode(self): + assert encoder.encode( + self.v, asn1Spec=self.s, defMode=False + ) == ints2octs((49, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0)) + + def testDefModeChunked(self): + assert encoder.encode( + self.v, asn1Spec=self.s, defMode=True, maxChunkSize=4 + ) == ints2octs((49, 19, 36, 17, 4, 4, 113, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 3, 111, 119, 110)) + + def testIndefModeChunked(self): + assert encoder.encode( + self.v, asn1Spec=self.s, defMode=False, maxChunkSize=4 + ) == ints2octs( + (49, 128, 36, 128, 4, 4, 113, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 3, 111, 119, 110, 0, 0, 0, 0)) + + +class SetOfEncoderWithComponentsSchemaTestCase(BaseTestCase): + def setUp(self): + BaseTestCase.setUp(self) + self.s = univ.SetOf(componentType=univ.OctetString()) def __init(self): self.s.clear() @@ -514,6 +675,53 @@ class SequenceEncoderWithSchemaTestCase(BaseTestCase): namedtype.DefaultedNamedType('age', univ.Integer(33)), ) ) + self.v = { + 'place-holder': None, + 'first-name': 'quick brown', + 'age': 1 + } + + def testEmpty(self): + try: + assert encoder.encode({}, asn1Spec=self.s) + + except PyAsn1Error: + pass + + else: + assert False, 'empty bare sequence tolerated' + + def testDefMode(self): + assert encoder.encode( + self.v, asn1Spec=self.s + ) == ints2octs((48, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)) + + def testIndefMode(self): + assert encoder.encode( + self.v, asn1Spec=self.s, defMode=False + ) == ints2octs((48, 128, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1, 0, 0)) + + def testDefModeChunked(self): + assert encoder.encode( + self.v, asn1Spec=self.s, defMode=True, maxChunkSize=4 + ) == ints2octs((48, 24, 5, 0, 36, 17, 4, 4, 113, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 3, 111, 119, 110, 2, 1, 1)) + + def testIndefModeChunked(self): + assert encoder.encode( + self.v, asn1Spec=self.s, defMode=False, maxChunkSize=4 + ) == ints2octs((48, 128, 5, 0, 36, 128, 4, 4, 113, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 3, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)) + + +class SequenceEncoderWithComponentsSchemaTestCase(BaseTestCase): + def setUp(self): + BaseTestCase.setUp(self) + self.s = univ.Sequence( + componentType=namedtype.NamedTypes( + namedtype.NamedType('place-holder', univ.Null()), + namedtype.OptionalNamedType('first-name', univ.OctetString()), + namedtype.DefaultedNamedType('age', univ.Integer(33)), + ) + ) def __init(self): self.s.clear() @@ -692,6 +900,53 @@ class SetEncoderWithSchemaTestCase(BaseTestCase): namedtype.DefaultedNamedType('age', univ.Integer(33)), ) ) + self.v = { + 'place-holder': None, + 'first-name': 'quick brown', + 'age': 1 + } + + def testEmpty(self): + try: + assert encoder.encode({}, asn1Spec=self.s) + + except PyAsn1Error: + pass + + else: + assert False, 'empty bare SET tolerated' + + def testDefMode(self): + assert encoder.encode( + self.v, asn1Spec=self.s + ) == ints2octs((49, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)) + + def testIndefMode(self): + assert encoder.encode( + self.v, asn1Spec=self.s, defMode=False + ) == ints2octs((49, 128, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1, 0, 0)) + + def testDefModeChunked(self): + assert encoder.encode( + self.v, asn1Spec=self.s, defMode=True, maxChunkSize=4 + ) == ints2octs((49, 24, 5, 0, 36, 17, 4, 4, 113, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 3, 111, 119, 110, 2, 1, 1)) + + def testIndefModeChunked(self): + assert encoder.encode( + self.v, asn1Spec=self.s, defMode=False, maxChunkSize=4 + ) == ints2octs((49, 128, 5, 0, 36, 128, 4, 4, 113, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 3, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)) + + +class SetEncoderWithComponentsSchemaTestCase(BaseTestCase): + def setUp(self): + BaseTestCase.setUp(self) + self.s = univ.Set( + componentType=namedtype.NamedTypes( + namedtype.NamedType('place-holder', univ.Null()), + namedtype.OptionalNamedType('first-name', univ.OctetString()), + namedtype.DefaultedNamedType('age', univ.Integer(33)), + ) + ) def __init(self): self.s.clear() @@ -859,6 +1114,26 @@ class ChoiceEncoderWithSchemaTestCase(BaseTestCase): namedtype.NamedType('string', univ.OctetString()) ) ) + self.v = { + 'place-holder': None + } + + def testFilled(self): + assert encoder.encode( + self.v, asn1Spec=self.s + ) == ints2octs((5, 0)) + + +class ChoiceEncoderWithComponentsSchemaTestCase(BaseTestCase): + def setUp(self): + BaseTestCase.setUp(self) + self.s = univ.Choice( + componentType=namedtype.NamedTypes( + namedtype.NamedType('place-holder', univ.Null('')), + namedtype.NamedType('number', univ.Integer(0)), + namedtype.NamedType('string', univ.OctetString()) + ) + ) def testEmpty(self): try: @@ -914,6 +1189,28 @@ class AnyEncoderTestCase(BaseTestCase): assert encoder.encode(s) == ints2octs((132, 5, 4, 3, 102, 111, 120)) +class AnyEncoderWithSchemaTestCase(BaseTestCase): + def setUp(self): + BaseTestCase.setUp(self) + self.s = univ.Any() + self.v = encoder.encode(univ.OctetString('fox')) + + def testUntagged(self): + assert encoder.encode(self.v, asn1Spec=self.s) == ints2octs((4, 3, 102, 111, 120)) + + def testTaggedEx(self): + s = self.s.subtype( + explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 4) + ) + assert encoder.encode(self.v, asn1Spec=s) == ints2octs((164, 5, 4, 3, 102, 111, 120)) + + def testTaggedIm(self): + s = self.s.subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 4) + ) + assert encoder.encode(self.v, asn1Spec=s) == ints2octs((132, 5, 4, 3, 102, 111, 120)) + + suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) if __name__ == '__main__': |