Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion src/seabreeze/pyseabreeze/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from seabreeze.pyseabreeze import features as sbf
from seabreeze.pyseabreeze.exceptions import SeaBreezeError
from seabreeze.pyseabreeze.features import SeaBreezeFeature
from seabreeze.pyseabreeze.protocol import ADCProtocol
from seabreeze.pyseabreeze.protocol import OBPProtocol
from seabreeze.pyseabreeze.protocol import OOIProtocol
from seabreeze.pyseabreeze.transport import USBTransport
Expand Down Expand Up @@ -413,7 +414,7 @@ def get_serial_number(self) -> str:
except RuntimeError:
raise SeaBreezeError("device not open")

if isinstance(protocol, OOIProtocol):
if isinstance(protocol, OOIProtocol) or isinstance(protocol, ADCProtocol):
# The serial is stored in slot 0
return self.f.eeprom.eeprom_read_slot(0)

Expand Down Expand Up @@ -1169,3 +1170,33 @@ class HDX(SeaBreezeDevice):
sbf.rawusb.SeaBreezeRawUSBBusAccessFeature,
sbf.nonlinearity.NonlinearityCoefficientsFeatureOBP,
)


class ADC1000USB(SeaBreezeDevice):

model_name = "ADC1000-USB"

# communication config
transport = (USBTransport,)
usb_product_id = 0x1004
usb_endpoint_map = EndPointMap(ep_out=0x02, lowspeed_in=0x87, highspeed_in=0x82)
usb_protocol = ADCProtocol

# spectrometer config
dark_pixel_indices = DarkPixelIndices.from_ranges((2, 24))
integration_time_min = 1000
integration_time_max = 655350000
integration_time_base = 1
spectrum_num_pixel = 2048
spectrum_raw_length = 2048 * 2 # XXX: No Sync byte!
spectrum_max_value = 65535
trigger_modes = TriggerMode.supported(
"NORMAL", "SOFTWARE", "SYNCHRONIZATION", "HARDWARE"
)

# features
feature_classes = (
sbf.eeprom.SeaBreezeEEPromFeatureADC,
sbf.spectrometer.SeaBreezeSpectrometerFeatureADC,
sbf.rawusb.SeaBreezeRawUSBBusAccessFeature,
)
39 changes: 39 additions & 0 deletions src/seabreeze/pyseabreeze/features/eeprom.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from seabreeze.pyseabreeze.exceptions import SeaBreezeError
from seabreeze.pyseabreeze.features._base import SeaBreezeFeature
from seabreeze.pyseabreeze.protocol import ADCProtocol
from seabreeze.pyseabreeze.protocol import OOIProtocol
from seabreeze.pyseabreeze.types import PySeaBreezeProtocol

Expand Down Expand Up @@ -47,3 +48,41 @@ def _func_eeprom_read_slot(
if not strip_zero_bytes:
return data
return data.rstrip("\x00")


# ADC spectrometer interface implementation
# =========================================
#
class SeaBreezeEEPromFeatureADC(SeaBreezeEEPROMFeature):
_required_protocol_cls = ADCProtocol

def eeprom_read_slot(self, slot_number: int, strip_zero_bytes: bool = False) -> str:
return self._func_eeprom_read_slot(
self.protocol, slot_number, strip_zero_bytes=strip_zero_bytes
)

@staticmethod
def _func_eeprom_read_raw(protocol: PySeaBreezeProtocol, slot_number: int) -> bytes:
protocol.send(0x05, slot_number)
ret = protocol.receive(size=17, mode="low_speed")
if ret[0] != 0x05 or ret[1] != int(slot_number) % 0xFF:
raise SeaBreezeError(f"read_eeprom_slot_raw: wrong answer: {ret!r}")
return ret

@classmethod
def _func_eeprom_read_slot(
cls,
protocol: PySeaBreezeProtocol,
slot_number: int,
*,
strip_zero_bytes: bool = False,
) -> str:
ret = cls._func_eeprom_read_raw(protocol, slot_number)
try:
end = ret[2:].index(0) + 2
except ValueError:
end = len(ret) - 1
data = ret[2:end].decode("utf-8")
if not strip_zero_bytes:
return data
return data.rstrip("\x00")
20 changes: 17 additions & 3 deletions src/seabreeze/pyseabreeze/features/spectrometer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
from seabreeze.pyseabreeze.exceptions import SeaBreezeError
from seabreeze.pyseabreeze.exceptions import SeaBreezeNotSupported
from seabreeze.pyseabreeze.features._base import SeaBreezeFeature
from seabreeze.pyseabreeze.features.eeprom import SeaBreezeEEPromFeatureADC
from seabreeze.pyseabreeze.features.eeprom import SeaBreezeEEPromFeatureOOI
from seabreeze.pyseabreeze.protocol import ADCProtocol
from seabreeze.pyseabreeze.protocol import OBPProtocol
from seabreeze.pyseabreeze.protocol import OOIProtocol
from seabreeze.pyseabreeze.transport import USBTransport
Expand Down Expand Up @@ -78,6 +80,7 @@ class SeaBreezeSpectrometerFeatureOOI(SeaBreezeSpectrometerFeature):
)

_normalization_value = 1.0
_eeprom_cls = SeaBreezeEEPromFeatureOOI

