21 changes: 18 additions & 3 deletions hivemind_etl/mediawiki/activities.py
@@ -58,11 +58,16 @@ async def extract_mediawiki(mediawiki_platform: dict[str, Any]) -> None:
         community_id = mediawiki_platform["community_id"]
         api_url = mediawiki_platform["base_url"]
         namespaces = mediawiki_platform["namespaces"]
+        platform_id = mediawiki_platform["platform_id"]
 
         logging.info(
             f"Starting extraction for community {community_id} with API URL: {api_url}"
         )
-        mediawiki_etl = MediawikiETL(community_id=community_id, namespaces=namespaces)
+        mediawiki_etl = MediawikiETL(
+            community_id=community_id,
+            namespaces=namespaces,
+            platform_id=platform_id,
+        )
         mediawiki_etl.extract(api_url=api_url)
         logging.info(f"Completed extraction for community {community_id}")
     except Exception as e:
@@ -78,11 +83,16 @@ async def transform_mediawiki_data(
     """Transform the extracted MediaWiki data."""
 
     community_id = mediawiki_platform["community_id"]
+    platform_id = mediawiki_platform["platform_id"]
     try:
         namespaces = mediawiki_platform["namespaces"]
 
         logging.info(f"Starting transformation for community {community_id}")
-        mediawiki_etl = MediawikiETL(community_id=community_id, namespaces=namespaces)
+        mediawiki_etl = MediawikiETL(
+            community_id=community_id,
+            namespaces=namespaces,
+            platform_id=platform_id,
+        )
         result = mediawiki_etl.transform()
         logging.info(f"Completed transformation for community {community_id}")
         return result
@@ -95,6 +105,7 @@ async def transform_mediawiki_data(
 async def load_mediawiki_data(mediawiki_platform: dict[str, Any]) -> None:
     """Load the transformed MediaWiki data into the database."""
     community_id = mediawiki_platform["community_id"]
+    platform_id = mediawiki_platform["platform_id"]
     namespaces = mediawiki_platform["namespaces"]
 
     try:
@@ -103,7 +114,11 @@ async def load_mediawiki_data(mediawiki_platform: dict[str, Any]) -> None:
         documents = [Document.from_dict(doc) for doc in documents_dict]
 
         logging.info(f"Starting data load for community {community_id}")
-        mediawiki_etl = MediawikiETL(community_id=community_id, namespaces=namespaces)
+        mediawiki_etl = MediawikiETL(
+            community_id=community_id,
+            namespaces=namespaces,
+            platform_id=platform_id,
+        )
         mediawiki_etl.load(documents=documents)
         logging.info(f"Completed data load for community {community_id}")
     except Exception as e:
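All three MediaWiki activities read the new key from the same payload dict. A minimal sketch of the shape they consume, mirroring the example documented in `hivemind_etl/mediawiki/module.py` below (all ID values are placeholders):

```python
# Placeholder payload; the real IDs are stringified MongoDB ObjectIds
# produced by get_learning_platforms().
mediawiki_platform = {
    "platform_id": "xxxx",
    "community_id": "xxxxxx",
    "base_url": "some_api_url",
    "namespaces": [1, 2, 3],
}

# Each activity unpacks the same key before constructing the ETL:
platform_id = mediawiki_platform["platform_id"]
```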
4 changes: 3 additions & 1 deletion hivemind_etl/mediawiki/etl.py
@@ -13,9 +13,11 @@ def __init__(
         self,
         community_id: str,
         namespaces: list[int],
+        platform_id: str,
         delete_dump_after_load: bool = True,
     ) -> None:
         self.community_id = community_id
+        self.platform_id = platform_id
 
         self.proxy_url = os.getenv("PROXY_URL", "")
         if self.proxy_url:
@@ -96,7 +98,7 @@ def transform(self) -> list[Document]:
     def load(self, documents: list[Document]) -> None:
         logging.info(f"Loading {len(documents)} documents into Qdrant!")
         ingestion_pipeline = CustomIngestionPipeline(
-            self.community_id, collection_name="mediawiki"
+            self.community_id, collection_name=self.platform_id
         )
         ingestion_pipeline.run_pipeline(documents)
         logging.info(f"Loaded {len(documents)} documents into Qdrant!")
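The only behavioral change in `load` is the collection name: documents that previously all landed in one shared `mediawiki` collection per community are now routed per platform. A minimal sketch of the naming intent, assuming `CustomIngestionPipeline` joins the two IDs as `community_id_platform_id` (as the `WebsiteETL` docstring below notes); the actual join logic lives inside the pipeline and is not shown in this diff:

```python
# Hypothetical helper for illustration only; CustomIngestionPipeline is
# assumed to compose the Qdrant collection name roughly like this.
def qdrant_collection_name(community_id: str, collection_name: str) -> str:
    return f"{community_id}_{collection_name}"

# Before: one shared collection for all MediaWiki platforms of a community.
qdrant_collection_name("6579c364f1120850414e0dc5", "mediawiki")

# After: one collection per platform, keyed by its platform_id
# (placeholder ObjectId shown here).
qdrant_collection_name("6579c364f1120850414e0dc5", "6579c364f1120850414e0dc6")
```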
4 changes: 3 additions & 1 deletion hivemind_etl/mediawiki/module.py
@@ -28,7 +28,8 @@ def get_learning_platforms(
         example data output:
         ```
         [{
-            "community_id": "6579c364f1120850414e0dc5",
+            "platform_id": "xxxx",
+            "community_id": "xxxxxx",
             "base_url": "some_api_url",
             "namespaces": [1, 2, 3],
         }]
@@ -87,6 +88,7 @@ def get_learning_platforms(
 
             communities_data.append(
                 {
+                    "platform_id": str(platform_id),
                     "community_id": str(community),
                     "namespaces": namespaces,
                     "base_url": base_url + path,  # type: ignore
26 changes: 17 additions & 9 deletions hivemind_etl/website/activities.py
@@ -43,13 +43,15 @@ async def get_hivemind_website_comminities(
 
 
 @activity.defn
-async def extract_website(urls: list[str], community_id: str) -> list[dict]:
+async def extract_website(
+    urls: list[str], community_id: str, platform_id: str
+) -> list[dict]:
     """Extract data from website URLs."""
     try:
         logging.info(
-            f"Starting extraction for community {community_id} with {len(urls)} URLs"
+            f"Starting extraction for community {community_id} | platform {platform_id} with {len(urls)} URLs"
         )
-        website_etl = WebsiteETL(community_id=community_id)
+        website_etl = WebsiteETL(community_id=community_id, platform_id=platform_id)
         result = await website_etl.extract(urls=urls)
         logging.info(f"Completed extraction for community {community_id}")
         return result
@@ -60,12 +62,14 @@
 
 @activity.defn
 async def transform_website_data(
-    raw_data: list[dict], community_id: str
+    raw_data: list[dict], community_id: str, platform_id: str
 ) -> list[Document]:
     """Transform the extracted raw data."""
     try:
-        logging.info(f"Starting transformation for community {community_id}")
-        website_etl = WebsiteETL(community_id=community_id)
+        logging.info(
+            f"Starting transformation for community {community_id} | platform {platform_id}"
+        )
+        website_etl = WebsiteETL(community_id=community_id, platform_id=platform_id)
         result = website_etl.transform(raw_data=raw_data)
         logging.info(f"Completed transformation for community {community_id}")
         return result
@@ -75,11 +79,15 @@ async def transform_website_data(
 
 
 @activity.defn
-async def load_website_data(documents: list[Document], community_id: str) -> None:
+async def load_website_data(
+    documents: list[Document], community_id: str, platform_id: str
+) -> None:
     """Load the transformed data into the database."""
     try:
-        logging.info(f"Starting data load for community {community_id}")
-        website_etl = WebsiteETL(community_id=community_id)
+        logging.info(
+            f"Starting data load for community {community_id} | platform {platform_id}"
+        )
+        website_etl = WebsiteETL(community_id=community_id, platform_id=platform_id)
         website_etl.load(documents=documents)
         logging.info(f"Completed data load for community {community_id}")
     except Exception as e:
4 changes: 2 additions & 2 deletions hivemind_etl/website/module.py
@@ -29,8 +29,8 @@ def get_learning_platforms(
         example data output:
         ```
         [{
-            "community_id": "6579c364f1120850414e0dc5",
-            "platform_id": "6579c364f1120850414e0dc6",
+            "community_id": "xxxx",
+            "platform_id": "xxxxxxx",
             "urls": ["link1", "link2"],
         }]
         ```
9 changes: 7 additions & 2 deletions hivemind_etl/website/website_etl.py
@@ -10,22 +10,27 @@ class WebsiteETL:
     def __init__(
         self,
         community_id: str,
+        platform_id: str,
     ) -> None:
         """
         Parameters
         -----------
         community_id : str
             the community to save its data
+        platform_id : str
+            the platform to save its data
+
+        Note: the collection name would be `community_id_platform_id`
         """
         if not community_id or not isinstance(community_id, str):
             raise ValueError("community_id must be a non-empty string")
 
         self.community_id = community_id
-        collection_name = "website"
+        self.platform_id = platform_id
 
         # preparing the ingestion pipeline
         self.ingestion_pipeline = CustomIngestionPipeline(
-            self.community_id, collection_name=collection_name
+            self.community_id, collection_name=self.platform_id
         )
 
     async def extract(
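A short usage sketch of the updated constructor (placeholder IDs). Note that only `community_id` gets the non-empty-string check; `platform_id` is accepted as-is:

```python
# Placeholder IDs; per the docstring above, documents ingested through
# this instance end up in the collection `community_id_platform_id`.
etl = WebsiteETL(
    community_id="6579c364f1120850414e0dc5",  # validated: non-empty string
    platform_id="6579c364f1120850414e0dc6",   # assumption: no equivalent check
)
```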
6 changes: 3 additions & 3 deletions hivemind_etl/website/workflows.py
@@ -29,7 +29,7 @@ async def run(self, community_info: dict) -> None:
         # Execute activities in sequence with retries
         raw_data = await workflow.execute_activity(
             extract_website,
-            args=[urls, community_id],
+            args=[urls, community_id, platform_id],
             start_to_close_timeout=timedelta(minutes=30),
             retry_policy=RetryPolicy(
                 initial_interval=timedelta(seconds=10),
@@ -40,7 +40,7 @@
 
         documents = await workflow.execute_activity(
             transform_website_data,
-            args=[raw_data, community_id],
+            args=[raw_data, community_id, platform_id],
             start_to_close_timeout=timedelta(minutes=10),
             retry_policy=RetryPolicy(
                 initial_interval=timedelta(seconds=5),
@@ -51,7 +51,7 @@
 
         await workflow.execute_activity(
             load_website_data,
-            args=[documents, community_id],
+            args=[documents, community_id, platform_id],
             start_to_close_timeout=timedelta(minutes=60),
             retry_policy=RetryPolicy(
                 initial_interval=timedelta(seconds=5),
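Temporal serializes `args` positionally, so the order must mirror each activity's signature exactly. A sketch of the assumed unpacking at the top of `run()` (outside the hunks above), matching the keys produced by `hivemind_etl/website/module.py`:

```python
# Assumed, not shown in this diff: run() pulls these keys out of
# community_info before the three activity calls above.
urls = community_info["urls"]
community_id = community_info["community_id"]
platform_id = community_info["platform_id"]
```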
5 changes: 5 additions & 0 deletions tests/integration/test_mediawiki_modules.py
@@ -67,6 +67,7 @@ def test_get_single_data(self):
             [0, 1, 2],
         )
         self.assertEqual(result[0]["base_url"], "http://example.com/api")
+        self.assertEqual(result[0]["platform_id"], str(platform_id))
 
     def test_get_mediawiki_communities_data_multiple_platforms(self):
         """
@@ -146,6 +147,7 @@ def test_get_mediawiki_communities_data_multiple_platforms(self):
                 "community_id": str(community_id),
                 "namespaces": [0, 1, 2],
                 "base_url": "http://example1.com/api",
+                "platform_id": str(platform_id1),
             },
         )
         self.assertEqual(
@@ -154,6 +156,7 @@ def test_get_mediawiki_communities_data_multiple_platforms(self):
                 "community_id": str(community_id),
                 "namespaces": [3, 4, 5],
                 "base_url": "http://example2.com/api",
+                "platform_id": str(platform_id2),
             },
         )
 
@@ -237,6 +240,7 @@ def test_get_mediawiki_communities_data_filtered_platforms(self):
                 "community_id": str(community_id),
                 "namespaces": [0, 1, 2],
                 "base_url": "http://example1.com/api",
+                "platform_id": str(platform_id1),
             },
         )
 
@@ -318,5 +322,6 @@ def test_get_mediawiki_communities_data_filtered_platforms_not_activated(self):
                 "community_id": str(community_id),
                 "namespaces": [3, 4, 5],
                 "base_url": "http://example2.com/api",
+                "platform_id": str(platform_id2),
             },
         )
43 changes: 35 additions & 8 deletions tests/unit/test_mediawiki_etl.py
@@ -12,6 +12,7 @@ def setUp(self):
         self.community_id = "test_community"
         self.api_url = "https://example.com/api.php"
         self.custom_path = "custom/path"
+        self.platform_id = "test_platform"
         self.namespaces = [0, 1]  # Main and Talk namespaces
 
         # Create a temporary dumps directory
@@ -26,7 +27,11 @@ def tearDown(self):
         shutil.rmtree(self.custom_path)
 
     def test_mediawiki_etl_initialization(self):
-        etl = MediawikiETL(community_id=self.community_id, namespaces=self.namespaces)
+        etl = MediawikiETL(
+            community_id=self.community_id,
+            namespaces=self.namespaces,
+            platform_id=self.platform_id,
+        )
         self.assertEqual(etl.community_id, self.community_id)
         self.assertTrue(etl.delete_dump_after_load)
         self.assertEqual(etl.dump_dir, f"dumps/{self.community_id}")
@@ -35,12 +40,17 @@ def test_mediawiki_etl_initialization(self):
             community_id=self.community_id,
             namespaces=self.namespaces,
             delete_dump_after_load=False,
+            platform_id=self.platform_id,
         )
         self.assertFalse(etl.delete_dump_after_load)
 
     def test_extract_with_default_path(self):
         # Create a ETL instance with mocked wikiteam_crawler
-        etl = MediawikiETL(community_id=self.community_id, namespaces=self.namespaces)
+        etl = MediawikiETL(
+            community_id=self.community_id,
+            namespaces=self.namespaces,
+            platform_id=self.platform_id,
+        )
         etl.wikiteam_crawler = Mock()
 
         etl.extract(self.api_url)
@@ -51,7 +61,11 @@
 
     def test_extract_with_custom_path(self):
         # Create a ETL instance with mocked wikiteam_crawler
-        etl = MediawikiETL(community_id=self.community_id, namespaces=self.namespaces)
+        etl = MediawikiETL(
+            community_id=self.community_id,
+            namespaces=self.namespaces,
+            platform_id=self.platform_id,
+        )
         etl.wikiteam_crawler = Mock()
 
         etl.extract(self.api_url, self.custom_path)
@@ -63,7 +77,11 @@
 
     @patch("hivemind_etl.mediawiki.etl.parse_mediawiki_xml")
     def test_transform_success(self, mock_parse_mediawiki_xml):
-        etl = MediawikiETL(community_id=self.community_id, namespaces=self.namespaces)
+        etl = MediawikiETL(
+            community_id=self.community_id,
+            namespaces=self.namespaces,
+            platform_id=self.platform_id,
+        )
 
         # Mock page data
         mock_page = Mock()
@@ -98,7 +116,11 @@
     @patch("hivemind_etl.mediawiki.etl.logging")
     @patch("hivemind_etl.mediawiki.etl.parse_mediawiki_xml")
     def test_transform_error_handling(self, mock_parse_mediawiki_xml, mock_logging):
-        etl = MediawikiETL(community_id=self.community_id, namespaces=self.namespaces)
+        etl = MediawikiETL(
+            community_id=self.community_id,
+            namespaces=self.namespaces,
+            platform_id=self.platform_id,
+        )
 
         # Mock page that will raise an exception
         mock_page = Mock()
@@ -122,7 +144,11 @@ def get_attribute_error(*args, **kwargs):
 
     @patch("hivemind_etl.mediawiki.etl.CustomIngestionPipeline")
     def test_load_with_dump_deletion(self, mock_ingestion_pipeline_class):
-        etl = MediawikiETL(community_id=self.community_id, namespaces=self.namespaces)
+        etl = MediawikiETL(
+            community_id=self.community_id,
+            namespaces=self.namespaces,
+            platform_id=self.platform_id,
+        )
         documents = [Document(text="Test content")]
 
         # Setup the mock
@@ -138,7 +164,7 @@ def test_load_with_dump_deletion(self, mock_ingestion_pipeline_class):
 
         # Verify that methods were called correctly
         mock_ingestion_pipeline_class.assert_called_once_with(
-            self.community_id, collection_name="mediawiki"
+            self.community_id, collection_name=self.platform_id
         )
         mock_pipeline.run_pipeline.assert_called_once_with(documents)
         self.assertFalse(os.path.exists(etl.dump_dir))
@@ -149,6 +175,7 @@ def test_load_without_dump_deletion(self, mock_ingestion_pipeline_class):
             community_id=self.community_id,
             namespaces=self.namespaces,
             delete_dump_after_load=False,
+            platform_id=self.platform_id,
         )
         documents = [Document(text="Test content")]
 
@@ -165,7 +192,7 @@
 
         # Verify that methods were called correctly
         mock_ingestion_pipeline_class.assert_called_once_with(
-            self.community_id, collection_name="mediawiki"
+            self.community_id, collection_name=self.platform_id
        )
         mock_pipeline.run_pipeline.assert_called_once_with(documents)
         self.assertTrue(os.path.exists(etl.dump_dir))
3 changes: 2 additions & 1 deletion tests/unit/test_website_etl.py
@@ -14,7 +14,8 @@ def setUp(self):
         """
         load_dotenv()
         self.community_id = "test_community"
-        self.website_etl = WebsiteETL(self.community_id)
+        self.platform_id = "test_platform"
+        self.website_etl = WebsiteETL(self.community_id, self.platform_id)
         self.website_etl.crawlee_client = AsyncMock()
         self.website_etl.ingestion_pipeline = MagicMock()
 