Source code for worldline.connect.sdk.authentication.v1hmac_authenticator
import hashlib
import hmac
from base64 import b64encode
from operator import attrgetter
from re import sub
from authenticator import Authenticator
from worldline.connect.sdk.communication.request_header import RequestHeader
class V1HMACAuthenticator(Authenticator):
    """
    Authenticator implementation using v1HMAC signatures.

    The authorization value produced by :meth:`get_authorization` has the form
    ``GCS v1HMAC:<api_key_id>:<signature>``, where the signature is the
    base64-encoded HMAC-SHA256 of the data built by :meth:`to_data_to_sign`,
    keyed with the secret API key.
    """

    def __init__(self, api_key_id, secret_api_key):
        """
        :param api_key_id: An identifier for the secret API key. The api_key_id
         can be retrieved from the Configuration Center. This identifier is
         visible in the HTTP request and is also used to identify the correct
         account.
        :param secret_api_key: A shared secret. The shared secret can be
         retrieved from the Configuration Center. An api_key_id and
         secret_api_key always go hand-in-hand, the difference is that
         secret_api_key is never visible in the HTTP request. This secret is
         used as input for the HMAC algorithm.
        :raise ValueError: if either argument is None or blank.
        """
        Authenticator.__init__(self)
        if secret_api_key is None or not secret_api_key.strip():
            raise ValueError("secret_api_key is required")
        if api_key_id is None or not api_key_id.strip():
            raise ValueError("api_key_id is required")
        self.__api_key_id = api_key_id
        self.__secret_api_key = secret_api_key

    def get_authorization(self, http_method, resource_uri, http_headers):
        """
        Returns a v1HMAC authentication signature header.

        :param http_method: The HTTP method of the request; must not be None
         or blank.
        :param resource_uri: The URI of the request; must not be None.
        :param http_headers: An iterable of header objects exposing ``name``
         and ``value`` attributes.
        :return: The string ``GCS v1HMAC:<api_key_id>:<signature>``.
        :raise ValueError: if http_method is None or blank, if resource_uri
         is None, or if no Date header value is available.
        """
        if http_method is None or not http_method.strip():
            raise ValueError("http_method is required")
        if resource_uri is None:
            raise ValueError("resource_uri is required")
        data_to_sign = self.to_data_to_sign(http_method, resource_uri, http_headers)
        signature = self.create_authentication_signature(data_to_sign)
        return "GCS v1HMAC:" + self.__api_key_id + ":" + signature

    def to_data_to_sign(self, http_method, resource_uri, http_headers):
        """
        Builds the canonical text that is signed for a request.

        The result consists of the following lines, each terminated by a
        newline: the upper-cased HTTP method, the Content-Type header value
        (an empty line if absent), the Date header value, one
        ``name:value`` line per header whose lower-cased name starts with
        ``x-gcs`` (sorted by name), and finally the canonicalized resource.

        :param http_method: The HTTP method of the request.
        :param resource_uri: The URI of the request, exposing ``path`` and
         ``query`` attributes.
        :param http_headers: An iterable of header objects exposing ``name``
         and ``value`` attributes, or None.
        :return: The text to sign.
        :raise ValueError: if no Date header value is available.
        """
        content_type = None
        date = None
        xgcs_http_headers = []
        if http_headers is not None:
            for http_header in http_headers:
                # Canonicalize once; every comparison below is case-insensitive.
                name = self.__to_canonicalize_header_name(http_header.name)
                if name == "content-type":
                    content_type = http_header.value
                elif name == "date":
                    date = http_header.value
                elif name.startswith("x-gcs"):
                    # NOTE(review): to_canonicalize_header_value is not defined
                    # in the visible part of this class; confirm it is provided
                    # elsewhere (base class or another part of the file).
                    value = self.to_canonicalize_header_value(http_header.value)
                    xgcs_http_headers.append(RequestHeader(name, value))

        if date is None:
            # Previously this surfaced as an opaque TypeError when concatenating
            # None with a string; fail with an explicit message instead.
            raise ValueError("Date header is required")

        xgcs_http_headers.sort(key=attrgetter('name'))
        canonicalized_headers = "".join(
            header.name + ":" + header.value + "\n" for header in xgcs_http_headers
        )
        canonicalized_resource = self.__to_canonicalized_resource(resource_uri)

        leading_lines = [
            http_method.upper(),
            content_type if content_type is not None else "",
            date,
        ]
        data_to_sign = (
            "\n".join(leading_lines) + "\n"
            + canonicalized_headers
            + canonicalized_resource + "\n"
        )
        # str() is kept so the result is still coerced to the native string
        # type, as the original implementation did.
        return str(data_to_sign)

    @staticmethod
    def __to_canonicalized_resource(resource_uri):
        """
        Returns the path of the given URI, followed by ``?`` and the query
        string when the URI has a non-empty query.

        resource_uri is expected to expose ``path`` and ``query`` attributes
        (presumably a urlparse-style result - verify against the caller).
        """
        canonicalized = "" + resource_uri.path
        if resource_uri.query:
            canonicalized += "?" + resource_uri.query
        return canonicalized

    @staticmethod
    def __to_canonicalize_header_name(original_name):
        """Returns the lower-cased header name, or None if the name is None."""
        if original_name is None:
            return None
        return original_name.lower()

    @staticmethod
    def __to_bytes(value):
        """Returns value unchanged if it is already bytes, else its UTF-8 encoding."""
        if isinstance(value, bytes):
            return value
        return value.encode("utf-8")

    def create_authentication_signature(self, data_to_sign):
        """
        Returns the base64-encoded HMAC-SHA256 of data_to_sign, keyed with the
        secret API key.

        :param data_to_sign: The text (or bytes) to sign.
        :return: The signature as a native string, so it can be concatenated
         into the authorization header value.
        """
        # hmac requires bytes for both key and message on Python 3; text input
        # is encoded as UTF-8, bytes input is passed through untouched.
        key = self.__to_bytes(self.__secret_api_key)
        message = self.__to_bytes(data_to_sign)
        digest = hmac.new(key, message, hashlib.sha256).digest()
        signature = b64encode(digest)
        # b64encode returns bytes on Python 3; base64 output is pure ASCII.
        if not isinstance(signature, str):
            signature = signature.decode("ascii")
        return signature