Skip to content

Commit

Permalink
vmsdk: add api for event log replay (#66)
Browse files Browse the repository at this point in the history
* add event log replay api
* add event log replay processing logic

Signed-off-by: Ruoyu Ying <[email protected]>
  • Loading branch information
Ruoyu-y authored Jan 26, 2024
1 parent 965409a commit 6145e8a
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 0 deletions.
51 changes: 51 additions & 0 deletions common/python/cctrusted_base/eventlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import logging
from hashlib import sha1, sha256, sha384, sha512
from cctrusted_base.binaryblob import BinaryBlob
from cctrusted_base.tcg import TcgAlgorithmRegistry
from cctrusted_base.tcg import TcgDigest
Expand Down Expand Up @@ -393,3 +394,53 @@ def _parse_ima_event_log(self, event:bytes) -> TcgEventLog:
return TcgEventLog(rec_num, int(elements[imr_idx]),
TcgEventType.IMA_MEASUREMENT_EVENT, digests,
event_size, event, extra_info)

def replay(self) -> dict:
"""
Replay event logs by IMR index.
Returns:
A dictionary containing the replay result displayed by IMR index and hash algorithm.
Layer 1 key of the dict is the IMR index, the value is another dict which using the
hash algorithm as the key and the replayed measurement as value.
Sample value:
{ 0: { 12: <measurement_replayed>}}
"""
measurement_dict = {}
for event in self._event_logs:
# Skip TcgPcClientImrEvent during replay
if isinstance(event, TcgPcClientImrEvent):
continue

# pylint: disable-next=consider-iterating-dictionary
if event.imr_index not in measurement_dict.keys():
measurement_dict[event.imr_index] = {}

for digest in event.digests:
alg_id = digest.alg.alg_id
hash_val = digest.hash

# Check algorithm type and prepare for replay
match alg_id:
case TcgAlgorithmRegistry.TPM_ALG_SHA1:
algo = sha1()
case TcgAlgorithmRegistry.TPM_ALG_SHA384:
algo = sha384()
case TcgAlgorithmRegistry.TPM_ALG_SHA256:
algo = sha256()
case TcgAlgorithmRegistry.TPM_ALG_SHA512:
algo = sha512()
case _:
LOG.error("Unsupported hash algorithm %d", alg_id)
continue

# Initialize value if alg_id not found in dict
if alg_id not in measurement_dict[event.imr_index].keys():
measurement_dict[event.imr_index][alg_id] = bytearray(
TcgAlgorithmRegistry.TPM_ALG_HASH_DIGEST_SIZE_TABLE[alg_id])

# Do replay and update the result into dict
algo.update(measurement_dict[event.imr_index][alg_id] + hash_val)
measurement_dict[event.imr_index][alg_id] = algo.digest()

return measurement_dict
24 changes: 24 additions & 0 deletions vmsdk/python/cctrusted_vm/sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ def get_eventlog(self, start:int = None, count:int = None) -> EventLogs:
To measure the full CC runtime environment, the eventlog may include addtional
OS type and cloud native type event beyond the measured-boot.
Args:
start(int): the first index of event log to fetch
count(int): the number of event logs to fetch
Returns:
``Eventlogs`` object containing all event logs following TCG PCClient Spec.
"""
Expand All @@ -136,3 +140,23 @@ def get_eventlog(self, start:int = None, count:int = None) -> EventLogs:
event_logs.select(start, count)

return event_logs

def replay_eventlog(self, event_logs:EventLogs) -> dict:
"""Replay event logs based on data provided.
TCG event logs can be replayed against IMR measurements to prove the integrity of
the event logs.
Args:
event_logs(Eventlogs): the ``Eventlogs`` object to replay
Returns:
A dictionary containing the replay result displayed by IMR index and hash algorithm.
Layer 1 key of the dict is the IMR index, the value is another dict which using the
hash algorithm as the key and the replayed measurement as value.
Sample value:
{ 0: { 12: <measurement_replayed>}}
"""
replay_res = event_logs.replay()

return replay_res

0 comments on commit 6145e8a

Please sign in to comment.