Parser, 250530
import logging
import argparse
import sys

import loggin_support  # noqa: F401 - configures logging handlers on import
# import log_parser_project.log_parser.loggin_support as loggin_support
from log_parser import (
    parse_batch_directory,
    loglines_to_dataframe,
    calc_metrics,
    save_metrics_to_html,
)
def main():
    logging.info("Start main()")
    parser = argparse.ArgumentParser(description="Batch log parser and checker")
    parser.add_argument("directory", help="Directory with batch files")
    # parser.add_argument("--pattern", default="batch_*.txt", help="File name pattern")
    parser.add_argument("--pattern", default="*nginx*", help="File name pattern")
    group = parser.add_mutually_exclusive_group()
    group.add_argument('--today', action='store_true', help="Take only today's files")
    group.add_argument('--yesterday', action='store_true', help="Take only yesterday's files")
    group.add_argument('--all', action='store_true', help="Take all batch files (default)")
    group.add_argument('--first', type=int, metavar='N', help="Take the first N batch files")
    group.add_argument('--last', type=int, metavar='N', help="Take the last N batch files")
    parser.add_argument("--html", default=None, help="Save metrics to an HTML file (metrics.html)")
    args = parser.parse_args()

    try:
        logs = parse_batch_directory(
            args.directory,
            batch_pattern=args.pattern,
            today=args.today,
            yesterday=args.yesterday,
            use_all=args.all,
            first_n=args.first,
            last_n=args.last,
        )
        df = loglines_to_dataframe(logs)
    except Exception as e:
        print(e, file=sys.stderr)
        sys.exit(1)

    print(df.head())

    try:
        metrics = calc_metrics(df)
        print(metrics.head(20).to_string(index=False))
    except Exception as e:
        print(e, file=sys.stderr)
        sys.exit(1)

    if args.html:
        save_metrics_to_html(metrics, args.html)
        print(f"Metrics saved to HTML file: {args.html}")
    logging.info("Complete main()")


if __name__ == "__main__":
    main()
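# Example invocation (the entry-point file name is not given in the post, so
# "main.py" below is only a placeholder; the directory and output file names
# are illustrative too):
#
#   python main.py /var/log/batches --pattern "*nginx*" --last 5 --html metrics.html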
# loggin_support.py
import logging

log_format = "%(asctime)s %(message)s"
date_format = "%d-%m-%y %H:%M:%S"

file_handler = logging.FileHandler("log_file.log", encoding="utf-8")
file_handler.setFormatter(logging.Formatter(log_format, datefmt=date_format))

console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(log_format, datefmt=date_format))

logging.basicConfig(
    level=logging.INFO,
    handlers=[file_handler, console_handler]
)
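# Usage sketch: importing this module configures the root logger as a side
# effect, so callers only need the standard library afterwards, e.g.:
#
#   import loggin_support  # noqa: F401
#   import logging
#   logging.info("message")  # written to log_file.log and echoed to the console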
# log_parser.py
import loggin_support  # noqa: F401 - configures logging handlers on import
import os
import glob
import re
import logging
import time
import gzip
from datetime import datetime, timedelta
from dataclasses import dataclass
from functools import wraps
from typing import List, Optional, Tuple

import pandas as pd
def log_time(func):
    """Decorator that prints the wall-clock execution time of the wrapped function."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"Execution time of {func.__name__}: {end - start:.4f} seconds")
        return result
    return wrapper
@dataclass
class LogLine:
    """One parsed access-log record."""
    ip: str
    user_ident: Optional[str]
    user_auth: Optional[str]
    timestamp: str
    request: str
    status: int
    size: int
    referrer: Optional[str]
    user_agent: Optional[str]
    request_time: float
LOG_REGEX = re.compile(
    r'^(?P<ip>\S+)\s+'
    r'(?P<user_ident>\S+)\s+'
    r'(?P<user_auth>\S+)\s+'
    r'\[(?P<timestamp>[^\]]+)\]\s+'
    r'"(?P<request>[^"]*)"\s+'
    r'(?P<status>\d{3})\s+'
    r'(?P<size>\d+)\s+'
    r'"(?P<referrer>[^"]*)"\s+'
    r'"(?P<user_agent>[^"]*)"\s+'
    r'(?:".*?"\s+){0,5}'
    r'(?P<request_time>[\d.]+)\s*$'
)
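# The pattern targets an nginx combined-style access log line with a trailing
# request_time value; the (?:".*?"\s+){0,5} group skips up to five extra quoted
# fields (additional log_format variables, for example) between the user agent
# and request_time.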
def parse_log_line(line: str) -> Optional[LogLine]:
    m = LOG_REGEX.match(line)
    if not m:
        return None
    d = m.groupdict()
    return LogLine(
        ip=d['ip'],
        user_ident=None if d['user_ident'] == '-' else d['user_ident'],
        user_auth=None if d['user_auth'] == '-' else d['user_auth'],
        timestamp=d['timestamp'],
        request=d['request'],
        status=int(d['status']),
        size=int(d['size']),
        referrer=None if d['referrer'] == '-' else d['referrer'],
        user_agent=None if d['user_agent'] == '-' else d['user_agent'],
        request_time=float(d['request_time'])
    )
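# Example (the log line below is illustrative, not from real data):
#
#   line = ('127.0.0.1 - - [07/Jan/2024:10:00:00 +0300] '
#           '"GET /api/v1/items HTTP/1.1" 200 1024 "-" "curl/8.0" 0.123')
#   entry = parse_log_line(line)
#   # entry.request      -> 'GET /api/v1/items HTTP/1.1'
#   # entry.status       -> 200
#   # entry.request_time -> 0.123
#   # entry.user_ident and entry.user_auth are None because those fields were '-'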
@log_time
def parse_batch_file(batch_file: str, is_gz: bool = False) -> List[LogLine]:
    logging.info("Start parse_batch_file()")
    lines = []
    open_func = gzip.open if is_gz else open
    mode = 'rt' if is_gz else 'r'
    with open_func(batch_file, mode, encoding='utf-8') as f:
        for idx, line in enumerate(f, start=1):
            line = line.rstrip('\n')
            entry = parse_log_line(line)
            if entry is None:
                raise ValueError(f"Parse error in file {batch_file} at line {idx}: {line!r}")
            lines.append(entry)
    logging.info("Finish parse_batch_file()")
    return lines
@log_time
def find_batch_files(directory: str, pattern: str) -> List[Tuple[bool, str]]:
    # Each entry is (is_gzipped, path): True for .gz files, False for plain text.
    pattern_plain = os.path.join(directory, pattern)
    pattern_gz = pattern_plain + '.gz'
    files_plain = sorted(glob.glob(pattern_plain))
    files_gz = sorted(glob.glob(pattern_gz))
    result = [(False, fn) for fn in files_plain] + [(True, fn) for fn in files_gz]
    # Sort again so plain and gzipped files are interleaved in name order.
    result.sort(key=lambda x: x[1])
    return result
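# Example result shape (file names are illustrative only):
#   [(False, 'logs/batch_20240106.txt'), (True, 'logs/batch_20240107.txt.gz')]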
def extract_date_from_filename(filename: str) -> Optional[str]:
    # The date is expected in YYYYMMDD format in the file name, e.g. batch_20240107.txt(.gz)
    m = re.search(r'(\d{8})', filename)
    if m:
        return m.group(1)
    return None
def select_files(files: List[Tuple[bool, str]], today=False, yesterday=False,
                 use_all=False, first_n=None, last_n=None) -> List[Tuple[bool, str]]:
    if not files:
        return []
    if today or yesterday:
        date_to_file = {}
        for is_gz, fname in files:
            date = extract_date_from_filename(fname)
            if date:
                date_to_file[date] = date_to_file.get(date, []) + [(is_gz, fname)]
        now = datetime.now()
        if today:
            target = now.strftime('%Y%m%d')
        else:  # yesterday
            target = (now - timedelta(days=1)).strftime('%Y%m%d')
        return date_to_file.get(target, [])
    elif first_n:
        return files[:first_n]
    elif last_n:
        return files[-last_n:]
    else:  # use_all by default
        return files
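# Selection examples (assuming file names that contain a YYYYMMDD date):
#   select_files(files, today=True)  -> only files dated today
#   select_files(files, last_n=3)    -> the last three files in name order
#   select_files(files)              -> all files (the default, same as use_all=True)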
@log_time
def parse_batch_directory(directory: str, batch_pattern: str = 'batch_*.txt', today=False,
                          yesterday=False, use_all=False, first_n=None, last_n=None) -> List[LogLine]:
    logging.info("Start parse_batch_directory()")
    files = find_batch_files(directory, batch_pattern)
    files = select_files(files, today=today, yesterday=yesterday, use_all=use_all,
                         first_n=first_n, last_n=last_n)
    if not files:
        raise FileNotFoundError("No files to process for the selected mode")
    all_lines = []
    for is_gz, filename in files:
        result = parse_batch_file(filename, is_gz=is_gz)
        all_lines.extend(result)
    logging.info("Finish parse_batch_directory()")
    return all_lines
@log_time
def loglines_to_dataframe(logs: List[LogLine]) -> pd.DataFrame:
    return pd.DataFrame([log.__dict__ for log in logs])


def extract_url(request: str) -> Optional[str]:
    # The request field looks like "GET /path HTTP/1.1"; the URL is the second token.
    try:
        return request.split()[1]
    except Exception:
        return None
@log_time
def calc_metrics(df: pd.DataFrame) -> pd.DataFrame:
    logging.info("Start calc_metrics()")
    if 'url' not in df.columns:
        df['url'] = df['request'].apply(extract_url)
    total_count = df.shape[0]
    if total_count == 0:
        raise ValueError("No data to analyze (the DataFrame is empty)")
    total_time_sum = df['request_time'].sum()
    metrics = (
        df
        .groupby('url')
        .agg(
            count=('url', 'size'),
            time_avg=('request_time', 'mean'),
            time_max=('request_time', 'max'),
            time_min=('request_time', 'min'),
            time_med=('request_time', 'median'),
            time_sum=('request_time', 'sum'),
        )
        .reset_index()
    )
    metrics['count_perc'] = metrics['count'] / total_count * 100
    metrics['time_perc'] = metrics['time_sum'] / total_time_sum * 100
    metrics = metrics[[
        'url', 'count', 'count_perc', 'time_avg', 'time_max', 'time_min',
        'time_med', 'time_perc', 'time_sum'
    ]]
    metrics = metrics.sort_values('time_sum', ascending=False)
    logging.info("Finish calc_metrics()")
    return metrics
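# The resulting table has one row per URL: count/count_perc are the number and
# share of requests, the time_* columns aggregate request_time, and time_perc
# is the URL's share of the total request_time; rows are sorted by time_sum in
# descending order, so the most time-consuming URLs come first.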
@log_time
def save_metrics_to_html(metrics: pd.DataFrame, filename: str):
    html_table = metrics.round({
        'count_perc': 3,
        'time_avg': 3,
        'time_max': 3,
        'time_min': 3,
        'time_med': 3,
        'time_perc': 3,
        'time_sum': 3,
    }).to_html(index=False, border=1, justify="center", classes="metrics-table")
    with open(filename, "w", encoding="utf-8") as f:
        f.write("""
<html>
<head>
<meta charset='utf-8'>
<style>
body { font-family: Arial, sans-serif; }
.metrics-table { border-collapse: collapse; }
.metrics-table th, .metrics-table td { border: 1px solid #999; padding: 4px 8px; }
.metrics-table th { background-color: #efefef; }
.metrics-table tbody tr:nth-child(odd) { background-color: #fafafa; }
.metrics-table th:first-child,
.metrics-table td:first-child {
    max-width: 707px;
    white-space: nowrap;
    overflow: hidden;
    text-overflow: ellipsis;
}
</style>
</head>
<body>
<h2>Metrics by URL</h2>
""")
        f.write(html_table)
        f.write("</body></html>")
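

if __name__ == "__main__":
    # Self-contained smoke test (a minimal sketch; the sample line below is
    # illustrative, not taken from real data): parse one line, build a
    # one-row DataFrame and print its metrics.
    sample = ('127.0.0.1 - - [07/Jan/2024:10:00:00 +0300] '
              '"GET /api/v1/items HTTP/1.1" 200 1024 "-" "curl/8.0" 0.123')
    parsed = parse_log_line(sample)
    if parsed is not None:
        demo_df = loglines_to_dataframe([parsed])
        print(calc_metrics(demo_df).to_string(index=False))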