воскресенье, 15 июня 2025 г.

Parser, 250530

Parser, 250530

import log_parser
import loggin_support

#import log_parser_project.log_parser.loggin_support as loggin_support
import logging

import argparse

from log_parser import (
    parse_batch_directory,
    loglines_to_dataframe,
    calc_metrics,
    save_metrics_to_html
)

def main():
    logging.info("Start main()")
    parser = argparse.ArgumentParser(description="Batch log parser and checker")
    parser.add_argument("directory", help="Папка с батч-файлами")
    #parser.add_argument("--pattern", default="batch_*.txt", help="Шаблон имени файла")
    parser.add_argument("--pattern", default="*nginx*", help="Шаблон имени файла")

    group = parser.add_mutually_exclusive_group()
    group.add_argument('--today', action='store_true', help="Взять только файлы за сегодня")
    group.add_argument('--yesterday', action='store_true', help="Взять только файлы за вчера")
    group.add_argument('--all', action='store_true', help="Взять все батч-файлы (по умолчанию)")
    group.add_argument('--first', type=int, metavar='N', help="Взять первые N батч-файлов")
    group.add_argument('--last', type=int, metavar='N', help="Взять последние N батч-файлов")

    parser.add_argument("--html", default=None, help="Сохранить метрики в HTML (metrics.html)")
    args = parser.parse_args()

    try:
        logs = parse_batch_directory(
            args.directory,
            batch_pattern=args.pattern,
            today=args.today,
            yesterday=args.yesterday,
            use_all=args.all,
            first_n=args.first,
            last_n=args.last
        )
        df = loglines_to_dataframe(logs)
    except Exception as e:
        print(e)
        exit(1)

    print(df.head())

    try:
        metrics = calc_metrics(df)
        print(metrics.head(20).to_string(index=False))
    except Exception as e:
        print(e)
        exit(1)

    if args.html:
        save_metrics_to_html(metrics, args.html)
        print(f"Метрики сохранены в HTML-файл: {args.html}")

    logging.info("Complete main()")

if __name__ == "__main__":
    main()


# loggin_suport.py
import logging

log_format = "%(asctime)s %(message)s"
date_format = "%d-%m-%y %H:%M:%S"

file_handler = logging.FileHandler("log_file.log", encoding="utf-8")
file_handler.setFormatter(logging.Formatter(log_format, datefmt=date_format))

console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(log_format, datefmt=date_format))

logging.basicConfig(
    level=logging.INFO,
    handlers=[file_handler, console_handler]
)


import loggin_support
import os
import glob
import sys
import re

import logging

from typing import List, Optional, Tuple
from dataclasses import dataclass
import pandas as pd
import time
import gzip
from datetime import datetime, timedelta

def log_time(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"Время выполнения {func.__name__}: {end - start:.4f} секунд")
        return result
    return wrapper

@dataclass
class LogLine:
    ip: str
    user_ident: Optional[str]
    user_auth: Optional[str]
    timestamp: str
    request: str
    status: int
    size: int
    referrer: Optional[str]
    user_agent: Optional[str]
    request_time: float

LOG_REGEX = re.compile(
    r'^(?P<ip>\S+)\s+'
    r'(?P<user_ident>\S+)\s+'
    r'(?P<user_auth>\S+)\s+'
    r'\[(?P<timestamp>[^\]]+)\]\s+'
    r'"(?P<request>[^"]*)"\s+'
    r'(?P<status>\d{3})\s+'
    r'(?P<size>\d+)\s+'
    r'"(?P<referrer>[^"]*)"\s+'
    r'"(?P<user_agent>[^"]*)"\s+'
    r'(?:".*?"\s+){0,5}'
    r'(?P<request_time>[\d.]+)\s*$'
)

def parse_log_line(line: str) -> Optional[LogLine]:
    m = LOG_REGEX.match(line)
    if not m:
        return None
    d = m.groupdict()
    return LogLine(
        ip=d['ip'],
        user_ident=None if d['user_ident'] == '-' else d['user_ident'],
        user_auth=None if d['user_auth'] == '-' else d['user_auth'],
        timestamp=d['timestamp'],
        request=d['request'],
        status=int(d['status']),
        size=int(d['size']),
        referrer=None if d['referrer'] == '-' else d['referrer'],
        user_agent=None if d['user_agent'] == '-' else d['user_agent'],
        request_time=float(d['request_time'])
    )

@log_time
def parse_batch_file(batch_file: str, is_gz: bool = False) -> List[LogLine]:
    logging.info("Start  parse_batch_file()")
    lines = []
    open_func = gzip.open if is_gz else open
    mode = 'rt' if is_gz else 'r'
    with open_func(batch_file, mode, encoding='utf-8') as f:
        for idx, line in enumerate(f, start=1):
            line = line.rstrip('\n')
            entry = parse_log_line(line)
            if entry is None:
                raise ValueError(f"Ошибка в файле {batch_file} на строке {idx}", line)
            lines.append(entry)
    logging.info("Finish parse_batch_file()")
    return lines

