# encoding: utf-8
"""Provides the Dimension class."""
import copy
from collections.abc import Sequence
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Union
import numpy as np
from cr.cube.enums import (
COLLATION_METHOD as CM,
DIMENSION_TYPE as DT,
MARGINAL,
MEASURE,
)
from cr.cube.util import lazyproperty
class Dimensions(tuple):
"""Collection containing every dimension defined in cube response."""
def __repr__(self) -> str:
"str representation of this dimension sequence." ""
dim_lines = "\n".join(str(d) for d in self)
return f"Dimensions:\n{dim_lines}"
@classmethod
def from_dicts(cls, dicts):
"""Populate a Dimensions instance from the given cube response pieces.
Each dict in the given dicts should be a ZCL dimension definition,
with "type" and "references" members.
"""
dims = [Dimension(d, cls.dimension_type(d)) for d in dicts]
# Promote any DT.CA_SUBVAR to DT.MR_SUBVAR
# based on the type of its related "values" dim.
for dim in dims:
if dim.dimension_type == DT.CA_SUBVAR:
# Okay, not ANY but only the first axis ("subvariables").
# Additional axes (e.g. "fused" variables / scorecards) are
# incompletely implemented, and their cube output is different
# from subvariable axes (no "value" field in elements).
# Apparently, this library relies on these higher axes
# continuing to be marked as CA_SUBVAR to hint that
# features like insertions are not supported.
if any(
"value" not in e
for e in dim._unshimmed_dimension_dict["type"]["elements"]
):
continue
for other in dims:
if other is not dim and other.alias == dim.alias:
if other.dimension_type == DT.MR_CAT:
dim.dimension_type = DT.MR_SUBVAR
return cls(dims)
@classmethod
def dimension_type(cls, dimension_dict):
"""Return the appropriate DIMENSION_TYPE.
Note that the returned type may depend on BOTH the current dimension
AND the next one, since arrays always place the higher dimension
first.
"""
typedef = dimension_dict["type"]
type_class = typedef["class"]
if type_class == "categorical":
cats = typedef.get("categories", [])
is_logical = any(cat.get("selected") for cat in cats) and [
cat.get("id") for cat in cats
] == [1, 0, -1]
if dimension_dict.get("references", {}).get("subreferences"):
if is_logical:
return DT.MR_CAT
else:
return DT.CA_CAT
else:
if is_logical:
return DT.LOGICAL
if any("date" in cat for cat in cats):
return DT.CAT_DATE
return DT.CAT
elif type_class == "enum":
subclass = dimension_dict["type"]["subtype"]["class"]
if subclass == "variable":
# This may be overridden to DT.MR_SUBVAR by the caller
# if it detects related dims are DT.MR_CAT
return DT.CA_SUBVAR
elif subclass == "datetime":
return DT.DATETIME
elif subclass == "numeric":
return DT.BINNED_NUMERIC
elif subclass == "text":
return DT.TEXT
elif subclass == "num_arr":
return DT.NUM_ARRAY
raise NotImplementedError(f"unrecognized dimension type {type_class}")
@lazyproperty
def apparent_dimensions(self) -> "Dimensions":
"""List of the "visible" dimensions.
The two dimensions for a multiple-response (MR) variable are
conflated into a single dimensions in this collection.
"""
return self.__class__(d for d in self if d.dimension_type != DT.MR_CAT)
@lazyproperty
def dimension_order(self) -> Tuple[int, ...]:
"""Tuple of int representing the dimension order.
The dimension order depends on the presence of numeric array in the dimensions
and the number of the cube dimensions. In case of 3 dimensions e.g.
NUM_ARR_X_MR_SUBVAR_X_MR_CAT the order should be (1,2,0) that is basically
swapping the MR (2 dimensions) with the NUM_ARRAY dimension. In case of 2
dimensions the dimension order correspond simpy to the reverse of the original
dimension order.
"""
# NOTE: this is a temporary hack that goes away when we introduce the dim_order
# concept. We should receive the actual order directly in the cube_response.
# So, all this logic will be deleted.
dimension_types = tuple(d.dimension_type for d in self)
dim_order = tuple(range(len(self)))
if len(self) >= 2 and DT.NUM_ARRAY in dimension_types:
return (
dim_order[-2:] + (dim_order[0],) if len(self) == 3 else dim_order[::-1]
)
return dim_order
@lazyproperty
def shape(self) -> Tuple[int, ...]:
"""Tuple of int element count for each dimension.
This corresponds to the shape of the ndarray representing the raw
cube response values (raw meaning including missing and prunable
elements and any MR_CAT dimensions).
"""
dimensions = [self[i] for i in self.dimension_order]
return tuple(d.shape for d in dimensions)
[docs]class Dimension:
"""Represents one dimension of a cube response.
Each dimension represents one of the variables in a cube response. For
example, a query to cross-tabulate snack-food preference against region
will have two variables (snack-food preference and region) and will produce
a two-dimensional (2D) cube response. That cube will have two of these
dimension objects, which are accessed using
:attr:`.CrunchCube.dimensions`.
"""
dimension_type = None
def __init__(self, dimension_dict, dimension_type, dimension_transforms=None):
self._unshimmed_dimension_dict = dimension_dict
self.dimension_type = dimension_type
self._unshimmed_dimension_transforms_dict = dimension_transforms or {}
def __repr__(self) -> str:
"""str representation of this dimension."""
return f"{self.dimension_type}: {self.name}"
[docs] @lazyproperty
def alias(self) -> Optional[str]:
"""Return the alias for the dimension if it exists, None otherwise."""
return self._dimension_dict["references"].get("alias")
[docs] @lazyproperty
def all_elements(self) -> "Elements":
"""Elements object providing cats or subvars of this dimension.
Elements in this sequence appear in cube-result order.
"""
return Elements.from_typedef(
self._dimension_dict["type"],
self._dimension_transforms_dict,
self.dimension_type,
self._element_data_format,
)
[docs] @lazyproperty
def description(self) -> str:
"""str description of this dimension."""
# ---First authority in cascade is analysis-specific dimension transform. None
# ---is a legitimate value, indicating suppression of any inherited subtitle.
if "description" in self._dimension_transforms_dict:
description = self._dimension_transforms_dict["description"]
# ---inherited value is base dimension description---
else:
description = self._dimension_dict["references"].get("description")
# ---Normalize to "" so return value is always a str and callers don't need to
# ---deal with None as a possible return type.
return description if description else ""
[docs] @lazyproperty
def element_aliases(self) -> Tuple[str, ...]:
"""tuple of string element-aliases for each valid element in this dimension.
Element-aliases appear in the order defined in the cube-result.
"""
return tuple(e.alias for e in self.valid_elements)
[docs] @lazyproperty
def element_ids(self) -> Tuple[int, ...]:
"""tuple of int element-id for each valid element in this dimension.
Element-ids appear in the order defined in the cube-result.
"""
return tuple(e.element_id for e in self.valid_elements)
[docs] @lazyproperty
def element_labels(self) -> Tuple[str, ...]:
"""tuple of string element-labels for each valid element in this dimension.
Element-labels appear in the order defined in the cube-result.
"""
return tuple(e.label for e in self.valid_elements)
[docs] @lazyproperty
def hidden_idxs(self) -> Tuple[int, ...]:
"""tuple of int element-idx for each hidden valid element in this dimension.
An element is hidden when a "hide" transform is applied to it in its transforms
dict.
"""
return tuple(
idx for idx, element in enumerate(self.valid_elements) if element.is_hidden
)
[docs] @lazyproperty
def insertion_ids(self) -> Tuple[int, ...]:
"""tuple of int insertion-id for each insertion in this dimension.
Insertion-ids appear in the order insertions are defined in the dimension.
"""
return tuple(s.insertion_id for s in self.subtotals)
[docs] @lazyproperty
def name(self) -> str:
"""str name of this dimension, the empty string ("") if not specified."""
references = self._dimension_dict["references"]
def raw_name():
"""Return dimension-name as specified (`None` is not normalized to "")."""
# --- First authority in cascade is analysis-specific dimension transform.
# --- None is a legitimate value, indicating suppression of any inherited
# --- title.
if "name" in self._dimension_transforms_dict:
return self._dimension_transforms_dict["name"]
# --- next authority is base dimension name ---
if "name" in references:
return references["name"]
# --- default is dimension alias ---
return references.get("alias")
# ---Normalize None value to "" so return value is always a str and callers
# ---don't need to deal with multiple possible return value types.
name = raw_name()
return name if name else ""
[docs] @lazyproperty
def numeric_values(self) -> Tuple[Union[int, float], ...]:
"""tuple of numeric values for valid elements of this dimension.
Each category of a categorical variable can be assigned a *numeric
value*. For example, one might assign `like=1, dislike=-1,
neutral=0`. These numeric mappings allow quantitative operations
(such as mean) to be applied to what now forms a *scale* (in this
example, a scale of preference).
The numeric values appear in the same order as the
categories/elements of this dimension. Each element is represented by
a value, but an element with no numeric value appears as `np.nan` in
the returned list.
"""
return tuple(element.numeric_value for element in self.valid_elements)
[docs] @lazyproperty
def order_spec(self) -> "_OrderSpec":
"""_OrderSpec proxy object for dimension.transforms.order dict from payload."""
return _OrderSpec(self, self._dimension_transforms_dict)
[docs] @lazyproperty
def prune(self) -> bool:
"""True if empty elements should be automatically hidden on this dimension."""
return self._dimension_transforms_dict.get("prune") is True
[docs] @lazyproperty
def selected_categories(self) -> Tuple[Dict, ...]:
"""List of selected categories specified for this dimension."""
selected_categories = self._dimension_dict["references"].get(
"selected_categories"
)
return tuple(selected_categories) if selected_categories else ()
[docs] @lazyproperty
def shape(self) -> int:
"""int count of *all* elements in this dimension, both valid and missing."""
return len(self.all_elements)
[docs] @lazyproperty
def smoothing_dict(self) -> Optional[Dict]:
"""Optional dict of smoothing specifications."""
return self._dimension_transforms_dict.get("smoother") or {}
[docs] @lazyproperty
def subtotal_aliases(self) -> Tuple[str, ...]:
"""tuple of string element-aliases for each subtotal in this dimension.
Element-aliases appear in the order defined in the cube-result.
"""
return tuple(s.alias for s in self.subtotals)
[docs] @lazyproperty
def subtotal_labels(self) -> Tuple[str, ...]:
"""tuple of string element-labels for each subtotal in this dimension.
Element-labels appear in the order defined in the cube-result.
"""
return tuple(s.label for s in self.subtotals)
[docs] @lazyproperty
def subtotals(self) -> "_Subtotals":
"""_Subtotals sequence object for this dimension.
Each item in the sequence is a _Subtotal object specifying a subtotal, including
its addends and anchor.
"""
# --- elements of an aggregate/array dimension cannot meaningfully be summed, so
# --- an array dimension cannot have subtotals
if self.dimension_type in (DT.MR_SUBVAR, DT.CA_SUBVAR):
return _Subtotals([], self.valid_elements)
# --- insertions in dimension-transforms override those on dimension itself ---
elif "insertions" in self._dimension_transforms_dict:
insertion_dicts = self._dimension_transforms_dict["insertions"]
return _Subtotals(insertion_dicts, self.valid_elements, False)
# --- otherwise insertions defined on dimension/variable apply ---
else:
insertion_dicts = self._view_insertion_dicts
return _Subtotals(insertion_dicts, self.valid_elements)
[docs] @lazyproperty
def subtotals_in_payload_order(self) -> "_Subtotals":
"""_Subtotals sequence object for this dimension respecting the payload order.
Each item in the sequence is a _Subtotal object specifying a subtotal, including
its addends and anchor.
"""
if self.dimension_type in (DT.MR_SUBVAR, DT.CA_SUBVAR):
return _Subtotals([], self.valid_elements)
elif self._view_insertion_dicts:
return _Subtotals(self._view_insertion_dicts, self.valid_elements)
else:
insertion_dicts = self._dimension_transforms_dict.get("insertions", {})
return _Subtotals(insertion_dicts, self.valid_elements, False)
[docs] def translate_element_id(self, _id) -> Optional[str]:
"""Optional string that is the translation of various ids to subvariable alias
This is needed for the opposing dimension's sort by opposing element, because
when creating a dimension, we don't have access to the other dimension's
ids to transform it. Therefore, the id for opposing element sort by value
transforms is not translated at creation time.
0) If dimension is not a subvariables dimension, return the _id.
1) If id matches an alias, then just use it.
2) If id matches a subvariable id, translate to corresponding alias.
3) If id matches an element id, translate to corresponding alias.
4) If id can be parsed to int and matches an element id, translate to alias.
5) If id is int (or can be parsed to int) and can be used as index (eg in range
0-# of elements), use _id'th alias.
6) If all of these fail, return None.
"""
return self._element_id_shim.translate_element_id(_id)
[docs] @lazyproperty
def valid_elements(self) -> "Elements":
"""Elements object providing access to non-missing elements.
Any categories or subvariables representing missing data are excluded
from the collection; this sequence represents a subset of that
provided by `.all_elements`.
"""
return self.all_elements.valid_elements
@lazyproperty
def _element_id_shim(self) -> "_ElementIdShim":
"""_ElementIdShim for this Dimension object"""
return _ElementIdShim(
self.dimension_type,
self._unshimmed_dimension_dict,
self._unshimmed_dimension_transforms_dict,
)
@lazyproperty
def _dimension_dict(self) -> Dict:
"""Copy of dimension dictionary with shimmed `element_id`s"""
return self._element_id_shim.shimmed_dimension_dict
@lazyproperty
def _dimension_transforms_dict(self) -> Dict:
"""Copy of dimension transforms dictionary with shimmed `element_id`s"""
return self._element_id_shim.shimmed_dimension_transforms_dict
@lazyproperty
def _element_data_format(self) -> Optional[Union[str, int]]:
"""optional str format for datetimes or int number of decimals for numeric"""
dim_dict = self._dimension_dict
fmt = dim_dict.get("references", {}).get("format")
# --- Be careful, format can be set to None so check before using `get`
if fmt and fmt.get("data"):
return fmt.get("data")
# --- Determine a fallback format from the resolution to match whaam
# --- github.com/Crunch-io/whaam/blob/master/base/utility/dtype-to-strf.js
resolution = dim_dict.get("type", {}).get("subtype", {}).get("resolution")
# --- If there's no resolution then probably isn't a date
if not resolution:
return None
return {
"s": ":%S",
"m": "%H:%M",
"h": "%H:00",
"D": "%d %b %Y",
"W": "%Y W%W",
"M": "%b %Y",
"Y": "%Y",
}.get(resolution, "%Y-%m-%d")
@lazyproperty
def _view_insertion_dicts(self) -> List[Optional[Dict]]:
"""List of insertion dicts included in the dimension view."""
view = self._dimension_dict.get("references", {}).get("view") or {}
insertions = view.get("transform", {}).get("insertions", [])
return insertions
class Elements(tuple):
"""Sequence of Element objects for a dimension.
Each element is either a category or a subvariable.
"""
datetime_formats = {
"Y": "%Y",
"Q": "%Y-%m",
"3M": "%Y-%m",
"M": "%Y-%m",
"W": "%Y-%m-%d",
"D": "%Y-%m-%d",
"h": "%Y-%m-%dT%H",
"m": "%Y-%m-%dT%H:%M",
"s": "%Y-%m-%dT%H:%M:%S",
"ms": "%Y-%m-%dT%H:%M:%S.%f",
"us": "%Y-%m-%dT%H:%M:%S.%f",
}
@classmethod
def from_typedef(
cls, typedef, dimension_transforms_dict, dimension_type, element_data_format
) -> "Elements":
"""Populate an Elements instance from the given ZCL type definition."""
element_defs = (
typedef["categories"]
if typedef["class"] == "categorical"
else typedef["elements"]
)
order = typedef.get("order")
if order is not None:
# The data in the cube along this dimension is in the specified order.
# The categories or enum elements in the typedef were NOT rearranged;
# therefore, we do so here to make all layers that use this tuple
# of Element objects work transparently.
codemap = {edef["id"]: edef for edef in element_defs}
element_defs = [codemap[code] for code in order if code in codemap]
all_xforms = dimension_transforms_dict.get("elements", {})
if dimension_type == DT.MR_SUBVAR:
hidden_xforms = cls._hidden_transforms(
element_defs,
dimension_transforms_dict.get("insertions", []),
)
all_xforms = {**hidden_xforms, **all_xforms}
elements = []
for idx, element_dict in enumerate(element_defs):
# --- convert to string for categorical ids
element_id = element_dict["id"]
xforms = _ElementTransforms(
all_xforms.get(element_id, all_xforms.get(str(element_id), {}))
)
# This is so dumb that our type checker won't let us just
# write `formatter = str`.
def format(x) -> str:
return str(x)
formatter = format
if dimension_type == DT.DATETIME:
orig_format = cls.datetime_formats.get(
typedef["subtype"].get("resolution")
)
out_format = element_data_format
if orig_format is not None and out_format is not None:
def format_datetime(x) -> str:
try:
return datetime.strptime(x, orig_format).strftime(
out_format
)
except ValueError:
return str(x)
formatter = format_datetime
element = Element(element_dict, idx, xforms, formatter)
elements.append(element)
return cls(elements)
@classmethod
def _hidden_transforms(cls, element_defs, insertions) -> Dict:
"""Element transforms dict for array dimensions.
To provide consistency with a poorly-defined interface for categorical
insertions, a client can include a `"hide": true` field in a (complete) copy of
a variable-based insertion in order to suppress that variable-based insertion.
For the array case, these need to be translated to a "hide" transform on the
subvariable element because such an insertion becomes a derived subvariable just
like the other subvariables in the dimension.
"""
# --- currently an inserted-subvariable can only be identified by name, there is
# --- no alias for an inserted-subvariable and it does not receive a "normal"
# --- element.id like "0001".
hidden = [i["name"] for i in insertions if i.get("hide", False)]
# --- however, the hide-transform must be identified by element-id, so we need a
# --- mapping of insertion-name to element-id
element_id_from_name = {
element["value"]["id"]: element["id"] for element in element_defs
}
return {
element_id_from_name[name]: {"hide": True}
for name in hidden
if name in element_id_from_name
}
@lazyproperty
def element_ids(self) -> Tuple[int, ...]:
"""tuple of element-id for each element in collection.
Element ids appear in the order they occur in the cube response.
"""
return tuple(element.element_id for element in self)
@lazyproperty
def element_idxs(self) -> Tuple[int, ...]:
"""tuple of element-index for each element in collection.
Element index values represent the position of this element in the
dimension-dict it came from. In the case of all elements,
it will simply be a tuple(range(len(all_elements))).
"""
return tuple(element.index for element in self)
def get_by_id(self, element_id: int) -> "Element":
"""Return Element object identified by *element_id*.
Raises KeyError if not found. Only elements known to this collection
are accessible for lookup. For example, a_"valid elements" object will
raise KeyError for the id of a missing element.
"""
return self._elements_by_id[element_id]
@lazyproperty
def _elements_by_id(self) -> Dict:
"""dict mapping each element by its id."""
return {element.element_id: element for element in self}
@lazyproperty
def valid_elements(self) -> "Elements":
"""Elements object containing only non-missing elements."""
return Elements(element for element in self if not element.missing)
class _ElementIdShim:
"""Object used to replace element ids with alias for subvariables.
We want to move to a world where elements on a subvariables dimension are
identified by their alias, but right now the "element_id" from zz9 is
an index, and the transforms have several different ways to refer to
subvariables.
Types of identifiers for subvariables (and derived insertions):
* "element_id": Stored in the cube result as the object name in
`dimensions[i].type.elements[j].id`. For subvariables, zz9 currently puts
the index integer here. Long term zz9 may change this to the the alias.
* "subvariable_id": Subvariables have an id stored in
`dimensions[i].type.elements[j].value.id`, generally this is a 4 digit,
0-padded index of the subvariable when it was first created (eg "0001",
"0002", ...), though it is not required to be. For derived insertions,
currently the name is used here.
* "alias": Subvariables also have an alias that identifies them. It is stored
in `dimensions[i].type.elements[j].value.
"""
def __init__(self, dimension_type, dimension_dict, dimension_transforms_dict):
self.dimension_type = dimension_type
self._dimension_dict = dimension_dict
self._dimension_transforms_dict = dimension_transforms_dict
@lazyproperty
def shimmed_dimension_dict(self) -> Dict:
"""Copy of dimension dictionary with shimmed `element_id`s
We want to move to a world where elements on a subvariables dimension are
identified by their alias, but right now the "element_id" from zz9 is
an index for subvariables.
"""
shim = copy.deepcopy(self._dimension_dict)
# --- array variables are identified by thier aliases
if self.dimension_type in DT.ARRAY_TYPES:
# --- Replace element ids with the alias
for idx, alias in enumerate(self._subvar_aliases):
shim["type"]["elements"][idx]["id"] = alias
# --- datetimes are identified by their value
if self.dimension_type == DT.DATETIME:
for el in shim["type"]["elements"]:
# --- Missing data comes in as a dict and should be ignored
if not isinstance(el["value"], dict):
el["id"] = el["value"]
# --- Leave other types alone
return shim
@lazyproperty
def shimmed_dimension_transforms_dict(self) -> Dict:
"""Copy of dimension transforms dictionary with shimmed `element_id`s
We want to move to a world where elements on a subvariables dimension are
identified by their alias, but right now the "element_id" from zz9 is
simply the subvariable's (unstable) cardinal position in subvariables
sequence. Different parts of the transforms have several different ways
to refer to subvariables.
Types of identifiers for subvariables (and derived insertions):
- "element_id": Stored in the cube result as the object name in
`dimensions[i].type.elements[j].id`. For subvariables, zz9 currently puts
the index integer here. Long term zz9 may change this to the the alias.
- "subvariable_id": Subvariables have an id stored in
`dimensions[i].type.elements[j].value.id`, generally this is a 4 digit,
0-padded index of the subvariable when it was first created (eg "0001",
"0002", ...), though it is not required to be. For derived insertions,
currently the name is used here.
- "alias": Subvariables also have an alias that identifies them. It is stored
in `dimensions[i].type.elements[j].value.references.alias`.
"""
shim = copy.deepcopy(self._dimension_transforms_dict)
# --- Leave non-subvariable dimension types alone, as they don't have
# --- subvariable aliases to use, and category ids are already the main way
# --- we identify elements on categorical dimensions (and this is correct).
dim_type = self.dimension_type
if dim_type not in DT.SHIMMED_TYPES:
return shim
# --- Replace element transform ids with the alias
if "elements" in shim:
shim["elements"] = self._replaced_element_transforms(shim["elements"])
# --- Translate explicit order element ids if present
if shim.get("order", {}).get("element_ids") is not None:
shim["order"]["element_ids"] = self._replaced_order_element_ids(
shim["order"]["element_ids"]
)
# --- Translate fixed element ids if present
fixed = shim.get("order", {}).get("fixed", {})
if fixed.get("top"):
fixed["top"] = self._replaced_order_element_ids(fixed["top"])
if fixed.get("bottom"):
fixed["bottom"] = self._replaced_order_element_ids(fixed["bottom"])
# --- sort-by-value on the opposing dimension also refers to element ids, but
# --- the ids refer to the opposing dimension, so do the translation later on.
# --- This is a little unfortunate, because this means that the ids in this shim
# --- version of the dimension transforms are inconsistent. But it feels easier
# --- than forcing the dimensions to be aware of other dimensions.
return shim
def translate_element_id(self, _id) -> Optional[str]:
"""Optional string that is the translation of various ids to subvariable alias
0) If dimension is not a subvariables dimension, return the _id.
1) If id matches an alias, then just use it.
2) If id matches an element id, translate to corresponding alias.
3) If id matches a subvariable id, translate to corresponding alias.
4) If id can be parsed to int and matches an element id, translate to alias.
5) If id is int (or can be parsed to int) and can be used as index (eg in range
0-# of elements), use _id'th alias.
6) If all of these fail, return None.
"""
if self.dimension_type not in DT.SHIMMED_TYPES:
return _id
if self.dimension_type == DT.DATETIME:
int_id = int(_id) if isinstance(_id, str) and _id.isnumeric() else _id
return self._element_values_dict.get(int_id, _id)
# --- Otherwise is an array type
if _id in self._subvar_aliases:
return _id
if _id in self._raw_element_ids:
return self._subvar_aliases[self._raw_element_ids.index(_id)]
# --- In case of MR dimensions with HS there's an edge case where the hidden
# --- column refers to the subvariable idx position defined in the raw
# --- elements elements ids.
# --- For example id = "2",
# --- subvar_ids = ('savory+spicy','1','spicy+sweet','2','savory+sweet','3')
# --- subvar_aliases = ('savory___spicy_at_top',
# 'enjoy_mr_savory__4',
# 'spicy___sweet_after_enjoy_mr_savory__4',
# 'enjoy_mr_spicy__4',
# 'savory___sweet_after_enjoy_mr_spicy__4',
# 'enjoy_mr_sweet__4')
# --- Relying on the subvar_ids this will get the self._subvar_ids.index(_id)
# --- that corresponds to 3 and so subvar_aliases[3] == enjoy_mr_spicy__4.
# --- This means that we're hiding the col Spicy rather than Savory.
# --- Using the _raw_element_ids instead will return the correct hidden alias.
# --- NOTE: this goes away when we will use aliases rather than id.
if self._has_mr_insertion:
elements = self._dimension_dict["type"]["elements"]
insertions_mask = [
1 if "anchor" in el["value"].get("references", {}) else 0
for el in elements
]
element_ids_without_insertions = [
str(el)
for is_ins, el in zip(insertions_mask, self._raw_element_ids)
if not is_ins
]
if _id in element_ids_without_insertions:
return self._subvar_aliases[self._raw_element_ids.index(int(_id))]
if _id in self._subvar_ids:
return self._subvar_aliases[self._subvar_ids.index(_id)]
try:
_id = int(_id)
# --- If successfully converted to int, try raw element ids again
if _id in self._raw_element_ids:
return self._subvar_aliases[self._raw_element_ids.index(_id)]
except ValueError:
return None
if _id >= 0 and _id < len(self._subvar_aliases):
return self._subvar_aliases[_id]
return None
@lazyproperty
def _element_values_dict(self) -> Dict:
"""dict where the keys are the "ids" from zz9 dimension and elements are "value"
zz9 makes integer ids based on the position of the value, but puts the original
value in "value" for some dimension types, like datetime, numeric and text.
Since we've been using that "id" as if it were stable, we want a crossalk that
goes from that id to the real value.
"""
return {
el["id"]: el["value"] for el in self._dimension_dict["type"]["elements"]
}
@lazyproperty
def _has_mr_insertion(self) -> bool:
"""True if dimension type is MR and has insertions. False otherwise."""
if self.dimension_type == DT.MR_SUBVAR:
references = self._dimension_dict.get("references") or {}
view = references.get("view") or {}
mr_insertions = view.get("transform", {}).get("insertions", [])
return True if mr_insertions else False
return False
@lazyproperty
def _raw_element_ids(self) -> Tuple[Union[int, str], ...]:
"""tuple of int or string element ids, as they appear in cube result
These are "raw" because they refer to the element ids before they've been
replaced with the alias for subvariables in the `._shimmed_dimension_dict`.
"""
return tuple(
element["id"] for element in self._dimension_dict["type"]["elements"]
)
def _replaced_element_transforms(self, element_transforms) -> Dict:
"""Replace the dictionary keys of element transforms with aliases
The element transforms identify which element they refer to by their key in
the element_transforms object. Before it is shimmed, this can identify them
in many different ways. The shim replaces these with the alias.
"""
# --- The name of the element transform object is a string. This string is
# --- assumed to refer to the subvariable id, unless there is a "key", which
# --- can be "alias", in which case it refers to the alias, or if no
# --- subvariable is found with the id, in which case it's assumed to be the
# --- element_id
# --- TODO: This logic about "key" is not supported by the validation
# --- on the deck schema, so maybe we should remove it?
key = element_transforms.get("key")
if key == "alias":
# --- Already keyed by alias, no changes needed
return element_transforms
old_keys = tuple(element_transforms.keys())
if key == "subvar_id":
# --- translate from subvariable id
new_keys = tuple(
self._subvar_aliases[self._subvar_ids.index(_id)]
if _id in self._subvar_ids
else None
for _id in old_keys
)
else:
# --- Otherwise use usual translation logic
new_keys = tuple(self.translate_element_id(_id) for _id in old_keys)
return {
nkey: element_transforms[old_keys[i]]
for i, nkey in enumerate(new_keys)
if nkey is not None
}
def _replaced_order_element_ids(self, element_ids) -> List[Optional[str]]:
"""Replace the list of element ids with a list of aliases
The explicit order transform includes a list of ids that can be specified in
many different ways, this translate them to the subvariable aliases.
"""
return [self.translate_element_id(_id) for _id in element_ids]
@lazyproperty
def _subvar_aliases(self) -> Tuple[str, ...]:
"""tuple of str alias for each element of a subvariable dimension
Fall back to the `element_id` if the alias doesn't exist (this happens in one
fixture, but I don't think it can happen in production anymore.)
"""
return tuple(
element.get("value", {}).get("references", {}).get("alias", element["id"])
for element in self._dimension_dict["type"]["elements"]
)
@lazyproperty
def _subvar_ids(self) -> Tuple[Union[int, str], ...]:
"""tuple of str subvariable id for each element of a subvariable dimension
Only applicable to subvariables dimension (will return empty tuple if not
because a fused variables dimension reports nearly identical to a regular
MR dimension but does not have subvariable ids).
"""
try:
return tuple(
element["value"]["id"]
for element in self._dimension_dict["type"]["elements"]
)
except KeyError:
return tuple()
class Element:
"""A category or subvariable of a dimension.
This object resolves the transform cascade for element-level transforms.
"""
def __init__(self, element_dict, index, element_transforms, label_formatter):
self._element_dict = element_dict
self._index = index
self._element_transforms = element_transforms
self._label_formatter = label_formatter
def __repr__(self) -> str:
"""str of this element, which makes it easier to work in cosole."""
return self.label or self.alias
@lazyproperty
def alias(self) -> str:
"""str display-alias for this element."""
return self._str_representation_for("alias")
@lazyproperty
def anchor(self) -> Optional[Union[str, dict]]:
"""Optional str or dict defining the anchor for derived elements"""
if not self.derived:
return None
return self._element_dict.get("value", {}).get("references", {}).get("anchor")
@lazyproperty
def element_id(self) -> int:
"""int identifier for this category or subvariable."""
return self._element_dict["id"]
@lazyproperty
def derived(self) -> bool:
"""True if element is derived, False otherwise.
Multiple Response subvariable insertions are considered derived elements.
"""
value = self._element_dict.get("value")
if not value or not isinstance(value, dict):
return False
return value.get("derived", False)
@lazyproperty
def fill(self) -> str:
"""str RGB color like "#af032d" or None if not specified.
A value of None indicates the default fill should be used for this element.
A str value must be a hash character ("#") followed by six hexadecimal digits.
Three-character color contractions (like "#D07") are not valid.
"""
# ---first authority is fill transform in element transforms. The base value is
# ---the only prior authority and that is None (use default fill color).
return self._element_transforms.fill
@lazyproperty
def index(self) -> int:
"""int offset at which this element appears in dimension.
This position is based upon the document position of this element in
the cube response. No adjustment for missing elements is made.
"""
return self._index
@lazyproperty
def is_hidden(self) -> bool:
"""True if this element is explicitly hidden in this analysis."""
# ---first authority is hide transform in element transforms---
# ---default is not hidden, there is currently no prior-level hide transform---
return {True: True, False: False, None: False}[self._element_transforms.hide]
@lazyproperty
def label(self) -> str:
"""str display-name for this element.
This value is the empty string when no value has been specified or display of
the name has been suppressed.
"""
return self._str_representation_for("name")
@lazyproperty
def missing(self) -> bool:
"""True if this element represents missing data.
False if this category or subvariable represents valid (collected)
data.
"""
return bool(self._element_dict.get("missing"))
@lazyproperty
def numeric_value(self) -> Union[int, float]:
"""Numeric value assigned to element by user, np.nan if absent."""
numeric_value = self._element_dict.get("numeric_value")
return np.nan if numeric_value is None else numeric_value
def _str_representation_for(self, key: str) -> str:
"""return str representation for this element of a given key (`alias` or `name`)
This method handles elements for variables of all types, including
categorical, array (subvariable), numeric, datetime and text.
"""
# ---first authority is transform in element transforms---
value = getattr(self._element_transforms, key) if key == "name" else None
if value is not None:
return value if value else ""
# ---otherwise base-name/alias from element-dict is used according to the key---
element_dict = self._element_dict
# ---category elements have a name/alias item according to the key---
if key in element_dict:
value = element_dict[key]
return value if value else ""
# ---other types are more complicated---
value = element_dict.get("value")
type_value = type(value).__name__
if type_value == "NoneType":
return ""
if type_value == "list":
# ---like '10-15' or 'A-F'---
return "-".join([self._label_formatter(item) for item in value])
if type_value in ("float", "int", "str", "unicode"):
return self._label_formatter(value)
# ---For CA and MR subvar dimensions---
return value.get("references", {}).get(key) or ""
class _ElementTransforms:
"""A value object providing convenient access to transforms for a single element."""
def __init__(self, element_transforms_dict):
self._element_transforms_dict = element_transforms_dict
@lazyproperty
def fill(self) -> Optional[str]:
"""str RGB color like "#af032d" or None if not specified.
A value of None indicates no fill transform was specified for this element.
A str value must be a hash character ("#") followed by six hexadecimal digits.
Three-character color contractions (like "#D07") are not valid.
"""
fill = self._element_transforms_dict.get("fill")
if not fill:
return None
return fill
@lazyproperty
def hide(self) -> Optional[bool]:
"""Tri-value, True if this element has been explicitly hidden in this analysis.
False overrides any prior "hide" transform with "show" and None signifies
"inherit".
"""
hide = self._element_transforms_dict.get("hide")
# ---cover all of "omitted", "==None", and odd "==[]" or "==''" cases---
if hide is True:
return True
if hide is False:
return False
return None
@lazyproperty
def name(self) -> Optional[str]:
"""Optional str display-name for this element (None if not specified)."""
# ---if "name": element is omitted, no transform is specified---
if "name" not in self._element_transforms_dict:
return None
# ---otherwise normalize value to str, with an explicit value of None, [], 0,
# ---etc. becoming the empty string ("").
name = self._element_transforms_dict["name"]
return str(name) if name else ""
class _OrderSpec:
"""Value object providing convenient access to details of an order transform."""
def __init__(self, dimension, dimension_transforms_dict):
self._dimension = dimension
self._dimension_transforms_dict = dimension_transforms_dict
@lazyproperty
def bottom_fixed_ids(self) -> Tuple[int, ...]:
"""Tuple of each element-id appearing in the fixed.bottom field of order dict.
The element-ids appear in the order specified in the "bottom" fixed.bottom
field.
"""
return tuple(self._order_dict.get("fixed", {}).get("bottom", []))
@lazyproperty
def collation_method(self) -> CM:
"""Member of COLLATION_METHOD specifying ordering of dimension elements."""
method_keyword = self._order_dict.get("type")
if method_keyword is None or not CM.has_value(method_keyword):
return CM.PAYLOAD_ORDER
return CM(method_keyword)
@lazyproperty
def descending(self) -> bool:
"""True if sort direction is descending, False otherwise."""
return self._order_dict.get("direction", "descending") != "ascending"
@lazyproperty
def element_id(self) -> int:
"""int element id appearing in an order transform.
Raises KeyError if the transform dict does not contain an "element_id" field.
Note that not all order types use this field but it is a required field in all
those that do.
"""
return self._order_dict["element_id"]
@lazyproperty
def element_ids(self) -> Tuple[int, ...]:
"""tuple of int each element id appearing in an explicit order transform.
This value is `()` if no "element_ids": field is present.
"""
return tuple(self._order_dict.get("element_ids") or [])
@lazyproperty
def insertion_id(self) -> int:
"""int insertion-id in the "insertion_id" field of the transform dict.
Raises KeyError if this transform dict does not contain an "insertion_id" field.
"""
return self._order_dict["insertion_id"]
@lazyproperty
def marginal(self) -> MARGINAL:
"""Member of enums.MARGINAL corresponding to marginal: field in order transform.
Raises KeyError if the order dict has no "marginal": field and ValueError if the
value in that field is not a recognized marginal keyword. Note that not all
order types use the "marginal": field.
"""
return MARGINAL(self.marginal_keyname)
@lazyproperty
def marginal_keyname(self) -> str:
"""str value of "marginal": field in order transform.
Raises KeyError if the order dict has no "marginal": field. Note that not all
order types use the "measure": field, but it is a required field in all that do.
"""
return self._order_dict["marginal"]
@lazyproperty
def measure(self) -> MEASURE:
"""Member of enums.MEASURE corresponding to "measure": field in order transform.
Raises KeyError if the order dict has no "measure": field and ValueError if the
value in that field is not a recognized measure keyword. Note that not all order
types use the "measure": field.
"""
return MEASURE(self.measure_keyname)
@lazyproperty
def measure_keyname(self) -> str:
"""str value of "measure": field in order transform.
Raises KeyError if the order dict has no "measure": field. Note that not all
order types use the "measure": field, but it is a required field in all that do.
Note also that this property is required for the "univariate_measure" sort type
because its measure keywords do not correspond directly to members of the
MEASURE enumeration.
"""
return self._order_dict["measure"]
@lazyproperty
def top_fixed_ids(self) -> Tuple[int, ...]:
"""Tuple of each element-id appearing in the fixed.top field of order dict.
The element-ids appear in the order specified in the "top" fixed.top
field.
"""
return tuple(self._order_dict.get("fixed", {}).get("top", []))
@lazyproperty
def _order_dict(self) -> Dict:
"""dict dimension.transforms.order field parsed from JSON payload.
Value is `{}` if no "order": field is present.
"""
return self._dimension_transforms_dict.get("order") or {}
class _Subtotals(Sequence):
"""Sequence of _Subtotal objects for a dimension.
Each _Subtotal object represents a "subtotal" insertion transformation
defined for the dimension.
A subtotal can only involve valid (i.e. non-missing) elements.
"""
def __init__(self, insertion_dicts, valid_elements, from_view=True):
self._insertion_dicts = insertion_dicts
self._valid_elements = valid_elements
self._from_view = from_view
def __getitem__(self, idx_or_slice):
"""Implements indexed access."""
return self._subtotals[idx_or_slice]
def __iter__(self):
"""Implements (efficient) iterability."""
return iter(self._subtotals)
def __len__(self):
"""Implements len(subtotals)."""
return len(self._subtotals)
@lazyproperty
def bogus_ids(self) -> Tuple[str, ...]:
"""Tuple of bogus id for all the subtotals.
It enumerates the insertions id adding the `ins_` prefix:
("ins_1", "ins_5", "ins_2")
"""
return tuple([f"ins_{subtotal.insertion_id}" for subtotal in self._subtotals])
@lazyproperty
def insertion_ids(self) -> Tuple[int, ...]:
"""Tuple of insertion ids"""
return tuple([subtotal.insertion_id for subtotal in self._subtotals])
@lazyproperty
def _element_ids(self):
"""frozenset of int id of each non-missing cat or subvar in dim."""
return frozenset(self._valid_elements.element_ids)
def _iter_valid_subtotal_dicts(self):
"""Generate each insertion dict that represents a valid subtotal."""
for insertion_dict in self._insertion_dicts:
# ---skip any non-dicts---
if not isinstance(insertion_dict, dict):
continue
# ---skip any non-subtotal insertions---
if insertion_dict.get("function") != "subtotal":
continue
# ---skip any hidden insertions---
if insertion_dict.get("hide") is True:
continue
# ---skip any malformed subtotal-dicts---
if not {"anchor", "name"}.issubset(insertion_dict.keys()):
continue
# ---use "new style" kwargs defining positive terms if available---
# ---but if missing, use "old style" args defining positive terms---
positive = insertion_dict.get("kwargs", {}).get(
"positive"
) or insertion_dict.get("args", [])
negative = insertion_dict.get("kwargs", {}).get("negative", [])
# ---must have positive or negative elements---
if not (positive or negative):
continue
# ---skip if doesn't reference at least one non-missing element---
if not self._element_ids.intersection(positive + negative):
continue
# ---an insertion-dict that successfully runs this gauntlet
# ---is a valid subtotal dict
yield insertion_dict
def _position_crosswalk(self, subtotal_dicts):
"""dict mapping position in definition with position in payload order."""
element_ids = self._valid_elements.element_ids
# --- Divide insertions by anchor type
# --- Since MR insertions come from zz9, only concerned with categorical ones
first = []
after = {}
last = []
for idx, ins in enumerate(subtotal_dicts):
if ins["anchor"] == "top":
first.append(idx)
elif ins["anchor"] == "bottom":
last.append(idx)
elif ins["anchor"] in element_ids:
cat_str = str(ins["anchor"])
new = after.get(cat_str, [])
new.append(idx)
after[cat_str] = new
else:
# --- put on bottom if anchor is malformed
last.append(idx)
insertion_order = first
# --- Go through elements in order, and add the insertions anchored to them
for element in element_ids:
if str(element) in after:
insertion_order = insertion_order + after[str(element)]
insertion_order = insertion_order + last
return {pos: idx + 1 for idx, pos in enumerate(insertion_order)}
@lazyproperty
def _subtotals(self):
"""Composed tuple storing actual sequence of _Subtotal objects."""
return tuple(
_Subtotal(subtotal_dict, self._valid_elements)
for subtotal_dict in self._valid_subtotal_dicts_with_ids
)
@lazyproperty
def _valid_subtotal_dicts_with_ids(self):
"""list of valid subtotal dicts guaranteed to have ids."""
subtotal_dicts = list(self._iter_valid_subtotal_dicts())
# --- Case 1: All insertions have ids so we don't have to worry about
# --- generating them
if all("id" in ins for ins in subtotal_dicts):
return subtotal_dicts
# --- Case 2: Not all insertions have ids, and we're working with insertions
# --- from the variable view. Unfortunately, the frontend determines insertion
# --- id based on the position in payload order, not based on their position
# --- in the list they are defined in. In this codebase, we want to pass the
# --- complete _Subtotals object into the collator, and so need to duplicate
# --- the collating logic for payload order in `_position_crosswalk`.
if self._from_view:
position_crosswalk = self._position_crosswalk(subtotal_dicts)
return [
ins if "id" in ins else {**ins, "id": position_crosswalk[idx]}
for idx, ins in enumerate(subtotal_dicts)
]
# --- Case 3: Not all insertions have ids, and we're working with the insertions
# --- from the transform. There is no way to match insertions on transform
# --- without an id to the view insertions because both name and position can
# --- change. Generate ids starting from 1 in their defined order because this
# --- matches old behavior and we haven't noticed a bug yet.
# --- It's possible that the correct thing to do would be to generate ids
# --- that are guaranteed not to clash with the view insertion ids, because
# --- we don't want to accidentally match such an insertion to a view one,
# --- but I'm not sure it matters.
return [
ins if "id" in ins else {**ins, "id": idx + 1}
for idx, ins in enumerate(subtotal_dicts)
]
class _Subtotal:
"""A subtotal insertion on a cube dimension.
Assumes that the caller has already handled adding ids to `_subtotal_dict`.
"""
def __init__(self, subtotal_dict, valid_elements):
self._subtotal_dict = subtotal_dict
self._valid_elements = valid_elements
@lazyproperty
def alias(self) -> str:
"""str display alias for this subtotal."""
alias = self._subtotal_dict.get("alias")
return alias if alias else ""
@lazyproperty
def anchor(self) -> Union[int, str]:
"""int or str indicating element under which to insert this subtotal.
An int anchor is the id of the dimension element (category or
subvariable) under which to place this subtotal. The return value can
also be one of 'top' or 'bottom'.
The return value defaults to 'bottom' for an anchor referring to an
element that is no longer present in the dimension or an element that
represents missing data.
"""
anchor = self._subtotal_dict["anchor"]
if anchor is None:
# In the case of undefined anchor default to "bottom"
return "bottom"
try:
anchor = int(anchor)
if anchor not in self._valid_elements.element_ids:
# In the case of a non-valid int id, default to "bottom"
return "bottom"
return anchor
except (TypeError, ValueError):
return anchor.lower()
@lazyproperty
def addend_ids(self) -> Tuple[int, ...]:
"""tuple of int ids of elements contributing to this subtotal.
Any element id not present in the dimension or present but
representing missing data is excluded.
"""
# ---Prefer positive "kwargs" over "args" so we can migrate---
positive = self._subtotal_dict.get("kwargs", {}).get(
"positive"
) or self._subtotal_dict.get("args", [])
return tuple(arg for arg in positive if arg in self._valid_elements.element_ids)
@lazyproperty
def addend_idxs(self) -> np.ndarray:
"""ndarray of int base-element offsets contributing to this subtotal.
Suitable for directly indexing a numpy array object (such as base values or
margin) to extract the addend values for this subtotal.
"""
addend_ids = self.addend_ids
return np.fromiter(
(
idx
for idx, vector in enumerate(self._valid_elements)
if vector.element_id in addend_ids
),
dtype=np.int64, # force int so it can be used as index even if empty
)
@lazyproperty
def fill(self) -> str:
"""str RGB color like "#af032d" or None if not specified.
A value of None indicates the default fill should be used for this element.
A str value must be a hash character ("#") followed by six hexadecimal digits.
Three-character color contractions (like "#D07") are not valid.
"""
# ---first authority is fill transform in element transforms. The base value is
# ---the only prior authority and that is None (use default fill color).
return self._subtotal_dict.get("fill", None)
@lazyproperty
def insertion_id(self) -> int:
"""int unique identifier of this subtotal within this dimension's insertions."""
return self._subtotal_dict["id"]
@lazyproperty
def is_difference(self) -> bool:
"""True if a subtotal is a difference, False otherwise."""
return bool(self.subtrahend_ids)
@lazyproperty
def label(self) -> str:
"""str display name for this subtotal, suitable for use as label."""
name = self._subtotal_dict.get("name")
return name if name else ""
@lazyproperty
def subtrahend_ids(self) -> Tuple[int, ...]:
"""tuple of int ids of elements of negative part of the subtotal.
Any element id not present in the dimension or present but
representing missing data is excluded.
"""
return tuple(
arg
for arg in self._subtotal_dict.get("kwargs", {}).get("negative", [])
if arg in self._valid_elements.element_ids
)
@lazyproperty
def subtrahend_idxs(self) -> np.ndarray:
"""int ndarray base-element offsets contributing to the negative part of subtot.
Suitable for directly indexing a numpy array object (such as base values or
margin) to extract the addend values for this subtotal.
"""
return np.fromiter(
(
idx
for idx, vector in enumerate(self._valid_elements)
if vector.element_id in self.subtrahend_ids
),
dtype=np.int64, # force int so it can be used as index even if empty
)