###
# (C) Copyright [2019] Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
"""This module maintains communication with SimplVity."""
import http.client
from base64 import b64encode
import json
import logging
import ssl
import urllib
import traceback
from simplivity import exceptions
logger = logging.getLogger(__name__)
[docs]class Connection(object):
"""Helps to make connection with the OVC and do rest calls."""
def __init__(self, ovc_ip, ssl_bundle=False, timeout=None):
"""Initialize Connection class"""
self._ovc_ip = ovc_ip
self._timeout = timeout
self._ssl_trusted_bundle = ssl_bundle
self._ssl_trust_all = False if ssl_bundle else True
self._username = None
self._password = None
self._access_token = None
self._headers = {'Accept': 'application/json'}
self._base_url = "https://{}/api".format(ovc_ip)
[docs] def do_http(self, method, path, body, custom_headers=None, login=False):
"""Makes http calls.
Args:
method: HTTP methods (GET, POST, PUT, DELETE).
path: URL
body: Request body.
custom_headers: Custom headers to update/append default headers.
login: True if the call is for login and get the token.
Returns:
tuple: Tuple with two members (HTTP response object and the response body in json).
"""
http_headers = self._headers.copy()
full_path = "{}{}".format(self._base_url, path)
if login:
user_pass = b64encode(b"simplivity:").decode("ascii")
http_headers.update({'Content-type': 'application/x-www-form-urlencoded',
'Authorization': 'Basic %s' % user_pass})
else:
if not self._access_token:
raise exceptions.HPESimpliVityException("There is no active session, please login")
http_headers['Content-type'] = 'application/vnd.simplivity.v1.8+json'
http_headers['Authorization'] = "Bearer " + self._access_token
# Updates default headers with the custom headers
if custom_headers:
http_headers.update(custom_headers)
json_body = None
try:
connection = self.get_connection()
connection.request(method, full_path, body, http_headers)
resp = connection.getresponse()
resp_body = resp.read()
connection.close()
if resp_body:
json_body = json.loads(resp_body.decode('utf-8'))
except http.client.HTTPException:
raise exceptions.HPESimpliVityException(traceback.format_exc())
# Obtain a new token, if the Simplivity Product returns an invalid token error.
if json_body and 'error' in json_body and json_body['error'] == 'invalid_token':
self.login(self._username, self._password)
resp, json_body = self.do_http(method, path, body, custom_headers)
return resp, json_body
[docs] def get_connection(self):
"""Makes connection with the OVC.
Returns:
HTTPSConnection object
"""
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
if self._ssl_trust_all is False:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(self._ssl_trusted_bundle)
conn = http.client.HTTPSConnection(self._ovc_ip,
context=context,
timeout=self._timeout)
else:
context.verify_mode = ssl.CERT_NONE
conn = http.client.HTTPSConnection(self._ovc_ip,
context=context,
timeout=self._timeout)
return conn
[docs] def get(self, url):
"""Calls get http method.
Args:
url: Resource URL
Returns:
tuple: Tuple with two members (HTTP response object and the response body in json).
Raises:
HPESimpliVityException: if the response status is 400 and above
"""
resp, body = self.do_http('GET', url, '')
if resp.status >= 400:
raise exceptions.HPESimpliVityException(body)
return body
[docs] def post(self, uri, body, custom_headers=None):
"""Calls post http method.
Args:
uri: Resource URI.
body: Request body.
custom_headers: Custome headers to update/append default headers.
Returns:
dict: Response body
"""
return self.__do_rest_call('POST', uri, body, custom_headers=custom_headers)
[docs] def put(self, uri, body, custom_headers=None):
"""Calls put http method.
Args:
uri: Resource URI.
body: Request body.
custom_headers: Custome headers to update/append default headers.
Returns:
tuple: Tuple with two members (HTTP response object and the response body in json).
"""
return self.__do_rest_call('PUT', uri, body, custom_headers=custom_headers)
[docs] def delete(self, uri, custom_headers=None):
"""Calls delete http method.
Args:
uri: Resource URI.
custom_headers: Custom headers to appened/update default headers.
Returns:
tuple: Tuple with two members (HTTP response object and the response body in json).
"""
return self.__do_rest_call('DELETE', uri, {}, custom_headers=custom_headers)
def __body_content_is_task(self, body):
"""Check to find task in response body
Args:
body: Response body of a rest call
Returns:
boolean: Returns True if the body is task data or False
"""
return isinstance(body, dict) and 'task' in body
def __do_rest_call(self, http_method, url, body, custom_headers):
"""Calls do_http method and handles the http status code.
Args:
http_method: HTTP method (GET, POST, PUT and DELETE)
url: Resource URL
body: Request body
custom_headers: Headers to appened/update default headers
Returns:
tuple: A tuple of two elements (task and response body)
Raises:
HPESimpliVityException: if the response status code is 401/403/404
"""
resp, body = self.do_http(method=http_method,
path=url,
body=json.dumps(body),
custom_headers=custom_headers)
if resp.status in [400, 401, 403, 404]:
raise exceptions.HPESimpliVityException(body)
if self.__body_content_is_task(body):
return body, body
return None, body
[docs] def login(self, username, password):
"""Login using OVC username and password.
Args:
username: OVC username
password: OVC password
Returns:
boolean: Returns True if login is successfull.
"""
login_url = "/oauth/token"
data = {'grant_type': 'password',
'username': username,
'password': password}
resp, body = self.do_http('POST', login_url, body=urllib.parse.urlencode(data), login=True)
try:
self._access_token = body["access_token"]
logger.info('Logged in successfully')
except KeyError:
raise exceptions.HPESimpliVityAuthenticationError("Invalid credentials")
# Save the username and password for refreshing the connection
self._username = username
self._password = password
return True
[docs] def logout(self):
"""Removes the access token.
Returns:
boolean: Returns True
"""
self._access_token = None
logger.info('Logged out successfully')
return True