суббота, 14 июня 2025 г.

Parser, Log, Homework, Lesson01, Parser

Parser, Log, Homework, Lesson01, Parser

D:\VC25\Otus\Py\250530\01_new_project\homework

(base) PS D:\VC25\Otus\Py\250530\01_new_project\homework\1> & "python.exe" batch_parser_05.py D:\VC25\Otus\Py\250530\01_new_project\homework\1\Batches --csv metrics.csv  --html metrics.html

& "python.exe" batch_parser_05.py D:\VC25\Otus\Py\250530\01_new_project\homework\1\Batches --csv metrics.csv  --html metrics.html
import os
import glob
import sys
from typing import List, Optional
from dataclasses import dataclass
import re
import pandas as pd

@dataclass
class LogLine:
    ip: str
    user_ident: Optional[str]
    user_auth: Optional[str]
    timestamp: str
    request: str
    status: int
    size: int
    referrer: Optional[str]
    user_agent: Optional[str]
    request_time: float

LOG_REGEX = re.compile(
    r'^(?P<ip>\S+)\s+'
    r'(?P<user_ident>\S+)\s+'
    r'(?P<user_auth>\S+)\s+'
    r'\[(?P<timestamp>[^\]]+)\]\s+'
    r'"(?P<request>[^"]*)"\s+'
    r'(?P<status>\d{3})\s+'
    r'(?P<size>\d+)\s+'
    r'"(?P<referrer>[^"]*)"\s+'
    r'"(?P<user_agent>[^"]*)"\s+'
    r'(?:".*?"\s+){0,5}'
    r'(?P<request_time>[\d.]+)\s*$'
)

def parse_log_line(line: str) -> Optional[LogLine]:
    m = LOG_REGEX.match(line)
    if not m:
        return None
    d = m.groupdict()
    return LogLine(
        ip=d['ip'],
        user_ident=None if d['user_ident'] == '-' else d['user_ident'],
        user_auth=None if d['user_auth'] == '-' else d['user_auth'],
        timestamp=d['timestamp'],
        request=d['request'],
        status=int(d['status']),
        size=int(d['size']),
        referrer=None if d['referrer'] == '-' else d['referrer'],
        user_agent=None if d['user_agent'] == '-' else d['user_agent'],
        request_time=float(d['request_time'])
    )

def parse_batch_file(batch_file: str) -> List[LogLine]:
    """Парсит один файл, возвращает список LogLine"""
    lines = []
    with open(batch_file, encoding='utf-8') as f:
        for idx, line in enumerate(f, start=1):
            line = line.rstrip('\n')
            entry = parse_log_line(line)
            if entry is None:
                print(f"Ошибка в файле {batch_file} на строке {idx}")
                print(f"Проблемная строка: {line}")
                sys.exit(1)
            lines.append(entry)
    return lines

def parse_batch_directory(directory: str, batch_pattern: str = 'batch_*.txt') -> List[LogLine]:
    """Парсит все файлы в директории, возвращает общий список LogLine"""
    pattern = os.path.join(directory, batch_pattern)
    files = sorted(glob.glob(pattern))
    if not files:
        print(f"Файлы не найдены по шаблону {pattern}")
        sys.exit(1)
    all_lines = []
    for file in files:
        result = parse_batch_file(file)
        all_lines.extend(result)
        print(f"{file}: Проверка завершена успешно! ({len(result)} строк)")
    print("Проверка всех файлов завершена успешно!")
    return all_lines

def loglines_to_dataframe(logs: List[LogLine]) -> pd.DataFrame:
    """Преобразует список LogLine в pandas.DataFrame"""
    df = pd.DataFrame([log.__dict__ for log in logs])
    return df

