# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from abc import ABC
from typing import Dict, List, Optional, Tuple, Type, TypeVar, Union
from ... import oscar as mo
from ...lib.aio import alru_cache
from ..subtask import Subtask
# Type variable bound to ``SchedulingAPI``; used to annotate ``cls`` and the
# return type of the classmethod constructors so that they are typed as
# returning the class they are invoked on.
APIType = TypeVar('APIType', bound='SchedulingAPI')
class SchedulingAPI(ABC):
    """
    Facade over the scheduling actors of a single session.

    An instance keeps references to the session's subtask manager actor
    and subtask queueing actor and forwards every call to one of them.
    """

    def __init__(self, session_id: str, address: str,
                 manager_ref=None, queueing_ref=None):
        """
        Parameters
        ----------
        session_id
            id of the session this API object serves
        address
            address at which the scheduling actors are looked up or created
        manager_ref
            reference to the session's SubtaskManagerActor
        queueing_ref
            reference to the session's SubtaskQueueingActor
        """
        self._session_id = session_id
        self._address = address
        self._manager_ref = manager_ref
        self._queueing_ref = queueing_ref

    @classmethod
    @alru_cache
    async def create(cls: Type[APIType],
                     session_id: str,
                     address: str) -> APIType:
        """
        Build an API object on top of scheduling actors that already exist.

        Only actor references are looked up here; no actor is created.
        The coroutine is wrapped with ``alru_cache`` (presumably memoizing
        the result per argument set -- confirm against ``lib.aio``).

        Parameters
        ----------
        session_id
            id of the session
        address
            address where the scheduling actors live

        Returns
        -------
        scheduling_api
            instance of ``cls`` bound to the session's actors
        """
        # imported lazily inside the method, as in the rest of this class
        from .supervisor.manager import SubtaskManagerActor
        manager_ref = await mo.actor_ref(
            SubtaskManagerActor.gen_uid(session_id), address=address
        )

        from .supervisor.queueing import SubtaskQueueingActor
        queueing_ref = await mo.actor_ref(
            SubtaskQueueingActor.gen_uid(session_id), address=address
        )

        # instantiate ``cls`` rather than the base class so that the result
        # matches the ``Type[APIType] -> APIType`` annotation for subclasses
        return cls(session_id, address, manager_ref, queueing_ref)

    @classmethod
    async def create_session(cls: Type[APIType],
                             session_id: str,
                             address: str,
                             service_config: Optional[Dict] = None) -> APIType:
        """
        Create the scheduling actors of a session and return an API object.

        Parameters
        ----------
        session_id
            id of the session
        address
            address on which the actors are created
        service_config
            optional service configuration; only its ``'scheduling'``
            section is read, from which ``'submit_period'`` is passed to
            the queueing actor

        Returns
        -------
        scheduling_api
            instance of ``cls`` bound to the newly created actors
        """
        service_config = service_config or dict()
        scheduling_config = service_config.get('scheduling', {})

        from .supervisor.assigner import AssignerActor
        assigner_coro = mo.create_actor(
            AssignerActor, session_id, address=address,
            uid=AssignerActor.gen_uid(session_id))

        from .supervisor.queueing import SubtaskQueueingActor
        queueing_coro = mo.create_actor(
            SubtaskQueueingActor, session_id, scheduling_config.get('submit_period'),
            address=address, uid=SubtaskQueueingActor.gen_uid(session_id))

        # assigner and queueing actors are created concurrently; the
        # assigner reference itself is not needed by the API object
        _assigner_ref, queueing_ref = await asyncio.gather(assigner_coro, queueing_coro)

        # the manager is created only after the two actors above exist
        # (presumably it resolves them on startup -- confirm)
        from .supervisor.manager import SubtaskManagerActor
        manager_ref = await mo.create_actor(
            SubtaskManagerActor, session_id, address=address,
            uid=SubtaskManagerActor.gen_uid(session_id)
        )

        # see ``create``: honour the class this constructor was called on
        return cls(session_id, address, manager_ref, queueing_ref)

    @classmethod
    async def destroy_session(cls,
                              session_id: str,
                              address: str):
        """
        Destroy the manager, queueing and assigner actors of a session.

        Parameters
        ----------
        session_id
            id of the session whose scheduling actors are destroyed
        address
            address where the actors live
        """
        from .supervisor.queueing import SubtaskQueueingActor
        from .supervisor.manager import SubtaskManagerActor
        from .supervisor.assigner import AssignerActor

        destroy_tasks = []
        for actor_cls in [SubtaskManagerActor, SubtaskQueueingActor, AssignerActor]:
            ref = await mo.actor_ref(actor_cls.gen_uid(session_id), address=address)
            # schedule each destroy as a task so all of them run concurrently
            destroy_tasks.append(asyncio.create_task(ref.destroy()))
        await asyncio.gather(*destroy_tasks)

    async def add_subtasks(self,
                           subtasks: List[Subtask],
                           priorities: Optional[List[Tuple]] = None):
        """
        Submit subtasks into scheduling service

        Parameters
        ----------
        subtasks
            list of subtasks to be submitted to service
        priorities
            list of priorities of subtasks; when omitted, each subtask's
            own ``priority`` is used, falling back to an empty tuple
        """
        if priorities is None:
            priorities = [subtask.priority or tuple() for subtask in subtasks]
        await self._manager_ref.add_subtasks(subtasks, priorities)

    @mo.extensible
    async def update_subtask_priority(self,
                                      subtask_id: str,
                                      priority: Tuple):
        """
        Update priority of a subtask

        Only the batch form registered below is implemented; the plain
        body raises ``NotImplementedError``.

        Parameters
        ----------
        subtask_id
            id of subtask to update priority
        priority
            new priority of the subtask
        """
        raise NotImplementedError

    @update_subtask_priority.batch
    async def update_subtask_priority(self, args_list, kwargs_list):
        # forward every collected call as one batch to the queueing actor
        await self._queueing_ref.update_subtask_priority.batch(
            *(self._queueing_ref.update_subtask_priority.delay(*args, **kwargs)
              for args, kwargs in zip(args_list, kwargs_list)))

    async def cancel_subtasks(self,
                              subtask_ids: List[str],
                              kill_timeout: Union[float, int] = 5):
        """
        Cancel pending and running subtasks.

        Parameters
        ----------
        subtask_ids
            ids of subtasks to cancel
        kill_timeout
            timeout seconds to kill actor process forcibly
        """
        await self._manager_ref.cancel_subtasks(
            subtask_ids, kill_timeout=kill_timeout)

    async def finish_subtasks(self,
                              subtask_ids: List[str],
                              schedule_next: bool = True):
        """
        Mark subtasks as finished, letting scheduling service to schedule
        next tasks in the ready queue

        Parameters
        ----------
        subtask_ids
            ids of subtasks to mark as finished
        schedule_next
            whether to schedule succeeding subtasks
        """
        await self._manager_ref.finish_subtasks(subtask_ids, schedule_next)
