Source code for mars.learn.utils.checks

# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np
try:
    from sklearn import get_config as get_sklearn_config
except ImportError:  # pragma: no cover
    get_sklearn_config = None

from ... import opcodes as OperandDef
from ... import tensor as mt
from ...core import ENTITY_TYPE, get_output_types, recursive_tile
from ...core.operand import OperandStage
from ...config import options
from ...serialization.serializables import KeyField, StringField, BoolField, DataTypeField
from ...tensor.core import TensorOrder, TENSOR_CHUNK_TYPE
from ...tensor.array_utils import as_same_device, device, issparse, get_array_module
from ...utils import ceildiv
from ..operands import LearnOperand, LearnOperandMixin, OutputType


class CheckBase(LearnOperand, LearnOperandMixin):
    _input = KeyField('input')
    _value = KeyField('value')
    _err_msg = StringField('err_msg')

    def __init__(self, input=None, value=None, err_msg=None, output_types=None, **kw):
        super().__init__(_input=input, _value=value, _err_msg=err_msg,
                         _output_types=output_types, **kw)

    @property
    def input(self):
        return self._input

    @property
    def value(self):
        return self._value

    @property
    def err_msg(self):
        return self._err_msg

    def _set_inputs(self, inputs):
        super()._set_inputs(inputs)
        if self._input is not None:
            self._input = self._inputs[0]
        if self._value is not None:
            self._value = self._inputs[-1]

    def __call__(self, x, value=None):
        # output input if value not specified
        self._value = value = value if value is not None else x
        self.output_types = get_output_types(value)
        self.stage = OperandStage.agg
        return self.new_tileable([x, value],
                                 kws=[value.params])

    @classmethod
    def tile(cls, op):
        combine_size = options.combine_size
        x, value = op.input, op.value
        check_chunks = []
        for i, chunk in enumerate(x.chunks):
            chunk_op = cls(err_msg=op.err_msg, stage=OperandStage.map,
                           output_types=[OutputType.tensor])
            check_chunk = chunk_op.new_chunk([chunk], shape=(),
                                             index=(i,),
                                             dtype=np.dtype(bool),
                                             order=TensorOrder.C_ORDER)
            check_chunks.append(check_chunk)

        while len(check_chunks) > 1:
            prev_check_chunks = check_chunks
            check_chunks = []
            chunk_size = ceildiv(len(prev_check_chunks), combine_size)
            for i in range(chunk_size):
                chunks = prev_check_chunks[i * combine_size: (i + 1) * combine_size]
                chunk_op = cls(err_msg=op.err_msg, stage=OperandStage.combine,
                               output_types=[OutputType.tensor])
                check_chunk = chunk_op.new_chunk(chunks, shape=(),
                                                 index=(i,),
                                                 dtype=np.dtype(bool),
                                                 order=TensorOrder.C_ORDER)
                check_chunks.append(check_chunk)

        check_chunk = check_chunks[0]
        out_chunks = []
        for val_chunk in value.chunks:
            chunk_op = cls(value=val_chunk, err_msg=op.err_msg, stage=OperandStage.agg,
                           output_types=op.output_types)
            out_chunk = chunk_op.new_chunk([check_chunk, val_chunk], kws=[val_chunk.params])
            out_chunks.append(out_chunk)

        new_op = op.copy()
        kw = op.outputs[0].params
        kw['chunks'] = out_chunks
        kw['nsplits'] = value.nsplits
        return new_op.new_tileables(op.inputs, kws=[kw])


