Skip to content

Commit 1ba74b8

Browse files
committed
feat: developing LLM twin
0 parents  commit 1ba74b8

22 files changed

+1128
-0
lines changed

Diff for: 2-data-ingestion/cdc.py

+45
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,45 @@
1+
import json
2+
import logging
3+
4+
from bson import json_util
5+
from mq import publish_to_rabbitmq
6+
7+
from config import settings
8+
from db import MongoDatabaseConnector
9+
10+
# Configure the root logger once for the whole process:
# INFO level, timestamped single-line records.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
14+
15+
16+
def stream_process() -> None:
    """Tail the MongoDB change stream and forward new documents to RabbitMQ.

    Watches the ``scrabble`` database for ``insert`` operations.  Each
    inserted document is enriched with its collection name (``type``) and
    stringified ObjectId (``entry_id``), serialized with ``json_util`` so
    BSON-specific values survive, and published to the queue named by
    ``settings.RABBITMQ_QUEUE_NAME``.  Blocks until the change stream ends;
    any failure is logged with its traceback.
    """
    try:
        # Setup MongoDB connection (singleton client shared process-wide)
        client = MongoDatabaseConnector()
        db = client["scrabble"]
        logging.info("Connected to MongoDB.")

        # Watch insert operations across all collections in the database
        changes = db.watch([{"$match": {"operationType": {"$in": ["insert"]}}}])
        for change in changes:
            data_type = change["ns"]["coll"]
            entry_id = str(change["fullDocument"]["_id"])  # Convert ObjectId to string
            change["fullDocument"].pop("_id")
            change["fullDocument"]["type"] = data_type
            change["fullDocument"]["entry_id"] = entry_id

            # Use json_util so ObjectId/date values serialize cleanly
            data = json.dumps(change["fullDocument"], default=json_util.default)
            # Lazy %-formatting: the string is only built if INFO is enabled
            logging.info("Change detected and serialized: %s", data)

            # Send data to rabbitmq
            publish_to_rabbitmq(queue_name=settings.RABBITMQ_QUEUE_NAME, data=data)
            logging.info("Data published to RabbitMQ.")

    except Exception:
        # Top-level boundary: logging.exception records the full traceback,
        # which logging.error(f"...") silently discarded.
        logging.exception("An error occurred while processing the change stream")


if __name__ == "__main__":
    stream_process()

Diff for: 2-data-ingestion/config.py

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from pydantic_settings import BaseSettings
2+
3+
4+
class Settings(BaseSettings):
    """Configuration for the data-ingestion service.

    Defaults assume the docker-compose network; every field can be
    overridden through the environment (pydantic BaseSettings).
    """

    # MongoDB configs — replica set is required for change streams.
    MONGO_DATABASE_HOST: str = (
        "mongodb://mongo1:30001,mongo2:30002,mongo3:30003/?replicaSet=my-replica-set"
    )
    MONGO_DATABASE_NAME: str = "scrabble"

    # RabbitMQ configs
    RABBITMQ_HOST: str = "mq"  # or localhost if running outside Docker
    RABBITMQ_PORT: int = 5672
    RABBITMQ_DEFAULT_USERNAME: str = "guest"
    RABBITMQ_DEFAULT_PASSWORD: str = "guest"
    RABBITMQ_QUEUE_NAME: str = "default"


# Module-level singleton read by the rest of the package.
settings = Settings()

Diff for: 2-data-ingestion/db.py

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from pymongo import MongoClient
2+
from pymongo.errors import ConnectionFailure
3+
4+
from config import settings
5+
6+
7+
class MongoDatabaseConnector:
    """Singleton class to connect to MongoDB database.

    Every instantiation returns the same shared ``MongoClient``.

    NOTE(review): because ``__new__`` returns a ``MongoClient`` (not a
    ``MongoDatabaseConnector``), ``get_database``/``close`` are not reachable
    on the returned object — callers treat the result as a plain client.
    """

    # Shared client; None until the first instantiation succeeds.
    _instance: MongoClient | None = None

    def __new__(cls, *args, **kwargs) -> MongoClient:
        if cls._instance is None:
            try:
                cls._instance = MongoClient(settings.MONGO_DATABASE_HOST)
            except ConnectionFailure as e:
                print(f"Couldn't connect to the database: {str(e)}")
                raise
            # Announce only when the client is actually created, not on
            # every reuse of the singleton (the original printed each time).
            print(
                f"Connection to database with uri: {settings.MONGO_DATABASE_HOST} successful"
            )
        return cls._instance

    def get_database(self):
        """Return the application database configured in settings."""
        return self._instance[settings.MONGO_DATABASE_NAME]

    def close(self):
        """Close the shared client, if one was ever created."""
        if self._instance:
            self._instance.close()
            print("Connection to database has been closed.")


# Eagerly build the shared client at import time (import side effect kept
# for backward compatibility with existing importers).
connection = MongoDatabaseConnector()

Diff for: 2-data-ingestion/mq.py

