Source code for wolframclient.serializers.encoder

from __future__ import absolute_import, print_function, unicode_literals

import logging
import sys
from collections import defaultdict
from functools import partial

import pkg_resources

from wolframclient.serializers.utils import safe_len
from wolframclient.utils import six
from wolframclient.utils.api import multiprocessing
from wolframclient.utils.dispatch import Dispatch
from wolframclient.utils.functional import composition, is_iterable, iterate, map
from wolframclient.utils.importutils import safe_import_string

logger = logging.getLogger(__name__)

__all__ = ["wolfram_encoder", "Encoder"]


class WolframDispatch(Dispatch):
    """ Dispatcher that defers the import of encoders until they are needed.

    Encoders are registered as dotted import paths, either against the name of the
    module they support (:meth:`register_modules`) or against a plugin entry point
    (:meth:`register_plugins`). They are only imported and merged into this
    dispatcher when :meth:`update_dispatch` is called.
    """

    def __init__(self, *args, **opts):

        super(WolframDispatch, self).__init__(*args, **opts)

        # module name -> import paths of the encoders to load once that module is imported.
        self.registry = defaultdict(list)
        # names of the registered modules whose encoders have not been loaded yet.
        self.modules = set()
        # plugin (entry point) name -> import paths of the dispatchers to merge.
        self.plugins_registry = defaultdict(list)

    def register_modules(self, **handlers):
        """ Associate module names with the import path(s) of their encoders.

        Each keyword is a module name; its value is one import path or an iterable
        of import paths. Nothing is imported here.
        """
        for module, _handlers in handlers.items():
            self.modules.add(module)
            self.registry[module].extend(iterate(_handlers))

    def register_plugins(self, name="wolframclient_serializers_encoder"):
        """ Record the encoders advertised under the entry point group `name`.

        Only the module path of each entry point is stored; the import happens
        later, in :meth:`update_dispatch`.
        """
        logger.info(
            "Registering Wolfram encoders plugins associated with entrypoint %s.", name
        )
        for entry_point in pkg_resources.iter_entry_points(group=name):
            handlers = self.plugins_registry[entry_point.name]
            # Store the path as one item: extending a list with a string would
            # split it into characters. Skip exact duplicates so that the same
            # entry point discovered twice is only loaded once.
            if entry_point.module_name not in handlers:
                handlers.append(entry_point.module_name)

    def _update_dispatch(self):
        """ Load the encoders of every registered module found in ``sys.modules``. """
        if not self.modules:
            return

        # intersection() returns a new set, so self.modules can be mutated in the loop.
        for module in self.modules.intersection(sys.modules):
            for handler in self.registry[module]:
                # NOTE(review): keep_existing=True presumably lets already registered
                # encoders take precedence over the imported ones - confirm in Dispatch.update.
                self.update(safe_import_string(handler), keep_existing=True)

            # The module is handled; forget it so it is not processed again.
            del self.registry[module]
            self.modules.remove(module)

    def _update_plugins(self):
        """ Import every pending plugin dispatcher and merge it into this one.

        Raises:
            TypeError: re-raised, after logging, when merging a plugin fails with
                this exception type.
        """
        if not self.plugins_registry:
            return

        for plugins_name, handlers in self.plugins_registry.items():
            for handler in handlers:
                try:
                    self.update(safe_import_string(handler))
                except TypeError:
                    logger.critical(
                        "Failed to load encoder associated to plugins %s.", plugins_name
                    )
                    # Bare raise keeps the original traceback intact.
                    raise
        self.plugins_registry = defaultdict(list)

    if not six.JYTHON:
        # global lock to avoid multiple dispatcher updating in multithreaded programs.
        # Defined on the class, so it is shared by every instance.
        _lock = multiprocessing.Lock()

        def update_dispatch(self):
            """ Load all pending module encoders and plugins while holding the lock. """
            with self._lock:
                self._update_dispatch()
                self._update_plugins()

    else:

        def update_dispatch(self):
            """ Load all pending module encoders and plugins (no locking on Jython). """
            self._update_dispatch()
            self._update_plugins()


# Module-level dispatcher holding the mapping between Python types and encoders.
wolfram_encoder = WolframDispatch()

# Encoders for the builtin libraries. Each one is only imported by
# update_dispatch() once the module named by the keyword is in sys.modules.
wolfram_encoder.register_modules(
    sys="wolframclient.serializers.encoders.builtin.encoder",
    decimal="wolframclient.serializers.encoders.decimal.encoder",
    datetime="wolframclient.serializers.encoders.datetime.encoder",
    fractions="wolframclient.serializers.encoders.fractions.encoder",
)

# Encoders for optional libraries; PIL needs both its own encoder and the NumPy one.
wolfram_encoder.register_modules(
    numpy="wolframclient.serializers.encoders.numpy.encoder",
    pandas="wolframclient.serializers.encoders.pandas.encoder",
    PIL=(
        "wolframclient.serializers.encoders.pil.encoder",
        "wolframclient.serializers.encoders.numpy.encoder",
    ),
)

# Record the encoders published through the default plugin entry point.
wolfram_encoder.register_plugins()


