Source code for sequencing.sequencing.basic

# This file is part of sequencing.
#
#    Copyright (c) 2021, The Sequencing Authors.
#    All rights reserved.
#
#    This source code is licensed under the BSD-style license found in the
#    LICENSE file in the root directory of this source tree.

import numpy as np
import qutip


[docs]class HamiltonianChannels(object): """Object prepresenting a set of Hamiltonian 'channels' with time-dependent coefficients. Args: channels (optional, dict): Dict of {channel_name: {'H': hamiltonian_term, 'time_dependent': td_bool}}, where hamiltonian_term is a qutip.Qobj and td_bool is a boolean flag indicating whether this Hamiltonian will be given time-dependence (defaults to False). Default: None. collapse_channels (optional, dict): Similar to channels, but for collapse operators instead of Hamiltonian terms. Default: None. """ def __init__(self, channels=None, collapse_channels=None, t0=0, dt=1): self.channels = {} if channels is None: channels = {} for name, info in channels.items(): H = info["H"] time_dependent = info.get("time_dependent", False) self.add_channel(name, H=H, time_dependent=time_dependent) self.collapse_channels = {} if collapse_channels is None: collapse_channels = {} for name, info in collapse_channels.items(): op = info["op"] time_dependent = info.get("time_dependent", False) self.add_channel(name, C_op=op, time_dependent=time_dependent) self.tmin = t0 self.tmax = t0 self.dt = dt @property def times(self): """Array of times.""" return np.arange(self.tmin, self.tmax + self.dt, self.dt).astype(float)
[docs] def add_channel( self, name, H=None, C_op=None, time_dependent=False, error_if_exists=True ): """Add a channel with specified Hamiltonian term H or collapse operator C_op. Args: name (str): Name of the Hamiltonian channel. H (optional, qutip.Qobj): Hamiltonian term for this channel. C_op (optional, qutip.Qobj): Collapse operator for this channel. time_dependent (optional, bool): Whether this channel has time-dependent Hamiltonian coefficients. Default: False. error_if_exists (optional, bool): Whether to raise an exception if a channel with the same name already exists. Default: True. """ if name in self.channels and error_if_exists: raise ValueError(f"Channel {name} already exists!") if H is not None: if H != 0 and not isinstance(H, qutip.Qobj): # H could be zero if it's sum([]) raise TypeError(f"Excpected instance of qutip.Qobj, but got {type(H)}.") self.channels[name] = {"H": H, "time_dependent": time_dependent, "delay": 0} elif C_op is not None: # C_op could be zero if it's sum([]) if C_op != 0 and not isinstance(C_op, qutip.Qobj): raise TypeError( f"Excpected instance of qutip.Qobj, but got {type(C_op)}." ) self.collapse_channels[name] = { "op": C_op, "time_dependent": time_dependent, "delay": 0, } else: raise ValueError("Expected H or C_op.")
[docs] def add_operation( self, channel_name, t0=None, duration=None, times=None, H=None, C_op=None, coeffs=1, coeffs_args=None, coeffs_kwargs=None, ): """Add an operation to a given channel, padding all other channels accordingly. Args: channel_name (str): Name of the Hamiltonian channel this operation acts on. t0 (optional, int): Starting time of this operation (required along with duration if you do not provide an array of times). Default: None. duration (optional, int): Length of this operation (required along with t0 if you do not provide an array of times). Default: None. times (optional, sequence[int]): Sequence of time points. Default: None. H (optional, qutip.Qobj): Hamiltonian for the given channel (required if this channel is not yet in self.channels). Default: None. C_op (optional, qutip.Qobj): Collapse operator for the given channel (required if this channel is not yet in self.collapse_channels). coeffs (optional, number | sequence | callable): Hamiltonian coefficients for the given time points. You can specify single int/float/complex for constant coefficients, or provide a function that takes time as its first argument and returns coefficiencts. Default: None. coeffs_args (optional, sequence): Positional arguments passed to coeffs if coeffs is callable. Default: None. coeffs_kwargs (optional, dict): Keyword arguments passed to coeffs if coeffs is a function. Default: None. **Important note:** if ``reset_t0`` is in coeffs_kwargs and is True, then the first argument for coeffs will be ``times-times[0]`` instead of ``times``. This allows for the user to take care of phase bookkeeping. """ if H is not None and C_op is not None: raise ValueError("Expected only H or C_op.") all_channels = list(self.channels) + list(self.collapse_channels) if channel_name not in all_channels: self.add_channel(channel_name, H=H, C_op=C_op, time_dependent=True) if channel_name in self.channels: channel_dict = self.channels else: channel_dict = self.collapse_channels if not channel_dict[channel_name]["time_dependent"]: raise ValueError( f"Channel {channel_name} is time-independent, " "so you cannot add time-dependent coefficients." ) if times is None: if t0 is None or duration is None: raise ValueError( "You must either specify an array of times or t0 and duration." ) delay = int(channel_dict[channel_name]["delay"]) times = np.arange(t0 + delay, t0 + delay + duration, self.dt) if isinstance(coeffs, (int, float, complex)): coeffs = coeffs * np.ones_like(times) elif callable(coeffs): if coeffs_args is None: coeffs_args = [] if coeffs_kwargs is None: coeffs_kwargs = {} else: coeffs_kwargs = coeffs_kwargs.copy() reset_t0 = coeffs_kwargs.pop("reset_t0", False) ts = times - times[0] if reset_t0 else times coeffs = coeffs(ts, *coeffs_args, **coeffs_kwargs) # First, pad the new coeffs to line up with self.times lpad = int(max((times.min() - self.tmin) // self.dt, 0)) rpad = int(max((self.tmax - times.max()) // self.dt, 0)) coeffs = np.concatenate([np.zeros(lpad), coeffs, np.zeros(rpad)]) # Next, calculcate pad widths needed to make other channels # line up with this one lpad = int(max((self.tmin - times.min()) // self.dt, 0)) rpad = int(max((times.max() - self.tmax) // self.dt, 0)) self.tmin = min(times.min(), self.tmin) self.tmax = max(times.max(), self.tmax) for channel_dict in (self.channels, self.collapse_channels): for name, channel in channel_dict.items(): if "coeffs" in channel: # This channel already has an operation on it, # so we have to pad it old_coeffs = np.concatenate( [np.zeros(lpad), channel["coeffs"], np.zeros(rpad)] ) new_coeffs = old_coeffs if name == channel_name: # Sum signals on the same Hamiltonian channel # if they are not separated by a sync(). new_coeffs = coeffs + old_coeffs channel["coeffs"] = new_coeffs elif name == channel_name: # This is the channel we're currently adding an operation to, # and it doesn't have any operations on it yet channel["coeffs"] = coeffs for channel_dict in (self.channels, self.collapse_channels): # check that padding was done properly for name, channel in channel_dict.items(): if "coeffs" in channel: if len(self.times) != len(channel["coeffs"]): msg = ( f"Channel {name}: " f"Number of time points {len(self.times)} " f"does not match number of coefficients " f'{len(channel["coeffs"])}.' ) raise ValueError(msg)
[docs] def delay_channels(self, channel_names, duration): """Add a delay of length ``duration`` to channels specified in the list ``channel_names``. """ if duration == 0: return if duration < 0: raise ValueError("Delay cannot be less than 0.") if isinstance(channel_names, str): channel_names = [channel_names] for name in channel_names: if name not in list(self.channels) + list(self.collapse_channels): raise ValueError(f"Channel {name} does not exist.") for name in channel_names: self.channels[name]["delay"] += duration
[docs] def build_hamiltonian(self): """Assemble the channels into a list of Hamiltonian terms and collapse operators which can be ingested by ``qutip.mesolve``. Returns: tuple[list, list, np.ndarray]: H, C_ops, times """ H = [] for channel in self.channels.values(): if "coeffs" in channel: H.append([channel["H"], channel["coeffs"]]) elif isinstance(channel["H"], (list, tuple)): H.extend(channel["H"]) else: H.append(channel["H"]) C_ops = [] for channel in self.collapse_channels.values(): if "coeffs" in channel and np.any(channel["coeffs"]): C_ops.append([channel["op"], channel["coeffs"]]) elif isinstance(channel["op"], (list, tuple)): C_ops.extend(channel["op"]) else: C_ops.append(channel["op"]) return H, C_ops, self.times
[docs] def plot_coefficients(self, subplots=True, plot_imag=False, step=False): """Plot the Hamiltionian coefficients for all channels. Args: subplots (optional, bool): If True, plot each channel on a different axis. Default: True. plot_imag (optional, bool): If True, plot both real and imag. Default: False. step (optional, bool): It True, use axis.step() instead of axis.plot(). Default: False. Returns: tuple: (fig, ax): matplotlib Figure and axes. """ import matplotlib.pyplot as plt channels = [ name for name, ch in self.channels.items() if name != "delay" and "coeffs" in ch ] xs = self.times if subplots: fig, axes = plt.subplots(len(channels), 1, sharex=True) if not isinstance(axes, (list, np.ndarray)): axes = [axes] for i, (ch, ax) in enumerate(zip(channels, axes)): c = "C" + str(i) ys = self.channels[ch]["coeffs"] plot_func = ax.step if step else ax.plot plot_func(xs, ys.real, "-", label=ch, color=c) if plot_imag: plot_func(xs, ys.imag, "--", color=c) ax.grid(True) ax.legend(loc=0) axes[0].set_title("Hamiltonian coefficients") axes[-1].set_xlabel("Time") else: fig, axes = plt.subplots(1, 1) plot_func = axes.step if step else axes.plot for i, ch in enumerate(channels): c = "C" + str(i) ys = self.channels[ch]["coeffs"] plot_func(xs, ys.real, "-", label=ch, color=c) if plot_imag: plot_func(xs, ys.imag, "--", color=c) axes.set_xlabel("Time") axes.set_ylabel("Hamiltonian coefficient") axes.legend(loc=0) axes.grid(True) return fig, axes
[docs]class CompiledPulseSequence(object): """Creates time-dependent Hamiltonian channels from a sequence of Operations. Args: system (optional, subsystems.System): System containing the Modes on which to operate. channels (opional, dict): Dict of initial channels to pass the HamiltonianChannels. Default: None. """ def __init__(self, system=None, channels=None, t0=0): self.system = None self.modes = None self.hc = None self._t = None if system is not None: self.set_system(system, channels=channels, t0=t0) def set_system(self, system, channels=None, t0=0): self.system = system self.modes = system.modes self.hc = HamiltonianChannels(channels=channels, t0=t0, dt=system.dt) self._t = self.hc.tmax @property def dt(self): return self.hc.dt @property def times(self): return self.hc.times @property def channels(self): return self.hc.channels
[docs] def add_channel( self, name, H=None, C_op=None, time_dependent=False, error_if_exists=True ): """Add a channel with specified Hamiltonian term H or collapse operator C_op. Args: name (str): Name of the Hamiltonian channel. H (optional, qutip.Qobj): Hamiltonian term for this channel. C_op (optional, qutip.Qobj): Collapse operator for this channel. time_dependent (optional, bool): Whether this channel has time-dependent Hamiltonian coefficients. Default: False. error_if_exists (optional, bool): Whether to raise an exception if a channel with the same name already exists. Default: True. """ self.hc.add_channel( name, H=H, C_op=C_op, time_dependent=time_dependent, error_if_exists=error_if_exists, )
[docs] def add_operation(self, operation): """Add an Operation (time dependent Hamiltonian or collapse terms) to the HamiltonianChannels at the current time. Args: operation (Operation): Operation object to add to the pulse sequence. """ from .common import HTerm, Operation if not isinstance(operation, Operation): raise TypeError(f"Expected an Operation, but got {type(operation)}.") duration, terms = operation for channel_name, term in terms.items(): if isinstance(term, HTerm): H, coeffs, args, kwargs = term C_op = None else: # its a CTerm C_op, coeffs, args, kwargs = term H = None if args is None: args = [] if kwargs is None: kwargs = {} self.hc.add_operation( channel_name, t0=self._t, duration=duration, H=H, C_op=C_op, coeffs=coeffs, coeffs_args=args, coeffs_kwargs=kwargs, )
[docs] def delay(self, length, sync_before=True, sync_after=True): """Add a global delay to the sequence. Args: length (int): Length of the delay in ns. sync_before (optional, bool): Whether to sync before the delay. Default: True. sync_after (optional, bool): Whether to sync after the delay: Default: True. """ if not length: if sync_before: self.sync() return if sync_before: self.sync() self.hc.add_operation( "delay", H=0 * self.system.active_modes[0].I, t0=self._t, duration=length, coeffs=1, ) if sync_after: self.sync()
[docs] def sync(self): """Ensure that the Hamiltonian channels all align up to this point. This means that all operations which follow the sync will be executed after all those before the sync. Sequences are constructed in terms of blocks of operations separated by sync()s. Within a block, channels are made to have equal duration by padding shorter channels to the maximum channel length. """ self._t = self.hc.tmax + self.hc.dt
[docs] def build_hamiltonian(self): """Assemble the channels into a list of Hamiltonian terms and collapse operators which can be ingested by qutip.mesolve. Returns: tuple[list, list, np.ndarray]: H, C_ops, times """ return self.hc.build_hamiltonian()
[docs] def run( self, init_state, c_ops=None, e_ops=None, options=None, only_final_state=False, progress_bar=None, ): """Simulate the sequence using qutip.mesolve. Args: init_state (qutip.Qobj): Inital state of the system. c_ops (optional, list): List of additional collapse operators. Default: None. e_ops (optional, list): List of expectation operators. Default: None. options (optional, qutip.Options): qutip solver options. Note: defaults to max_step = 1. only_final_state (optional, bool): Whether to query the system's state at only the initial and final times rather than at every time step in the Hamiltonian. Default: False. progress_bar (optional, None): Whether to use qutip's progress bar. Default: None (no progress bar). Returns: ``qutip.solver.Result``: qutip.Solver.Result instance. """ from .common import ops2dms if c_ops is None: c_ops = [] if e_ops is None: e_ops = [] if options is None: options = qutip.Options() options.max_step = self.hc.dt options.store_states = True options.rhs_reuse = True qutip.rhs_clear() if "H0" not in self.hc.channels: H0 = sum(self.system.H0()) if isinstance(H0, int) and H0 == 0: H0 = 0 * self.system.I() self.hc.add_channel("H0", H=H0, time_dependent=False) system_c_ops = self.system.c_ops() c_ops.extend(system_c_ops) H, C_ops, times = self.build_hamiltonian() c_ops.extend(C_ops) e_ops = ops2dms(e_ops) if only_final_state: mesolve_times = [times.min(), times.max()] H = qutip.QobjEvo(H, tlist=times, state0=init_state, e_ops=e_ops) else: mesolve_times = times return qutip.mesolve( H, init_state, mesolve_times, c_ops, e_ops, options=options, progress_bar=progress_bar, )
[docs] def propagator( self, c_ops=None, options=None, unitary_mode="batch", parallel=False, ): """Calculate the propagator using ``qutip.propagator()``. Args: c_ops (list[qutip.Qobj]): List of collapse operators. options (optional, qutip.Options): qutip solver options. Note: defaults to max_step = 1. progress_bar (optional, None): Whether to use qutip's progress bar. Default: None (no progress bar). unitary_mode (optional, "batch" or "single"): Solve all basis vectors simulaneously ("batch") or individually ("single"). Default: "batch". parallel (optional, bool): Run the propagator in parallel mode. This will override the unitary_mode settings if set to True. Default: False. Returns: list[qutip.Qobj]: List of Qobjs representing the propagator U(t). """ if c_ops is None: c_ops = [] if options is None: options = qutip.Options() options.max_step = self.hc.dt options.rhs_reuse = True qutip.rhs_clear() if "H0" not in self.hc.channels: H0 = sum(self.system.H0()) if isinstance(H0, int) and H0 == 0: H0 = 0 * self.system.I() self.hc.add_channel("H0", H=H0, time_dependent=False) system_c_ops = self.system.c_ops() c_ops.extend(system_c_ops) H, C_ops, times = self.build_hamiltonian() c_ops.extend(C_ops) if len(H) == 1 and not isinstance(H[0], list): # This case breaks qutip.propagator(). H = H[0] props = qutip.propagator( H, times, c_op_list=c_ops, unitary_mode=unitary_mode, parallel=parallel, options=options, ) # Ensure that props is a list rather than np.ndarray. # This is especially important for numpy >= 1.20 # since newer versions of numpy can automatically coerce # objects of type np.ndarray[qutip.Qobj] into bare np.ndarray, # which is not what we want in this situation. if isinstance(props, np.ndarray): eye = self.system.I() props = [qutip.Qobj(prop, dims=eye.dims) for prop in props] return props
[docs] def plot_coefficients(self, subplots=True, plot_imag=False, step=False): """Plot the Hamiltionian coefficients for all channels. Args: subplots (optional, bool): If True, plot each channel on a different axis. Default: True. plot_imag (optional, bool): If True, plot both real and imag. Default: False. step (optional, bool): It True, use axis.step() instead of axis.plot(). Default: False. Returns: tuple: (fig, ax): matplotlib Figure and axes. """ return self.hc.plot_coefficients( subplots=subplots, plot_imag=plot_imag, step=step )