Source code for mars.tensor.base.searchsorted

# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools

import numpy as np

from ... import opcodes as OperandDef
from ...core.operand import OperandStage
from ...serialization.serializables import KeyField, StringField, \
    AnyField, Int64Field, Int32Field
from ...config import options
from ...utils import has_unknown_shape
from ..operands import TensorOperand, TensorOperandMixin
from ..core import TENSOR_TYPE, TensorOrder
from ..datasource.array import tensor as astensor
from ..array_utils import as_same_device, device


class TensorSearchsorted(TensorOperand, TensorOperandMixin):
    _op_type_ = OperandDef.SEARCHSORTED

    _input = KeyField('input')
    _values = AnyField('values')
    _side = StringField('side')
    _combine_size = Int32Field('combine_size')
    # offset is used only for map stage
    _offset = Int64Field('offset')

    def __init__(self, values=None, side=None, combine_size=None, offset=None, **kw):
        super().__init__(_values=values, _side=side, _combine_size=combine_size,
                         _offset=offset, **kw)

    def _set_inputs(self, inputs):
        super()._set_inputs(inputs)
        self._input = self._inputs[0]
        if len(self._inputs) == 2:
            self._values = self._inputs[1]

    @property
    def input(self):
        return self._input

    @property
    def values(self):
        return self._values

    @property
    def side(self):
        return self._side

    @property
    def offset(self):
        return self._offset

    @property
    def combine_size(self):
        return self._combine_size

    def __call__(self, a, v):
        inputs = [a]
        if isinstance(v, TENSOR_TYPE):
            inputs.append(v)
            shape = v.shape
        else:
            shape = ()
        return self.new_tensor(inputs, shape=shape, order=TensorOrder.C_ORDER)

    @classmethod
    def _tile_one_chunk(cls, op, a, v, out):
        chunks = []
        if len(op.inputs) == 1:
            v_chunks = [v]
        else:
            v_chunks = v.chunks
        for v_chunk in v_chunks:
            chunk_op = op.copy().reset_key()
            in_chunks = [a.chunks[0]]
            if len(op.inputs) == 2:
                in_chunks.append(v_chunk)
            v_shape = v_chunk.shape if hasattr(v_chunk, 'shape') else ()
            chunk_idx = v_chunk.index if len(op.inputs) == 2 else (0,)
            chunk = chunk_op.new_chunk(in_chunks, shape=v_shape,
                                       index=chunk_idx, order=out.order)
            chunks.append(chunk)
        new_op = op.copy().reset_key()
        nsplits = ((s,) for s in out.shape) if len(op.inputs) == 1 else v.nsplits
        return new_op.new_tensors(op.inputs, out.shape,
                                  chunks=chunks, nsplits=nsplits)

    @classmethod
    def _combine_chunks(cls, to_combine, op, stage, v, idx):
        from ..merge import TensorStack

        v_shape = v.shape if hasattr(v, 'shape') else ()
        combine_op = TensorStack(axis=0, dtype=op.outputs[0].dtype)
        combine_chunk = combine_op.new_chunk(to_combine, shape=v_shape)
        chunk_op = op.copy().reset_key()
        chunk_op.stage = stage
        in_chunks = [combine_chunk]
        if len(op.inputs) == 2:
            in_chunks.append(v)
        return chunk_op.new_chunk(in_chunks, shape=v_shape, index=idx,
                                  order=op.outputs[0].order)

    @classmethod
    def _tile_tree_reduction(cls, op, a, v, out):
        if has_unknown_shape(*op.inputs):
            yield

        combine_size = op.combine_size or options.combine_size
        input_len = len(op.inputs)
        v_chunks = [v] if input_len == 1 else v.chunks

        out_chunks = []
        for v_chunk in v_chunks:
            offsets = [0] + np.cumsum(a.nsplits[0]).tolist()[:-1]
            v_shape = v_chunk.shape if hasattr(v_chunk, 'shape') else ()
            v_index = v_chunk.index if hasattr(v_chunk, 'index') else (0,)
            chunks = []
            for i, c in enumerate(a.chunks):
                chunk_op = op.copy().reset_key()
                chunk_op.stage = OperandStage.map
                chunk_op._offset = offsets[i]
                in_chunks = [c]
                if input_len == 2:
                    in_chunks.append(v_chunk)
                chunks.append(chunk_op.new_chunk(in_chunks, shape=v_shape,
                                                 index=c.index, order=out.order))

            while len(chunks) > combine_size:
                new_chunks = []
                it = itertools.count(0)
                while True:
                    j = next(it)
                    to_combine = chunks[j * combine_size: (j + 1) * combine_size]
                    if len(to_combine) == 0:
                        break

                    new_chunks.append(
                        cls._combine_chunks(to_combine, op, OperandStage.combine, v_chunk, (j,)))
                chunks = new_chunks

            chunk = cls._combine_chunks(chunks, op, OperandStage.reduce, v_chunk, v_index)
            out_chunks.append(chunk)

        new_op = op.copy().reset_key()
        nsplits = ((s,) for s in out.shape) if len(op.inputs) == 1 else v.nsplits
        return new_op.new_tensors(op.inputs, out.shape,
                                  chunks=out_chunks, nsplits=nsplits)

    @classmethod
    def tile(cls, op):
        a = op.inputs[0]
        out = op.outputs[0]
        input_len = len(op.inputs)
        if input_len == 1:
            v = op.values
        else:
            v = op.inputs[1]

        if len(a.chunks) == 1:
            return cls._tile_one_chunk(op, a, v, out)
        return (yield from cls._tile_tree_reduction(op, a, v, out))

    @classmethod
    def _execute_without_stage(cls, xp, a, v, op):
        return xp.searchsorted(a, v, side=op.side)

    @classmethod
    def _execute_map(cls, xp, a, v, op):
        # in the map phase, calculate the indices and positions
        # for instance, a=[1, 4, 6], v=5, return will be (2, 6)
        indices = xp.atleast_1d(xp.searchsorted(a, v, side=op.side))
        data_indices = indices.copy()
        # if the value is larger than all data
        # for instance, a=[1, 4, 6], v=7
        # return will be (2, 6), not (3, 6), thus needs to subtract 1
        data_indices = xp.subtract(data_indices, 1, out=data_indices,
                                   where=data_indices >= len(a))
        data = a[data_indices]
        if op.offset > 0:
            indices = xp.add(indices, op.offset, out=indices)
        if np.isscalar(v):
            indices, data = indices[0], data[0]

        return indices, data

    @classmethod
    def _execute_combine(cls, xp, a, v, op):
        inp_indices, inp_data = a
        if np.isscalar(v):
            ind = xp.searchsorted(inp_data, v, side=op.side)
            if ind >= len(inp_data):
                ind -= 1
            return inp_indices[ind], inp_data[ind]
        else:
            ret_indices = np.empty(v.shape, dtype=np.intp)
            ret_data = np.empty(v.shape, dtype=inp_data.dtype)
            for idx in itertools.product(*(range(s) for s in v.shape)):
                ind = xp.searchsorted(inp_data[(slice(None),) + idx], v[idx], side=op.side)
                if ind >= len(inp_indices):
                    ind -= 1
                ret_indices[idx] = inp_indices[(ind,) + idx]
                ret_data[idx] = inp_data[(ind,) + idx]
            return ret_indices, ret_data

    @classmethod
    def _execute_reduce(cls, xp, a, v, op):
        inp_indices, inp_data = a
        if np.isscalar(v):
            ind = xp.searchsorted(inp_data, v, side=op.side)
            if ind >= len(inp_indices):
                ind -= 1
            return inp_indices[ind]
        else:
            indices = np.empty(v.shape, dtype=np.intp)
            for idx in itertools.product(*(range(s) for s in v.shape)):
                ind = xp.searchsorted(inp_data[(slice(None),) + idx], v[idx], side=op.side)
                if ind >= len(inp_indices):
                    ind -= 1
                indices[idx] = inp_indices[(ind,) + idx]
            return indices

    @classmethod
    def execute(cls, ctx, op):
        a = ctx[op.inputs[0].key]
        v = ctx[op.inputs[1].key] if len(op.inputs) == 2 else op.values

        data = []
        if isinstance(a, tuple):
            data.extend(a)
        else:
            data.append(a)
        if len(op.inputs) == 2:
            data.append(v)

        data, device_id, xp = as_same_device(
            data, device=op.device, ret_extra=True)

        if isinstance(a, tuple):
            a = data[:2]
        else:
            a = data[0]
        if len(op.inputs) == 2:
            v = data[-1]

        with device(device_id):
            if op.stage is None:
                ret = cls._execute_without_stage(xp, a, v, op)
            elif op.stage == OperandStage.map:
                ret = cls._execute_map(xp, a, v, op)
            elif op.stage == OperandStage.combine:
                ret = cls._execute_combine(xp, a, v, op)
            else:
                ret = cls._execute_reduce(xp, a, v, op)
            ctx[op.outputs[0].key] = ret


