# Copyright 2018 D-Wave Systems Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Classical and quantum :class:`.Runnable` `dimod <http://dimod.readthedocs.io/en/stable/>`_
samplers for problems and subproblems.
"""
import time
import logging
import threading
from collections import namedtuple
import dimod
from dwave.system.samplers import DWaveSampler
from dwave.system.composites import AutoEmbeddingComposite, FixedEmbeddingComposite
from tabu import TabuSampler
from neal import SimulatedAnnealingSampler
from hybrid.core import Runnable, SampleSet
from hybrid.flow import Loop
from hybrid.utils import random_sample
from hybrid import traits
__all__ = [
'QPUSubproblemExternalEmbeddingSampler', 'QPUSubproblemAutoEmbeddingSampler',
'ReverseAnnealingAutoEmbeddingSampler',
'SimulatedAnnealingSubproblemSampler', 'InterruptableSimulatedAnnealingSubproblemSampler',
'SimulatedAnnealingProblemSampler', 'InterruptableSimulatedAnnealingProblemSampler',
'TabuSubproblemSampler', 'TabuProblemSampler', 'InterruptableTabuSampler',
'RandomSubproblemSampler',
]
logger = logging.getLogger(__name__)
[docs]class QPUSubproblemExternalEmbeddingSampler(traits.SubproblemSampler,
traits.EmbeddingIntaking,
traits.SISO, Runnable):
r"""A quantum sampler for a subproblem with a defined minor-embedding.
Note:
Externally supplied embedding must be present in the input state.
Args:
num_reads (int, optional, default=100):
Number of states (output solutions) to read from the sampler.
qpu_sampler (:class:`dimod.Sampler`, optional, default=\ :class:`~dwave.system.samplers.DWaveSampler`\ ()):
Quantum sampler such as a D-Wave system.
sampling_params (dict):
Dictionary of keyword arguments with values that will be used
on every call of the (external-embedding-wrapped QPU) sampler.
See :ref:`samplers-examples`.
"""
def __init__(self, num_reads=100, qpu_sampler=None, sampling_params=None,
**runopts):
super(QPUSubproblemExternalEmbeddingSampler, self).__init__(**runopts)
self.num_reads = num_reads
if qpu_sampler is None:
qpu_sampler = DWaveSampler(solver=dict(qpu=True))
self.sampler = qpu_sampler
if sampling_params is None:
sampling_params = {}
self.sampling_params = sampling_params
def __repr__(self):
return ("{self}(num_reads={self.num_reads!r}, "
"qpu_sampler={self.sampler!r}, "
"sampling_params={self.sampling_params!r})").format(self=self)
def next(self, state, **runopts):
num_reads = runopts.get('num_reads', self.num_reads)
sampling_params = runopts.get('sampling_params', self.sampling_params)
params = sampling_params.copy()
params.update(num_reads=num_reads)
sampler = FixedEmbeddingComposite(self.sampler, embedding=state.embedding)
response = sampler.sample(state.subproblem, **params)
return state.updated(subsamples=response)
[docs]class QPUSubproblemAutoEmbeddingSampler(traits.SubproblemSampler, traits.SISO, Runnable):
r"""A quantum sampler for a subproblem with automated heuristic
minor-embedding.
Args:
num_reads (int, optional, default=100):
Number of states (output solutions) to read from the sampler.
qpu_sampler (:class:`dimod.Sampler`, optional, default=\ :class:`~dwave.system.samplers.DWaveSampler`\ ()):
Quantum sampler such as a D-Wave system. Subproblems that do not fit the
sampler's structure are minor-embedded on the fly with
:class:`~dwave.system.composites.AutoEmbeddingComposite`.
sampling_params (dict):
Dictionary of keyword arguments with values that will be used
on every call of the (embedding-wrapped QPU) sampler.
auto_embedding_params (dict, optional):
If provided, parameters are passed to the
:class:`~dwave.system.composites.AutoEmbeddingComposite` constructor
as keyword arguments.
See :ref:`samplers-examples`.
"""
def __init__(self, num_reads=100, qpu_sampler=None, sampling_params=None,
auto_embedding_params=None, **runopts):
super(QPUSubproblemAutoEmbeddingSampler, self).__init__(**runopts)
self.num_reads = num_reads
if qpu_sampler is None:
qpu_sampler = DWaveSampler(solver=dict(qpu=True))
if sampling_params is None:
sampling_params = {}
self.sampling_params = sampling_params
# embed on the fly and only if needed
if auto_embedding_params is None:
auto_embedding_params = {}
self.sampler = AutoEmbeddingComposite(qpu_sampler, **auto_embedding_params)
def __repr__(self):
return ("{self}(num_reads={self.num_reads!r}, "
"qpu_sampler={self.sampler!r}, "
"sampling_params={self.sampling_params!r})").format(self=self)
def next(self, state, **runopts):
num_reads = runopts.get('num_reads', self.num_reads)
sampling_params = runopts.get('sampling_params', self.sampling_params)
params = sampling_params.copy()
params.update(num_reads=num_reads)
response = self.sampler.sample(state.subproblem, **params)
return state.updated(subsamples=response)
[docs]class ReverseAnnealingAutoEmbeddingSampler(traits.SubproblemSampler,
traits.SubsamplesIntaking,
traits.SISO, Runnable):
r"""A quantum reverse annealing sampler for a subproblem with automated
heuristic minor-embedding.
Args:
num_reads (int, optional, default=100):
Number of states (output solutions) to read from the sampler.
anneal_schedule (list(list), optional, default=[[0, 1], [0.5, 0.5], [1, 1]]):
An anneal schedule defined by a series of pairs of floating-point
numbers identifying points in the schedule at which to change slope.
The first element in the pair is time t in microseconds; the second,
normalized persistent current s in the range [0,1]. The resulting
schedule is the piecewise-linear curve that connects the provided
points. For more details, see
:meth:`~dwave.system.DWaveSampler.validate_anneal_schedule`.
qpu_sampler (:class:`dimod.Sampler`, optional, default=\ :class:`~dwave.system.samplers.DWaveSampler`\ ()):
Quantum sampler such as a D-Wave system. Subproblems that do not fit the
sampler's structure are minor-embedded on the fly with
:class:`~dwave.system.composites.AutoEmbeddingComposite`.
sampling_params (dict):
Dictionary of keyword arguments with values that will be used
on every call of the (embedding-wrapped QPU) sampler.
auto_embedding_params (dict, optional):
If provided, parameters are passed to the
:class:`~dwave.system.composites.AutoEmbeddingComposite` constructor
as keyword arguments.
"""
def __init__(self, num_reads=100, anneal_schedule=None, qpu_sampler=None,
sampling_params=None, auto_embedding_params=None, **runopts):
super(ReverseAnnealingAutoEmbeddingSampler, self).__init__(**runopts)
self.num_reads = num_reads
if anneal_schedule is None:
anneal_schedule = [[0, 1], [0.5, 0.5], [1, 1]]
self.anneal_schedule = anneal_schedule
if qpu_sampler is None:
qpu_sampler = DWaveSampler(
solver=dict(qpu=True,
max_anneal_schedule_points__gte=len(self.anneal_schedule)))
# validate schedule, raising `ValueError` on invalid schedule or
# `RuntimeError` if anneal schedule not supported by QPU (this could
# happen only if user provided the `qpu_sampler`)
qpu_sampler.validate_anneal_schedule(anneal_schedule)
if sampling_params is None:
sampling_params = {}
self.sampling_params = sampling_params
# embed on the fly and only if needed
if auto_embedding_params is None:
auto_embedding_params = {}
self.sampler = AutoEmbeddingComposite(qpu_sampler, **auto_embedding_params)
def __repr__(self):
return ("{self}(num_reads={self.num_reads!r}, "
"anneal_schedule={self.anneal_schedule!r}, "
"qpu_sampler={self.sampler!r}, "
"sampling_params={self.sampling_params!r})").format(self=self)
def next(self, state, **runopts):
num_reads = runopts.get('num_reads', self.num_reads)
anneal_schedule = runopts.get('anneal_schedule', self.anneal_schedule)
sampling_params = runopts.get('sampling_params', self.sampling_params)
params = sampling_params.copy()
params.update(num_reads=num_reads, anneal_schedule=anneal_schedule)
# TODO: handle more than just the first subsample (not yet supported via API)
subsamples = self.sampler.sample(
state.subproblem, initial_state=state.subsamples.first.sample, **params)
return state.updated(subsamples=subsamples)
[docs]class SimulatedAnnealingSubproblemSampler(traits.SubproblemSampler, traits.SISO, Runnable):
"""A simulated annealing sampler for a subproblem.
Args:
num_reads (int, optional, default=len(state.subsamples) or 1):
Number of states (output solutions) to read from the sampler.
num_sweeps (int, optional, default=1000):
Number of sweeps or steps.
beta_range (tuple, optional):
A 2-tuple defining the beginning and end of the beta schedule, where
beta is the inverse temperature. The schedule is applied linearly in
beta. Default range is set based on the total bias associated with
each node.
beta_schedule_type (string, optional, default='geometric'):
Beta schedule type, or how the beta values are interpolated between
the given 'beta_range'. Supported values are: linear and geometric.
initial_states_generator (str, 'none'/'tile'/'random', optional, default='random'):
Defines the expansion of input state subsamples into `initial_states`
for the simulated annealing, if fewer than `num_reads` subsamples are
present. See :meth:`~neal.SimulatedAnnealingSampler.sample`.
See :ref:`samplers-examples`.
"""
def __init__(self, num_reads=None, num_sweeps=1000,
beta_range=None, beta_schedule_type='geometric',
initial_states_generator='random', **runopts):
super(SimulatedAnnealingSubproblemSampler, self).__init__(**runopts)
self.num_reads = num_reads
self.num_sweeps = num_sweeps
self.beta_range = beta_range
self.beta_schedule_type = beta_schedule_type
self.initial_states_generator = initial_states_generator
self.sampler = SimulatedAnnealingSampler()
self._stop_event = threading.Event()
def __repr__(self):
return ("{self}(num_reads={self.num_reads!r}, "
"num_sweeps={self.num_sweeps!r}, "
"initial_states_generator={self.initial_states_generator!r})").format(self=self)
def next(self, state, **runopts):
subsamples = self.sampler.sample(
state.subproblem, num_reads=self.num_reads, num_sweeps=self.num_sweeps,
beta_range=self.beta_range, beta_schedule_type=self.beta_schedule_type,
initial_states=state.subsamples,
initial_states_generator=self.initial_states_generator,
interrupt_function=lambda: self._stop_event.is_set())
return state.updated(subsamples=subsamples)
def halt(self):
self._stop_event.set()
class InterruptableSimulatedAnnealingSubproblemSampler(SimulatedAnnealingSubproblemSampler):
"""SimulatedAnnealingSubproblemSampler is already interruptable."""
pass
[docs]class SimulatedAnnealingProblemSampler(traits.ProblemSampler, traits.SISO, Runnable):
"""A simulated annealing sampler for a complete problem.
Args:
num_reads (int, optional, default=len(state.samples) or 1):
Number of states (output solutions) to read from the sampler.
num_sweeps (int, optional, default=1000):
Number of sweeps or steps.
beta_range (tuple, optional):
A 2-tuple defining the beginning and end of the beta schedule, where
beta is the inverse temperature. The schedule is applied linearly in
beta. Default range is set based on the total bias associated with
each node.
beta_schedule_type (string, optional, default='geometric'):
Beta schedule type, or how the beta values are interpolated between
the given 'beta_range'. Supported values are: linear and geometric.
initial_states_generator (str, 'none'/'tile'/'random', optional, default='random'):
Defines the expansion of input state samples into `initial_states`
for the simulated annealing, if fewer than `num_reads` samples are
present. See :meth:`~neal.SimulatedAnnealingSampler.sample`.
"""
def __init__(self, num_reads=None, num_sweeps=1000,
beta_range=None, beta_schedule_type='geometric',
initial_states_generator='random', **runopts):
super(SimulatedAnnealingProblemSampler, self).__init__(**runopts)
self.num_reads = num_reads
self.num_sweeps = num_sweeps
self.beta_range = beta_range
self.beta_schedule_type = beta_schedule_type
self.initial_states_generator = initial_states_generator
self.sampler = SimulatedAnnealingSampler()
self._stop_event = threading.Event()
def __repr__(self):
return ("{self}(num_reads={self.num_reads!r}, "
"num_sweeps={self.num_sweeps!r}, "
"initial_states_generator={self.initial_states_generator!r})").format(self=self)
def next(self, state, **runopts):
samples = self.sampler.sample(
state.problem, num_reads=self.num_reads, num_sweeps=self.num_sweeps,
beta_range=self.beta_range, beta_schedule_type=self.beta_schedule_type,
initial_states=state.samples,
initial_states_generator=self.initial_states_generator,
interrupt_function=lambda: self._stop_event.is_set())
return state.updated(samples=samples)
def halt(self):
self._stop_event.set()
class InterruptableSimulatedAnnealingProblemSampler(SimulatedAnnealingProblemSampler):
"""SimulatedAnnealingProblemSampler is already interruptable."""
pass
[docs]class TabuSubproblemSampler(traits.SubproblemSampler, traits.SISO, Runnable):
"""A tabu sampler for a subproblem.
Args:
num_reads (int, optional, default=len(state.subsamples) or 1):
Number of states (output solutions) to read from the sampler.
tenure (int, optional):
Tabu tenure, which is the length of the tabu list, or number of
recently explored solutions kept in memory. Default is a quarter of
the number of problem variables up to a maximum value of 20.
timeout (int, optional, default=100):
Total running time in milliseconds.
initial_states_generator (str, 'none'/'tile'/'random', optional, default='random'):
Defines the expansion of input state subsamples into `initial_states`
for the Tabu search, if fewer than `num_reads` subsamples are
present. See :meth:`~tabu.TabuSampler.sample`.
See :ref:`samplers-examples`.
"""
def __init__(self, num_reads=None, tenure=None, timeout=100,
initial_states_generator='random', **runopts):
super(TabuSubproblemSampler, self).__init__(**runopts)
self.num_reads = num_reads
self.tenure = tenure
self.timeout = timeout
self.initial_states_generator = initial_states_generator
self.sampler = TabuSampler()
def __repr__(self):
return ("{self}(num_reads={self.num_reads!r}, "
"tenure={self.tenure!r}, "
"timeout={self.timeout!r}, "
"initial_states_generator={self.initial_states_generator!r})").format(self=self)
def next(self, state, **runopts):
subsamples = self.sampler.sample(
state.subproblem, initial_states=state.subsamples,
initial_states_generator=self.initial_states_generator,
tenure=self.tenure, timeout=self.timeout, num_reads=self.num_reads)
return state.updated(subsamples=subsamples)
[docs]class TabuProblemSampler(traits.ProblemSampler, traits.SISO, Runnable):
"""A tabu sampler for a binary quadratic problem.
Args:
num_reads (int, optional, default=len(state.samples) or 1):
Number of states (output solutions) to read from the sampler.
tenure (int, optional):
Tabu tenure, which is the length of the tabu list, or number of
recently explored solutions kept in memory. Default is a quarter of
the number of problem variables up to a maximum value of 20.
timeout (int, optional, default=100):
Total running time in milliseconds.
initial_states_generator (str, 'none'/'tile'/'random', optional, default='random'):
Defines the expansion of input state samples into `initial_states`
for the Tabu search, if fewer than `num_reads` samples are
present. See :meth:`~tabu.TabuSampler.sample`.
See :ref:`samplers-examples`.
"""
def __init__(self, num_reads=None, tenure=None, timeout=100,
initial_states_generator='random', **runopts):
super(TabuProblemSampler, self).__init__(**runopts)
self.num_reads = num_reads
self.tenure = tenure
self.timeout = timeout
self.initial_states_generator = initial_states_generator
self.sampler = TabuSampler()
def __repr__(self):
return ("{self}(num_reads={self.num_reads!r}, "
"tenure={self.tenure!r}, "
"timeout={self.timeout!r}, "
"initial_states_generator={self.initial_states_generator!r})").format(self=self)
def next(self, state, **runopts):
sampleset = self.sampler.sample(
state.problem, initial_states=state.samples,
initial_states_generator=self.initial_states_generator,
tenure=self.tenure, timeout=self.timeout, num_reads=self.num_reads)
return state.updated(samples=sampleset)
[docs]class InterruptableTabuSampler(Loop):
"""An interruptable tabu sampler for a binary quadratic problem.
Args:
num_reads (int, optional, default=1):
Number of states (output solutions) to read from the sampler.
tenure (int, optional):
Tabu tenure, which is the length of the tabu list, or number of
recently explored solutions kept in memory. Default is a quarter of
the number of problem variables up to a maximum value of 20.
timeout (int, optional, default=20):
Timeout for non-interruptable operation of tabu search. At the
completion of each loop of tabu search through its problem
variables, if this time interval has been exceeded, the search can
be stopped by an interrupt signal or expiration of the `timeout`
parameter.
initial_states_generator (str, 'none'/'tile'/'random', optional, default='random'):
Defines the expansion of input state samples into `initial_states`
for the Tabu search, if fewer than `num_reads` samples are
present. See :meth:`~tabu.TabuSampler.sample`.
max_time (float, optional, default=None):
Total running time in milliseconds.
See :ref:`samplers-examples`.
"""
def __init__(self, max_time=None, **tabu):
super(InterruptableTabuSampler, self).__init__(
TabuProblemSampler(**tabu), max_time=max_time)
[docs]class RandomSubproblemSampler(traits.SubproblemSampler, traits.SISO, Runnable):
"""A random sample generator for a subproblem."""
def next(self, state, **runopts):
bqm = state.subproblem
sample = random_sample(bqm)
sampleset = SampleSet.from_samples(sample,
vartype=bqm.vartype,
energy=bqm.energy(sample))
return state.updated(subsamples=sampleset)