from __future__ import absolute_import, print_function, unicode_literals
from wolframclient.exception import WolframLanguageException
from wolframclient.serializers.wxfencoder.constants import (
ARRAY_TYPES,
VALID_PACKED_ARRAY_TYPES,
WXF_CONSTANTS,
StructDouble,
StructFloat,
StructInt8LE,
StructInt16LE,
StructInt32LE,
StructInt64LE,
StructUInt8LE,
StructUInt16LE,
StructUInt32LE,
StructUInt64LE,
)
from wolframclient.utils import six
from wolframclient.utils.datastructures import Settings
if six.JYTHON:
import jarray
[docs]def write_varint(int_value, stream):
"""Serialize `int_value` into varint bytes and write them to
`stream`, return the stream.
"""
stream.write(varint_bytes(int_value))
[docs]def varint_bytes(int_value):
"""Serialize `int_value` into varint bytes and return them as a byetarray."""
buf = bytearray(9)
if int_value < 0:
raise TypeError("Negative values cannot be encoded as varint.")
count = 0
while True:
next = int_value & 0x7F
int_value >>= 7
if int_value:
buf[count] = next | 0x80
count += 1
else:
buf[count] = next
count += 1
break
return buf[:count]
_exceptions = {
0: (WXF_CONSTANTS.Integer8, 1),
-(1 << 7): (WXF_CONSTANTS.Integer8, 1),
-(1 << 15): (WXF_CONSTANTS.Integer16, 2),
-(1 << 31): (WXF_CONSTANTS.Integer32, 4),
-(1 << 63): (WXF_CONSTANTS.Integer64, 8),
}
_size = dict(
(j, (WXF_CONSTANTS["Integer%i" % ih], ih // 8))
for il, ih in ((1, 8), (9, 16), (17, 32), (33, 64))
for j in range(il, ih + 1)
)
[docs]def integer_size(value):
try:
return _exceptions.get(value, None) or _size[value.bit_length() + 1]
except KeyError:
raise ValueError("Value %i is not a machine-sized integer." % value)
_packing = {1: StructInt8LE, 2: StructInt16LE, 4: StructInt32LE, 8: StructInt64LE}
if six.JYTHON:
def integer_to_bytes(value, int_size):
buffer = jarray.zeros(8, "c")
_packing.get(int_size).pack_into(buffer, 0, value)
return buffer[:int_size].tostring()
elif six.PY2:
def integer_to_bytes(value, int_size):
buffer = bytearray(8)
_packing.get(int_size).pack_into(buffer, 0, value)
return buffer[:int_size]
else:
[docs] def integer_to_bytes(value, int_size):
return value.to_bytes(int_size, byteorder="little", signed=True)
if six.JYTHON:
def float_to_bytes(value):
buffer = jarray.zeros(8, "c")
StructDouble.pack_into(buffer, 0, value)
return buffer.tostring()
else:
[docs] def float_to_bytes(value):
buffer = bytearray(8)
StructDouble.pack_into(buffer, 0, value)
return buffer
[docs]def valid_dimension_or_fail(dimension):
if dimension <= 0:
raise WolframLanguageException(
"Invalid array dimensions: %s. Expecting strictly positive integer." % dimension
)
[docs]def array_to_wxf(wxf_token, data, dimensions, array_type_token):
yield wxf_token
yield array_type_token
yield varint_bytes(len(dimensions))
for dim in dimensions:
valid_dimension_or_fail(dim)
yield varint_bytes(dim)
yield data
[docs]def numeric_array_to_wxf(data, dimensions, wl_type):
return array_to_wxf(WXF_CONSTANTS.NumericArray, data, dimensions, ARRAY_TYPES[wl_type])
[docs]def packed_array_to_wxf(data, dimensions, wl_type):
array_type_token = ARRAY_TYPES[wl_type]
if array_type_token not in VALID_PACKED_ARRAY_TYPES:
raise ValueError("Invalid PackedArray type %s" % array_type_token)
return array_to_wxf(WXF_CONSTANTS.PackedArray, data, dimensions, array_type_token)
[docs]def array_to_list(data, dimensions, wl_type):
for dimension in dimensions:
valid_dimension_or_fail(dimension)
return _array_to_list(data, dimensions, wl_type)
if hasattr(memoryview, "cast"):
unpack_mapping = Settings(
Integer8="b",
UnsignedInteger8="B",
Integer16="h",
UnsignedInteger16="H",
Integer32="i",
UnsignedInteger32="I",
Integer64="q",
UnsignedInteger64="Q",
Real32="f",
Real64="d",
ComplexReal32="f",
ComplexReal64="d",
)
def _to_complex(array, max_depth, curr_depth):
# recursivelly traverse the array until the last (real) dimension is reached
# it correspond to an array of (fake) array of two elements (real and im parts).
if curr_depth < max_depth - 1:
for sub in array:
_to_complex(sub, max_depth, curr_depth + 1)
return
# iterate over the pairs
for index, complex_pair in enumerate(array):
array[index] = complex(*complex_pair)
def _array_to_list(data, shape, array_type):
view = memoryview(data)
if array_type == "ComplexReal32" or array_type == "ComplexReal64":
dimensions = list(shape)
array_depth = len(dimensions)
# In the given array, 2 reals give one complex,
# adding one last dimension to represent it.
dimensions.append(2)
as_list = view.cast(unpack_mapping[array_type], shape=dimensions).tolist()
_to_complex(as_list, array_depth, 0)
return as_list
else:
return view.cast(unpack_mapping[array_type], shape=shape).tolist()
else:
unpack_mapping = Settings(
Integer8=StructInt8LE,
UnsignedInteger8=StructUInt8LE,
Integer16=StructInt16LE,
UnsignedInteger16=StructUInt16LE,
Integer32=StructInt32LE,
UnsignedInteger32=StructUInt32LE,
Integer64=StructInt64LE,
UnsignedInteger64=StructUInt64LE,
Real32=StructFloat,
Real64=StructDouble,
ComplexReal32=StructFloat,
ComplexReal64=StructDouble,
)
def _array_to_list(data, shape, array_type):
value, _ = _build_array_from_bytes(data, 0, array_type, shape, 0)
return value
def _build_array_from_bytes(data, offset, array_type, dimensions, current_dim):
new_array = list()
if current_dim < len(dimensions) - 1:
for i in range(dimensions[current_dim]):
new_elem, offset = _build_array_from_bytes(
data, offset, array_type, dimensions, current_dim + 1
)
new_array.append(new_elem)
else:
struct = unpack_mapping[array_type]
# complex values, need two reals for each.
if array_type == "ComplexReal32" or array_type == "ComplexReal64":
for i in range(dimensions[-1]):
# this returns a tuple.
re = struct.unpack_from(data, offset=offset)
offset = offset + struct.size
im = struct.unpack_from(data, offset=offset)
offset = offset + struct.size
new_array.append(complex(re[0], im[0]))
else:
for i in range(dimensions[-1]):
# this returns a tuple.
value = struct.unpack_from(data, offset=offset)
offset = offset + struct.size
new_array.append(value[0])
return new_array, offset