@wolfram_encoder.dispatch(object)
def encode(serializer, o):
    """ Fallback encoder, registered for :class:`object`.

    The object is serialized as an external object when the serializer allows it.
    Otherwise an iterable is serialized element by element, and anything else
    raises :class:`NotImplementedError`.
    """
    # External objects take precedence over the generic iterable handling.
    if serializer.allow_external_objects:
        return serializer.serialize_external_object(o)

    if not is_iterable(o):
        raise NotImplementedError("Cannot serialize object of class %s" % o.__class__)

    encoded_items = map(serializer.encode, o)
    return serializer.serialize_iterable(encoded_items, length=safe_len(o))


wolfram_encoder.__doc__ = """ 
    Mapping between Python types and encoders used during serializations.

    This instance of :class:`~wolframclient.utils.dispatch.Dispatch` is used in 
    :func:`~wolframclient.serializers.export` to serialize Python expressions and produce a stream of bytes.

    **Register new encoders:**


    The annotation :meth:`~wolframclient.utils.dispatch.Dispatch.dispatch` applied to a function, defines an encoder 
    and associates it to the types passed as argument of the annotation.

    Define a new class::

        class MyPythonClass(object):
            def __init__(self, *arguments):
                self.arguments = arguments

    Specify its encoder::

        from wolframclient.serializers import wolfram_encoder
        from wolframclient.language import wl
        from wolframclient.serializers import export

        @wolfram_encoder.dispatch(MyPythonClass)
        def my_encoder(serializer, o):
            return serializer.encode(wl.MyWolframFunction(*o.arguments))

    Serialize an expression::

        >>> export(MyPythonClass(1,2))
        b'MyWolframFunction[1, 2]'

    Alternatively, applying :meth:`~wolframclient.utils.dispatch.Dispatch.register` to a function and its associated 
    type(s) achieves the same result.

    It is not possible to associate two encoders with the same type, but it's possible to remove a mapping. First, 
    unregister the previous encoder::

        wolfram_encoder.unregister(MyPythonClass)

    And register it again with :meth:`~wolframclient.utils.dispatch.Dispatch.register`::

        wolfram_encoder.register(my_encoder, MyPythonClass) 

    **Update with a dispatcher:**


    Another way to extend supported types is to create a new :class:`~wolframclient.utils.dispatch.Dispatch`, map 
    various types and encoders and ultimately update :data:`wolfram_encoder` using 
    :meth:`~wolframclient.utils.dispatch.Dispatch.update`.

    Create a new dispatcher and register :data:`MyPythonClass`::

        from wolframclient.utils.dispatch import Dispatch

        dispatch = Dispatch()
        dispatch.register(my_encoder, MyPythonClass)

    Update the main encoder with the new dispatch instance::

        wolfram_encoder.update(dispatch)

    Serialize an expression::

        >>> export(MyPythonClass(1,2))
        b'MyWolframFunction[1, 2]'

    **Define plugins:**


    The library supports an entry point dedicated to new encoders: `wolframclient_serializers_encoder`. The library uses 
    this entry point to load plugins at runtime as separate libraries. For more information about entry points, refer 
    to the documentation page about `entry points <https://packaging.python.org/specifications/entry-points/>`_.

    The plugin name must be unique and the value must reference a dispatcher instance. This instance is loaded and used 
    to update :data:`wolfram_encoder`. A plugin is a simple way to distribute encoders as a separate library. 
    
    One type must have a unique encoder associated to it; as a consequence, two plugins registering an encoder for the 
    same type are incompatible. It is strongly advised to create one plugin for each existing Python library, 
    e.g. have one plugin dedicated to NumPy and one to Pandas, which makes heavy use of NumPy arrays.
    """


class Encoder(object):
    """ A generic class exposing an :meth:`~wolframclient.serializers.encoder.Encoder.encode`
    method applying an optional normalizer function, followed by the most relevant encoding
    available for a given type.

    Arbitrary named parameters passed during initialization are later accessible with
    :meth:`~wolframclient.serializers.encoder.Encoder.get_property`.
    """

    def __init__(
        self,
        normalizer=None,
        encoder=None,
        allow_external_objects=False,
        target_kernel_version=None,
        **kwargs
    ):
        """ Build the `encode` callable and store the serialization options.

        :param normalizer: optional function, or iterable of functions, chained
            in front of the encoder. Each item is resolved with `safe_import_string`.
        :param encoder: encoder to use, resolved with `safe_import_string`.
            Defaults to :data:`wolfram_encoder`.
        :param allow_external_objects: stored as is on the instance.
        :param target_kernel_version: stored on the instance; a falsy value is
            replaced by ``11.3``.
        :param kwargs: extra named parameters, readable with :meth:`get_property`.
        """
        self.encode = self.chain_normalizer(
            normalizer, encoder=safe_import_string(encoder or wolfram_encoder)
        )

        self.allow_external_objects = allow_external_objects
        self.target_kernel_version = target_kernel_version or 11.3
        self._properties = kwargs

    def chain_normalizer(self, func, encoder):
        """ Return the composition of the normalizer(s) `func` and `encoder`.

        A :class:`WolframDispatch` encoder first loads its pending encoders. The
        encoder is then bound to this instance and appended after the normalizers.
        """
        if isinstance(encoder, WolframDispatch):
            # Make sure lazily registered encoders and plugins are available.
            encoder.update_dispatch()

        return composition(
            *map(
                safe_import_string,
                iterate(func or (), partial(encoder.as_method(), self)),
            )
        )

    def get_property(self, key, d=None):
        """ Return the value of the named parameter passed during initialization.

        Return `d` (``None`` by default) if `key` was not passed.
        """
        return self._properties.get(key, d)