Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compare temp branch with current (Do not merge) #187

Draft
wants to merge 33 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e28c91b
Add broadcast operation
tkilias Sep 15, 2023
7219ee1
Add test_gather_broadcast
tkilias Sep 17, 2023
e5f4e44
Add tensorflow example
tkilias Oct 11, 2023
fe2ee36
Added RetrieveExasolNodeIPAddressUDFDeployer
tkilias Dec 9, 2023
c40b1d0
Add unit test for udf_communicator and refactored it to make it unit …
tkilias Dec 9, 2023
27088ed
Fix integration tests
tkilias Dec 9, 2023
9c76af3
Introduce DistributedUDFRunner and DistributedUDF.
tkilias Dec 10, 2023
5947f2a
Refactored DistributedTrainingUDF to DistributedUDFRunner and Distrib…
tkilias Dec 10, 2023
39162c0
Extracted SocketFactoryContextManagerFactory from udf_communicator.py
tkilias Dec 10, 2023
9a45bea
Added all_gather to Communicator based on gather and broadcast
tkilias Dec 10, 2023
d63caae
Extract exchange_cluster_information from distributed_udf.py and refa…
tkilias Dec 10, 2023
149d6fa
Added integration with db for udf_communicator and returned the netwo…
tkilias Dec 10, 2023
10209bc
Use extracted exchange_cluster_information in distributed_training_ud…
tkilias Dec 10, 2023
4c0c02e
Add temporary_udf_name to QueryHandlerContext
tkilias Feb 4, 2024
063a27b
Fix return type of get_temporary_udf_name in QueryHandlerContext
tkilias Feb 17, 2024
e20f283
Add temporary_connection_name and temporary_name to QueryHandlerContext
tkilias Feb 17, 2024
bb5093d
Use UDFName class in RetrieveExasolNodeIPAddressUDFDeployer
tkilias Feb 17, 2024
e00bff3
Add language_alias parameter to RetrieveExasolNodeIPAddressUDFDeployer
tkilias Feb 17, 2024
09233a5
Extract ConnectionName into its own module to prevent circular import
tkilias Feb 17, 2024
25f21d1
Implement missing abstract function fully_qualified in ConnectionName
tkilias Feb 17, 2024
9eecf86
Fix MRO in ConnectionNameProxy
tkilias Feb 17, 2024
4b63596
Fix _get_temporary_connection_name in _ScopeQueryHandlerContextBase
tkilias Feb 17, 2024
55cffaa
Add MockSQLExecutor which also checks the query to execute against ex…
tkilias Feb 17, 2024
63fdfb0
Fix name of ExpectedQuery
tkilias Feb 17, 2024
1715daf
Return mock_result_set in MockSQLExecutor
tkilias Feb 17, 2024
dec8e3d
use cleandoc in RetrieveExasolNodeIPAddressUDFDeployer
tkilias Feb 17, 2024
6d3dbbe
Improve output of MockSQLExecutor and MockQueryHandlerRunner
tkilias Feb 18, 2024
48a520f
Use create or replace view in mock_query_handler_runner.py
tkilias Feb 18, 2024
baece13
Fix exception in mock_sql_executor.py
tkilias Feb 22, 2024
e54ee47
Removed tensorflow and some cleanup
tkilias Feb 24, 2024
d51872a
Update bucketfs-python
tkilias Feb 25, 2024
b2f8968
Make mock_sql_executor.py working with pytest_regex matcher
tkilias Feb 28, 2024
80afdc2
Fix VARCHAR size of ip_adress in RetrieveExasolNodeIPAddressUDFDeployer
tkilias Apr 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryHandler PR

Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from exasol_data_science_utils_python.schema.dbobject_name import DBObjectName
from exasol_data_science_utils_python.schema.dbobject_name_impl import DBObjectNameImpl
from typeguard import typechecked


class ConnectionName(DBObjectName):
    """Name of a database connection object.

    The name handed in is converted to upper case before it is passed on
    to the ``DBObjectName`` initializer.
    """

    @typechecked
    def __init__(self, connection_name: str):
        upper_case_name = connection_name.upper()
        super().__init__(upper_case_name)


class ConnectionNameImpl(DBObjectNameImpl, ConnectionName):
    """Concrete ``ConnectionName`` that combines it with ``DBObjectNameImpl``."""

    @typechecked
    def __init__(self, connection_name: str):
        # NOTE(review): DBObjectNameImpl precedes ConnectionName in the base
        # list, so whether ConnectionName.__init__ (and its upper-casing)
        # runs presumably depends on DBObjectNameImpl.__init__ chaining via
        # super() -- confirm.
        super().__init__(connection_name)

    @property
    def fully_qualified(self) -> str:
        """Return the quoted name; nothing is prepended to it here."""
        return self.quoted_name
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryHandler PR

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from exasol_advanced_analytics_framework.query_handler.context.connection_name import ConnectionName, ConnectionNameImpl
from exasol_advanced_analytics_framework.query_handler.context.proxy.db_object_name_proxy import DBObjectNameProxy
from exasol_advanced_analytics_framework.query_handler.query.drop_connection_query import DropConnectionQuery
from exasol_advanced_analytics_framework.query_handler.query.query import Query


