6 changes: 5 additions & 1 deletion .gitignore
@@ -161,4 +161,8 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

main.ipynb
main.ipynb

*.xml

dump_*
93 changes: 14 additions & 79 deletions hivemind_etl/activities.py
@@ -1,91 +1,26 @@
import logging
from typing import Any

from temporalio import activity, workflow
from hivemind_etl.website.activities import (
    get_hivemind_website_comminities,
    extract_website,
    transform_website_data,
    load_website_data,
)
Comment on lines +4 to +9
💡 Verification agent

🧩 Analysis chain

Fix unused imports identified by static analysis.

The imports are not being used in this file according to static analysis. If they're being imported for use in other modules that import from this file, consider adding a comment explaining this purpose. Otherwise, they should be removed.

-from hivemind_etl.website.activities import (
-    get_hivemind_website_comminities,
-    extract_website,
-    transform_website_data,
-    load_website_data,
-)

Let's verify if these functions are used elsewhere:


🏁 Script executed:

#!/bin/bash
# Search for imports or uses of these functions in other files
echo "Checking where these functions are imported or used:"
rg -l "from hivemind_etl.activities import" | xargs rg -l "(get_hivemind_website_comminities|extract_website|transform_website_data|load_website_data)"

Length of output: 280


Action Required: Clarify Re-Exported Functions Usage
The static analysis warning is a false positive since our grep confirms that these functions are imported in registry.py, meaning they’re intentionally re-exported as part of the module's public API. To prevent future confusion and accidental removal, please add a comment next to the import block in hivemind_etl/activities.py indicating that these imports exist solely for external use (re-export).

  • Location: hivemind_etl/activities.py (Lines 4-9)
  • Suggestion: Add a comment such as:
    # These functions are imported here to expose them as part of the public API for external modules (e.g., used in registry.py).
🧰 Tools
🪛 Ruff (0.8.2)

5-5: hivemind_etl.website.activities.get_hivemind_website_comminities imported but unused

Remove unused import

(F401)


6-6: hivemind_etl.website.activities.extract_website imported but unused

Remove unused import

(F401)


7-7: hivemind_etl.website.activities.transform_website_data imported but unused

Remove unused import

(F401)


8-8: hivemind_etl.website.activities.load_website_data imported but unused

Remove unused import

(F401)
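
If the team prefers to keep Ruff clean rather than rely on a comment alone, one option (not part of this PR) is to declare the re-exported names in `__all__`, which F401 treats as intentional re-exports. A minimal sketch, assuming the import block stays exactly as added in this diff:

```
# Sketch only: same import block as in this PR, plus an explicit re-export list.
from hivemind_etl.website.activities import (
    get_hivemind_website_comminities,
    extract_website,
    transform_website_data,
    load_website_data,
)

# Names listed in __all__ are treated by Ruff/flake8 as intentional re-exports,
# documenting that they exist for consumers such as registry.py.
__all__ = [
    "get_hivemind_website_comminities",
    "extract_website",
    "transform_website_data",
    "load_website_data",
]
```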

from hivemind_etl.mediawiki.activities import (
    get_hivemind_mediawiki_platforms,
    extract_mediawiki,
    transform_mediawiki_data,
    load_mediawiki_data,
)

from temporalio import activity

with workflow.unsafe.imports_passed_through():
    from hivemind_etl.website.module import ModulesWebsite
    from hivemind_etl.website.website_etl import WebsiteETL
    from llama_index.core import Document

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@activity.defn
async def get_communities(platform_id: str | None = None) -> list[dict[str, Any]]:
    """
    Fetch all communities that need to be processed when no platform id is given.
    Otherwise, process only the community of the given platform.

    Parameters
    -----------
    platform_id : str | None
        The platform whose community should be fetched.
        Defaults to `None`, meaning all communities' information is fetched.

    Returns
    ---------
    communities : list[dict[str, Any]]
        a list of communities holding website information
    """
    try:
        if platform_id:
            logger.info("Website ingestion is filtered for a single community!")

        communities = ModulesWebsite().get_learning_platforms(
            filter_platform_id=platform_id
        )
        logger.info(f"Found {len(communities)} communities to process")
        logging.info(f"communities: {communities}")
        return communities
    except Exception as e:
        logger.error(f"Error fetching communities: {str(e)}")
        raise


@activity.defn
async def extract_website(urls: list[str], community_id: str) -> list[dict]:
    """Extract data from website URLs."""
    try:
        logger.info(
            f"Starting extraction for community {community_id} with {len(urls)} URLs"
        )
        website_etl = WebsiteETL(community_id=community_id)
        result = await website_etl.extract(urls=urls)
        logger.info(f"Completed extraction for community {community_id}")
        return result
    except Exception as e:
        logger.error(f"Error in extraction for community {community_id}: {str(e)}")
        raise


@activity.defn
async def transform_data(raw_data: list[dict], community_id: str) -> list[Document]:
    """Transform the extracted raw data."""
    try:
        logger.info(f"Starting transformation for community {community_id}")
        website_etl = WebsiteETL(community_id=community_id)
        result = website_etl.transform(raw_data=raw_data)
        logger.info(f"Completed transformation for community {community_id}")
        return result
    except Exception as e:
        logger.error(f"Error in transformation for community {community_id}: {str(e)}")
        raise


@activity.defn
async def load_data(documents: list[Document], community_id: str) -> None:
    """Load the transformed data into the database."""
    try:
        logger.info(f"Starting data load for community {community_id}")
        website_etl = WebsiteETL(community_id=community_id)
        website_etl.load(documents=documents)
        logger.info(f"Completed data load for community {community_id}")
    except Exception as e:
        logger.error(f"Error in data load for community {community_id}: {str(e)}")
        raise


@activity.defn
async def say_hello():
    return 7
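
For orientation, a minimal sketch of how these activities might be registered with a Temporal worker; the task queue name and server address are placeholders, and the real project wires activities up through its own registry module rather than this snippet:

```
# Hypothetical worker wiring; names and addresses below are illustrative only.
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from hivemind_etl.activities import say_hello
from hivemind_etl.mediawiki.activities import (
    get_hivemind_mediawiki_platforms,
    extract_mediawiki,
    transform_mediawiki_data,
    load_mediawiki_data,
)


async def main() -> None:
    # Connect to a local Temporal server and register the ETL activities.
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="hivemind-etl",
        activities=[
            say_hello,
            get_hivemind_mediawiki_platforms,
            extract_mediawiki,
            transform_mediawiki_data,
            load_mediawiki_data,
        ],
    )
    await worker.run()


asyncio.run(main())
```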
Empty file.
98 changes: 98 additions & 0 deletions hivemind_etl/mediawiki/activities.py
@@ -0,0 +1,98 @@
import logging
from typing import Any

from temporalio import activity, workflow

with workflow.unsafe.imports_passed_through():
    from hivemind_etl.mediawiki.module import ModulesMediaWiki
    from hivemind_etl.mediawiki.etl import MediawikiETL
    from llama_index.core import Document


@activity.defn
async def get_hivemind_mediawiki_platforms(
    platform_id: str | None = None,
) -> list[dict[str, Any]]:
    """
    Fetch all MediaWiki platforms that need to be processed when no platform id is given.
    Otherwise, process only the given platform.

    Parameters
    -----------
    platform_id : str | None
        The platform whose community should be fetched.
        Defaults to `None`, meaning all platforms' information is fetched.

    example data output:
    ```
    [{
        "community_id": "6579c364f1120850414e0dc5",
        "base_url": "some_api_url",
        "namespaces": [1, 2, 3],
    }]
    ```

    Returns
    ---------
    platforms : list[dict[str, Any]]
        a list of platforms holding MediaWiki information
    """
    try:
        if platform_id:
            logging.info("MediaWiki ingestion is filtered for a single platform!")

        platforms = ModulesMediaWiki().get_learning_platforms(
            platform_id_filter=platform_id
        )
        logging.info(f"Found {len(platforms)} platforms to process")
        logging.info(f"platforms: {platforms}")
        return platforms
    except Exception as e:
        logging.error(f"Error fetching MediaWiki platforms: {str(e)}")
        raise


@activity.defn
async def extract_mediawiki(mediawiki_platform: dict[str, Any]) -> None:
    """Extract data from MediaWiki API URL."""
    try:
        community_id = mediawiki_platform["community_id"]
        api_url = mediawiki_platform["base_url"]
        namespaces = mediawiki_platform["namespaces"]

        logging.info(
            f"Starting extraction for community {community_id} with API URL: {api_url}"
        )
        mediawiki_etl = MediawikiETL(community_id=community_id, namespaces=namespaces)
        mediawiki_etl.extract(api_url=api_url)
        logging.info(f"Completed extraction for community {community_id}")
    except Exception as e:
        community_id = mediawiki_platform["community_id"]
        logging.error(f"Error in extraction for community {community_id}: {str(e)}")
        raise


@activity.defn
async def transform_mediawiki_data(community_id: str) -> list[Document]:
    """Transform the extracted MediaWiki data."""
    try:
        logging.info(f"Starting transformation for community {community_id}")
        mediawiki_etl = MediawikiETL(community_id=community_id)
        result = mediawiki_etl.transform()
        logging.info(f"Completed transformation for community {community_id}")
        return result
    except Exception as e:
        logging.error(f"Error in transformation for community {community_id}: {str(e)}")
        raise


@activity.defn
async def load_mediawiki_data(documents: list[Document], community_id: str) -> None:
    """Load the transformed MediaWiki data into the database."""
    try:
        logging.info(f"Starting data load for community {community_id}")
        mediawiki_etl = MediawikiETL(community_id=community_id)
        mediawiki_etl.load(documents=documents)
        logging.info(f"Completed data load for community {community_id}")
    except Exception as e:
        logging.error(f"Error in data load for community {community_id}: {str(e)}")
        raise
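
For context, a hypothetical sketch of how these four activities could be chained from a Temporal workflow using the Python SDK; the workflow class name and timeout values are illustrative and are not part of this PR:

```
# Sketch only: an assumed workflow that drives the MediaWiki ETL activities.
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from hivemind_etl.mediawiki.activities import (
        get_hivemind_mediawiki_platforms,
        extract_mediawiki,
        transform_mediawiki_data,
        load_mediawiki_data,
    )


@workflow.defn
class MediaWikiETLWorkflow:
    @workflow.run
    async def run(self, platform_id: str | None = None) -> None:
        # Fetch the platforms to process (all of them when platform_id is None).
        platforms = await workflow.execute_activity(
            get_hivemind_mediawiki_platforms,
            platform_id,
            start_to_close_timeout=timedelta(minutes=5),
        )
        for platform in platforms:
            # Crawl the wiki dump for this platform.
            await workflow.execute_activity(
                extract_mediawiki,
                platform,
                start_to_close_timeout=timedelta(hours=2),
            )
            # Parse the dump into Documents.
            documents = await workflow.execute_activity(
                transform_mediawiki_data,
                platform["community_id"],
                start_to_close_timeout=timedelta(hours=1),
            )
            # Load the Documents into the vector store.
            await workflow.execute_activity(
                load_mediawiki_data,
                args=[documents, platform["community_id"]],
                start_to_close_timeout=timedelta(hours=1),
            )
```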
85 changes: 85 additions & 0 deletions hivemind_etl/mediawiki/etl.py
@@ -0,0 +1,85 @@
import logging
import shutil

from llama_index.core import Document
from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline
from hivemind_etl.mediawiki.transform_xml import parse_mediawiki_xml
from hivemind_etl.mediawiki.wikiteam_crawler import WikiteamCrawler


class MediawikiETL:
    def __init__(
        self,
        community_id: str,
        namespaces: list[int],
        delete_dump_after_load: bool = True,
    ) -> None:
        self.community_id = community_id
        self.wikiteam_crawler = WikiteamCrawler(community_id, namespaces=namespaces)

        self.dump_dir = f"dump_{self.community_id}"
        self.delete_dump_after_load = delete_dump_after_load

    def extract(self, api_url: str, dump_dir: str | None = None) -> None:
        if dump_dir is None:
            dump_dir = self.dump_dir
        else:
            self.dump_dir = dump_dir

        self.wikiteam_crawler.crawl(api_url, dump_dir)

    def transform(self) -> list[Document]:
        pages = parse_mediawiki_xml(file_dir=self.dump_dir)

        documents: list[Document] = []
        for page in pages:
            try:
                documents.append(
                    Document(
                        doc_id=page.page_id,
                        text=page.revision.text,
                        metadata={
                            "title": page.title,
                            "namespace": page.namespace,
                            "revision_id": page.revision.revision_id,
                            "parent_revision_id": page.revision.parent_revision_id,
                            "timestamp": page.revision.timestamp,
                            "comment": page.revision.comment,
                            "contributor_username": page.revision.contributor.username,
                            "contributor_user_id": page.revision.contributor.user_id,
                            "sha1": page.revision.sha1,
                            "model": page.revision.model,
                        },
                        excluded_embed_metadata_keys=[
                            "namespace",
                            "revision_id",
                            "parent_revision_id",
                            "sha1",
                            "model",
                            "contributor_user_id",
                            "comment",
                            "timestamp",
                        ],
Comment on lines +35 to +62
💡 Verification agent

🧩 Analysis chain

Validate existence of page.revision before referencing.

Accessing page.revision.text may raise an AttributeError if page.revision is None. Confirm whether pages without revisions should be skipped or handled differently.


🏁 Script executed:

#!/bin/bash
# This script searches for references to page.revision in this file to see if they’re all guarded.
rg -A 3 'page\.revision' hivemind_etl/mediawiki/etl.py

Length of output: 939


Action Required: Add a guard for page.revision existence.
It was verified that the code on lines 35–62 in hivemind_etl/mediawiki/etl.py directly accesses attributes of page.revision without checking if it exists. This can lead to a runtime AttributeError if a page lacks a revision.

  • File: hivemind_etl/mediawiki/etl.py (Lines 35–62)
  • Issue: Directly accessing page.revision.text, page.revision.revision_id, etc., without confirming page.revision is not None.
  • Recommendation: Introduce a conditional check before dereferencing the revision. For example, consider:
    for page in pages:
        if page.revision is None:
            continue  # or handle accordingly
        try:
            documents.append(
                Document(
                    doc_id=page.page_id,
                    text=page.revision.text,
                    metadata={
                        "title": page.title,
                        "namespace": page.namespace,
                        "revision_id": page.revision.revision_id,
                        "parent_revision_id": page.revision.parent_revision_id,
                        "timestamp": page.revision.timestamp,
                        "comment": page.revision.comment,
                        "contributor_username": page.revision.contributor.username,
                        "contributor_user_id": page.revision.contributor.user_id,
                        "sha1": page.revision.sha1,
                        "model": page.revision.model,
                    },
                    excluded_embed_metadata_keys=[
                        "namespace",
                        "revision_id",
                        "parent_revision_id",
                        "sha1",
                        "model",
                        "contributor_user_id",
                        "comment",
                        "timestamp",
                    ],
                )
            )
        except Exception as e:
            # appropriate error handling if needed
            pass
  • Action: Please confirm the intended behavior for pages missing a revision and update the code accordingly.

                        excluded_llm_metadata_keys=[
                            "namespace",
                            "revision_id",
                            "parent_revision_id",
                            "sha1",
                            "model",
                            "contributor_user_id",
                        ],
                    )
                )
            except Exception as e:
                logging.error(f"Error transforming page {page.page_id}: {e}")

        return documents

    def load(self, documents: list[Document]) -> None:
        ingestion_pipeline = CustomIngestionPipeline(
            self.community_id, collection_name="mediawiki"
        )
        ingestion_pipeline.run_pipeline(documents)

        if self.delete_dump_after_load:
            shutil.rmtree(self.dump_dir)
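
A minimal usage sketch of the new MediawikiETL class; the community id, wiki API URL, and namespace list below are placeholders rather than values taken from this PR:

```
# Illustrative end-to-end run of the extract -> transform -> load pipeline.
from hivemind_etl.mediawiki.etl import MediawikiETL

etl = MediawikiETL(
    community_id="6579c364f1120850414e0dc5",  # placeholder community id
    namespaces=[0],                           # main namespace only (assumed)
    delete_dump_after_load=False,             # keep dump_<community_id>/ for inspection
)

# Crawl the wiki dump into dump_<community_id>/.
etl.extract(api_url="https://wiki.example.org/w/api.php")

# Parse the XML dump into llama_index Documents.
documents = etl.transform()

# Run the Qdrant ingestion pipeline against the "mediawiki" collection.
etl.load(documents)
```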
95 changes: 95 additions & 0 deletions hivemind_etl/mediawiki/llama_xml_reader.py
@@ -0,0 +1,95 @@
"""XML Reader."""

"""Copied from https://github.com/run-llama/llama_index/blob/main/llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/xml/base.py"""

import re
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Dict, List, Optional

from llama_index.core.readers.base import BaseReader
from llama_index.core.schema import Document


def _get_leaf_nodes_up_to_level(root: ET.Element, level: int) -> List[ET.Element]:
    """Get collection of nodes up to certain level including leaf nodes.
    Args:
        root (ET.Element): XML Root Element
        level (int): Levels to traverse in the tree
    Returns:
        List[ET.Element]: List of target nodes
    """

    def traverse(current_node, current_level):
        if len(current_node) == 0 or level == current_level:
            # Keep leaf nodes and target level nodes
            nodes.append(current_node)
        elif current_level < level:
            # Move to the next level
            for child in current_node:
                traverse(child, current_level + 1)

    nodes = []
    traverse(root, 0)
    return nodes


class XMLReader(BaseReader):
    """XML reader.
    Reads XML documents with options to help suss out relationships between nodes.
    Args:
        tree_level_split (int): From which level in the xml tree we split documents,
            the default level is the root which is level 0
    """

    def __init__(self, tree_level_split: Optional[int] = 0) -> None:
        """Initialize with arguments."""
        super().__init__()
        self.tree_level_split = tree_level_split

    def _parse_xmlelt_to_document(
        self, root: ET.Element, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Parse the xml object into a list of Documents.
        Args:
            root: The XML Element to be converted.
            extra_info (Optional[Dict]): Additional information. Default is None.
        Returns:
            Document: The documents.
        """
        nodes = _get_leaf_nodes_up_to_level(root, self.tree_level_split)
        documents = []
        for node in nodes:
            content = ET.tostring(node, encoding="utf8").decode("utf-8")
            content = re.sub(r"^<\?xml.*", "", content)
            content = content.strip()
            documents.append(Document(text=content, extra_info=extra_info or {}))

        return documents

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
    ) -> List[Document]:
        """Load data from the input file.
        Args:
            file (Path): Path to the input file.
            extra_info (Optional[Dict]): Additional information. Default is None.
        Returns:
            List[Document]: List of documents.
        """
        if not isinstance(file, Path):
            file = Path(file)

        tree = ET.parse(file)
        return self._parse_xmlelt_to_document(tree.getroot(), extra_info)
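
Illustrative use of the vendored XMLReader (not exercised anywhere in this PR's diff); the dump path, extra_info contents, and tree_level_split value are assumptions:

```
# Sketch only: split an assumed MediaWiki XML export at tree level 1 so each
# direct child of the root (e.g. each <page> element) becomes its own Document.
from pathlib import Path

reader = XMLReader(tree_level_split=1)
docs = reader.load_data(
    file=Path("dump_6579c364f1120850414e0dc5/pages.xml"),  # placeholder path
    extra_info={"community_id": "6579c364f1120850414e0dc5"},
)
print(len(docs), docs[0].text[:200] if docs else "no documents")
```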