Source code for hybrid.decomposers

# Copyright 2018 D-Wave Systems Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import collections
import random
import itertools
from heapq import heappush, heappop
from functools import partial

import dimod
import networkx as nx

from hybrid.core import Runnable, State
from hybrid.exceptions import EndOfStream
from hybrid import traits
from hybrid.utils import (
    bqm_induced_by, flip_energy_gains, select_random_subgraph,
    chimera_tiles)

__all__ = [
    'IdentityDecomposer', 'EnergyImpactDecomposer', 'RandomSubproblemDecomposer',
    'TilingChimeraDecomposer', 'RandomConstraintDecomposer',
    'RoofDualityDecomposer',
]

logger = logging.getLogger(__name__)


[docs]class IdentityDecomposer(traits.ProblemDecomposer, traits.SISO, Runnable): """Selects a subproblem that is a full copy of the problem.""" def next(self, state, **runopts): return state.updated(subproblem=state.problem)
[docs]class EnergyImpactDecomposer(traits.ProblemDecomposer, traits.SISO, Runnable): """Selects a subproblem of variables maximally contributing to the problem energy. The selection currently implemented does not ensure that the variables are connected in the problem graph. Args: size (int): Nominal number of variables in the subproblem. Actual subproblem can be smaller, depending on other parameters (e.g. `min_gain`). min_gain (int, optional, default=-inf): Minimum reduction required to BQM energy, given the current sample. A variable is included in the subproblem only if inverting its sample value reduces energy by at least this amount. rolling (bool, optional, default=True): If True, successive calls for the same problem (with possibly different samples) produce subproblems on different variables, selected by rolling down the list of all variables sorted by decreasing impact. rolling_history (float, optional, default=1.0): Fraction of the problem size, as a float in range 0.0 to 1.0, that should participate in the rolling selection. Once reached, subproblem unrolling is reset. silent_rewind (bool, optional, default=True): If False, raises :exc:`EndOfStream` when resetting/rewinding the subproblem generator upon the reset condition for unrolling. traversal (str, optional, default='energy'): Traversal algorithm used to pick a subproblem of `size` variables. Options are: energy: Use the next `size` variables in the list of variables ordered by descending energy impact. bfs: Breadth-first traversal seeded by the next variable in the energy impact list. pfs: Priority-first traversal seeded by variables from the energy impact list, proceeding with the variable on the search boundary that has the highest energy impact. See :ref:`decomposers-examples`. """ @classmethod def _energy(cls, bqm, sample, ordered_priority, visited, size): return list(itertools.islice( (v for v in ordered_priority if v not in visited), size)) @classmethod def _bfs_nodes(cls, graph, source, size, **kwargs): """Traverse `graph` with BFS starting from `source`, up to `size` nodes. Return an iterator of subgraph nodes (including source node). """ if size < 1: return iter(()) return itertools.chain( (source,), itertools.islice((v for u, v in nx.bfs_edges(graph, source)), size-1) ) @classmethod def _pfs_nodes(cls, graph, source, size, priority): """Priority-first traversal of `graph` starting from `source` node, returning up to `size` nodes iterable. Node priority is determined by `priority(node)` callable. Nodes with higher priority value are traversed before nodes with lower priority. """ if size < 1: return iter(()) # use min-heap to implement (max) priority queue # use insertion order to break priority tie queue = [] counter = itertools.count() push = lambda priority, node: heappush(queue, (-priority, next(counter), node)) pop = partial(heappop, queue) visited = set() enqueued = set() push(priority(source), source) while queue and len(visited) < size: _, _, node = pop() if node in visited: continue visited.add(node) for neighbor in graph[node]: if neighbor not in enqueued: enqueued.add(neighbor) push(priority(neighbor), neighbor) return iter(visited) @classmethod def _iterative_graph_search(cls, bqm, sample, ordered_priority, visited, size, method): """Traverse `bqm` graph using multi-start graph search `method`, until `size` variables are selected. Each subgraph is seeded from `ordered_priority` ordered dictionary. Note: a lot of room for optimization. Nx graph could be cached, or we could use a search/traverse method (BFS/PFS) which accepts a "node mask" - set of visited nodes. """ graph = bqm.to_networkx_graph() graph.remove_nodes_from(visited) variables = set() order = iter(ordered_priority) while len(variables) < size and len(graph): # find the next untraversed variable in (energy) order try: source = next(order) while source in visited or source in variables: source = next(order) except StopIteration: break # get a subgraph induced by source nodes = list( method(graph, source, size - len(variables), priority=ordered_priority.get)) variables.update(nodes) # in next iteration we traverse a reduced BQM graph graph.remove_nodes_from(nodes) return variables def __init__(self, size, min_gain=None, rolling=True, rolling_history=1.0, silent_rewind=True, traversal='energy', **runopts): traversers = { 'energy': self._energy, 'bfs': partial(self._iterative_graph_search, method=self._bfs_nodes), 'pfs': partial(self._iterative_graph_search, method=self._pfs_nodes), } super(EnergyImpactDecomposer, self).__init__(**runopts) if rolling and rolling_history < 0.0 or rolling_history > 1.0: raise ValueError("rolling_history must be a float in range [0.0, 1.0]") if traversal not in traversers: raise ValueError("traversal mode not supported: {}".format(traversal)) self.size = size self.min_gain = min_gain self.rolling = rolling self.rolling_history = rolling_history self.silent_rewind = silent_rewind self.traverse = traversers[traversal] # variables unrolled so far self._unrolled_vars = set() self._rolling_bqm = None # variable energy impact caching self._variable_priority = collections.OrderedDict() self._prev_sample = None def __repr__(self): return ( "{self}(size={self.size!r}, min_gain={self.min_gain!r}, " "rolling={self.rolling!r}, rolling_history={self.rolling_history!r}, " "silent_rewind={self.silent_rewind!r})" ).format(self=self) def _rewind_rolling(self, state): self._unrolled_vars.clear() self._rolling_bqm = state.problem self._rolling_sample = state.sample def next(self, state, **runopts): # run time options override silent_rewind = runopts.get('silent_rewind', self.silent_rewind) bqm = state.problem sample = state.samples.change_vartype(bqm.vartype).first.sample size = self.size if size > len(bqm): logger.debug("{} subproblem size greater than the problem size, " "adapting to problem size".format(self.name)) size = len(bqm) bqm_changed = bqm != self._rolling_bqm sample_changed = sample != self._prev_sample if bqm_changed: self._rewind_rolling(state) if sample_changed: self._prev_sample = sample # cache energy impact calculation per (bqm, sample) if bqm_changed or sample_changed or not self._variable_priority: impact = flip_energy_gains(bqm, sample, min_gain=self.min_gain) self._variable_priority = collections.OrderedDict((v, en) for en, v in impact) if self.rolling: if len(self._unrolled_vars) >= self.rolling_history * len(bqm): logger.debug("{} reset rolling at unrolled history size {}".format( self.name, len(self._unrolled_vars))) self._rewind_rolling(state) # reset before exception, to be ready on a subsequent call if not silent_rewind: raise EndOfStream # pick variables for the next subproblem next_vars = self.traverse(bqm, sample, ordered_priority=self._variable_priority, visited=self._unrolled_vars, size=size) logger.debug("{} selected {} subproblem variables: {!r}".format( self.name, len(next_vars), next_vars)) if self.rolling: self._unrolled_vars.update(next_vars) # induce sub-bqm based on selected variables and global sample subbqm = bqm_induced_by(bqm, next_vars, sample) return state.updated(subproblem=subbqm)
[docs]class RandomSubproblemDecomposer(traits.ProblemDecomposer, traits.SISO, Runnable): """Selects a subproblem of `size` random variables. The selection currently implemented does not ensure that the variables are connected in the problem graph. Args: size (int): Number of variables in the subproblem. See :ref:`decomposers-examples`. """ def __init__(self, size, **runopts): super(RandomSubproblemDecomposer, self).__init__(**runopts) self.size = size def __repr__(self): return "{self}(size={self.size!r})".format(self=self) def next(self, state, **runopts): bqm = state.problem size = self.size if size > len(bqm): logger.debug("{} subproblem size greater than the problem size, " "adapting to problem size".format(self.name)) size = len(bqm) variables = select_random_subgraph(bqm, size) sample = state.samples.change_vartype(bqm.vartype).first.sample subbqm = bqm_induced_by(bqm, variables, sample) logger.debug("{} selected {} subproblem variables: {!r}".format( self.name, len(variables), variables)) return state.updated(subproblem=subbqm)
[docs]class RoofDualityDecomposer(traits.ProblemDecomposer, traits.ProblemSampler, traits.SISO, Runnable): """Selects a subproblem with variables that cannot be fixed by roof duality. Roof duality finds a lower bound for the minimum of a quadratic polynomial. It can also find minimizing assignments for some of the polynomial's variables; these fixed variables take the same values in all optimal solutions [BHT]_ [BH]_. A quadratic pseudo-Boolean function can be represented as a network to find the lower bound through network-flow computations. This decomposer can also use maximum flow in the implication network to fix variables. Consequently, you can find an assignment for the remaining variables that attains the optimal value. Args: sampling_mode (bool, optional, default=True): In sampling mode, only roof-duality is used. When `sampling_mode` is false, strongly connected components are used to fix more variables, but in some optimal solutions these variables may take different values. .. [BHT] Boros, E., P.L. Hammer, G. Tavares. Preprocessing of Unconstraint Quadratic Binary Optimization. Rutcor Research Report 10-2006, April, 2006. .. [BH] Boros, E., P.L. Hammer. Pseudo-Boolean optimization. Discrete Applied Mathematics 123, (2002), pp. 155-225 """ def __init__(self, sampling_mode=True, **runopts): super(RoofDualityDecomposer, self).__init__(**runopts) self.sampling_mode = sampling_mode def __repr__(self): return "{self.name}(sampling_mode={self.sampling_mode!r})".format(self=self) def next(self, state, **runopts): bqm = state.problem sampleset = state.samples fixed_vars = dimod.fix_variables(bqm, sampling_mode=self.sampling_mode) # make a new bqm of everything not-fixed subbqm = bqm.copy() subbqm.fix_variables(fixed_vars) # update the existing state with the fixed variables newsampleset = sampleset.copy() for v, val in fixed_vars.items(): # index lookups on variables are fast for SampleSets newsampleset.record.sample[:, newsampleset.variables.index(v)] = val # make sure the energies reflect the changes newsampleset.record.energy = bqm.energies(newsampleset) return state.updated(subproblem=subbqm, samples=newsampleset)
[docs]class TilingChimeraDecomposer(traits.ProblemDecomposer, traits.EmbeddingProducing, traits.SISO, Runnable): """Returns sequential Chimera lattices that tile the initial problem. A Chimera lattice is an m-by-n grid of Chimera tiles, where each tile is a bipartite graph with shores of size t. The problem is decomposed into a sequence of subproblems with variables belonging to the Chimera lattices that tile the problem Chimera lattice. For example, a 2x2 Chimera lattice could be tiled 64 times (8x8) on a fully-yielded D-Wave 2000Q system (16x16). Args: size (int, optional, default=(4,4,4)): Size of the Chimera lattice as (m, n, t), where m is the number of rows, n the columns, and t the size of shore in the Chimera lattice. loop (Bool, optional, default=True): Cycle continually through the tiles. See :ref:`decomposers-examples`. """ def __init__(self, size=(4,4,4), loop=True, **runopts): """Size C(n,m,t) defines a Chimera subgraph returned with each call.""" super(TilingChimeraDecomposer, self).__init__(**runopts) self.size = size self.loop = loop self.blocks = None def __repr__(self): return "{self}(size={self.size!r}, loop={self.loop!r})".format(self=self) def init(self, state, **runopts): self.blocks = iter(chimera_tiles(state.problem, *self.size).items()) if self.loop: self.blocks = itertools.cycle(self.blocks) def next(self, state, **runopts): """Each call returns a subsequent block of size `self.size` Chimera cells.""" bqm = state.problem pos, embedding = next(self.blocks) variables = embedding.keys() sample = state.samples.change_vartype(bqm.vartype).first.sample subbqm = bqm_induced_by(bqm, variables, sample) return state.updated(subproblem=subbqm, embedding=embedding)
[docs]class RandomConstraintDecomposer(traits.ProblemDecomposer, traits.SISO, Runnable): """Selects variables randomly as constrained by groupings. By grouping related variables, the problem's structure can guide the random selection of variables so subproblems are related to the problem's constraints. Args: size (int): Number of variables in the subproblem. constraints (list[set]): Groups of variables in the BQM, as a list of sets, where each set is associated with a constraint. See :ref:`decomposers-examples`. """ def __init__(self, size, constraints, **runopts): super(RandomConstraintDecomposer, self).__init__(**runopts) self.size = size if not isinstance(constraints, collections.Sequence): raise TypeError("constraints should be a list of containers") if any(len(const) > size for const in constraints): raise ValueError("size must be able to contain the largest constraint") self.constraints = constraints def __repr__(self): return "{self}(size={self.size!r}, constraints={self.constraints!r})".format(self=self) def init(self, state, **runopts): if self.size > len(state.problem): raise ValueError("subproblem size cannot be greater than the problem size") # get the connectivity between the constraint components self.constraint_graph = CG = nx.Graph() for ci, const in enumerate(self.constraints): for i in range(ci+1, len(self.constraints)): if any(v in const for v in self.constraints[i]): CG.add_edge(i, ci) if len(CG) < 1: raise ValueError("constraint graph empty") def next(self, state, **runopts): CG = self.constraint_graph size = self.size constraints = self.constraints bqm = state.problem # get a random constraint to start with n = random.choice(list(CG.nodes)) if len(constraints[n]) > size: raise NotImplementedError # starting from our constraint, do a breadth-first search adding constraints until our max # size is reached variables = set(constraints[n]) for _, ci in nx.bfs_edges(CG, n): proposed = [v for v in constraints[ci] if v not in variables] if len(proposed) + len(variables) <= size: variables.union(proposed) if len(variables) == size: # can exit early break sample = state.samples.change_vartype(bqm.vartype).first.sample subbqm = bqm_induced_by(bqm, variables, sample) return state.updated(subproblem=subbqm)