# config
_dark_pixel_indice: tuple[int, ...]
Expand Down Expand Up @@ -137,9 +140,7 @@ def get_wavelengths(self) -> NDArray[np.float_]:
for i in range(1, 5):
# noinspection PyProtectedMember
coeffs.append(
float(
SeaBreezeEEPromFeatureOOI._func_eeprom_read_slot(self.protocol, i)
)
float(self._eeprom_cls._func_eeprom_read_slot(self.protocol, i))
)
return sum(wl * (indices**i) for i, wl in enumerate(coeffs)) # type: ignore

Expand Down Expand Up @@ -608,3 +609,16 @@ def _get_spectrum_raw(self) -> NDArray[np.uint8]:
# requires addition of a new message type in protocol to work
datastring = self.protocol.query(0x00101000, timeout_ms=timeout)
return numpy.frombuffer(datastring, dtype=numpy.uint8)


class SeaBreezeSpectrometerFeatureADC(SeaBreezeSpectrometerFeatureOOI):
_required_protocol_cls = ADCProtocol
_eeprom_cls = SeaBreezeEEPromFeatureADC

def get_intensities(self) -> NDArray[np.float_]:
tmp = self._get_spectrum_raw()
ret = numpy.array(
struct.unpack("<" + "H" * self._spectrum_length, tmp),
dtype=numpy.double,
)
return ret * self._normalization_value
131 changes: 131 additions & 0 deletions src/seabreeze/pyseabreeze/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,3 +575,134 @@ def _extract_message_data(self, msg: bytes) -> bytes:
return payload
else:
return b""


class ADCProtocol(PySeaBreezeProtocol):

msgs = {
code: functools.partial(struct.Struct(msg).pack, code)
for code, msg in {
0x01: "<B", # Reset
0x02: "<BH", # Set Integration Time
0x03: "<BH", # Set Strobe Enable Status
0x05: "<BB", # Query Information
0x06: "<B16s", # Write Information
0x07: "<B16s", # Write Serial Number
0x08: "<B", # Get Serial Number
0x09: "<B", # Request Spectra
0x0A: "<BH", # Set Trigger Mode
0x0B: "<BH ", # Set Spectrometer Channel
0x0C: "<BH", # Set Continuous Strobe Rate
0x0D: "<BH", # Set Master Clock Rate
}.items()
} # add more here if you implement new features

def __init__(self, transport: PySeaBreezeTransport[Any]) -> None:
super().__init__(transport)
# initialize the spectrometer
self.send(0x01)
time.sleep(0.1) # wait shortly after init command

def send(
self,
msg_type: int,
payload: tuple[float | int | str, ...] | str | int | float = (),
timeout_ms: int | None = None,
**kwargs: str | int | None,
) -> int:
"""send a ooi message to the spectrometer

Parameters
----------
msg_type : int
a message type as defined in `OOIProtocol.msgs`
payload : optional
dependent on `msg_type`. a singe value or a tuple of multiple values
timeout_ms : int, optional
the timeout after which the transport layer should error.
`None` means no timeout (default)
**kwargs :
ignored and only present to provide compatible caller interfaces

Returns
-------
bytes_written : int
the number of bytes sent
"""
if kwargs:
warnings.warn(f"kwargs provided but ignored: {kwargs}")
payload = payload if isinstance(payload, (tuple, list)) else (payload,)
data = self.msgs[msg_type](*payload)
return self.transport.write(data, timeout_ms=timeout_ms)

def receive(
self,
size: int | None = None,
timeout_ms: int | None = None,
mode: str | None = None,
**kwargs: Any,
) -> bytes:
"""receive data from the spectrometer

Parameters
----------
size:
number of bytes to receive. if `None` (default) uses the
default size as specified in the transport layer.
timeout_ms:
the timeout after which the transport layer should error.
`None` means no timeout (default)
mode:
transport layers can support different modes
(i.e. {'low_speed', 'high_speed', 'high_speed_alt'} in the usb case)
kwargs:
ignored and only present to provide compatible caller interfaces

Returns
-------
data:
data returned from the spectrometer
"""
if kwargs:
warnings.warn(f"kwargs provided but ignored: {kwargs}")
return self.transport.read(
size=size, timeout_ms=timeout_ms, mode=mode, **kwargs
)

def query(
self,
msg_type: int,
payload: tuple[int | str | float, ...] | str | int | float = (),
timeout_ms: int | None = None,
size: int | None = None,
mode: str | None = None,
**kwargs: str | int | None,
) -> bytes:
"""convenience method combining send and receive

Parameters
----------
msg_type:
a message type as defined in `ADCProtocol.msgs`
payload:
the payload to be sent. Can be a singe value or a tuple, dependent
`msg_type`.
size:
number of bytes to receive. if `None` (default) uses the
default size as specified in the transport layer.
timeout_ms:
the timeout after which the transport layer should error.
`None` means no timeout (default)
mode:
transport layers can support different modes
(i.e. {'low_speed', 'high_speed', 'high_speed_alt'} in the usb case)
kwargs:
ignored and only present to provide compatible caller interfaces

Returns
-------
data:
data returned from the spectrometer
"""
self.send(msg_type, payload, timeout_ms=timeout_ms)
return self.receive(size=size, timeout_ms=timeout_ms, mode=mode, **kwargs)