+111
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import pika
2+
3+
from config import settings
4+
5+
6+
class RabbitMQConnection:
    """Singleton class to manage RabbitMQ connection.

    The first construction fixes host/port/credentials; later constructions
    return the same instance unchanged.  Usable as a context manager:
    connects on entry, closes on exit.
    """

    _instance = None

    def __new__(cls, *args, **kwargs):
        # Accept arbitrary constructor arguments here — the original explicit
        # signature rejected keywords such as fail_silently with a TypeError.
        if not cls._instance:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(
        self,
        host: str = None,
        port: int = None,
        username: str = None,
        password: str = None,
        virtual_host: str = "/",
        fail_silently: bool = False,
        **kwargs,
    ):
        # __init__ runs on *every* RabbitMQConnection() call because of the
        # singleton __new__; initialize only once so a later construction
        # cannot reset self._connection and leak an open connection.
        if getattr(self, "_initialized", False):
            return
        self.host = host or settings.RABBITMQ_HOST
        self.port = port or settings.RABBITMQ_PORT
        self.username = username or settings.RABBITMQ_DEFAULT_USERNAME
        self.password = password or settings.RABBITMQ_DEFAULT_PASSWORD
        self.virtual_host = virtual_host
        self.fail_silently = fail_silently
        self._connection = None
        self._initialized = True

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def connect(self):
        """Open a blocking connection; swallow failures if fail_silently."""
        try:
            credentials = pika.PlainCredentials(self.username, self.password)
            self._connection = pika.BlockingConnection(
                pika.ConnectionParameters(
                    host=self.host,
                    port=self.port,
                    virtual_host=self.virtual_host,
                    credentials=credentials,
                )
            )
        except pika.exceptions.AMQPConnectionError as e:
            print("Failed to connect to RabbitMQ:", e)
            if not self.fail_silently:
                raise e

    def is_connected(self) -> bool:
        """True while an open connection is held."""
        return self._connection is not None and self._connection.is_open

    def get_channel(self):
        """Return a fresh channel, or None when not connected."""
        if self.is_connected():
            return self._connection.channel()

    def close(self):
        """Close and drop the connection if it is open."""
        if self.is_connected():
            self._connection.close()
            self._connection = None
            print("Closed RabbitMQ connection")
76+
77+
78+
def publish_to_rabbitmq(queue_name: str, data: str):
    """Publish data to a RabbitMQ queue."""
    try:
        # The context manager opens the connection on entry and closes it
        # on exit, even if publishing raises.
        with RabbitMQConnection() as conn:
            channel = conn.get_channel()

            # Make sure the target queue exists before publishing
            channel.queue_declare(queue=queue_name, durable=True)

            # Ask the broker to confirm each delivery
            channel.confirm_delivery()

            # Persistent message on the default exchange, routed by queue name
            channel.basic_publish(
                exchange="",
                routing_key=queue_name,
                body=data,
                properties=pika.BasicProperties(
                    delivery_mode=2,  # make message persistent
                ),
            )
            print("Sent data to RabbitMQ:", data)
    except pika.exceptions.UnroutableError:
        print("Message could not be routed")
    except Exception as e:
        print(f"Error publishing to RabbitMQ: {e}")


if __name__ == "__main__":
    publish_to_rabbitmq("test_queue", "Hello, World!")

Diff for: 2-data-ingestion/test_cdc.py

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from pymongo import MongoClient
2+
3+
4+
def insert_data_to_mongodb(uri, database_name, collection_name, data):
    """
    Insert data into a MongoDB collection.

    :param uri: MongoDB URI
    :param database_name: Name of the database
    :param collection_name: Name of the collection
    :param data: Data to be inserted (dict)
    """
    client = MongoClient(uri)
    collection = client[database_name][collection_name]

    try:
        inserted = collection.insert_one(data)
        print(f"Data inserted with _id: {inserted.inserted_id}")
    except Exception as exc:
        print(f"An error occurred: {exc}")
    finally:
        # Always release the client, whether or not the insert succeeded
        client.close()


if __name__ == "__main__":
    insert_data_to_mongodb(
        "mongodb://localhost:30001,localhost:30002,localhost:30003/?replicaSet=my-replica-set",
        "scrabble",
        "posts",
        {"platform": "linkedin", "content": "Test content"},
    )

Diff for: config.py

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from pydantic_settings import BaseSettings, SettingsConfigDict
2+
3+
4+
class Settings(BaseSettings):
    """Project-level settings, overridable via the repo-root ``.env`` file."""

    model_config = SettingsConfigDict(env_file="../.env", env_file_encoding="utf-8")

    # MongoDB configs — replica-set URI matching the docker-compose setup.
    MONGO_DATABASE_HOST: str = (
        "mongodb://mongo1:30001,mongo2:30002,mongo3:30003/?replicaSet=my-replica-set"
    )
    MONGO_DATABASE_NAME: str = "scrabble"

    # Optional LinkedIn credentials for scraping your profile
    LINKEDIN_USERNAME: str | None = None
    LINKEDIN_PASSWORD: str | None = None


# Module-level singleton read by the rest of the project.
settings = Settings()

Diff for: crawlers/__init__.py

Whitespace-only changes.

