diff --git a/hivemind_etl/mediawiki/activities.py b/hivemind_etl/mediawiki/activities.py
index 69047b1..64f5060 100644
--- a/hivemind_etl/mediawiki/activities.py
+++ b/hivemind_etl/mediawiki/activities.py
@@ -58,11 +58,16 @@ async def extract_mediawiki(mediawiki_platform: dict[str, Any]) -> None:
         community_id = mediawiki_platform["community_id"]
         api_url = mediawiki_platform["base_url"]
         namespaces = mediawiki_platform["namespaces"]
+        platform_id = mediawiki_platform["platform_id"]
 
         logging.info(
             f"Starting extraction for community {community_id} with API URL: {api_url}"
         )
-        mediawiki_etl = MediawikiETL(community_id=community_id, namespaces=namespaces)
+        mediawiki_etl = MediawikiETL(
+            community_id=community_id,
+            namespaces=namespaces,
+            platform_id=platform_id,
+        )
         mediawiki_etl.extract(api_url=api_url)
         logging.info(f"Completed extraction for community {community_id}")
     except Exception as e:
@@ -78,11 +83,16 @@ async def transform_mediawiki_data(
     """Transform the extracted MediaWiki data."""
 
     community_id = mediawiki_platform["community_id"]
+    platform_id = mediawiki_platform["platform_id"]
 
     try:
         namespaces = mediawiki_platform["namespaces"]
         logging.info(f"Starting transformation for community {community_id}")
-        mediawiki_etl = MediawikiETL(community_id=community_id, namespaces=namespaces)
+        mediawiki_etl = MediawikiETL(
+            community_id=community_id,
+            namespaces=namespaces,
+            platform_id=platform_id,
+        )
         result = mediawiki_etl.transform()
         logging.info(f"Completed transformation for community {community_id}")
         return result
@@ -95,6 +105,7 @@ async def transform_mediawiki_data(
 async def load_mediawiki_data(mediawiki_platform: dict[str, Any]) -> None:
     """Load the transformed MediaWiki data into the database."""
     community_id = mediawiki_platform["community_id"]
+    platform_id = mediawiki_platform["platform_id"]
     namespaces = mediawiki_platform["namespaces"]
 
     try:
@@ -103,7 +114,11 @@ async def load_mediawiki_data(mediawiki_platform: dict[str, Any]) -> None:
         documents = [Document.from_dict(doc) for doc in documents_dict]
 
         logging.info(f"Starting data load for community {community_id}")
-        mediawiki_etl = MediawikiETL(community_id=community_id, namespaces=namespaces)
+        mediawiki_etl = MediawikiETL(
+            community_id=community_id,
+            namespaces=namespaces,
+            platform_id=platform_id,
+        )
         mediawiki_etl.load(documents=documents)
         logging.info(f"Completed data load for community {community_id}")
     except Exception as e:
diff --git a/hivemind_etl/mediawiki/etl.py b/hivemind_etl/mediawiki/etl.py
index ee415c9..f42a6e6 100644
--- a/hivemind_etl/mediawiki/etl.py
+++ b/hivemind_etl/mediawiki/etl.py
@@ -13,9 +13,11 @@ def __init__(
         self,
         community_id: str,
         namespaces: list[int],
+        platform_id: str,
         delete_dump_after_load: bool = True,
     ) -> None:
         self.community_id = community_id
+        self.platform_id = platform_id
         self.proxy_url = os.getenv("PROXY_URL", "")
 
         if self.proxy_url:
@@ -96,7 +98,7 @@ def transform(self) -> list[Document]:
     def load(self, documents: list[Document]) -> None:
         logging.info(f"Loading {len(documents)} documents into Qdrant!")
         ingestion_pipeline = CustomIngestionPipeline(
-            self.community_id, collection_name="mediawiki"
+            self.community_id, collection_name=self.platform_id
         )
         ingestion_pipeline.run_pipeline(documents)
         logging.info(f"Loaded {len(documents)} documents into Qdrant!")
diff --git a/hivemind_etl/mediawiki/module.py b/hivemind_etl/mediawiki/module.py
index 3294e7b..49af004 100644
--- a/hivemind_etl/mediawiki/module.py
+++ b/hivemind_etl/mediawiki/module.py
@@ -28,7 +28,8 @@ def get_learning_platforms(
         example data output:
         ```
         [{
-            "community_id": "6579c364f1120850414e0dc5",
+            "platform_id": "xxxx",
+            "community_id": "xxxxxx",
             "base_url": "some_api_url",
             "namespaces": [1, 2, 3],
         }]
@@ -87,6 +88,7 @@ def get_learning_platforms(
 
             communities_data.append(
                 {
+                    "platform_id": str(platform_id),
                     "community_id": str(community),
                     "namespaces": namespaces,
                     "base_url": base_url + path,  # type: ignore
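Reviewer note on the three MediaWiki hunks above: `MediawikiETL` now requires a `platform_id`, and `load()` writes to a Qdrant collection named after it instead of the shared `"mediawiki"` collection, so every caller must thread the new argument through. A minimal sketch of the new call-site contract, grounded in the signatures shown in this diff (all IDs and URLs are placeholders, not real values):

```python
from hivemind_etl.mediawiki.etl import MediawikiETL

# Shape mirrors get_learning_platforms()'s documented output; values are fake.
mediawiki_platform = {
    "community_id": "community-id-placeholder",
    "platform_id": "platform-id-placeholder",
    "base_url": "https://wiki.example.org/w/api.php",
    "namespaces": [0, 1],
}

etl = MediawikiETL(
    community_id=mediawiki_platform["community_id"],
    namespaces=mediawiki_platform["namespaces"],
    platform_id=mediawiki_platform["platform_id"],  # new required argument
)
etl.extract(api_url=mediawiki_platform["base_url"])
documents = etl.transform()
etl.load(documents=documents)  # now targets the per-platform collection
```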
diff --git a/hivemind_etl/website/activities.py b/hivemind_etl/website/activities.py
index 6c2d933..51786c8 100644
--- a/hivemind_etl/website/activities.py
+++ b/hivemind_etl/website/activities.py
@@ -43,13 +43,15 @@ async def get_hivemind_website_comminities(
 
 
 @activity.defn
-async def extract_website(urls: list[str], community_id: str) -> list[dict]:
+async def extract_website(
+    urls: list[str], community_id: str, platform_id: str
+) -> list[dict]:
     """Extract data from website URLs."""
     try:
         logging.info(
-            f"Starting extraction for community {community_id} with {len(urls)} URLs"
+            f"Starting extraction for community {community_id} | platform {platform_id} with {len(urls)} URLs"
         )
-        website_etl = WebsiteETL(community_id=community_id)
+        website_etl = WebsiteETL(community_id=community_id, platform_id=platform_id)
         result = await website_etl.extract(urls=urls)
         logging.info(f"Completed extraction for community {community_id}")
         return result
@@ -60,12 +62,14 @@ async def extract_website(urls: list[str], community_id: str) -> list[dict]:
 
 @activity.defn
 async def transform_website_data(
-    raw_data: list[dict], community_id: str
+    raw_data: list[dict], community_id: str, platform_id: str
 ) -> list[Document]:
     """Transform the extracted raw data."""
     try:
-        logging.info(f"Starting transformation for community {community_id}")
-        website_etl = WebsiteETL(community_id=community_id)
+        logging.info(
+            f"Starting transformation for community {community_id} | platform {platform_id}"
+        )
+        website_etl = WebsiteETL(community_id=community_id, platform_id=platform_id)
         result = website_etl.transform(raw_data=raw_data)
         logging.info(f"Completed transformation for community {community_id}")
         return result
@@ -75,11 +79,15 @@ async def transform_website_data(
 
 
 @activity.defn
-async def load_website_data(documents: list[Document], community_id: str) -> None:
+async def load_website_data(
+    documents: list[Document], community_id: str, platform_id: str
+) -> None:
     """Load the transformed data into the database."""
     try:
-        logging.info(f"Starting data load for community {community_id}")
-        website_etl = WebsiteETL(community_id=community_id)
+        logging.info(
+            f"Starting data load for community {community_id} | platform {platform_id}"
+        )
+        website_etl = WebsiteETL(community_id=community_id, platform_id=platform_id)
         website_etl.load(documents=documents)
         logging.info(f"Completed data load for community {community_id}")
     except Exception as e:
diff --git a/hivemind_etl/website/module.py b/hivemind_etl/website/module.py
index ed6ece9..c00ef81 100644
--- a/hivemind_etl/website/module.py
+++ b/hivemind_etl/website/module.py
@@ -29,8 +29,8 @@ def get_learning_platforms(
         example data output:
         ```
         [{
-            "community_id": "6579c364f1120850414e0dc5",
-            "platform_id": "6579c364f1120850414e0dc6",
+            "community_id": "xxxx",
+            "platform_id": "xxxxxxx",
             "urls": ["link1", "link2"],
         }]
         ```
diff --git a/hivemind_etl/website/website_etl.py b/hivemind_etl/website/website_etl.py
index afc3e34..385b3c9 100644
--- a/hivemind_etl/website/website_etl.py
+++ b/hivemind_etl/website/website_etl.py
@@ -10,22 +10,27 @@ class WebsiteETL:
     def __init__(
         self,
         community_id: str,
+        platform_id: str,
     ) -> None:
         """
         Parameters
         -----------
         community_id : str
             the community to save its data
+        platform_id : str
+            the platform to save its data
+
+        Note: the collection name would be `community_id_platform_id`
         """
         if not community_id or not isinstance(community_id, str):
             raise ValueError("community_id must be a non-empty string")
 
         self.community_id = community_id
-        collection_name = "website"
+        self.platform_id = platform_id
 
         # preparing the ingestion pipeline
         self.ingestion_pipeline = CustomIngestionPipeline(
-            self.community_id, collection_name=collection_name
+            self.community_id, collection_name=self.platform_id
         )
 
     async def extract(
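Reviewer note: `WebsiteETL` gets the same treatment — `platform_id` replaces the hard-coded `"website"` collection name, and the new docstring states the resulting collection is `community_id_platform_id`. A hedged usage sketch with placeholder values, based only on the methods visible in this diff:

```python
import asyncio

from hivemind_etl.website.website_etl import WebsiteETL


async def main() -> None:
    # Placeholder IDs; documents should land in a per-platform collection
    # rather than the shared "website" one.
    website_etl = WebsiteETL(
        community_id="community-id-placeholder",
        platform_id="platform-id-placeholder",
    )
    raw_data = await website_etl.extract(urls=["https://docs.example.org"])
    documents = website_etl.transform(raw_data=raw_data)
    website_etl.load(documents=documents)


asyncio.run(main())
```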
diff --git a/hivemind_etl/website/workflows.py b/hivemind_etl/website/workflows.py
index 126e2dd..edc186c 100644
--- a/hivemind_etl/website/workflows.py
+++ b/hivemind_etl/website/workflows.py
@@ -29,7 +29,7 @@ async def run(self, community_info: dict) -> None:
         # Execute activities in sequence with retries
         raw_data = await workflow.execute_activity(
             extract_website,
-            args=[urls, community_id],
+            args=[urls, community_id, platform_id],
             start_to_close_timeout=timedelta(minutes=30),
             retry_policy=RetryPolicy(
                 initial_interval=timedelta(seconds=10),
@@ -40,7 +40,7 @@ async def run(self, community_info: dict) -> None:
 
         documents = await workflow.execute_activity(
             transform_website_data,
-            args=[raw_data, community_id],
+            args=[raw_data, community_id, platform_id],
             start_to_close_timeout=timedelta(minutes=10),
             retry_policy=RetryPolicy(
                 initial_interval=timedelta(seconds=5),
@@ -51,7 +51,7 @@ async def run(self, community_info: dict) -> None:
 
         await workflow.execute_activity(
             load_website_data,
-            args=[documents, community_id],
+            args=[documents, community_id, platform_id],
             start_to_close_timeout=timedelta(minutes=60),
             retry_policy=RetryPolicy(
                 initial_interval=timedelta(seconds=5),
diff --git a/tests/integration/test_mediawiki_modules.py b/tests/integration/test_mediawiki_modules.py
index c60a308..2b69f45 100644
--- a/tests/integration/test_mediawiki_modules.py
+++ b/tests/integration/test_mediawiki_modules.py
@@ -67,6 +67,7 @@ def test_get_single_data(self):
             [0, 1, 2],
         )
         self.assertEqual(result[0]["base_url"], "http://example.com/api")
+        self.assertEqual(result[0]["platform_id"], str(platform_id))
 
     def test_get_mediawiki_communities_data_multiple_platforms(self):
         """
@@ -146,6 +147,7 @@
                 "community_id": str(community_id),
                 "namespaces": [0, 1, 2],
                 "base_url": "http://example1.com/api",
+                "platform_id": str(platform_id1),
             },
         )
         self.assertEqual(
@@ -154,6 +156,7 @@
                 "community_id": str(community_id),
                 "namespaces": [3, 4, 5],
                 "base_url": "http://example2.com/api",
+                "platform_id": str(platform_id2),
             },
         )
 
@@ -237,6 +240,7 @@ def test_get_mediawiki_communities_data_filtered_platforms(self):
                 "community_id": str(community_id),
                 "namespaces": [0, 1, 2],
                 "base_url": "http://example1.com/api",
+                "platform_id": str(platform_id1),
             },
         )
 
@@ -318,5 +322,6 @@ def test_get_mediawiki_communities_data_filtered_platforms_not_activated(self):
                 "community_id": str(community_id),
                 "namespaces": [3, 4, 5],
                 "base_url": "http://example2.com/api",
+                "platform_id": str(platform_id2),
             },
         )
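Reviewer note on the workflow hunks above: Temporal's `workflow.execute_activity` passes `args` positionally, so each `args=[...]` list must stay in the same order as the corresponding activity signature, and the `community_info` dict handed to the workflow's `run()` must now carry a `platform_id` key. A hypothetical sketch of the expected input — key names follow the documented `get_learning_platforms()` output, but the values and the unpacking are illustrative, not taken from this diff:

```python
# Illustrative only: what the website workflow's input is expected to contain
# after this change. The placeholder values are not real IDs.
community_info = {
    "community_id": "community-id-placeholder",
    "platform_id": "platform-id-placeholder",
    "urls": ["https://docs.example.org"],
}

urls = community_info["urls"]
community_id = community_info["community_id"]
platform_id = community_info["platform_id"]  # a KeyError here would fail the run
```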
"test_community" self.api_url = "https://example.com/api.php" self.custom_path = "custom/path" + self.platform_id = "test_platform" self.namespaces = [0, 1] # Main and Talk namespaces # Create a temporary dumps directory @@ -26,7 +27,11 @@ def tearDown(self): shutil.rmtree(self.custom_path) def test_mediawiki_etl_initialization(self): - etl = MediawikiETL(community_id=self.community_id, namespaces=self.namespaces) + etl = MediawikiETL( + community_id=self.community_id, + namespaces=self.namespaces, + platform_id=self.platform_id, + ) self.assertEqual(etl.community_id, self.community_id) self.assertTrue(etl.delete_dump_after_load) self.assertEqual(etl.dump_dir, f"dumps/{self.community_id}") @@ -35,12 +40,17 @@ def test_mediawiki_etl_initialization(self): community_id=self.community_id, namespaces=self.namespaces, delete_dump_after_load=False, + platform_id=self.platform_id, ) self.assertFalse(etl.delete_dump_after_load) def test_extract_with_default_path(self): # Create a ETL instance with mocked wikiteam_crawler - etl = MediawikiETL(community_id=self.community_id, namespaces=self.namespaces) + etl = MediawikiETL( + community_id=self.community_id, + namespaces=self.namespaces, + platform_id=self.platform_id, + ) etl.wikiteam_crawler = Mock() etl.extract(self.api_url) @@ -51,7 +61,11 @@ def test_extract_with_default_path(self): def test_extract_with_custom_path(self): # Create a ETL instance with mocked wikiteam_crawler - etl = MediawikiETL(community_id=self.community_id, namespaces=self.namespaces) + etl = MediawikiETL( + community_id=self.community_id, + namespaces=self.namespaces, + platform_id=self.platform_id, + ) etl.wikiteam_crawler = Mock() etl.extract(self.api_url, self.custom_path) @@ -63,7 +77,11 @@ def test_extract_with_custom_path(self): @patch("hivemind_etl.mediawiki.etl.parse_mediawiki_xml") def test_transform_success(self, mock_parse_mediawiki_xml): - etl = MediawikiETL(community_id=self.community_id, namespaces=self.namespaces) + etl = MediawikiETL( + community_id=self.community_id, + namespaces=self.namespaces, + platform_id=self.platform_id, + ) # Mock page data mock_page = Mock() @@ -98,7 +116,11 @@ def test_transform_success(self, mock_parse_mediawiki_xml): @patch("hivemind_etl.mediawiki.etl.logging") @patch("hivemind_etl.mediawiki.etl.parse_mediawiki_xml") def test_transform_error_handling(self, mock_parse_mediawiki_xml, mock_logging): - etl = MediawikiETL(community_id=self.community_id, namespaces=self.namespaces) + etl = MediawikiETL( + community_id=self.community_id, + namespaces=self.namespaces, + platform_id=self.platform_id, + ) # Mock page that will raise an exception mock_page = Mock() @@ -122,7 +144,11 @@ def get_attribute_error(*args, **kwargs): @patch("hivemind_etl.mediawiki.etl.CustomIngestionPipeline") def test_load_with_dump_deletion(self, mock_ingestion_pipeline_class): - etl = MediawikiETL(community_id=self.community_id, namespaces=self.namespaces) + etl = MediawikiETL( + community_id=self.community_id, + namespaces=self.namespaces, + platform_id=self.platform_id, + ) documents = [Document(text="Test content")] # Setup the mock @@ -138,7 +164,7 @@ def test_load_with_dump_deletion(self, mock_ingestion_pipeline_class): # Verify that methods were called correctly mock_ingestion_pipeline_class.assert_called_once_with( - self.community_id, collection_name="mediawiki" + self.community_id, collection_name=self.platform_id ) mock_pipeline.run_pipeline.assert_called_once_with(documents) self.assertFalse(os.path.exists(etl.dump_dir)) @@ -149,6 +175,7 @@ def 
diff --git a/tests/unit/test_website_etl.py b/tests/unit/test_website_etl.py
index d00838b..a25d9e0 100644
--- a/tests/unit/test_website_etl.py
+++ b/tests/unit/test_website_etl.py
@@ -14,7 +14,8 @@ def setUp(self):
         """
         load_dotenv()
         self.community_id = "test_community"
-        self.website_etl = WebsiteETL(self.community_id)
+        self.platform_id = "test_platform"
+        self.website_etl = WebsiteETL(self.community_id, self.platform_id)
         self.website_etl.crawlee_client = AsyncMock()
         self.website_etl.ingestion_pipeline = MagicMock()
 
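A closing reviewer note: nothing in this diff migrates documents already ingested into the shared `"mediawiki"` and `"website"` collections; after the change, each platform writes to its own collection. If other tooling needs to compute the resulting name, the sketch below mirrors the `community_id_platform_id` naming that the `WebsiteETL` docstring documents — it assumes `CustomIngestionPipeline` prefixes the community id, which this diff does not show:

```python
def qualified_collection_name(community_id: str, platform_id: str) -> str:
    """Illustrative only: mirrors the `community_id_platform_id` naming noted
    in the WebsiteETL docstring; not an API exposed by the pipeline."""
    return f"{community_id}_{platform_id}"


print(qualified_collection_name("community-id-placeholder", "platform-id-placeholder"))
```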