调查基于日志的工件


到目前为止,我们已经了解了如何使用 Python 在 Windows 中获取工件。在本章中,让我们学习使用 Python 研究基于日志的工件。

介绍


基于日志的工件是对数字取证专家非常有用的信息宝库。虽然我们有各种监控软件来收集信息,但从中解析有用信息的主要问题是我们需要大量数据。

各种基于日志的工件和 Python 调查


在本节中,让我们讨论各种基于日志的工件及其在 Python 中的研究:

时间戳


时间戳在日志中传达活动的数据和时间。它是任何日志文件的重要元素之一。请注意,这些数据和时间值可以有多种格式。

下面显示的 Python 脚本将原始日期时间作为输入,并提供格式化的时间戳作为其输出。

对于这个脚本,我们需要遵循以下步骤:

  • 首先,设置将获取原始数据值以及数据源和数据类型的参数。

  • 现在,提供一个类,为不同日期格式的数据提供通用接口。

Python代码

让我们看看如何为此目的使用 Python 代码:

首先,导入以下 Python 模块:

from __future__ import print_function
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from datetime import datetime as dt
from datetime import timedelta

现在像往常一样,我们需要为命令行处理程序提供参数。这里它将接受三个参数,第一个是要处理的日期值,第二个是该日期值的来源,第三个是它的类型:

if __name__ == '__main__':
    parser = ArgumentParser('Timestamp Log-based artifact')
    parser.add_argument("date_value", help="Raw date value to parse")
    parser.add_argument(
        "source", help = "Source format of date",choices = ParseDate.get_supported_formats())
    parser.add_argument(
        "type", help = "Data type of input value",choices = ('number', 'hex'), default = 'int')
   
    args = parser.parse_args()
    date_parser = ParseDate(args.date_value, args.source, args.type)
    date_parser.run()
    print(date_parser.timestamp)

现在,我们需要定义一个类,它将接受日期值、日期源和值类型的参数:

class ParseDate(object):
    def __init__(self, date_value, source, data_type):
        self.date_value = date_value
        self.source = source
        self.data_type = data_type
        self.timestamp = None

现在我们将定义一个像控制器一样的方法,就像 main() 方法一样:

def run(self):
    if self.source == 'unix-epoch':
        self.parse_unix_epoch()
    elif self.source == 'unix-epoch-ms':
        self.parse_unix_epoch(True)
    elif self.source == 'windows-filetime':
        self.parse_windows_filetime()
@classmethod
def get_supported_formats(cls):
    return ['unix-epoch', 'unix-epoch-ms', 'windows-filetime']

现在,我们需要定义两个分别处理 Unix 纪元时间和 FILETIME 的方法:

def parse_unix_epoch(self, milliseconds=False):
    if self.data_type == 'hex':
        conv_value = int(self.date_value)
        if milliseconds:
            conv_value = conv_value / 1000.0
    elif self.data_type == 'number':
        conv_value = float(self.date_value)
        if milliseconds:
            conv_value = conv_value / 1000.0
    else:
        print("Unsupported data type '{}' provided".format(self.data_type))
        sys.exit('1')
    ts = dt.fromtimestamp(conv_value)
    self.timestamp = ts.strftime('%Y-%m-%d %H:%M:%S.%f')
def parse_windows_filetime(self):
    if self.data_type == 'hex':
        microseconds = int(self.date_value, 16) / 10.0
    elif self.data_type == 'number':
        microseconds = float(self.date_value) / 10
    else:
        print("Unsupported data type '{}'   provided".format(self.data_type))
        sys.exit('1')
    ts = dt(1601, 1, 1) + timedelta(microseconds=microseconds)
    self.timestamp = ts.strftime('%Y-%m-%d %H:%M:%S.%f')

运行上述脚本后,通过提供时间戳,我们可以以易于阅读的格式获取转换后的值。

Web 服务器日志


