Parser, 250530, Nginx
import sys
import logging
import argparse

import loggin_support  # configures the root logger on import
from decorators import log_time
from log_parser import (
    parse_batch_directory,
    loglines_to_dataframe,
    calc_metrics,
    save_metrics_to_html,
)
@log_time
def main():
    logging.info("main() Start")
    parser = argparse.ArgumentParser(description="Batch log parser and checker")
    parser.add_argument("directory", help="Directory with batch files")
    parser.add_argument("--pattern", default="*nginx*", help="Filename pattern")
    group = parser.add_mutually_exclusive_group()
    group.add_argument('--today', action='store_true', help="Use today's file")
    group.add_argument('--yesterday', action='store_true', help="Use yesterday's file")
    group.add_argument('--all', action='store_true', help="Use all files (the default)")
    group.add_argument('--first', type=int, metavar='N', help="Use the first (oldest) N files")
    group.add_argument('--last', type=int, metavar='N', help="Use the last (newest) N files")
    parser.add_argument("--html", default=None, help="Save metrics to an HTML file (e.g. metrics.html)")
    args = parser.parse_args()
    try:
        logs = parse_batch_directory(
            args.directory,
            batch_pattern=args.pattern,
            today=args.today,
            yesterday=args.yesterday,
            use_all=args.all,
            first_n=args.first,
            last_n=args.last,
        )
        df = loglines_to_dataframe(logs)
    except Exception as e:
        print(e, file=sys.stderr)
        sys.exit(1)
    print(df.head())
    try:
        metrics = calc_metrics(df)
        print(metrics.head(20).to_string(index=False))
    except Exception as e:
        print(e, file=sys.stderr)
        sys.exit(1)
    if args.html:
        save_metrics_to_html(metrics, args.html)
        logging.info(f"main() Metrics saved to HTML file: {args.html}")
    logging.info("main() Complete")


if __name__ == "__main__":
    main()
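
Typical invocations, assuming the entry point above is saved as main.py (the post does not name the file, so main.py is only a placeholder):

# Yesterday's nginx logs, rendered to an HTML report:
python main.py /var/log/nginx --yesterday --html metrics.html

# The three newest matching batch files, plain or gzipped:
python main.py ./batches --pattern "batch_*.txt" --last 3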
---
# log_parser.py
import os
import re
import glob
import gzip
import logging
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Optional, Tuple

import pandas as pd

import loggin_support  # configures the root logger on import
from decorators import log_time
@dataclass
class LogLine:
    ip: str
    user_ident: Optional[str]
    user_auth: Optional[str]
    timestamp: str
    request: str
    status: int
    size: int
    referrer: Optional[str]
    user_agent: Optional[str]
    request_time: float
LOG_REGEX = re.compile(
    r'^(?P<ip>\S+)\s+'               # client address
    r'(?P<user_ident>\S+)\s+'        # identd user ("-" if absent)
    r'(?P<user_auth>\S+)\s+'         # authenticated user ("-" if absent)
    r'\[(?P<timestamp>[^\]]+)\]\s+'  # e.g. 07/Jan/2024:12:00:00 +0300
    r'"(?P<request>[^"]*)"\s+'       # request line: method, URL, protocol
    r'(?P<status>\d{3})\s+'          # HTTP status code
    r'(?P<size>\d+)\s+'              # response size in bytes
    r'"(?P<referrer>[^"]*)"\s+'      # Referer header ("-" if absent)
    r'"(?P<user_agent>[^"]*)"\s+'    # User-Agent header ("-" if absent)
    r'(?:".*?"\s+){0,5}'             # skip up to five extra quoted fields
    r'(?P<request_time>[\d.]+)\s*$'  # request processing time in seconds
)
def parse_log_line(line: str) -> Optional[LogLine]:
    m = LOG_REGEX.match(line)
    if not m:
        return None
    d = m.groupdict()
    return LogLine(
        ip=d['ip'],
        user_ident=None if d['user_ident'] == '-' else d['user_ident'],
        user_auth=None if d['user_auth'] == '-' else d['user_auth'],
        timestamp=d['timestamp'],
        request=d['request'],
        status=int(d['status']),
        size=int(d['size']),
        referrer=None if d['referrer'] == '-' else d['referrer'],
        user_agent=None if d['user_agent'] == '-' else d['user_agent'],
        request_time=float(d['request_time']),
    )
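
A quick sanity check of the regex and the '-' normalization, using a made-up sample line (not from the post):

sample = ('192.168.0.1 - - [07/Jan/2024:12:00:00 +0300] '
          '"GET /api/v1/users HTTP/1.1" 200 512 "-" "curl/8.0.1" 0.123')
entry = parse_log_line(sample)
assert entry is not None
assert entry.status == 200 and entry.request_time == 0.123
assert entry.user_ident is None  # '-' fields become None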
@log_time
def parse_batch_file(batch_file: str, is_gz: bool = False) -> List[LogLine]:
    logging.info("parse_batch_file() Start")
    lines = []
    open_func = gzip.open if is_gz else open
    mode = 'rt' if is_gz else 'r'
    with open_func(batch_file, mode, encoding='utf-8') as f:
        for idx, line in enumerate(f, start=1):
            line = line.rstrip('\n')
            entry = parse_log_line(line)
            if entry is None:
                raise ValueError(f"Parse error in file {batch_file} at line {idx}: {line!r}")
            lines.append(entry)
    logging.info("parse_batch_file() Complete")
    return lines
@log_time
def find_batch_files(directory: str, pattern: str) -> List[Tuple[bool, str]]:
    # First tuple element: True for gzipped files, False for plain text
    pattern_plain = os.path.join(directory, pattern)
    pattern_gz = pattern_plain + '.gz'
    files_plain = sorted(glob.glob(pattern_plain))
    files_gz = sorted(glob.glob(pattern_gz))
    result = [(False, fn) for fn in files_plain] + [(True, fn) for fn in files_gz]
    # Sort again so gz and plain files end up interleaved in name order
    result.sort(key=lambda x: x[1])
    return result
def extract_date_from_filename(filename: str) -> Optional[str]:
    # The filename is expected to contain a YYYYMMDD date, e.g. batch_20240107.txt(.gz)
    m = re.search(r'(\d{8})', filename)
    if m:
        return m.group(1)
    return None
def select_files(files: List[Tuple[bool, str]], today=False, yesterday=False, use_all=False, first_n=None, last_n=None) -> List[Tuple[bool, str]]:
    if not files:
        return []
    if today or yesterday:
        date_to_file = {}
        for is_gz, fname in files:
            date = extract_date_from_filename(fname)
            if date:
                date_to_file.setdefault(date, []).append((is_gz, fname))
        now = datetime.now()
        if today:
            target = now.strftime('%Y%m%d')
        else:  # yesterday
            target = (now - timedelta(days=1)).strftime('%Y%m%d')
        return date_to_file.get(target, [])
    elif first_n:
        return files[:first_n]
    elif last_n:
        return files[-last_n:]
    else:  # use_all is the default
        return files
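
To make the selection semantics concrete, a small illustration with made-up file names:

files = [(False, 'nginx_20240105.log'),
         (True, 'nginx_20240106.log.gz'),
         (False, 'nginx_20240107.log')]
assert select_files(files, last_n=2) == files[-2:]   # two newest by name
assert select_files(files, first_n=1) == files[:1]   # single oldest
assert select_files(files, use_all=True) == files    # everything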
@log_time
def parse_batch_directory(directory: str, batch_pattern: str = 'batch_*.txt', today=False, yesterday=False, use_all=False, first_n=None, last_n=None) -> List[LogLine]:
    logging.info("parse_batch_directory() Start")
    files = find_batch_files(directory, batch_pattern)
    files = select_files(files, today=today, yesterday=yesterday, use_all=use_all, first_n=first_n, last_n=last_n)
    if not files:
        raise FileNotFoundError("No files to process for the selected mode")
    all_lines = []
    for is_gz, filename in files:
        result = parse_batch_file(filename, is_gz=is_gz)
        all_lines.extend(result)
    logging.info("parse_batch_directory() Complete")
    return all_lines
@log_time
def loglines_to_dataframe(logs: List[LogLine]) -> pd.DataFrame:
    return pd.DataFrame([log.__dict__ for log in logs])


def extract_url(request: str) -> Optional[str]:
    try:
        return request.split()[1]
    except Exception:
        return None
@log_time
def calc_metrics(df: pd.DataFrame) -> pd.DataFrame:
    logging.info("calc_metrics() Start")
    if 'url' not in df.columns:
        df['url'] = df['request'].apply(extract_url)
    total_count = df.shape[0]
    if total_count == 0:
        raise ValueError("No data to analyze (the DataFrame is empty)")
    total_time_sum = df['request_time'].sum()
    metrics = (
        df
        .groupby('url')
        .agg(
            count=('url', 'size'),
            time_avg=('request_time', 'mean'),
            time_max=('request_time', 'max'),
            time_min=('request_time', 'min'),
            time_med=('request_time', 'median'),
            time_sum=('request_time', 'sum'),
        )
        .reset_index()
    )
    metrics['count_perc'] = metrics['count'] / total_count * 100
    metrics['time_perc'] = metrics['time_sum'] / total_time_sum * 100
    metrics = metrics[[
        'url', 'count', 'count_perc', 'time_avg', 'time_max', 'time_min', 'time_med', 'time_perc', 'time_sum'
    ]]
    metrics = metrics.sort_values('time_sum', ascending=False)
    logging.info("calc_metrics() Complete")
    return metrics
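
A minimal, self-contained sanity check of calc_metrics on synthetic data (assuming this file is importable as log_parser):

import pandas as pd
from log_parser import calc_metrics

df = pd.DataFrame({
    'request': ['GET /a HTTP/1.1', 'GET /a HTTP/1.1', 'GET /b HTTP/1.1'],
    'request_time': [0.10, 0.30, 0.05],
})
print(calc_metrics(df))
# expected: /a with count=2, time_sum=0.40 sorted above /b with count=1, time_sum=0.05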
@log_time
def save_metrics_to_html(metrics: pd.DataFrame, filename: str):
    logging.info("save_metrics_to_html() Start")
    html_table = metrics.round({
        'count_perc': 3,
        'time_avg': 3,
        'time_max': 3,
        'time_min': 3,
        'time_med': 3,
        'time_perc': 3,
        'time_sum': 3,
    }).to_html(index=False, border=1, justify="center", classes="metrics-table")
    with open(filename, "w", encoding="utf-8") as f:
        f.write("""
<html>
<head>
<meta charset='utf-8'>
<style>
body { font-family: Arial, sans-serif; }
.metrics-table { border-collapse: collapse; }
.metrics-table th, .metrics-table td { border: 1px solid #999; padding: 4px 8px; }
.metrics-table th { background-color: #efefef; }
.metrics-table tbody tr:nth-child(odd) { background-color: #fafafa; }
.metrics-table th:first-child,
.metrics-table td:first-child {
    max-width: 707px;
    white-space: nowrap;
    overflow: hidden;
    text-overflow: ellipsis;
}
</style>
</head>
<body>
<h2>URL metrics</h2>
""")
        f.write(html_table)
        f.write("</body></html>")
    logging.info("save_metrics_to_html() Complete")
---
# loggin_support.py
import logging
log_format = "%(asctime)s %(message)s"
date_format = "%d-%m-%y %H:%M:%S"
file_handler = logging.FileHandler("log_file.log", encoding="utf-8")
file_handler.setFormatter(logging.Formatter(log_format, datefmt=date_format))
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(log_format, datefmt=date_format))
logging.basicConfig(
    level=logging.INFO,
    handlers=[file_handler, console_handler],
)
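
This module configures the root logger as a side effect of being imported, which is why the other files only need a bare import. A minimal usage sketch:

import loggin_support  # noqa: F401  (side effect: attaches file + console handlers)
import logging

logging.info("hello")  # goes to both log_file.log and the console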
---
# decorators.py
import time
import functools


def log_time(func):
    @functools.wraps(func)  # preserve the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"Execution time of {func.__name__}: {end - start:.4f} seconds")
        return result
    return wrapper