Source code for hpeOneView.resources.task_monitor

# -*- coding: utf-8 -*-
###
# (C) Copyright [2020] Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from future import standard_library

standard_library.install_aliases()


import logging
import time

from errno import ECONNABORTED, ETIMEDOUT, ENOEXEC, EINVAL, ENETUNREACH, ECONNRESET, ENETDOWN, ECONNREFUSED
from hpeOneView.exceptions import HPEOneViewInvalidResource, HPEOneViewTimeout, HPEOneViewTaskError, HPEOneViewUnknownType

TASK_PENDING_STATES = ['New', 'Starting', 'Pending', 'Running', 'Suspended', 'Stopping']
TASK_ERROR_STATES = ['Error', 'Warning', 'Terminated', 'Killed']
TASK_COMPLETED_STATES = ['Error', 'Warning', 'Completed', 'Terminated', 'Killed']

MSG_UNKNOWN_OBJECT_TYPE = 'Unknown object type'
MSG_TASK_TYPE_UNRECONIZED = "Task type: '%s' resource is not a recognized version"
MSG_UNKNOWN_EXCEPTION = 'Unknown Exception'
MSG_TIMEOUT = 'Waited %s seconds for task to complete, aborting'
MSG_INVALID_TASK = 'Invalid task was provided'

UNLIMITED_TIMEOUT = -1

logger = logging.getLogger(__name__)


[docs]class TaskMonitor(object): # Seconds to wait when a network failure occurs CONNECTION_FAILURE_TIMEOUT = 90 # Known error numbers when the connection drops CONNECTION_FAILURE_ERROR_NUMBERS = [ENOEXEC, EINVAL, ENETUNREACH, ETIMEDOUT, ECONNRESET, ECONNABORTED, ENETUNREACH, ENETDOWN, ECONNREFUSED] def __init__(self, con): self._connection = con
[docs] @staticmethod def get_current_seconds(): return int(time.time())
[docs] def wait_for_task(self, task, timeout=-1): """ Wait for task execution and return associated resource. Args: task: task dict timeout: timeout in seconds Returns: Associated resource when creating or updating; True when deleting. """ self.__wait_task_completion(task, timeout) task = self.get(task) logger.debug("Waiting for task. Percentage complete: " + str(task.get('computedPercentComplete'))) logger.debug("Waiting for task. Task state: " + str(task.get('taskState'))) task_response = self.__get_task_response(task) logger.debug('Task completed') return task_response
[docs] def get_completed_task(self, task, timeout=-1): """ Waits until the task is completed and returns the task resource. Args: task: TaskResource timeout: Timeout in seconds Returns: dict: TaskResource """ self.__wait_task_completion(task, timeout) return self.get(task)
def __wait_task_completion(self, task, timeout): if not task: raise HPEOneViewUnknownType(MSG_INVALID_TASK) logger.debug('Waiting for task completion...') # gets current cpu second for timeout start_time = self.get_current_seconds() connection_failure_control = dict(last_success=self.get_current_seconds()) i = 0 while self.is_task_running(task, connection_failure_control): # wait 1 to 10 seconds # the value increases to avoid flooding server with requests i = i + 1 if i < 10 else 10 logger.debug("Waiting for task. Percentage complete: " + str(task.get('computedPercentComplete'))) logger.debug("Waiting for task. Task state: " + str(task.get('taskState'))) time.sleep(i) if (timeout != UNLIMITED_TIMEOUT) and (start_time + timeout < self.get_current_seconds()): raise HPEOneViewTimeout(MSG_TIMEOUT % str(timeout)) def __get_task_response(self, task): deleted_states = ['Delete', 'Remove', 'Delete server hardware type', 'Remove SAN manager', 'Delete hypervisor cluster profile.'] if task['taskState'] in TASK_ERROR_STATES and task['taskState'] != 'Warning': msg = None error_code = None if 'taskErrors' in task and len(task['taskErrors']) > 0: err = task['taskErrors'][0] if 'message' in err: msg = err['message'] error_code = err.get('errorCode') if msg: raise HPEOneViewTaskError(msg, error_code) elif 'taskStatus' in task and task['taskStatus']: raise HPEOneViewTaskError(task['taskStatus'], error_code) else: raise HPEOneViewTaskError(MSG_UNKNOWN_EXCEPTION, error_code) if 'name' in task and task['name'] in deleted_states: return True if 'type' in task and task['type'].startswith('Task'): # get associated resource when is not a delete task task, entity = self.get_associated_resource(task) return entity logger.warning('Task completed, unknown response: ' + str(task)) return task
[docs] def is_task_running(self, task, connection_failure_control=None): """ Check if a task is running according to: TASK_PENDING_STATES ['New', 'Starting', 'Pending', 'Running', 'Suspended', 'Stopping'] Args: task (dict): OneView Task resource. connection_failure_control (dict): A dictionary instance that contains last_success for error tolerance control. Examples: >>> connection_failure_control = dict(last_success=int(time.time())) >>> while self.is_task_running(task, connection_failure_control): >>> pass Returns: True when in TASK_PENDING_STATES; False when not. """ if 'uri' in task: try: task = self.get(task) if connection_failure_control: # Updates last success connection_failure_control['last_success'] = self.get_current_seconds() if 'taskState' in task and task['taskState'] in TASK_PENDING_STATES: return True except Exception as error: logger.error('; '.join(str(e) for e in error.args) + ' when waiting for the task: ' + str(task)) if not connection_failure_control: raise error if hasattr(error, 'errno') and error.errno in self.CONNECTION_FAILURE_ERROR_NUMBERS: last_success = connection_failure_control['last_success'] if last_success + self.CONNECTION_FAILURE_TIMEOUT < self.get_current_seconds(): # Timeout reached raise error else: # Return task is running when network instability occurs return True else: raise error return False
[docs] def get(self, task): """ Retrieve a task by its uri. Args: task: task dict, must have 'uri' key. Returns: task dict """ task = self._connection.get(task['uri']) return task
[docs] def get_associated_resource(self, task): """ Retrieve a resource associated with a task. Args: task: task dict Returns: tuple: task (updated), the entity found (dict) """ if not task: raise HPEOneViewUnknownType(MSG_INVALID_TASK) if task['category'] not in ['tasks', 'backups']: # it is an error if type is not in obj, so let the except flow raise HPEOneViewUnknownType(MSG_UNKNOWN_OBJECT_TYPE) if task['type'] in ['TaskResourceV2', 'TaskResourceV3']: resource_uri = task['associatedResource']['resourceUri'] if resource_uri and resource_uri.startswith("/rest/appliance/support-dumps/"): # Specific for support dumps return task, resource_uri elif task['type'] == 'BACKUP': task = self._connection.get(task['taskUri']) resource_uri = task['uri'] else: raise HPEOneViewInvalidResource(MSG_TASK_TYPE_UNRECONIZED % task['type']) entity = {} if resource_uri: entity = self._connection.get(resource_uri) return task, entity