D:\VC25\Otus\Py\250530\01_new_project\homework
(base) PS D:\VC25\Otus\Py\250530\01_new_project\homework\1> & "python.exe" batch_parser_05.py D:\VC25\Otus\Py\250530\01_new_project\homework\1\Batches --csv metrics.csv --html metrics.html
& "python.exe" batch_parser_05.py D:\VC25\Otus\Py\250530\01_new_project\homework\1\Batches --csv metrics.csv --html metrics.html
& "C:/Program Files (x86)/Microsoft Visual Studio/Shared/Python39_64/python.exe" d:/VC25/Otus/Py/250530/01_new_project/homework/1/batch_val-03.py D:\VC25\Otus\Py\250530\01_new_project\homework\1\Batches
import os
import glob
import sys
from typing import List, Optional
from dataclasses import dataclass
import re
import pandas as pd
@dataclass
class LogLine:
ip: str
user_ident: Optional[str]
user_auth: Optional[str]
timestamp: str
request: str
status: int
size: int
referrer: Optional[str]
user_agent: Optional[str]
request_time: float
LOG_REGEX = re.compile(
r'^(?P<ip>\S+)\s+'
r'(?P<user_ident>\S+)\s+'
r'(?P<user_auth>\S+)\s+'
r'\[(?P<timestamp>[^\]]+)\]\s+'
r'"(?P<request>[^"]*)"\s+'
r'(?P<status>\d{3})\s+'
r'(?P<size>\d+)\s+'
r'"(?P<referrer>[^"]*)"\s+'
r'"(?P<user_agent>[^"]*)"\s+'
r'(?:".*?"\s+){0,5}'
r'(?P<request_time>[\d.]+)\s*$'
)
def parse_log_line(line: str) -> Optional[LogLine]:
m = LOG_REGEX.match(line)
if not m:
return None
d = m.groupdict()
return LogLine(
ip=d['ip'],
user_ident=None if d['user_ident'] == '-' else d['user_ident'],
user_auth=None if d['user_auth'] == '-' else d['user_auth'],
timestamp=d['timestamp'],
request=d['request'],
status=int(d['status']),
size=int(d['size']),
referrer=None if d['referrer'] == '-' else d['referrer'],
user_agent=None if d['user_agent'] == '-' else d['user_agent'],
request_time=float(d['request_time'])
)
def parse_batch_file(batch_file: str) -> List[LogLine]:
"""Парсит один файл, возвращает список LogLine"""
lines = []
with open(batch_file, encoding='utf-8') as f:
for idx, line in enumerate(f, start=1):
line = line.rstrip('\n')
entry = parse_log_line(line)
if entry is None:
print(f"Ошибка в файле {batch_file} на строке {idx}")
print(f"Проблемная строка: {line}")
sys.exit(1)
lines.append(entry)
return lines
def parse_batch_directory(directory: str, batch_pattern: str = 'batch_*.txt') -> List[LogLine]:
"""Парсит все файлы в директории, возвращает общий список LogLine"""
pattern = os.path.join(directory, batch_pattern)
files = sorted(glob.glob(pattern))
if not files:
print(f"Файлы не найдены по шаблону {pattern}")
sys.exit(1)
all_lines = []
for file in files:
result = parse_batch_file(file)
all_lines.extend(result)
print(f"{file}: Проверка завершена успешно! ({len(result)} строк)")
print("Проверка всех файлов завершена успешно!")
return all_lines
def loglines_to_dataframe(logs: List[LogLine]) -> pd.DataFrame:
"""Преобразует список LogLine в pandas.DataFrame"""
df = pd.DataFrame([log.__dict__ for log in logs])
return df
if __name__ == "__main__":
import argparse
import pandas as pd
parser = argparse.ArgumentParser(description="Batch log parser and checker")
parser.add_argument("directory", help="Папка с батч-файлами")
parser.add_argument("--pattern", default="batch_*.txt", help="Шаблон имени файла")
parser.add_argument("--csv", default=None, help="Сохранить DataFrame в CSV (указать имя файла)")
parser.add_argument("--html", default=None, help="Сохранить метрики в HTML (metrics.html)")
args = parser.parse_args()
# 1. Чтение логов и формирование DataFrame
logs = parse_batch_directory(args.directory, args.pattern)
df = loglines_to_dataframe(logs)
# Можно посмотреть первые строки:
print(df.head())
# 2. Извлечение url
def extract_url(request):
try:
return request.split()[1]
except Exception:
return None
df['url'] = df['request'].apply(extract_url)
total_count = df.shape[0]
total_time_sum = df['request_time'].sum()
# Защита на случай пустого DataFrame
if total_count == 0:
print("Нет данных для анализа (DataFrame пустой)")
exit(0)
# 3. Группировка и расчёт метрик по url
metrics = (
df
.groupby('url')
.agg(
count=('url', 'size'),
time_avg=('request_time', 'mean'),
time_max=('request_time', 'max'),
time_min=('request_time', 'min'),
time_med=('request_time', 'median'),
time_sum=('request_time', 'sum'),
)
.reset_index()
)
metrics['count_perc'] = metrics['count'] / total_count * 100
metrics['time_perc'] = metrics['time_sum'] / total_time_sum * 100
# Перестановка столбцов
metrics = metrics[[
'url', 'count', 'count_perc', 'time_avg', 'time_max', 'time_min', 'time_med', 'time_perc', 'time_sum'
]]
# 4. Сортировка и вывод
metrics = metrics.sort_values('time_sum', ascending=False)
print(metrics.head(20).to_string(index=False))
# 5. Сохраняем в CSV логи (если надо)
if args.csv:
df.to_csv(args.csv, index=False)
print(f"DataFrame записан в файл {args.csv}")
# 6. Сохраняем таблицу метрик в HTML (если надо)
if getattr(args, "html", None):
# Для красивого отображения округлим части метрик
html_table = metrics.round({
'count_perc': 3,
'time_avg': 3,
'time_max': 3,
'time_min': 3,
'time_med': 3,
'time_perc': 3,
'time_sum': 3,
}).to_html(index=False, border=1, justify="center", classes="metrics-table")
with open(args.html, "w", encoding="utf-8") as f:
# Добавляем небольшой стиль для читабельности
f.write("""
<html>
<head>
<meta charset='utf-8'>
<style>
body { font-family: Arial, sans-serif; }
.metrics-table { border-collapse: collapse; }
.metrics-table th, .metrics-table td { border: 1px solid #999; padding: 4px 8px; }
.metrics-table th { background-color: #efefef; }
.metrics-table tbody tr:nth-child(odd) { background-color: #fafafa; }
/* Ограничиваем ширину первого столбца и разрешаем перенос слов */
.metrics-table th:first-child,
.metrics-table td:first-child {
max-width: 707px;
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
</style>
</head>
<body>
<h2>Метрики по URL</h2>
""")
f.write(html_table)
f.write("</body></html>")
print(f"Метрики сохранены в HTML-файл: {args.html}")
Start from Python
& "python.exe" batch_parser_02.py D:\VC25\Otus\Py\250530\01_new_project\homework\1\Batches
import os
import glob
import sys
from typing import List, Optional
from dataclasses import dataclass
import re
import pandas as pd
@dataclass
class LogLine:
ip: str
user_ident: Optional[str]
user_auth: Optional[str]
timestamp: str
request: str
status: int
size: int
referrer: Optional[str]
user_agent: Optional[str]
request_time: float
LOG_REGEX = re.compile(
r'^(?P<ip>\S+)\s+'
r'(?P<user_ident>\S+)\s+'
r'(?P<user_auth>\S+)\s+'
r'\[(?P<timestamp>[^\]]+)\]\s+'
r'"(?P<request>[^"]*)"\s+'
r'(?P<status>\d{3})\s+'
r'(?P<size>\d+)\s+'
r'"(?P<referrer>[^"]*)"\s+'
r'"(?P<user_agent>[^"]*)"\s+'
r'(?:".*?"\s+){0,5}'
r'(?P<request_time>[\d.]+)\s*$'
)
def parse_log_line(line: str) -> Optional[LogLine]:
m = LOG_REGEX.match(line)
if not m:
return None
d = m.groupdict()
return LogLine(
ip=d['ip'],
user_ident=None if d['user_ident'] == '-' else d['user_ident'],
user_auth=None if d['user_auth'] == '-' else d['user_auth'],
timestamp=d['timestamp'],
request=d['request'],
status=int(d['status']),
size=int(d['size']),
referrer=None if d['referrer'] == '-' else d['referrer'],
user_agent=None if d['user_agent'] == '-' else d['user_agent'],
request_time=float(d['request_time'])
)
def parse_batch_file(batch_file: str) -> List[LogLine]:
"""Парсит один файл, возвращает список LogLine"""
lines = []
with open(batch_file, encoding='utf-8') as f:
for idx, line in enumerate(f, start=1):
line = line.rstrip('\n')
entry = parse_log_line(line)
if entry is None:
print(f"Ошибка в файле {batch_file} на строке {idx}")
print(f"Проблемная строка: {line}")
sys.exit(1)
lines.append(entry)
return lines
def parse_batch_directory(directory: str, batch_pattern: str = 'batch_*.txt') -> List[LogLine]:
"""Парсит все файлы в директории, возвращает общий список LogLine"""
pattern = os.path.join(directory, batch_pattern)
files = sorted(glob.glob(pattern))
if not files:
print(f"Файлы не найдены по шаблону {pattern}")
sys.exit(1)
all_lines = []
for file in files:
result = parse_batch_file(file)
all_lines.extend(result)
print(f"{file}: Проверка завершена успешно! ({len(result)} строк)")
print("Проверка всех файлов завершена успешно!")
return all_lines
def loglines_to_dataframe(logs: List[LogLine]) -> pd.DataFrame:
"""Преобразует список LogLine в pandas.DataFrame"""
df = pd.DataFrame([log.__dict__ for log in logs])
return df
if __name__ == "__main__":
import argparse
import pandas as pd
parser = argparse.ArgumentParser(description="Batch log parser and checker")
parser.add_argument("directory", help="Папка с батч-файлами")
parser.add_argument("--pattern", default="batch_*.txt", help="Шаблон имени файла")
parser.add_argument("--csv", default=None, help="Сохранить DataFrame в CSV (указать имя файла)")
args = parser.parse_args()
# 1. Чтение логов и формирование DataFrame
logs = parse_batch_directory(args.directory, args.pattern)
df = loglines_to_dataframe(logs)
# Можно посмотреть первые строки:
print(df.head())
# 2. Извлечение url
def extract_url(request):
try:
return request.split()[1]
except Exception:
return None
df['url'] = df['request'].apply(extract_url)
total_count = df.shape[0]
total_time_sum = df['request_time'].sum()
# Защита на случай пустого DataFrame
if total_count == 0:
print("Нет данных для анализа (DataFrame пустой)")
exit(0)
# 3. Группировка и расчёт метрик по url
metrics = (
df
.groupby('url')
.agg(
count=('url', 'size'),
time_avg=('request_time', 'mean'),
time_max=('request_time', 'max'),
time_min=('request_time', 'min'),
time_med=('request_time', 'median'),
time_sum=('request_time', 'sum'),
)
.reset_index()
)
metrics['count_perc'] = metrics['count'] / total_count
metrics['time_perc'] = metrics['time_sum'] / total_time_sum
# Перестановка столбцов
metrics = metrics[[
'url', 'count', 'count_perc', 'time_avg', 'time_max', 'time_min', 'time_med', 'time_perc', 'time_sum'
]]
# 4. Сортировка и вывод
metrics = metrics.sort_values('time_sum', ascending=False)
print(metrics.head(20).to_string(index=False))
# 5. Сохраняем в CSV логи (если надо)
if args.csv:
df.to_csv(args.csv, index=False)
print(f"DataFrame записан в файл {args.csv}")
& "C:/Program Files (x86)/Microsoft Visual Studio/Shared/Python39_64/python.exe" d:/VC25/Otus/Py/250530/01_new_project/homework/1/batch_val-03.py D:\VC25\Otus\Py\250530\01_new_project\homework\1\Batches
Code:
import os
import glob
import sys
from typing import List, Optional
from dataclasses import dataclass
import re
@dataclass
class LogLine:
ip: str
user_ident: Optional[str]
user_auth: Optional[str]
timestamp: str
request: str
status: int
size: int
referrer: Optional[str]
user_agent: Optional[str]
request_time: float
LOG_REGEX = re.compile(
r'^(?P<ip>\S+)\s+'
r'(?P<user_ident>\S+)\s+'
r'(?P<user_auth>\S+)\s+'
r'\[(?P<timestamp>[^\]]+)\]\s+'
r'"(?P<request>[^"]*)"\s+'
r'(?P<status>\d{3})\s+'
r'(?P<size>\d+)\s+'
r'"(?P<referrer>[^"]*)"\s+'
r'"(?P<user_agent>[^"]*)"\s+'
r'(?:".*?"\s+){0,5}'
r'(?P<request_time>[\d.]+)\s*$'
)
def parse_log_line(line: str) -> Optional[LogLine]:
m = LOG_REGEX.match(line)
if not m:
return None
d = m.groupdict()
return LogLine(
ip=d['ip'],
user_ident=None if d['user_ident'] == '-' else d['user_ident'],
user_auth=None if d['user_auth'] == '-' else d['user_auth'],
timestamp=d['timestamp'],
request=d['request'],
status=int(d['status']),
size=int(d['size']),
referrer=None if d['referrer'] == '-' else d['referrer'],
user_agent=None if d['user_agent'] == '-' else d['user_agent'],
request_time=float(d['request_time'])
)
def process_batch_file(batch_file: str):
with open(batch_file, encoding='utf-8') as f:
for idx, line in enumerate(f, start=1):
line = line.rstrip('\n')
entry = parse_log_line(line)
if entry is None:
print(f"Ошибка в файле {batch_file} на строке {idx}")
print(f"Проблемная строка: {line}")
sys.exit(1)
def main(directory: str, batch_pattern: str = 'batch_*.txt'):
pattern = os.path.join(directory, batch_pattern)
files = sorted(glob.glob(pattern))
if not files:
print(f"Файлы не найдены по шаблону {pattern}")
sys.exit(1)
for file in files:
process_batch_file(file)
print(f"{file}: Проверка завершена успешно!")
print("Проверка всех файлов завершена успешно!")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Batch log checker")
parser.add_argument("directory", help="D:\VC25\Otus\Py\250530\01_new_project\homework\1\Batches")
parser.add_argument("--pattern", default="batch_*.txt", help="batch_*.txt")
args = parser.parse_args()
main(args.directory, args.pattern)
Комментариев нет:
Отправить комментарий