67 lines
1.9 KiB
Python
67 lines
1.9 KiB
Python
import asyncio
|
|
import datetime
|
|
import logging
|
|
from collections import namedtuple
|
|
|
|
from dateutil.parser import parse as dtparse
|
|
|
|
import socket
|
|
import ssl
|
|
|
|
# Outcome of checking one host's certificate.
#   hostname         -- the host that was checked
#   check_successful -- False when verification failed or the certificate
#                       expires too soon, True otherwise
#   expiration_date  -- parsed "notAfter" value, or None when no certificate
#                       could be obtained
#   debug            -- human-readable explanation of the outcome
_CERT_CHECK_FIELDS = ("hostname", "check_successful", "expiration_date", "debug")
CertCheckResult = namedtuple("CertCheckResult", _CERT_CHECK_FIELDS)


# Pause, in seconds, handed to asyncio.sleep() between polls of a
# non-blocking socket operation so other tasks get a chance to run.
AWAIT_IOTA = 0.001
|
|
|
|
|
|
async def get_cert(hostname, timeout):
    """Fetch the TLS certificate presented by ``hostname`` on port 443.

    The TCP connect is performed synchronously; the TLS handshake is then
    driven on a non-blocking socket, yielding to the event loop between
    attempts.

    Args:
        hostname: Host to connect to; also used as the TLS server name.
        timeout: Seconds allowed for the connect and, separately, for the
            handshake. ``None`` disables both limits.

    Returns:
        Whatever ``SSLSocket.getpeercert()`` returns for the peer.

    Raises:
        socket.timeout: The handshake did not finish within ``timeout``.
        ssl.SSLError: The handshake failed (this includes
            ``ssl.SSLCertVerificationError``).
        OSError: Presumably also raised by the connect itself on network
            failures -- not handled here.
    """
    ctx = ssl.create_default_context()
    with ctx.wrap_socket(
        socket.socket(), server_hostname=hostname, do_handshake_on_connect=False
    ) as s:
        s.settimeout(timeout)

        # TODO: this connect still blocks the event loop for up to `timeout`.
        s.connect((hostname, 443))

        # Switching to non-blocking mode discards the timeout configured
        # above, so the handshake deadline has to be enforced by hand.
        s.setblocking(False)
        clock = asyncio.get_running_loop().time
        deadline = None if timeout is None else clock() + timeout

        # The handshake cannot be awaited directly: poll it and sleep briefly
        # whenever the TLS layer is waiting on more network I/O.
        while True:
            try:
                s.do_handshake()
                break
            except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                if deadline is not None and clock() >= deadline:
                    raise socket.timeout(
                        "The handshake operation timed out"
                    ) from None
                await asyncio.sleep(AWAIT_IOTA)

        return s.getpeercert()
|
|
|
|
|
|
async def check_host_certificate_expiration(hostname, days_to_expiration, timeout=5):
    """Check that ``hostname``'s certificate stays valid long enough.

    Args:
        hostname: Host whose certificate is fetched via ``get_cert``.
        days_to_expiration: Minimum number of days the certificate must
            still be valid for the check to pass.
        timeout: Passed through to ``get_cert``.

    Returns:
        A ``CertCheckResult``. ``check_successful`` is False when certificate
        verification failed (``expiration_date`` is then None and ``debug``
        carries the error's ``strerror``) or when the certificate expires in
        less than ``days_to_expiration`` days; otherwise it is True.

    Only ``ssl.SSLCertVerificationError`` is caught here; any other exception
    from ``get_cert`` propagates to the caller.
    """
    logging.info(f"Getting CERT from {hostname}")
    try:
        peer_cert = await get_cert(hostname, timeout)
    except ssl.SSLCertVerificationError as verify_error:
        return CertCheckResult(hostname, False, None, verify_error.strerror)

    not_after = dtparse(peer_cert.get("notAfter"))
    # Take "now" in the certificate's own timezone so the subtraction is valid.
    now = datetime.datetime.now(tz=not_after.tzinfo)
    remaining = not_after - now
    required = datetime.timedelta(days=days_to_expiration)

    if remaining >= required:
        return CertCheckResult(
            hostname,
            True,
            not_after,
            f"Certificate expires in {remaining.days} days",
        )

    return CertCheckResult(
        hostname,
        False,
        not_after,
        f"Certificate expires in {remaining.days} days, expected more than {days_to_expiration}",
    )
|