feat: added media wiki ETL! #15
Changes from all commits: c7c18c5, f132990, 4c30962, 5ed9028, 20d863a, f065bf3, 56d997a
hivemind_etl/activities.py

@@ -1,91 +1,26 @@

```python
import logging
from typing import Any

from temporalio import activity, workflow
from hivemind_etl.website.activities import (
    get_hivemind_website_comminities,
    extract_website,
    transform_website_data,
    load_website_data,
)
from hivemind_etl.mediawiki.activities import (
    get_hivemind_mediawiki_platforms,
    extract_mediawiki,
    transform_mediawiki_data,
    load_mediawiki_data,
)

from temporalio import activity

with workflow.unsafe.imports_passed_through():
    from hivemind_etl.website.module import ModulesWebsite
    from hivemind_etl.website.website_etl import WebsiteETL
    from llama_index.core import Document

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@activity.defn
async def get_communities(platform_id: str | None = None) -> list[dict[str, Any]]:
    """
    Fetch all communities that need to be processed in case of no platform id given
    Else, just process for one platform

    Parameters
    -----------
    platform_id : str | None
        A platform's community to be fetched
        for default it is as `None` meaning to get all communities information

    Returns
    ---------
    communities : list[dict[str, Any]]
        a list of communities holding website informations
    """
    try:
        if platform_id:
            logger.info("Website ingestion is filtered for a single community!")
        communities = ModulesWebsite().get_learning_platforms(
            filter_platform_id=platform_id
        )
        logger.info(f"Found {len(communities)} communities to process")
        logging.info(f"communities: {communities}")
        return communities
    except Exception as e:
        logger.error(f"Error fetching communities: {str(e)}")
        raise


@activity.defn
async def extract_website(urls: list[str], community_id: str) -> list[dict]:
    """Extract data from website URLs."""
    try:
        logger.info(
            f"Starting extraction for community {community_id} with {len(urls)} URLs"
        )
        website_etl = WebsiteETL(community_id=community_id)
        result = await website_etl.extract(urls=urls)
        logger.info(f"Completed extraction for community {community_id}")
        return result
    except Exception as e:
        logger.error(f"Error in extraction for community {community_id}: {str(e)}")
        raise


@activity.defn
async def transform_data(raw_data: list[dict], community_id: str) -> list[Document]:
    """Transform the extracted raw data."""
    try:
        logger.info(f"Starting transformation for community {community_id}")
        website_etl = WebsiteETL(community_id=community_id)
        result = website_etl.transform(raw_data=raw_data)
        logger.info(f"Completed transformation for community {community_id}")
        return result
    except Exception as e:
        logger.error(f"Error in transformation for community {community_id}: {str(e)}")
        raise


@activity.defn
async def load_data(documents: list[Document], community_id: str) -> None:
    """Load the transformed data into the database."""
    try:
        logger.info(f"Starting data load for community {community_id}")
        website_etl = WebsiteETL(community_id=community_id)
        website_etl.load(documents=documents)
        logger.info(f"Completed data load for community {community_id}")
    except Exception as e:
        logger.error(f"Error in data load for community {community_id}: {str(e)}")
        raise


@activity.defn
async def say_hello():
    return 7
```
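The hunk above mixes removed and added lines without +/- markers. Judging from the `+1,26` hunk size and the review discussion at the end of this page, the post-PR `hivemind_etl/activities.py` appears to shrink to a re-export module roughly like the following; this is a hedged reconstruction, not the verbatim new file:

```python
# Hedged reconstruction of the slimmed-down activities.py after this PR (assumption).
import logging

# Re-exported so external modules (e.g. registry.py) can import every activity from one place.
from hivemind_etl.website.activities import (
    get_hivemind_website_comminities,
    extract_website,
    transform_website_data,
    load_website_data,
)
from hivemind_etl.mediawiki.activities import (
    get_hivemind_mediawiki_platforms,
    extract_mediawiki,
    transform_mediawiki_data,
    load_mediawiki_data,
)

logging.basicConfig(level=logging.INFO)
```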
hivemind_etl/mediawiki/activities.py

@@ -0,0 +1,98 @@

````python
import logging
from typing import Any

from temporalio import activity, workflow

with workflow.unsafe.imports_passed_through():
    from hivemind_etl.mediawiki.module import ModulesMediaWiki
    from hivemind_etl.mediawiki.etl import MediawikiETL
    from llama_index.core import Document


@activity.defn
async def get_hivemind_mediawiki_platforms(
    platform_id: str | None = None,
) -> list[dict[str, Any]]:
    """
    Fetch all MediaWiki communities that need to be processed in case of no platform id given
    Else, just process for one platform

    Parameters
    -----------
    platform_id : str | None
        A platform's community to be fetched
        for default it is as `None` meaning to get all platforms information

    example data output:
    ```
    [{
        "community_id": "6579c364f1120850414e0dc5",
        "base_url": "some_api_url",
        "namespaces": [1, 2, 3],
    }]
    ```

    Returns
    ---------
    platforms : list[dict[str, Any]]
        a list of platforms holding MediaWiki informations
    """
    try:
        if platform_id:
            logging.info("MediaWiki ingestion is filtered for a single platform!")
        platforms = ModulesMediaWiki().get_learning_platforms(
            platform_id_filter=platform_id
        )
        logging.info(f"Found {len(platforms)} platforms to process")
        logging.info(f"platforms: {platforms}")
        return platforms
    except Exception as e:
        logging.error(f"Error fetching MediaWiki platforms: {str(e)}")
        raise


@activity.defn
async def extract_mediawiki(mediawiki_platform: dict[str, Any]) -> None:
    """Extract data from MediaWiki API URL."""
    try:
        community_id = mediawiki_platform["community_id"]
        api_url = mediawiki_platform["base_url"]
        namespaces = mediawiki_platform["namespaces"]

        logging.info(
            f"Starting extraction for community {community_id} with API URL: {api_url}"
        )
        mediawiki_etl = MediawikiETL(community_id=community_id, namespaces=namespaces)
        mediawiki_etl.extract(api_url=api_url)
        logging.info(f"Completed extraction for community {community_id}")
    except Exception as e:
        community_id = mediawiki_platform["community_id"]
        logging.error(f"Error in extraction for community {community_id}: {str(e)}")
        raise


@activity.defn
async def transform_mediawiki_data(community_id: str) -> list[Document]:
    """Transform the extracted MediaWiki data."""
    try:
        logging.info(f"Starting transformation for community {community_id}")
        mediawiki_etl = MediawikiETL(community_id=community_id)
        result = mediawiki_etl.transform()
        logging.info(f"Completed transformation for community {community_id}")
        return result
    except Exception as e:
        logging.error(f"Error in transformation for community {community_id}: {str(e)}")
        raise


@activity.defn
async def load_mediawiki_data(documents: list[Document], community_id: str) -> None:
    """Load the transformed MediaWiki data into the database."""
    try:
        logging.info(f"Starting data load for community {community_id}")
        mediawiki_etl = MediawikiETL(community_id=community_id)
        mediawiki_etl.load(documents=documents)
        logging.info(f"Completed data load for community {community_id}")
    except Exception as e:
        logging.error(f"Error in data load for community {community_id}: {str(e)}")
        raise
````
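For orientation, here is a hypothetical sketch, not part of this diff, of how a Temporal workflow could chain these activities end to end. The workflow class name, task queue, timeouts, and argument-passing style are all assumptions:

```python
# Hypothetical workflow sketch (not part of this PR); timeouts and names are assumptions.
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from hivemind_etl.mediawiki.activities import (
        get_hivemind_mediawiki_platforms,
        extract_mediawiki,
        transform_mediawiki_data,
        load_mediawiki_data,
    )


@workflow.defn
class MediaWikiETLWorkflow:
    @workflow.run
    async def run(self, platform_id: str | None = None) -> None:
        # Fetch every platform (or just one if platform_id is given).
        platforms = await workflow.execute_activity(
            get_hivemind_mediawiki_platforms,
            platform_id,
            start_to_close_timeout=timedelta(minutes=5),
        )
        for platform in platforms:
            # Extract writes the dump to disk; transform/load then read it by community_id.
            await workflow.execute_activity(
                extract_mediawiki,
                platform,
                start_to_close_timeout=timedelta(hours=2),
            )
            documents = await workflow.execute_activity(
                transform_mediawiki_data,
                platform["community_id"],
                start_to_close_timeout=timedelta(minutes=30),
            )
            await workflow.execute_activity(
                load_mediawiki_data,
                args=[documents, platform["community_id"]],
                start_to_close_timeout=timedelta(minutes=30),
            )
```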
hivemind_etl/mediawiki/etl.py

@@ -0,0 +1,85 @@

```python
import logging
import shutil

from llama_index.core import Document
from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline
from hivemind_etl.mediawiki.transform_xml import parse_mediawiki_xml
from hivemind_etl.mediawiki.wikiteam_crawler import WikiteamCrawler


class MediawikiETL:
    def __init__(
        self,
        community_id: str,
        namespaces: list[int],
        delete_dump_after_load: bool = True,
    ) -> None:
        self.community_id = community_id
        self.wikiteam_crawler = WikiteamCrawler(community_id, namespaces=namespaces)

        self.dump_dir = f"dump_{self.community_id}"
        self.delete_dump_after_load = delete_dump_after_load

    def extract(self, api_url: str, dump_dir: str | None = None) -> None:
        if dump_dir is None:
            dump_dir = self.dump_dir
        else:
            self.dump_dir = dump_dir

        self.wikiteam_crawler.crawl(api_url, dump_dir)

    def transform(self) -> list[Document]:
        pages = parse_mediawiki_xml(file_dir=self.dump_dir)

        documents: list[Document] = []
        for page in pages:
            try:
                documents.append(
                    Document(
                        doc_id=page.page_id,
                        text=page.revision.text,
                        metadata={
                            "title": page.title,
                            "namespace": page.namespace,
                            "revision_id": page.revision.revision_id,
                            "parent_revision_id": page.revision.parent_revision_id,
                            "timestamp": page.revision.timestamp,
                            "comment": page.revision.comment,
                            "contributor_username": page.revision.contributor.username,
                            "contributor_user_id": page.revision.contributor.user_id,
                            "sha1": page.revision.sha1,
                            "model": page.revision.model,
                        },
                        excluded_embed_metadata_keys=[
                            "namespace",
                            "revision_id",
                            "parent_revision_id",
                            "sha1",
                            "model",
                            "contributor_user_id",
                            "comment",
                            "timestamp",
                        ],
                        excluded_llm_metadata_keys=[
                            "namespace",
                            "revision_id",
                            "parent_revision_id",
                            "sha1",
                            "model",
                            "contributor_user_id",
                        ],
                    )
                )
            except Exception as e:
                logging.error(f"Error transforming page {page.page_id}: {e}")

        return documents

    def load(self, documents: list[Document]) -> None:
        ingestion_pipeline = CustomIngestionPipeline(
            self.community_id, collection_name="mediawiki"
        )
        ingestion_pipeline.run_pipeline(documents)

        if self.delete_dump_after_load:
            shutil.rmtree(self.dump_dir)
```

Comment on lines +35 to +62

💡 Verification agent · 🧩 Analysis chain

Validate the existence of `page.revision` before accessing it. Accessing `page.revision` and its nested attributes (for example `page.revision.contributor`) without a guard can fail for pages that carry no revision data.

🏁 Script executed:

```bash
#!/bin/bash
# This script searches for references to page.revision in this file to see if they're all guarded.
rg -A 3 'page\.revision' hivemind_etl/mediawiki/etl.py
```

Length of output: 939

Action Required: Add a guard for `page.revision` before building each `Document`.
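A minimal sketch of one way such a guard could look inside `MediawikiETL.transform()`. It assumes pages without revision data surface as `page.revision is None`; that assumption is not verified against `parse_mediawiki_xml`, and the sketch is not part of this PR:

```python
# Hypothetical guard sketch (assumes page.revision can be None; not part of this PR).
for page in pages:
    revision = getattr(page, "revision", None)
    if revision is None or not revision.text:
        # Skip pages with no usable revision instead of raising inside Document().
        logging.warning(f"Skipping page {page.page_id}: missing revision data")
        continue

    contributor = getattr(revision, "contributor", None)
    documents.append(
        Document(
            doc_id=page.page_id,
            text=revision.text,
            metadata={
                "title": page.title,
                "revision_id": revision.revision_id,
                # username stays None when no contributor is recorded
                "contributor_username": contributor.username if contributor else None,
                # ... remaining metadata keys as in transform() above
            },
        )
    )
```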
@@ -0,0 +1,95 @@

```python
"""XML Reader."""

"""Copied from https://github.com/run-llama/llama_index/blob/main/llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/xml/base.py"""

import re
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Dict, List, Optional

from llama_index.core.readers.base import BaseReader
from llama_index.core.schema import Document


def _get_leaf_nodes_up_to_level(root: ET.Element, level: int) -> List[ET.Element]:
    """Get collection of nodes up to certain level including leaf nodes.

    Args:
        root (ET.Element): XML Root Element
        level (int): Levels to traverse in the tree

    Returns:
        List[ET.Element]: List of target nodes
    """

    def traverse(current_node, current_level):
        if len(current_node) == 0 or level == current_level:
            # Keep leaf nodes and target level nodes
            nodes.append(current_node)
        elif current_level < level:
            # Move to the next level
            for child in current_node:
                traverse(child, current_level + 1)

    nodes = []
    traverse(root, 0)
    return nodes


class XMLReader(BaseReader):
    """XML reader.

    Reads XML documents with options to help suss out relationships between nodes.

    Args:
        tree_level_split (int): From which level in the xml tree we split documents,
            the default level is the root which is level 0
    """

    def __init__(self, tree_level_split: Optional[int] = 0) -> None:
        """Initialize with arguments."""
        super().__init__()
        self.tree_level_split = tree_level_split

    def _parse_xmlelt_to_document(
        self, root: ET.Element, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Parse the xml object into a list of Documents.

        Args:
            root: The XML Element to be converted.
            extra_info (Optional[Dict]): Additional information. Default is None.

        Returns:
            Document: The documents.
        """
        nodes = _get_leaf_nodes_up_to_level(root, self.tree_level_split)
        documents = []
        for node in nodes:
            content = ET.tostring(node, encoding="utf8").decode("utf-8")
            content = re.sub(r"^<\?xml.*", "", content)
            content = content.strip()
            documents.append(Document(text=content, extra_info=extra_info or {}))

        return documents

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
    ) -> List[Document]:
        """Load data from the input file.

        Args:
            file (Path): Path to the input file.
            extra_info (Optional[Dict]): Additional information. Default is None.

        Returns:
            List[Document]: List of documents.
        """
        if not isinstance(file, Path):
            file = Path(file)

        tree = ET.parse(file)
        return self._parse_xmlelt_to_document(tree.getroot(), extra_info)
```
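For reference, a minimal usage sketch of this reader; the file path and `extra_info` values are placeholders, not taken from this PR:

```python
from pathlib import Path

# Minimal usage sketch; "dump.xml" is a placeholder path, not from this PR.
reader = XMLReader(tree_level_split=1)
documents = reader.load_data(file=Path("dump.xml"), extra_info={"source": "mediawiki"})
print(f"Parsed {len(documents)} documents from the XML dump")
```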
amindadgar marked this conversation as resolved.
💡 Verification agent · 🧩 Analysis chain
Fix unused imports identified by static analysis.

The imports are not being used in this file according to static analysis. If they are being imported for use in other modules that import from this file, consider adding a comment explaining this purpose. Otherwise, they should be removed.

Let's verify if these functions are used elsewhere:

🏁 Script executed:

Length of output: 280

Action Required: Clarify Re-Exported Functions Usage

The static analysis warning is a false positive: the grep confirms that these functions are imported in `registry.py`, meaning they are intentionally re-exported as part of the module's public API. To prevent future confusion and accidental removal, please add a comment next to the import block in `hivemind_etl/activities.py` (lines 4-9) indicating that these imports exist solely for external use (re-export), for example:

```python
# These functions are imported here to expose them as part of the public API for external modules (e.g., used in registry.py).
```
🪛 Ruff (0.8.2)
5-5:
hivemind_etl.website.activities.get_hivemind_website_comminitiesimported but unusedRemove unused import
(F401)
6-6:
hivemind_etl.website.activities.extract_websiteimported but unusedRemove unused import
(F401)
7-7:
hivemind_etl.website.activities.transform_website_dataimported but unusedRemove unused import
(F401)
8-8:
hivemind_etl.website.activities.load_website_dataimported but unusedRemove unused import
(F401)
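`registry.py` is referenced in the discussion above but not included in this diff. Under that assumption, here is a hypothetical sketch of how such a registry or worker module might consume the re-exported activities; the module layout, task queue name, and server address are all placeholders:

```python
# Hypothetical sketch of a worker/registry module (registry.py is referenced above
# but not shown in this PR); names and the task queue value are assumptions.
from temporalio.client import Client
from temporalio.worker import Worker

from hivemind_etl.activities import (
    get_hivemind_website_comminities,
    extract_website,
    transform_website_data,
    load_website_data,
    get_hivemind_mediawiki_platforms,
    extract_mediawiki,
    transform_mediawiki_data,
    load_mediawiki_data,
)


async def run_worker() -> None:
    # Connect to the Temporal server and register every re-exported activity.
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="hivemind-etl",
        activities=[
            get_hivemind_website_comminities,
            extract_website,
            transform_website_data,
            load_website_data,
            get_hivemind_mediawiki_platforms,
            extract_mediawiki,
            transform_mediawiki_data,
            load_mediawiki_data,
        ],
    )
    await worker.run()
```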