Skip to content

Compare temp branch with current (Do not merge) #187

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 33 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e28c91b
Add broadcast operation
tkilias Sep 15, 2023
7219ee1
Add test_gather_broadcast
tkilias Sep 17, 2023
e5f4e44
Add tensorflow example
tkilias Oct 11, 2023
fe2ee36
Added RetrieveExasolNodeIPAddressUDFDeployer
tkilias Dec 9, 2023
c40b1d0
Add unit test for udf_communicator and refactored it to make it unit …
tkilias Dec 9, 2023
27088ed
Fix integration tests
tkilias Dec 9, 2023
9c76af3
Introduce DistributedUDFRunner and DistributedUDF.
tkilias Dec 10, 2023
5947f2a
Refactored DistributedTrainingUDF to DistributedUDFRunner and Distrib…
tkilias Dec 10, 2023
39162c0
Extracted SocketFactoryContextManagerFactory from udf_communicator.py
tkilias Dec 10, 2023
9a45bea
Added all_gather to Communicator based on gather and broadcast
tkilias Dec 10, 2023
d63caae
Extract exchange_cluster_information from distributed_udf.py and refa…
tkilias Dec 10, 2023
149d6fa
Added integration with db for udf_communicator and returned the netwo…
tkilias Dec 10, 2023
10209bc
Use extracted exchange_cluster_information in distributed_training_ud…
tkilias Dec 10, 2023
4c0c02e
Add temporary_udf_name to QueryHandlerContext
tkilias Feb 4, 2024
063a27b
Fix return type of get_temporary_udf_name in QueryHandlerContext
tkilias Feb 17, 2024
e20f283
Add temporary_connection_name and temporary_name to QueryHandlerContext
tkilias Feb 17, 2024
bb5093d
Use UDFName class in RetrieveExasolNodeIPAddressUDFDeployer
tkilias Feb 17, 2024
e00bff3
Add language_alias parameter to RetrieveExasolNodeIPAddressUDFDeployer
tkilias Feb 17, 2024
09233a5
Extract ConnectionName into its own module to prevent circular import
tkilias Feb 17, 2024
25f21d1
Implement missing abstract function fully_qualified in ConnectionName
tkilias Feb 17, 2024
9eecf86
Fix MRO in ConnectionNameProxy
tkilias Feb 17, 2024
4b63596
Fix _get_temporary_connection_name in _ScopeQueryHandlerContextBase
tkilias Feb 17, 2024
55cffaa
Add MockSQLExecutor which also checks the query to execute against ex…
tkilias Feb 17, 2024
63fdfb0
Fix name of ExpectedQuery
tkilias Feb 17, 2024
1715daf
Return mock_result_set in MockSQLExecutor
tkilias Feb 17, 2024
dec8e3d
use cleandoc in RetrieveExasolNodeIPAddressUDFDeployer
tkilias Feb 17, 2024
6d3dbbe
Improve output of MockSQLExecutor and MockQueryHandlerRunner
tkilias Feb 18, 2024
48a520f
Use create or replace view in mock_query_handler_runner.py
tkilias Feb 18, 2024
baece13
Fix excpeption in mock_sql_executor.py
tkilias Feb 22, 2024
e54ee47
Removed tensorflow and some cleanup
tkilias Feb 24, 2024
d51872a
Update bucketfs-python
tkilias Feb 25, 2024
b2f8968
Make mock_sql_executor.py woring with pytest_regex matcher
tkilias Feb 28, 2024
80afdc2
Fix VARCHAR size of ip_adress in RetrieveExasolNodeIPAddressUDFDeployer
tkilias Apr 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Extract exchange_cluster_information from distributed_udf.py and refa…
…ctor it with all_gather
tkilias committed Dec 10, 2023
commit d63caae6d834e92d9b984b982624f6c310d8cdaf
Original file line number Diff line number Diff line change
@@ -1,67 +1,14 @@
import socket
import time
from typing import List, Tuple, Protocol
from typing import Protocol

