Skip to content

Commit 8f09641

Browse files
AIDB Utility - going from unstructured data (local storage and aws s3) to blob table (#68)
* initial commit * Basic Integration Working * basic implementation finished * test cases modified * aws implementation working * implementation finished * remove images * aidb utilities * real images * PR comments * change type to text type * pr comments * remove data * add datastore tests * skip unit test * image verify
1 parent e324a40 commit 8f09641

File tree

13 files changed

+401
-48
lines changed

13 files changed

+401
-48
lines changed

.github/workflows/tests.yaml

+3
Original file line number | Diff line number | Diff line change
@@ -41,3 +41,6 @@ jobs:
4141

4242
- name: Run caching logic tests
4343
run: python3 -m tests.tests_caching_logic
44+
45+
- name: Run data store tests
46+
run: python3 -m tests.tests_data_store

aidb/engine/base_engine.py

+3 -46
Original file line number | Diff line number | Diff line change
@@ -2,9 +2,6 @@
22
from typing import Dict, List, Optional, Set, Tuple
33

44
import pandas as pd
5-
import sqlalchemy
6-
import sqlalchemy.ext.asyncio
7-
import sqlalchemy.ext.automap
85

96
from aidb.config.config import Config
107
from aidb.config.config_types import InferenceBinding
@@ -14,7 +11,7 @@
1411
from aidb.query.query import FilteringClause, Query
1512
from aidb.query.utils import predicate_to_str
1613
from aidb.utils.asyncio import asyncio_run
17-
from aidb.utils.logger import logger
14+
from aidb.utils.db import infer_dialect, create_sql_engine
1815

1916

2017
class BaseEngine():
@@ -27,8 +24,8 @@ def __init__(
2724
self._connection_uri = connection_uri
2825
self._debug = debug
2926

30-
self._dialect = self._infer_dialect(connection_uri)
31-
self._sql_engine = self._create_sql_engine()
27+
self._dialect = infer_dialect(connection_uri)
28+
self._sql_engine = create_sql_engine(connection_uri, debug)
3229

3330
if infer_config:
3431
self._config: Config = asyncio_run(self._infer_config())
@@ -41,46 +38,6 @@ def __del__(self):
4138
# ---------------------
4239
# Setup
4340
# ---------------------
44-
def _infer_dialect(self, connection_uri: str):
45-
# Conection URIs have the following format:
46-
# dialect+driver://username:password@host:port/database
47-
# See https://docs.sqlalchemy.org/en/20/core/engines.html
48-
dialect = connection_uri.split(':')[0]
49-
if '+' in dialect:
50-
dialect = dialect.split('+')[0]
51-
52-
supported_dialects = [
53-
'mysql',
54-
'postgresql',
55-
'sqlite',
56-
]
57-
58-
if dialect not in supported_dialects:
59-
logger.warning(
60-
f'Unsupported dialect: {dialect}. Defaulting to mysql')
61-
dialect = 'mysql'
62-
63-
return dialect
64-
65-
66-
def _create_sql_engine(self):
67-
logger.info(f'Creating SQL engine for {self._dialect}')
68-
if self._dialect == 'mysql':
69-
kwargs = {
70-
'echo': self._debug,
71-
'max_overflow': -1,
72-
}
73-
else:
74-
kwargs = {}
75-
76-
engine = sqlalchemy.ext.asyncio.create_async_engine(
77-
self._connection_uri,
78-
**kwargs,
79-
)
80-
81-
return engine
82-
83-
8441
async def _infer_config(self) -> Config:
8542
'''
8643
Infer the database configuration from the sql engine.

aidb/utils/db.py

+46
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,46 @@
1+
import sqlalchemy
2+
import sqlalchemy.ext.asyncio
3+
import sqlalchemy.ext.automap
4+
5+
from aidb.utils.logger import logger
6+
7+
8+
def infer_dialect(connection_uri: str):
    '''
    Return the SQL dialect named by a connection URI.

    Connection URIs have the following format:
    dialect+driver://username:password@host:port/database
    See https://docs.sqlalchemy.org/en/20/core/engines.html

    Only mysql, postgresql and sqlite are recognised; any other dialect is
    logged as a warning and replaced by mysql.
    '''
    scheme = connection_uri.split(':')[0]
    # Drop the optional "+driver" suffix so only the dialect name remains.
    dialect = scheme.split('+')[0]

    if dialect in ('mysql', 'postgresql', 'sqlite'):
        return dialect

    logger.warning(
        f'Unsupported dialect: {dialect}. Defaulting to mysql')
    return 'mysql'
28+
29+
30+
def create_sql_engine(connection_uri, debug=False):
    '''
    Create an async SQLAlchemy engine for `connection_uri`.

    connection_uri: SQLAlchemy connection URI; its dialect is derived with
      infer_dialect().
    debug: when true, the engine is created with `echo` enabled. This is
      applied to every dialect (previously it was honoured for mysql only
      and silently dropped otherwise).

    Returns the engine built by sqlalchemy.ext.asyncio.create_async_engine.
    '''
    dialect = infer_dialect(connection_uri)
    logger.info(f'Creating SQL engine for {dialect}')

    kwargs = {'echo': debug}
    if dialect == 'mysql':
        # Pool overflow setting is only passed for mysql, as before.
        kwargs['max_overflow'] = -1

    return sqlalchemy.ext.asyncio.create_async_engine(
        connection_uri,
        **kwargs,
    )

aidb_utilities/__init__.py

Whitespace-only changes.

aidb_utilities/blob_store/__init__.py

Whitespace-only changes.
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,53 @@
1+
from typing import List
2+
3+
import boto3
4+
5+
from aidb_utilities.blob_store.blob_store import Blob, BlobStore, DocumentBlob, ImageBlob
6+
from aidb_utilities.blob_store.utils import get_document_type, is_document, is_image_file
7+
8+
9+
class AwsS3BlobStore(BlobStore):
    '''
    Base class for blob stores that read from a single AWS S3 bucket.

    The constructor opens a boto3 session with the given credentials and
    keeps the bucket resource in `self.bucket`. Subclasses decide which
    objects of the bucket are turned into blobs.
    '''

    def __init__(self, bucket_name, access_key_id, secret_access_key):
        '''
        bucket_name: name of the S3 bucket to list.
        access_key_id, secret_access_key: AWS credentials for the session.
        '''
        session = boto3.Session(aws_access_key_id=access_key_id, aws_secret_access_key=secret_access_key)
        s3 = session.resource('s3')
        self.bucket = s3.Bucket(bucket_name)

    def get_blobs(self) -> List[Blob]:
        '''
        Placeholder that concrete S3 stores override.

        Raises NotImplementedError, which is a subclass of Exception, so
        callers that catch Exception are unaffected.
        '''
        raise NotImplementedError("get_blobs function not implemented")
18+
19+
20+
class AwsS3ImageBlobStore(AwsS3BlobStore):
    '''
    S3-backed blob store that reports the image objects of a bucket.
    '''

    def __init__(self, bucket_name, access_key_id, secret_access_key):
        super().__init__(bucket_name, access_key_id, secret_access_key)

    def get_blobs(self) -> List[ImageBlob]:
        '''
        List every object of the bucket and return one ImageBlob per object
        accepted by is_image_file().

        blob_id values are assigned 1, 2, 3, ... in listing order; created_at
        is the string form of the object's last_modified attribute.
        '''
        blobs = []
        for s3_object in self.bucket.objects.all():
            s3_uri = f"s3://{s3_object.bucket_name}/{s3_object.key}"
            # NOTE(review): is_image_file() is handed an s3:// URI here -
            # confirm that it can inspect objects that are not on local disk.
            if not is_image_file(s3_uri):
                continue
            modified_at = str(s3_object.last_modified)
            blobs.append(ImageBlob(blob_id=len(blobs) + 1, image_path=s3_uri, created_at=modified_at))
        return blobs
35+
36+
37+
class AwsS3DocumentBlobStore(AwsS3BlobStore):
    '''
    S3-backed blob store that reports the document objects of a bucket.
    '''

    def __init__(self, bucket_name, access_key_id, secret_access_key):
        super().__init__(bucket_name, access_key_id, secret_access_key)

    def get_blobs(self) -> List[DocumentBlob]:
        '''
        List every object of the bucket and return one DocumentBlob per
        object accepted by is_document().

        blob_id values are assigned 1, 2, 3, ... in listing order; created_at
        is the string form of the object's last_modified attribute and
        doc_type comes from get_document_type().
        '''
        blobs = []
        for s3_object in self.bucket.objects.all():
            s3_uri = f"s3://{s3_object.bucket_name}/{s3_object.key}"
            if not is_document(s3_uri):
                continue
            modified_at = str(s3_object.last_modified)
            kind = get_document_type(s3_uri)
            blobs.append(
                DocumentBlob(blob_id=len(blobs) + 1, doc_path=s3_uri, created_at=modified_at, doc_type=kind))
        return blobs
+61
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,61 @@
1+
from abc import ABC, abstractmethod
2+
from dataclasses import dataclass
3+
from enum import Enum
4+
from typing import List, Union
5+
6+
7+
class DocumentType(Enum):
    '''
    Document formats a DocumentBlob can carry.

    Each member's value is the lower-case file extension of the format.
    '''
    PDF = 'pdf'
    DOCX = 'docx'
    DOC = 'doc'
11+
12+
13+
@dataclass
class Blob:
    '''
    Minimal blob record: just the integer identifier of the blob.
    '''
    blob_id: int

    def to_dict(self):
        '''Return the record as a plain dict with the single key blob_id.'''
        return dict(blob_id=self.blob_id)
21+
22+
23+
@dataclass
class ImageBlob(Blob):
    '''
    Blob record for one image.
    '''
    # Location of the image (a local path or an s3:// URI, depending on the store).
    image_path: str
    # Timestamp string supplied by the store; may be None.
    created_at: Union[None, str]

    def to_dict(self):
        '''Return the record as a dict keyed blob_id, image_path, created_at.'''
        return dict(
            blob_id=self.blob_id,
            image_path=self.image_path,
            created_at=self.created_at,
        )
34+
35+
36+
@dataclass
class DocumentBlob(Blob):
    '''
    Blob record for one document.
    '''
    # Location of the document (a local path or an s3:// URI, depending on the store).
    doc_path: str
    # Timestamp string supplied by the store.
    created_at: str
    # Format of the document.
    doc_type: DocumentType

    def to_dict(self):
        '''
        Return the record as a dict keyed blob_id, doc_path, created_at and
        doc_type; doc_type is stored as the enum member's value.
        '''
        return dict(
            blob_id=self.blob_id,
            doc_path=self.doc_path,
            created_at=self.created_at,
            doc_type=self.doc_type.value,
        )
49+
50+
51+
class BlobStore(ABC):
    '''
    Abstract interface of a blob store: a source that can enumerate its
    contents as a list of Blob records.
    '''

    def __init__(self):
        '''
        configuration, data store access keys, etc.
        '''
        # This was spelled `__int__`, which defined an int() conversion hook
        # instead of the constructor the docstring describes.
        pass

    @abstractmethod
    def get_blobs(self) -> 'List[Blob]':
        '''
        Return the blobs held by the store. Concrete stores must override it.

        Raises NotImplementedError when the base implementation is called;
        it is a subclass of Exception, so existing handlers keep working.
        '''
        raise NotImplementedError("get_blobs function not implemented")
+52
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,52 @@
1+
from pathlib import Path
2+
from typing import List
3+
4+
from aidb_utilities.blob_store.blob_store import Blob
5+
6+
from aidb_utilities.blob_store.blob_store import BlobStore, DocumentBlob, ImageBlob
7+
from aidb_utilities.blob_store.utils import get_document_type, get_local_file_creation_time, is_document, is_image_file
8+
9+
10+
class LocalBlobStore(BlobStore):
    '''
    Base class for blob stores that read from a local directory tree.

    `self.files` holds the regular files found recursively under the
    directory given to the constructor; subclasses decide which of them
    become blobs.
    '''

    def __init__(self, local_dir):
        '''
        local_dir: root directory that is scanned recursively.
        '''
        # rglob() yields a one-shot generator. Materialise it so get_blobs()
        # can run more than once, and keep regular files only so that
        # directories never reach the file-type predicates.
        self.files = [path for path in Path(local_dir).rglob('*') if path.is_file()]

    def get_blobs(self) -> List[Blob]:
        '''
        Placeholder that concrete local stores override.

        Raises NotImplementedError, which is a subclass of Exception, so
        callers that catch Exception are unaffected.
        '''
        raise NotImplementedError("get_blobs function not implemented")
17+
18+
19+
class LocalImageBlobStore(LocalBlobStore):
    '''
    Local blob store that reports the image files of a directory tree.
    '''

    def __init__(self, local_dir):
        super().__init__(local_dir)

    def get_blobs(self) -> List[ImageBlob]:
        '''
        Walk self.files and return one ImageBlob per path accepted by
        is_image_file().

        blob_id values are assigned 1, 2, 3, ... in iteration order;
        created_at comes from get_local_file_creation_time().
        '''
        blobs = []
        for entry in self.files:
            path_text = str(entry)
            if not is_image_file(path_text):
                continue
            created = get_local_file_creation_time(path_text)
            blobs.append(ImageBlob(blob_id=len(blobs) + 1, image_path=path_text, created_at=created))
        return blobs
34+
35+
36+
class LocalDocumentBlobStore(LocalBlobStore):
    '''
    Local blob store that reports the document files of a directory tree.
    '''

    def __init__(self, local_dir):
        super().__init__(local_dir)

    def get_blobs(self) -> List[DocumentBlob]:
        '''
        Walk self.files and return one DocumentBlob per path accepted by
        is_document().

        blob_id values are assigned 1, 2, 3, ... in iteration order;
        created_at comes from get_local_file_creation_time() and doc_type
        from get_document_type().
        '''
        blobs = []
        for entry in self.files:
            path_text = str(entry)
            if not is_document(path_text):
                continue
            created = get_local_file_creation_time(path_text)
            kind = get_document_type(path_text)
            blobs.append(
                DocumentBlob(blob_id=len(blobs) + 1, doc_path=path_text, created_at=created, doc_type=kind))
        return blobs

aidb_utilities/blob_store/utils.py

+40
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,40 @@
1+
import os
2+
import time
3+
4+
from PIL import Image
5+
from aidb_utilities.blob_store.blob_store import DocumentType
6+
7+
8+
def get_file_extension(file: str):
    '''
    Return the extension of `file` without the leading dot.

    The result is '' when the last path component has no extension.
    Splitting the whole string on '.' used to return the entire path in
    that case (so a file literally named "pdf" looked like a PDF, and a
    dotted directory name leaked into the result). Letter case is kept.
    '''
    extension = os.path.splitext(file)[1]
    # splitext keeps the dot ('.pdf'); callers compare against bare names.
    return extension[1:]
10+
11+
12+
def get_local_file_creation_time(file: str):
    '''
    Return the ctime of a local file as a human-readable string.

    The timestamp is the file's st_ctime, formatted with time.ctime().
    NOTE(review): on Unix st_ctime is the last metadata change rather than
    the creation time - confirm this is the intended value.
    '''
    ctime_seconds = os.stat(file).st_ctime
    return time.ctime(ctime_seconds)
14+
15+
16+
def is_image_file(file: str):
    '''
    Tell whether `file` can be opened and verified as an image by PIL.

    Returns True when Image.open() and Image.verify() both succeed, and
    False otherwise. Opening now happens inside the try block, so paths
    that are not images, are directories or do not exist give False
    instead of raising. The image handle is closed before returning.
    '''
    try:
        with Image.open(file) as img:
            img.verify()
    except Exception:
        # Deliberately broad: this is a yes/no probe, and any failure to
        # open or verify means "not a usable image".
        return False
    return True
23+
24+
25+
def is_document(file: str):
    '''
    Tell whether `file` has one of the supported document extensions
    (doc, docx or pdf), as reported by get_file_extension().
    '''
    return get_file_extension(file) in ['doc', 'docx', 'pdf']
29+
30+
31+
def get_document_type(file: str):
    '''
    Map the extension of `file` to its DocumentType member.

    Raises Exception("Unsupported document type") when the extension is
    not pdf, doc or docx.
    '''
    types_by_extension = {
        'pdf': DocumentType.PDF,
        'doc': DocumentType.DOC,
        'docx': DocumentType.DOCX,
    }
    extension = get_file_extension(file)
    if extension not in types_by_extension:
        raise Exception("Unsupported document type")
    return types_by_extension[extension]

aidb_utilities/db_setup/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)