Parser, 250530, Nginx, CommandLine, Projects
Project:
D:\VC25\Otus\Py\250530\01_new_project\homework\1\log_magic_parser1
command line:
& D:/Tools/Anaconda/python.exe
run file
d:/VC25/Otus/Py/250530/01_new_project/homework/1/log_magic_parser1/logmagic_parser.py
directory
D:\VC25\Otus\Py\250530\01_new_project\homework\1\Batche
parameters
--first 5 --pattern batch* --html metrics.html
----------------------------------------------------------------------------
log_parser.py
import logging
import argparse
import sys
from log_files import batch_log_files_select, timer_decorator
from log_core import parse_logs, calc_metrics, save_metrics_to_html, loglines_to_dataframe
# Константа по умолчанию
MIN_REQUESTS = 1000 # Minimum requests per group
def is_gzip_file(filepath):
    """Return True if the file at *filepath* starts with the gzip magic bytes.

    Returns False for files that cannot be opened or read (missing path,
    permission denied, ...), so callers can use it as a safe filter.
    """
    try:
        with open(filepath, "rb") as f:
            # Every gzip stream begins with the two-byte magic number 1F 8B.
            return f.read(2) == b'\x1f\x8b'
    except OSError:
        # Unreadable path -> treat as "not gzip" instead of aborting the run.
        # (Narrowed from a bare `except Exception`, which also hid real bugs.)
        return False
@timer_decorator
def main():
    """CLI entry point: select log files, parse them, compute per-URL metrics.

    Exit codes: 1 = no usable input files / selection error,
    2 = parse failure, 3 = metrics-calculation failure.
    """
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
    logging.info("main() Start")
    parser = argparse.ArgumentParser(
        description="Universal log batch analyzer with gzip/archive support"
    )
    parser.add_argument('directory', help='Папка с логами (batch и/или архивы .gz)')
    # NOTE(review): with action='append' the default stays in the list even when
    # the user passes --pattern, so user patterns are ADDED to '*20250529*',
    # not substituted for it — confirm this is the intended behavior.
    parser.add_argument('--pattern', action='append', default=['*20250529*'],
                        help='Шаблон(ы) поиска файлов, поддерживаются glob, можно указывать несколько')
    parser.add_argument('--from-date', type=str, help='Начальная дата файла (ГГГГММДД)')
    parser.add_argument('--to-date', type=str, help='Конечная дата файла (ГГГГММДД)')
    parser.add_argument('--today', action='store_true', help='Только за сегодня')
    parser.add_argument('--yesterday', action='store_true', help='Только за вчера')
    parser.add_argument('--first', type=int, default=None, help='Первые N файлов')
    parser.add_argument('--last', type=int, default=None, help='Последние N файлов')
    parser.add_argument('--dry-run', action='store_true', help='Показать какие файлы будут обработаны и выйти')
    parser.add_argument('--html', default=None, help='Сохранить метрики в HTML-файл')
    parser.add_argument('--min-request', type=int, default=MIN_REQUESTS, help='Минимальное число запросов в группе (default: %d)' % MIN_REQUESTS)
    args = parser.parse_args()
    try:
        selected = batch_log_files_select(
            args.directory,
            patterns=args.pattern,
            from_date=args.from_date,
            to_date=args.to_date,
            today=args.today,
            yesterday=args.yesterday,
            first_n=args.first,
            last_n=args.last
        )
    except Exception as e:
        logging.error(f'Ошибка при поиске файлов: {e}')
        sys.exit(1)
    if not selected:
        logging.error('Нет подходящих файлов лога под указанные параметры')
        sys.exit(1)
    # Keep only the uncompressed files for parsing (checked by content, not name).
    # parse_candidates = [f for f in selected if not getattr(f, "is_gzipped", False)]
    parse_candidates = [f for f in selected if not is_gzip_file(f.filename)]
    if args.dry_run:
        # Dry run: report what would be processed and which gz files are skipped.
        print(f"Файлы для обработки ({len(parse_candidates)}):")
        for f in parse_candidates:
            print(f"  [TXT] {f.filename}")
        gzipped = [f for f in selected if getattr(f, "is_gzipped", False)]
        if gzipped:
            print(f"\nИгнорируемые сжатые файлы ({len(gzipped)}):")
            for f in gzipped:
                print(f"  [GZ ] {f.filename}")
        sys.exit(0)
    if not parse_candidates:
        logging.error('Нет ни одного распакованного файла для парсинга (все файлы сжаты)')
        sys.exit(1)
    logging.info('main() Files for parsing: %d', len(parse_candidates))
    for fileinfo in parse_candidates:
        logging.info(f"main() Processing file: {fileinfo.filename}")
    # Only the "allowed" (uncompressed) files are fed into the parser below.
    try:
        logs = parse_logs(parse_candidates)
        df = loglines_to_dataframe(logs)
    except Exception as e:
        logging.exception(f'Ошибка при парсинге логов: {e}')
        sys.exit(2)
    logging.info("print df.head():\n")
    print(df.head())
    try:
        metrics = calc_metrics(df)
        # Drop URL groups below the --min-request threshold.
        if 'count' in metrics.columns:
            metrics = metrics[metrics['count'] >= args.min_request]
        else:
            logging.warning("В данных нет столбца 'count', фильтрация по min_request не применяется")
    except Exception as e:
        logging.exception(f'Ошибка при расчетах: {e}')
        sys.exit(3)
    logging.info("print metrics samples:\n")
    print(metrics.head(25).to_string(index=False))
    print("\n")
    if args.html:
        save_metrics_to_html(metrics, args.html)
        logging.info(f"Метрики сохранены: {args.html}")
    logging.info("main() Complete")
