Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,6 @@ cython_debug/
.idea/

# Database migrations
migrations/
# migrations/
timetrack.db
uploads/
6 changes: 5 additions & 1 deletion app/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from flask import Flask, redirect, url_for
from flask_migrate import Migrate
from flask_migrate import Migrate # type: ignore

from app.db.database import db, init_db
from app.routes.main import main
Expand All @@ -24,4 +24,8 @@ def create_app(config_object):
app.register_blueprint(monthly_log_bp)
app.register_blueprint(settings_bp)

from app.routes.import_log import import_log_bp

app.register_blueprint(import_log_bp)

return app
1 change: 1 addition & 0 deletions app/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class ScheduleEntry(db.Model): # type: ignore
date = Column(SQLADate, nullable=False)
entries = Column(JSON, nullable=False)
absence_code = Column(String, nullable=True)
observation = Column(String, nullable=True)

employee: Mapped["Employee"] = relationship(
"Employee", back_populates="schedule_entries"
Expand Down
174 changes: 174 additions & 0 deletions app/routes/import_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import dataclasses
import json
import logging
import os
import pathlib
import shutil
import tempfile
import uuid
from datetime import datetime
from typing import List

from flask import (
Blueprint,
current_app,
flash,
redirect,
render_template,
request,
url_for,
)
from werkzeug.utils import secure_filename

from app.db.database import db
from app.models.models import Employee, ScheduleEntry
from app.services.importer.factory import ImporterFactory
from app.services.importer.protocol import ImportResult
from app.utils.time_calculator import calculate_daily_hours

logger = logging.getLogger(__name__)


import_log_bp = Blueprint("import_log", __name__, url_prefix="/import")

# Configure upload folder
UPLOAD_FOLDER = os.path.join(os.getcwd(), "uploads")
os.makedirs(UPLOAD_FOLDER, exist_ok=True)


@import_log_bp.route("/", methods=["GET", "POST"])
def upload_file():
if request.method == "POST":
if "file" not in request.files:
flash("No file part", "error")
return redirect(request.url)

file = request.files["file"]
if file.filename == "":
flash("No selected file", "error")
return redirect(request.url)

if file:
filename = secure_filename(file.filename or "")
file_ext = filename.split(".")[-1].lower()

if file_ext not in ["pdf", "xlsx", "xls"]:
flash("Unsupported file type", "error")
return redirect(request.url)

# Generate unique ID for this upload
upload_id = str(uuid.uuid4())
temp_filename = f"{upload_id}.{file_ext}"
filepath = os.path.join(UPLOAD_FOLDER, temp_filename)
file.save(filepath)

return redirect(url_for("import_log.preview", upload_id=upload_id))

return render_template("import_upload.html")


@import_log_bp.route("/preview/<upload_id>", methods=["GET"])
def preview(upload_id):
# Find file
filepath = _get_filepath(upload_id)
if not filepath:
flash("File not found or expired", "error")
return redirect(url_for("import_log.upload_file"))

try:
importer = ImporterFactory.get_importer(filepath)
with open(filepath, "rb") as f:
content = f.read()
result = importer.parse(content)

return render_template(
"import_preview.html", result=result, upload_id=upload_id
)
except Exception as e:
flash(f"Error parsing file: {str(e)}", "error")
return redirect(url_for("import_log.upload_file"))


@import_log_bp.route("/confirm/<upload_id>", methods=["POST"])
def confirm(upload_id):
filepath = _get_filepath(upload_id)
if not filepath:
flash("File not found or expired", "error")
return redirect(url_for("import_log.upload_file"))

try:
importer = ImporterFactory.get_importer(filepath)
with open(filepath, "rb") as f:
content = f.read()
result = importer.parse(content)

# Import valid records
count = 0
# Assume for now we are importing for Employee ID 1 or passed in form
# Ideally user selects employee in Upload or Preview
# For now, let's hardcode 1 or get from request if we added it
employee_id = 1

for record in result.records:
if not record.is_valid:
continue

# Check duplicate/overwrite?
entry_date = datetime.strptime(record.date, "%Y-%m-%d").date()
existing = ScheduleEntry.query.filter_by(
employee_id=employee_id, date=entry_date
).first()

entries_data = []
if record.entry_time and record.exit_time:
entries_data.append(
{"entry": record.entry_time, "exit": record.exit_time}
)

if existing:
existing.entries = entries_data
existing.observation = record.observation
# If valid entries exist, we assume normal work day, so unset absence?
if entries_data:
existing.absence_code = None
else:
new_entry = ScheduleEntry(
employee_id=employee_id,
date=entry_date,
entries=entries_data,
observation=record.observation,
)
db.session.add(new_entry)
count += 1

db.session.commit()

# Cleanup
os.remove(filepath)

flash(f"Successfully imported {count} records", "success")
return redirect(url_for("monthly_log.view_monthly_log"))

except Exception as e:
db.session.rollback()
flash(f"Error importing data: {str(e)}", "error")
return redirect(url_for("import_log.preview", upload_id=upload_id))


@import_log_bp.route("/cancel/<upload_id>", methods=["POST"])
def cancel(upload_id):
filepath = _get_filepath(upload_id)
if filepath:
try:
os.remove(filepath)
except:
pass
return redirect(url_for("import_log.upload_file"))


def _get_filepath(upload_id):
# Search for file with upload_id prefix
for f in os.listdir(UPLOAD_FOLDER):
if f.startswith(upload_id):
return os.path.join(UPLOAD_FOLDER, f)
return None
108 changes: 108 additions & 0 deletions app/services/importer/excel_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import io
from typing import Any, List, Optional

import pandas as pd # type: ignore

from app.services.importer.protocol import (
ImporterProtocol,
ImportResult,
TimeEntryRecord,
)
from app.utils.validators import validate_date, validate_time_format


class ExcelImporter(ImporterProtocol):
def parse(self, file_content: Any) -> ImportResult:
records: List[TimeEntryRecord] = []
errors: List[str] = []

try:
# Read Excel file
df = pd.read_excel(io.BytesIO(file_content))

# Normalize headers
df.columns = df.columns.astype(str).str.lower().str.strip()

# Map columns
col_map = {}
for col in df.columns:
if "fecha" in col or "date" in col:
col_map["date"] = col
elif "entrada" in col or "in" in col:
col_map["entry"] = col
elif "salida" in col or "out" in col:
col_map["exit"] = col
elif "observ" in col or "note" in col:
col_map["obs"] = col

if "date" not in col_map:
errors.append("Could not find 'Fecha' or 'Date' column")
return ImportResult([], 0, 0, errors)

for _, row in df.iterrows():
date_val = row[col_map["date"]]
if pd.isna(date_val):
continue

# Handle dates
if isinstance(date_val, pd.Timestamp):
date_str = date_val.strftime("%Y-%m-%d")
else:
date_str = str(date_val).strip()

entry_val = (
row.get(col_map.get("entry")) if "entry" in col_map else None
)
exit_val = row.get(col_map.get("exit")) if "exit" in col_map else None
obs_val = row.get(col_map.get("obs")) if "obs" in col_map else None

entry_str = self._format_time(entry_val)
exit_str = self._format_time(exit_val)

# Logic for validating
is_valid = True
error_msg = None

if not validate_date(date_str):
is_valid = False
error_msg = f"Invalid date format: {date_str}"
elif entry_str and not validate_time_format(entry_str):
is_valid = False
error_msg = f"Invalid entry time: {entry_str}"
elif exit_str and not validate_time_format(exit_str):
is_valid = False
error_msg = f"Invalid exit time: {exit_str}"

records.append(
TimeEntryRecord(
date=date_str,
entry_time=entry_str,
exit_time=exit_str,
observation=str(obs_val) if pd.notna(obs_val) else None,
is_valid=is_valid,
error_message=error_msg,
)
)

except Exception as e:
errors.append(f"Error parsing Excel: {str(e)}")

valid_records = sum(1 for r in records if r.is_valid)
return ImportResult(records, len(records), valid_records, errors)

def _format_time(self, val: Any) -> Optional[str]:
if pd.isna(val):
return None

if isinstance(val, pd.Timestamp):
return str(val.strftime("%H:%M"))

# If it's a datetime.time object
try:
return str(val.strftime("%H:%M"))
except AttributeError:
pass

s = str(val).strip()
# Basic fixes
return s
21 changes: 21 additions & 0 deletions app/services/importer/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from typing import Dict, Type

from app.services.importer.excel_importer import ExcelImporter
from app.services.importer.pdf_importer import PDFImporter
from app.services.importer.protocol import ImporterProtocol


class ImporterFactory:
_importers: Dict[str, Type[ImporterProtocol]] = {
"pdf": PDFImporter,
"xlsx": ExcelImporter,
"xls": ExcelImporter,
}

@classmethod
def get_importer(cls, filename: str) -> ImporterProtocol:
ext = filename.split(".")[-1].lower()
importer_class = cls._importers.get(ext)
if not importer_class:
raise ValueError(f"Unsupported file extension: {ext}")
return importer_class()
Loading