从数字取证专家的角度来看,Web 服务器日志是另一个重要的工件,因为它们可以获得有用的用户统计信息以及有关用户和地理位置的信息。以下是处理 Web 服务器日志后将创建电子表格的 Python 脚本,以便于分析信息。

首先我们需要导入以下 Python 模块:

from __future__ import print_function
from argparse import ArgumentParser, FileType

import re
import shlex
import logging
import sys
import csv

logger = logging.getLogger(__file__)

现在,我们需要定义将从日志中解析的模式:

iis_log_format = [
    ("date", re.compile(r"\d{4}-\d{2}-\d{2}")),
    ("time", re.compile(r"\d\d:\d\d:\d\d")),
    ("s-ip", re.compile(
        r"((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|$)){4}")),
    ("cs-method", re.compile(
        r"(GET)|(POST)|(PUT)|(DELETE)|(OPTIONS)|(HEAD)|(CONNECT)")),
    ("cs-uri-stem", re.compile(r"([A-Za-z0-1/\.-]*)")),
    ("cs-uri-query", re.compile(r"([A-Za-z0-1/\.-]*)")),
    ("s-port", re.compile(r"\d*")),
    ("cs-username", re.compile(r"([A-Za-z0-1/\.-]*)")),
    ("c-ip", re.compile(
        r"((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|$)){4}")),
    ("cs(User-Agent)", re.compile(r".*")),
    ("sc-status", re.compile(r"\d*")),
    ("sc-substatus", re.compile(r"\d*")),
    ("sc-win32-status", re.compile(r"\d*")),
    ("time-taken", re.compile(r"\d*"))]

现在,为命令行处理程序提供一个参数。这里它将接受两个参数,第一个是要处理的 IIS 日志,第二个是所需的 CSV 文件路径。

if __name__ == '__main__':
    parser = ArgumentParser('Parsing Server Based Logs')
    parser.add_argument('iis_log', help = "Path to IIS Log",type = FileType('r'))
    parser.add_argument('csv_report', help = "Path to CSV report")
    parser.add_argument('-l', help = "Path to processing log",default=__name__ + '.log')
    args = parser.parse_args()
    logger.setLevel(logging.DEBUG)
    msg_fmt = logging.Formatter(
        "%(asctime)-15s %(funcName)-10s ""%(levelname)-8s %(message)s")
   
    strhndl = logging.StreamHandler(sys.stdout)
    strhndl.setFormatter(fmt = msg_fmt)
    fhndl = logging.FileHandler(args.log, mode = 'a')
    fhndl.setFormatter(fmt = msg_fmt)
   
    logger.addHandler(strhndl)
    logger.addHandler(fhndl)
    logger.info("Starting IIS Parsing ")
    logger.debug("Supplied arguments: {}".format(", ".join(sys.argv[1:])))
    logger.debug("System " + sys.platform)
    logger.debug("Version " + sys.version)
    main(args.iis_log, args.csv_report, logger)
    iologger.info("IIS Parsing Complete")

现在我们需要定义 main() 方法来处理批量日志信息的脚本:

def main(iis_log, report_file, logger):
    parsed_logs = []

for raw_line in iis_log:
    line = raw_line.strip()
    log_entry = {}

if line.startswith("#") or len(line) == 0:
    continue

if '\"' in line:
    line_iter = shlex.shlex(line_iter)
else:
    line_iter = line.split(" ")
    for count, split_entry in enumerate(line_iter):
        col_name, col_pattern = iis_log_format[count]

        if col_pattern.match(split_entry):
            log_entry[col_name] = split_entry
else:
    logger.error("Unknown column pattern discovered. "
        "Line preserved in full below")
        logger.error("Unparsed Line: {}".format(line))
        parsed_logs.append(log_entry)
      
        logger.info("Parsed {} lines".format(len(parsed_logs)))
        cols = [x[0] for x in iis_log_format]
      
        logger.info("Creating report file: {}".format(report_file))
        write_csv(report_file, cols, parsed_logs)
        logger.info("Report created")

