Source code for ayx_python_sdk.providers.e1_provider.records.raw_record_container

# Copyright (C) 2022 Alteryx, Inc. All rights reserved.
#
# Licensed under the ALTERYX SDK AND API LICENSE AGREEMENT;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    https://www.alteryx.com/alteryx-sdk-and-api-license-agreement
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""RawRecordContainer class definition."""
from typing import Dict, List, Optional, TYPE_CHECKING

from AlteryxPythonSDK import RecordCreator, RecordInfo, RecordRef

from ayx_python_sdk.providers.e1_provider.proxies import (
    FieldProxy,
    RecordCopierProxy,
)
from ayx_python_sdk.providers.e1_provider.records.base_record_container import (
    BaseRecordContainer,
)
from ayx_python_sdk.providers.e1_provider.utilities import (
    fill_df_nulls_with_blackbird_nulls,
)


if TYPE_CHECKING:
    import pandas as pd


[docs]class RawRecordContainer(BaseRecordContainer): """Container for copying and holding raw records.""" __slots__ = [ "records", "_input_record_info", "_storage_record_info", "_field_map", "_record_copier", "_input_fields", ] def __init__( self, input_record_info: RecordInfo, storage_record_info: Optional[RecordInfo] = None, field_map: Optional[Dict[str, str]] = None, ) -> None: """Construct a container.""" super().__init__() if (storage_record_info is None) ^ (field_map is None): raise ValueError( "storage_record_info and field_map must both be specified." ) self._input_record_info = input_record_info if storage_record_info is None: self._storage_record_info = self._input_record_info.clone() else: self._storage_record_info = storage_record_info if field_map is None: self._field_map = { str(field.name): str(field.name) for field in self._storage_record_info } else: self._field_map = field_map self._record_copier = RecordCopierProxy( self._input_record_info, self._storage_record_info, self._field_map ) self._input_fields = { field.name: FieldProxy(field) for field in input_record_info } self.records: List[RecordCreator] = []
[docs] def add_record(self, record: RecordRef) -> None: """Make a copy of the record and add it to the container.""" self.records.append(self._record_copier.copy(record))
[docs] def build_dataframe(self) -> "pd.DataFrame": """Build a dataframe from the container.""" raise NotImplementedError()
[docs] def update_with_dataframe(self, df: "pd.DataFrame") -> None: """Update stored records with values from a dataframe.""" num_rows, _ = df.shape if num_rows != len(self.records): raise ValueError( "Dataframe and source container must have the same number of records." ) fill_df_nulls_with_blackbird_nulls(df) for record, (_, row) in zip(self.records, df.iterrows()): for column_name in list(df): try: field = self._storage_record_info.get_field_by_name(column_name) if field is None: raise Exception() except Exception: raise RuntimeError( f"Couldn't update field '{column_name}' that does not exist" ) FieldProxy(field).set(record, row[column_name])