class CheckNonNegative(CheckBase):
    _op_type_ = OperandDef.CHECK_NON_NEGATIVE

    _whom = StringField('whom')

    def __init__(self, input=None, value=None, whom=None, err_msg=None,
                 stage=None, gpu=None, output_types=None, **kw):
        super().__init__(input=input, value=value, _whom=whom,
                         err_msg=err_msg, stage=stage,
                         output_types=output_types,
                         gpu=gpu, **kw)
        if self._err_msg is None and self._whom is not None:
            self._err_msg = f"Negative values in data passed to {self._whom}"

    @property
    def whom(self):
        return self._whom

    @classmethod
    def _execute_tensor(cls, ctx, op):
        (x,), device_id, xp = as_same_device(
            [ctx[inp.key] for inp in op.inputs], device=op.device, ret_extra=True)

        with device(device_id):
            if issparse(x) and x.nnz == 0:
                x_min = 0
            else:
                x_min = xp.min(x)

            if x_min < 0:
                raise ValueError(op.err_msg)

            ctx[op.outputs[0].key] = np.array(True)

    @classmethod
    def _execute_df(cls, ctx, op):
        x = ctx[op.inputs[0].key]
        x_min = x.min().min()
        if x_min < 0:
            raise ValueError(op.err_msg)

        ctx[op.outputs[0].key] = np.array(True)

    @classmethod
    def _execute_map(cls, ctx, op):
        if isinstance(op.inputs[0], TENSOR_CHUNK_TYPE):
            return cls._execute_tensor(ctx, op)
        else:
            return cls._execute_df(ctx, op)

    @classmethod
    def _execute_combine(cls, ctx, op):
        # just pass value cuz all inputs executed successfully
        ctx[op.outputs[0].key] = np.array(True)

    @classmethod
    def _execute_agg(cls, ctx, op):
        ctx[op.outputs[0].key] = ctx[op.value.key]

    @classmethod
    def execute(cls, ctx, op):
        if op.stage == OperandStage.map:
            return cls._execute_map(ctx, op)
        elif op.stage == OperandStage.combine:
            return cls._execute_combine(ctx, op)
        else:
            assert op.stage == OperandStage.agg
            return cls._execute_agg(ctx, op)


def check_non_negative_then_return_value(to_check, value, whom):
    op = CheckNonNegative(input=to_check, value=value, whom=whom)
    return op(to_check, value)