if __name__ == '__main__':
    main()
log_files.py
import datetime
import functools
import glob
import logging
import os
import re
import time
from dataclasses import dataclass
from typing import List, Optional
# Configure root logging for scripts that import this module.
# NOTE(review): calling basicConfig at import time is a module side effect;
# it is a no-op when the root logger is already configured elsewhere.
logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s]: %(message)s")
def timer_decorator(func):
    """Decorator that logs *func*'s wall-clock execution time at INFO level.

    The wrapped function's return value is passed through unchanged.
    """
    @functools.wraps(func)  # preserve __name__/__doc__ — the log line relies on __name__
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        execution_time = time.time() - start_time
        logging.info(f"{func.__name__} executed in {execution_time:.4f} seconds.")
        return result
    return wrapper
@dataclass
class BatchLogFile:
    """Metadata for one candidate log file discovered on disk."""
    filename: str  # path as returned by glob
    is_gzipped: bool = False  # True when the file was matched as a .gz archive
    date: Optional[str] = None  # 'YYYYMMDD' string extracted from the filename, if any
def extract_date_from_filename(filename: str) -> Optional[str]:
    """Return the first 8-digit run found in *filename* (YYYYMMDD), or None."""
    match = re.search(r'\d{8}', filename)
    if match is None:
        return None
    return match.group(0)
@timer_decorator
def find_files_for_parsing(directory: str, patterns: List[str]) -> List[BatchLogFile]:
    """Find log files in *directory* matching any of *patterns*.

    Each glob pattern is tried both as-is and with '.gz' appended (unless it
    already ends in '.gz').  The gzip flag is derived from the file's actual
    extension: a plain pattern like 'batch*' also glob-matches
    'batch_x.log.gz', and previously such files were appended twice — once
    wrongly flagged is_gzipped=False and once True — because the flag came
    from which glob matched and the dedupe key included the flag.
    """
    logging.info("find_files_for_parsing() Start")
    matched = set()
    for pat in patterns:
        base = os.path.join(directory, pat)
        matched.update(glob.glob(base))
        if not pat.endswith('.gz'):
            matched.update(glob.glob(base + '.gz'))
    # BUGFIX: one entry per path, gzip flag taken from the real extension.
    files = [
        BatchLogFile(
            filename=f,
            is_gzipped=f.endswith('.gz'),
            date=extract_date_from_filename(f),
        )
        for f in matched
    ]
    logging.info("find_files_for_parsing() Complete")
    return sorted(files, key=lambda x: x.filename)
def batch_log_files_select(
    directory: str,
    patterns: List[str] = None,
    today=False,
    yesterday=False,
    from_date: str = None,
    to_date: str = None,
    first_n: int = None,
    last_n: int = None
) -> List[BatchLogFile]:
    """Select log files from *directory*, filtered by date and sliced.

    For each base name, the uncompressed file is preferred over its .gz
    archive.  Date filters use the 'YYYYMMDD' string embedded in the
    filename; files without a recognizable date are dropped when any date
    filter is active.  Returns a list sorted by base filename.
    """
    if patterns is None:
        patterns = ['*']
    all_found = find_files_for_parsing(directory, patterns)
    if not all_found:
        return []
    # Group by "base" name (without .gz) so each file is represented once:
    # the plain file if available, otherwise the gz archive.
    filemap = {}
    for f in all_found:
        key = os.path.splitext(f.filename)[0] if f.is_gzipped else f.filename
        # BUGFIX: the original condition replaced a plain file with its gz
        # archive — the opposite of the stated preference.  Replace only when
        # the stored entry is gzipped and the new one is not.
        if key not in filemap or (filemap[key].is_gzipped and not f.is_gzipped):
            filemap[key] = f
    files = list(filemap.values())
    # Date filtering
    if today or yesterday or from_date or to_date:
        now = datetime.datetime.now()
        target_from = target_to = None
        if today:
            target_from = target_to = now.strftime('%Y%m%d')
        elif yesterday:
            d = now - datetime.timedelta(days=1)
            target_from = target_to = d.strftime('%Y%m%d')
        else:
            target_from = from_date
            # Only --from-date given: treat it as an exact-date filter.
            target_to = to_date if to_date else from_date
        # BUGFIX: when only --to-date was given, target_from was None and
        # `None <= f.date` raised TypeError.  Each bound is now optional.
        files = [
            f for f in files
            if f.date
            and (target_from is None or target_from <= f.date)
            and (target_to is None or f.date <= target_to)
        ]
    # Sort by base filename (e.g. batch_20240301.txt, batch_20240315.txt).
    files = sorted(files, key=lambda f: os.path.basename(f.filename))
    if first_n:
        files = files[:first_n]
    elif last_n:
        files = files[-last_n:]
    return files
