вторник, 17 июня 2025 г.

Parser, 250530, Nginx, CommandLine

Parser, 250530, Nginx, CommandLine, Projects

Project:

D:\VC25\Otus\Py\250530\01_new_project\homework\1\log_magic_parser1

command line:

& D:/Tools/Anaconda/python.exe

run file

d:/VC25/Otus/Py/250530/01_new_project/homework/1/log_magic_parser1/logmagic_parser.py

directory

D:\VC25\Otus\Py\250530\01_new_project\homework\1\Batche

parameters

 --first 5   --pattern batch* --html metrics.html

----------------------------------------------------------------------------

log_parser.py

import logging
import argparse
import sys
from log_files import batch_log_files_select, timer_decorator
from log_core import parse_logs, calc_metrics, save_metrics_to_html, loglines_to_dataframe

# Константа по умолчанию
MIN_REQUESTS = 1000  # Minimum requests per group

def is_gzip_file(filepath):
    try:
        with open(filepath, "rb") as f:
            return f.read(2) == b'\x1f\x8b'
    except Exception:
        return False

@timer_decorator
def main():
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
    logging.info("main() Start")

    parser = argparse.ArgumentParser(
        description="Universal log batch analyzer with gzip/archive support"
    )
    parser.add_argument('directory', help='Папка с логами (batch и/или архивы .gz)')
    parser.add_argument('--pattern', action='append', default=['*20250529*'],
                        help='Шаблон(ы) поиска файлов, поддерживаются glob, можно указывать несколько')
    parser.add_argument('--from-date', type=str, help='Начальная дата файла (ГГГГММДД)')
    parser.add_argument('--to-date', type=str, help='Конечная дата файла (ГГГГММДД)')
    parser.add_argument('--today', action='store_true', help='Только за сегодня')
    parser.add_argument('--yesterday', action='store_true', help='Только за вчера')
    parser.add_argument('--first', type=int, default=None, help='Первые N файлов')
    parser.add_argument('--last', type=int, default=None, help='Последние N файлов')
    parser.add_argument('--dry-run', action='store_true', help='Показать какие файлы будут обработаны и выйти')
    parser.add_argument('--html', default=None, help='Сохранить метрики в HTML-файл')
    parser.add_argument('--min-request', type=int, default=MIN_REQUESTS, help='Минимальное число запросов в группе (default: %d)' % MIN_REQUESTS)
    args = parser.parse_args()

    try:
        selected = batch_log_files_select(
            args.directory,
            patterns=args.pattern,
            from_date=args.from_date,
            to_date=args.to_date,
            today=args.today,
            yesterday=args.yesterday,
            first_n=args.first,
            last_n=args.last
        )
    except Exception as e:
        logging.error(f'Ошибка при поиске файлов: {e}')
        sys.exit(1)

    if not selected:
        logging.error('Нет подходящих файлов лога под указанные параметры')
        sys.exit(1)

    # Оставляем только распакованные файлы для парсинга
    # parse_candidates = [f for f in selected if not getattr(f, "is_gzipped", False)]
    parse_candidates = [f for f in selected if not is_gzip_file(f.filename)]

    if args.dry_run:
        print(f"Файлы для обработки ({len(parse_candidates)}):")
        for f in parse_candidates:
            print(f"    [TXT] {f.filename}")
        gzipped = [f for f in selected if getattr(f, "is_gzipped", False)]
        if gzipped:
            print(f"\nИгнорируемые сжатые файлы ({len(gzipped)}):")
            for f in gzipped:
                print(f"    [GZ ] {f.filename}")
        sys.exit(0)

    if not parse_candidates:
        logging.error('Нет ни одного распакованного файла для парсинга (все файлы сжаты)')
        sys.exit(1)

    logging.info('main() Files for parsing: %d', len(parse_candidates))
    for fileinfo in parse_candidates:
        logging.info(f"main() Processing file: {fileinfo.filename}")

    # Ниже изменен только вход - теперь подаются только «разрешённые» файлы
    try:
        logs = parse_logs(parse_candidates)
        df = loglines_to_dataframe(logs)
    except Exception as e:
        logging.exception(f'Ошибка при парсинге логов: {e}')
        sys.exit(2)

    logging.info("print df.head():\n")
    print(df.head())

    try:
        metrics = calc_metrics(df)
        if 'count' in metrics.columns:
            metrics = metrics[metrics['count'] >= args.min_request]
        else:
            logging.warning("В данных нет столбца 'count', фильтрация по min_request не применяется")
    except Exception as e:
        logging.exception(f'Ошибка при расчетах: {e}')
        sys.exit(3)

    logging.info("print metrics samples:\n")
    print(metrics.head(25).to_string(index=False))
    print("\n")

    if args.html:
        save_metrics_to_html(metrics, args.html)
        logging.info(f"Метрики сохранены: {args.html}")

    logging.info("main() Complete")

if __name__ == '__main__':
    main()

