Source code for worldline.connect.sdk.authentication.v1hmac_authenticator
import hashlib
import hmac
from base64 import b64encode
from operator import attrgetter
from re import sub
from typing import Optional, Sequence
from urllib.parse import ParseResult
from .authenticator import Authenticator
from worldline.connect.sdk.communication.request_header import RequestHeader
[docs]
class V1HMACAuthenticator(Authenticator):
"""
Authenticator implementation using v1HMAC signatures.
"""
[docs]
def __init__(self, api_key_id: str, secret_api_key: str):
"""
:param api_key_id: An identifier for the secret API key. The api_key_id
can be retrieved from the Configuration Center. This identifier is
visible in the HTTP request and is also used to identify the correct
account.
:param secret_api_key: A shared secret. The shared secret can be
retrieved from the Configuration Center. An api_key_id and
secret_api_key always go hand-in-hand, the difference is that
secret_api_key is never visible in the HTTP request. This secret is
used as input for the HMAC algorithm.
"""
Authenticator.__init__(self)
if secret_api_key is None or not secret_api_key.strip():
raise ValueError("secret_api_key is required")
if api_key_id is None or not api_key_id.strip():
raise ValueError("api_key_id is required")
self.__api_key_id = api_key_id
self.__secret_api_key = secret_api_key
[docs]
def get_authorization(self, http_method: str, resource_uri: ParseResult,
http_headers: Optional[Sequence[RequestHeader]]) -> str:
"""Returns a v1HMAC authentication signature header"""
if http_method is None or not http_method.strip():
raise ValueError("http_method is required")
if resource_uri is None:
raise ValueError("resource_uri is required")
data_to_sign = self.to_data_to_sign(http_method, resource_uri, http_headers)
return "GCS v1HMAC:" + self.__api_key_id + ":" + self.create_authentication_signature(data_to_sign)
[docs]
def to_data_to_sign(self, http_method: str, resource_uri: ParseResult, http_headers: Optional[Sequence[RequestHeader]]) -> str:
content_type = None
date = None
canonicalized_headers = ""
canonicalized_resource = self.__to_canonicalized_resource(resource_uri)
xgcs_http_headers = []
if http_headers is not None:
for http_header in http_headers:
if "Content-Type".lower() == http_header.name.lower():
content_type = http_header.value
elif "Date".lower() == http_header.name.lower():
date = http_header.value
else:
name = self.__to_canonicalize_header_name(http_header.name)
if name.startswith("x-gcs"):
value = self.to_canonicalize_header_value(http_header.value)
xgcs_http_header = RequestHeader(name, value)
xgcs_http_headers.append(xgcs_http_header)
xgcs_http_headers.sort(key=attrgetter('name'))
for xgcs_http_header in xgcs_http_headers:
canonicalized_headers += xgcs_http_header.name + ":" + xgcs_http_header.value + "\n"
string = http_method.upper() + "\n"
if content_type is not None:
string += content_type + "\n"
else:
string += "\n"
string += date + "\n"
string += str(canonicalized_headers)
string += canonicalized_resource + "\n"
return str(string)
@staticmethod
def __to_canonicalized_resource(resource_uri: ParseResult) -> str:
"""
Returns the encoded URI path without the HTTP method and including all decoded query parameters.
"""
string = ""
string += resource_uri.path
if resource_uri.query:
string += "?" + resource_uri.query
return str(string)
@staticmethod
def __to_canonicalize_header_name(original_name: Optional[str]) -> Optional[str]:
if original_name is None:
return None
else:
return original_name.lower()
[docs]
def create_authentication_signature(self, data_to_sign: str) -> str:
sig = hmac.new(self.__secret_api_key.encode("utf-8"), data_to_sign.encode("utf-8"), hashlib.sha256)
return b64encode(sig.digest()).decode("utf-8").rstrip('\n')