Skip to content

Commit

Permalink
vmsdk: add runtime event log fetching
Browse files Browse the repository at this point in the history
* add IMA event log fetching in TDX case when IMA over
  RTMR is enabled in the system
* declare new class for event log formatting
* revise comments for get_eventlog API

Signed-off-by: Ruoyu Ying <[email protected]>
  • Loading branch information
Ruoyu-y committed Jan 22, 2024
1 parent e3dcb82 commit acbd440
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 48 deletions.
220 changes: 189 additions & 31 deletions common/python/cctrusted_base/eventlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import logging
from cctrusted_base.binaryblob import BinaryBlob
from cctrusted_base.tcg import TcgAlgorithmRegistry
from cctrusted_base.tcg import TcgDigest
from cctrusted_base.tcg import TcgEventType
from cctrusted_base.tcg import TcgEfiSpecIdEvent
Expand All @@ -17,24 +18,87 @@
class TcgEventLog:
"""TcgEventLog class.
This class contains the event logs following TCG specification.
This is the common class for tcg event logs to be delivered in different formats.
Currently TCG supports several event log formats defined in TCG_PCClient Spec,
Canonical Eventlog Spec, etc.
This class provides the functionality to convey event logs in different format
according to request.
Attributes:
data: raw data containing all boot time event logs
rec_num: contains the record number of the event log within the imr index
imr_index: the index of the register that the event log belongs to
event_type: event type of the event log
digests: a list of TcgDigest objects
event_size: size of the event
event: raw event information
extra_info: extra information in the event
"""

TCG_PCCLIENT_FORMAT = "tcg_pcclient"
TCG_CANONICAL_FORMAT = "tcg_canonical"

def __init__(self, rec_num:int, imr_index:int, event_type:TcgEventType, digests:list[TcgDigest],
event_size:int, event:bytes, extra_info=None) -> None:
self._rec_num = rec_num
self._imr_index = imr_index
self._event_type = event_type
self._digests = digests
self._event_size = event_size
self._event = event
self._extra_info = extra_info

def format_event_log(self, parse_format:str):
"""Format the event log into differen format."""
if parse_format == TcgEventLog.TCG_PCCLIENT_FORMAT:
return self._to_tcg_pcclient_format()

if parse_format == TcgEventLog.TCG_CANONICAL_FORMAT:
return self._to_tcg_canonical_format()

return None

def _to_tcg_pcclient_format(self):
"""The function to convert event log data into event log
following TCG Pcclient Spec.
Return different class according to event type
"""
if self._event_type == TcgEventType.EV_NO_ACTION :
return TcgPcClientImrEvent(self._imr_index, self._event_size, self._digests[0].hash,
self._event_size, self._event)

return TcgImrEvent(self._imr_index, self._event_type, self._digests, self._event_size,
self._event)

def _to_tcg_canonical_format(self):
"""The function to convert event log data into event log following
Canonical Eventlog Spec.
"""
return NotImplementedError


class EventLogs:
"""EventLogs class.
This class contains the all event logs available on the system.
Attributes:
boot_time_data: raw data containing all boot time event logs
runtime_data: raw data containing runtime event logs(now IMA events)
event_logs: all parsed event logs
count: total number of event logs
special_flag: flag to identify if there're customized runtime event logs
"""
spec_id_header_event = None
# Initiate the record number list for each index with default value 0
event_logs_record_number_list = [0] * 24

def __init__(self, data:bytes) -> None:
self._data = data
def __init__(self, boot_time_data:bytes, runtime_data:bytes, parse_format:str=None) -> None:
self._boot_time_data = boot_time_data
self._runtime_data = runtime_data
self._event_logs = []
self._count:int = 0

@property
def data(self):
"""Raw data of TCG event logs."""
return self._data
self._format:str = parse_format

@property
def event_logs(self):
Expand All @@ -55,14 +119,17 @@ def dump(self, is_raw=True) -> None:
False: dump in human readable texts
"""
if self._count == 0:
LOG.info("No parsed event log found.")
LOG.info("No event log found.")
return

if is_raw:
LOG.info("RAW DATA: ------------------------------------------------------------------")
blob = BinaryBlob(self._data, 0)
LOG.info("RAW UEFI EVENT LOG DATA: ---------------------------------------------------")
blob = BinaryBlob(self._boot_time_data, 0)
blob.dump()
LOG.info("RAW RUNTIME EVENT LOG DATA: ------------------------------------------------")
blob = BinaryBlob(self._runtime_data, 0)
blob.dump()
LOG.info("RAW DATA: ------------------------------------------------------------------")
LOG.info("End: -----------------------------------------------------------------------")
return

LOG.info("Event Log Entries:")
Expand All @@ -76,7 +143,7 @@ def select(self, start:int, count:int) -> None:
start: index of the first event log to collect
count: total number of event logs to collect
"""
self._parse()
self._parse(self._format)

