Source code for worldline.connect.sdk.communication.default_connection

import math
import uuid
from datetime import datetime, timedelta
from typing import Mapping, Optional, Sequence
from urllib.parse import urlparse

import requests
from requests.adapters import HTTPAdapter
from requests.models import PreparedRequest
from requests.exceptions import RequestException, Timeout
from requests_toolbelt import MultipartEncoder

from .communication_exception import CommunicationException
from .connection import RequestBody, Response, URI
from .multipart_form_data_object import MultipartFormDataObject
from .pooled_connection import PooledConnection
from .request_header import RequestHeader
from .response_header import get_header_value

from worldline.connect.sdk.communicator_configuration import CommunicatorConfiguration
from worldline.connect.sdk.proxy_configuration import ProxyConfiguration
from worldline.connect.sdk.log.body_obfuscator import BodyObfuscator
from worldline.connect.sdk.log.communicator_logger import CommunicatorLogger
from worldline.connect.sdk.log.header_obfuscator import HeaderObfuscator
from worldline.connect.sdk.log.request_log_message import RequestLogMessage
from worldline.connect.sdk.log.response_log_message import ResponseLogMessage

CHARSET = "UTF-8"


[docs] class DefaultConnection(PooledConnection): """ Provides an HTTP request interface, thread-safe :param connect_timeout: timeout in seconds before a pending connection is dropped :param socket_timeout: timeout in seconds before dropping an established connection. This is the time the server is allowed for a response :param max_connections: the maximum number of connections in the connection pool :param proxy_configuration: ProxyConfiguration object that contains data about proxy settings if present. It should be writeable as string and have a scheme attribute. Use the methods get, delete, post and put to perform the corresponding HTTP request. Alternatively you can use request with the request method as the first parameter. URI, headers and body should be given on a per-request basis. """ def __init__(self, connect_timeout: int, socket_timeout: int, max_connections: int = CommunicatorConfiguration.DEFAULT_MAX_CONNECTIONS, proxy_configuration: Optional[ProxyConfiguration] = None): self.logger = None self.__requests_session = requests.session() self.__requests_session.mount("http://", HTTPAdapter(pool_maxsize=max_connections, pool_connections=1)) self.__requests_session.mount("https://", HTTPAdapter(pool_maxsize=max_connections, pool_connections=1)) # request timeouts are in seconds self.__connect_timeout = connect_timeout if connect_timeout >= 0 else None self.__socket_timeout = socket_timeout if socket_timeout >= 0 else None if proxy_configuration: proxy = { "http": str(proxy_configuration), "https": str(proxy_configuration) } self.__requests_session.proxies = proxy self.__body_obfuscator = BodyObfuscator.default_body_obfuscator() self.__header_obfuscator = HeaderObfuscator.default_header_obfuscator() @property def connect_timeout(self) -> Optional[int]: """Connection timeout in seconds""" return self.__connect_timeout @property def socket_timeout(self) -> Optional[int]: """Socket timeout in seconds""" return self.__socket_timeout
[docs] def get(self, url: URI, request_headers: Sequence[RequestHeader]) -> Response: """Perform a request to the server given by url :param url: the url to the server, given as a parsed url :param request_headers: a sequence containing RequestHeader objects representing the request headers """ return self._request('get', url, request_headers)
[docs] def delete(self, url: URI, request_headers: Sequence[RequestHeader]) -> Response: """Perform a request to the server given by url :param url: the url to the server, given as a parsed url :param request_headers: a sequence containing RequestHeader objects representing the request headers """ return self._request('delete', url, request_headers)
[docs] def post(self, url: URI, request_headers: Sequence[RequestHeader], body: RequestBody) -> Response: """Perform a request to the server given by url :param url: the url to the server, given as a parsed url :param request_headers: a sequence containing RequestHeader objects representing the request headers :param body: the request body """ if isinstance(body, MultipartFormDataObject): body = self.__to_multipart_encoder(body) return self._request('post', url, request_headers, body)
[docs] def put(self, url: URI, request_headers: Sequence[RequestHeader], body: RequestBody) -> Response: """Perform a request to the server given by url :param url: the url to the server, given as a parsed url :param request_headers: a sequence containing RequestHeader objects representing the request headers :param body: the request body """ if isinstance(body, MultipartFormDataObject): body = self.__to_multipart_encoder(body) return self._request('put', url, request_headers, body)
@staticmethod def __to_multipart_encoder(multipart: MultipartFormDataObject) -> MultipartEncoder: fields = {} for name, value in multipart.values.items(): fields[name] = value for name, uploadable_file in multipart.files.items(): fields[name] = (uploadable_file.file_name, uploadable_file.content, uploadable_file.content_type) encoder = MultipartEncoder(fields=fields, boundary=multipart.boundary) if encoder.content_type != multipart.content_type: raise ValueError("MultipartEncoder did not create the expected content type") return encoder class _ToResult(object): def __call__(self, func): def _wrapper(*args, **kwargs): result = func(*args, **kwargs) header = next(result) return header + (result,) return _wrapper @_ToResult() def _request(self, method: str, url: URI, headers: Sequence[RequestHeader], body: RequestBody = None) -> Response: """ Perform a request to the server given by url :param url: the url to the server, given as a parsed url :param headers: a sequence containing RequestHeader objects representing the request headers :param body: the request body """ headers = {} if not headers else headers if not isinstance(url, str): url = url.geturl() # convert the sequence of RequestParam objects to a dictionary of key:value pairs if necessary if headers and not isinstance(headers, dict): headers = {param.name: param.value for param in headers} # send request with all parameters not declared in session and with callback for logging response request = requests.Request(method, url, headers=headers, data=body, hooks={'response': self._cb_log_response}) prepped_request = self.__requests_session.prepare_request(request) # add timestamp to request for later reference prepped_request.timestamp = datetime.now() _id = str(uuid.uuid4()) # store random id in request so it can be matched with its response in logging prepped_request.id = _id self._log_request(prepped_request) try: timeout_ = (self.__connect_timeout, self.__socket_timeout) requests_response = self.__requests_session.send(prepped_request, timeout=timeout_, stream=True) try: iterable = requests_response.iter_content(chunk_size=1024) yield requests_response.status_code, requests_response.headers for chunk in iterable: yield chunk finally: requests_response.close() except Timeout as timeout: self._log_error(prepped_request.id, timeout, prepped_request.timestamp) raise CommunicationException(timeout) except RequestException as exception: self._log_error(prepped_request.id, exception, prepped_request.timestamp) raise CommunicationException(exception) except Exception as exception: self._log_error(prepped_request.id, exception, prepped_request.timestamp) raise def _log_request(self, request: PreparedRequest) -> None: """ Log parameter request if logging is enabled at the moment of logging. Also adds a timestamp to the request for response logging """ logger = self.logger if logger is None: return method = request.method url = urlparse(request.url) if url.query: local_path = url.path + "?" + url.query else: local_path = url.path try: message = RequestLogMessage(request_id=request.id, method=method, uri=local_path, body_obfuscator=self.__body_obfuscator, header_obfuscator=self.__header_obfuscator) for name in request.headers: message.add_header(name, request.headers[name]) body = request.body if body: content = request.headers['Content-Type'] if content != "application/json": message.set_body(body, content, CHARSET) else: message.set_body(body, content) logger.log_request(message) except Exception as exception: logger.log("An error occurred trying to log request '{}'".format(request.id), exception) def _cb_log_response(self, response: requests.models.Response, **kwargs) -> None: """Log parameter response if logging is enabled at the moment of logging""" logger = self.logger if logger is None: return request = response.request _id = request.id duration = math.ceil((datetime.now() - request.timestamp).total_seconds() * 1000) status_code = response.status_code try: message = ResponseLogMessage(request_id=_id, status_code=status_code, duration=duration, body_obfuscator=self.__body_obfuscator, header_obfuscator=self.__header_obfuscator) for name in response.headers: message.add_header(name, response.headers[name]) if self.__is_binary(response.headers): body = "<binary content>" else: # The response is always encoded UTF8 # When this is not specified anywhere, the response body will be encoded in the wrong way response.encoding = 'utf8' body = response.text if body: content = response.headers['Content-Type'] message.set_body(body, content) logger.log_response(message) except Exception as exception: logger.log("An error occurred trying to log response '{}'".format(_id), exception) def _log_error(self, request_id: str, error: Exception, start_time: datetime) -> None: """Log communication errors when logging is enabled""" logger = self.logger if logger: duration = math.ceil((datetime.now() - start_time).total_seconds() * 1000) logger.log("Error occurred for outgoing request (requestId='{}', {} s)".format(request_id, duration), error) @staticmethod def __is_binary(headers: Mapping[str, str]) -> bool: content_type = get_header_value(headers, "Content-Type") if content_type is None: return False content_type = content_type.lower() return not (content_type.startswith("text/") or "json" in content_type or "xml" in content_type)
[docs] def set_body_obfuscator(self, body_obfuscator: BodyObfuscator) -> None: self.__body_obfuscator = body_obfuscator
[docs] def set_header_obfuscator(self, header_obfuscator: HeaderObfuscator) -> None: self.__header_obfuscator = header_obfuscator
[docs] def enable_logging(self, communicator_logger: CommunicatorLogger) -> None: self.logger = communicator_logger
[docs] def disable_logging(self) -> None: self.logger = None
[docs] def close_idle_connections(self, idle_time: timedelta) -> None: """ :param idle_time: a datetime.timedelta object indicating the idle time """ pass
[docs] def close_expired_connections(self) -> None: pass
[docs] def __enter__(self): return self
[docs] def __exit__(self, exc_type, exc_val, exc_tb): self.close()
[docs] def close(self) -> None: """ Explicitly closes the connection """ self.__requests_session.close()