Source code for wolframclient.serializers.encoders.pandas

from __future__ import absolute_import, print_function, unicode_literals

from collections import OrderedDict
from itertools import starmap

from wolframclient.utils.api import pandas
from wolframclient.utils.dispatch import Dispatch

encoder = Dispatch()


[docs]def safe_pandas_length(o): """ Return the length of a pandas Series and DataFrame as expected for WL serialization. - The length of a Series is the only value of the tuple `shape`. - The length of a dataframe is the number of columns. It's the second value of `shape`. This function is safe, when the shape does not have the expected number of elements, it fails silently and returns `None`, the object is later traversed to find out how many elements it contains. """ try: return o.shape[-1] except (TypeError, IndexError): return
[docs]def encode_as_dataset(serializer, o, length): return serializer.serialize_function( serializer.serialize_symbol(b"Dataset"), (encode_as_association(serializer, o, length),), )
[docs]def encode_as_list(serializer, o, length): return serializer.serialize_iterable( starmap( lambda k, v: serializer.serialize_rule(k, v), encoded_kv_tuples(serializer, o) ), length=length, )
[docs]def encode_as_association(serializer, o, length): return serializer.serialize_association(encoded_kv_tuples(serializer, o), length=length)
[docs]def encoded_kv_tuples(serializer, o): return ((serializer.encode(k), serializer.encode(v)) for k, v in o.items())
[docs]def encode_as_timeseries(serializer, o, length): return serializer.serialize_function( serializer.serialize_symbol(b"TimeSeries"), ( serializer.serialize_iterable( ( serializer.serialize_iterable(item, length=2) for item in encoded_kv_tuples(serializer, o) ), length=length, ), ), )
def _distribute_multikey(o): expr_dict = OrderedDict() for multikey, value in o.iteritems(): cur_dict = expr_dict for key in multikey[:-1]: if key not in cur_dict: cur_dict[key] = OrderedDict() cur_dict = cur_dict[key] cur_dict[multikey[-1]] = value return expr_dict
[docs]def encode_multiindex_as_assoc(serializer, o, length): return serializer.encode(_distribute_multikey(o))
[docs]def encode_multiindex_as_dataset(serializer, o, length): return serializer.serialize_function( serializer.serialize_symbol(b"Dataset"), (serializer.encode(_distribute_multikey(o)),) )
PANDAS_PROPERTIES = { "pandas_series_head": {"dataset", "list", "association"}, "pandas_dataframe_head": {"dataset", "association"}, "timeseries": True, } ENCODERS = { "default": { "dataset": encode_as_dataset, "list": encode_as_list, "association": encode_as_association, }, "datetimeindex": encode_as_timeseries, "multiindex": { "association": encode_multiindex_as_assoc, "list": encode_multiindex_as_assoc, "dataset": encode_multiindex_as_dataset, }, }
[docs]def get_series_encoder_from_index(index, use_ts, form): if use_ts and isinstance(index, pandas.DatetimeIndex): return ENCODERS["datetimeindex"] elif isinstance(index, pandas.MultiIndex): return ENCODERS["multiindex"][form or "dataset"] else: return ENCODERS["default"][form or "association"]
INVALID_PROPERTY_MSG = "Invalid property %s, expecting %s"
[docs]def normalized_prop_timeseries(serializer): prop = serializer.get_property("timeseries", d=True) if not isinstance(prop, bool): raise ValueError( "Invalid value for property 'timeseries'. Expecting a boolean, got %s." % prop ) return prop
[docs]def normalized_prop_pandas_series_head(serializer): """ Check property `pandas_series_head` only if specified (not None). """ prop = serializer.get_property("pandas_series_head", d=None) if prop and prop not in PANDAS_PROPERTIES["pandas_series_head"]: raise ValueError( "Invalid value for property 'pandas_series_head'. Expecting one of (%s), got %s." % (", ".join(PANDAS_PROPERTIES["pandas_series_head"]), prop) ) return prop
[docs]@encoder.dispatch(pandas.Series) def encode_panda_series(serializer, o): use_ts = normalized_prop_timeseries(serializer) form = normalized_prop_pandas_series_head(serializer) encoder = get_series_encoder_from_index(o.index, use_ts, form) return encoder(serializer, o, safe_pandas_length(o))
[docs]def encode_dataframe_as_assoc(serializer, o, length): use_ts = normalized_prop_timeseries(serializer) return serializer.serialize_association( ( ( serializer.encode(k), get_series_encoder_from_index(v.index, use_ts, "association")( serializer, v, safe_pandas_length(v) ), ) for k, v in o.T.items() ), length=length, )
[docs]def encode_dataframe_as_dataset(serializer, o, length): return serializer.serialize_function( serializer.serialize_symbol(b"Dataset"), (encode_dataframe_as_assoc(serializer, o, length),), )
[docs]@encoder.dispatch(pandas.DataFrame) def encoder_panda_dataframe(serializer, o): head = serializer.get_property("pandas_dataframe_head", d=None) if head is None or head == "dataset": return encode_dataframe_as_dataset(serializer, o, safe_pandas_length(o.index)) elif head in PANDAS_PROPERTIES["pandas_dataframe_head"]: return encode_dataframe_as_assoc(serializer, o, safe_pandas_length(o.index)) else: raise ValueError( "Invalid value for property 'pandas_dataframe_head'. Expecting one of (%s), got %s." % (", ".join(PANDAS_PROPERTIES["pandas_dataframe_head"]), head) )