if start is not None:
if not 0 < start <= self._count:
Expand All @@ -94,19 +161,35 @@ def select(self, start:int, count:int) -> None:

self._event_logs = self._event_logs[:count]

def _parse(self) -> None:
def _get_record_number(self, imr_index:int) -> int:
"""Fetch the record number maintained separately by index.
Increment the number to be prepared for next measurement.
Args:
imr_index: the imr index used to fetch certain record number
Returns:
The record number
"""
rec_num = EventLogs.event_logs_record_number_list[imr_index]
EventLogs.event_logs_record_number_list[imr_index] += 1

return rec_num

def _parse(self, parse_format:str) -> None:
"""Parse event log data into TCG compatible forms.
Run through all event log data and parse the contents accordingly
Save the parsed event logs into TcgEventLog.
"""
if self._data is None:
LOG.error("Providing invalid data blob.")
if self._boot_time_data is None:
LOG.error("No boot time event log found.")
return

blob = BinaryBlob(self._data, 0)
blob = BinaryBlob(self._boot_time_data, 0)
index = 0

while index < len(self._data):
while index < len(self._boot_time_data):
start = index
imr, index = blob.get_uint32(index)
event_type, index = blob.get_uint32(index)
Expand All @@ -116,17 +199,25 @@ def _parse(self) -> None:

if event_type == TcgEventType.EV_NO_ACTION:
spec_id_event, event_len = \
self._parse_spec_id_event_log(self._data[start:])
self._parse_spec_id_event_log(self._boot_time_data[start:])
index = start + event_len
self._event_logs.append(spec_id_event)
self._event_logs.append(spec_id_event.format_event_log(parse_format))
self._count += 1
else:
event_log, event_len = self._parse_event_log(self._data[start:])
event_log, event_len = self._parse_event_log(self._boot_time_data[start:])
index = start + event_len
self._event_logs.append(event_log)
self._event_logs.append(event_log.format_event_log(parse_format))
self._count += 1

def _parse_spec_id_event_log(self, data:bytes) -> (TcgPcClientImrEvent, int):
if self._runtime_data is None:
return

for event in self._runtime_data.splitlines():
event_log = self._parse_ima_event_log(event)
self._event_logs.append(event_log.format_event_log(parse_format))
self._count += 1

def _parse_spec_id_event_log(self, data:bytes) -> (TcgEventLog, int):
"""Parse TCG specification Id event according to TCG spec at
https://trustedcomputinggroup.org/wp-content/uploads/TCG_PCClientSpecPlat_TPM_2p0_1p04_pub.pdf.
Expand All @@ -143,7 +234,7 @@ def _parse_spec_id_event_log(self, data:bytes) -> (TcgPcClientImrEvent, int):
data: event log data in bytes
Returns:
A TcgPcClientImrEvent containing the Specification ID version event
A common TcgEventLog containing the Specification ID version event
An int specifying the event size
"""
index = 0
Expand All @@ -154,11 +245,18 @@ def _parse_spec_id_event_log(self, data:bytes) -> (TcgPcClientImrEvent, int):
header_imr = imr_index - 1
header_event_type, index = blob.get_uint32(index)

rec_num = self._get_record_number(header_imr)

digest, index = blob.get_bytes(index, 20) # 20 zero for digest
# Convert digest to common TcgDigest type
digest = TcgDigest(TcgAlgorithmRegistry.TPM_ALG_ERROR, digest)
digests = []
digests.append(digest)

header_event_size, index = blob.get_uint32(index) # 4 bytes containing event size
header_event, _ = blob.get_bytes(index, header_event_size)

