-
Notifications
You must be signed in to change notification settings - Fork 10
Aprimora fluxo de processamento de artigos e faz correcoes journal issue etc #1402
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1d26085
49013bd
69631e4
ff49ab0
3398922
d556d84
337f979
79332dd
1242b12
ccacb0a
d059d82
f6a8147
fefde02
9cfa046
c1f49f7
0dfec26
51890a9
36baa09
59f95f8
231c72b
8bbd91a
acb98b6
3d7a862
21916e5
f64ad0e
662de15
17cc3ab
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -3,29 +3,21 @@ | |||||||||||
| import logging | ||||||||||||
| import sys | ||||||||||||
| import traceback | ||||||||||||
| from datetime import datetime | ||||||||||||
|
|
||||||||||||
| from django.db.models import Q | ||||||||||||
| from packtools.sps.formats.am import am | ||||||||||||
|
|
||||||||||||
| from article.sources.xmlsps import load_article | ||||||||||||
| from article.models import Article, ArticleExporter, ArticleFunding | ||||||||||||
| from article.choices import ( | ||||||||||||
| DATA_STATUS_DUPLICATED, | ||||||||||||
| DATA_STATUS_DEDUPLICATED, | ||||||||||||
| DATA_STATUS_PUBLIC, | ||||||||||||
| ) | ||||||||||||
| from article.models import Article, ArticleExporter, ArticleFunding, ArticleSource | ||||||||||||
| from article import choices | ||||||||||||
| from collection.models import Collection | ||||||||||||
| from core.mongodb import write_item | ||||||||||||
| from core.utils import date_utils | ||||||||||||
| from core.utils.harvesters import AMHarvester, OPACHarvester | ||||||||||||
| from institution.models import Sponsor | ||||||||||||
| from journal.models import Journal, SciELOJournal | ||||||||||||
| from journal.models import Journal | ||||||||||||
| from pid_provider.choices import ( | ||||||||||||
| PPXML_STATUS_TODO, | ||||||||||||
| PPXML_STATUS_DUPLICATED, | ||||||||||||
| PPXML_STATUS_DEDUPLICATED, | ||||||||||||
| PPXML_STATUS_INVALID, | ||||||||||||
| ) | ||||||||||||
| from pid_provider.models import PidProviderXML, XMLVersionXmlWithPreError | ||||||||||||
| from pid_provider.models import PidProviderXML | ||||||||||||
| from tracker.models import UnexpectedEvent | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
|
|
@@ -403,3 +395,207 @@ def bulk_export_articles_to_articlemeta( | |||||||||||
| }, | ||||||||||||
| ) | ||||||||||||
| raise | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| class ArticleIteratorBuilder: | ||||||||||||
| """ | ||||||||||||
| Monta e encadeia iteradores de seleção de artigos para despacho ao pipeline. | ||||||||||||
|
|
||||||||||||
| Cada método ``_iter_from_*`` é um gerador que yields kwargs prontos para | ||||||||||||
| ``task_process_article_pipeline``. Os iteradores ativos são determinados | ||||||||||||
| pelos argumentos exclusivos presentes na instância — múltiplos podem estar | ||||||||||||
| ativos simultaneamente. | ||||||||||||
|
|
||||||||||||
| Argumentos exclusivos e seus iteradores: | ||||||||||||
|
|
||||||||||||
| ========================= ================================================ | ||||||||||||
| Argumento exclusivo Iterador ativado | ||||||||||||
| ========================= ================================================ | ||||||||||||
| proc_status_list _iter_from_pid_provider | ||||||||||||
| data_status_list _iter_from_article | ||||||||||||
| limit / timeout / opac_url _iter_from_harvest | ||||||||||||
| article_source_status_list _iter_from_article_source | ||||||||||||
| (nenhum) _iter_from_pid_provider (padrão) | ||||||||||||
| ========================= ================================================ | ||||||||||||
|
|
||||||||||||
| Usage:: | ||||||||||||
|
|
||||||||||||
| it = ArticleIteratorBuilder( | ||||||||||||
| user=user, | ||||||||||||
| collection_acron_list=["scl"], | ||||||||||||
| proc_status_list=["todo"], | ||||||||||||
| data_status_list=["invalid"], | ||||||||||||
| ) | ||||||||||||
| for kwargs in it: | ||||||||||||
| task_process_article_pipeline.delay(**kwargs) | ||||||||||||
| """ | ||||||||||||
|
|
||||||||||||
| def __init__( | ||||||||||||
| self, | ||||||||||||
| user, | ||||||||||||
| collection_acron_list=None, | ||||||||||||
| journal_acron_list=None, | ||||||||||||
| from_pub_year=None, | ||||||||||||
| until_pub_year=None, | ||||||||||||
| from_date=None, | ||||||||||||
| until_date=None, | ||||||||||||
| proc_status_list=None, | ||||||||||||
| data_status_list=None, | ||||||||||||
| article_source_status_list=None, | ||||||||||||
| limit=None, | ||||||||||||
| timeout=None, | ||||||||||||
| opac_url=None, | ||||||||||||
| force_update=None, | ||||||||||||
| ): | ||||||||||||
| self.user = user | ||||||||||||
| self.collection_acron_list = collection_acron_list | ||||||||||||
| self.journal_acron_list = journal_acron_list | ||||||||||||
| self.from_pub_year = from_pub_year | ||||||||||||
| self.until_pub_year = until_pub_year | ||||||||||||
| self.from_date = from_date | ||||||||||||
| self.until_date = until_date | ||||||||||||
| self.proc_status_list = proc_status_list | ||||||||||||
| self.data_status_list = data_status_list | ||||||||||||
| self.article_source_status_list = article_source_status_list | ||||||||||||
| self.limit = limit | ||||||||||||
| self.timeout = timeout | ||||||||||||
| self.opac_url = opac_url | ||||||||||||
| self.force_update = force_update | ||||||||||||
|
|
||||||||||||
| self._iter_from_harvest_count = 0 | ||||||||||||
| self._iter_from_article_source_count = 0 | ||||||||||||
| self._iter_from_pid_provider_count = 0 | ||||||||||||
| self._iter_from_article_count = 0 | ||||||||||||
|
|
||||||||||||
| def __iter__(self): | ||||||||||||
| yield from self._iter_from_harvest() | ||||||||||||
| yield from self._iter_from_article_source() | ||||||||||||
| yield from self._iter_from_pid_provider() | ||||||||||||
| yield from self._iter_from_article() | ||||||||||||
|
|
||||||||||||
| logging.info(f"Iterators summary: harvest={self._iter_from_harvest_count}, " | ||||||||||||
| f"article_source={self._iter_from_article_source_count}, " | ||||||||||||
| f"pid_provider={self._iter_from_pid_provider_count}, " | ||||||||||||
| f"article={self._iter_from_article_count}") | ||||||||||||
|
|
||||||||||||
| # ------------------------------------------------------------------ | ||||||||||||
| # Iteradores de seleção | ||||||||||||
| # ------------------------------------------------------------------ | ||||||||||||
|
|
||||||||||||
| def _iter_from_pid_provider(self): | ||||||||||||
| """Itera PidProviderXML filtrados por periódico, data e status.""" | ||||||||||||
| journal_issn_groups = ( | ||||||||||||
| Journal.get_journal_issns(self.collection_acron_list, self.journal_acron_list) | ||||||||||||
| or [None] | ||||||||||||
| ) | ||||||||||||
| for journal_issns in journal_issn_groups: | ||||||||||||
| issn_list = [i for i in journal_issns if i] if journal_issns else None | ||||||||||||
| if journal_issns and not issn_list: | ||||||||||||
| continue | ||||||||||||
| qs = PidProviderXML.get_queryset( | ||||||||||||
| issn_list=issn_list, | ||||||||||||
| from_pub_year=self.from_pub_year, | ||||||||||||
| until_pub_year=self.until_pub_year, | ||||||||||||
| from_updated_date=self.from_date, | ||||||||||||
| until_updated_date=self.until_date, | ||||||||||||
| proc_status_list=self.proc_status_list or [PPXML_STATUS_TODO, PPXML_STATUS_INVALID], | ||||||||||||
| ) | ||||||||||||
| self._iter_from_pid_provider_count += qs.count() | ||||||||||||
| for item in qs.iterator(): | ||||||||||||
| yield {"pp_xml_id": item.id} | ||||||||||||
| logging.info(f"_iter_from_pid_provider: yielded {self._iter_from_pid_provider_count} items") | ||||||||||||
|
|
||||||||||||
| def _iter_from_article(self): | ||||||||||||
| """ | ||||||||||||
| Itera Articles filtrados por data_status. | ||||||||||||
| Yields None para artigos sem pp_xml recuperável (sinaliza skip). | ||||||||||||
| """ | ||||||||||||
| filters = { | ||||||||||||
| "data_status__in": self.data_status_list or [ | ||||||||||||
| choices.DATA_STATUS_PENDING, | ||||||||||||
| choices.DATA_STATUS_UNDEF, | ||||||||||||
| choices.DATA_STATUS_INVALID, | ||||||||||||
| ] | ||||||||||||
| } | ||||||||||||
| journal_id_list = Journal.get_ids( | ||||||||||||
| collection_acron_list=self.collection_acron_list, | ||||||||||||
| journal_acron_list=self.journal_acron_list, | ||||||||||||
| ) | ||||||||||||
| if journal_id_list: | ||||||||||||
| filters["journal__in"] = journal_id_list | ||||||||||||
| if self.from_pub_year: | ||||||||||||
| filters["pub_year__gte"] = self.from_pub_year | ||||||||||||
| if self.until_pub_year: | ||||||||||||
| filters["pub_year__lte"] = self.until_pub_year | ||||||||||||
| if self.from_date: | ||||||||||||
| filters["updated__gte"] = self.from_date | ||||||||||||
| if self.until_date: | ||||||||||||
| filters["updated__lte"] = self.until_date | ||||||||||||
|
|
||||||||||||
| articles = Article.objects.filter(**filters) | ||||||||||||
| self._iter_from_article_count += articles.count() | ||||||||||||
| for article in articles.iterator(): | ||||||||||||
|
Comment on lines
+501
to
+537
|
||||||||||||
| if not article.pp_xml: | ||||||||||||
| try: | ||||||||||||
| article.pp_xml = PidProviderXML.get_by_pid_v3(pid_v3=article.pid_v3) | ||||||||||||
| article.save(update_fields=["pp_xml"]) | ||||||||||||
| except Exception as e: | ||||||||||||
| logging.error(f"pp_xml not found for article {article.id}: {e}") | ||||||||||||
| yield None | ||||||||||||
| continue | ||||||||||||
| yield {"pp_xml_id": article.pp_xml.id} | ||||||||||||
| logging.info(f"_iter_from_article: yielded {self._iter_from_article_count} articles") | ||||||||||||
|
|
||||||||||||
| def _iter_from_harvest(self): | ||||||||||||
| """Itera documentos coletados via OPAC ou ArticleMeta.""" | ||||||||||||
|
|
||||||||||||
| if Collection.objects.count() == 0: | ||||||||||||
| Collection.load(self.user) | ||||||||||||
|
|
||||||||||||
| count = 0 | ||||||||||||
| for collection_acron in self.collection_acron_list or list(Collection.get_acronyms()): | ||||||||||||
| logging.info(collection_acron) | ||||||||||||
| harvester = self._build_harvester(collection_acron) | ||||||||||||
| logging.info(harvester) | ||||||||||||
| for document in harvester.harvest_documents(): | ||||||||||||
| count += 1 | ||||||||||||
| yield { | ||||||||||||
| "xml_url": document["url"], | ||||||||||||
| "collection_acron": collection_acron, | ||||||||||||
| "pid": document["pid_v2"], | ||||||||||||
| "source_date": document.get("processing_date") or document.get("origin_date"), | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| self._iter_from_harvest_count = count | ||||||||||||
| logging.info(f"Harvest iterator yielded {count} documents") | ||||||||||||
|
|
||||||||||||
| def _iter_from_article_source(self): | ||||||||||||
| """Itera ArticleSources pendentes ou com erro.""" | ||||||||||||
| count = 0 | ||||||||||||
| for article_source in ArticleSource.get_queryset_to_complete_data( | ||||||||||||
| self.from_date, | ||||||||||||
| self.until_date, | ||||||||||||
| self.force_update, | ||||||||||||
| self.article_source_status_list, | ||||||||||||
| ): | ||||||||||||
| count += 1 | ||||||||||||
| yield {"article_source_id": article_source.id} | ||||||||||||
| self._iter_from_article_source_count += count | ||||||||||||
| logging.info(f"ArticleSource iterator yielded {count} items") | ||||||||||||
|
|
||||||||||||
| # ------------------------------------------------------------------ | ||||||||||||
| # Helpers privados | ||||||||||||
| # ------------------------------------------------------------------ | ||||||||||||
|
|
||||||||||||
| def _build_harvester(self, collection_acron): | ||||||||||||
| """Instancia o harvester adequado para a coleção.""" | ||||||||||||
| kwargs = dict( | ||||||||||||
| from_date=self.from_date, | ||||||||||||
| until_date=self.until_date, | ||||||||||||
| limit=self.limit, | ||||||||||||
| timeout=self.timeout, | ||||||||||||
| ) | ||||||||||||
| if collection_acron == "scl": | ||||||||||||
| return OPACHarvester(self.opac_url or "www.scielo.br", collection_acron, **kwargs) | ||||||||||||
|
||||||||||||
| return OPACHarvester(self.opac_url or "www.scielo.br", collection_acron, **kwargs) | |
| domain = self.opac_url or "https://www.scielo.br" | |
| if not domain.startswith(("http://", "https://")): | |
| domain = f"https://{domain}" | |
| return OPACHarvester(domain, collection_acron, **kwargs) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| # Generated by Django 5.2.7 on 2026-03-01 15:15 | ||
|
|
||
| from django.db import migrations, models | ||
|
|
||
|
|
||
| class Migration(migrations.Migration): | ||
|
|
||
| dependencies = [ | ||
| ("article", "0047_articleaffiliation_normalized_and_more"), | ||
| ] | ||
|
|
||
| operations = [ | ||
| migrations.AlterField( | ||
| model_name="articlesource", | ||
| name="status", | ||
| field=models.CharField( | ||
| choices=[ | ||
| ("pending", "Pending"), | ||
| ("processing", "Processing"), | ||
| ("completed", "Completed"), | ||
| ("error", "Error"), | ||
| ("reprocess", "Reprocess"), | ||
| ("url_error", "URL Error"), | ||
| ("xml_error", "XML Error"), | ||
| ], | ||
| default="pending", | ||
| help_text="Processing status of the article source", | ||
| max_length=20, | ||
| verbose_name="Status", | ||
| ), | ||
| ), | ||
| ] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ArticleIteratorBuilder.__iter__unconditionally yields from all four iterators. This contradicts the class docstring (which says iterators are activated by “exclusive” args) and means every dispatch will also harvest all collections (and also iterate PidProviderXML + Article + ArticleSource), potentially duplicating work and massively increasing load. Make iterator selection conditional so only the requested sources run (and default to only_iter_from_pid_providerwhen no exclusive args are provided).