class ConnectionNameProxy(DBObjectNameProxy[ConnectionName], ConnectionName):
    """Proxy for a ``ConnectionName`` that can produce its own cleanup query."""

    def __init__(self, connection_name: ConnectionName, global_counter_value: int):
        # Both arguments are forwarded unchanged to the next initializer.
        super().__init__(connection_name, global_counter_value)

    @property
    def fully_qualified(self) -> str:
        """Return the quoted name of the connection."""
        return self.quoted_name

    def get_cleanup_query(self) -> Query:
        """Build the ``DropConnectionQuery`` for the wrapped connection name."""
        wrapped_name = self._db_object_name
        return DropConnectionQuery(wrapped_name)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryHandler PR

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from exasol_data_science_utils_python.schema.udf_name import UDFName

from exasol_advanced_analytics_framework.query_handler.query.drop_query import DropQuery


class DropUDFQuery(DropQuery):
    """Query that drops a UDF script if it exists."""

    def __init__(self, udf_name: UDFName):
        """
        :param udf_name: Name of the UDF script the query should drop.
        """
        self._udf_name = udf_name

    @property
    def query_string(self) -> str:
        """Return the ``DROP SCRIPT IF EXISTS`` statement for the UDF."""
        return f"DROP SCRIPT IF EXISTS {self._udf_name.fully_qualified};"

    @property
    def udf_name(self) -> UDFName:
        """Return the name of the UDF script this query drops."""
        return self._udf_name

    @property
    def table_name(self) -> UDFName:
        """
        Backward-compatible alias of :attr:`udf_name`.

        The accessor was originally published under this name although it
        returns a ``UDFName``; it is kept so existing callers keep working.
        """
        return self._udf_name
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryHandler PR

Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from exasol_data_science_utils_python.schema.udf_name import UDFName

from exasol_advanced_analytics_framework.query_handler.context.proxy.db_object_name_with_schema_proxy import \
DBObjectNameWithSchemaProxy
from exasol_advanced_analytics_framework.query_handler.context.proxy.drop_udf_query import DropUDFQuery
from exasol_advanced_analytics_framework.query_handler.query.query import Query


class UDFNameProxy(DBObjectNameWithSchemaProxy[UDFName], UDFName):
    """Proxy for a ``UDFName`` that can produce its own cleanup query."""

    def __init__(self, script_name: UDFName, global_counter_value: int):
        # Both arguments are forwarded unchanged to the next initializer.
        super().__init__(script_name, global_counter_value)

    def get_cleanup_query(self) -> Query:
        """Build the ``DropUDFQuery`` for the wrapped UDF name."""
        wrapped_name = self._db_object_name
        return DropUDFQuery(wrapped_name)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryHandler PR

Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,23 @@
from abc import ABC

from exasol_data_science_utils_python.schema.table_name import TableName
from exasol_data_science_utils_python.schema.udf_name import UDFName
from exasol_data_science_utils_python.schema.view_name import ViewName

from exasol_advanced_analytics_framework.query_handler.context.connection_name import ConnectionName
from exasol_advanced_analytics_framework.query_handler.context.proxy.bucketfs_location_proxy import \
BucketFSLocationProxy


class QueryHandlerContext(ABC):

@abc.abstractmethod
def get_temporary_name(self) -> str:
    """
    Provides a temporary name as a plain string.
    """

