import time
from subprocess import CalledProcessError
from typing import List, Optional, Sequence, Tuple, Union
import samna
import torch
import torch.nn as nn
import sinabs
from sinabs.backend.dynapcnn.chip_factory import ChipFactory
from .dvs_layer import DVSLayer
from .dynapcnn_layer import DynapcnnLayer
from .exceptions import InputConfigurationError
from .io import disable_timestamps, enable_timestamps, open_device, reset_timestamps
from .utils import (
build_from_list,
convert_model_to_layer_list,
infer_input_shape,
parse_device_id,
)
[docs]class DynapcnnNetwork(nn.Module):
"""
Given a sinabs spiking network, prepare a dynapcnn-compatible network.
This can be used to test the network will be equivalent once on DYNAPCNN.
This class also provides utilities to make the dynapcnn configuration and
upload it to DYNAPCNN.
The following operations are done when converting to dynapcnn-compatible:
* multiple avg pooling layers in a row are consolidated into one and \
turned into sum pooling layers;
* checks are performed on layer hyperparameter compatibility with dynapcnn \
(kernel sizes, strides, padding)
* checks are performed on network structure compatibility with dynapcnn \
(certain layers can only be followed by other layers)
* linear layers are turned into convolutional layers
* dropout layers are ignored
* weights, biases and thresholds are discretized according to dynapcnn requirements
Note that the model parameters are only ever transferred to the device
on the `to` call, so changing a threshold or weight of a model that
is deployed will have no effect on the model on chip until `to` is called again.
"""
def __init__(
self,
snn: Union[nn.Sequential, sinabs.Network],
input_shape: Optional[Tuple[int, int, int]] = None,
dvs_input: bool = False,
discretize: bool = True,
):
"""
DynapcnnNetwork: a class turning sinabs networks into dynapcnn
compatible networks, and making dynapcnn configurations.
Parameters
----------
snn: sinabs.Network
SNN that determines the structure of the `DynapcnnNetwork`
input_shape: None or tuple of ints
Shape of the input, convention: (features, height, width)
If None, `snn` needs an InputLayer
dvs_input: bool
Does dynapcnn receive input from its DVS camera?
discretize: bool
If True, discretize the parameters and thresholds.
This is needed for uploading weights to dynapcnn. Set to False only for
testing purposes.
"""
super().__init__()
# This attribute stores the location/core-id of each of the DynapcnnLayers upon placement on chip
self.chip_layers_ordering = []
self.input_shape = input_shape # Convert models to sequential
layers = convert_model_to_layer_list(model=snn)
# Check if dvs input is expected
if dvs_input:
self.dvs_input = True
else:
self.dvs_input = False
input_shape = infer_input_shape(layers, input_shape=input_shape)
assert len(input_shape) == 3, "infer_input_shape did not return 3-tuple"
# Build model from layers
self.sequence = build_from_list(
layers,
in_shape=input_shape,
discretize=discretize,
dvs_input=self.dvs_input,
)
[docs] def to(
self,
device="cpu",
chip_layers_ordering="auto",
monitor_layers: Optional[Union[List, str]] = None,
config_modifier=None,
slow_clk_frequency: int = None,
):
"""
Note that the model parameters are only ever transferred to the device on the
`to` call, so changing a threshold or weight of a model that is deployed will
have no effect on the model on chip until `to` is called again.
Parameters
----------
device: String
cpu:0, cuda:0, dynapcnndevkit, speck2devkit
chip_layers_ordering: sequence of integers or `auto`
The order in which the dynapcnn layers will be used. If `auto`,
an automated procedure will be used to find a valid ordering.
A list of layers on the device where you want each of the model's DynapcnnLayers to be placed.
The index of the core on chip to which the i-th layer in the model is mapped is the value of the i-th entry in the list.
Note: This list should be the same length as the number of dynapcnn layers in your model.
monitor_layers: None/List
A list of all layers in the module that you want to monitor. Indexing starts with the first non-dvs layer.
If you want to monitor the dvs layer for eg.
::
monitor_layers = ["dvs"] # If you want to monitor the output of the pre-processing layer
monitor_layers = ["dvs", 8] # If you want to monitor preprocessing and layer 8
monitor_layers = "all" # If you want to monitor all the layers
config_modifier:
A user configuration modifier method.
This function can be used to make any custom changes you want to make to the configuration object.
Note
----
chip_layers_ordering and monitor_layers are used only when using synsense devices.
For GPU or CPU usage these options are ignored.
"""
self.device = device
if isinstance(device, torch.device):
return super().to(device)
elif isinstance(device, str):
device_name, _ = parse_device_id(device)
if device_name in ChipFactory.supported_devices:
# Generate config
config = self.make_config(
chip_layers_ordering=chip_layers_ordering,
device=device,
monitor_layers=monitor_layers,
config_modifier=config_modifier,
)
# Apply configuration to device
self.samna_device = open_device(device)
self.samna_device.get_model().apply_configuration(config)
time.sleep(1)
# Set external slow-clock if need
if slow_clk_frequency is not None:
dk_io = self.samna_device.get_io_module()
dk_io.set_slow_clk(True)
dk_io.set_slow_clk_rate(slow_clk_frequency) # Hz
builder = ChipFactory(device).get_config_builder()
# Create input source node
self.samna_input_buffer = builder.get_input_buffer()
# Create output sink node node
self.samna_output_buffer = builder.get_output_buffer()
# Connect source node to device sink
self.device_input_graph = samna.graph.EventFilterGraph()
self.device_input_graph.sequential(
[
self.samna_input_buffer,
self.samna_device.get_model().get_sink_node(),
]
)
# Connect sink node to device
self.device_output_graph = samna.graph.EventFilterGraph()
self.device_output_graph.sequential(
[
self.samna_device.get_model().get_source_node(),
self.samna_output_buffer,
]
)
self.device_input_graph.start()
self.device_output_graph.start()
self.samna_config = config
return self
else:
return super().to(device)
else:
raise Exception("Unknown device description.")
def _make_config(
self,
chip_layers_ordering: Union[Sequence[int], str] = "auto",
device="dynapcnndevkit:0",
monitor_layers: Optional[Union[List, str]] = None,
config_modifier=None,
) -> Tuple["SamnaConfiguration", bool]:
"""
Prepare and output the `samna` configuration for this network.
Parameters
----------
chip_layers_ordering: sequence of integers or `auto`
The order in which the dynapcnn layers will be used. If `auto`,
an automated procedure will be used to find a valid ordering.
A list of layers on the device where you want each of the model's DynapcnnLayers to be placed.
The index of the core on chip to which the i-th layer in the model is mapped is the value of the i-th entry in the list.
Note: This list should be the same length as the number of dynapcnn layers in your model.
device: String
dynapcnndevkit, speck2b or speck2devkit
monitor_layers: None/List/Str
A list of all layers in the module that you want to monitor. Indexing starts with the first non-dvs layer.
If you want to monitor the dvs layer for eg.
::
monitor_layers = ["dvs"] # If you want to monitor the output of the pre-processing layer
monitor_layers = ["dvs", 8] # If you want to monitor preprocessing and layer 8
monitor_layers = "all" # If you want to monitor all the layers
If this value is left as None, by default the last layer of the model is monitored.
config_modifier:
A user configuration modifier method.
This function can be used to make any custom changes you want to make to the configuration object.
Returns
-------
Configuration object
Object defining the configuration for the device
Bool
True if the configuration is valid for the given device.
Raises
------
ImportError
If samna is not available.
"""
config_builder = ChipFactory(device).get_config_builder()
has_dvs_layer = isinstance(self.sequence[0], DVSLayer)
# Figure out layer ordering
if chip_layers_ordering == "auto":
chip_layers_ordering = config_builder.get_valid_mapping(self)
else:
# Truncate chip_layers_ordering just in case a longer list is passed
if has_dvs_layer:
chip_layers_ordering = chip_layers_ordering[: len(self.sequence) - 1]
chip_layers_ordering = chip_layers_ordering[: len(self.sequence)]
# Save the chip layers
self.chip_layers_ordering = chip_layers_ordering
# Update config
config = config_builder.build_config(self, chip_layers_ordering)
if self.input_shape and self.input_shape[0] == 1:
config.dvs_layer.merge = True
# Check if any monitoring is enabled and if not, enable monitoring for the last layer
if monitor_layers is None:
monitor_layers = [-1]
elif monitor_layers == "all":
num_cnn_layers = len(self.sequence) - int(has_dvs_layer)
monitor_layers = list(range(num_cnn_layers))
# Enable monitors on the specified layers
# Find layers corresponding to the chip
monitor_chip_layers = [
self.find_chip_layer(lyr) for lyr in monitor_layers if lyr != "dvs"
]
if "dvs" in monitor_layers:
monitor_chip_layers.append("dvs")
config_builder.monitor_layers(config, monitor_chip_layers)
# Fix default factory setting to not return input events (UGLY!! Ideally this should happen in samna)
# config.factory_settings.monitor_input_enable = False
# Apply user config modifier
if config_modifier is not None:
config = config_modifier(config)
# Validate config
return config, config_builder.validate_configuration(config)
[docs] def make_config(
self,
chip_layers_ordering: Union[Sequence[int], str] = "auto",
device="dynapcnndevkit:0",
monitor_layers: Optional[Union[List, str]] = None,
config_modifier=None,
):
"""
Prepare and output the `samna` DYNAPCNN configuration for this network.
Parameters
----------
chip_layers_ordering: sequence of integers or `auto`
The order in which the dynapcnn layers will be used. If `auto`,
an automated procedure will be used to find a valid ordering.
A list of layers on the device where you want each of the model's DynapcnnLayers to be placed.
Note: This list should be the same length as the number of dynapcnn layers in your model.
device: String
dynapcnndevkit, speck2b or speck2devkit
monitor_layers: None/List/Str
A list of all layers in the module that you want to monitor. Indexing starts with the first non-dvs layer.
If you want to monitor the dvs layer for eg.
::
monitor_layers = ["dvs"] # If you want to monitor the output of the pre-processing layer
monitor_layers = ["dvs", 8] # If you want to monitor preprocessing and layer 8
monitor_layers = "all" # If you want to monitor all the layers
If this value is left as None, by default the last layer of the model is monitored.
config_modifier:
A user configuration modifier method.
This function can be used to make any custom changes you want to make to the configuration object.
Returns
-------
Configuration object
Object defining the configuration for the device
Raises
------
ImportError
If samna is not available.
ValueError
If the generated configuration is not valid for the specified device.
"""
config, is_compatible = self._make_config(
chip_layers_ordering=chip_layers_ordering,
device=device,
monitor_layers=monitor_layers,
config_modifier=config_modifier,
)
# Validate config
if is_compatible:
print("Network is valid")
return config
else:
raise ValueError(f"Generated config is not valid for {device}")
[docs] def is_compatible_with(self, device_type: str) -> bool:
"""Check if the current model is compatible with a given device.
Args:
device_type (str): Device type ie speck2b, speck2fmodule
Returns:
bool: True if compatible
"""
try:
_, is_compatible = self._make_config(device=device_type)
except ValueError as e:
# Catch "No valid mapping found" error
if e.args[0] == ("No valid mapping found"):
return False
else:
raise e
return is_compatible
[docs] def reset_states(self, randomize=False):
"""
Reset the states of the network.
"""
if hasattr(self, "device") and isinstance(self.device, str):
device_name, _ = parse_device_id(self.device)
if device_name in ChipFactory.supported_devices:
config_builder = ChipFactory(self.device).get_config_builder()
# Set all the vmem states in the samna config to zero
config_builder.reset_states(self.samna_config, randomize=randomize)
self.samna_device.get_model().apply_configuration(self.samna_config)
# wait for the config to be written
time.sleep(1)
# Note: The below shouldn't be necessary ideally
# Erase all vmem memory
if not randomize:
if hasattr(self, "samna_input_graph"):
self.samna_input_graph.stop()
for lyr_idx in self.chip_layers_ordering:
config_builder.set_all_v_mem_to_zeros(
self.samna_device, lyr_idx
)
time.sleep(0.1)
self.samna_input_graph.start()
return
for layer in self.sequence:
if isinstance(layer, DynapcnnLayer):
layer.spk_layer.reset_states(randomize=randomize)
[docs] def find_chip_layer(self, layer_idx):
"""
Given an index of a layer in the model, find the corresponding cnn core id where it is placed
> Note that the layer index does not include the DVSLayer.
> For instance your model comprises two layers [DVSLayer, DynapcnnLayer],
> then the index of DynapcnnLayer is 0 and not 1.
Parameters
----------
layer_idx: int
Index of a layer
Returns
-------
chip_lyr_idx: int
Index of the layer on the chip where the model layer is placed.
"""
# Compute the expected number of cores
num_cores_required = len(self.sequence)
if isinstance(self.sequence[0], DVSLayer):
num_cores_required -= 1
if len(self.chip_layers_ordering) != num_cores_required:
raise Exception(
f"Number of layers specified in chip_layers_ordering {self.chip_layers_ordering} does not correspond to the number of cores required for this model {num_cores_required}"
)
return self.chip_layers_ordering[layer_idx]
[docs] def forward(self, x):
if (
hasattr(self, "device")
and parse_device_id(self.device)[0] in ChipFactory.supported_devices
):
_ = self.samna_output_buffer.get_events() # Flush buffer
# NOTE: The code to start and stop time stamping is device specific
reset_timestamps(self.device)
enable_timestamps(self.device)
# Send input
self.samna_input_buffer.write(x)
received_evts = []
# Record at least until the last event has been replayed
min_duration = max(event.timestamp for event in x) * 1e-6
time.sleep(min_duration)
# Keep recording if more events are being registered
while True:
prev_length = len(received_evts)
time.sleep(0.1)
received_evts.extend(self.samna_output_buffer.get_events())
if prev_length == len(received_evts):
break
# Disable timestamp
disable_timestamps(self.device)
return received_evts
else:
"""Torch's forward pass."""
return self.sequence(x)
[docs] def memory_summary(self):
"""
Get a summary of the network's memory requirements
Returns
-------
dict:
A dictionary with keys kernel, neuron, bias.
The values are a list of the corresponding number per layer in the same order as the model
"""
summary = {}
dynapcnn_layers = [
lyr for lyr in self.sequence if isinstance(lyr, DynapcnnLayer)
]
summary.update({k: list() for k in dynapcnn_layers[0].memory_summary().keys()})
for lyr in dynapcnn_layers:
lyr_summary = lyr.memory_summary()
for k, v in lyr_summary.items():
summary[k].append(v)
return summary
[docs] def zero_grad(self, set_to_none: bool = False) -> None:
for lyr in self.sequence:
lyr.zero_grad(set_to_none)
def __del__(self):
# Stop the input graph
if hasattr(self, "device_input_graph") and self.device_input_graph:
self.device_input_graph.stop()
# Stop the output graph.
if hasattr(self, "device_output_graph") and self.device_output_graph:
self.device_output_graph.stop()
[docs]class DynapcnnCompatibleNetwork(DynapcnnNetwork):
"""Deprecated class, use DynapcnnNetwork instead."""
def __init__(
self,
snn: Union[nn.Sequential, sinabs.Network],
input_shape: Optional[Tuple[int, int, int]] = None,
dvs_input: bool = False,
discretize: bool = True,
):
from warnings import warn
warn(
"DynapcnnCompatibleNetwork has been renamed to DynapcnnNetwork "
+ "and will be removed in a future release."
)
super().__init__(snn, input_shape, dvs_input, discretize)