if __name__ == "__main__":
    import argparse
    import pandas as pd

    parser = argparse.ArgumentParser(description="Batch log parser and checker")
    parser.add_argument("directory", help="Папка с батч-файлами")
    parser.add_argument("--pattern", default="batch_*.txt", help="Шаблон имени файла")
    parser.add_argument("--csv", default=None, help="Сохранить DataFrame в CSV (указать имя файла)")
    parser.add_argument("--html", default=None, help="Сохранить метрики в HTML (metrics.html)")
    args = parser.parse_args()

    # 1. Чтение логов и формирование DataFrame
    logs = parse_batch_directory(args.directory, args.pattern)
    df = loglines_to_dataframe(logs)

    # Можно посмотреть первые строки:
    print(df.head())

    # 2. Извлечение url
    def extract_url(request):
        try:
            return request.split()[1]
        except Exception:
            return None

    df['url'] = df['request'].apply(extract_url)

    total_count = df.shape[0]
    total_time_sum = df['request_time'].sum()

    # Защита на случай пустого DataFrame
    if total_count == 0:
        print("Нет данных для анализа (DataFrame пустой)")
        exit(0)

    # 3. Группировка и расчёт метрик по url
    metrics = (
        df
        .groupby('url')
        .agg(
            count=('url', 'size'),
            time_avg=('request_time', 'mean'),
            time_max=('request_time', 'max'),
            time_min=('request_time', 'min'),
            time_med=('request_time', 'median'),
            time_sum=('request_time', 'sum'),
        )
        .reset_index()
    )
    metrics['count_perc'] = metrics['count'] / total_count * 100
    metrics['time_perc'] = metrics['time_sum'] / total_time_sum * 100

    # Перестановка столбцов
    metrics = metrics[[
        'url', 'count', 'count_perc', 'time_avg', 'time_max', 'time_min', 'time_med', 'time_perc', 'time_sum'
    ]]

    # 4. Сортировка и вывод
    metrics = metrics.sort_values('time_sum', ascending=False)
    print(metrics.head(20).to_string(index=False))

    # 5. Сохраняем в CSV логи (если надо)
    if args.csv:
        df.to_csv(args.csv, index=False)
        print(f"DataFrame записан в файл {args.csv}")

    # 6. Сохраняем таблицу метрик в HTML (если надо)
    if getattr(args, "html", None):
        # Для красивого отображения округлим части метрик
        html_table = metrics.round({
            'count_perc': 3,
            'time_avg': 3,
            'time_max': 3,
            'time_min': 3,
            'time_med': 3,
            'time_perc': 3,
            'time_sum': 3,
        }).to_html(index=False, border=1, justify="center", classes="metrics-table")
        with open(args.html, "w", encoding="utf-8") as f:
            # Добавляем небольшой стиль для читабельности
            f.write("""
            <html>
            <head>
              <meta charset='utf-8'>
              <style>
                body { font-family: Arial, sans-serif; }
                .metrics-table { border-collapse: collapse; }
                .metrics-table th, .metrics-table td { border: 1px solid #999; padding: 4px 8px; }
                .metrics-table th { background-color: #efefef; }
                .metrics-table tbody tr:nth-child(odd) { background-color: #fafafa; }
                /* Ограничиваем ширину первого столбца и разрешаем перенос слов */
                .metrics-table th:first-child,
                .metrics-table td:first-child {
                  max-width: 707px;
                  white-space: nowrap;
                  overflow: hidden;
                  text-overflow: ellipsis;
                }
            </style>
            </head>
            <body>
            <h2>Метрики по URL</h2>
            """)
            f.write(html_table)
            f.write("</body></html>")
        print(f"Метрики сохранены в HTML-файл: {args.html}")


Start from Python
& "python.exe" batch_parser_02.py D:\VC25\Otus\Py\250530\01_new_project\homework\1\Batches
import os
import glob
import sys
from typing import List, Optional
from dataclasses import dataclass
import re
import pandas as pd

@dataclass
class LogLine:
    ip: str
    user_ident: Optional[str]
    user_auth: Optional[str]
    timestamp: str
    request: str
    status: int
    size: int
    referrer: Optional[str]
    user_agent: Optional[str]
    request_time: float