specification_id_header = TcgPcClientImrEvent(header_imr, header_event_type, digest,
specification_id_header = TcgEventLog(rec_num, header_imr, header_event_type, digests,
header_event_size, header_event)

# Parse EFI Spec Id Event structure
Expand All @@ -179,7 +277,7 @@ def _parse_spec_id_event_log(self, data:bytes) -> (TcgPcClientImrEvent, int):
spec_id_vendor_info, index = blob.get_bytes(index, int(spec_id_vendor_size))
else:
spec_id_vendor_info = bytes()
TcgEventLog.spec_id_header_event = \
EventLogs.spec_id_header_event = \
TcgEfiSpecIdEvent(spec_id_signature, spec_id_platform_cls,
spec_id_version_minor, spec_id_version_major,
spec_id_errata, spec_id_uint_size, spec_id_num_of_algo,
Expand All @@ -188,7 +286,7 @@ def _parse_spec_id_event_log(self, data:bytes) -> (TcgPcClientImrEvent, int):

return specification_id_header, index

def _parse_event_log(self, data:bytes) -> (TcgImrEvent, int):
def _parse_event_log(self, data:bytes) -> (TcgEventLog, int):
"""Parse TCG event log body as single event log entry (TcgImrEventLogEntry) defined at
https://trustedcomputinggroup.org/wp-content/uploads/TCG_PCClientSpecPlat_TPM_2p0_1p04_pub.pdf
Expand All @@ -204,7 +302,7 @@ def _parse_event_log(self, data:bytes) -> (TcgImrEvent, int):
data: event log data in bytes
Returns:
A TcgImrEvent containing the event information
A TcgEventLog containing the event information
An int specifying the event size
"""
index = 0
Expand All @@ -215,13 +313,15 @@ def _parse_event_log(self, data:bytes) -> (TcgImrEvent, int):
imr_index = imr_index - 1
event_type, index = blob.get_uint32(index)

rec_num = self._get_record_number(imr_index)

# Fetch digest count and get each digest and its algorithm
digest_count, index = blob.get_uint32(index)
digests = []
for _ in range(digest_count):
alg_id, index = blob.get_uint16(index)
alg = next((alg for alg in \
TcgEventLog.spec_id_header_event.digest_sizes \
EventLogs.spec_id_header_event.digest_sizes \
if alg.algo_id == alg_id), None)
if alg is None:
raise ValueError(f'No algorithm with such algo_id {alg_id} found')
Expand All @@ -233,6 +333,64 @@ def _parse_event_log(self, data:bytes) -> (TcgImrEvent, int):
event, index = blob.get_bytes(index, event_size)

# Generate TcgImrEvent using the info parsed
entry = TcgImrEvent(imr_index, event_type, digests, event_size,
entry = TcgEventLog(rec_num, imr_index, event_type, digests, event_size,
event)
return entry, index

# pylint: disable=c0301
def _parse_ima_event_log(self, event:bytes) -> TcgEventLog:
"""Parse ascii IMA events gathered during runtime.
Sample event and format:
IMR index | Template hash | Template name | Event data according to template
10 1e762ca412a3ef388ddcab416e2eb382d9d1e356 ima-ng sha384:74ccc46104f42db070375e6876a23aeaa3c2ae458888475baaa171c3fb7001b0fc385ed08420d5f60620924fc64d0b80 /etc/lsb-release
Args:
event: IMA ascii raw event
Returns:
A TcgEventLog object containing the ima event log
"""

# Split the IMA ascii event log entry using space for processing
elements = event.decode().split(" ")

# IMR index value affects the spliting result.
# Number less than 10 will leave an extra space at front.
base_idx = 0
if elements[0] == "":
base_idx = 1

# imr_idx contains the element index that stores the IMR index
# digest_idx containes the element index that stores digest
# template_idx contains the element index that stores the template name
imr_idx = base_idx
digest_idx = imr_idx + 1
template_idx = digest_idx + 1

rec_num = self._get_record_number(int(elements[imr_idx]))

# Merge the elements left as event data
event = bytes(" ".join(elements[template_idx+1:]), "ascii")
event_size = len(event)

# Use digest size to figure out the algorithm id
digests = []
digest_size = len(elements[digest_idx])/2
alg_id = TcgAlgorithmRegistry.TPM_ALG_ERROR
if digest_size in TcgAlgorithmRegistry.TPM_ALG_HASH_DIGEST_SIZE_TABLE.values():
for key, value in TcgAlgorithmRegistry.TPM_ALG_HASH_DIGEST_SIZE_TABLE.items():
if value == digest_size:
alg_id = key
break
digest = TcgDigest(alg_id, bytearray.fromhex(elements[digest_idx]))
digests.append(digest)

# Put template name within extra info
extra_info = {
"template_name": elements[template_idx]
}

return TcgEventLog(rec_num, int(elements[base_idx]),
TcgEventType.IMA_MEASUREMENT_EVENT, digests,
event_size, event, extra_info)
Loading

0 comments on commit acbd440

Please sign in to comment.