@abc.abstractmethod
def get_temporary_table_name(self) -> TableName:
"""
Expand All @@ -26,11 +36,28 @@ def get_temporary_view_name(self) -> ViewName:

pass

@abc.abstractmethod
def get_temporary_udf_name(self) -> UDFName:
    """
    Registers a new temporary script; the script itself is not created.
    Once this context is released, the framework issues a cleanup query
    for it.
    """

@abc.abstractmethod
def get_temporary_connection_name(self) -> ConnectionName:
    """
    Registers a new temporary connection; the connection itself is not
    created. Once this context is released, the framework issues a
    cleanup query for it.
    """

@abc.abstractmethod
def get_temporary_bucketfs_location(self) -> BucketFSLocationProxy:
    """
    Registers a new temporary bucketfs file; the file itself is not
    created. Once this context is released, the framework removes it.
    """

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryHandler PR

Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@
from exasol_data_science_utils_python.schema.schema_name import SchemaName
from exasol_data_science_utils_python.schema.table_name import TableName
from exasol_data_science_utils_python.schema.table_name_builder import TableNameBuilder
from exasol_data_science_utils_python.schema.udf_name import UDFName
from exasol_data_science_utils_python.schema.udf_name_builder import UDFNameBuilder
from exasol_data_science_utils_python.schema.view_name import ViewName
from exasol_data_science_utils_python.schema.view_name_builder import ViewNameBuilder

from exasol_advanced_analytics_framework.query_handler.context.connection_name_proxy import ConnectionNameProxy
from exasol_advanced_analytics_framework.query_handler.context.connection_name import ConnectionName, ConnectionNameImpl
from exasol_advanced_analytics_framework.query_handler.context.proxy.bucketfs_location_proxy import \
BucketFSLocationProxy
from exasol_advanced_analytics_framework.query_handler.context.proxy.db_object_name_proxy import DBObjectNameProxy
from exasol_advanced_analytics_framework.query_handler.context.proxy.object_proxy import ObjectProxy
from exasol_advanced_analytics_framework.query_handler.context.proxy.table_name_proxy import TableNameProxy
from exasol_advanced_analytics_framework.query_handler.context.proxy.udf_name_proxy import UDFNameProxy
from exasol_advanced_analytics_framework.query_handler.context.proxy.view_name_proxy import ViewNameProxy
from exasol_advanced_analytics_framework.query_handler.context.scope_query_handler_context import \
ScopeQueryHandlerContext, Connection
Expand Down Expand Up @@ -110,6 +115,20 @@ def _get_temporary_view_name(self) -> ViewName:
schema=SchemaName(schema_name=self._temporary_schema_name))
return temporary_view_name

def _get_temporary_udf_name(self) -> UDFName:
    """
    Build a UDF name from a fresh temporary object name, placed in the
    temporary schema of this context.
    """
    self._check_if_released()
    udf_basename = self._get_temporary_db_object_name()
    temporary_schema = SchemaName(schema_name=self._temporary_schema_name)
    return UDFNameBuilder.create(name=udf_basename, schema=temporary_schema)

def _get_temporary_connection_name(self) -> ConnectionName:
    """
    Build a connection name from a fresh temporary object name.
    Unlike the UDF variant, no schema is attached.
    """
    self._check_if_released()
    return ConnectionNameImpl(connection_name=self._get_temporary_db_object_name())

def _get_temporary_db_object_name(self) -> str:
temporary_name = f"{self._temporary_db_object_name_prefix}_{self._get_counter_value()}"
return temporary_name
Expand All @@ -118,6 +137,11 @@ def _own_object(self, object_proxy: ObjectProxy):
self._register_object(object_proxy)
self._owned_object_proxies.add(object_proxy)

def get_temporary_name(self) -> str:
    """
    Return a fresh temporary object name as a plain string.

    No proxy is created and nothing is passed to ``_own_object`` here;
    only the released-check and the name generation take place.
    """
    self._check_if_released()
    return self._get_temporary_db_object_name()

def get_temporary_table_name(self) -> TableName:
self._check_if_released()
temporary_table_name = self._get_temporary_table_name()
Expand All @@ -134,6 +158,22 @@ def get_temporary_view_name(self) -> ViewName:
self._own_object(object_proxy)
return object_proxy

def get_temporary_udf_name(self) -> UDFName:
    """
    Create a temporary UDF name, wrap it in a ``UDFNameProxy`` together
    with the current value of the global temporary object counter, hand
    the proxy to ``_own_object`` and return it.
    """
    self._check_if_released()
    udf_name = self._get_temporary_udf_name()
    counter_snapshot = self._global_temporary_object_counter.get_current_value()
    udf_proxy = UDFNameProxy(udf_name, counter_snapshot)
    self._own_object(udf_proxy)
    return udf_proxy

def get_temporary_connection_name(self) -> ConnectionName:
    """
    Create a temporary connection name, wrap it in a
    ``ConnectionNameProxy`` together with the current value of the global
    temporary object counter, hand the proxy to ``_own_object`` and
    return it.
    """
    self._check_if_released()
    connection_name = self._get_temporary_connection_name()
    counter_snapshot = self._global_temporary_object_counter.get_current_value()
    connection_proxy = ConnectionNameProxy(connection_name=connection_name,
                                           global_counter_value=counter_snapshot)
    self._own_object(connection_proxy)
    return connection_proxy

def get_temporary_bucketfs_location(self) -> BucketFSLocationProxy:
self._check_if_released()
temporary_path = self._get_temporary_path()
Expand Down
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryHandler PR

Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from exasol_advanced_analytics_framework.query_handler.context.connection_name import ConnectionName
from exasol_advanced_analytics_framework.query_handler.query.drop_query import DropQuery


class DropConnectionQuery(DropQuery):
    """Query that drops a connection if it exists."""

    def __init__(self, connection_name: ConnectionName):
        self._connection_name = connection_name

    @property
    def connection_name(self) -> ConnectionName:
        """Return the name of the connection this query drops."""
        return self._connection_name

    @property
    def query_string(self) -> str:
        """Return the ``DROP CONNECTION IF EXISTS`` statement."""
        target = self._connection_name.fully_qualified
        return f"DROP CONNECTION IF EXISTS {target};"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryHandler PR

Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@

class DropyQuery(Query):
    """
    Marker subclass of ``Query`` without members of its own.

    NOTE(review): the name looks like a misspelling of ``DropQuery``,
    which is defined in the same module; presumably kept for existing
    imports -- confirm before removing.
    """


class DropQuery(Query):
    """Marker base class for queries that drop a database object."""
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryHandler PR

Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from exasol_data_science_utils_python.schema.table_name import TableName
from exasol_advanced_analytics_framework.query_handler.query.query import Query

class DropQuery(Query):
    """
    Marker subclass of ``Query`` without members of its own.

    NOTE(review): the import that follows this class binds the name
    ``DropQuery`` again, from the ``drop_query`` module.
    """
from exasol_advanced_analytics_framework.query_handler.query.drop_query import DropQuery


class DropTableQuery(DropQuery):
Expand All @@ -15,5 +13,5 @@ def query_string(self) -> str:
return f"DROP TABLE IF EXISTS {self._table_name.fully_qualified};"

@property
def table_name(self)->TableName:
return self._table_name
def table_name(self) -> TableName:
return self._table_name
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryHandler PR

Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from exasol_data_science_utils_python.schema.table_name import TableName
from exasol_data_science_utils_python.schema.view_name import ViewName

from exasol_advanced_analytics_framework.query_handler.query.drop_table_query import DropQuery
from exasol_advanced_analytics_framework.query_handler.query.drop_query import DropQuery


class DropViewQuery(DropQuery):
Expand Down
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryHandler PR

Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import logging
import textwrap
from inspect import cleandoc
from typing import Callable, TypeVar, Generic, Tuple, Union, List

from exasol_data_science_utils_python.udf_utils.sql_executor import SQLExecutor
Expand Down Expand Up @@ -97,12 +99,18 @@ def release_and_create_query_handler_context_of_input_query(self):

def _wrap_return_query(self, input_query: SelectQueryWithColumnDefinition) -> Tuple[str, str]:
temporary_view_name = self._state.input_query_query_handler_context.get_temporary_view_name()
input_query_create_view_string = \
f"CREATE VIEW {temporary_view_name.fully_qualified} AS {input_query.query_string};"
input_query_create_view_string = cleandoc(
f"""
CREATE OR REPLACE VIEW {temporary_view_name.fully_qualified} AS
{input_query.query_string};
""")
full_qualified_columns = [col.name.fully_qualified
for col in input_query.output_columns]
columns_str = ",".join(full_qualified_columns)
input_query_string = \
f"SELECT {columns_str} " \
f"FROM {temporary_view_name.fully_qualified};"
columns_str = ",\n".join(full_qualified_columns)
input_query_string = cleandoc(
f"""
SELECT
{textwrap.indent(columns_str, " " * 4)}
FROM {temporary_view_name.fully_qualified};
""")
return input_query_create_view_string, input_query_string
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from typing import List, Optional

from pydantic import BaseModel

from exasol_advanced_analytics_framework.udf_communication.communicator_protocol import CommunicatorProtocol
from exasol_advanced_analytics_framework.udf_communication.serialization import serialize_message, deserialize_message


class AllGatherResult(BaseModel):
    # Message model wrapping the list of gathered values so that it can be
    # passed through serialize_message / deserialize_message.
    gather_result: List[bytes]


class AllGatherOperation:
    """
    All-gather built from the communicator's ``gather`` and ``broadcast``.

    Calling the instance gathers this peer's value, lets the multi-node
    leader serialize the gathered list into an ``AllGatherResult`` message,
    broadcasts that message and returns the list decoded from the
    broadcast result.
    """

    def __init__(self, communicator: CommunicatorProtocol, value: bytes):
        self._communicator = communicator
        self._value = value

    def __call__(self) -> List[bytes]:
        gathered = self._communicator.gather(self._value)
        # Only the multi-node leader supplies a payload; every other peer
        # passes None into broadcast.
        # NOTE(review): presumably gather() yields the full list only on the
        # leader -- confirm against CommunicatorProtocol.
        if self._communicator.is_multi_node_leader():
            payload: Optional[bytes] = serialize_message(
                AllGatherResult(gather_result=gathered))
        else:
            payload = None
        received = self._communicator.broadcast(payload)
        decoded = deserialize_message(received, AllGatherResult)
        return decoded.gather_result
Loading
Loading