import structlog
from pydantic import BaseModel
from structlog.typing import FilteringBoundLogger

from exasol_advanced_analytics_framework.udf_communication.communicator import Communicator
from exasol_advanced_analytics_framework.udf_communication.ip_address import Port, IPAddress
from exasol_advanced_analytics_framework.udf_communication.udf_communicator import udf_communicator

LOGGER: FilteringBoundLogger = structlog.get_logger()


class WorkerAddress(BaseModel):
ip_address: IPAddress
port: Port


class ClusterInformation(BaseModel):
workers: List[WorkerAddress]


def reserve_port(ip: IPAddress) -> Port:
def new_socket():
return socket.socket(socket.AF_INET, socket.SOCK_STREAM)

def bind(sock: socket.socket, ip: IPAddress, port: int):
sock.bind((ip.ip_address, port))
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

def acquire_port_number(sock: socket.socket, ip: IPAddress) -> int:
bind(sock, ip, 0)
return sock.getsockname()[1]

with new_socket() as sock:
port_number = acquire_port_number(sock, ip)
port = Port(port=port_number)
LOGGER.info("reserve_port", ip=ip, port=port)
return port


def exchange_cluster_information(communicator: Communicator, worker_address: WorkerAddress) \
-> ClusterInformation:
LOGGER.info("before gather", worker_address=worker_address)
workers_messages = communicator.gather(worker_address.json().encode("UTF-8"))
LOGGER.info("after gather", worker_address=worker_address)
broadcast_value = None
if communicator.is_multi_node_leader():
workers = [WorkerAddress.parse_raw(message.decode("UTF-8")) for message in workers_messages]
cluster_information = ClusterInformation(workers=workers)
broadcast_value = cluster_information.json().encode("UTF-8")
LOGGER.info("before broadcast", worker_address=worker_address)
broadcast_result = communicator.broadcast(broadcast_value)
LOGGER.info("after broadcast", worker_address=worker_address)

# if communicator.is_multi_node_leader():
# time.sleep(5)

cluster_information = ClusterInformation.parse_raw(broadcast_result.decode("UTF-8"))
return cluster_information


class UDFCommunicatorFactory(Protocol):

def create(self) -> Communicator:
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import contextlib
import socket
from typing import List, Iterator

from pydantic import BaseModel

from exasol_advanced_analytics_framework.udf_communication.communicator import Communicator
from exasol_advanced_analytics_framework.udf_communication.distributed_udf import LOGGER
from exasol_advanced_analytics_framework.udf_communication.ip_address import IPAddress, Port
from exasol_advanced_analytics_framework.udf_communication.serialization import serialize_message, deserialize_message


class WorkerAddress(BaseModel):
ip_address: IPAddress
port: Port


class ClusterInformation(BaseModel):
workers: List[WorkerAddress]


@contextlib.contextmanager
def reserve_port(ip: IPAddress) -> Iterator[Port]:
def new_socket():
return socket.socket(socket.AF_INET, socket.SOCK_STREAM)

def bind(sock: socket.socket, ip: IPAddress, port: int):
sock.bind((ip.ip_address, port))
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

def acquire_port_number(sock: socket.socket, ip: IPAddress) -> int:
bind(sock, ip, 0)
return sock.getsockname()[1]

with new_socket() as sock:
port_number = acquire_port_number(sock, ip)
port = Port(port=port_number)
LOGGER.info("reserve_port", ip=ip, port=port)
yield port


def exchange_cluster_information(communicator: Communicator, worker_address: WorkerAddress) \
-> ClusterInformation:
worker_messages = communicator.all_gather(serialize_message(worker_address))
worker_addresses = [deserialize_message(message, WorkerAddress) for message in worker_messages]
cluster_information = ClusterInformation(workers=worker_addresses)
return cluster_information