Source code for worldline.connect.sdk.webhooks.signature_validator
import hmac
import hashlib
from base64 import b64encode
from signature_validation_exception import SignatureValidationException
[docs]class SignatureValidator(object):
"""
Validator for webhooks signatures.
"""
def __init__(self, secret_key_store):
if secret_key_store is None:
raise ValueError("secret_key_store is required")
self.__secret_key_store = secret_key_store
[docs] def validate(self, body, request_headers):
"""
Validates the given body using the given request headers.
:raise SignatureValidationException: If the body could not be validated successfully.
"""
self.__validate_body(body, request_headers)
def __validate_body(self, body, request_headers):
signature = self.__get_header_value(request_headers, "X-GCS-Signature")
key_id = self.__get_header_value(request_headers, "X-GCS-KeyId")
secret_key = self.__secret_key_store.get_secret_key(key_id)
unencoded_result = hmac.new(secret_key, body, hashlib.sha256).digest()
expected_signature = b64encode(unencoded_result)
is_valid = hmac.compare_digest(signature, expected_signature)
if is_valid is False:
raise SignatureValidationException("failed to validate signature: " + signature)
@staticmethod
def __get_header_value(request_headers, header_name):
value = None
for header in request_headers:
if header_name.lower() == header.name.lower():
if value is None:
value = header.value
else:
raise SignatureValidationException("encountered multiple occurrences of header '" + header_name + "'")
if value is None:
raise SignatureValidationException("could not find header '" + header_name + "'")
return value
# Used for unit tests
@property
def secret_key_store(self):
return self.__secret_key_store