LOG_REGEX = re.compile(
    r'^(?P<ip>\S+)\s+'
    r'(?P<user_ident>\S+)\s+'
    r'(?P<user_auth>\S+)\s+'
    r'\[(?P<timestamp>[^\]]+)\]\s+'
    r'"(?P<request>[^"]*)"\s+'
    r'(?P<status>\d{3})\s+'
    r'(?P<size>\d+)\s+'
    r'"(?P<referrer>[^"]*)"\s+'
    r'"(?P<user_agent>[^"]*)"\s+'
    r'(?:".*?"\s+){0,5}'
    r'(?P<request_time>[\d.]+)\s*$'
)

def parse_log_line(line: str) -> Optional[LogLine]:
    m = LOG_REGEX.match(line)
    if not m:
        return None
    d = m.groupdict()
    return LogLine(
        ip=d['ip'],
        user_ident=None if d['user_ident'] == '-' else d['user_ident'],
        user_auth=None if d['user_auth'] == '-' else d['user_auth'],
        timestamp=d['timestamp'],
        request=d['request'],
        status=int(d['status']),
        size=int(d['size']),
        referrer=None if d['referrer'] == '-' else d['referrer'],
        user_agent=None if d['user_agent'] == '-' else d['user_agent'],
        request_time=float(d['request_time'])
    )

def parse_batch_file(batch_file: str) -> List[LogLine]:
    """Парсит один файл, возвращает список LogLine"""
    lines = []
    with open(batch_file, encoding='utf-8') as f:
        for idx, line in enumerate(f, start=1):
            line = line.rstrip('\n')
            entry = parse_log_line(line)
            if entry is None:
                print(f"Ошибка в файле {batch_file} на строке {idx}")
                print(f"Проблемная строка: {line}")
                sys.exit(1)
            lines.append(entry)
    return lines

def parse_batch_directory(directory: str, batch_pattern: str = 'batch_*.txt') -> List[LogLine]:
    """Парсит все файлы в директории, возвращает общий список LogLine"""
    pattern = os.path.join(directory, batch_pattern)
    files = sorted(glob.glob(pattern))
    if not files:
        print(f"Файлы не найдены по шаблону {pattern}")
        sys.exit(1)
    all_lines = []
    for file in files:
        result = parse_batch_file(file)
        all_lines.extend(result)
        print(f"{file}: Проверка завершена успешно! ({len(result)} строк)")
    print("Проверка всех файлов завершена успешно!")
    return all_lines

def loglines_to_dataframe(logs: List[LogLine]) -> pd.DataFrame:
    """Преобразует список LogLine в pandas.DataFrame"""
    df = pd.DataFrame([log.__dict__ for log in logs])
    return df

if __name__ == "__main__":
    import argparse
    import pandas as pd

    parser = argparse.ArgumentParser(description="Batch log parser and checker")
    parser.add_argument("directory", help="Папка с батч-файлами")
    parser.add_argument("--pattern", default="batch_*.txt", help="Шаблон имени файла")
    parser.add_argument("--csv", default=None, help="Сохранить DataFrame в CSV (указать имя файла)")
    args = parser.parse_args()

    # 1. Чтение логов и формирование DataFrame
    logs = parse_batch_directory(args.directory, args.pattern)
    df = loglines_to_dataframe(logs)

    # Можно посмотреть первые строки:
    print(df.head())

    # 2. Извлечение url
    def extract_url(request):
        try:
            return request.split()[1]
        except Exception:
            return None

    df['url'] = df['request'].apply(extract_url)

    total_count = df.shape[0]
    total_time_sum = df['request_time'].sum()

    # Защита на случай пустого DataFrame
    if total_count == 0:
        print("Нет данных для анализа (DataFrame пустой)")
        exit(0)

    # 3. Группировка и расчёт метрик по url
    metrics = (
        df
        .groupby('url')
        .agg(
            count=('url', 'size'),
            time_avg=('request_time', 'mean'),
            time_max=('request_time', 'max'),
            time_min=('request_time', 'min'),
            time_med=('request_time', 'median'),
            time_sum=('request_time', 'sum'),
        )
        .reset_index()
    )
    metrics['count_perc'] = metrics['count'] / total_count
    metrics['time_perc'] = metrics['time_sum'] / total_time_sum

    # Перестановка столбцов
    metrics = metrics[[
        'url', 'count', 'count_perc', 'time_avg', 'time_max', 'time_min', 'time_med', 'time_perc', 'time_sum'
    ]]

    # 4. Сортировка и вывод
    metrics = metrics.sort_values('time_sum', ascending=False)
    print(metrics.head(20).to_string(index=False))

    # 5. Сохраняем в CSV логи (если надо)
    if args.csv:
        df.to_csv(args.csv, index=False)
        print(f"DataFrame записан в файл {args.csv}")


