from __future__ import absolute_import, print_function, unicode_literals
import logging
from wolframclient.deserializers import binary_deserialize
from wolframclient.evaluation.cloud.request_adapter import wrap_response
from wolframclient.exception import (
RequestException,
WolframEvaluationException,
WolframLanguageException,
WolframParserException,
)
from wolframclient.utils import six
from wolframclient.utils.api import json
from wolframclient.utils.decorators import cached_property
from wolframclient.utils.logger import str_trim
logger = logging.getLogger(__name__)
__all__ = [
"WolframResult",
"WolframAPIResponseBuilder",
"WolframAPIResponse",
"WolframCloudEvaluationResponse",
"WolframCloudEvaluationWXFResponse",
"WolframCloudEvaluationJSONResponse",
"WolframKernelEvaluationResult",
"WolframAPIResponseAsync",
"WolframEvaluationJSONResponseAsync",
"WolframEvaluationWXFResponseAsync",
]
class WolframResultBase(object):
pass
[docs]class WolframResult(WolframResultBase):
""" The most generic result object.
The actual result is returned via the method :func:`~wolframclient.evaluation.result.WolframResult.get`. If the
result is a `success`, the field `result` is returned; otherwise, `failure` is returned and most likely contains an
error message.
"""
def __init__(self, result=None, failure=None):
self.success = failure is None
self.failure = failure
self.result = result
[docs] def get(self):
"""Return the result or raise an exception based on the success status."""
if self.success:
return self.result
else:
raise WolframLanguageException(self.failure)
def __repr__(self):
if self.success:
return "{}<success={}, result={}>".format(
self.__class__.__name__, self.success, self.result
)
else:
return "{}<success={}, failure={}>".format(
self.__class__.__name__, self.success, self.failure
)
class WolframAsyncResult(WolframResultBase):
async def get(self):
raise NotImplementedError
class WolframEvaluationResultBase(WolframResultBase):
def __init__(self):
self._built = False
self._success = False
self._failure = None
self._messages = None
self._messages_name = None
self._output = None
self._result = None
self._is_message_failure = False
self.parsed_response = None
@property
def success(self):
""" Evaluations succeed when it returns a result and no message is issued. """
if not self._built:
self.build()
return self._success
@property
def failure(self):
if not self._built:
self.build()
return self._failure
@property
def messages(self):
""" A list of the messages issued during the evaluation. """
if not self._built:
self.build()
return self._messages
@property
def messages_name(self):
""" A list of the name of all the messages issued during the evaluation. """
if not self._built:
self.build()
return self._messages_name
def iter_messages(self):
"""
Iterator over all text messages issued during the evaluation.
:return: message text as a string.
"""
if self.messages:
yield from self.messages
def iter_messages_name(self):
if self.messages_name:
yield from self.messages_name
def iter_messages_tuple(self):
""" Iterator over all messages returned as a tuple: (message name, message text)"""
if self.messages and self.messages_name:
yield from zip(self.iter_messages_name(), self.iter_messages())
@property
def output(self):
""" A list of all content that got printed during evaluation e.g. using :wl:`Print`. """
if not self._built:
self.build()
return self._output
def iter_output(self):
""" Iterator over all printed output."""
if self.output:
yield from self.output
@property
def result(self):
if not self._built:
self.build()
return self._result
@property
def is_message_failure(self):
if not self._built:
self.build()
return self._is_message_failure
def build(self):
self.parse_response()
if self.parsed_response:
self.build_from_parsed_response()
# make sure to always build a response, here a generic one.
elif not self._built:
self.build_invalid_format()
def get(self, silent=True):
"""Return the result or raise an exception.
`silent` can be set to False to log all messages with warning severity. """
if self.success:
return self.result
elif self.is_message_failure:
if not silent:
for msg in self.iter_messages():
logger.warning(msg)
return self.result
else:
raise WolframEvaluationException("Evaluation failed.", messages=self.failure)
def parse_response(self):
""" Parse the result input and set the attribute `parsed_response`.
The result input can be encoded in various formats such as WXF, JSON, etc.
The `parsed_response` dict is expected to have keys corresponding to those of the association returned by
:wl:`EvaluationData`.
"""
raise NotImplementedError(
"%s does not implement parse_response method." % (self.__class__.__name__,)
)
def build_invalid_format(self, response_format_name=None):
""" Build a result object for invalid format cases. """
logger.fatal("Invalid format. Failed to parse result.")
self._success = False
if response_format_name:
self._failure = "Failed to decode response encoded with %s" % response_format_name
else:
self._failure = "Failed to decode response."
self._built = True
def build_from_parsed_response(self):
self._success = self.parsed_response["Success"]
self._result = self.parsed_response["Result"]
self._output = self.parsed_response.get("Output", [])
if not self._success:
self._failure = self.parsed_response["FailureType"]
if self._failure == "MessageFailure":
self._is_message_failure = True
self._messages_name = self.parsed_response["Messages"]
self._messages = self.parsed_response["MessagesText"]
else:
logger.warning("Evaluation failed.")
self._is_message_failure = False
self._built = True
def __repr__(self):
if self.success:
return "{}<expression={}>".format(self.__class__.__name__, self.result)
elif self.is_message_failure:
return "{}<success={}, result={}, messages={}>".format(
self.__class__.__name__, self.success, self.result, self.messages
)
else:
return "{}<failure={}>".format(self.__class__.__name__, self.failure)
[docs]class WolframKernelEvaluationResult(WolframEvaluationResultBase):
"""A Wolfram result with WXF encoded data.
Messages can be issued during a kernel evaluation. Those are stored as `messages`. If any message was returned by
the kernel then the success status is `False`.
The evaluation result is lazily computed when accessing the field `result`. The WXF bytes holding the evaluation
result are stored in `wxf` and thus can be later parsed with a customized parser if necessary.
All strings printed during the evaluation (e.g. Print["something"]) are stored in property `output` as a list.
The dict holding evaluation data is available in `evaluation_data`.
"""
def __init__(self, wxf_eval_data, consumer=None):
super().__init__()
self.wxf_evaluation_data = wxf_eval_data
# store the expression result serialized.
self.wxf = None
self.consumer = consumer
[docs] def parse_response(self):
self.parsed_response = binary_deserialize(self.wxf_evaluation_data)
self.wxf = self.parsed_response["Result"]
@cached_property
def result(self):
# Kernel evaluation encode the result as WXF. Lazily decoding it using the user consumer.
return binary_deserialize(super().result, consumer=self.consumer)
[docs]class WolframCloudEvaluationResponse(WolframEvaluationResultBase):
"""Result object associated with cloud kernel evaluation.
The response body associated to this type of result is encoded.
Other fields provide additional information. The HTTP response object is
stored as `http_response` and when HTTP error occurred it is stored in `request_error`.
"""
def __init__(self, response):
super().__init__()
self.http_response = wrap_response(response)
self.request_error = self.http_response.status() != 200
def __repr__(self):
if self._built and not self.request_error:
return super().__repr__()
elif self.request_error:
return "{}<request error {}>".format(
self.__class__.__name__, self.http_response.status()
)
else:
return "{}<successful request, request body not yet parsed>".format(
self.__class__.__name__
)
[docs] def get(self, silent=False):
return super().get(silent)
[docs] def build(self):
if not self.request_error:
super().build()
else:
logger.fatal(
"Server invalid response %i: %s",
self.http_response.status(),
self.http_response.text(),
)
raise RequestException(self.http_response)
[docs]class WolframCloudEvaluationWXFResponse(WolframCloudEvaluationResponse):
""" Result object associated with cloud evaluation request WXF encoded. """
[docs] def parse_response(self):
wxf = self.http_response.content()
try:
self.parsed_response = binary_deserialize(wxf)
except WolframLanguageException as e:
self.build_invalid_format(response_format_name="WXF")
[docs]class WolframCloudEvaluationJSONResponse(WolframCloudEvaluationResponse):
""" Result object associated with cloud evaluation request JSON encoded. """
[docs] def parse_response(self):
try:
self.parsed_response = self.http_response.json()
except json.JSONDecodeError as e:
self.build_invalid_format(response_format_name="JSON")
class WolframCloudEvaluationResponseAsync(WolframCloudEvaluationResponse):
"""Asynchronous result object associated with cloud evaluation request. """
async def build(self):
if not self.request_error:
await self.parse_response()
if self.parsed_response:
self.build_from_parsed_response()
elif not self._built:
self.build_invalid_format()
else:
msg = await self.http_response.text()
logger.fatal("Server invalid response %i: %s", self.http_response.status(), msg)
self._built = True
raise RequestException(self.http_response, msg=msg)
async def parse_response(self):
raise NotImplementedError(
"%s does not implement parse_response asynchronous method."
% (self.__class__.__name__,)
)
@property
async def success(self):
if not self._built:
await self.build()
return self._success
@property
async def failure(self):
if not self._built:
await self.build()
return self._failure
@property
async def result(self):
if not self._built:
await self.build()
return self._result
@property
async def is_message_failure(self):
if not self._built:
await self.build()
return self.is_message_failure
@property
async def messages(self):
""" A list of the messages issued during the evaluation. """
if not self._built:
await self.build()
return self._messages
@property
async def messages_name(self):
""" A list of the name of all the messages issued during the evaluation. """
if not self._built:
await self.build()
return self._messages_name
async def iter_messages(self):
"""
Iterator over all text messages issued during the evaluation.
:return: message text as a string.
"""
msgs = await self.messages
if msgs:
for msg in msgs:
yield msg
async def iter_messages_name(self):
names = await self.messages_name
if names:
for name in names:
yield name
async def iter_messages_tuple(self):
""" Iterator over all messages returned as a tuple: (message name, message text)"""
msg = await self.messages
names = await self.messages_name
if msg and names:
for tuple_msg in zip(names, msg):
yield tuple_msg
@property
async def output(self):
""" A list of all content that got printed during evaluation e.g. using :wl:`Print`. """
if not self._built:
await self.build()
return self._output
async def iter_output(self):
""" Iterator over all printed output."""
output = await self.output
if output:
for line in self.output:
yield line
@property
async def is_message_failure(self):
if not self._built:
await self.build()
return self._is_message_failure
async def get(self, silent=False):
"""Return the result or raise an exception based on the success status."""
if await self.success:
return await self.result
elif await self.is_message_failure:
if not silent:
for msg in self.iter_messages():
logger.warning(msg)
return self.result
else:
raise WolframEvaluationException("Evaluation failed.", messages=await self.failure)
[docs]class WolframEvaluationJSONResponseAsync(WolframCloudEvaluationResponseAsync):
"""Asynchronous result object associated with cloud evaluation request encoded with JSON. """
[docs] async def parse_response(self):
try:
self.parsed_response = await self.http_response.json()
except json.JSONDecodeError as e:
self.build_invalid_format(response_format_name="JSON")
[docs]class WolframEvaluationWXFResponseAsync(WolframCloudEvaluationResponseAsync):
"""Asynchronous result object associated with cloud evaluation request encoded with WXF. """
[docs] async def parse_response(self):
wxf = await self.http_response.content()
try:
self.parsed_response = binary_deserialize(wxf)
except WolframLanguageException as e:
self.build_invalid_format(response_format_name="WXF")
_DEFAULT_DECODERS = {
"application/vnd.wolfram.wxf": binary_deserialize,
"application/json": json.loads,
}
[docs]class WolframAPIResponse(WolframResult):
""" A generic API response.
This class is lazily constructed when the response body becomes available.
A decoder is inferred from the content type. Currently JSON and WXF formats are supported.
"""
def __init__(self, response, decoder=None):
self.response = response
self.content_type = response.headers().get("Content-Type", None)
if decoder:
self.decoder = decoder
else:
self.decoder = _DEFAULT_DECODERS.get(self.content_type)
self.status = response.status()
self.parsed_response = None
self.success = None
self._failure = None
self._built = False
[docs] def build(self):
raise NotImplementedError
[docs] def get(self):
if not self._built:
self.build()
return self._get()
def _get(self):
if self.success:
return self.result
else:
raise WolframLanguageException(self._failure)
[docs] def failure(self):
if not self._built:
self.build()
return self._failure
def __repr__(self):
return "<%s:success=%s>" % (self.__class__.__name__, self.success)
[docs]class WolframAPIResponseAsync(WolframAPIResponse):
""" Asynchronous counterpart of :class:`~wolframclient.evaluation.result.WolframAPIResponse`, awaiting for the
response body.
Most of the class logic is implemented in :data:`WolframAPIResponse`, except the build method which has to be a
coroutine.
"""
[docs] async def get(self):
""" Return the result or raise an exception based on the success status.
This is a coroutine."""
if not self._built:
await self.build()
return self._get()
[docs] async def build(self):
raise NotImplementedError
class WolframAPIFailureResponse(WolframAPIResponse):
def __init__(self, response, decoder=None):
super().__init__(response, decoder=decoder)
self.success = False
class WolframAPIFailureResponseAsync(WolframAPIResponseAsync, WolframAPIFailureResponse):
pass
class WolframAPIResponse200(WolframAPIResponse):
def __init__(self, response, decoder=None):
super().__init__(response, decoder=decoder)
self.success = True
def build(self):
self._built = True
if self.decoder is not None:
try:
self.result = self.decoder(self.response.content())
except Exception as e:
self.success = False
self._failure = "Decoder error: {}".format(e)
self.exception = e
else:
self.result = self.response.content()
class WolframAPIResponse200Async(WolframAPIResponseAsync, WolframAPIResponse200):
async def build(self):
if self.decoder is not None:
try:
self.result = self.decoder(await self.response.content())
except Exception as e:
self.success = False
self._failure = "Decoder error: {}".format(e)
self.exception = e
else:
self.result = await self.response.content()
self._built = True
class WolframAPIResponseRedirect(WolframAPIFailureResponse):
def __init__(self, response, decoder=None):
super().__init__(response, decoder)
self.location = None
def build(self):
self.location = self.response.headers().get("location", None)
logger.warning("Redirected to %s.", self.location)
self._specific_failure()
self._built = True
def _specific_failure(self):
raise NotImplementedError
class WolframAPIResponse301(WolframAPIResponseRedirect):
def _specific_failure(self):
""" should not happen since we follow redirection """
self._failure = "Resource permanently moved to new location {}".format(self.location)
class WolframAPIResponse301Async(WolframAPIResponse301, WolframAPIResponseAsync):
pass
class WolframAPIResponse302(WolframAPIResponseRedirect):
def _specific_failure(self):
# hack because the server is not returning 403. cf. CLOUD-12946
if self.location is not None and "j_spring_oauth_security_check" in self.location:
self._failure = "Not allowed to access requested resource."
else:
self._failure = "Resource moved to new location {}".format(self.location)
class WolframAPIResponse302Async(WolframAPIResponse302, WolframAPIResponseAsync):
pass
class WolframAPIResponse400(WolframAPIFailureResponse):
def __init__(self, response, decoder=None):
super().__init__(response, decoder=decoder)
self._fields_in_error = None
def build(self):
try:
if self.decoder:
self.parsed_response = self.decoder(self.response.content())
else:
self.parsed_response = self._unexpected_content_type()
except (json.JSONDecodeError, WolframParserException):
logger.fatal(
"Failed to parse server response as %s:\n%s",
self.content_type,
str_trim(self.response.content(), max_char=200),
)
raise self._failed_to_parse()
self._update_from_response()
self._built = True
def fields_in_error(self):
"""Return all the fields in error with their message as a tuple of tuples"""
if not self._built:
self.build()
return self._fields_in_error
def _failed_to_parse(self):
return RequestException(self.response, msg="Failed to parse server response.")
def _unexpected_content_type(self):
logger.warning(
"Response content-type: %s is not supported. Cannot decode content: %s",
self.content_type,
str_trim(self.response.content()),
)
return {
"Failure": "Cannot decode server response. No decoder found for content-type: %s."
% self.content_type
}
def _update_from_response(self):
self._failure = self.parsed_response.get("Failure", None)
fields = self.parsed_response.get("Fields", None)
logger.warning("Wolfram API error response: %s", self._failure)
if fields is not None:
self._fields = set(fields.keys())
logger.warning("Fields in error: %s", self._fields)
self._fields_in_error = []
for field, err in fields.items():
failure = err.get("Failure", None)
if failure is not None:
self._fields_in_error.append((field, failure))
class WolframAPIResponse400Async(WolframAPIResponse400, WolframAPIResponseAsync):
async def build(self):
# ignoring content-type. Must be JSON. Make sure it's robust enough.
try:
if self.decoder:
self.parsed_response = self.decoder(await self.response.content())
else:
self.parsed_response = await self._unexpected_content_type()
except json.JSONDecodeError as e:
logger.fatal(
"Failed to parse server response as %s:\n%s",
self.content_type,
await self.response.content(),
)
raise self._failed_to_parse()
self._update_from_response()
self._built = True
async def _unexpected_content_type(self):
logger.warning(
"Response content-type: %s is not supported. Cannot decode content: %s",
self.content_type,
str_trim(await self.response.content()),
)
return {
"Failure": "Cannot decode server response. No decoder found for content-type: %s."
% self.content_type
}
async def fields_in_error(self):
"""Return all the fields in error with their message as a list of tuples"""
if not self._built:
await self.build()
return self._fields_in_error
class WolframAPIResponse401(WolframAPIFailureResponse):
def build(self):
self._failure = self.response.text()
logger.warning("Authentication missing or failed. Server response: %s", self._failure)
self._built = True
class WolframAPIResponse401Async(WolframAPIResponse401, WolframAPIResponseAsync):
async def build(self):
# ignoring content-type. Must be JSON. Make sure it's robust enough.
self._failure = await self.response.text()
self._built = True
logger.warning("Authentication missing or failed. Server response: %s", self._failure)
class WolframAPIResponse404(WolframAPIFailureResponse):
def build(self):
self._failure = "The resource %s can't not be found." % self.response.url()
logger.warning("Wolfram API error response: %s", self._failure)
self._built = True
class WolframAPIResponse404Async(WolframAPIResponse404, WolframAPIResponseAsync):
async def build(self):
WolframAPIResponse404.build(self)
class WolframAPIResponseGeneric(WolframAPIFailureResponse):
def build(self):
self._failure = self.response.text()
self._built = True
class WolframAPIResponseGenericAsync(WolframAPIResponseAsync):
async def build(self):
self._failure = await self.response.text()
self._built = True
class WolframAPIResponse500(WolframAPIResponseGeneric):
def __init__(self, response, decoder=None):
super().__init__(response, decoder)
logger.fatal("Internal server error occurred.")
class WolframAPIResponse500Async(WolframAPIResponseGenericAsync):
def __init__(self, response, decoder=None):
super().__init__(response, decoder)
logger.fatal("Internal server error occurred.")
[docs]class WolframAPIResponseBuilder(object):
"""Map error code to handler building the appropriate
:class:`~wolframclient.evaluation.result.WolframAPIResponse`
"""
response_mapper = {
200: WolframAPIResponse200,
301: WolframAPIResponse301,
302: WolframAPIResponse302,
400: WolframAPIResponse400,
401: WolframAPIResponse401,
404: WolframAPIResponse404,
500: WolframAPIResponse500,
}
async_response_mapper = {
200: WolframAPIResponse200Async,
301: WolframAPIResponse301Async,
302: WolframAPIResponse302Async,
400: WolframAPIResponse400Async,
401: WolframAPIResponse401Async,
404: WolframAPIResponse404Async,
500: WolframAPIResponse500Async,
}
[docs] @staticmethod
def build(response, decoder=None):
adapter = wrap_response(response)
if adapter.asynchronous:
return WolframAPIResponseBuilder.async_response_mapper.get(
adapter.status(), WolframAPIResponseGenericAsync
)(adapter, decoder=decoder)
else:
return WolframAPIResponseBuilder.response_mapper.get(
adapter.status(), WolframAPIResponseGeneric
)(adapter, decoder=decoder)
[docs] @staticmethod
def map(status_code, response_class):
if not isinstance(response_class, WolframAPIResponse):
raise ValueError(
"Response class %s is not a subclass of %s"
% (response_class.__class__.__name__, WolframAPIResponse.__class__.__name__)
)
if not isinstance(status_code, six.integer_types):
logger.warning("Invalid status code: %s", status_code)
raise ValueError("HTTP status code must be string.")
logger.debug(
"Mapping http response status %i to function %s",
status_code,
response_class.__name__,
)
WolframAPIResponseBuilder.response_mapper[status_code] = response_class
def __init__(self):
raise NotImplementedError("Cannot initialize. Use static 'method' build.")