class AssertAllFinite(LearnOperand, LearnOperandMixin):
    _op_type_ = OperandDef.ASSERT_ALL_FINITE

    _x = KeyField('x')
    _allow_nan = BoolField('allow_nan')
    _msg_dtype = DataTypeField('msg_dtype')
    _check_only = BoolField('check_only')
    # chunks
    _is_finite = KeyField('is_finite')
    _check_nan = KeyField('check_nan')

    def __init__(self, x=None, allow_nan=None, msg_dtype=None,
                 check_only=None, is_finite=None, check_nan=None,
                 output_types=None, **kw):
        super().__init__(_x=x, _allow_nan=allow_nan, _msg_dtype=msg_dtype,
                         _check_only=check_only, _is_finite=is_finite,
                         _check_nan=check_nan, _output_types=output_types, **kw)

    @property
    def x(self):
        return self._x

    @property
    def allow_nan(self):
        return self._allow_nan

    @property
    def msg_dtype(self):
        return self._msg_dtype

    @property
    def check_only(self):
        return self._check_only

    @property
    def is_finite(self):
        return self._is_finite

    @property
    def check_nan(self):
        return self._check_nan

    def _set_inputs(self, inputs):
        super()._set_inputs(inputs)
        inputs_iter = iter(self._inputs)
        for attr in ('_x', '_is_finite', '_check_nan'):
            if getattr(self, attr) is not None:
                setattr(self, attr, next(inputs_iter))

    @classmethod
    def _assume_finite(cls):
        assume_finite = options.learn.assume_finite
        if assume_finite is None and get_sklearn_config is not None:
            # get config from scikit-learn
            assume_finite = get_sklearn_config()['assume_finite']
        if assume_finite is None:  # pragma: no cover
            assume_finite = False

        return assume_finite

    def __call__(self, x):
        if self._assume_finite():
            # skip check
            if self._check_only:
                return
            else:
                return x

        if self._check_only:
            return self.new_tileable([x], dtype=np.dtype(bool),
                                     shape=(), order=TensorOrder.C_ORDER)
        else:
            return self.new_tileable([x], kws=[x.params])

    @classmethod
    def tile(cls, op):
        from .extmath import _safe_accumulator_op

        x = op.x
        out = op.outputs[0]
        is_float = x.dtype.kind in 'fc'
        combine_size = options.combine_size

        is_finite_chunk = check_nan_chunk = None
        if is_float:
            is_finite_chunk = (yield from recursive_tile(
                mt.isfinite(_safe_accumulator_op(mt.sum, x)))).chunks[0]
        elif x.dtype == np.dtype(object) and not op.allow_nan:
            check_nan_chunk = (yield from recursive_tile(
                (x != x).any())).chunks[0]

        map_chunks = []
        for c in x.chunks:
            chunk_op = op.copy().reset_key()
            chunk_op.stage = OperandStage.map
            chunk_op._is_finite = is_finite_chunk
            chunk_op._check_nan = check_nan_chunk
            chunk_inputs = [c]
            if is_finite_chunk is not None:
                chunk_inputs.append(is_finite_chunk)
            if check_nan_chunk is not None:
                chunk_inputs.append(check_nan_chunk)
            chunk_params = c.params
            if op.check_only:
                chunk_params['dtype'] = np.dtype(bool)
                chunk_params['shape'] = ()
                if len(x.chunks) == 1:
                    chunk_params['index'] = ()
            map_chunk = chunk_op.new_chunk(chunk_inputs, kws=[chunk_params])
            map_chunks.append(map_chunk)

        new_op = op.copy()
        if not op.check_only:
            params = out.params
            params['nsplits'] = x.nsplits
            params['chunks'] = map_chunks
            return new_op.new_tileables(op.inputs, kws=[params])

        out_chunks = map_chunks
        # if check only, we use tree reduction to aggregate to one chunk
        while len(out_chunks) > 1:
            size = ceildiv(len(out_chunks), combine_size)
            new_out_chunks = []
            for i in range(size):
                chunk_op = AssertAllFinite(
                    check_only=True, output_types=op.output_types,
                    stage=OperandStage.combine if size > 1 else OperandStage.agg)
                chunk_index = (i,) if size > 1 else ()
                out_chunk = chunk_op.new_chunk(
                    out_chunks[combine_size * i: combine_size * (i + 1)],
                    dtype=out.dtype, shape=(), index=chunk_index, order=out.order)
                new_out_chunks.append(out_chunk)
            out_chunks = new_out_chunks

        params = out.params
        params['nsplits'] = ()
        params['chunks'] = out_chunks
        return new_op.new_tileables(op.inputs, kws=[params])

    @classmethod
    def _execute_map(cls, ctx, op):
        allow_nan = op.allow_nan
        msg_dtype = op.msg_dtype
        raw = x = ctx[op.x.key]
        xp = get_array_module(x, nosparse=True)

        if issparse(x):
            x = x.data
        # First try an O(n) time, O(1) space solution for the common case that
        # everything is finite; fall back to O(n) space np.isfinite to prevent
        # false positives from overflow in sum method. The sum is also calculated
        # safely to reduce dtype induced overflows.
        is_float = x.dtype.kind in 'fc'
        if is_float and ctx[op.is_finite.key]:
            pass
        elif is_float:
            msg_err = "Input contains {} or a value too large for {!r}."
            if (allow_nan and xp.isinf(x).any() or
                    not allow_nan and not xp.isfinite(x).all()):
                type_err = 'infinity' if allow_nan else 'NaN, infinity'
                raise ValueError(
                    msg_err.format
                    (type_err,
                     msg_dtype if msg_dtype is not None else x.dtype)
                )
        # for object dtype data, we only check for NaNs
        elif x.dtype == np.dtype(object) and not allow_nan:
            if ctx[op.check_nan.key]:
                raise ValueError("Input contains NaN")

        if op.check_only:
            result = np.array(True)
        else:
            result = raw
        ctx[op.outputs[0].key] = result

    @classmethod
    def _execute_combine_reduce(cls, ctx, op):
        # just return True
        ctx[op.outputs[0].key] = np.array(True)

    @classmethod
    def execute(cls, ctx, op):
        if op.stage == OperandStage.map:
            return cls._execute_map(ctx, op)
        else:
            assert op.stage in (OperandStage.combine, OperandStage.agg)
            return cls._execute_combine_reduce(ctx, op)


[docs]def assert_all_finite(X, allow_nan=False, msg_dtype=None, check_only=True): if not isinstance(X, ENTITY_TYPE): X = mt.asarray(X) if isinstance(X.op, AssertAllFinite) and X.op.allow_nan == allow_nan and \ X.op.msg_dtype == msg_dtype and X.op.check_only == check_only: return X if check_only: output_types = [OutputType.tensor] sparse = False else: output_types = get_output_types(X) sparse = X.issparse() op = AssertAllFinite(x=X, allow_nan=allow_nan, msg_dtype=msg_dtype, check_only=check_only, sparse=sparse, output_types=output_types) return op(X)