"""Reprocessing Lists and Queues
This modules defines what process lists are and queues of process lists.
A process list, `ProcessList`, is a list of (items, rules) and meta information
, most notably `work_over`. `work_over` is one of the values of `ListCategory`.
A `ListCategory` defines which stage of association processing the list is
relevant to. In other words, the order, or priority, of when a list should be processed
is defined by its `ListCategory`. The priority is the value of each `ListCategory`,
starting with zero.
ProcessLists are primarily put into queues for processing. There are two
queues for handling ProcessLists. `ProcessListQueue` is a basic
First-In-First-Out (FIFO) queue that can be used as a generator.
The second queue is `ProcessQueueSorted`, which returns ProcessLists according to
their priority as defined by each ProcessList's `work_over`. An important aspect of
ProcessQueueSorted is that it is mutable: New ProcessLists can be added to the queue
while iterating over the queue.
"""
from collections import deque
from enum import Enum
from functools import reduce
__all__ = [
'ListCategory',
'ProcessList',
'ProcessItem',
'ProcessQueue',
'ProcessQueueSorted'
]
[docs]
class ListCategory(Enum):
"""The work_over categories for ProcessLists"""
RULES = 0 # Operate over rules only
BOTH = 1 # Operate over both rules and existing associations
EXISTING = 2 # Operate over existing associations only
NONSCIENCE = 3 # Items that are not science specific that should be applied to only
# existing associations
[docs]
class ProcessItem:
"""Items to be processed
Create hashable objects from a list of arbitrary objects.
Parameters
----------
obj : object
The object to make a `ProcessItem`.
Objects must be equatable.
"""
def __init__(self, obj):
self.obj = obj
[docs]
@classmethod
def to_process_items(cls, iterable):
"""Iterable to convert a list to ProcessItem's
Parameters
----------
iterable : iterable
A source of objects to convert
Returns
-------
An iterable where the object has been
converted to a `ProcessItem`
"""
for obj in iterable:
yield cls(obj)
def __hash__(self):
try:
hash_value = self.obj.__hash__()
except (AttributeError, TypeError):
hash_value = hash(repr(self))
return hash_value
def __eq__(self, other):
try:
equality = self.obj == other.obj
except AttributeError:
equality = self.__hash__() == other.__hash__()
return equality
[docs]
class ProcessList:
"""A Process list
Parameters
----------
items : [item[, ...]]
The list of items to process
rules : [Association[, ...]]
List of rules to process the items against.
work_over : int
What the reprocessing should work on:
- `ProcessList.RULES`: Only on the rules to create new associations
- `ProcessList.EXISTING`: Only existing associations
- `ProcessList.BOTH`: Compare to both existing and rules
- `ProcessList.NONSCIENCE`: Only on non-science items
only_on_match : bool
Only use this object if the overall condition
is True.
trigger_constraints : [Constraint[,...]]
The constraints that created the ProcessList
trigger_rules : [Association[,...]]
The association rules that created the ProcessList
"""
_str_attrs = ('rules', 'work_over', 'only_on_match', 'trigger_constraints', 'trigger_rules')
def __init__(self, items=None, rules=None,
work_over=ListCategory.BOTH, only_on_match=False,
trigger_constraints=None, trigger_rules=None):
self.items = items
self.rules = rules
self.work_over = work_over
self.only_on_match = only_on_match
self.trigger_constraints = set(trigger_constraints) if trigger_constraints else set()
self.trigger_rules = set(trigger_rules) if trigger_rules else set()
@property
def hash(self):
"""Create a unique hash"""
return (tuple(self.rules), self.work_over, self.only_on_match)
[docs]
def update(self, process_list, full=False):
"""Update with information from ProcessList
Attributes from `process_list` are added to self's attributes. If `not
full`, the attributes `rules`, 'work_over`, and `only_on_match` are not
taken.
Note that if `full`, destructive action will occur with respect to
`work_over` and `only_on_match`.
Parameters
----------
process_list : ProcessList
The source process list to absorb.
full : bool
Include the hash attributes `rules`, `work_over`, and `only_on_match`.
"""
self.items += process_list.items
self.trigger_constraints.update(process_list.trigger_constraints)
self.trigger_rules.update(process_list.trigger_rules)
if full:
self.rules += process_list.rules
self.work_over = process_list.work_over
self.only_on_match = process_list.only_on_match
def __str__(self):
result = '{}(n_items: {}, {})'.format(
self.__class__.__name__,
len(self.items),
{
str_attr: getattr(self, str_attr)
for str_attr in self._str_attrs
}
)
return result
[docs]
class ProcessQueue(deque):
"""Make a deque iterable and mutable"""
def __iter__(self):
while True:
try:
yield self.popleft()
except:
break
class ProcessListQueue:
"""First-In-First-Out queue of ProcessLists
ProcessLists can be added either individually using `append` method, or
a list of ProcessLists can be added through object initialization or
the `extend` method.
There are two generators implement. The first is the ProcessListQueue
object itself. When the object is used as a generator, the generator will
return the earliest ProcessList added to the queue (FIFO), popping that
ProcessList from the queue, hence draining the queue.
The second generator is returned by the `items` method. This method will
return all the items from all the ProcessLists in the queue,
non-destructively. The ProcessLists are accessed in their order in the
queue, and then each item is retrieved from their ProcessList in the list
order of the ProcessList.
A final feature of ProcessListQueue is that it is mutable: New items can
be added to the queue while items are being popped from the queue.
Parameters
----------
init : [ProcessList[,...]] or None
List of ProcessLists to put on the queue.
Notes
-----
The FIFO operations depends on the fact that, inherently,
`dict` preserves order in which key/value pairs are added to the
dictionary.
"""
def __init__(self, init=None):
self._queue = dict()
if init is not None:
self.extend(init)
def append(self, process_list):
"""Add ProcessList to queue, if not already in the queue."""
plhash = process_list.hash
if plhash not in self._queue:
self._queue[plhash] = process_list
else:
self._queue[plhash].update(process_list)
def extend(self, iterable):
"""Add lists of ProcessLists if not already in the queue"""
for process_list in iterable:
self.append(process_list)
def items(self):
"""Return list generator of all items"""
for plhash in self._queue:
for item in self._queue[plhash].items:
yield item
def popleft(self):
"""Pop the first-in object"""
plhash = next(iter(self._queue))
process_list = self._queue[plhash]
del self._queue[plhash]
return process_list
def __len__(self):
return len(self._queue)
def __iter__(self):
while True:
try:
yield self.popleft()
except:
break
def __str__(self):
result = f'{self.__class__.__name__}: rulesets {len(self)} items {len(list(self.items()))}'
return result
[docs]
class ProcessQueueSorted:
"""Sort ProcessItem based on work_over
Create a generator that implements a First-In-First-Out (FIFO) queue, with the one
modification that the queues are handled in order of their `work_over` priority.
For example, even if a ProcessList with work_over of ListCategory.EXISTING had
been added to the queue before a ProcessList with work_over of ListCategory.RULES,
the second ProcessList will be returned before the first.
ProcessQueueSorted is also mutable: ProcessLists can be added to the queue
while the lists are being popped from the queue. When doing so, it is
important to remember that the order of return, as described above, still
pertains. For example, if the queue only has ProcessLists of work_over
ListCategory.EXISTING, and a new ProcessList of work_over
ListCategory.RULES is added during iteration, the next list returned will
be the RULES one, because the RULES lists have priority over EXISTING
lists, regardless of when the list was added.
Parameters
----------
init : [ProcessList[,...]]
List of `ProcessList` to start the queue with.
"""
def __init__(self, init=None):
self.queues = {
list_category: ProcessListQueue()
for list_category in ListCategory
}
if init is not None:
self.extend(init)
[docs]
def extend(self, process_lists):
"""Add the list of process items to their appropriate queues"""
for process_list in process_lists:
self.queues[process_list.work_over].append(process_list)
def __iter__(self):
"""Return the queues in order"""
while len(self) > 0:
for category in ListCategory:
for process_list in self.queues[category]:
yield process_list
break
else:
continue
break
def __len__(self):
return reduce(lambda x, y: x + len(y), self.queues.values(), 0)
def __str__(self):
result = f'{self.__class__.__name__}:'
for queue in self.queues:
result += f'\n\tQueue {queue}: {self.queues[queue]}'
return result
def workover_filter(process_list, work_over):
"""Determine and modify workover of input process list
Parameters
---------
process_list : ProcessList
The process list under consideration
work_over : ListCategory
The ListCategory to compare against.
Returns
-------
process_list : ProcessList or None
The input process_list with work_over modified.
None if the process list should not be continued.
"""
result = process_list
if process_list.work_over in [ListCategory.RULES, ListCategory.BOTH]:
if work_over in [ListCategory.RULES, ListCategory.BOTH]:
result.work_over = ListCategory.BOTH
else:
result = None
elif work_over not in [ListCategory.RULES, ListCategory.BOTH]:
result = None
return result