Diff for: crawlers/base.py

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import time
2+
from abc import ABC, abstractmethod
3+
from tempfile import mkdtemp
4+
5+
from db.documents import BaseDocument
6+
from selenium import webdriver
7+
from selenium.webdriver.chrome.options import Options
8+
9+
10+
class BaseCrawler(ABC):
    """Contract for all crawlers: a target document model plus an extract step."""

    # Document class the concrete crawler persists its results into.
    model: type[BaseDocument]

    @abstractmethod
    def extract(self, link: str, **kwargs) -> None: ...
15+
16+
17+
class BaseAbstractCrawler(BaseCrawler, ABC):
    """Crawler base that drives a headless Chrome instance via Selenium."""

    def __init__(self, scroll_limit: int = 5) -> None:
        options = webdriver.ChromeOptions()
        # Binary/driver paths are fixed for the packaged (Lambda-style)
        # container image.
        options.binary_location = "/opt/chrome/chrome"
        # Flags for running headless Chrome in a constrained environment.
        for flag in (
            "--no-sandbox",
            "--headless=new",
            "--single-process",
            "--disable-dev-shm-usage",
            "--disable-gpu",
            "--log-level=3",
            "--disable-popup-blocking",
            "--disable-notifications",
            "--disable-dev-tools",
            "--ignore-certificate-errors",
            "--no-zygote",
            f"--user-data-dir={mkdtemp()}",
            f"--data-path={mkdtemp()}",
            f"--disk-cache-dir={mkdtemp()}",
            "--remote-debugging-port=9222",
        ):
            options.add_argument(flag)

        # Let subclasses tweak the options before the driver is built.
        self.set_extra_driver_options(options)

        self.scroll_limit = scroll_limit
        self.driver = webdriver.Chrome(
            service=webdriver.ChromeService("/opt/chromedriver"),
            options=options,
        )

    def set_extra_driver_options(self, options: Options) -> None:
        """Hook for subclasses to add driver options; no-op by default."""
        pass

    def login(self) -> None:
        """Hook for subclasses that need an authenticated session; no-op here."""
        pass

    def scroll_page(self) -> None:
        """Scroll to the bottom of the page repeatedly, up to ``scroll_limit`` times.

        Stops early when the page height stops growing (no more content).
        """
        scrolls_done = 0
        previous_height = self.driver.execute_script("return document.body.scrollHeight")
        while True:
            self.driver.execute_script(
                "window.scrollTo(0, document.body.scrollHeight);"
            )
            time.sleep(5)
            current_height = self.driver.execute_script("return document.body.scrollHeight")
            reached_limit = self.scroll_limit and scrolls_done >= self.scroll_limit
            if current_height == previous_height or reached_limit:
                break
            previous_height = current_height
            scrolls_done += 1

Diff for: crawlers/github.py

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import os
2+
import shutil
3+
import subprocess
4+
import tempfile
5+
6+
from aws_lambda_powertools import Logger
7+
8+
from crawlers.base import BaseCrawler
9+
from db.documents import RepositoryDocument
10+
11+
logger = Logger(service="llm-twin-course/crawler")
12+
13+
14+
class GithubCrawler(BaseCrawler):
    """Crawler that clones a GitHub repository and persists its text content."""

    model = RepositoryDocument

    def __init__(self, ignore=(".git", ".toml", ".lock", ".png")) -> None:
        super().__init__()
        # Path prefixes/suffixes skipped while walking the cloned tree.
        self._ignore = ignore

    def extract(self, link: str, **kwargs) -> None:
        """Clone *link* into a temp dir and save its files as one document.

        :param link: URL of the repository to clone.
        :param kwargs: ``user`` (if present) is stored as the document owner.
        """
        logger.info(f"Starting scraping GitHub repository: {link}")

        repo_name = link.rstrip("/").split("/")[-1]

        local_temp = tempfile.mkdtemp()

        try:
            # Clone into the temp dir via cwd= instead of os.chdir (which
            # mutated the process CWD and was never restored); check=True
            # surfaces a failed clone instead of an IndexError on the empty
            # directory below.
            subprocess.run(["git", "clone", link], check=True, cwd=local_temp)

            # git names the checkout dir itself; it is the only entry.
            repo_path = os.path.join(local_temp, os.listdir(local_temp)[0])

            tree = {}
            for root, _, files in os.walk(repo_path):
                rel_dir = root.replace(repo_path, "").lstrip("/")
                if rel_dir.startswith(self._ignore):
                    continue

                for file in files:
                    if file.endswith(self._ignore):
                        continue
                    file_path = os.path.join(rel_dir, file)
                    with open(os.path.join(root, file), "r", errors="ignore") as f:
                        # NOTE(review): stripping *all* spaces is kept from the
                        # original — presumably to shrink the stored content.
                        tree[file_path] = f.read().replace(" ", "")

            instance = self.model(
                name=repo_name, link=link, content=tree, owner_id=kwargs.get("user")
            )
            instance.save()
        finally:
            # Always remove the clone, even when extraction fails.
            shutil.rmtree(local_temp)

        logger.info(f"Finished scraping GitHub repository: {link}")

0 commit comments

Comments (0)