log_files.py

import os
import glob
import datetime
import re
from typing import List, Optional
from dataclasses import dataclass

import time
import logging

# Настроим логгер
logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s]: %(message)s")

def timer_decorator(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        execution_time = end_time - start_time
        logging.info(f"{func.__name__} executed in {execution_time:.4f} seconds.")
        return result
    return wrapper


@dataclass
class BatchLogFile:
    filename: str
    is_gzipped: bool = False
    date: Optional[str] = None

def extract_date_from_filename(filename: str) -> Optional[str]:
    m = re.search(r'(\d{8})', filename)
    return m.group(1) if m else None

@timer_decorator
def find_files_for_parsing(directory: str, patterns: List[str]) -> List[BatchLogFile]:
    logging.info("find_files_for_parsing() Start")
    files = []
    for pat in patterns:
        path_plain = os.path.join(directory, pat)
        path_gz = path_plain + '.gz' if not pat.endswith('.gz') else path_plain
        # И plain, и gz-архивы
        for f in glob.glob(path_plain):
            files.append(BatchLogFile(filename=f, is_gzipped=False, date=extract_date_from_filename(f)))
        for f in glob.glob(path_gz):
            files.append(BatchLogFile(filename=f, is_gzipped=True, date=extract_date_from_filename(f)))
    # deduplicate by full path (редкий кейс)
    unique = {}
    for f in files:
        unique[(f.filename, f.is_gzipped)] = f

    logging.info("find_files_for_parsing() Complete")
    return sorted(unique.values(), key=lambda x: x.filename)

def batch_log_files_select(
    directory: str,
    patterns: List[str] = None,
    today=False,
    yesterday=False,
    from_date: str = None,
    to_date: str = None,
    first_n: int = None,
    last_n: int = None
) -> List[BatchLogFile]:
    if patterns is None:
        patterns = ['*']

    all_found = find_files_for_parsing(directory, patterns)
    if not all_found:
        return []

    # Группировка по "базовому" имени файла (без .gz), чтобы выбрать либо разархивированный, либо gz-архив если первого нет
    filemap = {}
    for f in all_found:
        key = os.path.splitext(f.filename)[0] if f.is_gzipped else f.filename
        # Выбирай только неархивный, если есть, иначе архив
        if key not in filemap or (not filemap[key].is_gzipped and f.is_gzipped):
            filemap[key] = f

    files = list(filemap.values())

    # Фильтрация по дате
    if today or yesterday or from_date or to_date:
        now = datetime.datetime.now()
        target_from = target_to = None
        if today:
            target_from = target_to = now.strftime('%Y%m%d')
        elif yesterday:
            d = now - datetime.timedelta(days=1)
            target_from = target_to = d.strftime('%Y%m%d')
        else:
            target_from = from_date
            target_to = to_date if to_date else from_date
        files = [
            f for f in files
            if f.date and target_from <= f.date <= (target_to or f.date)
        ]
    # Сортировка по дате/имени (например, batch_20240301.txt, batch_20240315.txt)
    # files = sorted(files, key=lambda f: (f.date or '', f.filename)) *************************************************************
    # files = sorted(files, key=lambda f: (f.date or '', os.path.basename(f.filename)))
    # files = sorted(files, key=lambda f: int(re.sub(r'\D', '', os.path.basename(f.filename))))
    files = sorted(files, key=lambda f: os.path.basename(f.filename))
    if first_n:
        files = files[:first_n]
    elif last_n:
        files = files[-last_n:]
    return files

log_core.py

import gzip
import pandas as pd
import logging
from dataclasses import asdict
import re
from typing import List, Optional
from log_files import BatchLogFile

from log_files import timer_decorator

import datetime

class LogLine:
    __slots__ = ('ip', 'user_ident', 'user_auth', 'timestamp', 'request', 'status', 'size', 'referrer', 'user_agent', 'request_time')
    def __init__(self, ip, user_ident, user_auth, timestamp, request, status, size, referrer, user_agent, request_time):
        self.ip = ip
        self.user_ident = user_ident
        self.user_auth = user_auth
        self.timestamp = timestamp
        self.request = request
        self.status = int(status)
        self.size = int(size)
        self.referrer = referrer
        self.user_agent = user_agent
        self.request_time = float(request_time)
    def to_dict(self):
        return {k: getattr(self, k) for k in self.__slots__}

LOG_REGEX = re.compile(
    r'^(?P<ip>\S+)\s+'
    r'(?P<user_ident>\S+)\s+'
    r'(?P<user_auth>\S+)\s+'
    r'\[(?P<timestamp>[^\]]+)\]\s+'
    r'"(?P<request>[^"]*)"\s+'
    r'(?P<status>\d{3})\s+'
    r'(?P<size>\d+)\s+'
    r'"(?P<referrer>[^"]*)"\s+'
    r'"(?P<user_agent>[^"]*)"\s+'
    r'(?:".*?"\s+){0,5}'
    r'(?P<request_time>[\d.]+)\s*$'
)

def parse_log_line(line: str) -> Optional[LogLine]:
    m = LOG_REGEX.match(line)
    if not m:
        return None
    d = m.groupdict()
    return LogLine(
        d['ip'],
        None if d['user_ident'] == '-' else d['user_ident'],
        None if d['user_auth'] == '-' else d['user_auth'],
        d['timestamp'],
        d['request'],
        d['status'],
        d['size'],
        None if d['referrer'] == '-' else d['referrer'],
        None if d['user_agent'] == '-' else d['user_agent'],
        d['request_time']
    )

# def parse_logs(files: List[BatchLogFile]) -> List[LogLine]:
#     out = []
#     for fileinfo in files:
#         ope = gzip.open if fileinfo.is_gzipped else open
#         mode = 'rt' if fileinfo.is_gzipped else 'r'
#         with ope(fileinfo.filename, mode, encoding='utf8') as f:
#             for idx, line in enumerate(f, 1):
#                 line = line.rstrip('\n')
#                 l = parse_log_line(line)
#                 if not l:
#                     raise ValueError(f'Ошибка парсинга {fileinfo.filename} строка {idx}')
#                 out.append(l)
#     return out
# Настройка логгера
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

@timer_decorator
def parse_logs(files: List[BatchLogFile]) -> List[dict]:
    out = []
    logger.info("parse_logs() Start")
    for fileinfo in files:    
        try:
            logger.info(f"{fileinfo.filename} Try to parse")
            ope = gzip.open if fileinfo.is_gzipped else open
            mode = 'rt' if fileinfo.is_gzipped else 'r'
           
            with ope(fileinfo.filename, mode, encoding='utf8') as f:                                
                for idx, line in enumerate(f, 1):
                    line = line.rstrip('\n')
                    l = parse_log_line(line)                  
                    if not l:
                        raise ValueError(f'Ошибка парсинга {fileinfo.filename}, строка {idx}')                        
                    out.append(l)
                logger.info(f"{fileinfo.filename} Ok")
        except OSError as e:
            # Логируем проблему открытия файла
            logger.error(f"{fileinfo.filename}: {e} Failure")
   
    return out

@timer_decorator
def loglines_to_dataframe(logs: List[LogLine]) -> pd.DataFrame:
    return pd.DataFrame([l.to_dict() for l in logs])

def extract_url(request: str) -> Optional[str]:
    try:
        return request.split()[1]
    except Exception:
        return None

@timer_decorator
def calc_metrics(df: pd.DataFrame) -> pd.DataFrame:
    if 'url' not in df.columns:
        df['url'] = df['request'].apply(extract_url)

    total_count = df.shape[0]
    total_time_sum = df['request_time'].sum()
    if total_count == 0:
        raise ValueError("Нет данных для анализа (DataFrame пустой)")

    metrics = (
        df
        .groupby('url')
        .agg(
            count=('url', 'size'),
            time_avg=('request_time', 'mean'),
            time_max=('request_time', 'max'),
            time_min=('request_time', 'min'),
            time_med=('request_time', 'median'),
            time_sum=('request_time', 'sum'),
        )
        .reset_index()
    )
    metrics['count_perc'] = metrics['count'] / total_count * 100.0
    metrics['time_perc'] = metrics['time_sum'] / total_time_sum * 100.0

    metrics = metrics[[
        'url', 'count', 'count_perc', 'time_avg', 'time_max', 'time_min', 'time_med', 'time_perc', 'time_sum'
    ]]
    metrics = metrics.sort_values('time_sum', ascending=False)
    return metrics

@timer_decorator
def save_metrics_to_html(metrics: pd.DataFrame, filename: str):
    html_table = metrics.round({
        'count_perc': 3,
        'time_avg': 3,
        'time_max': 3,
        'time_min': 3,
        'time_med': 3,
        'time_perc': 3,
        'time_sum': 3,
    }).to_html(index=False, border=1, justify="center", classes="metrics-table")
    with open(filename, "w", encoding="utf-8") as f:
        f.write("""
        <html>
        <head>
          <meta charset='utf-8'>
          <style>
            body { font-family: Arial, sans-serif; }
            .metrics-table { border-collapse: collapse; }
            .metrics-table th, .metrics-table td { border: 1px solid #999; padding: 4px 8px; }
            .metrics-table th { background-color: #efefef; }
            .metrics-table tbody tr:nth-child(odd) { background-color: #fafafa; }
            .metrics-table th:first-child,
            .metrics-table td:first-child {
              max-width: 707px;
              white-space: nowrap;
              overflow: hidden;
              text-overflow: ellipsis;
            }
        </style>
        </head>
        <body>
        <h2>Метрики по URL</h2>
        """)
        f.write(html_table)
        f.write("</body></html>")


Комментариев нет:

Отправить комментарий