import asyncio import datetime import logging from collections import namedtuple from dateutil.parser import parse as dtparse import socket import ssl CertCheckResult = namedtuple( "CertCheckResult", ["hostname", "check_successful", "expiration_date", "debug"] ) # Unit of time slept asynchronously to simulate async socket handling AWAIT_IOTA = 0.001 async def get_cert(hostname, timeout): ctx = ssl.create_default_context() with ctx.wrap_socket( socket.socket(), server_hostname=hostname, do_handshake_on_connect=False ) as s: s.settimeout(timeout) # @todo simulate async connect s.connect((hostname, 443)) s.setblocking(False) # Cannot await the handshake: simulate it with asyncio sleep while "Handshake not finished": try: s.do_handshake() break except ssl.SSLWantReadError: await asyncio.sleep(AWAIT_IOTA) except ssl.SSLWantWriteError: await asyncio.sleep(AWAIT_IOTA) return s.getpeercert() async def check_host_certificate_expiration(hostname, days_to_expiration, timeout=5): logging.info(f"Getting CERT from {hostname}") try: cert = await get_cert(hostname, timeout) except ssl.SSLCertVerificationError as e: return CertCheckResult(hostname, False, None, e.strerror) expdate = dtparse(cert.get("notAfter")) curdate = datetime.datetime.now(tz=expdate.tzinfo) if expdate - curdate < datetime.timedelta(days=days_to_expiration): return CertCheckResult( hostname, False, expdate, f"Certificate expires in {(expdate - curdate).days} days, expected more than {days_to_expiration}", ) return CertCheckResult( hostname, True, expdate, f"Certificate expires in {(expdate - curdate).days} days", )