# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import numpy as np
from ... import opcodes as OperandDef
from ...core.operand import OperandStage
from ...serialization.serializables import KeyField, StringField, \
AnyField, Int64Field, Int32Field
from ...config import options
from ...utils import has_unknown_shape
from ..operands import TensorOperand, TensorOperandMixin
from ..core import TENSOR_TYPE, TensorOrder
from ..datasource.array import tensor as astensor
from ..array_utils import as_same_device, device
class TensorSearchsorted(TensorOperand, TensorOperandMixin):
_op_type_ = OperandDef.SEARCHSORTED
_input = KeyField('input')
_values = AnyField('values')
_side = StringField('side')
_combine_size = Int32Field('combine_size')
# offset is used only for map stage
_offset = Int64Field('offset')
def __init__(self, values=None, side=None, combine_size=None, offset=None, **kw):
super().__init__(_values=values, _side=side, _combine_size=combine_size,
_offset=offset, **kw)
def _set_inputs(self, inputs):
super()._set_inputs(inputs)
self._input = self._inputs[0]
if len(self._inputs) == 2:
self._values = self._inputs[1]
@property
def input(self):
return self._input
@property
def values(self):
return self._values
@property
def side(self):
return self._side
@property
def offset(self):
return self._offset
@property
def combine_size(self):
return self._combine_size
def __call__(self, a, v):
inputs = [a]
if isinstance(v, TENSOR_TYPE):
inputs.append(v)
shape = v.shape
else:
shape = ()
return self.new_tensor(inputs, shape=shape, order=TensorOrder.C_ORDER)
@classmethod
def _tile_one_chunk(cls, op, a, v, out):
chunks = []
if len(op.inputs) == 1:
v_chunks = [v]
else:
v_chunks = v.chunks
for v_chunk in v_chunks:
chunk_op = op.copy().reset_key()
in_chunks = [a.chunks[0]]
if len(op.inputs) == 2:
in_chunks.append(v_chunk)
v_shape = v_chunk.shape if hasattr(v_chunk, 'shape') else ()
chunk_idx = v_chunk.index if len(op.inputs) == 2 else (0,)
chunk = chunk_op.new_chunk(in_chunks, shape=v_shape,
index=chunk_idx, order=out.order)
chunks.append(chunk)
new_op = op.copy().reset_key()
nsplits = ((s,) for s in out.shape) if len(op.inputs) == 1 else v.nsplits
return new_op.new_tensors(op.inputs, out.shape,
chunks=chunks, nsplits=nsplits)
@classmethod
def _combine_chunks(cls, to_combine, op, stage, v, idx):
from ..merge import TensorStack
v_shape = v.shape if hasattr(v, 'shape') else ()
combine_op = TensorStack(axis=0, dtype=op.outputs[0].dtype)
combine_chunk = combine_op.new_chunk(to_combine, shape=v_shape)
chunk_op = op.copy().reset_key()
chunk_op.stage = stage
in_chunks = [combine_chunk]
if len(op.inputs) == 2:
in_chunks.append(v)
return chunk_op.new_chunk(in_chunks, shape=v_shape, index=idx,
order=op.outputs[0].order)
@classmethod
def _tile_tree_reduction(cls, op, a, v, out):
if has_unknown_shape(*op.inputs):
yield
combine_size = op.combine_size or options.combine_size
input_len = len(op.inputs)
v_chunks = [v] if input_len == 1 else v.chunks
out_chunks = []
for v_chunk in v_chunks:
offsets = [0] + np.cumsum(a.nsplits[0]).tolist()[:-1]
v_shape = v_chunk.shape if hasattr(v_chunk, 'shape') else ()
v_index = v_chunk.index if hasattr(v_chunk, 'index') else (0,)
chunks = []
for i, c in enumerate(a.chunks):
chunk_op = op.copy().reset_key()
chunk_op.stage = OperandStage.map
chunk_op._offset = offsets[i]
in_chunks = [c]
if input_len == 2:
in_chunks.append(v_chunk)
chunks.append(chunk_op.new_chunk(in_chunks, shape=v_shape,
index=c.index, order=out.order))
while len(chunks) > combine_size:
new_chunks = []
it = itertools.count(0)
while True:
j = next(it)
to_combine = chunks[j * combine_size: (j + 1) * combine_size]
if len(to_combine) == 0:
break
new_chunks.append(
cls._combine_chunks(to_combine, op, OperandStage.combine, v_chunk, (j,)))
chunks = new_chunks
chunk = cls._combine_chunks(chunks, op, OperandStage.reduce, v_chunk, v_index)
out_chunks.append(chunk)
new_op = op.copy().reset_key()
nsplits = ((s,) for s in out.shape) if len(op.inputs) == 1 else v.nsplits
return new_op.new_tensors(op.inputs, out.shape,
chunks=out_chunks, nsplits=nsplits)
@classmethod
def tile(cls, op):
a = op.inputs[0]
out = op.outputs[0]
input_len = len(op.inputs)
if input_len == 1:
v = op.values
else:
v = op.inputs[1]
if len(a.chunks) == 1:
return cls._tile_one_chunk(op, a, v, out)
return (yield from cls._tile_tree_reduction(op, a, v, out))
@classmethod
def _execute_without_stage(cls, xp, a, v, op):
return xp.searchsorted(a, v, side=op.side)
@classmethod
def _execute_map(cls, xp, a, v, op):
# in the map phase, calculate the indices and positions
# for instance, a=[1, 4, 6], v=5, return will be (2, 6)
indices = xp.atleast_1d(xp.searchsorted(a, v, side=op.side))
data_indices = indices.copy()
# if the value is larger than all data
# for instance, a=[1, 4, 6], v=7
# return will be (2, 6), not (3, 6), thus needs to subtract 1
data_indices = xp.subtract(data_indices, 1, out=data_indices,
where=data_indices >= len(a))
data = a[data_indices]
if op.offset > 0:
indices = xp.add(indices, op.offset, out=indices)
if np.isscalar(v):
indices, data = indices[0], data[0]
return indices, data
@classmethod
def _execute_combine(cls, xp, a, v, op):
inp_indices, inp_data = a
if np.isscalar(v):
ind = xp.searchsorted(inp_data, v, side=op.side)
if ind >= len(inp_data):
ind -= 1
return inp_indices[ind], inp_data[ind]
else:
ret_indices = np.empty(v.shape, dtype=np.intp)
ret_data = np.empty(v.shape, dtype=inp_data.dtype)
for idx in itertools.product(*(range(s) for s in v.shape)):
ind = xp.searchsorted(inp_data[(slice(None),) + idx], v[idx], side=op.side)
if ind >= len(inp_indices):
ind -= 1
ret_indices[idx] = inp_indices[(ind,) + idx]
ret_data[idx] = inp_data[(ind,) + idx]
return ret_indices, ret_data
@classmethod
def _execute_reduce(cls, xp, a, v, op):
inp_indices, inp_data = a
if np.isscalar(v):
ind = xp.searchsorted(inp_data, v, side=op.side)
if ind >= len(inp_indices):
ind -= 1
return inp_indices[ind]
else:
indices = np.empty(v.shape, dtype=np.intp)
for idx in itertools.product(*(range(s) for s in v.shape)):
ind = xp.searchsorted(inp_data[(slice(None),) + idx], v[idx], side=op.side)
if ind >= len(inp_indices):
ind -= 1
indices[idx] = inp_indices[(ind,) + idx]
return indices
@classmethod
def execute(cls, ctx, op):
a = ctx[op.inputs[0].key]
v = ctx[op.inputs[1].key] if len(op.inputs) == 2 else op.values
data = []
if isinstance(a, tuple):
data.extend(a)
else:
data.append(a)
if len(op.inputs) == 2:
data.append(v)
data, device_id, xp = as_same_device(
data, device=op.device, ret_extra=True)
if isinstance(a, tuple):
a = data[:2]
else:
a = data[0]
if len(op.inputs) == 2:
v = data[-1]
with device(device_id):
if op.stage is None:
ret = cls._execute_without_stage(xp, a, v, op)
elif op.stage == OperandStage.map:
ret = cls._execute_map(xp, a, v, op)
elif op.stage == OperandStage.combine:
ret = cls._execute_combine(xp, a, v, op)
else:
ret = cls._execute_reduce(xp, a, v, op)
ctx[op.outputs[0].key] = ret
[docs]def searchsorted(a, v, side='left', sorter=None, combine_size=None):
"""
Find indices where elements should be inserted to maintain order.
Find the indices into a sorted tensor `a` such that, if the
corresponding elements in `v` were inserted before the indices, the
order of `a` would be preserved.
Assuming that `a` is sorted:
====== ============================
`side` returned index `i` satisfies
====== ============================
left ``a[i-1] < v <= a[i]``
right ``a[i-1] <= v < a[i]``
====== ============================
Parameters
----------
a : 1-D array_like
Input tensor. If `sorter` is None, then it must be sorted in
ascending order, otherwise `sorter` must be an array of indices
that sort it.
v : array_like
Values to insert into `a`.
side : {'left', 'right'}, optional
If 'left', the index of the first suitable location found is given.
If 'right', return the last such index. If there is no suitable
index, return either 0 or N (where N is the length of `a`).
sorter : 1-D array_like, optional
Optional tensor of integer indices that sort array a into ascending
order. They are typically the result of argsort.
combine_size: int, optional
The number of chunks to combine.
Returns
-------
indices : tensor of ints
Array of insertion points with the same shape as `v`.
See Also
--------
sort : Return a sorted copy of a tensor.
histogram : Produce histogram from 1-D data.
Notes
-----
Binary search is used to find the required insertion points.
This function is a faster version of the builtin python `bisect.bisect_left`
(``side='left'``) and `bisect.bisect_right` (``side='right'``) functions,
which is also vectorized in the `v` argument.
Examples
--------
>>> import mars.tensor as mt
>>> mt.searchsorted([1,2,3,4,5], 3).execute()
2
>>> mt.searchsorted([1,2,3,4,5], 3, side='right').execute()
3
>>> mt.searchsorted([1,2,3,4,5], [-10, 10, 2, 3]).execute()
array([0, 5, 1, 2])
"""
if not isinstance(a, TENSOR_TYPE) and sorter is not None and \
not isinstance(sorter, TENSOR_TYPE):
a = astensor(np.asarray(a)[sorter])
else:
a = astensor(a)
if sorter is not None:
a = a[sorter]
if a.ndim != 1:
raise ValueError('`a` should be 1-d tensor')
if a.issparse():
# does not support sparse tensor
raise ValueError('`a` should be a dense tensor')
if side not in {'left', 'right'}:
raise ValueError(f"'{side}' is an invalid value for keyword 'side'")
if not np.isscalar(v):
v = astensor(v)
op = TensorSearchsorted(values=v, side=side, dtype=np.dtype(np.intp),
combine_size=combine_size)
return op(a, v)