Source code for worldline.connect.sdk.webhooks.signature_validator
import hmac
import hashlib
from base64 import b64encode
from typing import Union, Sequence
from .secret_key_store import SecretKeyStore
from .signature_validation_exception import SignatureValidationException
from worldline.connect.sdk.communication.request_header import RequestHeader
[docs]
class SignatureValidator(object):
    """
    Validator for webhooks signatures.

    The expected signature is the base64-encoded HMAC-SHA256 digest of the
    request body, keyed with the secret key that the secret key store returns
    for the key id found in the ``X-GCS-KeyId`` header. It is compared against
    the value of the ``X-GCS-Signature`` header.
    """

    def __init__(self, secret_key_store: SecretKeyStore):
        """
        :param secret_key_store: Store used to look up the secret key for a key id.
        :raise ValueError: If secret_key_store is None.
        """
        if secret_key_store is None:
            raise ValueError("secret_key_store is required")
        self.__secret_key_store = secret_key_store

    def validate(self, body: Union[str, bytes], request_headers: Sequence[RequestHeader]) -> None:
        """
        Validates the given body using the given request headers.

        :param body: The raw request body. A str body is encoded as UTF-8
            before the signature is computed.
        :param request_headers: The headers that were sent with the body.
        :raise SignatureValidationException: If the body could not be validated successfully.
        """
        self.__validate_body(body, request_headers)

    def __validate_body(self, body: Union[str, bytes], request_headers: Sequence[RequestHeader]) -> None:
        """
        Computes the expected signature for body and compares it with the
        signature header.

        :raise SignatureValidationException: If a required header is missing or
            duplicated, or if the signatures do not match.
        """
        signature = self.__get_header_value(request_headers, "X-GCS-Signature")
        key_id = self.__get_header_value(request_headers, "X-GCS-KeyId")
        # NOTE(review): whatever get_secret_key raises for an unknown key id
        # propagates unchanged; the store's contract is not visible here.
        secret_key = self.__secret_key_store.get_secret_key(key_id)

        # hmac.new only accepts bytes-like messages; passing a str raises
        # TypeError, so text bodies must be encoded first.
        if isinstance(body, str):
            body = body.encode("utf-8")

        unencoded_result = hmac.new(secret_key.encode("utf-8"), body, hashlib.sha256).digest()
        expected_signature = b64encode(unencoded_result)

        # compare_digest is used instead of == to avoid leaking how many
        # leading characters of the signature matched through timing.
        if not hmac.compare_digest(signature.encode("utf-8"), expected_signature):
            raise SignatureValidationException("failed to validate signature: " + signature)

    @staticmethod
    def __get_header_value(request_headers: Sequence[RequestHeader], header_name: str) -> str:
        """
        Returns the value of the single header whose name matches header_name,
        compared case-insensitively.

        :raise SignatureValidationException: If the header occurs more than
            once, or if no header with a non-None value is found.
        """
        wanted_name = header_name.lower()
        value = None
        for header in request_headers:
            if wanted_name == header.name.lower():
                if value is not None:
                    raise SignatureValidationException(
                        "encountered multiple occurrences of header '" + header_name + "'")
                value = header.value
        if value is None:
            raise SignatureValidationException("could not find header '" + header_name + "'")
        return value

    # Used for unit tests
    @property
    def secret_key_store(self) -> SecretKeyStore:
        """The secret key store this validator was constructed with."""
        return self.__secret_key_store