Add local TLS server

This also adds certificates for testing purposes and files to make it
easy to generate/regenerate them.

This also replaces an existing test of how we utilize our pool manager
such that we don't connect to badssl.com

Finally, this adds additional context parameters for our pool manager to
account for mTLS certificates used by clients to authenticate to a
server.
This commit is contained in:
Ian Stapleton Cordasco
2024-03-13 15:58:45 -05:00
parent a58d7f2ffb
commit a94e9b5308
30 changed files with 716 additions and 9 deletions
+42
View File
@@ -1,5 +1,6 @@
import select
import socket
import ssl
import threading
@@ -132,3 +133,44 @@ class Server(threading.Thread):
self._close_server_sock_ignore_errors()
self.join()
return False # allow exceptions to propagate
class TLSServer(Server):
def __init__(
self,
*,
handler=None,
host="localhost",
port=0,
requests_to_handle=1,
wait_to_close_event=None,
cert_chain=None,
keyfile=None,
mutual_tls=False,
cacert=None,
):
super().__init__(
handler=handler,
host=host,
port=port,
requests_to_handle=requests_to_handle,
wait_to_close_event=wait_to_close_event,
)
self.cert_chain = cert_chain
self.keyfile = keyfile
self.ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
self.ssl_context.load_cert_chain(self.cert_chain, keyfile=self.keyfile)
self.mutual_tls = mutual_tls
self.cacert = cacert
if mutual_tls:
# For simplicity, we're going to assume that the client cert is
# issued by the same CA as our Server certificate
self.ssl_context.verify_mode = ssl.CERT_OPTIONAL
self.ssl_context.load_verify_locations(self.cacert)
def _create_socket_and_bind(self):
sock = socket.socket()
sock = self.ssl_context.wrap_socket(sock, server_side=True)
sock.bind((self.host, self.port))
sock.listen()
return sock