log_core.py
import gzip
import pandas as pd
import logging
from dataclasses import asdict
import re
from typing import List, Optional
from log_files import BatchLogFile
from log_files import timer_decorator
import datetime
class LogLine:
    """One parsed access-log record with normalized numeric fields."""

    __slots__ = ('ip', 'user_ident', 'user_auth', 'timestamp', 'request',
                 'status', 'size', 'referrer', 'user_agent', 'request_time')

    def __init__(self, ip, user_ident, user_auth, timestamp, request,
                 status, size, referrer, user_agent, request_time):
        # Normalize the numeric fields on construction; everything else is
        # stored as given.  Values are assigned in slot order.
        values = (ip, user_ident, user_auth, timestamp, request,
                  int(status), int(size), referrer, user_agent,
                  float(request_time))
        for name, value in zip(self.__slots__, values):
            setattr(self, name, value)

    def to_dict(self):
        """Return the record as a plain dict keyed by slot name."""
        return {name: getattr(self, name) for name in self.__slots__}
# Combined-format access-log pattern with up to five optional extra quoted
# fields before a trailing request-time value, e.g.:
#   1.2.3.4 - user [ts] "GET / HTTP/1.1" 200 123 "-" "UA" 0.128
LOG_REGEX = re.compile(
    r'^(?P<ip>\S+)\s+'              # client address
    r'(?P<user_ident>\S+)\s+'       # identd user ('-' when absent)
    r'(?P<user_auth>\S+)\s+'        # authenticated user ('-' when absent)
    r'\[(?P<timestamp>[^\]]+)\]\s+' # bracketed timestamp
    r'"(?P<request>[^"]*)"\s+'      # request line: METHOD URL PROTOCOL
    r'(?P<status>\d{3})\s+'         # 3-digit HTTP status
    r'(?P<size>\d+)\s+'             # response size in bytes
    r'"(?P<referrer>[^"]*)"\s+'     # referrer ('-' when absent)
    r'"(?P<user_agent>[^"]*)"\s+'   # user agent ('-' when absent)
    r'(?:".*?"\s+){0,5}'            # skip up to 5 extra quoted fields
    r'(?P<request_time>[\d.]+)\s*$' # trailing request duration in seconds
)
def parse_log_line(line: str) -> Optional[LogLine]:
    """Parse one raw log line; return a LogLine, or None if it doesn't match."""
    match = LOG_REGEX.match(line)
    if match is None:
        return None
    fields = match.groupdict()

    def dash_to_none(value):
        # The log format writes '-' for absent fields; store those as None.
        return None if value == '-' else value

    return LogLine(
        fields['ip'],
        dash_to_none(fields['user_ident']),
        dash_to_none(fields['user_auth']),
        fields['timestamp'],
        fields['request'],
        fields['status'],
        fields['size'],
        dash_to_none(fields['referrer']),
        dash_to_none(fields['user_agent']),
        fields['request_time'],
    )