@log_time
def find_batch_files(directory: str, pattern: str) -> List[Tuple[bool, str]]:
    # True - gzipped, False - plain
    pattern_plain = os.path.join(directory, pattern)
    pattern_gz = pattern_plain + '.gz'
    files_plain = sorted(glob.glob(pattern_plain))
    files_gz = sorted(glob.glob(pattern_gz))
    result = [(False, fn) for fn in files_plain] + [(True, fn) for fn in files_gz]
    # sort again, so gz and plain together in order (by name)
    result.sort(key=lambda x: x[1])
    return result

def extract_date_from_filename(filename: str) -> Optional[str]:
    # Ожидается что дата есть в формате ГГГГММДД в имени файла, пример batch_20240107.txt(.gz)
    m = re.search(r'(\d{8})', filename)
    if m:
        return m.group(1)
    return None

def select_files(files: List[Tuple[bool, str]], today=False, yesterday=False, use_all=False, first_n=None, last_n=None) -> List[Tuple[bool, str]]:
    if not files:
        return []
    if today or yesterday:
        date_to_file = {}
        for is_gz, fname in files:
            date = extract_date_from_filename(fname)
            if date:
                date_to_file[date] = date_to_file.get(date, []) + [(is_gz, fname)]
        now = datetime.now()
        if today:
            target = now.strftime('%Y%m%d')
        else:  # yesterday
            target = (now - timedelta(days=1)).strftime('%Y%m%d')
        return date_to_file.get(target, [])
    elif first_n:
        return files[:first_n]
    elif last_n:
        return files[-last_n:]
    else:  # use_all by default
        return files

@log_time
def parse_batch_directory(directory: str, batch_pattern: str = 'batch_*.txt', today=False, yesterday=False, use_all=False, first_n=None, last_n=None) -> List[LogLine]:
    files = find_batch_files(directory, batch_pattern)
    files = select_files(files, today=today, yesterday=yesterday, use_all=use_all, first_n=first_n, last_n=last_n)
    logging.info("Start  parse_batch_directory()")
    if not files:
        raise FileNotFoundError("Нет файлов для обработки согласно выбранному режиму")
    all_lines = []
    for is_gz, filename in files:
        result = parse_batch_file(filename, is_gz=is_gz)
        all_lines.extend(result)
    logging.info("Finish parse_batch_directory()")
    return all_lines

@log_time
def loglines_to_dataframe(logs: List[LogLine]) -> pd.DataFrame:
    return pd.DataFrame([log.__dict__ for log in logs])

def extract_url(request: str) -> Optional[str]:
    try:
        return request.split()[1]
    except Exception:
        return None

@log_time
def calc_metrics(df: pd.DataFrame) -> pd.DataFrame:
    logging.info("Start  calc_metrics()")
    if 'url' not in df.columns:
        df['url'] = df['request'].apply(extract_url)

    total_count = df.shape[0]
    total_time_sum = df['request_time'].sum()

    if total_count == 0:
        raise ValueError("Нет данных для анализа (DataFrame пустой)")

    metrics = (
        df
        .groupby('url')
        .agg(
            count=('url', 'size'),
            time_avg=('request_time', 'mean'),
            time_max=('request_time', 'max'),
            time_min=('request_time', 'min'),
            time_med=('request_time', 'median'),
            time_sum=('request_time', 'sum'),
        )
        .reset_index()
    )
    metrics['count_perc'] = metrics['count'] / total_count * 100
    metrics['time_perc'] = metrics['time_sum'] / total_time_sum * 100

    metrics = metrics[[
        'url', 'count', 'count_perc', 'time_avg', 'time_max', 'time_min', 'time_med', 'time_perc', 'time_sum'
    ]]
    metrics = metrics.sort_values('time_sum', ascending=False)
    logging.info("Finish calc_metrics()")
    return metrics

@log_time
def save_metrics_to_html(metrics: pd.DataFrame, filename: str):
    html_table = metrics.round({
        'count_perc': 3,
        'time_avg': 3,
        'time_max': 3,
        'time_min': 3,
        'time_med': 3,
        'time_perc': 3,
        'time_sum': 3,
    }).to_html(index=False, border=1, justify="center", classes="metrics-table")
    with open(filename, "w", encoding="utf-8") as f:
        f.write("""
        <html>
        <head>
          <meta charset='utf-8'>
          <style>
            body { font-family: Arial, sans-serif; }
            .metrics-table { border-collapse: collapse; }
            .metrics-table th, .metrics-table td { border: 1px solid #999; padding: 4px 8px; }
            .metrics-table th { background-color: #efefef; }
            .metrics-table tbody tr:nth-child(odd) { background-color: #fafafa; }
            .metrics-table th:first-child,
            .metrics-table td:first-child {
              max-width: 707px;
              white-space: nowrap;
              overflow: hidden;
              text-overflow: ellipsis;
            }
        </style>
        </head>
        <body>
        <h2>Метрики по URL</h2>
        """)
        f.write(html_table)
        f.write("</body></html>")


Комментариев нет:

Отправить комментарий