"""Source code for jwst.associations.generate."""

import logging
from timeit import default_timer as timer

from .association import make_timestamp
from .lib.process_list import (
    ListCategory,
    ProcessList,
    ProcessQueueSorted,
    workover_filter
)
from .pool import PoolRow
from ..lib.progress import Bar

# Configure logging
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

__all__ = ['generate']


def generate(pool, rules, version_id=None, finalize=True):
    """Generate associations in the pool according to the rules.

    Parameters
    ----------
    pool : AssociationPool
        The pool to generate from.

    rules : AssociationRegistry
        The association rule set.

    version_id : None, True, or str
        The string to use to tag associations and products.
        If None, no tagging occurs.
        If True, use a timestamp.
        If a string, the string.
        Note: any bool value, including False, is replaced by a timestamp.

    finalize : bool
        Run all rule methods marked as 'finalized'.

    Returns
    -------
    associations : [Association[,...]]
        List of associations

    Notes
    -----
    Refer to the :ref:`Association Generator <design-generator>`
    documentation for a full description.
    """
    associations = []

    # Any bool (not only True) requests a timestamp; kept as-is for
    # backward compatibility with existing callers.
    if type(version_id) is bool:
        version_id = make_timestamp()

    process_queue = ProcessQueueSorted([
        ProcessList(items=pool, rules=list(rules.values()))
    ])
    logger.debug('Initial process queue: %s', process_queue)

    # New process lists are appended to ``process_queue`` while it is being
    # iterated; presumably ProcessQueueSorted yields them in later passes.
    for process_list in process_queue:
        logger.debug('** Working process list: %s', process_list)
        time_start = timer()
        total_mod_existing = 0
        total_new = 0
        total_reprocess = 0
        work_over = process_list.work_over

        with Bar('Processing items', log_level=logger.getEffectiveLevel(),
                 max=len(process_list.items)) as bar:
            for item in process_list.items:
                item = PoolRow(item)
                existing_asns, new_asns, to_process = generate_from_item(
                    item, version_id, associations, rules, process_list
                )
                total_mod_existing += len(existing_asns)
                total_new += len(new_asns)
                associations.extend(new_asns)

                # If working on a process list EXISTING
                # remove any new `to_process` that is
                # also EXISTING. Prevent infinite loops.
                # ``workover_filter`` may return a falsy value, which is dropped.
                to_process_modified = list(filter(None, (
                    workover_filter(next_list, work_over)
                    for next_list in to_process
                )))
                process_queue.extend(to_process_modified)
                total_reprocess += len(to_process_modified)
                bar.next()

        logger.debug(
            'Existing associations modified: %d New associations created: %d',
            total_mod_existing, total_new
        )
        logger.debug('New process lists: %d', total_reprocess)
        logger.debug('Updated process queue: %s', process_queue)
        logger.debug('# associations: %d', len(associations))
        # Building the list of types is O(n) over all associations; only do
        # it when the message will actually be emitted.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                'Associations: %s',
                [type(_association) for _association in associations]
            )
        logger.debug('Seconds to process: %.2f\n', timer() - time_start)

    # Finalize found associations
    logger.debug('# associations before finalization: %d', len(associations))
    finalized_asns = associations
    if finalize:
        logger.debug('Performing association finalization.')
        try:
            finalized_asns = rules.callback.reduce('finalize', associations)
        except KeyError as exception:
            # Finalization is best-effort: on failure the unfinalized
            # associations are returned.
            logger.debug('Finalization failed for reason: %s', exception)

    return finalized_asns
def generate_from_item(
        item,
        version_id,
        associations,
        rules,
        process_list):
    """Either match or generate a new association

    Parameters
    ----------
    item : dict
        The item to match to existing associations
        or generate new associations from

    version_id : str or None
        Version id to use with association creation.
        If None, no versioning is used.

    associations : [association, ...]
        List of already existing associations.
        If the item matches any of these, it will be added
        to them.

    rules : AssociationRegistry or None
        List of rules to create new associations.
        If None, no new associations are created, and it is treated
        like an empty registry when ``process_list.rules`` is empty.

    process_list : ProcessList
        The `ProcessList` to which the current item belongs.

    Returns
    -------
    (existing_asns, new_asns, process_list): 3-tuple where
        existing_asns : [association,...]
            List of existing associations item belongs to.
            Empty if none match
        new_asns : [association,...]
            List of new associations item creates. Empty if none match
        process_list : [ProcessList, ...]
            List of process events.
    """
    # Setup the rules allowed to be examined: rules restricted by the
    # process list take precedence over the whole registry.
    if process_list.rules:
        allowed_rules = process_list.rules
    elif rules is not None:
        allowed_rules = list(rules.values())
    else:
        # Documented ``rules=None`` case: behave like an empty registry
        # rather than raising AttributeError on ``None.values()``.
        allowed_rules = []

    # Check membership in existing associations.
    existing_asns = []
    reprocess_list = []
    if process_list.work_over in (
            ListCategory.BOTH,
            ListCategory.EXISTING,
            ListCategory.NONSCIENCE,
    ):
        # Only associations created by one of the allowed rules are candidates.
        candidates = [
            asn
            for asn in associations
            if type(asn) in allowed_rules
        ]
        existing_asns, reprocess_list = match_item(item, candidates)

    # Now see if this item will create new associations.
    # By default, an item will not be allowed to create
    # an association based on rules of existing associations.
    new_asns = []
    if process_list.work_over in (
            ListCategory.BOTH,
            ListCategory.RULES,
    ) and rules is not None:
        ignore_asns = {type(asn) for asn in existing_asns}
        new_asns, reprocess = rules.match(
            item,
            version_id=version_id,
            allow=allowed_rules,
            ignore=ignore_asns,
        )
        reprocess_list.extend(reprocess)

    return existing_asns, new_asns, reprocess_list


def match_item(item, associations):
    """Match item to a list of associations

    Parameters
    ----------
    item : dict
        The item to match to the associations.

    associations : [association, ...]
        List of already existing associations.
        If the item matches any of these, it will be added
        to them.

    Returns
    -------
    (associations, process_list): 2-tuple where
        associations : [association,...]
            List of associations item belongs to. Empty if none match
        process_list : [ProcessList, ...]
            List of process events.
    """
    item_associations = []
    process_list = []
    for asn in associations:
        # Skip an association already matched (membership test uses ``==``).
        if asn in item_associations:
            continue
        # ``add`` is attempted on every candidate; its reprocess events are
        # collected whether or not the item matched.
        matches, reprocess = asn.add(item)
        process_list.extend(reprocess)
        if matches:
            item_associations.append(asn)
    return item_associations, process_list