# def parse_logs(files: List[BatchLogFile]) -> List[LogLine]:
# out = []
# for fileinfo in files:
# ope = gzip.open if fileinfo.is_gzipped else open
# mode = 'rt' if fileinfo.is_gzipped else 'r'
# with ope(fileinfo.filename, mode, encoding='utf8') as f:
# for idx, line in enumerate(f, 1):
# line = line.rstrip('\n')
# l = parse_log_line(line)
# if not l:
# raise ValueError(f'Ошибка парсинга {fileinfo.filename} строка {idx}')
# out.append(l)
# return out
# Module logger setup.
# NOTE(review): basicConfig here runs after log_files.py's own call at import
# time; only the first configuration of the root logger takes effect.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@timer_decorator
def parse_logs(files: List[BatchLogFile]) -> List[dict]:
    """Parse every file in *files* into LogLine records.

    Gzipped entries are read through gzip.open.  A file that fails to open
    or read (OSError) is logged and skipped; a line that does not match the
    expected format raises ValueError.
    """
    logger.info("parse_logs() Start")
    parsed_lines = []
    for fileinfo in files:
        try:
            logger.info(f"{fileinfo.filename} Try to parse")
            if fileinfo.is_gzipped:
                opener, mode = gzip.open, 'rt'
            else:
                opener, mode = open, 'r'
            with opener(fileinfo.filename, mode, encoding='utf8') as handle:
                for idx, raw in enumerate(handle, 1):
                    record = parse_log_line(raw.rstrip('\n'))
                    if not record:
                        raise ValueError(f'Ошибка парсинга {fileinfo.filename}, строка {idx}')
                    parsed_lines.append(record)
            logger.info(f"{fileinfo.filename} Ok")
        except OSError as e:
            # Log the file-access failure and continue with the next file.
            logger.error(f"{fileinfo.filename}: {e} Failure")
    return parsed_lines
@timer_decorator
def loglines_to_dataframe(logs: List[LogLine]) -> pd.DataFrame:
    """Convert parsed LogLine records into a pandas DataFrame (one row each)."""
    records = [entry.to_dict() for entry in logs]
    return pd.DataFrame(records)
def extract_url(request: str) -> Optional[str]:
    """Return the URL part of a 'METHOD URL PROTOCOL' request line, or None.

    Returns None for empty/None input and for request lines with fewer than
    two whitespace-separated tokens.  (Replaces a bare `except Exception`
    that silently swallowed any error.)
    """
    if not request:
        return None
    parts = request.split()
    return parts[1] if len(parts) >= 2 else None
@timer_decorator
def calc_metrics(df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate per-URL request metrics from a parsed-log DataFrame.

    Returns a DataFrame with count, count_perc, time_avg/max/min/med,
    time_perc and time_sum per URL, sorted by time_sum descending.
    Raises ValueError when *df* is empty.
    """
    # Validate before doing any work (previously sums ran first).
    if df.shape[0] == 0:
        raise ValueError("Нет данных для анализа (DataFrame пустой)")
    # BUGFIX: the original wrote df['url'] = ..., mutating the caller's
    # DataFrame as a side effect; .assign() builds the column on a copy.
    if 'url' not in df.columns:
        df = df.assign(url=df['request'].apply(extract_url))
    total_count = df.shape[0]
    total_time_sum = df['request_time'].sum()
    metrics = (
        df
        .groupby('url')
        .agg(
            count=('url', 'size'),
            time_avg=('request_time', 'mean'),
            time_max=('request_time', 'max'),
            time_min=('request_time', 'min'),
            time_med=('request_time', 'median'),
            time_sum=('request_time', 'sum'),
        )
        .reset_index()
    )
    # Shares of total traffic and of total processing time, in percent.
    metrics['count_perc'] = metrics['count'] / total_count * 100.0
    metrics['time_perc'] = metrics['time_sum'] / total_time_sum * 100.0
    metrics = metrics[[
        'url', 'count', 'count_perc', 'time_avg', 'time_max', 'time_min', 'time_med', 'time_perc', 'time_sum'
    ]]
    return metrics.sort_values('time_sum', ascending=False)
@timer_decorator
def save_metrics_to_html(metrics: pd.DataFrame, filename: str):
    """Write *metrics* to *filename* as a standalone, styled HTML page."""
    # Round every float column to 3 decimal places for display.
    rounding = {column: 3 for column in (
        'count_perc', 'time_avg', 'time_max', 'time_min',
        'time_med', 'time_perc', 'time_sum')}
    table = metrics.round(rounding).to_html(
        index=False, border=1, justify="center", classes="metrics-table")
    header = """
<html>
<head>
<meta charset='utf-8'>
<style>
body { font-family: Arial, sans-serif; }
.metrics-table { border-collapse: collapse; }
.metrics-table th, .metrics-table td { border: 1px solid #999; padding: 4px 8px; }
.metrics-table th { background-color: #efefef; }
.metrics-table tbody tr:nth-child(odd) { background-color: #fafafa; }
.metrics-table th:first-child,
.metrics-table td:first-child {
max-width: 707px;
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
</style>
</head>
<body>
<h2>Метрики по URL</h2>
"""
    with open(filename, "w", encoding="utf-8") as out:
        out.write(header)
        out.write(table)
        out.write("</body></html>")
Комментариев нет:
Отправить комментарий