[docs]def searchsorted(a, v, side='left', sorter=None, combine_size=None): """ Find indices where elements should be inserted to maintain order. Find the indices into a sorted tensor `a` such that, if the corresponding elements in `v` were inserted before the indices, the order of `a` would be preserved. Assuming that `a` is sorted: ====== ============================ `side` returned index `i` satisfies ====== ============================ left ``a[i-1] < v <= a[i]`` right ``a[i-1] <= v < a[i]`` ====== ============================ Parameters ---------- a : 1-D array_like Input tensor. If `sorter` is None, then it must be sorted in ascending order, otherwise `sorter` must be an array of indices that sort it. v : array_like Values to insert into `a`. side : {'left', 'right'}, optional If 'left', the index of the first suitable location found is given. If 'right', return the last such index. If there is no suitable index, return either 0 or N (where N is the length of `a`). sorter : 1-D array_like, optional Optional tensor of integer indices that sort array a into ascending order. They are typically the result of argsort. combine_size: int, optional The number of chunks to combine. Returns ------- indices : tensor of ints Array of insertion points with the same shape as `v`. See Also -------- sort : Return a sorted copy of a tensor. histogram : Produce histogram from 1-D data. Notes ----- Binary search is used to find the required insertion points. This function is a faster version of the builtin python `bisect.bisect_left` (``side='left'``) and `bisect.bisect_right` (``side='right'``) functions, which is also vectorized in the `v` argument. Examples -------- >>> import mars.tensor as mt >>> mt.searchsorted([1,2,3,4,5], 3).execute() 2 >>> mt.searchsorted([1,2,3,4,5], 3, side='right').execute() 3 >>> mt.searchsorted([1,2,3,4,5], [-10, 10, 2, 3]).execute() array([0, 5, 1, 2]) """ if not isinstance(a, TENSOR_TYPE) and sorter is not None and \ not isinstance(sorter, TENSOR_TYPE): a = astensor(np.asarray(a)[sorter]) else: a = astensor(a) if sorter is not None: a = a[sorter] if a.ndim != 1: raise ValueError('`a` should be 1-d tensor') if a.issparse(): # does not support sparse tensor raise ValueError('`a` should be a dense tensor') if side not in {'left', 'right'}: raise ValueError(f"'{side}' is an invalid value for keyword 'side'") if not np.isscalar(v): v = astensor(v) op = TensorSearchsorted(values=v, side=side, dtype=np.dtype(np.intp), combine_size=combine_size) return op(a, v)