diff --git a/hivemind_etl/mediawiki/activities.py b/hivemind_etl/mediawiki/activities.py index 3037411..eca39d0 100644 --- a/hivemind_etl/mediawiki/activities.py +++ b/hivemind_etl/mediawiki/activities.py @@ -72,11 +72,15 @@ async def extract_mediawiki(mediawiki_platform: dict[str, Any]) -> None: @activity.defn -async def transform_mediawiki_data(community_id: str) -> list[Document]: +async def transform_mediawiki_data(mediawiki_platform: dict[str, Any]) -> list[Document]: """Transform the extracted MediaWiki data.""" + + community_id = mediawiki_platform["community_id"] try: + namespaces = mediawiki_platform["namespaces"] + logging.info(f"Starting transformation for community {community_id}") - mediawiki_etl = MediawikiETL(community_id=community_id) + mediawiki_etl = MediawikiETL(community_id=community_id, namespaces=namespaces) result = mediawiki_etl.transform() logging.info(f"Completed transformation for community {community_id}") return result @@ -86,9 +90,12 @@ async def transform_mediawiki_data(community_id: str) -> list[Document]: @activity.defn -async def load_mediawiki_data(documents: list[Document], community_id: str) -> None: +async def load_mediawiki_data(mediawiki_platform: dict[str, Any]) -> None: """Load the transformed MediaWiki data into the database.""" + community_id = mediawiki_platform["community_id"] try: + documents = mediawiki_platform["documents"] + logging.info(f"Starting data load for community {community_id}") mediawiki_etl = MediawikiETL(community_id=community_id) mediawiki_etl.load(documents=documents) diff --git a/hivemind_etl/mediawiki/workflows.py b/hivemind_etl/mediawiki/workflows.py index 0d7bf87..002676d 100644 --- a/hivemind_etl/mediawiki/workflows.py +++ b/hivemind_etl/mediawiki/workflows.py @@ -59,7 +59,7 @@ async def run(self, platform_id: str | None = None) -> None: # Transform the extracted data documents = await workflow.execute_activity( transform_mediawiki_data, - platform["community_id"], + mediawiki_platform, start_to_close_timeout=timedelta(minutes=30), retry_policy=RetryPolicy( initial_interval=timedelta(minutes=1), @@ -67,11 +67,11 @@ async def run(self, platform_id: str | None = None) -> None: ), ) + mediawiki_platform["documents"] = documents # Load the transformed data await workflow.execute_activity( load_mediawiki_data, - documents, - platform["community_id"], + mediawiki_platform, start_to_close_timeout=timedelta(minutes=30), retry_policy=RetryPolicy( initial_interval=timedelta(minutes=1),