# Copyright (C) 2022 Alteryx, Inc. All rights reserved.
#
# Licensed under the ALTERYX SDK AND API LICENSE AGREEMENT;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.alteryx.com/alteryx-sdk-and-api-license-agreement
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Connection class definitions."""
from enum import IntEnum
from typing import Any, List, Optional, TYPE_CHECKING
from AlteryxPythonSDK import RecordInfo, RecordRef
from ayx_python_sdk.core.observable_mixin import ObservableMixin
from ayx_python_sdk.providers.e1_provider.events import (
ConnectionEvents,
PluginEvents,
)
from ayx_python_sdk.providers.e1_provider.records import BaseRecordContainer
if TYPE_CHECKING:
from ayx_python_sdk.providers.e1_provider.e1_input_anchor_proxy import (
E1InputAnchorProxy,
)
from ayx_python_sdk.providers.e1_provider.e1_plugin_proxy import E1PluginProxy
ConnectionStatus = IntEnum(
"ConnectionStatus", "CREATED INITIALIZED RECEIVING_RECORDS CLOSED"
)
[docs]class ConnectionInterface(ObservableMixin):
"""Connection interface definition."""
__slots__ = [
"name",
"record_containers",
"__record_info",
"progress_percentage",
"status",
"plugin_failed",
"anchor",
"record_batch_size",
"plugin_proxy",
]
def __init__(
self,
plugin_proxy: "E1PluginProxy",
connection_name: str,
anchor: "E1InputAnchorProxy",
) -> None:
"""Instantiate a connection interface."""
super().__init__()
self.name = connection_name
self.__record_info: Optional[RecordInfo] = None
self.progress_percentage = 0.0
self.status = ConnectionStatus.CREATED
self.plugin_failed = False
self.record_containers: List[BaseRecordContainer] = []
self.anchor = anchor
self.record_batch_size: Optional[int] = None
self.plugin_proxy = plugin_proxy
self.plugin_proxy.subscribe(
PluginEvents.PLUGIN_FAILURE, self.plugin_failure_callback
)
@property
def record_info(self) -> Optional[RecordInfo]:
"""Getter for record info."""
return self.__record_info
[docs] def add_record_container(self, container: BaseRecordContainer) -> None:
"""Add a new record container."""
self.record_containers.append(container)
[docs] def clear_records(self) -> None:
"""Clear all records for this connection's containers."""
for container in self.record_containers:
container.clear_records()
[docs] def plugin_failure_callback(self, **_: Any) -> None:
"""Set failed status from plugin."""
self.plugin_failed = True
[docs] def ii_init(self, record_info: RecordInfo) -> bool:
"""Initialize the connection."""
# DO NOT REMOVE NEXT LINE: The base SDK has issues with sys.path updates,
# and in order to fix some dependency resolution issues we have to update it again here
self.plugin_proxy.update_sys_path()
self.status = ConnectionStatus.INITIALIZED
self.__record_info = record_info
self.notify_topic(ConnectionEvents.CONNECTION_INITIALIZED, connection=self)
return not self.plugin_failed
[docs] def ii_push_record(self, record: RecordRef) -> bool:
"""Receive a record."""
self.status = ConnectionStatus.RECEIVING_RECORDS
for container in self.record_containers:
container.add_record(record)
self.notify_topic(ConnectionEvents.RECORD_RECEIVED, connection=self)
return not self.plugin_failed
[docs] def ii_update_progress(self, d_percent: float) -> None:
"""Update progress of incoming data."""
self.progress_percentage = max(d_percent, 0)
self.notify_topic(
ConnectionEvents.PROGRESS_UPDATE, connection=self, percent=d_percent
)
[docs] def ii_close(self) -> None:
"""Close the connection."""
self.status = ConnectionStatus.CLOSED
self.notify_topic(ConnectionEvents.CONNECTION_CLOSED, connection=self)