最后,我们需要定义一个将输出写入电子表格的方法:

def write_csv(outfile, fieldnames, data):
    with open(outfile, 'w', newline="") as open_outfile:
        csvfile = csv.DictWriter(open_outfile, fieldnames)
        csvfile.writeheader()
        csvfile.writerows(data)

运行上述脚本后,我们将在电子表格中获取基于 Web 服务器的日志。

使用 YARA 扫描重要文件


YARA(Yet Another Recursive Algorithm)是一种模式匹配实用程序,专为恶意软件识别和事件响应而设计。我们将使用 YARA 来扫描文件。在以下 Python 脚本中,我们将使用 YARA。

我们可以通过以下命令安装 YARA:

pip install YARA

我们可以按照以下步骤使用 YARA 规则扫描文件:

  • 一、设置和编译YARA规则

  • 然后,扫描单个文件,然后遍历目录以处理单个文件。

  • 最后,我们将结果导出为 CSV。

Python代码

让我们看看如何为此目的使用 Python 代码:

首先,我们需要导入以下 Python 模块:

from __future__ import print_function
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter

import os
import csv
import yara

接下来,为命令行处理程序提供参数。请注意,这里它将接受两个参数——第一个是 YARA 规则的路径,第二个是要扫描的文件。

if __name__ == '__main__':
    parser = ArgumentParser('Scanning files by YARA')
    parser.add_argument(
        'yara_rules',help = "Path to Yara rule to scan with. May be file or folder path.")
    parser.add_argument('path_to_scan',help = "Path to file or folder to scan")
    parser.add_argument('--output',help = "Path to output a CSV report of scan results")
    args = parser.parse_args()
    main(args.yara_rules, args.path_to_scan, args.output)

现在我们将定义 main() 函数,该函数将接受 yara 规则和要扫描的文件的路径:

def main(yara_rules, path_to_scan, output):
    if os.path.isdir(yara_rules):
        yrules = yara.compile(yara_rules)
    else:
        yrules = yara.compile(filepath=yara_rules)
    if os.path.isdir(path_to_scan):
        match_info = process_directory(yrules, path_to_scan)
    else:
        match_info = process_file(yrules, path_to_scan)
    columns = ['rule_name', 'hit_value', 'hit_offset', 'file_name',
    'rule_string', 'rule_tag']
   
    if output is None:
        write_stdout(columns, match_info)
    else:
        write_csv(output, columns, match_info)

现在,定义一个将遍历目录并将结果传递给另一个方法进行进一步处理的方法:

def process_directory(yrules, folder_path):
    match_info = []
    for root, _, files in os.walk(folder_path):
        for entry in files:
            file_entry = os.path.join(root, entry)
            match_info += process_file(yrules, file_entry)
    return match_info

接下来,定义两个函数。请注意,首先我们将使用 match() 方法 yrules 如果用户未指定任何输出文件,则另一个对象将向控制台报告匹配信息。观察如下代码:

def process_file(yrules, file_path):
    match = yrules.match(file_path)
    match_info = []
   
    for rule_set in match:
        for hit in rule_set.strings:
            match_info.append({
                'file_name': file_path,
                'rule_name': rule_set.rule,
                'rule_tag': ",".join(rule_set.tags),
                'hit_offset': hit[0],
                'rule_string': hit[1],
                'hit_value': hit[2]
            })
    return match_info
def write_stdout(columns, match_info):
    for entry in match_info:
        for col in columns:
            print("{}: {}".format(col, entry[col]))
    print("=" * 30)

最后,我们将定义一个将输出写入 CSV 文件的方法,如下所示:

def write_csv(outfile, fieldnames, data):
    with open(outfile, 'w', newline="") as open_outfile:
        csvfile = csv.DictWriter(open_outfile, fieldnames)
        csvfile.writeheader()
        csvfile.writerows(data)

成功运行上述脚本后,我们可以在命令行中提供适当的参数并生成 CSV 报告。