Source code for worldline.connect.sdk.authentication.v1hmac_authenticator

import hashlib
import hmac
from base64 import b64encode
from operator import attrgetter
from re import sub

from authenticator import Authenticator

from worldline.connect.sdk.communication.request_header import RequestHeader


[docs]class V1HMACAuthenticator(Authenticator): """ Authenticator implementation using v1HMAC signatures. """
[docs] def __init__(self, api_key_id, secret_api_key): """ :param api_key_id: An identifier for the secret API key. The api_key_id can be retrieved from the Configuration Center. This identifier is visible in the HTTP request and is also used to identify the correct account. :param secret_api_key: A shared secret. The shared secret can be retrieved from the Configuration Center. An api_key_id and secret_api_key always go hand-in-hand, the difference is that secret_api_key is never visible in the HTTP request. This secret is used as input for the HMAC algorithm. """ Authenticator.__init__(self) if secret_api_key is None or not secret_api_key.strip(): raise ValueError("secret_api_key is required") if api_key_id is None or not api_key_id.strip(): raise ValueError("api_key_id is required") self.__api_key_id = api_key_id self.__secret_api_key = secret_api_key
[docs] def get_authorization(self, http_method, resource_uri, http_headers): """Returns a v1HMAC authentication signature header""" if http_method is None or not http_method.strip(): raise ValueError("http_method is required") if resource_uri is None: raise ValueError("resource_uri is required") data_to_sign = self.to_data_to_sign(http_method, resource_uri, http_headers) return "GCS v1HMAC:" + self.__api_key_id + ":" + self.create_authentication_signature(data_to_sign)
[docs] def to_data_to_sign(self, http_method, resource_uri, http_headers): content_type = None date = None canonicalized_headers = "" canonicalized_resource = self.__to_canonicalized_resource(resource_uri) xgcs_http_headers = [] if http_headers is not None: for http_header in http_headers: if "Content-Type".lower() == http_header.name.lower(): content_type = http_header.value elif "Date".lower() == http_header.name.lower(): date = http_header.value else: name = self.__to_canonicalize_header_name(http_header.name) if name.startswith("x-gcs"): value = self.to_canonicalize_header_value(http_header.value) xgcs_http_header = RequestHeader(name, value) xgcs_http_headers.append(xgcs_http_header) xgcs_http_headers.sort(key=attrgetter('name')) for xgcs_http_header in xgcs_http_headers: canonicalized_headers += xgcs_http_header.name + ":" + xgcs_http_header.value + "\n" string = http_method.upper() + "\n" if content_type is not None: string += content_type + "\n" else: string += "\n" string += date + "\n" string += str(canonicalized_headers) string += canonicalized_resource + "\n" return str(string)
@staticmethod def __to_canonicalized_resource(resource_uri): """ Returns the encoded URI path without the HTTP method and including all decoded query parameters. """ string = "" string += resource_uri.path if resource_uri.query: string += "?" + resource_uri.query return str(string) @staticmethod def __to_canonicalize_header_name(original_name): if original_name is None: return None else: return original_name.lower()
[docs] @staticmethod def to_canonicalize_header_value(original_value): if original_value is None: return "" return sub(r"\r?\n(?:(?![\r\n])\s)*", " ", original_value).strip()
[docs] def create_authentication_signature(self, data_to_sign): sig = hmac.new(self.__secret_api_key, data_to_sign, hashlib.sha256) return b64encode(sig.digest())