diff --git a/article/controller.py b/article/controller.py
index 96ac11ab7..dce6e3859 100644
--- a/article/controller.py
+++ b/article/controller.py
@@ -3,29 +3,21 @@
 import logging
 import sys
 import traceback
-from datetime import datetime
-from django.db.models import Q
 from packtools.sps.formats.am import am
-from article.sources.xmlsps import load_article
-from article.models import Article, ArticleExporter, ArticleFunding
-from article.choices import (
-    DATA_STATUS_DUPLICATED,
-    DATA_STATUS_DEDUPLICATED,
-    DATA_STATUS_PUBLIC,
-)
+from article.models import Article, ArticleExporter, ArticleFunding, ArticleSource
+from article import choices
+from collection.models import Collection
 from core.mongodb import write_item
-from core.utils import date_utils
+from core.utils.harvesters import AMHarvester, OPACHarvester
 from institution.models import Sponsor
-from journal.models import Journal, SciELOJournal
+from journal.models import Journal
 from pid_provider.choices import (
     PPXML_STATUS_TODO,
-    PPXML_STATUS_DUPLICATED,
-    PPXML_STATUS_DEDUPLICATED,
     PPXML_STATUS_INVALID,
 )
-from pid_provider.models import PidProviderXML, XMLVersionXmlWithPreError
+from pid_provider.models import PidProviderXML
 from tracker.models import UnexpectedEvent
@@ -403,3 +395,207 @@ def bulk_export_articles_to_articlemeta(
         },
     )
     raise
+
+
+class ArticleIteratorBuilder:
+    """
+    Builds and chains the article-selection iterators dispatched to the pipeline.
+
+    Each ``_iter_from_*`` method is a generator that yields kwargs ready for
+    ``task_process_article_pipeline``. Which iterators are active is determined
+    by the exclusive arguments present on the instance — several may be active
+    at the same time.
+
+    Exclusive arguments and the iterators they activate:
+
+    ==========================  ================================================
+    Exclusive argument          Iterator activated
+    ==========================  ================================================
+    proc_status_list            _iter_from_pid_provider
+    data_status_list            _iter_from_article
+    limit / timeout / opac_url  _iter_from_harvest
+    article_source_status_list  _iter_from_article_source
+    (none)                      _iter_from_pid_provider (default)
+    ==========================  ================================================
+
+    Usage::
+
+        it = ArticleIteratorBuilder(
+            user=user,
+            collection_acron_list=["scl"],
+            proc_status_list=["todo"],
+            data_status_list=["invalid"],
+        )
+        for kwargs in it:
+            if kwargs:  # _iter_from_article yields None for skipped items
+                task_process_article_pipeline.delay(**kwargs)
+    """
+
+    def __init__(
+        self,
+        user,
+        collection_acron_list=None,
+        journal_acron_list=None,
+        from_pub_year=None,
+        until_pub_year=None,
+        from_date=None,
+        until_date=None,
+        proc_status_list=None,
+        data_status_list=None,
+        article_source_status_list=None,
+        limit=None,
+        timeout=None,
+        opac_url=None,
+        force_update=None,
+    ):
+        self.user = user
+        self.collection_acron_list = collection_acron_list
+        self.journal_acron_list = journal_acron_list
+        self.from_pub_year = from_pub_year
+        self.until_pub_year = until_pub_year
+        self.from_date = from_date
+        self.until_date = until_date
+        self.proc_status_list = proc_status_list
+        self.data_status_list = data_status_list
+        self.article_source_status_list = article_source_status_list
+        self.limit = limit
+        self.timeout = timeout
+        self.opac_url = opac_url
+        self.force_update = force_update
+
+        self._iter_from_harvest_count = 0
+        self._iter_from_article_source_count = 0
+        self._iter_from_pid_provider_count = 0
+        self._iter_from_article_count = 0
+
+    def __iter__(self):
+        yield from self._iter_from_harvest()
+        yield from self._iter_from_article_source()
+        yield from self._iter_from_pid_provider()
+        yield from self._iter_from_article()
+
+        logging.info(f"Iterators summary: harvest={self._iter_from_harvest_count}, "
+                     f"article_source={self._iter_from_article_source_count}, "
+                     f"pid_provider={self._iter_from_pid_provider_count}, "
+                     f"article={self._iter_from_article_count}")
+
+    # ------------------------------------------------------------------
+    # Selection iterators
+    # ------------------------------------------------------------------
+
+    def _iter_from_pid_provider(self):
+        """Iterates over PidProviderXML records filtered by journal, date and status."""
+        journal_issn_groups = (
+            Journal.get_journal_issns(self.collection_acron_list, self.journal_acron_list)
+            or [None]
+        )
+        for journal_issns in journal_issn_groups:
+            issn_list = [i for i in journal_issns if i] if journal_issns else None
+            if journal_issns and not issn_list:
+                continue
+            qs = PidProviderXML.get_queryset(
+                issn_list=issn_list,
+                from_pub_year=self.from_pub_year,
+                until_pub_year=self.until_pub_year,
+                from_updated_date=self.from_date,
+                until_updated_date=self.until_date,
+                proc_status_list=self.proc_status_list or [PPXML_STATUS_TODO, PPXML_STATUS_INVALID],
+            )
+            self._iter_from_pid_provider_count += qs.count()
+            for item in qs.iterator():
+                yield {"pp_xml_id": item.id}
+        logging.info(f"_iter_from_pid_provider: yielded {self._iter_from_pid_provider_count} items")
+
+    def _iter_from_article(self):
+        """
+        Iterates over Articles filtered by data_status.
+        Yields None for articles whose pp_xml cannot be recovered (signals a skip).
+        """
+        filters = {
+            "data_status__in": self.data_status_list or [
+                choices.DATA_STATUS_PENDING,
+                choices.DATA_STATUS_UNDEF,
+                choices.DATA_STATUS_INVALID,
+            ]
+        }
+        journal_id_list = Journal.get_ids(
+            collection_acron_list=self.collection_acron_list,
+            journal_acron_list=self.journal_acron_list,
+        )
+        if journal_id_list:
+            filters["journal__in"] = journal_id_list
+        if self.from_pub_year:
+            filters["pub_year__gte"] = self.from_pub_year
+        if self.until_pub_year:
+            filters["pub_year__lte"] = self.until_pub_year
+        if self.from_date:
+            filters["updated__gte"] = self.from_date
+        if self.until_date:
+            filters["updated__lte"] = self.until_date
+
+        articles = Article.objects.filter(**filters)
+        self._iter_from_article_count += articles.count()
+        for article in articles.iterator():
+            if not article.pp_xml:
+                try:
+                    article.pp_xml = PidProviderXML.get_by_pid_v3(pid_v3=article.pid_v3)
+                    article.save(update_fields=["pp_xml"])
+                except Exception as e:
+                    logging.error(f"pp_xml not found for article {article.id}: {e}")
+                    yield None
+                    continue
+            yield {"pp_xml_id": article.pp_xml.id}
+        logging.info(f"_iter_from_article: yielded {self._iter_from_article_count} articles")
+
+    def _iter_from_harvest(self):
+        """Iterates over documents harvested from the OPAC or ArticleMeta endpoints."""
+
+        if Collection.objects.count() == 0:
+            Collection.load(self.user)
+
+        count = 0
+        for collection_acron in self.collection_acron_list or list(Collection.get_acronyms()):
+            logging.info(collection_acron)
+            harvester = self._build_harvester(collection_acron)
+            logging.info(harvester)
+            for document in harvester.harvest_documents():
+                count += 1
+                yield {
+                    "xml_url": document["url"],
+                    "collection_acron": collection_acron,
+                    "pid": document["pid_v2"],
+                    "source_date": document.get("processing_date") or document.get("origin_date"),
+                }
+
+        self._iter_from_harvest_count = count
+        logging.info(f"Harvest iterator yielded {count} documents")
+
+    def _iter_from_article_source(self):
+        """Iterates over ArticleSources that are pending or in error."""
+        count = 0
+        for article_source in ArticleSource.get_queryset_to_complete_data(
+            self.from_date,
+            self.until_date,
+            self.force_update,
+            self.article_source_status_list,
+        ):
+            count += 1
+            yield {"article_source_id": article_source.id}
+        self._iter_from_article_source_count += count
+        logging.info(f"ArticleSource iterator yielded {count} items")
+
+    # ------------------------------------------------------------------
+    # Private helpers
+    # ------------------------------------------------------------------
+
+    def _build_harvester(self, collection_acron):
+        """Instantiates the appropriate harvester for the collection."""
+        kwargs = dict(
+            from_date=self.from_date,
+            until_date=self.until_date,
+            limit=self.limit,
+            timeout=self.timeout,
+        )
+        if collection_acron == "scl":
+            return OPACHarvester(self.opac_url or "www.scielo.br", collection_acron, **kwargs)
+        return AMHarvester("article", collection_acron, **kwargs)
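+# A consumption sketch (hypothetical driver; task_process_article_pipeline and
+# the kwarg shapes it accepts — pp_xml_id, article_source_id, or the
+# xml_url/collection_acron/pid/source_date group — are assumptions taken from
+# the generators above, not a confirmed API):
+#
+#     def dispatch_selected_articles(user, **selection):
+#         for kwargs in ArticleIteratorBuilder(user=user, **selection):
+#             if kwargs is None:
+#                 continue  # _iter_from_article signals an unrecoverable item
+#             task_process_article_pipeline.delay(**kwargs)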
"""Itera ArticleSources pendentes ou com erro.""" + count = 0 + for article_source in ArticleSource.get_queryset_to_complete_data( + self.from_date, + self.until_date, + self.force_update, + self.article_source_status_list, + ): + count += 1 + yield {"article_source_id": article_source.id} + self._iter_from_article_source_count += count + logging.info(f"ArticleSource iterator yielded {count} items") + + # ------------------------------------------------------------------ + # Helpers privados + # ------------------------------------------------------------------ + + def _build_harvester(self, collection_acron): + """Instancia o harvester adequado para a coleção.""" + kwargs = dict( + from_date=self.from_date, + until_date=self.until_date, + limit=self.limit, + timeout=self.timeout, + ) + if collection_acron == "scl": + return OPACHarvester(self.opac_url or "www.scielo.br", collection_acron, **kwargs) + return AMHarvester("article", collection_acron, **kwargs) + diff --git a/article/migrations/0048_alter_articlesource_status.py b/article/migrations/0048_alter_articlesource_status.py new file mode 100644 index 000000000..e91cb3cd1 --- /dev/null +++ b/article/migrations/0048_alter_articlesource_status.py @@ -0,0 +1,32 @@ +# Generated by Django 5.2.7 on 2026-03-01 15:15 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("article", "0047_articleaffiliation_normalized_and_more"), + ] + + operations = [ + migrations.AlterField( + model_name="articlesource", + name="status", + field=models.CharField( + choices=[ + ("pending", "Pending"), + ("processing", "Processing"), + ("completed", "Completed"), + ("error", "Error"), + ("reprocess", "Reprocess"), + ("url_error", "URL Error"), + ("xml_error", "XML Error"), + ], + default="pending", + help_text="Processing status of the article source", + max_length=20, + verbose_name="Status", + ), + ), + ] diff --git a/article/models.py b/article/models.py index 34bc5ac4a..85a4e434d 100755 --- a/article/models.py +++ b/article/models.py @@ -3,7 +3,7 @@ import sys import traceback from datetime import datetime -from functools import lru_cache, cached_property +from functools import cached_property from django.core.files.base import ContentFile from django.db import IntegrityError, models @@ -20,6 +20,7 @@ from wagtail.admin.panels import FieldPanel, InlinePanel, ObjectList, TabbedInterface from wagtail.models import Orderable from wagtailautocomplete.edit_handlers import AutocompletePanel +from packtools.sps.libs.requester import NonRetryableError from article import choices from article.utils.url_builder import ArticleURLBuilder @@ -52,6 +53,18 @@ from vocabulary.models import Keyword +class RequestXMLException(Exception): + """Exceção personalizada para erros na requisição de XML""" + pass + +class XMLException(Exception): + """Exceção personalizada para erros na requisição de XML""" + pass + +class UnableToRegisterPIDError(Exception): + """Exceção personalizada para erros ao registrar PID""" + pass + class AMArticle(BaseLegacyRecord): """ Modelo que representa a coleta de dados de Issue na API Article Meta. 
@@ -566,17 +579,6 @@ def mark_as_completed(self, user=None):
         self.save()  # Salvar estado final
         self.pp_xml.mark_as_done()
 
-    def complete_data(self, pp_xml, save=False):
-        if pp_xml:
-            if not self.sps_pkg_name:
-                self.sps_pkg_name = pp_xml.pkg_name
-                save = True
-            if not self.pp_xml:
-                self.pp_xml = pp_xml
-                save = True
-        if save:
-            self.save()
-
     def set_date_pub(self, dates, save=True):
         if dates:
             self.pub_date_day = dates.get("day")
@@ -739,7 +741,7 @@ def urls_data(self):
             if not pid_v2:
                 continue
             url_builder = ArticleURLBuilder(
-                sj.collection.domain,
+                sj.collection.base_url,
                 sj.journal_acron,
                 pid_v2,
                 self.pid_v3,
@@ -762,11 +764,14 @@ def get_availability(
         logging.info(f"get_availability {params}")
         return self.article_availability.filter(available=True, **params)
 
-    def check_availability(self, user):
+    def check_availability(self, user, force_update=False):
         try:
             if not self.is_pp_xml_valid():
                 return False
 
+            if not force_update and self.is_available():
+                return True
+
             event = None
             urls = []
             for item in self.article_availability.all():
@@ -901,9 +906,10 @@ def mark_items_as_invalid(cls, journal=None, journal_id=None):
     def is_pp_xml_valid(self):
         if not self.pp_xml:
             try:
-                self.pp_xml = PidProviderXML.objects.get(v3=self.pid_v3)
+                self.pp_xml = PidProviderXML.get_by_pid_v3(pid_v3=self.pid_v3)
             except PidProviderXML.DoesNotExist:
-                pass
+                self.pp_xml = None
+
         if not self.pp_xml or not self.pp_xml.xml_with_pre:
             if self.data_status != choices.DATA_STATUS_INVALID:
                 self.data_status = choices.DATA_STATUS_INVALID
@@ -1686,6 +1692,8 @@ class StatusChoices(models.TextChoices):
         COMPLETED = "completed", _("Completed")
         ERROR = "error", _("Error")
         REPROCESS = "reprocess", _("Reprocess")
+        URL_ERROR = "url_error", _("URL Error")
+        XML_ERROR = "xml_error", _("XML Error")
 
     url = models.URLField(
         verbose_name=_("Article URL"),
@@ -1793,7 +1801,7 @@ def get(cls, url):
         raise ValueError("ArticleSource.get requires url")
 
     @classmethod
-    def create(cls, user, url=None, source_date=None, am_article=None):
+    def create(cls, user, url=None, source_date=None, am_article=None, force_update=None, auto_solve_pid_conflict=False):
         if not url:
             raise ValueError("ArticleSource.create requires url")
@@ -1804,69 +1812,83 @@ def create(cls, user, url=None, source_date=None, am_article=None):
             obj.source_date = source_date
             obj.am_article = am_article
             obj.status = cls.StatusChoices.PENDING
-            obj.save()
-            try:
-                obj.request_xml(detail=[])
-            except Exception as e:
-                pass
+            obj.add_pid_provider(user, force_update, auto_solve_pid_conflict=auto_solve_pid_conflict)
             return obj
         except IntegrityError:
             return cls.get(url=url)
 
     @classmethod
     def create_or_update(
-        cls, user, url=None, source_date=None, am_article=None, force_update=None
+        cls, user, url=None, source_date=None, am_article=None, force_update=None, auto_solve_pid_conflict=False
    ):
         try:
             logging.info(
                 f"ArticleSource.create_or_update {url} {source_date} {am_article} {force_update}"
             )
             obj = cls.get(url=url)
-
             if (
                 force_update
                 or (source_date and source_date != obj.source_date)
                 or not obj.is_completed
             ):
-                logging.info(f"updating source: {(source_date, obj.source_date)}")
-                logging.info(f"updating am_article: {(am_article, obj.am_article)}")
-                logging.info(
-                    f"updating file: {not obj.file or not obj.file.path or not os.path.isfile(obj.file.path)}"
-                )
-                obj.request_xml()
                 obj.updated_by = user
                 obj.source_date = source_date
                 obj.am_article = am_article
-                obj.status = cls.StatusChoices.REPROCESS
-                obj.save()
-
+                obj.add_pid_provider(user, force_update, auto_solve_pid_conflict=auto_solve_pid_conflict)
+            return obj
         except cls.DoesNotExist:
-            obj = cls.create(
-                user, url=url, source_date=source_date, am_article=am_article
+            return cls.create(
+                user,
+                url=url,
+                source_date=source_date,
+                am_article=am_article,
+                force_update=force_update,
+                auto_solve_pid_conflict=auto_solve_pid_conflict
             )
-            return obj
+
+    @cached_property
+    def xml_with_pre(self):
+        if self.pid_provider_xml:
+            try:
+                return self.pid_provider_xml.xml_with_pre
+            except AttributeError:
+                pass
+        if self.file and self.file.path and os.path.isfile(self.file.path):
+            try:
+                return XMLWithPre.from_file(self.file.path)
+            except Exception:
+                pass
+        if self.url:
+            try:
+                return list(XMLWithPre.create(uri=self.url))[0]
+            except Exception:
+                pass
 
     @cached_property
     def sps_pkg_name(self):
         try:
-            xml_with_pre = list(XMLWithPre.create(path=self.file.path))[0]
-        except:
-            xml_with_pre = list(XMLWithPre.create(uri=self.url))[0]
-        return xml_with_pre.sps_pkg_name
+            return self.xml_with_pre.sps_pkg_name
+        except Exception:
+            pass
 
-    def request_xml(self, detail=None, force_update=False):
+    def request_xml(self, detail):
         if not self.url:
             raise ValueError("URL is required")
-        if force_update or not self.is_completed:
-            if detail:
-                detail.append("create file")
-
-            logging.info(f"ArticleSource.request_xml for {self.url}")
+        logging.info(f"ArticleSource.request_xml for {self.url}")
+        try:
             xml_with_pre = list(XMLWithPre.create(uri=self.url))[0]
             self.save_file(
-                f"{self.sps_pkg_name}.xml", xml_with_pre.tostring(pretty_print=True)
+                f"{xml_with_pre.sps_pkg_name}.xml", xml_with_pre.tostring(pretty_print=True)
             )
+        except NonRetryableError as e:
+            raise RequestXMLException(
+                f"Non-retryable error while requesting XML: {e}"
+            ) from e
+        except Exception as e:
+            raise XMLException(
+                f"Error while requesting XML: {e}"
+            ) from e
 
     def save_file(self, filename, content):
         try:
@@ -1891,6 +1913,16 @@ def mark_as_error(self):
         self.status = self.StatusChoices.ERROR
         self.save()
 
+    def mark_as_url_error(self):
+        """Mark the source as having a URL error"""
+        self.status = self.StatusChoices.URL_ERROR
+        self.save()
+
+    def mark_as_xml_error(self):
+        """Mark the source as having an XML error"""
+        self.status = self.StatusChoices.XML_ERROR
+        self.save()
+
     def mark_for_reprocess(self):
         """Marca para reprocessamento"""
         self.status = self.StatusChoices.REPROCESS
         self.save()
@@ -1957,83 +1989,107 @@ def get_queryset_to_complete_data(
 
     @property
     def is_completed(self):
         if not self.pid_provider_xml:
+            logging.info(f"Not completed: ArticleSource {self.url} has no pid_provider_xml")
             return False
-        if not self.pid_provider_xml.xml_with_pre:
-            return False
+        try:
+            if not self.pid_provider_xml.xml_with_pre:
+                logging.info(f"Not completed: ArticleSource {self.url} has pid_provider_xml but no xml_with_pre")
+                return False
+        except Exception:
+            pass
         if not self.am_article:
+            logging.info(f"Not completed: ArticleSource {self.url} has no am_article")
            return False
         if not self.file:
+            logging.info(f"Not completed: ArticleSource {self.url} has no file")
             return False
         if not self.file.path or not os.path.isfile(self.file.path):
+            logging.info(f"Not completed: ArticleSource {self.url} has an invalid file path or the file does not exist")
             return False
         if self.status != ArticleSource.StatusChoices.COMPLETED:
             self.status = ArticleSource.StatusChoices.COMPLETED
             self.save()
+        logging.info(f"Completed: ArticleSource {self.url} is completed")
         return True
 
-    def complete_data(self, user, force_update=False, auto_solve_pid_conflict=False):
+    def add_pid_provider(self, user, force_update=False, auto_solve_pid_conflict=False):
         """
-        Processa um arquivo XML de artigo científico, criando ou atualizando os dados necessários.
-
-        Este método gerencia todo o fluxo de processamento de um XML de artigo, incluindo:
-        - Download/criação do arquivo XML se necessário
-        - Geração de PID (Persistent Identifier) através do PidProvider
-
-        Args:
-            user: Usuário responsável pelo processamento
-            force_update (bool): Se True, força a atualização mesmo se os dados já existem
-            auto_solve_pid_conflict (bool): Se True, resolve automaticamente conflitos de PID
-
-        Raises:
-            ValueError: Se a URL não estiver definida
-
-        Note:
-            O método atualiza os seguintes atributos do objeto:
-            - status: Estado do processamento (PENDING, COMPLETED, ERROR)
-            - file: Arquivo XML baixado/criado
-            - pid_provider_xml: Objeto PidProviderXML associado
-            - detail: Lista com detalhes do processamento
+        Runs the XML retrieval and PID registration pipeline for this
+        ArticleSource. Steps already done are not redone unless
+        ``force_update=True``.
+
+        Steps:
+        1. request_xml — downloads the XML from the URL and saves it to ``self.file``
+           • Skipped if ``self.file`` already exists on disk AND ``force_update`` is False
+        2. request_pid — registers the XML with the PidProvider and sets ``self.pid_provider_xml``
+           • Skipped if ``self.pid_provider_xml`` is already set AND ``force_update`` is False
+
+        If a previous run failed partway (e.g. the file exists but
+        pid_provider_xml is not set), only the missing step is executed.
         """
-
         try:
-            # Lista para armazenar detalhes do processamento
             detail = []
-            if not force_update:
-                if self.is_completed:
-                    return
-
-            # Define status inicial como pendente
             self.status = ArticleSource.StatusChoices.PENDING
 
-            if not self.file.path or not os.path.isfile(self.file.path):
-                self.request_xml(detail, force_update)
-
-            pid_v3 = self.get_or_create_pid_v3(
-                user, detail, force_update, auto_solve_pid_conflict
+            # --- Step 1: request_xml ---
+            has_valid_file = (
+                self.file
+                and self.file.name
+                and os.path.isfile(self.file.path)
             )
-            if not pid_v3:
-                raise ValueError("Failed to obtain or create PID v3")
+
+            if force_update or not has_valid_file:
+                logging.info(f"Requesting XML for {self.url}")
+                self.request_xml(detail)
+                logging.info(f"XML requested successfully for {self.url}")
+            else:
+                logging.info(
+                    f"Skipping request_xml: file already exists for {self.url}"
+                )
+                detail.append("request_xml skipped (file already exists)")
+
+            # --- Step 2: request_pid ---
+            has_pid_provider = self.pid_provider_xml is not None
+
+            if force_update or not has_pid_provider:
+                logging.info(f"Requesting PID for {self.url}")
+                self.request_pid(
+                    user, detail, force_update, auto_solve_pid_conflict
+                )
+                logging.info(
+                    f"PID requested successfully for {self.pid_provider_xml}"
+                )
+            else:
+                logging.info(
+                    f"Skipping request_pid: pid_provider_xml already set "
+                    f"for {self.url}"
+                )
+                detail.append("request_pid skipped (pid_provider_xml already set)")
+
             self.detail = detail
-            self.mark_as_completed()  # Marca o processamento como concluído
+            self.mark_as_completed()
+            logging.info(f"ArticleSource {self.status}")
+        except XMLException as e:
+            exc_type, exc_value, exc_traceback = sys.exc_info()
+            detail.append(str({"error_type": str(type(e)), "error_message": str(e)}))
+            self.detail = detail
+            self.mark_as_xml_error()
+            logging.info(f"ArticleSource {self.url} marked as XML error")
+        except RequestXMLException as e:
+            exc_type, exc_value, exc_traceback = sys.exc_info()
+            detail.append(str({"error_type": str(type(e)), "error_message": str(e)}))
+            self.detail = detail
+            self.mark_as_url_error()
+            logging.info(f"ArticleSource {self.url} marked as URL error")
         except Exception as e:
-            # Registra a exceção no log
             logging.exception(e)
-
-            # Obtém informações detalhadas da exceção
             exc_type, exc_value, exc_traceback = sys.exc_info()
-
-            # Adiciona informações do erro aos detalhes
             detail.append(str({"error_type": str(type(e)), "error_message": str(e)}))
             self.detail = detail
-
-            # Marca o processamento como erro
             self.mark_as_error()
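+    # A minimal driving sketch (hypothetical URL; assumes an existing user).
+    # create_or_update() already calls add_pid_provider() internally, and a
+    # later call only re-runs whichever step is still missing:
+    #
+    #     src = ArticleSource.create_or_update(user, url="https://example.org/article.xml")
+    #     src.add_pid_provider(user, force_update=False)
+    #     # src.status is now completed, error, url_error or xml_error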
-    def get_or_create_pid_v3(self, user, detail, force_update, auto_solve_pid_conflict):
-        if self.pid_provider_xml and self.pid_provider_xml.xml_with_pre:
-            if not force_update:
-                return self.pid_provider_xml.v3
+    def request_pid(self, user, detail, force_update, auto_solve_pid_conflict):
         try:
             detail.append("create pid_provider_xml")
@@ -2055,12 +2111,12 @@ def request_pid(self, user, detail, force_update, auto_solve_pid_conflict):
             # Obtém a primeira resposta (assumindo apenas uma)
             response = list(responses)[0]
             v3 = response.get("v3")
-
             if v3:
                 # Associa o PidProviderXML ao ArticleSource
-                self.pid_provider_xml = PidProviderXML.objects.get(v3=v3)
+                self.pid_provider_xml = PidProviderXML.get_by_pid_v3(v3)
+                if not self.pid_provider_xml:
+                    raise UnableToRegisterPIDError("Failed to obtain or create PID v3")
                 detail.append("set pid_provider_xml")
-                return v3
             else:
                 # Registra erro se não conseguiu obter v3
                 detail.append(str(response))
@@ -2071,14 +2127,13 @@ def request_pid(self, user, detail, force_update, auto_solve_pid_conflict):
                 exception=e,
                 exc_traceback=exc_traceback,
                 detail=dict(
-                    function="article.models.ArticleSource.get_or_create_pid_v3",
+                    function="article.models.ArticleSource.request_pid",
                     article_source_id=self.id,
-                    sps_pkg_name=self.sps_pkg_name,
                     url=self.url,
                 ),
             )
             detail.append(str(unexpected_event.data))
-            raise e
+            raise UnableToRegisterPIDError(str(e))
 
 
 class ArticleAvailability(CommonControlField):
@@ -2341,6 +2396,25 @@ class Meta:
             models.Index(fields=["article", "organization"]),
         ]
 
+    def autocomplete_label(self):
+        if self.organization:
+            return f"{self.article} - {self.organization}"
+        if self.raw_institution_name:
+            return f"{self.article} - {self.raw_institution_name}"
+        if self.raw_text:
+            return f"{self.article} - {self.raw_text}"
+        return f"{self.article} - Affiliation"
+
+    @staticmethod
+    def autocomplete_custom_queryset_filter(search_term):
+        return ArticleAffiliation.objects.filter(
+            Q(raw_text__icontains=search_term)
+            | Q(raw_institution_name__icontains=search_term)
+            | Q(raw_country_name__icontains=search_term)
+            | Q(raw_state_name__icontains=search_term)
+            | Q(raw_city_name__icontains=search_term)
+        )
+
     def __str__(self):
         if self.organization:
             return f"{self.article} - {self.organization}"
diff --git a/article/search_indexes.py b/article/search_indexes.py
index c03a5cc99..c14cb2a8d 100644
--- a/article/search_indexes.py
+++ b/article/search_indexes.py
@@ -45,6 +45,25 @@ class ArticleIndex(indexes.SearchIndex, indexes.Indexable):
     ta_cluster = indexes.CharField(null=True)
     year_cluster = indexes.CharField(null=True)
 
+    # ISSNs — direct lookup by ISSN
+    issn = indexes.MultiValueField(null=True)
+
+    # Article license
+    license = indexes.CharField(null=True)
+
+    # Affiliations (countries and institutions for geographic/institutional filtering)
+    aff_country = indexes.MultiValueField(null=True)
+    aff_institution = indexes.MultiValueField(null=True)
+
+    # Journal OA status
+    open_access = indexes.CharField(null=True)
+
+    # Journal indexing databases
+    indexed_at = indexes.MultiValueField(null=True)
+
+    # Crossmark active
+    crossmark_active = indexes.BooleanField(null=True)
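+    # A query sketch against the new fields (hypothetical facet values; the
+    # actual values depend on what the prepare_* methods below return):
+    #
+    #     from haystack.query import SearchQuerySet
+    #     SearchQuerySet().models(Article).filter(
+    #         issn="0103-6440",
+    #         crossmark_active=True,
+    #     )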
+
     def prepare(self, obj):
         """ " Here add the title to with dynamic fields.
@@ -77,9 +96,9 @@ def prepare(self, obj):
         for collection in collections:
             for lang in obj.languages.all():
                 data["fulltext_pdf_%s" % (lang.code2)] = (
-                    "http://%s/scielo.php?script=sci_pdf&pid=%s&tlng=%s"
+                    "%s/scielo.php?script=sci_pdf&pid=%s&tlng=%s"
                     % (
-                        collection.domain,
+                        collection.base_url,
                         obj.pid_v2,
                         lang.code2,
                     )
@@ -91,9 +110,9 @@ def prepare(self, obj):
         for collection in collections:
             for lang in obj.languages.all():
                 data["fulltext_html_%s" % (lang.code2)] = (
-                    "http://%s/scielo.php?script=sci_arttext&pid=%s&tlng=%s"
+                    "%s/scielo.php?script=sci_arttext&pid=%s&tlng=%s"
                     % (
-                        collection.domain,
+                        collection.base_url,
                         obj.pid_v2,
                         lang.code2,
                     )
 
         return data
 
@@ -101,6 +120,34 @@ def prepare(self, obj):
+    def prepare_issn(self, obj):
+        if obj.journal and obj.journal.official:
+            issns = []
+            if obj.journal.official.issn_electronic:
+                issns.append(obj.journal.official.issn_electronic)
+            if obj.journal.official.issn_print:
+                issns.append(obj.journal.official.issn_print)
+            if obj.journal.official.issnl:
+                issns.append(obj.journal.official.issnl)
+            return issns or None
+
+    def prepare_license(self, obj):
+        if obj.license and obj.license.license_type:
+            return obj.license.license_type
+
+    def prepare_open_access(self, obj):
+        if obj.journal:
+            return obj.journal.open_access
+
+    def prepare_indexed_at(self, obj):
+        if obj.journal:
+            return [i.acronym for i in obj.journal.indexed_at.all() if i.acronym]
+
+    def prepare_crossmark_active(self, obj):
+        if obj.journal:
+            return obj.journal.crossmark_doi_is_active
+        return False
+
     def prepare_ids(self, obj):
         """
         This field have all ids for the article.
@@ -130,8 +177,8 @@ def prepare_ur(self, obj):
         if obj.journal:
             for collection in collections:
                 urls.append(
-                    "http://%s/scielo.php?script=sci_arttext&pid=%s"
-                    % (collection.domain, obj.pid_v2)
+                    "%s/scielo.php?script=sci_arttext&pid=%s"
+                    % (collection.base_url, obj.pid_v2)
                 )
         return urls
 
@@ -305,6 +352,39 @@ class ArticleOAIIndex(indexes.SearchIndex, indexes.Indexable):
     compile = indexes.CharField(
         null=True, index_fieldname="item.compile", use_template=True
     )
+    # ISSNs — item.collections already carries the SciELO ISSN, but the official ISSN is missing
+    issn = indexes.MultiValueField(null=True, index_fieldname="metadata.dc.relation")
+
+    # Publisher — always empty in the real data
+    publisher = indexes.MultiValueField(null=True, index_fieldname="metadata.dc.publisher")
+
+    # ORCID as a separate searchable field
+    orcid = indexes.MultiValueField(null=True, index_fieldname="metadata.dc.contributor.orcid")
+
+    # Format — present in item.compile but not as a direct Solr field
+    format_ = indexes.CharField(null=True, index_fieldname="metadata.dc.format")
+
+    def prepare_publisher(self, obj):
+        if obj.journal:
+            names = obj.journal.publisher_names
+            return names if names else None
+
+    def prepare_issn(self, obj):
+        if obj.journal and obj.journal.official:
+            issns = []
+            for attr in ("issn_electronic", "issn_print", "issnl"):
+                v = getattr(obj.journal.official, attr, None)
+                if v:
+                    issns.append(v)
+            return issns or None
+
+    def prepare_orcid(self, obj):
+        if obj.contrib_persons.exists():
+            return [
+                p.orcid
+                for p in obj.contrib_persons.all()
+                if p.orcid
+            ] or None
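+    # Sketch of the resulting Solr/OAI document fragment for one article
+    # (hypothetical values; the field names come from the index_fieldname
+    # arguments above):
+    #
+    #     "metadata.dc.relation": ["0103-6440", "1806-4760"],
+    #     "metadata.dc.publisher": ["Universidade de São Paulo"],
+    #     "metadata.dc.contributor.orcid": ["0000-0002-1825-0097"],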
 
     def prepare_id(self, obj):
         """This field is the identifier of the record
@@ -396,15 +476,8 @@ def prepare_dates(self, obj):
         """This the publication date, that is format by YYYY-MM-DD
         In the model this field is seperated into pub_date_day, pub_date_month and pub_date_year
         """
-        return [
-            "-".join(
-                [
-                    obj.pub_date_year or "",
-                    obj.pub_date_month or "",
-                    obj.pub_date_day or "",
-                ]
-            ),
-        ]
+        if obj.pub_date:
+            return set([obj.pub_date])
 
     def prepare_la(self, obj):
         """The language of the article."""
@@ -426,9 +499,9 @@ def prepare_identifier(self, obj):
         for collection in collections:
             for lang in obj.languages.all():
                 idents.add(
-                    "http://%s/scielo.php?script=sci_arttext&pid=%s&tlng=%s"
+                    "%s/scielo.php?script=sci_arttext&pid=%s&tlng=%s"
                     % (
-                        collection.domain,
+                        collection.base_url,
                         obj.pid_v2,
                         lang.code2,
                     )
diff --git a/article/sources/xmlsps.py b/article/sources/xmlsps.py
index 405f5278d..2bcd41fd5 100755
--- a/article/sources/xmlsps.py
+++ b/article/sources/xmlsps.py
@@ -111,12 +111,15 @@ def load_article(user, xml=None, file_path=None, v3=None, pp_xml=None):
             "load_article() requires params: pp_xml or v3 or file_path or xml"
         )
 
+    if not pp_xml and v3:
+        try:
+            pp_xml = PidProviderXML.get_by_pid_v3(pid_v3=v3)
+        except PidProviderXML.DoesNotExist:
+            pp_xml = None
+
     try:
         if pp_xml:
             xml_with_pre = pp_xml.xml_with_pre
-        elif v3:
-            pp_xml = PidProviderXML.objects.get(v3=v3)
-            xml_with_pre = pp_xml.xml_with_pre
         elif file_path:
             for xml_with_pre in XMLWithPre.create(file_path):
                 xmltree = xml_with_pre.xmltree
@@ -163,15 +166,16 @@ def load_article(user, xml=None, file_path=None, v3=None, pp_xml=None):
     event = None
     xmltree = xml_with_pre.xmltree
 
-    logging.info(f"Article {pid_v3} {xml_with_pre.sps_pkg_name}")
+    sps_pkg_name = xml_with_pre.sps_pkg_name
+    logging.info(f"Article {pid_v3} {sps_pkg_name}")
 
     # CRIAÇÃO/OBTENÇÃO DO OBJETO PRINCIPAL
     article = Article.create_or_update(
         user=user,
         pid_v3=pid_v3,
-        sps_pkg_name=xml_with_pre.sps_pkg_name,
+        sps_pkg_name=sps_pkg_name,
     )
-    logging.info(f"...Article {pid_v3} {xml_with_pre.sps_pkg_name}")
+    logging.info(f"...Article {pid_v3} {sps_pkg_name}")
 
     article.events.all().delete()
     event = article.add_event(user, _("load article"))
@@ -180,7 +184,7 @@ def load_article(user, xml=None, file_path=None, v3=None, pp_xml=None):
     article.valid = False
     article.data_status = choices.DATA_STATUS_PENDING
     article.pp_xml = pp_xml
-    article.sps_pkg_name = xml_with_pre.sps_pkg_name
+    article.sps_pkg_name = sps_pkg_name
 
     # CAMPOS SIMPLES EXTRAÍDOS DO XML
     set_pids(xmltree=xmltree, article=article, errors=errors)
@@ -213,7 +217,7 @@ def load_article(user, xml=None, file_path=None, v3=None, pp_xml=None):
     # Salvar uma vez após definir todos os campos simples
     logging.info(
-        f"Saving article {article.pid_v3} {xml_with_pre.sps_pkg_name} {xml_with_pre.main_doi}"
+        f"Saving article {article.pid_v3} {sps_pkg_name} {xml_with_pre.main_doi}"
     )
 
     add_data_availability_status(
diff --git a/article/tasks.py b/article/tasks.py
index c074a6506..d6aac89e5 100644
--- a/article/tasks.py
+++ b/article/tasks.py
@@ -1,29 +1,20 @@
 import logging
 import sys
-import traceback
-from datetime import datetime, timedelta
-from celery import group, shared_task
 from django.contrib.auth import get_user_model
-from django.db import transaction
-from django.db.models import Count, F, Prefetch, Q, Subquery
 from django.utils.translation import gettext_lazy as _
 from article import controller
 from article.models import Article, ArticleFormat, ArticleSource, AMArticle
 from article.sources.preprint import harvest_preprints
 from article.sources.xmlsps import load_article
-from article import choices
 from collection.models import Collection
 from config import celery_app
 from core.models import License
 from core.utils.extracts_normalized_email import extracts_normalized_email
-from core.utils.utils import _get_user, fetch_data
-from core.utils.harvesters import AMHarvester, OPACHarvester
-from journal.models import SciELOJournal, Journal
-from pid_provider.choices import PPXML_STATUS_DONE, PPXML_STATUS_TODO, PPXML_STATUS_INVALID
+from core.utils.utils import _get_user
+from journal.models import Journal
 from pid_provider.models import PidProviderXML
-from pid_provider.provider import PidProvider
 from researcher.models import ResearcherIdentifier
 from tracker.models import UnexpectedEvent
@@ -32,13 +23,70 @@
 @celery_app.task()
 def load_funding_data(user, file_path):
+    """
+    Loads funding data from a CSV (or similar) file.
+
+    Processes a funding data file and loads the information into the
+    database, associating it with the corresponding articles.
+
+    Args:
+        user (int): ID of the user running the task
+        file_path (str): Absolute path to the file containing funding data
+
+    Returns:
+        None
+
+    Side Effects:
+        - Reads the funding file from the filesystem
+        - Creates/updates funding records in the database
+        - Logs processing information and errors
+
+    Notes:
+        - Uses controller.read_file for the actual processing
+        - The file format must follow the layout expected by the system
+    """
     user = User.objects.get(pk=user)
     controller.read_file(user, file_path)
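+# A hypothetical invocation (the expected file layout is whatever
+# controller.read_file parses; the path below is illustrative only):
+#
+#     load_funding_data.delay(user=1, file_path="/data/funding_2024.csv")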
 
 @celery_app.task(bind=True, name=_("load_preprints"))
 def load_preprint(self, user_id, oai_pmh_preprint_uri):
+    """
+    Harvests and loads preprints from a specific OAI-PMH endpoint.
+
+    Connects to an OAI-PMH server to harvest preprint metadata and load it
+    into the system for later processing.
+
+    Args:
+        self: Celery task instance
+        user_id (int): ID of the user running the task (required)
+        oai_pmh_preprint_uri (str): URI of the OAI-PMH endpoint to harvest (required)
+
+    Returns:
+        None
+
+    Side Effects:
+        - Connects to the given OAI-PMH endpoint
+        - Harvests the available preprint metadata
+        - Creates/updates preprint records in the database
+        - Logs processing information and any errors
+
+    Todo:
+        - Implement a filter so that not all records are harvested every time
+        - Add support for incremental harvesting by date
+
+    Examples:
+        # Harvest preprints from a specific repository
+        load_preprint.delay(
+            user_id=1,
+            oai_pmh_preprint_uri="http://repo.example.com/oai/request"
+        )
+
+    Notes:
+        - Uses harvest_preprints for the actual processing
+        - A full harvest can be slow on large repositories
+    """
     user = User.objects.get(pk=user_id)
     ## fazer filtro para não coletar tudo sempre
     harvest_preprints(oai_pmh_preprint_uri, user)
@@ -48,6 +96,34 @@
 def task_convert_xml_to_other_formats_for_articles(
     self, user_id=None, username=None, from_date=None, force_update=False
 ):
+    """
+    Triggers XML conversion to other formats for every article with an SPS package.
+
+    Iterates over all articles that have an sps_pkg_name and dispatches an
+    individual conversion task for each one.
+
+    Args:
+        self: Celery task instance
+        user_id (int, optional): ID of the user running the task
+        username (str, optional): Name of the user running the task
+        from_date (str, optional): Start date for filtering articles (not implemented)
+        force_update (bool, optional): Forces reprocessing even if already converted
+
+    Returns:
+        None
+
+    Side Effects:
+        - Dispatches multiple convert_xml_to_other_formats subtasks
+        - Records an UnexpectedEvent on error
+        - Processes every article with an sps_pkg_name
+
+    Examples:
+        # Convert all articles
+        task_convert_xml_to_other_formats_for_articles.delay(
+            user_id=1,
+            force_update=True
+        )
+    """
     try:
         user = _get_user(self.request, username, user_id)
@@ -87,6 +163,31 @@
 def convert_xml_to_other_formats(
     self, user_id=None, username=None, item_id=None, force_update=None
 ):
+    """
+    Converts the XML of a specific article to other formats (HTML, PDF, etc.).
+
+    Checks whether the article already has generated formats and, if needed,
+    generates them from the stored SPS XML.
+
+    Args:
+        self: Celery task instance
+        user_id (int, optional): ID of the user running the task
+        username (str, optional): Name of the user running the task
+        item_id (int): ID of the article to process (required)
+        force_update (bool, optional): Forces regeneration even if formats exist
+
+    Returns:
+        None
+
+    Side Effects:
+        - Creates/updates ArticleFormat records
+        - Generates HTML, PDF and other format files
+        - Logs processing information
+
+    Notes:
+        - Skips processing if an ArticleFormat already exists and force_update=False
+        - Uses ArticleFormat.generate_formats for the conversion
+    """
     user = _get_user(self.request, username, user_id)
 
     try:
@@ -110,118 +211,34 @@
 @celery_app.task(bind=True)
-def task_select_articles_to_complete_data(
-    self,
-    username=None,
-    user_id=None,
-    collection_acron_list=None,
-    journal_acron_list=None,
-    data_status_list=None,
-    from_pub_year=None,
-    until_pub_year=None,
-    from_updated_date=None,
-    until_updated_date=None,
-    articlemeta_export_enable=False,
+def transfer_license_statements_fk_to_article_license(
+    self, user_id=None, username=None
 ):
     """
-    Task para carregar artigos de uma lista selecionada de periódicos.
-    Dispara subtasks para cada periódico encontrado.
-    """
-    try:
-        user = _get_user(self.request, username=username, user_id=user_id)
-
-        # Construir filtros para os artigos
-        article_filters = {}
-
-        # Obter IDs dos periódicos baseado nos filtros
-        journal_id_list = Journal.get_ids(
-            collection_acron_list=collection_acron_list,
-            journal_acron_list=journal_acron_list,
-        )
-
-        if journal_id_list:
-            article_filters["journal__in"] = journal_id_list
-
-        # Aplicar filtro de status se fornecido
-        if not data_status_list:
-            data_status_list = [
-                choices.DATA_STATUS_PENDING,
-                choices.DATA_STATUS_UNDEF,
-                choices.DATA_STATUS_INVALID,
-            ]
-        article_filters["data_status__in"] = data_status_list
-
-        # Adicionar filtros de data se fornecidos
-        if from_pub_year:
-            article_filters["pub_year__gte"] = from_pub_year
-        if until_pub_year:
-            article_filters["pub_year__lte"] = until_pub_year
-        if from_updated_date:
-            article_filters["updated__gte"] = from_updated_date
-        if until_updated_date:
-            article_filters["updated__lte"] = until_updated_date
-
-        # Processar artigos
-        articles_processed = 0
-        articles_skipped = 0
-
-        for article in Article.objects.filter(**article_filters).iterator():
-            if not article.pp_xml_id:
-                try:
-                    pp_xml = PidProviderXML.objects.get(v3=article.pid_v3)
-                    article.pp_xml = pp_xml
-                    article.save(update_fields=['pp_xml'])
-                except PidProviderXML.DoesNotExist:
-                    articles_skipped += 1
-                    continue
-
-            task_load_article_from_pp_xml.delay(
-                pp_xml_id=article.pp_xml_id,
-                pid_v3=article.pid_v3,
-                user_id=user_id or user.id,
-                username=username or user.username,
-                articlemeta_export_enable=articlemeta_export_enable,
-            )
-            articles_processed += 1
-
-        return {
-            "status": "success",
-            "message": "Complete data to articles",
-            "articles_processed": articles_processed,
-            "articles_skipped": articles_skipped,
-            "filters": {
-                "collection_acron_list": collection_acron_list,
-                "journal_acron_list": journal_acron_list,
-                "from_pub_year": from_pub_year,
-                "until_pub_year": until_pub_year,
-                "from_updated_date": from_updated_date,
-                "until_updated_date": until_updated_date,
-                "data_status_list": data_status_list,
-            },
-        }
-    except Exception as e:
-        exc_type, exc_value, exc_traceback = sys.exc_info()
-        UnexpectedEvent.create(
-            exception=e,
-            exc_traceback=exc_traceback,
-            detail={
-                "task": "task_select_articles_to_complete_data",
"task_select_articles_to_complete_data", - "collection_acron_list": collection_acron_list, - "journal_acron_list": journal_acron_list, - "data_status_list": data_status_list, - "from_pub_year": from_pub_year, - "until_pub_year": until_pub_year, - "from_updated_date": from_updated_date, - "until_updated_date": until_updated_date, - }, - ) - raise + Migra informações de licença de license_statements para o campo license. + Processa artigos que não possuem license mas têm license_statements, + transferindo as informações para o campo direto license. -@celery_app.task(bind=True) -def transfer_license_statements_fk_to_article_license( - self, user_id=None, username=None -): + Args: + self: Instância da tarefa Celery + user_id (int, optional): ID do usuário executando a tarefa + username (str, optional): Nome do usuário executando a tarefa + + Returns: + None + + Side Effects: + - Atualiza campo license em artigos + - Cria registros License se necessário + - Executa bulk_update para otimizar performance + - Registra logs de processamento + + Notes: + - Processa apenas artigos com license=None + - Usa o primeiro license_statement como referência + - Cria License automaticamente se não existir + """ user = _get_user(self.request, username, user_id) articles_to_update = [] for instance in Article.objects.filter(license__isnull=True): @@ -247,6 +264,20 @@ def transfer_license_statements_fk_to_article_license( def get_researcher_identifier_unnormalized(): + """ + Retorna identificadores de e-mail que não seguem formato padrão RFC 5322. + + Filtra objetos ResearcherIdentifier que possuem source_name="EMAIL" + mas cujo campo identifier não corresponde ao padrão de e-mail válido. + + Returns: + QuerySet: Queryset de ResearcherIdentifier com e-mails mal formatados + + Notes: + - Usa regex para identificar e-mails fora do padrão + - Utilizada pela tarefa normalize_stored_email para identificar registros a corrigir + - Regex verifica formato básico: usuario@dominio.extensao + """ return ResearcherIdentifier.objects.filter(source_name="EMAIL").exclude( identifier__regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" ) @@ -256,6 +287,38 @@ def get_researcher_identifier_unnormalized(): def normalize_stored_email( self, ): + """ + Normaliza e corrige endereços de e-mail mal formatados no banco de dados. + + Busca identificadores de pesquisadores do tipo EMAIL que não seguem + o padrão RFC 5322 e aplica normalização para corrigir formatos inválidos. + + Args: + self: Instância da tarefa Celery + + Returns: + None + + Side Effects: + - Identifica e-mails com formato inválido usando regex + - Aplica normalização através de extracts_normalized_email + - Executa bulk_update para otimizar performance em lotes + - Registra logs de processamento + + Examples: + # Executar normalização de e-mails + normalize_stored_email.delay() + + Notes: + - Processa apenas ResearcherIdentifier com source_name="EMAIL" + - Usa regex para identificar e-mails que não seguem formato padrão + - Operação é idempotente - pode ser executada múltiplas vezes + - Performance otimizada com bulk_update para grandes volumes + + See Also: + - get_researcher_identifier_unnormalized(): Função auxiliar para filtros + - extracts_normalized_email(): Função de normalização de e-mails + """ updated_list = [] re_identifiers = get_researcher_identifier_unnormalized() @@ -284,20 +347,51 @@ def task_export_articles_to_articlemeta( username=None, ): """ - Export articles to ArticleMeta Database with flexible filtering. 
@@ -256,6 +287,38 @@
 def normalize_stored_email(
     self,
 ):
+    """
+    Normalizes and fixes badly formatted e-mail addresses in the database.
+
+    Looks up researcher identifiers of type EMAIL that do not follow the
+    RFC 5322 pattern and applies normalization to fix invalid formats.
+
+    Args:
+        self: Celery task instance
+
+    Returns:
+        None
+
+    Side Effects:
+        - Identifies invalid e-mails using a regex
+        - Applies normalization via extracts_normalized_email
+        - Uses bulk_update to optimize performance in batches
+        - Logs processing information
+
+    Examples:
+        # Run the e-mail normalization
+        normalize_stored_email.delay()
+
+    Notes:
+        - Processes only ResearcherIdentifier records with source_name="EMAIL"
+        - Uses a regex to identify e-mails outside the standard format
+        - The operation is idempotent — it can be run multiple times
+        - Performance optimized with bulk_update for large volumes
+
+    See Also:
+        - get_researcher_identifier_unnormalized(): Helper used for filtering
+        - extracts_normalized_email(): E-mail normalization function
+    """
     updated_list = []
     re_identifiers = get_researcher_identifier_unnormalized()
@@ -284,20 +347,51 @@ def task_export_articles_to_articlemeta(
     username=None,
 ):
     """
-    Export articles to ArticleMeta Database with flexible filtering.
-
+    Exports articles in bulk to the ArticleMeta database with flexible filtering.
+
+    Processes and exports multiple articles to the ArticleMeta system based
+    on filtering criteria by collection, journal, year or date.
+
     Args:
-        collection_acron_list: List of collection acronyms
-        journal_acron_list: List of journal acronyms
-        year_of_publication: Specific year of publication
-        from_pub_year: Start publication year
-        until_pub_year: End publication year
-        from_date: Start date for filtering
-        until_date: End date for filtering
-        days_to_go_back: Number of days to go back
-        force_update: Force update existing records
-        user_id: User ID for authentication
-        username: Username for authentication
+        self: Celery task instance
+        collection_acron_list (list, optional): List of collection acronyms
+        journal_acron_list (list, optional): List of journal acronyms
+        year_of_publication (int, optional): Specific year of publication
+        from_pub_year (int, optional): Start year for the publication filter
+        until_pub_year (int, optional): End year for the publication filter
+        from_date (str, optional): Start date for filtering (ISO format)
+        until_date (str, optional): End date for filtering (ISO format)
+        days_to_go_back (int, optional): Number of days to go back from today
+        force_update (bool, optional): Forces reprocessing even if already exported
+        user_id (int, optional): ID of the user running the task
+        username (str, optional): Name of the user running the task
+
+    Returns:
+        dict: Result of the operation with processing statistics
+
+    Side Effects:
+        - Exports multiple articles to ArticleMeta
+        - Updates the export status of the articles
+        - Logs processing information
+        - Records an UnexpectedEvent on error
+
+    Examples:
+        # Export by collection and period
+        task_export_articles_to_articlemeta.delay(
+            collection_acron_list=["scl", "arg"],
+            from_pub_year=2023,
+            until_pub_year=2024
+        )
+
+        # Export articles from the last 7 days
+        task_export_articles_to_articlemeta.delay(
+            days_to_go_back=7,
+            force_update=True
+        )
+
+    Notes:
+        - Uses controller.bulk_export_articles_to_articlemeta internally
+        - Can process large volumes of data
     """
     try:
         user = _get_user(self.request, username=username, user_id=user_id)
@@ -353,16 +447,42 @@ def task_export_article_to_articlemeta(
     username=None,
 ):
     """
-    Export a single article to ArticleMeta Database.
+    Exports a single article to the ArticleMeta database.
+
+    Processes and exports one article, identified by its PID v3, to the
+    ArticleMeta system, with control over forced updates.
 
     Args:
-        pid_v3: Article PID v3
-        force_update: Force update existing records
-        user_id: User ID
-        username: Username
+        self: Celery task instance
+        pid_v3 (str, optional): PID v3 of the article to export (required)
+        collection_acron_list (list, optional): List of collection acronyms for filtering
+        force_update (bool): Forces re-export even if already exported
+        user_id (int, optional): ID of the user running the task
+        username (str, optional): Name of the user running the task
 
     Returns:
-        bool: True if export was successful, False otherwise.
+        bool: True if the export succeeded, False otherwise
+
+    Side Effects:
+        - Exports the specific article to ArticleMeta
+        - Updates the article's export status
+        - Logs processing information
+        - Records an UnexpectedEvent on error
+
+    Raises:
+        ValueError: If pid_v3 is not provided
+        Article.DoesNotExist: If no article with that PID is found
+
+    Examples:
+        # Export a specific article
+        task_export_article_to_articlemeta.delay(
+            pid_v3="S1234-56782024000100001",
+            force_update=True
+        )
+
+    Notes:
+        - Uses controller.export_article_to_articlemeta internally
+        - The article must exist in the local database before exporting
     """
     try:
         if not pid_v3:
@@ -393,185 +513,106 @@
 @celery_app.task(bind=True)
-def task_load_article_from_pp_xml(
+def task_fix_article_status(
     self,
-    pp_xml_id=None,
-    pid_v3=None,
-    user_id=None,
     username=None,
+    user_id=None,
     collection_acron_list=None,
-    articlemeta_export_enable=None,
-    force_update=None,
-    timeout=None,
-    is_activate=None,
-    version=None,
+    journal_acron_list=None,
+    journal_id=None,
+    mark_as_invalid=False,
+    mark_as_public=False,
+    mark_as_duplicated=False,
+    deduplicate=False,
 ):
     """
-    Carrega um artigo específico a partir de um PidProviderXML.
+    Marks articles with different statuses based on collection and journal filters.
 
-    Processa o XML armazenado no PidProviderXML, cria/atualiza o Article
-    e opcionalmente exporta para ArticleMeta.
+    Accepts filters by collection/journal lists or a direct journal_id.
+    Iterates over the matching journals and applies the marking operations.
 
     Args:
         self: Celery task instance
-        pp_xml_id (int): ID do PidProviderXML a processar (obrigatório)
-        user_id (int, optional): ID do usuário executando a tarefa
-        username (str, optional): Nome do usuário executando a tarefa
-        articlemeta_export_enable (bool, optional): Exporta para ArticleMeta após carregar
+        username (str, optional): Name of the user running the task
+        user_id (int, optional): ID of the user running the task
+        collection_acron_list (list, optional): List of collection acronyms
+        journal_acron_list (list, optional): List of journal acronyms
+        journal_id (int, optional): Direct ID of a specific journal
+        mark_as_invalid (bool): If True, marks articles as invalid
+        mark_as_public (bool): If True, marks articles as public
+        mark_as_duplicated (bool): If True, marks articles as duplicated
+        deduplicate (bool): If True, marks articles as deduplicated
 
     Returns:
-        None
+        dict: Summary of the operation with counters
 
     Side Effects:
-        - Cria/atualiza Article no banco
-        - Atualiza status do PidProviderXML para DONE
-        - Verifica disponibilidade do artigo
-        - Exporta para ArticleMeta se solicitado
+        - Changes article statuses in the database
         - Records an UnexpectedEvent on error
 
-    Notes:
-        - O XML é lido diretamente do arquivo armazenado no PidProviderXML
-        - A verificação de disponibilidade valida URLs e assets do artigo
+    Examples:
+        task_fix_article_status.delay(
+            collection_acron_list=["scl"],
+            mark_as_invalid=True,
+            mark_as_public=True,
+        )
+
+        task_fix_article_status.delay(
+            journal_id=42,
+            deduplicate=True,
+        )
     """
     try:
-        user = _get_user(self.request, username, user_id)
+        user = _get_user(self.request, username=username, user_id=user_id)
 
-        pp_xml = None
-        # Busca o PidProviderXML com suas relações
-        if pp_xml_id:
-            pp_xml = PidProviderXML.objects.select_related("current_version").get(
-                id=pp_xml_id
-            )
+        operations = {
+            "invalid": mark_as_invalid,
+            "public": mark_as_public,
+            "duplicated": mark_as_duplicated,
"deduplicated": deduplicate, + } - # Carrega o artigo do arquivo XML - article = load_article( - user, - v3=pid_v3, - pp_xml=pp_xml, - ) - pp_xml.collections.set(article.collections) + if not any(operations.values()): + raise ValueError("At least one marking operation must be specified") - # Exporta para ArticleMeta se solicitado - if articlemeta_export_enable: - # Verifica disponibilidade (URLs, assets, etc) - article.check_availability(user) - controller.export_article_to_articlemeta( - user, - article, - collection_acron_list, - force_update, - version=version, - ) + # Determinar lista de journal_ids a processar + if journal_id: + journal_id_list = [journal_id] else: - if article.is_available(): - return - task_check_article_availability.delay( - article_id=article.id, - user_id=user.id, - username=user.username, - collection_acron_list=collection_acron_list, - timeout=timeout, - is_activate=is_activate, - force_update=force_update, - ) + journal_id_list = Journal.get_ids(collection_acron_list, journal_acron_list) - except Exception as exception: - exc_type, exc_value, exc_traceback = sys.exc_info() - UnexpectedEvent.create( - exception=exception, - exc_traceback=exc_traceback, - detail={ - "task": "article.tasks.task_load_article_from_pp_xml", - "pp_xml_id": pp_xml_id, - "articlemeta_export_enable": articlemeta_export_enable, - "force_update": force_update, - }, - ) + journals_processed = 0 + for jid in journal_id_list: + if Article.objects.filter(journal_id=jid).count() == 0: + continue -@celery_app.task(bind=True) -def task_select_articles_to_load_from_api( - self, - username=None, - user_id=None, - collection_acron_list=None, - from_date=None, - until_date=None, - limit=None, - timeout=None, - force_update=None, - auto_solve_pid_conflict=None, - opac_url=None, -): - """ - Tarefa orquestradora para carregar artigos de múltiplas coleções via API. - - Dispara tarefas paralelas para cada coleção, otimizando o processamento - em larga escala. Se nenhuma coleção for especificada, processa todas as - coleções conhecidas do SciELO. - - Args: - self: Instância da tarefa Celery - username (str, optional): Nome do usuário executando a tarefa - user_id (int, optional): ID do usuário executando a tarefa - collection_acron_list (list, optional): Lista de acrônimos das coleções. - Se None, usa lista padrão com todas as coleções SciELO. 
-            Ex: ["scl", "arg", "mex", "esp"]
-        from_date (str, optional): Data inicial para coleta (formato ISO)
-        until_date (str, optional): Data final para coleta (formato ISO)
-        limit (int, optional): Limite de artigos por coleção
-        timeout (int, optional): Timeout em segundos para requisições HTTP
-        force_update (bool, optional): Força atualização mesmo se já existe
-        auto_solve_pid_conflict (bool, optional): Resolve conflitos de PID automaticamente
-
-    Returns:
-        None
+            if mark_as_invalid:
+                Article.mark_items_as_invalid(journal_id=jid)
 
-    Side Effects:
-        - Garante que coleções estão carregadas no banco
-        - Dispara uma tarefa para cada coleção em collection_acron_list
-        - Registra UnexpectedEvent em caso de erro
+            if mark_as_public:
+                Article.mark_items_as_public(journal_id=jid)
 
-    Examples:
-        # Carregar artigos de coleções específicas
-        task_select_articles_to_load_from_api.delay(
-            collection_acron_list=["scl", "mex"],
-            from_date="2024-01-01",
-            until_date="2024-12-31"
-        )
+            if mark_as_duplicated or deduplicate:
+                Article.deduplicate_items(
+                    user,
+                    journal_id=jid,
+                    mark_as_duplicated=mark_as_duplicated,
+                    deduplicate=deduplicate,
+                )
 
-        # Carregar artigos de todas as coleções com limite
-        task_select_articles_to_load_from_api.delay(
-            limit=100,
-            force_update=True
-        )
-    """
-    try:
-        user = _get_user(self.request, username=username, user_id=user_id)
+            journals_processed += 1
 
-        # Define coleções padrão se não especificadas
-        # Garante que as coleções estão carregadas no banco
-        if Collection.objects.count() == 0:
-            Collection.load(user)
-
-        if not collection_acron_list:
-            collection_acron_list = Collection.get_acronyms()
-        # Dispara tarefa para cada coleção
-        for collection_acron in collection_acron_list:
-            task_select_articles_to_load_from_collection_endpoint.apply_async(
-                kwargs={
-                    "username": username,
-                    "user_id": user_id,
-                    "collection_acron": collection_acron,
-                    "from_date": from_date,
-                    "until_date": until_date,
-                    "limit": limit,
-                    "timeout": timeout,
-                    "force_update": force_update,
-                    "auto_solve_pid_conflict": auto_solve_pid_conflict,
-                    "opac_url": opac_url,
-                }
-            )
+        return {
+            "status": "success",
+            "journals_processed": journals_processed,
+            "operations": {k: v for k, v in operations.items() if v},
+            "filters": {
+                "collections": collection_acron_list,
+                "journals": journal_acron_list,
+                "journal_id": journal_id,
+            },
+        }
 
     except Exception as e:
         exc_type, exc_value, exc_traceback = sys.exc_info()
@@ -579,394 +620,194 @@
             exception=e,
             exc_traceback=exc_traceback,
             detail={
-                "task": "task_select_articles_to_load_from_api",
+                "task": "task_fix_article_status",
                 "collection_acron_list": collection_acron_list,
-                "from_date": from_date,
-                "until_date": until_date,
-                "limit": limit,
-                "timeout": timeout,
+                "journal_acron_list": journal_acron_list,
+                "journal_id": journal_id,
+                "operations": {
+                    "mark_as_invalid": mark_as_invalid,
+                    "mark_as_public": mark_as_public,
+                    "mark_as_duplicated": mark_as_duplicated,
+                    "deduplicate": deduplicate,
+                },
             },
         )
+        raise
 
 @celery_app.task(bind=True)
-def task_select_articles_to_load_from_collection_endpoint(
+def task_check_article_availability(
     self,
-    username=None,
     user_id=None,
-    collection_acron=None,
-    from_date=None,
-    until_date=None,
-    limit=None,
+    username=None,
+    article_id=None,
+    collection_acron_list=None,
     timeout=None,
-    force_update=None,
-    auto_solve_pid_conflict=None,
-    opac_url=None,
+    is_activate=None,
+    force_update=False,
 ):
     """
-    Coleta artigos de uma coleção específica via endpoint OPAC ou ArticleMeta.
-    Utiliza harvesters especializados para cada tipo de endpoint:
-    - OPACHarvester: Para coleção Brasil (scl)
-    - AMHarvester: Para demais coleções via ArticleMeta
+    Checks and updates the availability status of a specific article.
+
+    Runs checks on the article's URLs, assets and other resources to
+    determine whether it is fully available online.
 
     Args:
         self: Celery task instance
-        username (str, optional): Nome do usuário executando a tarefa
-        user_id (int, optional): ID do usuário executando a tarefa
-        collection_acron (str): Acrônimo da coleção (obrigatório).
-            Ex: "scl", "mex", "arg"
-        from_date (str, optional): Data inicial para coleta (formato ISO)
-        until_date (str, optional): Data final para coleta (formato ISO)
-        limit (int, optional): Limite de documentos a coletar
-        timeout (int, optional): Timeout em segundos para requisições
-        force_update (bool, optional): Força atualização de artigos existentes
-        auto_solve_pid_conflict (bool, optional): Resolve conflitos de PID
-
-    Returns:
-        None
-
-    Raises:
-        ValueError: Se collection_acron não for fornecido
-
-    Side Effects:
-        - Dispara task_load_article_from_xml_url para cada documento
-        - Registra UnexpectedEvent em caso de erro
-
-    Notes:
-        - OPAC é usado apenas para Brasil (scl) por questões de performance
-        - ArticleMeta é usado para todas as outras coleções
-    """
-    try:
-        if not collection_acron:
-            raise ValueError("Missing collection_acron")
-
-        # Seleciona o harvester apropriado baseado na coleção
-        if collection_acron == "scl":
-            harvester = OPACHarvester(
-                opac_url or "www.scielo.br",
-                collection_acron,
-                from_date=from_date,
-                until_date=until_date,
-                limit=limit,
-                timeout=timeout,
-            )
-        else:
-            harvester = AMHarvester(
-                "article",
-                collection_acron,
-                from_date=from_date,
-                until_date=until_date,
-                limit=limit,
-                timeout=timeout,
-            )
-
-        # Itera sobre documentos e dispara tarefas individuais
-        for document in harvester.harvest_documents():
-            source_date = document.get("processing_date") or document.get("origin_date")
-            task_load_article_from_xml_url.delay(
-                username,
-                user_id,
-                collection_acron,
-                document["pid_v2"],
-                document["url"],
-                source_date,
-                force_update,
-                auto_solve_pid_conflict,
-            )
-
-    except Exception as e:
-        exc_type, exc_value, exc_traceback = sys.exc_info()
-        UnexpectedEvent.create(
-            exception=e,
-            exc_traceback=exc_traceback,
-            detail={
-                "task": "task_select_articles_to_load_from_collection_endpoint",
-                "collection_acron": collection_acron,
-                "from_date": from_date,
-                "until_date": until_date,
-                "limit": limit,
-                "timeout": timeout,
-                "force_update": force_update,
-            },
-        )
-
-
-@celery_app.task(bind=True)
-def task_load_article_from_xml_url(
-    self,
-    username=None,
-    user_id=None,
-    collection_acron=None,
-    pid=None,
-    xml_url=None,
-    source_date=None,
-    force_update=None,
-    auto_solve_pid_conflict=None,
-):
-    """
-    Carrega um artigo individual a partir de uma URL de XML.
-
-    Cria ou atualiza um ArticleSource e processa o XML para criar/atualizar
-    o artigo no banco de dados.
-
-    Args:
-        self: Instância da tarefa Celery
-        username (str, optional): Nome do usuário executando a tarefa
-        user_id (int, optional): ID do usuário executando a tarefa
-        xml_url (str): URL do XML do artigo
-            Ex: "https://www.scielo.br/scielo.php?script=sci_arttext&pid=..."
- source_date (str, optional): Data de última atualização na fonte - force_update (bool, optional): Força reprocessamento mesmo se já completado - auto_solve_pid_conflict (bool, optional): Resolve conflitos de PID automaticamente + article_id (int, optional): ID do artigo a verificar (obrigatório) + collection_acron_list (list, optional): Lista de acrônimos de coleções para filtro + timeout (int, optional): Timeout em segundos para verificações HTTP + is_activate (bool, optional): Se deve ativar artigo após verificação + force_update (bool): Força nova verificação mesmo se recente Returns: None Side Effects: - - Cria/atualiza registro ArticleSource - - Processa XML e cria/atualiza Article + - Atualiza status de disponibilidade do artigo + - Verifica URLs de assets (PDF, HTML, etc.) + - Registra timestamps de última verificação - Registra UnexpectedEvent em caso de erro Notes: - - Pula processamento se ArticleSource já está COMPLETED e force_update=False - - XML é baixado e armazenado localmente antes do processamento + - Utiliza Article.check_availability para executar verificações + - Pode ser utilizada para monitoramento de saúde dos artigos """ try: - user = _get_user(self.request, username=username, user_id=user_id) - - # Cria ou atualiza ArticleSource - am_article = AMArticle.create_or_update( - pid, Collection.get(collection_acron), None, user - ) - - article_source = ArticleSource.create_or_update( - user=user, - url=xml_url, - source_date=source_date, - force_update=force_update, - am_article=am_article, - ) - article_source.complete_data( - user=user, - force_update=force_update, - auto_solve_pid_conflict=auto_solve_pid_conflict, - ) - - if article_source.status != ArticleSource.StatusChoices.COMPLETED: - return - - # Processa o XML - task_load_article_from_pp_xml.delay( - pp_xml_id=article_source.pid_provider_xml.id, - user_id=user_id or user.id, - username=username or user.username, - force_update=force_update, - ) - - except Exception as e: + user = _get_user(self.request, username, user_id) + article = Article.objects.get(id=article_id) + article.check_availability(user) + except Exception as exception: + logging.exception(f"Error processing article ID {article_id}: {str(exception)}") exc_type, exc_value, exc_traceback = sys.exc_info() UnexpectedEvent.create( - exception=e, + exception=exception, exc_traceback=exc_traceback, detail={ - "task": "task_load_article_from_xml_url", - "xml_url": xml_url, - "source_date": source_date, + "task": "article.tasks.task_check_article_availability", + "article_id": article_id, + "collection_acron_list": collection_acron_list, + "timeout": timeout, + "is_activate": is_activate, "force_update": force_update, }, ) @celery_app.task(bind=True) -def task_select_articles_to_load_from_article_source( +def task_dispatch_articles( self, username=None, user_id=None, + # --- filtros comuns --- + collection_acron_list=None, + journal_acron_list=None, + from_pub_year=None, + until_pub_year=None, from_date=None, until_date=None, force_update=None, - status_list=None, + export_to_articlemeta=False, auto_solve_pid_conflict=None, + # --- ativa pid_provider --- + proc_status_list=None, + # --- ativa article --- + data_status_list=None, + # --- ativa harvest (qualquer um) --- + limit=None, + timeout=None, + opac_url=None, + # --- ativa article_source --- + article_source_status_list=None, ): """ - Processa ArticleSources pendentes ou que necessitam reprocessamento. - - Busca ArticleSources com status pendente ou erro e processa seus XMLs. 
- Útil para reprocessar falhas anteriores ou completar processamentos interrompidos. - - Args: - self: Instância da tarefa Celery - username (str, optional): Nome do usuário executando a tarefa - user_id (int, optional): ID do usuário executando a tarefa - from_date (str, optional): Data inicial para filtrar ArticleSources - until_date (str, optional): Data final para filtrar ArticleSources - force_update (bool, optional): Força reprocessamento de todos - auto_solve_pid_conflict (bool, optional): Resolve conflitos de PID - - Returns: - None - - Side Effects: - - Processa XMLs de ArticleSources selecionados - - Atualiza status dos ArticleSources - - Registra UnexpectedEvent em caso de erro - - Examples: - # Reprocessar falhas dos últimos 7 dias - task_select_articles_to_load_from_article_source.delay( - from_date=(datetime.now() - timedelta(days=7)).isoformat(), - force_update=True - ) - """ - try: - user = _get_user(self.request, username=username, user_id=user_id) - - # Obtém queryset de ArticleSources para processar - for article_source in ArticleSource.get_queryset_to_complete_data( - from_date, - until_date, - force_update, - status_list, - ): - - try: - # Processa o XML - article_source.complete_data( - user=user, - force_update=force_update, - auto_solve_pid_conflict=auto_solve_pid_conflict, - ) - if article_source.status != ArticleSource.StatusChoices.COMPLETED: - continue - - task_load_article_from_pp_xml.delay( - pp_xml_id=article_source.pid_provider_xml.id, - user_id=user_id or user.id, - username=username or user.username, - force_update=force_update, - ) - except Exception as exception: - exc_type, exc_value, exc_traceback = sys.exc_info() - UnexpectedEvent.create( - exception=exception, - exc_traceback=exc_traceback, - detail={ - "task": "article.tasks.task_select_articles_to_load_from_article_source", - "article_source_id": str(article_source.id), - }, - ) - - except Exception as e: - exc_type, exc_value, exc_traceback = sys.exc_info() - UnexpectedEvent.create( - exception=e, - exc_traceback=exc_traceback, - detail={ - "task": "task_select_articles_to_load_from_article_source", - "from_date": from_date, - "until_date": until_date, - "force_update": force_update, - }, - ) - - -@celery_app.task(bind=True) -def task_fix_article_status( - self, - username=None, - user_id=None, - collection_acron_list=None, - journal_acron_list=None, - mark_as_invalid=False, - mark_as_public=False, - mark_as_duplicated=False, - deduplicate=False, -): - """ - Marca artigos com diferentes status baseado em filtros de coleções e periódicos. + Tarefa orquestradora que dispara processamento em lote de artigos. - Processa artigos aplicando diferentes marcações de status conforme parâmetros. - Itera diretamente pelos periódicos, usando coleção apenas como filtro. + Utiliza ArticleIteratorBuilder para selecionar artigos baseado em + múltiplos critérios e dispara task_process_article_pipeline para + cada item encontrado, permitindo processamento paralelo. 
Args: self: Instância da tarefa Celery username (str, optional): Nome do usuário executando a tarefa user_id (int, optional): ID do usuário executando a tarefa - collection_acron_list (list, optional): Lista de acrônimos de coleções para filtrar - journal_acron_list (list, optional): Lista de acrônimos de periódicos - mark_as_invalid (bool): Se True, marca artigos como invalid - mark_as_public (bool): Se True, marca artigos como public - mark_as_duplicated (bool): Se True, marca artigos como duplicated - deduplicate (bool): Se True, marca artigos como deduplicated + collection_acron_list (list, optional): Filtro por acrônimos de coleções + journal_acron_list (list, optional): Filtro por acrônimos de periódicos + from_pub_year (int, optional): Ano inicial de publicação + until_pub_year (int, optional): Ano final de publicação + from_date (str, optional): Data inicial (formato ISO) + until_date (str, optional): Data final (formato ISO) + force_update (bool, optional): Força reprocessamento + export_to_articlemeta (bool): Exporta para ArticleMeta após processamento + auto_solve_pid_conflict (bool, optional): Resolve conflitos de PID automaticamente + proc_status_list (list, optional): Status do pid_provider para filtro + data_status_list (list, optional): Status do article para filtro + limit (int, optional): Limite máximo de artigos a processar + timeout (int, optional): Timeout para operações HTTP + opac_url (str, optional): URL base do OPAC para harvest + article_source_status_list (list, optional): Status do article_source para filtro Returns: - dict: Resumo da operação com contadores - - Side Effects: - - Altera status de artigos no banco - - Registra UnexpectedEvent em caso de erro - - Dispara subtarefas para cada periódico + dict: Resumo com contadores de dispatched/skipped Examples: - # Marcar artigos como invalid para coleções específicas - task_fix_article_records_status.delay( - collection_acron_list=["scl", "mex"], - journal_acron_list=["abc", "xyz"], - mark_as_invalid=True + # Processamento padrão por coleção + task_dispatch_articles.delay(collection_acron_list=["scl"]) + + # Múltiplas fontes simultaneamente + task_dispatch_articles.delay( + proc_status_list=["todo"], + data_status_list=["invalid"], + article_source_status_list=["error"], + limit=500 ) - # Marcar artigos como public e deduplicated - task_fix_article_records_status.delay( - journal_acron_list=["abc"], - mark_as_public=True, - deduplicate=True - ) + Notes: + - Ver ArticleIteratorBuilder para detalhes sobre iteradores ativados + - Cada artigo encontrado gera uma subtarefa independente """ try: user = _get_user(self.request, username=username, user_id=user_id) - # Validação: ao menos uma operação deve ser especificada - operations = { - "invalid": mark_as_invalid, - "public": mark_as_public, - "duplicated": mark_as_duplicated, - "deduplicated": deduplicate, + common_kwargs = { + "user_id": user.id, + "username": user.username, + "force_update": force_update, + "export_to_articlemeta": export_to_articlemeta, + "auto_solve_pid_conflict": auto_solve_pid_conflict, } - if not any(operations.values()): - raise ValueError("At least one marking operation must be specified") - - # Construir filtros para os periódicos - journal_id_list = Journal.get_ids(collection_acron_list, journal_acron_list) + dispatched = skipped = 0 - # Iterar pelos periódicos e disparar subtarefas - journals_processed = 0 - for journal_id in journal_id_list: - qs = Article.objects.filter(journal_id=journal_id) - if qs.count() == 0: + for item_kwargs in 
+        for item_kwargs in controller.ArticleIteratorBuilder(
+            user=user,
+            collection_acron_list=collection_acron_list,
+            journal_acron_list=journal_acron_list,
+            from_pub_year=from_pub_year,
+            until_pub_year=until_pub_year,
+            from_date=from_date,
+            until_date=until_date,
+            proc_status_list=proc_status_list,
+            data_status_list=data_status_list,
+            article_source_status_list=article_source_status_list,
+            limit=limit,
+            timeout=timeout,
+            opac_url=opac_url,
+            force_update=force_update,
+        ):
+            if item_kwargs is None:
+                skipped += 1
                 continue
-            task_fix_journal_articles_status.apply_async(
-                kwargs={
-                    "username": username,
-                    "user_id": user_id,
-                    "journal_id": journal_id,
-                    "mark_as_invalid": mark_as_invalid,
-                    "mark_as_public": mark_as_public,
-                    "mark_as_duplicated": mark_as_duplicated,
-                    "deduplicate": deduplicate,
-                }
-            )
-            journals_processed += 1
+            logging.info(f"Dispatching article with kwargs: {item_kwargs}")
+            task_process_article_pipeline.delay(**item_kwargs, **common_kwargs)
+            dispatched += 1

         return {
             "status": "success",
-            "journals_processed": journals_processed,
-            "operations": {k: v for k, v in operations.items() if v},
-            "filters": {
-                "collections": collection_acron_list,
-                "journals": journal_acron_list,
-            },
+            "dispatched": dispatched,
+            "skipped": skipped,
         }

     except Exception as e:
@@ -975,386 +816,174 @@ def task_fix_article_status(
             exception=e,
             exc_traceback=exc_traceback,
             detail={
-                "task": "task_fix_article_records_status",
+                "task": "task_dispatch_articles",
                 "collection_acron_list": collection_acron_list,
                 "journal_acron_list": journal_acron_list,
-                "operations": {
-                    "mark_as_invalid": mark_as_invalid,
-                    "mark_as_public": mark_as_public,
-                    "mark_as_duplicated": mark_as_duplicated,
-                    "deduplicate": deduplicate,
-                },
+                "from_pub_year": from_pub_year,
+                "until_pub_year": until_pub_year,
+                "from_date": from_date,
+                "until_date": until_date,
+                "proc_status_list": proc_status_list,
+                "data_status_list": data_status_list,
+                "article_source_status_list": article_source_status_list,
+                "force_update": force_update,
+                "export_to_articlemeta": export_to_articlemeta,
             },
         )
         raise

-
 @celery_app.task(bind=True)
-def task_fix_journal_articles_status(
+def task_process_article_pipeline(
     self,
-    username=None,
-    user_id=None,
-    journal_id=None,
+    # Entrada para fluxo A (XML URL → ArticleSource → PidProviderXML)
+    xml_url=None,
     collection_acron=None,
-    journal_acron=None,
-    mark_as_invalid=False,
-    mark_as_public=False,
-    mark_as_duplicated=False,
-    deduplicate=False,
+    pid=None,
+    source_date=None,
+    # Entrada para fluxo B (ArticleSource existente → PidProviderXML)
+    article_source_id=None,
+    # Entrada direta para etapa C (PidProviderXML → Article)
+    pp_xml_id=None,
+    # Controle do fluxo
+    export_to_articlemeta=False,
+    collection_acron_list=None,
+    force_update=None,
+    auto_solve_pid_conflict=None,
+    version=None,
+    user_id=None,
+    username=None,
 ):
     """
-    Marca artigos com diferentes status para um periódico específico.
+    Pipeline principal de processamento de artigos com múltiplos pontos de entrada.

-    Processa artigos do periódico aplicando as marcações de status especificadas.
-    Cada operação de marcação é executada independentemente se habilitada.
+    Implementa um pipeline flexível que pode iniciar em diferentes estágios:
+    - Fluxo A: XML URL → ArticleSource → PidProviderXML → Article
+    - Fluxo B: ArticleSource existente → PidProviderXML → Article
+    - Fluxo C: PidProviderXML → Article (entrada direta)

     Args:
         self: Instância da tarefa Celery
-        username (str, optional): Nome do usuário executando a tarefa
+        xml_url (str, optional): URL do XML para fluxo A (requer collection_acron e pid)
+        collection_acron (str, optional): Acrônimo da coleção (obrigatório com xml_url)
+        pid (str, optional): PID do artigo (obrigatório com xml_url)
+        source_date (str, optional): Data da fonte para fluxo A (formato ISO)
+        article_source_id (int, optional): ID do ArticleSource para fluxo B
+        pp_xml_id (int, optional): ID do PidProviderXML para fluxo C
+        export_to_articlemeta (bool): Se True, exporta para ArticleMeta após processamento
+        collection_acron_list (list, optional): Lista de coleções para exportação
+        force_update (bool, optional): Força reprocessamento mesmo se existir
+        auto_solve_pid_conflict (bool, optional): Resolve conflitos de PID automaticamente
+        version (str, optional): Versão específica a processar
         user_id (int, optional): ID do usuário executando a tarefa
-        journal_id (int, optional): ID do periódico (preferencial por performance)
-        journal_acron (str, optional): Acrônimo do periódico (alternativa ao journal_id)
-        mark_as_invalid (bool): Se True, marca artigos sem registro ativo como invalid
-        mark_as_public (bool): Se True, marca artigos como public
-        mark_as_duplicated (bool): Se True, marca artigos como duplicated
-        deduplicate (bool): Se True, marca artigos como deduplicated
+        username (str, optional): Nome do usuário executando a tarefa

     Returns:
-        dict: Resumo das operações realizadas
-
-    Raises:
-        ValueError: Se nem journal_id nem journal_acron forem fornecidos
+        None

     Side Effects:
-        - Altera status de artigos no banco
+        - Cria/atualiza ArticleSource (fluxo A)
+        - Cria/atualiza PidProviderXML
+        - Cria/atualiza Article
+        - Verifica disponibilidade do artigo
+        - Exporta para ArticleMeta se solicitado
         - Registra UnexpectedEvent em caso de erro
-        - Pode executar múltiplas operações de marcação em sequência
-    """
-    try:
-        # Validar que ao menos um identificador foi fornecido
-        if not journal_id and not journal_acron:
-            raise ValueError("Either journal_id or journal_acron must be provided")
-
-        user = _get_user(self.request, username=username, user_id=user_id)
-
-        # Buscar o periódico por ID ou acrônimo
-        if journal_acron and collection_acron:
-            journal_ids = Journal.get_ids(
-                [collection_acron],
-                [journal_acron],
-            )
-        elif journal_id:
-            journal_ids = [journal_id]
-        else:
-            raise ValueError("Insufficient data to identify the journal")
-
-        if Article.objects.filter(journal__id__in=journal_ids).count() == 0:
-            return {
-                "status": "no_articles",
-                "journal_id": journal_id,
-                "journal_acron": journal_acron,
-                "collection_acron": collection_acron,
-            }
-        journal_id = journal_ids[0]
-        if mark_as_invalid:
-            Article.mark_items_as_invalid(journal_id=journal_id)
-
-        if mark_as_public:
-            Article.mark_items_as_public(journal_id=journal_id)
-
-        if mark_as_duplicated or deduplicate:
-            Article.deduplicate_items(user, journal_id=journal_id, mark_as_duplicated=mark_as_duplicated, deduplicate=deduplicate)
-
-        return {
-            "status": "success",
-            "journal_id": journal_id,
-            "journal_acron": journal_acron,
-            "collection_acron": collection_acron,
-            "operations_performed": {
-                "mark_as_invalid": mark_as_invalid,
-                "mark_as_public": mark_as_public,
-                "mark_as_duplicated": mark_as_duplicated,
-                "deduplicate": deduplicate,
-            },
-        }
+    Raises:
+        ValueError: Se nenhum ponto de entrada válido for fornecido
+            Se xml_url fornecido sem collection_acron ou pid

-    except Exception as e:
-        exc_type, exc_value, exc_traceback = sys.exc_info()
-        UnexpectedEvent.create(
-            exception=e,
-            exc_traceback=exc_traceback,
-            detail={
-                "task": "task_fix_journal_articles_status",
-                "journal_id": journal_id,
-                "journal_acron": journal_acron,
-                "collection_acron": collection_acron,
-                "operations": {
-                    "mark_as_invalid": mark_as_invalid,
-                    "mark_as_public": mark_as_public,
-                    "mark_as_duplicated": mark_as_duplicated,
-                    "deduplicate": deduplicate,
-                },
-            },
+    Examples:
+        # Fluxo completo a partir de URL
+        task_process_article_pipeline.delay(
+            xml_url="http://example.com/article.xml",
+            collection_acron="scl",
+            pid="S1234-56782024000100001",
+            export_to_articlemeta=True
         )
-        raise

+        # A partir de ArticleSource existente
+        task_process_article_pipeline.delay(
+            article_source_id=123,
+            force_update=True
+        )

-@celery_app.task(bind=True)
-def task_load_articles(
-    self,
-    username=None,
-    user_id=None,
-    collection_acron_list=None,
-    journal_acron_list=None,
-    articlemeta_export_enable=None,
-    from_pub_year=None,
-    until_pub_year=None,
-    from_updated_date=None,
-    until_updated_date=None,
-    proc_status_list=None,
-):
-    """
-    Task para carregar artigos de uma lista selecionada de periódicos.
-    Dispara subtasks para cada periódico encontrado.
+        # Entrada direta via PidProviderXML
+        task_process_article_pipeline.delay(
+            pp_xml_id=456,
+            export_to_articlemeta=True
+        )
     """
     try:
         user = _get_user(self.request, username=username, user_id=user_id)
-
-        proc_status_list = proc_status_list or [
-            PPXML_STATUS_TODO,
-            PPXML_STATUS_INVALID,
-        ]
-        # Construir filtros para os periódicos
-        items = Journal.get_journal_issns(collection_acron_list, journal_acron_list)
-        if items:
-            # Iterar pelos periódicos e disparar subtarefas
-            journals_processed = 0
-            for journal_issns in items:
-                # Filtrar ISSNs válidos
-                issn_list = [issn for issn in journal_issns if issn]
-
-                if not issn_list:  # Só dispara task se houver ISSNs
-                    continue
-
-                task_load_journal_articles.delay(
-                    username=username,
-                    user_id=user_id,
-                    issn_list=issn_list,
-                    articlemeta_export_enable=articlemeta_export_enable,
-                    from_pub_year=from_pub_year,
-                    until_pub_year=until_pub_year,
-                    from_updated_date=from_updated_date,
-                    until_updated_date=until_updated_date,
-                    proc_status_list=proc_status_list,
+
+        if xml_url:
+            if not collection_acron:
+                raise ValueError("collection_acron is required when xml_url is provided")
+            if not pid:
+                raise ValueError("pid is required when xml_url is provided")
+            am_article = AMArticle.create_or_update(
+                pid, Collection.get(collection_acron), None, user
+            )
+            if not am_article:
+                raise ValueError(
+                    f"Failed to create or update AMArticle with pid: {pid} and collection: {collection_acron}"
                 )
-                journals_processed += 1
-
-            return {
-                "status": "success",
-                "journals_processed": journals_processed,
-                "filters": {
-                    "collections": collection_acron_list,
-                    "journals": journal_acron_list,
-                    "from_pub_year": from_pub_year,
-                    "until_pub_year": until_pub_year,
-                    "from_updated_date": from_updated_date,
-                    "until_updated_date": until_updated_date,
-                    "proc_status_list": proc_status_list,
-                },
-            }
-        else:
-            # Se não há filtros, processa todos os artigos
-            task_load_journal_articles.delay(
-                username=username,
-                user_id=user_id,
-                issn_list=None,
-                articlemeta_export_enable=articlemeta_export_enable,
-                from_pub_year=from_pub_year,
-                until_pub_year=until_pub_year,
-                from_updated_date=from_updated_date,
-                until_updated_date=until_updated_date,
-                proc_status_list=proc_status_list,
+            article_source = ArticleSource.create_or_update(
+                user=user,
+                url=xml_url,
+                source_date=source_date,
+                force_update=force_update,
+                am_article=am_article,
+                auto_solve_pid_conflict=auto_solve_pid_conflict,
             )
-            return {
-                "status": "success",
-                "message": "Processing all articles without journal filters",
-                "filters": {
-                    "from_pub_year": from_pub_year,
-                    "until_pub_year": until_pub_year,
-                    "from_updated_date": from_updated_date,
-                    "until_updated_date": until_updated_date,
-                    "proc_status_list": proc_status_list,
-                },
-            }
+            pp_xml_id = article_source.pid_provider_xml.id
+
+        if article_source_id:
+            article_source = ArticleSource.objects.get(id=article_source_id)
+            article_source.add_pid_provider(
+                user=user,
+                force_update=force_update,
+                auto_solve_pid_conflict=auto_solve_pid_conflict,
+            )
+            pp_xml_id = article_source.pid_provider_xml.id

-    except Exception as e:
-        exc_type, exc_value, exc_traceback = sys.exc_info()
-        UnexpectedEvent.create(
-            exception=e,
-            exc_traceback=exc_traceback,
-            detail={
-                "task": "task_load_articles",
-                "collection_acron_list": collection_acron_list,
-                "journal_acron_list": journal_acron_list,
-                "articlemeta_export_enable": articlemeta_export_enable,
-                "from_pub_year": from_pub_year,
-                "until_pub_year": until_pub_year,
-                "from_updated_date": from_updated_date,
-                "until_updated_date": until_updated_date,
-                "proc_status_list": proc_status_list,
-            },
-        )
-        raise
+        if not pp_xml_id:
+            raise ValueError(
+                "No valid entry point provided. Please provide either xml_url, "
+                "article_source_id or pp_xml_id."
+            )

+        pp_xml = PidProviderXML.objects.select_related(
+            "current_version"
+        ).get(id=pp_xml_id)

-@celery_app.task(bind=True)
-def task_load_journal_articles(
-    self,
-    username=None,
-    user_id=None,
-    issn_list=None,
-    articlemeta_export_enable=False,
-    from_pub_year=None,
-    until_pub_year=None,
-    from_updated_date=None,
-    until_updated_date=None,
-    proc_status_list=None,
-):
-    """
-    Task para carregar artigos de um periódico específico.
-    Dispara subtasks para cada artigo encontrado.
-    """
-    try:
-        user = _get_user(self.request, username=username, user_id=user_id)
+        article = load_article(user, pp_xml=pp_xml)
+        pp_xml.collections.set(article.collections)

-        proc_status_list = proc_status_list or [
-            PPXML_STATUS_TODO,
-            PPXML_STATUS_INVALID,
-        ]
-        # Buscar os XMLs usando os ISSNs do periódico
-        items = PidProviderXML.get_queryset(
-            issn_list=issn_list,
-            from_pub_year=from_pub_year,
-            until_pub_year=until_pub_year,
-            from_updated_date=from_updated_date,
-            until_updated_date=until_updated_date,
-            proc_status_list=proc_status_list,
-        )
-        if not items.exists():
-            return {
-                "status": "success",
-                "articles_found": 0,
-                "message": "No articles found with the specified filters",
-                "filters": {
-                    "issn_list": issn_list,
-                    "from_pub_year": from_pub_year,
-                    "until_pub_year": until_pub_year,
-                    "from_updated_date": from_updated_date,
-                    "until_updated_date": until_updated_date,
-                    "proc_status_list": proc_status_list,
-                },
-            }
-        # Contador de artigos processados
-        articles_processed = 0
-        # Iterar sobre os itens e disparar tasks para cada artigo
-        for item in items.iterator():
-            task_load_article_from_pp_xml.delay(
-                pp_xml_id=item.id,
-                user_id=user_id or user.id,
-                username=username or user.username,
-                articlemeta_export_enable=articlemeta_export_enable,
+        article.check_availability(user, force_update=export_to_articlemeta or force_update)
+
+        if export_to_articlemeta:
+            task_export_article_to_articlemeta.delay(
+                pid_v3=article.pid_v3,
+                collection_acron_list=collection_acron_list,
+                force_update=force_update,
+                user_id=user.id,
+                username=user.username,
             )
-            articles_processed += 1
-        return {
-            "status": "success",
-            "articles_processed": articles_processed,
-            "operations": {
-                "articlemeta_export_enable": articlemeta_export_enable,
-            },
-            "filters": {
-                "issn_list": issn_list,
-                "from_pub_year": from_pub_year,
-                "until_pub_year": until_pub_year,
-                "from_updated_date": from_updated_date,
-                "until_updated_date": until_updated_date,
-                "proc_status_list": proc_status_list,
-            },
-        }

     except Exception as e:
         exc_type, exc_value, exc_traceback = sys.exc_info()
         UnexpectedEvent.create(
             exception=e,
             exc_traceback=exc_traceback,
             detail={
-                "task": "task_load_journal_articles",
-                "issn_list": issn_list,
-                "articlemeta_export_enable": articlemeta_export_enable,
-                "from_pub_year": from_pub_year,
-                "until_pub_year": until_pub_year,
-                "from_updated_date": from_updated_date,
-                "until_updated_date": until_updated_date,
-                "proc_status_list": proc_status_list,
-            },
-        )
-        raise
-
-
-@celery_app.task(bind=True)
-def task_check_article_availability(
-    self,
-    user_id=None,
-    username=None,
-    article_id=None,
-    collection_acron_list=None,
-    timeout=None,
-    is_activate=None,
-    force_update=False,
-):
-    """
-    Carrega um artigo específico a partir de um PidProviderXML.
-
-    Processa o XML armazenado no PidProviderXML, cria/atualiza o Article
-    e opcionalmente exporta para ArticleMeta.
-
-    Args:
-        self: Instância da tarefa Celery
-        pp_xml_id (int): ID do PidProviderXML a processar (obrigatório)
-        user_id (int, optional): ID do usuário executando a tarefa
-        username (str, optional): Nome do usuário executando a tarefa
-        articlemeta_export_enable (bool, optional): Exporta para ArticleMeta após carregar
-
-    Returns:
-        None
-
-    Side Effects:
-        - Cria/atualiza Article no banco
-        - Atualiza status do PidProviderXML para DONE
-        - Verifica disponibilidade do artigo
-        - Exporta para ArticleMeta se solicitado
-        - Registra UnexpectedEvent em caso de erro
-
-    Notes:
-        - O XML é lido diretamente do arquivo armazenado no PidProviderXML
-        - A verificação de disponibilidade valida URLs e assets do artigo
-    """
-    try:
-        user = _get_user(self.request, username, user_id)
-        article = Article.objects.get(id=article_id)
-        article.check_availability(user)
-    except Exception as exception:
-        exc_type, exc_value, exc_traceback = sys.exc_info()
-        UnexpectedEvent.create(
-            exception=exception,
-            exc_traceback=exc_traceback,
-            detail={
-                "task": "article.tasks.task_check_article_availability",
-                "article_id": article_id,
-                "collection_acron_list": collection_acron_list,
-                "timeout": timeout,
-                "is_activate": is_activate,
+                "task": "article.tasks.task_process_article_pipeline",
+                "xml_url": xml_url,
+                "article_source_id": article_source_id,
+                "pp_xml_id": pp_xml_id,
+                "pid": pid,
+                "collection_acron": collection_acron,
+                "export_to_articlemeta": export_to_articlemeta,
                 "force_update": force_update,
             },
-        )
\ No newline at end of file
+        )
diff --git a/bigbang/tasks_scheduler.py b/bigbang/tasks_scheduler.py
index 4c6ee8e82..d2905f3b5 100644
--- a/bigbang/tasks_scheduler.py
+++ b/bigbang/tasks_scheduler.py
@@ -31,6 +31,18 @@ def delete_outdated_tasks(task_list=None):
         "article.tasks.task_convert_xml_to_other_formats_for_articles",
         "article.tasks.convert_xml_to_other_formats",
         "article.tasks.task_load_article_from_xml_endpoint",
+        "article.tasks.task_select_articles_to_complete_data",
+        "article.tasks.task_select_articles_to_load_from_api",
+        "article.tasks.task_select_articles_to_load_from_collection_endpoint",
+        "article.tasks.task_select_articles_to_load_from_article_source",
+        "article.tasks.task_load_articles",
+        "article.tasks.task_load_journal_articles",
+        "article.tasks.task_load_article_from_xml_url",
+        "article.tasks.task_create_article_source",
+        "article.tasks.task_create_pid_provider_xml",
+        "article.tasks.task_fix_journal_articles_status",
+        "article.tasks.task_select_articles_to_export_to_articlemeta",
+        "issue.tasks.load_issue_from_article_meta",

         # Tarefas de Article sem namespace (legacy)
         "article_complete_data",
@@ -47,6 +59,17 @@ def delete_outdated_tasks(task_list=None):
         "task_load_article_from_article_source",
         "task_mark_articles_as_deleted_without_pp_xml",
         "transfer_license_statements_fk_to_article_license",
+        "task_select_articles_to_complete_data",
+        "task_select_articles_to_load_from_api",
+        "task_select_articles_to_load_from_collection_endpoint",
+        "task_select_articles_to_load_from_article_source",
+        "task_load_articles",
+        "task_load_journal_articles",
+        "task_load_article_from_xml_url",
+        "task_create_article_source",
+        "task_create_pid_provider_xml",
+        "task_fix_journal_articles_status",
+        "task_select_articles_to_export_to_articlemeta",
     ]
     delete_tasks(task_list)
@@ -61,18 +84,17 @@ def schedule_tasks(username):
     """
     enabled = False

+    delete_outdated_tasks()
+
     # Tarefas de Article mantidas
-    schedule_task_select_articles_to_complete_data(username, enabled)
+    schedule_task_dispatch_articles(username, enabled)
     schedule_task_export_articles_to_articlemeta(username, enabled)
-    schedule_task_select_articles_to_load_from_api(username, enabled)
-    schedule_task_select_articles_to_load_from_article_source(username, enabled)
     schedule_task_fix_article_status(username, enabled)
-    schedule_task_load_articles(username, enabled)

     # Tarefas de issue
     schedule_export_issue_to_articlemeta(username, enabled)
     schedule_export_issues_to_articlemeta(username, enabled)
-    schedule_load_issue_from_article_meta(username, enabled)
+    schedule_load_issue_from_articlemeta(username, enabled)

     # Tarefas de journal
     schedule_export_journal_to_articlemeta(username, enabled)
@@ -87,42 +109,46 @@ def schedule_tasks(username):

     # Tarefas de bigbang
     schedule_bigbang_start(username, enabled)
-    schedule_bigbang_delete_outdated_tasks(username, enabled)


 # ==============================================================================
 # TAREFAS DE ARTICLE MANTIDAS
 # ==============================================================================

-def schedule_task_select_articles_to_complete_data(username, enabled=False):
+def schedule_task_dispatch_articles(username, enabled=False):
     """
-    Agenda a tarefa de completar dados de artigos incompletos
+    Agenda a tarefa orquestradora de despacho de artigos para o pipeline.
+    Substitui as antigas tarefas de seleção (complete_data, load_from_api,
+    load_from_article_source, load_articles).
     """
     schedule_task(
-        task="article.tasks.task_select_articles_to_complete_data",
-        name="article.tasks.task_select_articles_to_complete_data",
+        task="article.tasks.task_dispatch_articles",
+        name="article.tasks.task_dispatch_articles",
        kwargs=dict(
-            user_id=None,
             username=username,
-            collection_acron_list=[],
-            journal_acron_list=[],
+            user_id=None,
+            collection_acron_list=None,
+            journal_acron_list=None,
             from_pub_year=None,
             until_pub_year=None,
+            from_date=None,
+            until_date=None,
             force_update=False,
-            from_updated_date=None,
-            until_updated_date=None,
-            data_status_list=[],
-            valid=None,
-            pp_xml__isnull=True,
-            sps_pkg_name__isnull=True,
-            article_license__isnull=True,
+            export_to_articlemeta=False,
+            auto_solve_pid_conflict=False,
+            proc_status_list=None,
+            data_status_list=None,
+            limit=None,
+            timeout=None,
+            opac_url=None,
+            article_source_status_list=None,
         ),
-        description=_("Complete missing data for articles"),
+        description=_("Dispatch articles to processing pipeline"),
         priority=TASK_PRIORITY,
         enabled=enabled,
         run_once=False,
         day_of_week="*",
-        hour="6",
+        hour="2",
         minute="1",
     )
@@ -132,8 +158,8 @@ def schedule_task_export_articles_to_articlemeta(username, enabled=False):
     Agenda a tarefa de exportar artigos em lote para ArticleMeta
     """
     schedule_task(
-        task="article.tasks.task_select_articles_to_export_to_articlemeta",
-        name="article.tasks.task_select_articles_to_export_to_articlemeta",
+        task="article.tasks.task_export_articles_to_articlemeta",
+        name="article.tasks.task_export_articles_to_articlemeta",
         kwargs=dict(
             collection_acron_list=[],
             issn=None,
@@ -157,59 +183,8 @@ def schedule_task_export_articles_to_articlemeta(username, enabled=False):
     )


-def schedule_task_select_articles_to_load_from_api(username, enabled=False):
-    """
-    Agenda a tarefa de carregar artigos de múltiplas coleções via API
-    """
-    schedule_task(
-        task="article.tasks.task_select_articles_to_load_from_api",
-        name="article.tasks.task_select_articles_to_load_from_api",
-        kwargs=dict(
-            username=username,
-            user_id=None,
-            collection_acron_list=["scl"],
-            from_date="2024-01-01",
-            until_date="2024-02-31",
-            limit=100,
-            timeout=10,
-            force_update=False,
-            auto_solve_pid_conflict=False,
-            opac_url=None,
-        ),
-        description=_("Load articles from multiple collections via API"),
-        priority=TASK_PRIORITY,
-        enabled=enabled,
-        run_once=False,
-        day_of_week="*",
-        hour="3",
-        minute="1",
-    )
-
-
-def schedule_task_select_articles_to_load_from_article_source(username, enabled=False):
-    """
-    Agenda a tarefa de processar ArticleSources pendentes
-    """
-    schedule_task(
-        task="article.tasks.task_select_articles_to_load_from_article_source",
-        name="task_select_articles_to_load_from_article_source",
-        kwargs=dict(
-            username=username,
-            user_id=None,
-            from_date="2024-01-01",
-            until_date="2024-12-31",
-            force_update=False,
-            auto_solve_pid_conflict=False,
-        ),
-        description=_("Process pending ArticleSources"),
-        priority=TASK_PRIORITY,
-        enabled=enabled,
-        run_once=False,
-        day_of_week="*",
-        hour="*/6",
-        minute="30",
-    )
-
-
 def schedule_task_fix_article_status(username, enabled=False):
     """
@@ -241,35 +216,6 @@ def schedule_task_fix_article_status(username, enabled=False):
     )


-def schedule_task_load_articles(username, enabled=False):
-    """
-    Agenda a tarefa de carregar artigos do PidProviderXML
-    """
-    schedule_task(
-        task="article.tasks.task_load_articles",
-        name="article.tasks.task_load_articles",
-        kwargs=dict(
-            username=None,
-            user_id=None,
-            collection_acron_list=None,
-            journal_acron_list=None,
-            articlemeta_export_enable=None,
-            from_pub_year=None,
-            until_pub_year=None,
-            from_updated_date=None,
-            until_updated_date=None,
-            proc_status_list=None,
-        ),
-        description=_("Load articles from PidProviderXML"),
-        priority=TASK_PRIORITY,
-        enabled=enabled,
-        run_once=False,
-        day_of_week="*",
-        hour="2",
-        minute="1",
-    )
-
-
 # ==============================================================================
 # TAREFAS DE BIGBANG
 # ==============================================================================
@@ -462,24 +408,21 @@ def schedule_export_journal_to_articlemeta(username, enabled=False):
 # TAREFAS DE ISSUE
 # ==============================================================================

-def schedule_load_issue_from_article_meta(username, enabled=False):
+def schedule_load_issue_from_articlemeta(username, enabled=False):
     """
     Agenda a tarefa de carregar issues do ArticleMeta
     """
     schedule_task(
-        task="issue.tasks.task_load_issue_from_article_meta",
-        name="task_load_issue_from_article_meta",
+        task="issue.tasks.load_issue_from_articlemeta",
+        name="load_issue_from_articlemeta",
         kwargs=dict(
             user_id=None,
             username=username,
-            collection="scl",
-            issn_scielo="0034-8910",
+            collection_acron=None,
             from_date=None,
             until_date=None,
-            limit=None,
-            force_update=False,
-            timeout=None,
-            reset=None,
+            force_update=None,
+            timeout=30,
         ),
         description=_("Load issues from ArticleMeta"),
         priority=TASK_PRIORITY,
diff --git a/collection/models.py b/collection/models.py
index 038a2e9ea..73d8e5f94 100755
--- a/collection/models.py
+++ b/collection/models.py
@@ -1,4 +1,5 @@
 import logging
+from functools import cached_property

 from django.db import models
 from django.utils.translation import gettext_lazy as _
@@ -186,6 +187,13 @@ class Meta:
             ),
         ]

+    @cached_property
+    def base_url(self):
+        """Retorna o domain pronto para compor URLs, adicionando protocolo se ausente."""
+        if self.domain and not self.domain.startswith(("http://", "https://")):
+            return f"https://{self.domain}"
+        return self.domain
+
     @property
     def data(self):
         d = {
diff --git a/core/home/migrations/0015_alter_formpage_thank_you_text.py b/core/home/migrations/0015_alter_formpage_thank_you_text.py
new file mode 100644
index 000000000..4d4cca974
--- /dev/null
+++ b/core/home/migrations/0015_alter_formpage_thank_you_text.py
@@ -0,0 +1,22 @@
+# Generated by Django 5.2.7 on 2026-03-12 14:11
+
+import wagtail.fields
+from django.db import migrations
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ("home", "0014_homepagesponsor"),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name="formpage",
+            name="thank_you_text",
+            field=wagtail.fields.RichTextField(
+                blank=True,
+                help_text="Adicione a mensagem que será exibida após o envio do formulário.",
+            ),
+        ),
+    ]
diff --git a/core/utils/harvesters.py b/core/utils/harvesters.py
index da4a0353c..cf2026f66 100644
--- a/core/utils/harvesters.py
+++ b/core/utils/harvesters.py
@@ -17,7 +17,7 @@ def __init__(
         collection_acron: str,
         from_date: Optional[str] = None,
         until_date: Optional[str] = None,
-        limit: int = 1000,
+        limit: Optional[int] = None,
         timeout: int = 30,
     ):
         """
@@ -35,7 +35,7 @@ def __init__(
         self.collection_acron = collection_acron
         self.from_date = from_date or "1997-01-01"
         self.until_date = until_date or datetime.utcnow().isoformat()[:10]
-        self.limit = limit
+        self.limit = limit or 1000
         self.timeout = timeout

     def harvest_documents(self) -> Generator[Dict[str, Any], None, None]:
@@ -141,7 +141,7 @@ class OPACHarvester:

     def __init__(
         self,
-        domain: str = "www.scielo.br",
+        domain: str = "https://www.scielo.br",
         collection_acron: str = "scl",
         from_date: Optional[str] = None,
         until_date: Optional[str] = None,
@@ -163,8 +163,8 @@ def __init__(
         self.collection_acron = collection_acron
         self.from_date = from_date or "2000-01-01"
         self.until_date = until_date or datetime.utcnow().isoformat()[:10]
-        self.limit = limit
-        self.timeout = timeout
+        self.limit = limit or 100
+        self.timeout = timeout or 5

     def harvest_documents(self) -> Generator[Dict[str, Any], None, None]:
         """
@@ -190,7 +190,7 @@ def harvest_documents(self) -> Generator[Dict[str, Any], None, None]:
             try:
                 # Constrói URL
                 url = (
-                    f"https://{self.domain}/api/v1/counter_dict?"
+                    f"{self.domain}/api/v1/counter_dict?"
                     f"end_date={self.until_date}&begin_date={self.from_date}"
                     f"&limit={self.limit}&page={page}"
                 )
@@ -220,7 +220,7 @@ def harvest_documents(self) -> Generator[Dict[str, Any], None, None]:

                 # Constrói URL do XML
                 journal_acron = item["journal_acronym"]
-                xml_url = f"https://{self.domain}/j/{journal_acron}/a/{pid_v3}/?format=xml"
+                xml_url = f"{self.domain}/j/{journal_acron}/a/{pid_v3}/?format=xml"

                 # Extrai data de origem
                 origin_date = self._parse_gmt_date(
diff --git a/core/views.py b/core/views.py
index 412b40789..73778af7e 100644
--- a/core/views.py
+++ b/core/views.py
@@ -1,8 +1,18 @@
 from django.http import HttpResponseRedirect
 from wagtail_modeladmin.views import CreateView
+from wagtail.snippets.views.snippets import SnippetViewSet


 class CommonControlFieldCreateView(CreateView):
     def form_valid(self, form):
         self.object = form.save_all(self.request.user)
         return HttpResponseRedirect(self.get_success_url())
+
+
+class CommonControlFieldViewSet(SnippetViewSet):
+    """ViewSet base com save_instance compartilhado"""
+
+    def save_instance(self, instance, form, is_new):
+        if hasattr(form, 'save_all'):
+            return form.save_all(self.request.user)
+        return super().save_instance(instance, form, is_new)
\ No newline at end of file
diff --git a/index/7.7.3/core/conf/schema.xml b/index/7.7.3/core/conf/schema.xml
index ddd7c97aa..b6844b027 100755
--- a/index/7.7.3/core/conf/schema.xml
+++ b/index/7.7.3/core/conf/schema.xml
@@ -152,12 +152,13 @@
[hunk body not recoverable: the XML field/type markup was lost in extraction; the surviving markers indicate two added lines and one removed line]