Refactor reporting to provide instant feedback on default output
This commit is contained in:
@@ -12,7 +12,7 @@ import logging
|
|||||||
from docopt import docopt
|
from docopt import docopt
|
||||||
|
|
||||||
from certo.checks.hostname import check_host_certificate_expiration
|
from certo.checks.hostname import check_host_certificate_expiration
|
||||||
from certo.output import default_output, json_output
|
from certo.report import JSONReporter, DefaultReporter
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
args = docopt(__doc__)
|
args = docopt(__doc__)
|
||||||
@@ -23,17 +23,18 @@ if __name__ == "__main__":
|
|||||||
hostnames = args.get("<hostnames>")
|
hostnames = args.get("<hostnames>")
|
||||||
days_to_expiration = int(args.get("--days-to-expiration"))
|
days_to_expiration = int(args.get("--days-to-expiration"))
|
||||||
|
|
||||||
results = []
|
# @todo factory
|
||||||
|
if output_as_json:
|
||||||
|
reporter = JSONReporter()
|
||||||
|
else:
|
||||||
|
reporter = DefaultReporter()
|
||||||
|
|
||||||
|
# @todo async
|
||||||
for hs in hostnames:
|
for hs in hostnames:
|
||||||
logging.info(f"Getting CERT from {hs}")
|
logging.info(f"Getting CERT from {hs}")
|
||||||
results.append(check_host_certificate_expiration(hs, days_to_expiration))
|
reporter.add_check(check_host_certificate_expiration(hs, days_to_expiration))
|
||||||
|
|
||||||
failed = list(r for r in results if not r.check_successful)
|
if log := reporter.report():
|
||||||
|
print(log)
|
||||||
|
|
||||||
if output_as_json:
|
exit(reporter.num_failed())
|
||||||
print(json_output(results))
|
|
||||||
else:
|
|
||||||
print(default_output(results))
|
|
||||||
|
|
||||||
exit(len(failed))
|
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ def check_host_certificate_expiration(hostname, days_to_expiration, timeout=5):
|
|||||||
hostname,
|
hostname,
|
||||||
False,
|
False,
|
||||||
expdate,
|
expdate,
|
||||||
f"Certificate expires in {(expdate - curdate).days} days - expected more than {days_to_expiration}",
|
f"Certificate expires in {(expdate - curdate).days} days, expected more than {days_to_expiration}",
|
||||||
)
|
)
|
||||||
return CertCheckResult(
|
return CertCheckResult(
|
||||||
hostname,
|
hostname,
|
||||||
|
|||||||
@@ -1,34 +0,0 @@
|
|||||||
import json
|
|
||||||
from typing import List
|
|
||||||
|
|
||||||
from certo.checks.hostname import CertCheckResult
|
|
||||||
|
|
||||||
|
|
||||||
def json_output(cert_check_results: List[CertCheckResult]):
|
|
||||||
def make_check_serialisable(check):
|
|
||||||
"""
|
|
||||||
Converts a CertCheckResult for json serialisation
|
|
||||||
:param check: CertCheckResult as output by checks
|
|
||||||
:return: check as dict() with appropriate type conversions for json.dumps
|
|
||||||
"""
|
|
||||||
ret = check._asdict()
|
|
||||||
if check.expiration_date:
|
|
||||||
ret["expiration_date"] = check.expiration_date.strftime("%c %Z")
|
|
||||||
return ret
|
|
||||||
|
|
||||||
return json.dumps(
|
|
||||||
list(map(lambda check: make_check_serialisable(check), cert_check_results)),
|
|
||||||
indent=4,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def default_output(cert_check_results: List[CertCheckResult]):
|
|
||||||
res = list()
|
|
||||||
for check in cert_check_results:
|
|
||||||
result = f"[{'PASS' if check.check_successful else 'FAIL'}] Check host {check.hostname}"
|
|
||||||
if check.debug:
|
|
||||||
result += f" - {check.debug}"
|
|
||||||
if check.expiration_date:
|
|
||||||
result += f" - Certificate expires on {check.expiration_date} {check.expiration_date.tzname()}"
|
|
||||||
res.append(result)
|
|
||||||
return "\n".join(res)
|
|
||||||
53
certo/report.py
Normal file
53
certo/report.py
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
import json
|
||||||
|
|
||||||
|
|
||||||
|
class CheckReporter:
|
||||||
|
def __init__(self):
|
||||||
|
self.checks = list()
|
||||||
|
|
||||||
|
def add_check(self, check):
|
||||||
|
self.checks.append(check)
|
||||||
|
|
||||||
|
def failed(self):
|
||||||
|
return list(r for r in self.checks if not r.check_successful)
|
||||||
|
|
||||||
|
def num_failed(self):
|
||||||
|
return len(self.failed())
|
||||||
|
|
||||||
|
def report(self):
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
class JSONReporter(CheckReporter):
|
||||||
|
def __make_check_serialisable(check):
|
||||||
|
"""
|
||||||
|
Converts a CertCheckResult for json serialisation
|
||||||
|
:param check: CertCheckResult as output by checks
|
||||||
|
:return: check as dict() with appropriate type conversions for json.dumps
|
||||||
|
"""
|
||||||
|
ret = check._asdict()
|
||||||
|
if check.expiration_date:
|
||||||
|
ret["expiration_date"] = check.expiration_date.strftime("%c %Z")
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def report(self):
|
||||||
|
return json.dumps(
|
||||||
|
list(
|
||||||
|
map(
|
||||||
|
lambda check: JSONReporter.__make_check_serialisable(check),
|
||||||
|
self.checks,
|
||||||
|
)
|
||||||
|
),
|
||||||
|
indent=4,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class DefaultReporter(CheckReporter):
|
||||||
|
def add_check(self, check):
|
||||||
|
super().add_check(check)
|
||||||
|
result = f"[{'PASS' if check.check_successful else 'FAIL'}] Check host {check.hostname}"
|
||||||
|
if check.debug:
|
||||||
|
result += f" - {check.debug}"
|
||||||
|
if check.expiration_date:
|
||||||
|
result += f" - Certificate expires on {check.expiration_date} {check.expiration_date.tzname()}"
|
||||||
|
print(result)
|
||||||
Reference in New Issue
Block a user