Source code for sequencing.sequencing.main

# This file is part of sequencing.
#
#    Copyright (c) 2021, The Sequencing Authors.
#    All rights reserved.
#
#    This source code is licensed under the BSD-style license found in the
#    LICENSE file in the root directory of this source tree.

import copy

import numpy as np
from tqdm import tqdm
import qutip

from .basic import CompiledPulseSequence
from .common import (
    Operation,
    SyncOperation,
    DelayOperation,
    DelayChannelsOperation,
    ValidatedList,
    delay_channels,
    get_sequence,
)


[docs]class PulseSequence(ValidatedList): """A list-like container of Operation, SyncOperation, DelayOperation, and DelayChannelOperation objects, which can be compiled into a CompiledPulseSequence at a later time. Args: system (optional, System): The System associated with the PulseSequence. Default: None. t0 (optional, int): The start time of the PulseSequence. Default: 0. items (optional, iterable): Iterable containing initial values with which to populate the PulseSequence. Default: None. """ VALID_TYPES = ( Operation, SyncOperation, DelayOperation, DelayChannelsOperation, ) def __init__(self, system=None, t0=0, items=None): self.system = None self.t0 = None super().__init__(items) self.set_system(system=system, t0=t0)
[docs] def set_system(self, system=None, t0=0): """Sets the System and start time for the PulseSequence. Args: system (optional, System): The System associated with the PulseSequence. Default: None. t0 (optional, int): The start time of the PulseSequence. Default: 0. """ self.system = system self.t0 = t0 self.clear()
[docs] def compile(self): """Compiles the PulseSequence into a CompiledPulseSequence. Returns: CompiledPulseSequence: A new CompiledPulseSequence. """ seq = CompiledPulseSequence(system=self.system, t0=self.t0) for item in self: item = self._validate(item) if isinstance(item, Operation): seq.add_operation(item) elif isinstance(item, SyncOperation): seq.sync() elif isinstance(item, DelayOperation): seq.delay( item.length, sync_before=item.sync_before, sync_after=item.sync_after, ) else: # item is a DelayChannelsOperation delay_channels(item.channels, item.length, seq=seq) return seq
@property def times(self): """Array of times.""" return self.compile().times @property def channels(self): """Dict of Hamiltonian channels.""" return self.compile().channels
[docs] def run( self, init_state, c_ops=None, e_ops=None, options=None, only_final_state=False, progress_bar=None, ): """Simulate the sequence using qutip.mesolve. Args: init_state (qutip.Qobj): Inital state of the system. c_ops (optional, list): List of additional collapse operators. Default: None. e_ops (optional, list): List of expectation operators. Default: None. options (optional, qutip.Options): qutip solver options. Note: defaults to max_step = 1. only_final_state (optional, bool): Whether to query the system's state at only the initial and final times rather than at every time step in the Hamiltonian. Default: False. progress_bar (optional, None): Whether to use qutip's progress bar. Default: None (no progress bar). Returns: ``qutip.solver.Result``: qutip.solver.Result instance. """ return self.compile().run( init_state, c_ops=c_ops, e_ops=e_ops, options=options, only_final_state=only_final_state, progress_bar=progress_bar, )
[docs] def propagator( self, c_ops=None, options=None, unitary_mode="batch", parallel=False, progress_bar=None, ): """Calculate the propagator using ``qutip.propagator()``. Args: c_ops (list[qutip.Qobj]): List of collapse operators. options (optional, qutip.Options): qutip solver options. Note: defaults to max_step = 1. progress_bar (optional, None): Whether to use qutip's progress bar. Default: None (no progress bar). unitary_mode (optional, "batch" or "single"): Solve all basis vectors simulaneously ("batch") or individually ("single"). Default: "batch". parallel (optional, bool): Run the propagator in parallel mode. This will override the unitary_mode settings if set to True. Default: False. Returns: list[qutip.Qobj]: List of Qobjs representing the propagator U(t). """ if all(isinstance(op, SyncOperation) for op in self): return [self.system.I()] return self.compile().propagator( c_ops=c_ops, options=options, unitary_mode=unitary_mode, parallel=parallel, )
[docs] def plot_coefficients(self, subplots=True, plot_imag=False, step=False): """Plot the Hamiltionian coefficients for all channels. Args: subplots (optional, bool): If True, plot each channel on a different axis. Default: True. plot_imag (optional, bool): If True, plot both real and imag. Default: False. step (optional, bool): It True, use axis.step() instead of axis.plot(). Default: False. Returns: tuple: (fig, ax): matplotlib Figure and axes. """ return self.compile().plot_coefficients( subplots=subplots, plot_imag=plot_imag, step=step )
[docs]class Sequence(ValidatedList): """A list-like container of PulseSequence, Operation, and unitary objects, which can be used to evolve an initial state. Args: system (System): System upon which the sequence will act. operations (optional, list[qutip.Qobj, PulseSequence, Operation]): Initial list of Operations or unitaries. Default: None. """ VALID_TYPES = ( qutip.Qobj, Operation, PulseSequence, ) def __init__(self, system, operations=None): self.system = system self.pulse_sequence = get_sequence(system) self._t = 0 super().__init__(operations) def _validate(self, item): """Enforces that item is an instance of one of the types in VALID_TYPES. If item is the global PulseSequence, this function will make a deepcopy and then reset the PulseSequence. Returns: `qutip.Qobj``, Operation, or PulseSequence: Returns the item if it is a valid type. """ item = super()._validate(item) if item is self.pulse_sequence: # If we are appending pulse_sequence, # make a deepcopy and then reset it. item = copy.deepcopy(item) self.reset_pulse_sequence() elif len(self.pulse_sequence): # Otherwise, if pulse_sequence is not empty, # capture its contents and reset it before returning # the current item. self.capture() return item
[docs] def reset_pulse_sequence(self): """Reset the current pulse sequence.""" self.pulse_sequence = get_sequence(self.system)
[docs] def capture(self): """Appends the current pulse sequence if it is not empty. """ if len(self.pulse_sequence): self.append(self.pulse_sequence)
[docs] def run( self, init_state, e_ops=None, options=None, full_evolution=True, progress_bar=False, ): """Evolves init_state using each PulseSequence, Operation, or unitary applied in series. Args: init_state (qutip.Qobj): Initial state to evolve. options (optional, qutip.Options): qutip solver options. Default: None. full_evolution (optional, bool): Whether to store the states for every time point in the included Sequences. If False, only the final state will be stored. Default: True. progress_bar (optional, bool): If True, displays a progress bar when iterating through the Sequence. Default:True. Returns: SequenceResult: SequenceResult containing the time evolution of the system. """ self.capture() progbar = tqdm if progress_bar else lambda x, **kw: x e_ops = e_ops or [] self._t = 0 times = [self._t] states = [init_state] for item in progbar(self): item = self._validate(item) if isinstance(item, PulseSequence): if item.system != self.system: raise ValueError("All operations must have the same system.") # The CompiledPulseSequence created by PulseSequence.run() # should start at the current Sequence time. item.t0 = self._t seq = item.compile() seq.sync() result = seq.run( states[-1], options=options, only_final_state=(not full_evolution) ) if full_evolution: new_states = result.states new_times = result.times else: new_states = result.states[-1:] new_times = result.times[-1:] self._t = seq._t elif isinstance(item, Operation): seq = CompiledPulseSequence(self.system, t0=self._t) seq.add_operation(item) seq.sync() result = seq.run( states[-1], options=options, only_final_state=(not full_evolution) ) if full_evolution: new_states = result.states new_times = result.times else: new_states = result.states[-1:] new_times = result.times[-1:] self._t = seq._t else: # item is a Qobj state = states[-1] if state.isket: state = item * state else: # state is a density matrix state = item * state * item.dag() new_states = [state] new_times = [times[-1]] # Unitaries take zero time, so self._t # should be the latest sequence time. states.extend(new_states) times.extend(new_times) times = np.array(times) ix = np.argsort(times) times = times[ix] states = [states[i] for i in ix] expect = [] for op in e_ops: expect.append(qutip.expect(op, states)) num_collapse = len(self.system.c_ops(clean=True)) result = SequenceResult( times=times, states=states, expect=expect, num_collapse=num_collapse ) return result
[docs] def propagator( self, c_ops=None, options=None, unitary_mode="batch", parallel=False, progress_bar=False, ): """Calculate the propagator using ``qutip.propagator()``. Args: c_ops (list[qutip.Qobj]): List of collapse operators. options (optional, qutip.Options): qutip solver options. Note: defaults to max_step = 1. progress_bar (optional, None): Whether to use qutip's progress bar. Default: None (no progress bar). unitary_mode (optional, "batch" or "single"): Solve all basis vectors simulaneously ("batch") or individually ("single"). Default: "batch". parallel (optional, bool): Run the propagator in parallel mode. This will override the unitary_mode settings if set to True. Default: False. progress_bar (optional, bool): If True, displays a progress bar when iterating through the Sequence. Default:True. Returns: list[qutip.Qobj]: List of Qobjs representing the propagator U(t). """ self.capture() progbar = tqdm if progress_bar else lambda x, **kw: x c_ops = c_ops or [] self._t = 0 times = [self._t] props = [self.system.I()] for item in progbar(self): item = self._validate(item) if isinstance(item, PulseSequence): if item.system != self.system: raise ValueError("All operations must have the same system.") # The CompiledPulseSequence created by PulseSequence.run() # should start at the current Sequence time. item.t0 = self._t seq = item.compile() seq.sync() result = seq.propagator( c_ops=c_ops, options=options, unitary_mode=unitary_mode, parallel=parallel, )[-1] self._t = seq._t props.append(result * props[-1]) times.append(self._t) elif isinstance(item, Operation): seq = CompiledPulseSequence(self.system, t0=self._t) seq.add_operation(item) seq.sync() result = seq.propagator( c_ops=c_ops, options=options, unitary_mode=unitary_mode, parallel=parallel, )[-1] props.append(result * props[-1]) self._t = seq._t else: # item is a Qobj props.append(item * props[-1]) # Unitaries take zero time, so self._t # should be the latest sequence time. times.append(times[-1]) return props
[docs] def plot_coefficients(self, subplots=True, sharex=True, sharey=True): """Plot the Hamiltionian coefficients for all channels. Unitaries are represented by vertical lines. Args: subplots (optional, bool): If True, plot each channel on a different axis. Default: True. sharex (optional, bool): Share x axes if subplots is True. Default: True. sharey (optional, bool): Share y axes if subplots is True. Default: True. Returns: tuple: (fig, ax): matplotlib Figure and axes. """ import matplotlib.pyplot as plt self._t = 0 channels = {} for item in self: item = self._validate(item) if isinstance(item, PulseSequence): if item.system != self.system: raise ValueError("All operations must have the same system.") # The CompiledPulseSequence created by PulseSequence.run() # should start at the current Sequence time. item.t0 = self._t seq = item.compile() seq.sync() new_times = seq.times self._t = new_times.max() new_channels = seq.channels elif isinstance(item, Operation): seq = CompiledPulseSequence(self.system, t0=self._t) seq.add_operation(item) seq.sync() new_times = seq.times self._t = new_times.max() new_channels = seq.channels else: new_channels = {"unitary": {"coeffs": np.array([1.0])}} new_times = [self._t] # Unitaries take zero time, so self._t # remains unchanged. # Assemble the results for this time step for name, info in new_channels.items(): if name in [] or "coeffs" not in info: continue if name in channels: channel_times = np.concatenate( (channels[name][0], np.asarray(new_times)) ) channel_coeffs = np.concatenate((channels[name][1], info["coeffs"])) channels[name] = (channel_times, channel_coeffs) else: channels[name] = (new_times, info["coeffs"]) channel_names = [n for n in channels if n not in ["delay", "unitary"]] if not channel_names: raise ValueError("There are no channels with coefficients to plot.") if subplots: fig, axes = plt.subplots(len(channel_names), sharex=sharex, sharey=sharey) else: fig, ax = plt.subplots(1, 1) axes = [ax] * len(channel_names) for name, ax in zip(channel_names, axes): ctimes, coeffs = channels[name] ax.plot(ctimes, coeffs, label=name) ax.legend(loc=0) ax.grid(True) for ctimes, _ in zip(*channels["unitary"]): if subplots: for a in axes: a.axvline(ctimes, ls="--", lw=1.5, color="k", alpha=0.25) else: ax.axvline(ctimes, ls="--", lw=1.5, color="k", alpha=0.25) fig.suptitle("Hamiltonian coefficients") fig.tight_layout() fig.subplots_adjust(top=0.9) axes[-1].set_xlabel("Time") if subplots: return fig, axes return fig, ax
[docs]class SequenceResult(object): """An object that mimics qutip.solver.Result for Sequences. Attributes: times (np.ndarray): Array of times. states (list[qutip.Qobj]): List of states. expect (list[np.ndarray]): List of arrays of expectation values. num_expect (int): Number of expectation operators, ``num_expect == len(expect)``. num_collapse (int): Number of collapse operators involved in the Sequence. solver (str): Name of the 'solver' used to generate the SequenceResult. This is always 'sequencing.Sequence'. """ def __init__(self, times=None, states=None, expect=None, num_collapse=0): if times is None: times = np.array([]) self.times = times if expect is None: expect = [] self.expect = expect if states is None: states = [] self.states = states self.num_collapse = num_collapse self.solver = "sequencing.Sequence" @property def num_expect(self): return len(self.expect) def __str__(self): s = ["SequenceResult"] s.append("-" * len(s[-1])) if self.times is not None and len(self.times) > 0: s.append(f"Number of times: {len(self.times)}") if self.states is not None and len(self.states) > 0: s.append("states = True") if self.expect is not None and len(self.expect) > 0: s.append(f"expect = True, num_expect = {self.num_expect}") s.append(f"num_collapse = {self.num_collapse}") return "\n".join(s) def __repr__(self): return self.__str__()
_global_pulse_sequence = PulseSequence()