diff --git a/crg.py b/crg.py index 23466c2..eeccd89 100644 --- a/crg.py +++ b/crg.py @@ -10,13 +10,27 @@ def __init__(self, cd, async_reset): rst_meta = Signal() rst_unbuf = Signal() self.specials += [ - Instance("FDPE", p_INIT=1, i_D=0, i_PRE=async_reset, - i_CE=1, i_C=cd.clk, o_Q=rst_meta, - attr={"async_reg", "ars_ff1"}), - Instance("FDPE", p_INIT=1, i_D=rst_meta, i_PRE=async_reset, - i_CE=1, i_C=cd.clk, o_Q=rst_unbuf, - attr={"async_reg", "ars_ff2"}), - Instance("BUFG", i_I=rst_unbuf, o_O=cd.rst) + Instance( + "FDPE", + p_INIT=1, + i_D=0, + i_PRE=async_reset, + i_CE=1, + i_C=cd.clk, + o_Q=rst_meta, + attr={"async_reg", "ars_ff1"}, + ), + Instance( + "FDPE", + p_INIT=1, + i_D=rst_meta, + i_PRE=async_reset, + i_CE=1, + i_C=cd.clk, + o_Q=rst_unbuf, + attr={"async_reg", "ars_ff2"}, + ), + Instance("BUFG", i_I=rst_unbuf, o_O=cd.rst), ] @@ -30,17 +44,19 @@ def __init__(self, platform, link=None): self.clock_domains.cd_clk125 = ClockDomain(reset_less=True) clk125 = platform.request("clk125_gtp") - platform.add_period_constraint(clk125, 8.) + platform.add_period_constraint(clk125, 8.0) self.clk125 = Signal() self.clk125_div2 = Signal() self.specials += [ - Instance("IBUFDS_GTE2", + Instance( + "IBUFDS_GTE2", i_CEB=0, - i_I=clk125.p, i_IB=clk125.n, + i_I=clk125.p, + i_IB=clk125.n, o_O=self.clk125, - o_ODIV2=self.clk125_div2), - Instance("BUFG", - i_I=self.clk125, o_O=self.cd_clk125.clk), + o_ODIV2=self.clk125_div2, + ), + Instance("BUFG", i_I=self.clk125, o_O=self.cd_clk125.clk), ] if link is not None: self.clock_domains.cd_link = ClockDomain(reset_less=True) @@ -54,27 +70,42 @@ def __init__(self, platform, link=None): clk200 = Signal() delay_rdy = Signal() self.specials += [ - Instance("MMCME2_BASE", + Instance( + "MMCME2_BASE", p_BANDWIDTH="LOW", - p_CLKIN1_PERIOD=8. if link is None else 4.*8, - p_CLKFBOUT_MULT_F=8 if link is None else 4*8, + p_CLKIN1_PERIOD=8.0 if link is None else 4.0 * 8, + p_CLKFBOUT_MULT_F=8 if link is None else 4 * 8, p_DIVCLK_DIVIDE=1, i_CLKIN1=self.cd_clk125.clk if link is None else link, - i_CLKFBIN=self.cd_fb.clk, o_CLKFBOUT=fb, + i_CLKFBIN=self.cd_fb.clk, + o_CLKFBOUT=fb, o_LOCKED=locked, # p_CLKOUT0_DIVIDE_F=4, p_CLKOUT0_PHASE=0, o_CLKOUT0=sys, - p_CLKOUT1_DIVIDE=2, p_CLKOUT1_PHASE=0, o_CLKOUT1=sys2, - p_CLKOUT2_DIVIDE=2, p_CLKOUT2_PHASE=90, o_CLKOUT2=sys2q, - p_CLKOUT3_DIVIDE=4, p_CLKOUT3_PHASE=0, o_CLKOUT3=sys, - p_CLKOUT4_DIVIDE=5, p_CLKOUT4_PHASE=0, o_CLKOUT4=clk200, + p_CLKOUT1_DIVIDE=2, + p_CLKOUT1_PHASE=0, + o_CLKOUT1=sys2, + p_CLKOUT2_DIVIDE=2, + p_CLKOUT2_PHASE=90, + o_CLKOUT2=sys2q, + p_CLKOUT3_DIVIDE=4, + p_CLKOUT3_PHASE=0, + o_CLKOUT3=sys, + p_CLKOUT4_DIVIDE=5, + p_CLKOUT4_PHASE=0, + o_CLKOUT4=clk200, ), Instance("BUFG", i_I=fb, o_O=self.cd_fb.clk), Instance("BUFG", i_I=sys, o_O=self.cd_sys.clk), Instance("BUFG", i_I=sys2, o_O=self.cd_sys2.clk), Instance("BUFG", i_I=sys2q, o_O=self.cd_sys2q.clk), Instance("BUFG", i_I=clk200, o_O=self.cd_clk200.clk), - Instance("IDELAYCTRL", - i_REFCLK=self.cd_clk200.clk, i_RST=~locked, o_RDY=delay_rdy), + Instance( + "IDELAYCTRL", + i_REFCLK=self.cd_clk200.clk, + i_RST=~locked, + o_RDY=delay_rdy, + ), ] self.submodules += AsyncResetSynchronizerBUFG( - self.cd_sys, ~(locked & delay_rdy)) + self.cd_sys, ~(locked & delay_rdy) + ) diff --git a/dac_data.py b/dac_data.py index 4cfdd35..9eeccb3 100644 --- a/dac_data.py +++ b/dac_data.py @@ -16,14 +16,13 @@ def parity(*x): # synchronize frame marker to frame processing and to data_sync and thus to # ISTR + class DacData(Module): def __init__(self, pins, swap=((0, 3), (1, 8))): self.data_sync = Signal() # at most every 8 samples (4 sys cycles) self.sync_dly = Signal(max=8, reset_less=True) # format as in the DS: A0:C0, B0:D0, A1:C1, B1:D1 - self.data = [[ - Signal(16, reset_less=True) for _ in range(2) - ] for _ in range(4)] + self.data = [[Signal(16, reset_less=True) for _ in range(2)] for _ in range(4)] self.istr = Signal(reset_less=True) # buffer for parity calculation @@ -42,12 +41,14 @@ def __init__(self, pins, swap=((0, 3), (1, 8))): self.sync += [ i.eq(i + 1), sync.eq(sync[2:]), - If(self.data_sync, + If( + self.data_sync, i.eq(0), - sync.eq(0xf << self.sync_dly), + sync.eq(0xF << self.sync_dly), ), self.istr.eq(0), - If(i == 4 - 1, + If( + i == 4 - 1, self.istr.eq(1), ), ] @@ -62,8 +63,7 @@ def __init__(self, pins, swap=((0, 3), (1, 8))): self._oserdes([sync[0], sync[0], sync[1], sync[1]], pins.sync_p, pins.sync_n) # ISTR for write pointer - self._oserdes([self.istr, 0, 0, 0], - pins.istr_parityab_p, pins.istr_parityab_n) + self._oserdes([self.istr, 0, 0, 0], pins.istr_parityab_p, pins.istr_parityab_n) # 32 bit parity self._oserdes(par, pins.paritycd_p, pins.paritycd_n) @@ -72,9 +72,9 @@ def __init__(self, pins, swap=((0, 3), (1, 8))): # not needed with SYNC+N-div+PLL generated internal OSTR # self._oserdes([0, 0, 0, 0], pins.ostr_p, pins.ostr_n) - for i_port, port in enumerate([ - (pins.data_a_p, pins.data_a_n), - (pins.data_b_p, pins.data_b_n)]): + for i_port, port in enumerate( + [(pins.data_a_p, pins.data_a_n), (pins.data_b_p, pins.data_b_n)] + ): for i_pin, pin in enumerate(zip(*port)): bits = [words[i_word][i_port][i_pin] for i_word in range(4)] if (i_port, i_pin) in swap: # sinara-hw/Phaser#102 @@ -85,15 +85,25 @@ def __init__(self, pins, swap=((0, 3), (1, 8))): def _oserdes(self, data, pin_p, pin_n, clk="sys2", attr=set()): pin = Signal() self.specials += [ - Instance("OSERDESE2", attr=attr, - p_DATA_RATE_OQ="DDR", p_DATA_RATE_TQ="BUF", - p_DATA_WIDTH=4, p_TRISTATE_WIDTH=1, + Instance( + "OSERDESE2", + attr=attr, + p_DATA_RATE_OQ="DDR", + p_DATA_RATE_TQ="BUF", + p_DATA_WIDTH=4, + p_TRISTATE_WIDTH=1, i_RST=ResetSignal(), i_CLK=ClockSignal(clk), i_CLKDIV=ClockSignal(), # LSB first, D1 is closest to Q - i_D1=data[0], i_D2=data[1], i_D3=data[2], i_D4=data[3], - i_TCE=1, i_OCE=1, i_T1=0, - o_OQ=pin), + i_D1=data[0], + i_D2=data[1], + i_D3=data[2], + i_D4=data[3], + i_TCE=1, + i_OCE=1, + i_T1=0, + o_OQ=pin, + ), DifferentialOutput(pin, pin_p, pin_n), ] diff --git a/decode.py b/decode.py index 25b8fc8..84f142c 100644 --- a/decode.py +++ b/decode.py @@ -6,12 +6,7 @@ from interpolate import SampleMux, InterpolateChannel -header_layout = [ - ("we", 1), - ("addr", 7), - ("data", 8), - ("type", 4) -] +header_layout = [("we", 1), ("addr", 7), ("data", 8), ("type", 4)] # straming gearbox # @@ -20,11 +15,13 @@ # 66666666666666666666666666666665666666656666666566666665666666656666666566666660 # 4 2 0 4 2 0 4 2 0 4 2 0 4 + class SampleGearbox(Module): """Variable width input uneven ratio gearbox (e.g. 5/6 to 7) `data_width <= sample_width` """ + def __init__(self, data_width, sample_width): self.data = Signal(data_width, reset_less=True) self.data_short = Signal() # disregard data lsb @@ -39,37 +36,43 @@ def __init__(self, data_width, sample_width): outgoing = Signal(max=sample_width + 1) full = Signal() self.comb += [ - If(self.data_stb, - If(self.data_short, - incoming.eq(data_width - 1), - ).Else( + If( + self.data_stb, + If(self.data_short, incoming.eq(data_width - 1),).Else( incoming.eq(data_width), ), ).Else( incoming.eq(0), ), full.eq(level >= sample_width), - If(full, - outgoing.eq(sample_width), - ).Else( + If(full, outgoing.eq(sample_width),).Else( outgoing.eq(0), ), ] self.sync += [ - If(self.data_stb, - buf.eq(Mux(self.data_short, - Cat(self.data[1:], buf), - Cat(self.data, buf), - )), + If( + self.data_stb, + buf.eq( + Mux( + self.data_short, + Cat(self.data[1:], buf), + Cat(self.data, buf), + ) + ), ), self.sample_stb.eq(full), - If(full, - self.sample.eq(Case(level, { - sample_width + i: buf[i:] for i in range(data_width - 1) - })), + If( + full, + self.sample.eq( + Case( + level, + {sample_width + i: buf[i:] for i in range(data_width - 1)}, + ) + ), ), level.eq(level + incoming - outgoing), - If(self.clr, + If( + self.clr, level.eq(0), ), ] @@ -86,6 +89,7 @@ def __init__(self, data_width, sample_width): class Register(Module): """Configuration/status register""" + def __init__(self, width=None, read=True, write=True, readback=True): self.bus = Record(bus_layout) if width is None: @@ -115,8 +119,9 @@ def __init__(self): def _check_intersection(self, adr, mask): for _, b_adr, b_mask in self._slaves: if intersection((b_adr, b_mask), (adr, mask)): - raise ValueError("{} intersects {}".format( - (adr, mask), (b_adr, b_mask))) + raise ValueError( + "{} intersects {}".format((adr, mask), (b_adr, b_mask)) + ) def connect(self, bus, adr, mask): adr &= mask @@ -129,9 +134,7 @@ def connect(self, bus, adr, mask): bus.dat_w.eq(self.bus.dat_w), bus.we.eq(self.bus.we & stb), bus.re.eq(self.bus.re & stb), - If(stb, - self.bus.dat_r.eq(bus.dat_r) - ) + If(stb, self.bus.dat_r.eq(bus.dat_r)), ] @@ -139,10 +142,11 @@ class Decode(Module): """Decode a frame into samples and metadata and drive a bus of registers from the metadata. """ + def __init__(self, b_sample, n_channel, n_mux, t_frame): - n_samples = n_mux*n_channel*2 + n_samples = n_mux * n_channel * 2 header = Record(header_layout) - body = Signal(n_samples*b_sample) + body = Signal(n_samples * b_sample) self.frame = Signal(len(body) + len(header)) self.stb = Signal() self.response = Signal(8) @@ -151,16 +155,15 @@ def __init__(self, b_sample, n_channel, n_mux, t_frame): ] self.submodules.zoh = SampleMux( - b_sample=b_sample, n_channel=n_channel, n_mux=n_mux, - t_frame=t_frame) + b_sample=b_sample, n_channel=n_channel, n_mux=n_mux, t_frame=t_frame + ) self.comb += [ self.zoh.body.eq(body), self.zoh.body_stb.eq(self.stb & (header.type == 1)), ] self.interpolate = [] - self.data = [[Record(complex(16)) for _ in range(n_channel)] - for _ in range(2)] + self.data = [[Record(complex(16)) for _ in range(n_channel)] for _ in range(2)] for ch in range(n_channel): for iq in "iq": inter = InterpolateChannel() @@ -194,7 +197,7 @@ def map_registers(self, registers): assert name not in self.registers self.registers[name] = regs for i, reg in enumerate(regs): - self.bus.connect(reg.bus, addr, mask=0x7f) + self.bus.connect(reg.bus, addr, mask=0x7F) assert addr not in self.mem_map self.mem_map[addr] = (name, i) self.submodules += reg diff --git a/example.py b/example.py index 76a6032..306c45b 100644 --- a/example.py +++ b/example.py @@ -3,6 +3,7 @@ # This is a volatile test script to exercise and evaluate some functionality of # Phaser through ARTIQ. + class Phaser(EnvExperiment): def build(self): self.setattr_device("core") @@ -10,7 +11,7 @@ def build(self): @rpc(flags={"async"}) def p(self, *p): - print([hex(_ & 0xffffffff) for _ in p]) + print([hex(_ & 0xFFFFFFFF) for _ in p]) def run(self): self.do() @@ -29,31 +30,33 @@ def inner(self): f.init(debug=True) for ch in range(2): - f.channel[ch].set_att(0*dB) + f.channel[ch].set_att(0 * dB) # f.channel[ch].set_duc_frequency_mu(0) - f.channel[ch].set_duc_frequency(190.598551*MHz) - f.channel[ch].set_duc_phase(.25) + f.channel[ch].set_duc_frequency(190.598551 * MHz) + f.channel[ch].set_duc_phase(0.25) f.channel[ch].set_duc_cfg(select=0, clr=0) - delay(.1*ms) + delay(0.1 * ms) for osc in range(5): - ftw = (osc + 1)*1.875391*MHz - asf = (osc + 1)*.066 - #if osc != 4: + ftw = (osc + 1) * 1.875391 * MHz + asf = (osc + 1) * 0.066 + # if osc != 4: # asf = 0. - #else: + # else: # asf = .9 # ftw = 9.5*MHz # f.channel[ch].oscillator[osc].set_frequency_mu(0) f.channel[ch].oscillator[osc].set_frequency(ftw) - delay(.1*ms) - f.channel[ch].oscillator[osc].set_amplitude_phase(asf, phase=.25, clr=0) - delay(.1*ms) + delay(0.1 * ms) + f.channel[ch].oscillator[osc].set_amplitude_phase( + asf, phase=0.25, clr=0 + ) + delay(0.1 * ms) f.duc_stb() for ch in range(2): for addr in range(8): r = f.channel[ch].trf_read(addr) - delay(.1*ms) + delay(0.1 * ms) self.p(r) self.core.break_realtime() diff --git a/interpolate.py b/interpolate.py index 7adaa34..0c6621e 100644 --- a/interpolate.py +++ b/interpolate.py @@ -7,6 +7,7 @@ from mac_hbf_upsampler import MAC_HBF_Upsampler from mac_sym_fir import MAC_SYM_FIR + class SampleMux(Module): """Zero order hold interpolator. @@ -15,41 +16,47 @@ class SampleMux(Module): * `n_mux`: samples in a frame * `t_frame`: clock cycles per frame """ + def __init__(self, b_sample, n_channel, n_mux, t_frame): n_interp, n_rest = divmod(t_frame, n_mux) assert n_rest == 0 - self.body = Signal(n_mux*n_channel*2*b_sample) + self.body = Signal(n_mux * n_channel * 2 * b_sample) self.body_stb = Signal() self.sample = [Record(complex(b_sample)) for _ in range(n_channel)] self.sample_stb = Signal() # frame body shift register - samples = [Signal(n_channel*2*b_sample, reset_less=True) - for _ in range(n_mux)] + samples = [ + Signal(n_channel * 2 * b_sample, reset_less=True) for _ in range(n_mux) + ] assert len(Cat(samples)) == len(self.body) i_interp = Signal(max=n_interp, reset_less=True) # interpolation self.comb += [ # early sample is most significant Cat([(_.i[-b_sample:], _.q[-b_sample:]) for _ in self.sample]).eq( - samples[-1]) + samples[-1] + ) ] self.sync += [ i_interp.eq(i_interp - 1), self.sample_stb.eq(0), - If(i_interp == 0, + If( + i_interp == 0, Cat(samples[1:]).eq(Cat(samples)), i_interp.eq(n_interp - 1), self.sample_stb.eq(1), ), - If(self.body_stb, + If( + self.body_stb, Cat(samples).eq(self.body), i_interp.eq(n_interp - 1), self.sample_stb.eq(1), - ) + ), ] class MiniFIFO(Module): """Minimal FIFO buffer, unit capacity""" + def __init__(self, width): self.input = Endpoint([("data", width)]) self.output = Endpoint([("data", width)]) @@ -59,13 +66,15 @@ def __init__(self, width): self.input.ack.eq(~self.output.stb | self.output.ack), ] self.sync += [ - If(self.output.stb & self.output.ack, + If( + self.output.stb & self.output.ack, self.output.stb.eq(0), ), - If(self.input.stb & self.input.ack, + If( + self.input.stb & self.input.ack, self.output.data.eq(self.input.data), self.output.stb.eq(1), - ) + ), ] @@ -74,14 +83,70 @@ def __init__(self): h_fir = [24, -85, 281, -1314, 55856, -1314, 281, -85, 24] # ciccomp: cic droop and gain, rate 1/10, gain 2**9/5**4 ~ 0.9, 9 taps self.submodules.ciccomp = MAC_SYM_FIR(h_fir, width_d=24, width_coef=16) - h_hbf0 = [-167, 0, 428, 0, -931, 0, 1776, 0, -3115, 0, 5185, 0, -8442, 0, - 14028, 0, -26142, 0, 82873, 131072, 82873, 0, -26142, 0, 14028, - 0, -8442, 0, 5185, 0, -3115, 0, 1776, 0, -931, 0, 428, 0, - -167] + h_hbf0 = [ + -167, + 0, + 428, + 0, + -931, + 0, + 1776, + 0, + -3115, + 0, + 5185, + 0, + -8442, + 0, + 14028, + 0, + -26142, + 0, + 82873, + 131072, + 82873, + 0, + -26142, + 0, + 14028, + 0, + -8442, + 0, + 5185, + 0, + -3115, + 0, + 1776, + 0, + -931, + 0, + 428, + 0, + -167, + ] # hbf1: rate 1/10 -> 1/5, gain=1, 39 taps self.submodules.hbf0 = MAC_HBF_Upsampler(h_hbf0, width_d=24, width_coef=17) - h_hbf1 = [294, 0, -1865, 0, 6869, 0, -20436, 0, 80679, 131072, 80679, 0, - -20436, 0, 6869, 0, -1865, 0, 294] + h_hbf1 = [ + 294, + 0, + -1865, + 0, + 6869, + 0, + -20436, + 0, + 80679, + 131072, + 80679, + 0, + -20436, + 0, + 6869, + 0, + -1865, + 0, + 294, + ] # hbf1: rate 1/5 -> 2/5, gain=1, 19 taps self.submodules.hbf1 = MAC_HBF_Upsampler(h_hbf1, width_d=24, width_coef=17) # cic: rate 2/5 -> 2/1, gain=5**4 @@ -110,13 +175,9 @@ def __init__(self): self.hbf0.output.connect(self.buf0.input), self.buf0.output.connect(self.hbf1.input), self.hbf1.output.connect(self.buf1.input, omit=["data"]), - self.buf1.input.data.eq((self.hbf1.output.data + bias_out) >> - scale_out), + self.buf1.input.data.eq((self.hbf1.output.data + bias_out) >> scale_out), self.buf1.output.connect(self.cic.input), self.cic.output.connect(self.output, omit=["data0", "data1"]), - self.output.data0.eq((self.cic.output.data0 + bias_cic) >> - scale_cic), - self.output.data1.eq((self.cic.output.data1 + bias_cic) >> - scale_cic), + self.output.data0.eq((self.cic.output.data0 + bias_cic) >> scale_cic), + self.output.data1.eq((self.cic.output.data1 + bias_cic) >> scale_cic), ] - diff --git a/link.py b/link.py index 99cea50..6953241 100644 --- a/link.py +++ b/link.py @@ -4,7 +4,7 @@ class Phy(Module): def __init__(self, eem): - mid = int(4e-9/4/78e-12/2) + mid = int(4e-9 / 4 / 78e-12 / 2) self.clk = Signal() # link clock self.ld = Signal() # load delay @@ -16,9 +16,7 @@ def __init__(self, eem): cnt = Signal(5) self.comb += [ - If(self.cnt_out >= mid, - cnt.eq(self.cnt_out - mid), - ).Else( + If(self.cnt_out >= mid, cnt.eq(self.cnt_out - mid),).Else( cnt.eq(self.cnt_out + mid), ) ] @@ -31,12 +29,17 @@ def __init__(self, eem): if i == 0: attr.add(("IBUF_LOW_PWR", "FALSE")) self.specials += [ - Instance("IBUFGDS" if i == 0 else "IBUFDS", attr=attr, + Instance( + "IBUFGDS" if i == 0 else "IBUFDS", + attr=attr, i_I=getattr(eem, "data{}_p".format(i)), i_IB=getattr(eem, "data{}_n".format(i)), - o_O=buf), - Instance("IDELAYE2", - p_IDELAY_TYPE="VAR_LOAD", p_IDELAY_VALUE=0, + o_O=buf, + ), + Instance( + "IDELAYE2", + p_IDELAY_TYPE="VAR_LOAD", + p_IDELAY_VALUE=0, p_SIGNAL_PATTERN="DATA" if i else "CLOCK", i_C=ClockSignal(), i_LD=1 if i else self.ld, @@ -44,35 +47,57 @@ def __init__(self, eem): i_INC=1, i_CNTVALUEIN=self.cnt_out if i else cnt, o_CNTVALUEOUT=Signal() if i else self.cnt_out, - i_IDATAIN=buf, o_DATAOUT=dly), - Instance("ISERDESE2", - p_DATA_RATE="DDR", p_DATA_WIDTH=4, - p_INTERFACE_TYPE="NETWORKING", p_NUM_CE=1, + i_IDATAIN=buf, + o_DATAOUT=dly, + ), + Instance( + "ISERDESE2", + p_DATA_RATE="DDR", + p_DATA_WIDTH=4, + p_INTERFACE_TYPE="NETWORKING", + p_NUM_CE=1, p_IOBDELAY="IFD", - i_D=buf, i_DDLY=dly, + i_D=buf, + i_DDLY=dly, i_BITSLIP=self.bitslip, - i_CLK=ClockSignal("sys2"), i_CLKB=~ClockSignal("sys2"), - i_CLKDIV=ClockSignal(), i_RST=ResetSignal(), i_CE1=1, + i_CLK=ClockSignal("sys2"), + i_CLKB=~ClockSignal("sys2"), + i_CLKDIV=ClockSignal(), + i_RST=ResetSignal(), + i_CE1=1, # MSB first, Q1 is closest to D - o_Q1=data[0], o_Q2=data[1], o_Q3=data[2], o_Q4=data[3], - o_O=thru) + o_Q1=data[0], + o_Q2=data[1], + o_Q3=data[2], + o_Q4=data[3], + o_O=thru, + ), ] if i == 0: self.comb += self.clk.eq(thru) pin = Signal() data = self.miso self.specials += [ - Instance("OSERDESE2", - p_DATA_RATE_OQ="DDR", p_DATA_RATE_TQ="BUF", - p_DATA_WIDTH=4, p_TRISTATE_WIDTH=1, + Instance( + "OSERDESE2", + p_DATA_RATE_OQ="DDR", + p_DATA_RATE_TQ="BUF", + p_DATA_WIDTH=4, + p_TRISTATE_WIDTH=1, i_RST=ResetSignal(), i_CLK=ClockSignal("sys2"), i_CLKDIV=ClockSignal(), # LSB first, D1 is closest to Q - i_D1=data[0], i_D2=data[1], i_D3=data[2], i_D4=data[3], - i_TCE=1, i_OCE=1, i_T1=0, - o_OQ=pin), - Instance("OBUFDS", i_I=pin, o_O=eem.data7_p, o_OB=eem.data7_n) + i_D1=data[0], + i_D2=data[1], + i_D3=data[2], + i_D4=data[3], + i_TCE=1, + i_OCE=1, + i_T1=0, + o_OQ=pin, + ), + Instance("OBUFDS", i_I=pin, o_O=eem.data7_p, o_OB=eem.data7_n), ] @@ -83,6 +108,7 @@ class Slipper(Module): If not, asserts bitslip and enforces latency blocking. Use bit index `width//2` as data. """ + def __init__(self, width): self.data = Signal(width) self.valid = Signal() @@ -110,8 +136,9 @@ class Unframer(Module): * `t_clk` is the clock pattern length * `n_frame` clock cycles per frame """ + def __init__(self, n_data, t_clk, n_frame): - n_marker = n_frame//2 + 1 + n_marker = n_frame // 2 + 1 # clock and data inputs self.data_in = Signal(n_data) @@ -122,30 +149,35 @@ def __init__(self, n_data, t_clk, n_frame): self.end_of_frame = Signal(reset_less=True) # 0b0000111 reset (plus the data[0] LSB) - clk_sr = Signal(t_clk - 1, reset_less=True, - reset=((1 << t_clk//2) - 1) << (t_clk//2 - 1)) + clk_sr = Signal( + t_clk - 1, + reset_less=True, + reset=((1 << t_clk // 2) - 1) << (t_clk // 2 - 1), + ) clk_stb = Signal() self.clk_stb = clk_stb # debug - marker_sr = Signal(n_marker, reset_less=True, - reset=((1 << n_marker - 1) - 1) << 1) + marker_sr = Signal( + n_marker, reset_less=True, reset=((1 << n_marker - 1) - 1) << 1 + ) marker_stb = Signal() self.marker_stb = marker_stb # debug self.comb += [ # clock pattern match (00001111) - clk_stb.eq(Cat(self.data_in[0], clk_sr) == (1 << t_clk//2) - 1), + clk_stb.eq(Cat(self.data_in[0], clk_sr) == (1 << t_clk // 2) - 1), # marker pattern match (000001x) marker_stb.eq(marker_sr == 1), ] self.sync += [ clk_sr.eq(Cat(self.data_in[0], clk_sr)), - If(clk_stb, + If( + clk_stb, marker_sr.eq(Cat(self.data_in[1], marker_sr)), ), - #If(~self.data_in_stb, + # If(~self.data_in_stb, # clk_sr.eq(clk_sr.reset), # marker_sr.eq(marker_sr.reset), - #), + # ), self.data_out_stb.eq(self.data_in_stb), self.data_out.eq(self.data_in[1:]), self.end_of_frame.eq(clk_stb & marker_stb), @@ -154,28 +186,30 @@ def __init__(self, n_data, t_clk, n_frame): class Checker(Module): """Check CRC and assemble a frame""" + def __init__(self, n_data, t_clk, n_frame): - n_word = n_data*t_clk - n_marker = n_frame//2 + 1 + n_word = n_data * t_clk + n_marker = n_frame // 2 + 1 n_crc = n_data self.data = Signal(n_data) self.data_stb = Signal() self.end_of_frame = Signal() - self.frame = Signal(n_word*n_frame - n_marker - n_crc) + self.frame = Signal(n_word * n_frame - n_marker - n_crc) self.frame_stb = Signal() self.crc_err = Signal(8) # response bitstream self.miso = Signal(reset_less=True) # response data, latched on end_of_frame - self.response = Signal(n_frame*t_clk, reset_less=True) + self.response = Signal(n_frame * t_clk, reset_less=True) poly = { # 6: 0x27, # CRC-6-CDMA2000-A - 6: 0x2f, # CRC-6-GSM + 6: 0x2F, # CRC-6-GSM }[n_data] self.submodules.crc = LiteEthMACCRCEngine( - data_width=n_data, width=n_data, polynom=poly) + data_width=n_data, width=n_data, polynom=poly + ) self.crc.last.reset_less = True crc_good = Signal() crc = Signal.like(self.crc.last, reset_less=True) @@ -187,15 +221,17 @@ def __init__(self, n_data, t_clk, n_frame): ] self.sync += [ self.crc.last.eq(self.crc.next), - If(self.end_of_frame | ~self.data_stb, + If( + self.end_of_frame | ~self.data_stb, self.crc.last.eq(0), - If(~crc_good, + If( + ~crc_good, self.crc_err.eq(self.crc_err + 1), ), ), ] - frame_buf = Signal(n_word*n_frame, reset_less=True) + frame_buf = Signal(n_word * n_frame, reset_less=True) self.sync += [ frame_buf.eq(Cat(self.data, frame_buf)), self.frame_stb.eq(self.end_of_frame & crc_good & self.data_stb), @@ -209,7 +245,7 @@ def __init__(self, n_data, t_clk, n_frame): offset = 1 else: offset = 0 - frame_parts.append(frame_buf[i*n_word + offset: (i + 1)*n_word]) + frame_parts.append(frame_buf[i * n_word + offset : (i + 1) * n_word]) assert len(Cat(frame_parts)) == len(self.frame) self.comb += self.frame.eq(Cat(frame_parts)) @@ -218,9 +254,10 @@ def __init__(self, n_data, t_clk, n_frame): self.comb += self.miso.eq(response_sr[-1]) self.sync += [ response_sr[1:].eq(response_sr), - If(self.frame_stb, + If( + self.frame_stb, response_sr.eq(self.response), - ) + ), ] @@ -230,6 +267,7 @@ class Link(Module): * Like the Fastino link but with 8 bits per clock cycle * 1 clock lane, 6 phaser input data lanes, 1 phaser output data lane """ + def __init__(self, eem): self.submodules.phy = Phy(eem) n_serde = len(self.phy.data[0]) @@ -238,12 +276,10 @@ def __init__(self, eem): self.slip.data.eq(self.phy.data[0]), # clk self.phy.bitslip.eq(self.slip.bitslip), ] - self.submodules.unframe = Unframer( - n_data=7, n_frame=10, t_clk=8) + self.submodules.unframe = Unframer(n_data=7, n_frame=10, t_clk=8) self.comb += [ self.unframe.data_in_stb.eq(self.slip.valid), - self.unframe.data_in.eq(Cat([ - d[n_serde//2 - 1] for d in self.phy.data])), + self.unframe.data_in.eq(Cat([d[n_serde // 2 - 1] for d in self.phy.data])), ] self.submodules.checker = Checker(n_data=6, n_frame=10, t_clk=8) self.comb += [ @@ -257,41 +293,52 @@ def __init__(self, eem): class Test(Module): def __init__(self, platform): eem = platform.request("eem", 0) - platform.add_period_constraint(eem.data0_p, 4.*8) + platform.add_period_constraint(eem.data0_p, 4.0 * 8) self.submodules.link = Link(eem) self.submodules.crg = CRG(platform, link=self.link.phy.clk) if True: - platform.add_false_path_constraint( - eem.data0_p, self.crg.cd_sys2.clk) + platform.add_false_path_constraint(eem.data0_p, self.crg.cd_sys2.clk) else: for i in range(1, 7): pin = getattr(eem, "data{}_p".format(i)) platform.add_platform_command( "set_input_delay -0.25 -min -clock " - "[get_generated_clocks sys2] [get_ports {pin}]", pin=pin) + "[get_generated_clocks sys2] [get_ports {pin}]", + pin=pin, + ) platform.add_platform_command( "set_input_delay 0.25 -max -clock " - "[get_generated_clocks sys2] [get_ports {pin}]", pin=pin) + "[get_generated_clocks sys2] [get_ports {pin}]", + pin=pin, + ) platform.add_platform_command( "set_input_delay -0.25 -min -add_delay -clock_fall -clock " - "[get_generated_clocks sys2] [get_ports {pin}]", pin=pin) + "[get_generated_clocks sys2] [get_ports {pin}]", + pin=pin, + ) platform.add_platform_command( "set_input_delay 0.25 -max -add_delay -clock_fall -clock " - "[get_generated_clocks sys2] [get_ports {pin}]", pin=pin) - - platform.toolchain.additional_commands.extend([ - "report_timing -nworst 20 -setup -hold -from [get_ports] " - "-file {build_name}_timing_in.rpt", - ]) + "[get_generated_clocks sys2] [get_ports {pin}]", + pin=pin, + ) + + platform.toolchain.additional_commands.extend( + [ + "report_timing -nworst 20 -setup -hold -from [get_ports] " + "-file {build_name}_timing_in.rpt", + ] + ) - self.sync += platform.request("user_led").eq(Cat( - self.link.checker.frame, self.link.checker.frame_stb) == 0) + self.sync += platform.request("user_led").eq( + Cat(self.link.checker.frame, self.link.checker.frame_stb) == 0 + ) if __name__ == "__main__": from migen.build.platforms.sinara.phaser import Platform from crg import CRG + platform = Platform() test = Test(platform) platform.build(test) diff --git a/mac_hbf_upsampler.py b/mac_hbf_upsampler.py index 435302c..b1022b7 100644 --- a/mac_hbf_upsampler.py +++ b/mac_hbf_upsampler.py @@ -20,17 +20,18 @@ class MAC_HBF_Upsampler(Module): :param width_coef: Coefficient width (fixed point position) :param dsp_arch: DSP block architecture (Xilinx/Lattice) """ + def __init__(self, coeff, width_d, width_coef, dsp_arch="xilinx"): assert dsp_arch in ("xilinx", "lattice"), "unsupported dsp architecture" self.dsp_arch = dsp_arch - n = (len(coeff) + 1)//4 - if len(coeff) != n*4 - 1: + n = (len(coeff) + 1) // 4 + if len(coeff) != n * 4 - 1: raise ValueError("HBF length must be 4*n-1", coeff) elif n < 2: raise ValueError("Need order n >= 2") for i, c in enumerate(coeff): - if i == n*2 - 1: + if i == n * 2 - 1: if not c: raise ValueError("HBF center tap must not be zero") elif i & 1: @@ -44,26 +45,32 @@ def __init__(self, coeff, width_d, width_coef, dsp_arch="xilinx"): dsp_pipelen = 4 bias = (1 << width_coef - 1) - 1 coef = [] - for i, c in enumerate(coeff[: (len(coeff) + 1) // 2: 2]): + for i, c in enumerate(coeff[: (len(coeff) + 1) // 2 : 2]): coef.append(Signal((width_coef + 1, True), reset_less=True, reset=c)) self.input = Endpoint([("data", (width_d, True))]) self.output = Endpoint([("data", (width_d, True))]) - x = [Signal((width_d, True), reset_less=True) for _ in range((len(coef) * 2))] # input hbf + x = [ + Signal((width_d, True), reset_less=True) for _ in range((len(coef) * 2)) + ] # input hbf self.stop = Signal() # filter output stall signal pos = Signal(int(np.ceil(np.log2(len(coef))))) pos_neg = Signal(len(pos) + 1) self.comb += [ - self.stop.eq(self.output.stb & ~self.output.ack) # filter is sensitive to output and ignores input stb + self.stop.eq( + self.output.stb & ~self.output.ack + ) # filter is sensitive to output and ignores input stb ] a, b, c, d, mux_p, p = self._dsp() self.comb += [ - pos_neg.eq((len(coef)*2) - 1 - pos), # position from end of input shift reg + pos_neg.eq( + (len(coef) * 2) - 1 - pos + ), # position from end of input shift reg c.eq(bias), a.eq(Array(x)[pos]), d.eq(Array(x)[pos_neg]), @@ -71,37 +78,47 @@ def __init__(self, coeff, width_d, width_coef, dsp_arch="xilinx"): ] self.sync += [ - If(~self.stop, - self.input.ack.eq(0), # default no in ack - self.output.stb.eq(0), # default no out strobe - mux_p.eq(0), # default accumulate - pos.eq(pos+1), - If(pos == len(coef) - 1, # new input sample - pos.eq(0), - Cat(x).eq(Cat(self.input.data, x)), # shift in new sample - self.input.ack.eq(1), - ), - If(pos == dsp_pipelen - 2, - mux_p.eq(1), - ), - If(pos == dsp_pipelen - 1, # new output sample at the end of the dsp pipe - self.output.data.eq(p >> width_coef), - self.output.stb.eq(1), - ), - If(pos == (len(coef) // 2) + dsp_pipelen - 1, # emit trivial sample at halfway computation + pipelen - self.output.data.eq(x[len(coef)]), - self.output.stb.eq(1), - ) - ) + If( + ~self.stop, + self.input.ack.eq(0), # default no in ack + self.output.stb.eq(0), # default no out strobe + mux_p.eq(0), # default accumulate + pos.eq(pos + 1), + If( + pos == len(coef) - 1, # new input sample + pos.eq(0), + Cat(x).eq(Cat(self.input.data, x)), # shift in new sample + self.input.ack.eq(1), + ), + If( + pos == dsp_pipelen - 2, + mux_p.eq(1), + ), + If( + pos + == dsp_pipelen - 1, # new output sample at the end of the dsp pipe + self.output.data.eq(p >> width_coef), + self.output.stb.eq(1), + ), + If( + pos + == (len(coef) // 2) + + dsp_pipelen + - 1, # emit trivial sample at halfway computation + pipelen + self.output.data.eq(x[len(coef)]), + self.output.stb.eq(1), + ), + ) ] - if dsp_pipelen > (len(coef)//2): # if dsp pipe too long + if dsp_pipelen > (len(coef) // 2): # if dsp pipe too long self.sync += [ - If(pos == ((len(coef)//2) + dsp_pipelen - 1) % (len(coef)//2), - # emit trivial sample at halfway computation + pipelen - self.output.data.eq(x[len(coef) + 1]), - self.output.stb.eq(1), - ) + If( + pos == ((len(coef) // 2) + dsp_pipelen - 1) % (len(coef) // 2), + # emit trivial sample at halfway computation + pipelen + self.output.data.eq(x[len(coef) + 1]), + self.output.stb.eq(1), + ) ] def _dsp(self): @@ -130,14 +147,14 @@ def _dsp(self): b_reg = [Signal.like(b) for _ in range(2)] self.sync += [ - If(~self.stop, - a_reg.eq(a), - Cat(b_reg).eq(Cat(b, b_reg)), - d_reg.eq(d), - ad.eq(a_reg + d_reg), - m.eq(ad * b_reg[-1]), # b is double piped to be in line with a+d - If(~mux_p, p.eq(p + m) - ).Else(p.eq(m + c)) - ) + If( + ~self.stop, + a_reg.eq(a), + Cat(b_reg).eq(Cat(b, b_reg)), + d_reg.eq(d), + ad.eq(a_reg + d_reg), + m.eq(ad * b_reg[-1]), # b is double piped to be in line with a+d + If(~mux_p, p.eq(p + m)).Else(p.eq(m + c)), + ) ] return a, b, c, d, mux_p, p diff --git a/mac_sym_fir.py b/mac_sym_fir.py index 5bac1bf..e4bed9d 100644 --- a/mac_sym_fir.py +++ b/mac_sym_fir.py @@ -13,23 +13,24 @@ class MAC_SYM_FIR(Module): The filter only checks if the output stream isn't stalled so no sample is lost downstream. The input strobe is ignored and the filter always uses the currently available data. Rounding is round half down. - + :param coeff: Filter coeffiecient list (full impulse response including center and zeros) :param width_d: Input/output data width :param width_coef: Coefficient width (fixed point position) :param dsp_arch: DSP block architecture (Xilinx/Lattice) """ + def __init__(self, coeff, width_d, width_coef, dsp_arch="xilinx"): assert dsp_arch in ("xilinx", "lattice"), "unsupported dsp architecture" self.dsp_arch = dsp_arch - n = (len(coeff) + 1)//2 - if len(coeff) != n*2 - 1: + n = (len(coeff) + 1) // 2 + if len(coeff) != n * 2 - 1: raise ValueError("FIR length must be 2*n-1", coeff) elif n < 2: raise ValueError("Need order n >= 2") for i, c in enumerate(coeff): - if i == n*2 - 1: + if i == n * 2 - 1: if not c: raise ValueError("HBF center tap must not be zero") elif not c: @@ -46,49 +47,61 @@ def __init__(self, coeff, width_d, width_coef, dsp_arch="xilinx"): self.input = Endpoint([("data", (width_d, True))]) self.output = Endpoint([("data", (width_d, True))]) - x = [Signal((width_d, True), reset_less=True) for _ in range((len(coef) * 2) - 1)] # input hbf + x = [ + Signal((width_d, True), reset_less=True) for _ in range((len(coef) * 2) - 1) + ] # input hbf self.stop = Signal() # filter output stall signal pos = Signal(int(np.ceil(np.log2(len(coef))))) pos_neg = Signal(len(pos) + 1) self.comb += [ - self.stop.eq(self.output.stb & ~self.output.ack) # filter is sensitive to output and ignores input stb + self.stop.eq( + self.output.stb & ~self.output.ack + ) # filter is sensitive to output and ignores input stb ] a, b, c, d, mux_p, p = self._dsp() self.comb += [ - pos_neg.eq((len(coef) * 2) - 2 - pos), # position from end of input shift reg + pos_neg.eq( + (len(coef) * 2) - 2 - pos + ), # position from end of input shift reg c.eq(bias), a.eq(Array(x)[pos]), d.eq(Array(x)[pos_neg]), - If(pos == len(coef) - 1, - d.eq(0), # inject zero sample so center tap is only multiplied once - ), + If( + pos == len(coef) - 1, + d.eq(0), # inject zero sample so center tap is only multiplied once + ), b.eq(Array(coef)[pos]), ] self.sync += [ - If(~self.stop, - self.input.ack.eq(0), # default no in ack - self.output.stb.eq(0), # default no out strobe - mux_p.eq(0), # default accumulate - pos.eq(pos+1), - If(pos == len(coef) - 1, # new input sample - pos.eq(0), - Cat(x).eq(Cat(self.input.data, x)), # shift in new sample - self.input.ack.eq(1), - ), - If(pos == dsp_pipelen - 2, # new output sample at the end of the dsp pipe - - mux_p.eq(1), - ), - If(pos == dsp_pipelen - 1, # new output sample at the end of the dsp pipe - self.output.data.eq(p >> width_coef), - self.output.stb.eq(1), - ), - ) + If( + ~self.stop, + self.input.ack.eq(0), # default no in ack + self.output.stb.eq(0), # default no out strobe + mux_p.eq(0), # default accumulate + pos.eq(pos + 1), + If( + pos == len(coef) - 1, # new input sample + pos.eq(0), + Cat(x).eq(Cat(self.input.data, x)), # shift in new sample + self.input.ack.eq(1), + ), + If( + pos + == dsp_pipelen - 2, # new output sample at the end of the dsp pipe + mux_p.eq(1), + ), + If( + pos + == dsp_pipelen - 1, # new output sample at the end of the dsp pipe + self.output.data.eq(p >> width_coef), + self.output.stb.eq(1), + ), + ) ] def _dsp(self): @@ -117,14 +130,14 @@ def _dsp(self): b_reg = [Signal.like(b) for _ in range(2)] self.sync += [ - If(~self.stop, - a_reg.eq(a), - Cat(b_reg).eq(Cat(b, b_reg)), - d_reg.eq(d), - ad.eq(a_reg + d_reg), - m.eq(ad * b_reg[-1]), # b is double piped to be in line with a+d - If(~mux_p, p.eq(p + m) - ).Else(p.eq(m + c)) - ) + If( + ~self.stop, + a_reg.eq(a), + Cat(b_reg).eq(Cat(b, b_reg)), + d_reg.eq(d), + ad.eq(a_reg + d_reg), + m.eq(ad * b_reg[-1]), # b is double piped to be in line with a+d + If(~mux_p, p.eq(p + m)).Else(p.eq(m + c)), + ) ] return a, b, c, d, mux_p, p diff --git a/phaser.py b/phaser.py index fccf0f4..b7d1c00 100644 --- a/phaser.py +++ b/phaser.py @@ -10,15 +10,18 @@ class PWM(Module): """Pulse width modulation""" + def __init__(self, pin, width=10): cnt = Signal(width, reset_less=True) self.duty = Signal.like(cnt) self.sync += [ cnt.eq(cnt + 1), - If(cnt == 0, + If( + cnt == 0, pin.eq(1), ), - If(cnt == self.duty, + If( + cnt == self.duty, pin.eq(0), ), ] @@ -35,88 +38,102 @@ def __init__(self, platform): # w.r.t DAC clk by PLL at PFD frequency). Deterministic initial # conditions are set during initialization in terms # of optimal DAC EB output reset address (fifo_offset). - platform.add_period_constraint(eem.data0_p, 4.*8) + platform.add_period_constraint(eem.data0_p, 4.0 * 8) self.submodules.crg = CRG(platform, link=self.link.phy.clk) # Don't bother meeting s/h for the clk iserdes. We align it. platform.add_false_path_constraint(eem.data0_p, self.crg.cd_sys2.clk) self.submodules.decoder = Decode( - b_sample=14, n_channel=2, n_mux=8, t_frame=8*10) + b_sample=14, n_channel=2, n_mux=8, t_frame=8 * 10 + ) self.comb += [ self.decoder.frame.eq(self.link.checker.frame), self.decoder.stb.eq(self.link.checker.frame_stb), # Send the 8 bit response early (msb aligned) and slowly (/8) # This gives 2 miso bits max rtt latency - Cat(self.link.checker.response[2*8:]).eq( - Cat([Replicate(d, 8) for d in self.decoder.response])), + Cat(self.link.checker.response[2 * 8 :]).eq( + Cat([Replicate(d, 8) for d in self.decoder.response]) + ), ] - self.decoder.map_registers([ - (0x00,), - # Sinara board id (19) as assigned in the Sinara EEPROM layout - ("board_id", Register(write=False)), - # hardware revision and variant - ("hw_rev", Register(write=False)), - # gateware revision - ("gw_rev", Register(write=False)), - # configuration (clk_sel, dac_resetb, dac_sleep, - # dac_txena, trf0_ps, trf1_ps, att0_rstn, att1_rstn) - ("cfg", Register()), - # status (dac_alarm, trf0_ld, trf1_ld, term0_stat, - # term1_stat, spi_idle) - ("sta", Register(write=False)), - # frame crc error counter - ("crc_err", Register(write=False)), - # led configuration - ("led", Register(width=6)), - # fan pwm duty cycle - ("fan", Register()), - # DUC settings update strobe - ("duc_stb", Register(write=False, read=False)), - # ADC gain configuration (pgia0_gain, pgia1_gain) - ("adc_cfg", Register(width=4)), - # spi configuration (offline, end, clk_phase, clk_polarity, - # half_duplex, lsb_first) - ("spi_cfg", Register()), - # spi divider and transaction length (div(5), len(3)) - ("spi_divlen", Register()), - # spi chip select (dac, trf0, trf1, att0, att1) - ("spi_sel", Register()), - # spi mosi data and transaction start/continue - ("spi_datw", Register(read=False)), - # spi readback data, available after each transaction - ("spi_datr", Register(write=False)), - # dac data interface sync delay (for sync-dac_clk alignment and - # n-div/pll/ostr fifo output synchronization) - ("sync_dly", Register(width=3)), - (0x10,), - # digital upconverter (duc) configuration - # (accu_clr, accu_clr_once, data_select (0: duc, 1: test)) - ("duc0_cfg", Register()), - ("duc0_reserved", Register(read=False, write=False)), - # duc frequency tuning word (msb first) - ("duc0_f", Register(), Register(), Register(), Register()), - # duc phase offset word - ("duc0_p", Register(), Register()), - # dac data - ("dac0_data", Register(write=False), Register(write=False), - Register(write=False), Register(write=False)), - # dac test data for duc_cfg:data_select == 1 - ("dac0_test", Register(), Register(), Register(), Register()), - (0x20,), - # digital upconverter (duc) configuration - # (accu_clr, accu_clr_once, data_select (0: duc, 1: test)) - ("duc1_cfg", Register()), - ("duc1_reserved", Register(read=False, write=False)), - # duc frequency tuning word (msb first) - ("duc1_f", Register(), Register(), Register(), Register()), - # duc phase offset word - ("duc1_p", Register(), Register()), - # dac data - ("dac1_data", Register(write=False), Register(write=False), - Register(write=False), Register(write=False)), - # dac test data for duc_cfg:data_select == 1 - ("dac1_test", Register(), Register(), Register(), Register()), - ]) + self.decoder.map_registers( + [ + (0x00,), + # Sinara board id (19) as assigned in the Sinara EEPROM layout + ("board_id", Register(write=False)), + # hardware revision and variant + ("hw_rev", Register(write=False)), + # gateware revision + ("gw_rev", Register(write=False)), + # configuration (clk_sel, dac_resetb, dac_sleep, + # dac_txena, trf0_ps, trf1_ps, att0_rstn, att1_rstn) + ("cfg", Register()), + # status (dac_alarm, trf0_ld, trf1_ld, term0_stat, + # term1_stat, spi_idle) + ("sta", Register(write=False)), + # frame crc error counter + ("crc_err", Register(write=False)), + # led configuration + ("led", Register(width=6)), + # fan pwm duty cycle + ("fan", Register()), + # DUC settings update strobe + ("duc_stb", Register(write=False, read=False)), + # ADC gain configuration (pgia0_gain, pgia1_gain) + ("adc_cfg", Register(width=4)), + # spi configuration (offline, end, clk_phase, clk_polarity, + # half_duplex, lsb_first) + ("spi_cfg", Register()), + # spi divider and transaction length (div(5), len(3)) + ("spi_divlen", Register()), + # spi chip select (dac, trf0, trf1, att0, att1) + ("spi_sel", Register()), + # spi mosi data and transaction start/continue + ("spi_datw", Register(read=False)), + # spi readback data, available after each transaction + ("spi_datr", Register(write=False)), + # dac data interface sync delay (for sync-dac_clk alignment and + # n-div/pll/ostr fifo output synchronization) + ("sync_dly", Register(width=3)), + (0x10,), + # digital upconverter (duc) configuration + # (accu_clr, accu_clr_once, data_select (0: duc, 1: test)) + ("duc0_cfg", Register()), + ("duc0_reserved", Register(read=False, write=False)), + # duc frequency tuning word (msb first) + ("duc0_f", Register(), Register(), Register(), Register()), + # duc phase offset word + ("duc0_p", Register(), Register()), + # dac data + ( + "dac0_data", + Register(write=False), + Register(write=False), + Register(write=False), + Register(write=False), + ), + # dac test data for duc_cfg:data_select == 1 + ("dac0_test", Register(), Register(), Register(), Register()), + (0x20,), + # digital upconverter (duc) configuration + # (accu_clr, accu_clr_once, data_select (0: duc, 1: test)) + ("duc1_cfg", Register()), + ("duc1_reserved", Register(read=False, write=False)), + # duc frequency tuning word (msb first) + ("duc1_f", Register(), Register(), Register(), Register()), + # duc phase offset word + ("duc1_p", Register(), Register()), + # dac data + ( + "dac1_data", + Register(write=False), + Register(write=False), + Register(write=False), + Register(write=False), + ), + # dac test data for duc_cfg:data_select == 1 + ("dac1_test", Register(), Register(), Register(), Register()), + ] + ) dac_ctrl = platform.request("dac_ctrl") trf_ctrl = [platform.request("trf_ctrl") for _ in range(2)] @@ -125,16 +142,26 @@ def __init__(self, platform): self.comb += [ # Sinara.boards.index("Phaser") == 19 self.decoder.get("board_id", "read").eq(19), - self.decoder.get("hw_rev", "read").eq(Cat( - platform.request("hw_rev"), platform.request("hw_variant"))), + self.decoder.get("hw_rev", "read").eq( + Cat(platform.request("hw_rev"), platform.request("hw_variant")) + ), self.decoder.get("gw_rev", "read").eq(0x01), Cat([platform.request("user_led", i) for i in range(6)]).eq( - self.decoder.get("led", "write")), - Cat(platform.request("clk_sel"), dac_ctrl.resetb, dac_ctrl.sleep, - dac_ctrl.txena, trf_ctrl[0].ps, trf_ctrl[1].ps, - att_rstn[0], att_rstn[1]).eq(self.decoder.get("cfg", "write")), + self.decoder.get("led", "write") + ), + Cat( + platform.request("clk_sel"), + dac_ctrl.resetb, + dac_ctrl.sleep, + dac_ctrl.txena, + trf_ctrl[0].ps, + trf_ctrl[1].ps, + att_rstn[0], + att_rstn[1], + ).eq(self.decoder.get("cfg", "write")), Cat(adc_ctrl.gain0, adc_ctrl.gain1).eq( - self.decoder.get("adc_cfg", "write")), + self.decoder.get("adc_cfg", "write") + ), self.decoder.get("crc_err", "read").eq(self.link.checker.crc_err), ] @@ -152,21 +179,31 @@ def __init__(self, platform): ) self.submodules.spi = SPIMachine(data_width=8, div_width=8) self.comb += [ - self.decoder.get("sta", "read").eq(Cat( - dac_ctrl.alarm, trf_ctrl[0].ld, trf_ctrl[1].ld, - adc_ctrl.term_stat, self.spi.idle)), + self.decoder.get("sta", "read").eq( + Cat( + dac_ctrl.alarm, + trf_ctrl[0].ld, + trf_ctrl[1].ld, + adc_ctrl.term_stat, + self.spi.idle, + ) + ), self.spi.load.eq(self.decoder.registers["spi_datw"][0].bus.we), - self.spi.reg.pdo.eq( - self.decoder.registers["spi_datw"][0].bus.dat_w), + self.spi.reg.pdo.eq(self.decoder.registers["spi_datw"][0].bus.dat_w), # self.spi.readable, self.spi.writable, self.spi.idle, self.spiint.cs.eq(self.decoder.get("spi_sel", "write")), self.spiint.cs_polarity.eq(0), # all active low Cat(self.spi.cg.div[3:], self.spi.length).eq( - self.decoder.get("spi_divlen", "write")), - Cat(self.spiint.offline, self.spi.end, - self.spi.clk_phase, self.spiint.clk_polarity, - self.spiint.half_duplex, self.spi.reg.lsb_first).eq( - self.decoder.get("spi_cfg", "write")), + self.decoder.get("spi_divlen", "write") + ), + Cat( + self.spiint.offline, + self.spi.end, + self.spi.clk_phase, + self.spiint.clk_polarity, + self.spiint.half_duplex, + self.spi.reg.lsb_first, + ).eq(self.decoder.get("spi_cfg", "write")), self.spiint.cs_next.eq(self.spi.cs_next), self.spiint.clk_next.eq(self.spi.clk_next), self.spiint.ce.eq(self.spi.ce), @@ -194,15 +231,16 @@ def __init__(self, platform): self.sync += [ # keep accu cleared duc.clr.eq(cfg[0]), - If(self.decoder.registers["duc_stb"][0].bus.we, + If( + self.decoder.registers["duc_stb"][0].bus.we, # clear accu once - If(cfg[1], + If( + cfg[1], duc.clr.eq(1), ), duc.f.eq(self.decoder.get("duc{}_f".format(ch), "write")), # msb align to 19 bit duc.p - duc.p[3:].eq( - self.decoder.get("duc{}_p".format(ch), "write")), + duc.p[3:].eq(self.decoder.get("duc{}_p".format(ch), "write")), ), ] for t, (ti, to) in enumerate(zip(duc.i, duc.o)): @@ -211,53 +249,60 @@ def __init__(self, platform): ti.q.eq(self.decoder.data[t][ch].q), ] self.sync += [ - If(cfg[2:4] == 0, # ducx_cfg_sel - self.dac.data[2*t][ch].eq(to.i), - self.dac.data[2*t + 1][ch].eq(to.q), + If( + cfg[2:4] == 0, # ducx_cfg_sel + self.dac.data[2 * t][ch].eq(to.i), + self.dac.data[2 * t + 1][ch].eq(to.q), ) ] self.sync += [ - If(cfg[2:4] == 1, # ducx_cfg_sel + If( + cfg[2:4] == 1, # ducx_cfg_sel # i is lsb, q is msb # repeat the test data to fill the oserdes - Cat([d[ch] for d in self.dac.data]).eq(Replicate( - self.decoder.get("dac{}_test".format(ch), "write"), 2)) + Cat([d[ch] for d in self.dac.data]).eq( + Replicate(self.decoder.get("dac{}_test".format(ch), "write"), 2) + ), ) ] self.comb += [ # even sample just before the oserdes self.decoder.get("dac{}_data".format(ch), "read").eq( - Cat(d[ch] for d in self.dac.data)), + Cat(d[ch] for d in self.dac.data) + ), ] # use liberally for debugging self.comb += [ - Cat([platform.request("test_point", i) for i in range(6)]).eq(Cat( - ClockSignal("clk125"), - ClockSignal("link"), - #ClockSignal(), - ResetSignal(), - #self.link.slip.bitslip, - #self.link.unframe.data[0], - #self.link.unframe.data[1], - #self.link.unframe.clk_stb, - #self.link.unframe.marker_stb, - #self.link.unframe.end_of_frame, - self.link.checker.frame_stb, - # self.decoder.bus.bus.we, - # self.decoder.bus.bus.re, - # self.decoder.bus.bus.adr[0], - self.link.checker.miso, - # self.dac.data_sync, - self.dac.istr, - dac_ctrl.alarm, - )) + Cat([platform.request("test_point", i) for i in range(6)]).eq( + Cat( + ClockSignal("clk125"), + ClockSignal("link"), + # ClockSignal(), + ResetSignal(), + # self.link.slip.bitslip, + # self.link.unframe.data[0], + # self.link.unframe.data[1], + # self.link.unframe.clk_stb, + # self.link.unframe.marker_stb, + # self.link.unframe.end_of_frame, + self.link.checker.frame_stb, + # self.decoder.bus.bus.we, + # self.decoder.bus.bus.re, + # self.decoder.bus.bus.adr[0], + self.link.checker.miso, + # self.dac.data_sync, + self.dac.istr, + dac_ctrl.alarm, + ) + ) ] if __name__ == "__main__": from migen.build.platforms.sinara.phaser import Platform + platform = Platform() # platform.toolchain.additional_commands.extend([ # "set argv phaser.bit", diff --git a/test_interpolate.py b/test_interpolate.py index 7949c97..dd8ad20 100644 --- a/test_interpolate.py +++ b/test_interpolate.py @@ -10,7 +10,7 @@ def feed(endpoint, x, rate): n, d = rate t = 0 for i, xi in enumerate(x): - while t*n < i*d: + while t * n < i * d: yield t += 1 yield endpoint.data.eq(int(xi)) @@ -44,12 +44,14 @@ def test_init(self): def test_seq(self): # impulse response plus latency - x = [(1 << 13) - 1] + [0]*(30 + 10) + x = [(1 << 13) - 1] + [0] * (30 + 10) y = [] - run_simulation(self.dut, [feed(self.dut.input, x, rate=(1, 10)), - retrieve(self.dut.output, y)], - vcd_name="int.vcd") + run_simulation( + self.dut, + [feed(self.dut.input, x, rate=(1, 10)), retrieve(self.dut.output, y)], + vcd_name="int.vcd", + ) y = np.ravel(y) print(repr(y)) - #y0 = - #np.testing.assert_equal(y, y0) + # y0 = + # np.testing.assert_equal(y, y0) diff --git a/test_link.py b/test_link.py index 010fd97..527f62f 100644 --- a/test_link.py +++ b/test_link.py @@ -37,6 +37,7 @@ def gen(): self.assertEqual((yield self.dut.bitslip), 0) yield self.assertEqual((yield self.dut.bitslip), 1) + run_simulation(self.dut, gen()) @@ -44,20 +45,20 @@ def pack(data): n_frame = 10 n_data = 7 t_clk = 8 - n_marker = n_frame//2 + 1 + n_marker = n_frame // 2 + 1 n_crc = 6 - assert len(data) == n_frame*(n_data - 1)*t_clk - n_marker - n_crc + assert len(data) == n_frame * (n_data - 1) * t_clk - n_marker - n_crc frame = [] for i in range(n_frame): for j in range(t_clk): b = 0 - if j >= t_clk//2: + if j >= t_clk // 2: b |= 1 if j == t_clk - 1: if i == n_frame - 1: # CRC-6-GSM # b |= CRC(0x3f, data_width=6)(frame) << 1 - b |= 0x3f << 1 # TODO + b |= 0x3F << 1 # TODO frame.append(b) continue elif i == n_frame - 2: @@ -127,31 +128,39 @@ def record_check(self, bits, n_max=100): def test_mini(self): bits = [] - run_simulation(self.dut, - [self.feed_bits([]), self.record_frame(bits)]) + run_simulation(self.dut, [self.feed_bits([]), self.record_frame(bits)]) self.assertEqual(bits, []) def test_zeros(self): - frame = pack([0] * (10*8*6 - 6 - 6)) + frame = pack([0] * (10 * 8 * 6 - 6 - 6)) frame[-1] = 1 | (0x13 << 1) # crc bits = [] - run_simulation(self.dut, - [self.feed_bits(frame), self.record_frame(bits)]) - self.assertEqual(len(bits), 10*8) + run_simulation(self.dut, [self.feed_bits(frame), self.record_frame(bits)]) + self.assertEqual(len(bits), 10 * 8) self.assertEqual(bits[-8 - 1], 1) - self.assertTrue(all(bits[i] == 0 for i in range(len(bits)) - if i not in (len(bits) - 8 - 1, len(bits) - 1)), bits) + self.assertTrue( + all( + bits[i] == 0 + for i in range(len(bits)) + if i not in (len(bits) - 8 - 1, len(bits) - 1) + ), + bits, + ) def test_ones(self): - frame = pack([1] * (10*8*6 - 6 - 6)) - frame[-1] = 1 | (0x0b << 1) # crc + frame = pack([1] * (10 * 8 * 6 - 6 - 6)) + frame[-1] = 1 | (0x0B << 1) # crc bits = [] rec_frame = [] - run_simulation(self.dut, - [self.feed_bits(frame), - self.record_frame(bits), - self.record_check(rec_frame)]) - self.assertEqual(len(bits), 10*8) - #self.assertEqual(bits[-8 - 1], 0x3f) + run_simulation( + self.dut, + [ + self.feed_bits(frame), + self.record_frame(bits), + self.record_check(rec_frame), + ], + ) + self.assertEqual(len(bits), 10 * 8) + # self.assertEqual(bits[-8 - 1], 0x3f) self.assertEqual(len(rec_frame), 1) - #self.assertEqual(rec_frame[0], (1 << 10*8*6 - 6 - 6) - 1) + # self.assertEqual(rec_frame[0], (1 << 10*8*6 - 6 - 6) - 1)