& "C:/Program Files (x86)/Microsoft Visual Studio/Shared/Python39_64/python.exe" d:/VC25/Otus/Py/250530/01_new_project/homework/1/batch_val-03.py D:\VC25\Otus\Py\250530\01_new_project\homework\1\Batches

Code:
import os
import glob
import sys
from typing import List, Optional
from dataclasses import dataclass
import re

@dataclass
class LogLine:
    ip: str
    user_ident: Optional[str]
    user_auth: Optional[str]
    timestamp: str
    request: str
    status: int
    size: int
    referrer: Optional[str]
    user_agent: Optional[str]
    request_time: float

LOG_REGEX = re.compile(
    r'^(?P<ip>\S+)\s+'
    r'(?P<user_ident>\S+)\s+'
    r'(?P<user_auth>\S+)\s+'
    r'\[(?P<timestamp>[^\]]+)\]\s+'
    r'"(?P<request>[^"]*)"\s+'
    r'(?P<status>\d{3})\s+'
    r'(?P<size>\d+)\s+'
    r'"(?P<referrer>[^"]*)"\s+'
    r'"(?P<user_agent>[^"]*)"\s+'
    r'(?:".*?"\s+){0,5}'
    r'(?P<request_time>[\d.]+)\s*$'
)

def parse_log_line(line: str) -> Optional[LogLine]:
    m = LOG_REGEX.match(line)
    if not m:
        return None
    d = m.groupdict()
    return LogLine(
        ip=d['ip'],
        user_ident=None if d['user_ident'] == '-' else d['user_ident'],
        user_auth=None if d['user_auth'] == '-' else d['user_auth'],
        timestamp=d['timestamp'],
        request=d['request'],
        status=int(d['status']),
        size=int(d['size']),
        referrer=None if d['referrer'] == '-' else d['referrer'],
        user_agent=None if d['user_agent'] == '-' else d['user_agent'],
        request_time=float(d['request_time'])
    )

def process_batch_file(batch_file: str):
    with open(batch_file, encoding='utf-8') as f:
        for idx, line in enumerate(f, start=1):
            line = line.rstrip('\n')
            entry = parse_log_line(line)
            if entry is None:
                print(f"Ошибка в файле {batch_file} на строке {idx}")
                print(f"Проблемная строка: {line}")
                sys.exit(1)

def main(directory: str, batch_pattern: str = 'batch_*.txt'):
    pattern = os.path.join(directory, batch_pattern)
    files = sorted(glob.glob(pattern))
    if not files:
        print(f"Файлы не найдены по шаблону {pattern}")
        sys.exit(1)
    for file in files:
        process_batch_file(file)
        print(f"{file}: Проверка завершена успешно!")
    print("Проверка всех файлов завершена успешно!")

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="Batch log checker")
    parser.add_argument("directory", help="D:\VC25\Otus\Py\250530\01_new_project\homework\1\Batches")
    parser.add_argument("--pattern", default="batch_*.txt", help="batch_*.txt")
    args = parser.parse_args()
    main(args.directory, args.pattern)

Комментариев нет:

Отправить комментарий