class MockSchedulingAPI(SchedulingAPI):
    """
    Scheduling API variant whose ``create`` also sets up the actors it needs.

    Before delegating to ``create_session``, ``create`` creates a global
    slot manager plus the worker-side execution, slot manager and quota
    manager actors, all on the given address (presumably so tests can run
    without a full cluster -- confirm against callers).
    """

    @classmethod
    async def create(cls: Type[APIType],
                     session_id: str,
                     address: str) -> APIType:
        """
        Create supporting actors on ``address`` and then the session actors.

        Parameters
        ----------
        session_id
            id of the session
        address
            address on which every actor is created

        Returns
        -------
        scheduling_api
            the object returned by ``create_session`` of the parent class
        """
        from .supervisor import GlobalSlotManagerActor
        await mo.create_actor(
            GlobalSlotManagerActor,
            uid=GlobalSlotManagerActor.default_uid(),
            address=address,
        )

        from ... import resource as mars_resource
        from .worker import (
            SubtaskExecutionActor,
            WorkerSlotManagerActor,
            WorkerQuotaManagerActor,
        )

        # subtask retries are switched off for the execution actor
        await mo.create_actor(
            SubtaskExecutionActor,
            subtask_max_retries=0,
            uid=SubtaskExecutionActor.default_uid(),
            address=address,
        )
        await mo.create_actor(
            WorkerSlotManagerActor,
            uid=WorkerSlotManagerActor.default_uid(),
            address=address,
        )

        # quota is set to the total virtual memory reported by the resource module
        quota_config = {'quota_size': mars_resource.virtual_memory().total}
        await mo.create_actor(
            WorkerQuotaManagerActor,
            quota_config,
            uid=WorkerQuotaManagerActor.default_uid(),
            address=address,
        )

        session_api = await super().create_session(session_id, address)
        return session_api