ppg: Pull the PPG signal processing into a seperate library
Signed-off-by: Daniel Thompson <daniel@redfelineninja.org.uk>
This commit is contained in:
parent
e6811bb693
commit
95e129e331
3 changed files with 181 additions and 128 deletions
|
@ -2,112 +2,8 @@
|
||||||
# Copyright (C) 2020 Daniel Thompson
|
# Copyright (C) 2020 Daniel Thompson
|
||||||
|
|
||||||
import wasp
|
import wasp
|
||||||
import array
|
|
||||||
import machine
|
import machine
|
||||||
|
import ppg
|
||||||
class Biquad():
|
|
||||||
"""Direct Form II Biquad Filter"""
|
|
||||||
|
|
||||||
def __init__(self, b0, b1, b2, a1, a2):
|
|
||||||
self._coeff = (b0, b1, b2, a1, a2)
|
|
||||||
self._v1 = 0
|
|
||||||
self._v2 = 0
|
|
||||||
|
|
||||||
def step(self, x):
|
|
||||||
c = self._coeff
|
|
||||||
v1 = self._v1
|
|
||||||
v2 = self._v2
|
|
||||||
|
|
||||||
v = x - (c[3] * v1) - (c[4] * v2)
|
|
||||||
y = (c[0] * v) + (c[1] * v1) + (c[2] * v2)
|
|
||||||
|
|
||||||
self._v2 = v1
|
|
||||||
self._v1 = v
|
|
||||||
return y
|
|
||||||
|
|
||||||
class PTAGC():
|
|
||||||
"""Peak Tracking Automatic Gain Control
|
|
||||||
|
|
||||||
In order for the correlation checks to work correctly we must
|
|
||||||
aggressively reject spikes caused by fast DC steps. Setting a
|
|
||||||
threshold based on the median is very effective at killing
|
|
||||||
spikes but needs an extra 1k for sample storage which isn't
|
|
||||||
really plausible for a microcontroller.
|
|
||||||
"""
|
|
||||||
def __init__(self, start, decay, threshold):
|
|
||||||
self._peak = start
|
|
||||||
self._decay = decay
|
|
||||||
self._boost = 1 / decay
|
|
||||||
self._threshold = threshold
|
|
||||||
|
|
||||||
def step(self, spl):
|
|
||||||
# peak tracking
|
|
||||||
peak = self._peak
|
|
||||||
if abs(spl) > peak:
|
|
||||||
peak *= self._boost
|
|
||||||
else:
|
|
||||||
peak *= self._decay
|
|
||||||
self._peak = peak
|
|
||||||
|
|
||||||
# rejection filter (clipper)
|
|
||||||
threshold = self._threshold
|
|
||||||
if spl > (peak * threshold) or spl < (peak * -threshold):
|
|
||||||
return 0
|
|
||||||
|
|
||||||
# booster
|
|
||||||
spl = 100 * spl / (2 * peak)
|
|
||||||
|
|
||||||
return spl
|
|
||||||
|
|
||||||
def _compare(d1, d2, count, shift):
|
|
||||||
e = 0
|
|
||||||
for i in range(count):
|
|
||||||
d = d1[i] - d2[i]
|
|
||||||
e += d*d
|
|
||||||
return e
|
|
||||||
|
|
||||||
def compare(d, shift):
|
|
||||||
return _compare(d[shift:], d[:-shift], len(d)-shift, shift)
|
|
||||||
|
|
||||||
def trough(d, mn, mx):
|
|
||||||
z2 = compare(d, mn-2)
|
|
||||||
z1 = compare(d, mn-1)
|
|
||||||
for i in range(mn, mx+1):
|
|
||||||
z = compare(d, i)
|
|
||||||
if z2 > z1 and z1 < z:
|
|
||||||
return i
|
|
||||||
z2 = z1
|
|
||||||
z1 = z
|
|
||||||
|
|
||||||
return -1
|
|
||||||
|
|
||||||
def get_hrs(d):
|
|
||||||
din = memoryview(d)
|
|
||||||
|
|
||||||
# Search initially from ~210 to 30 bpm
|
|
||||||
t0 = trough(din, 7, 48)
|
|
||||||
if t0 < 0:
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Check the second cycle ...
|
|
||||||
t1 = t0 * 2
|
|
||||||
t1 = trough(din, t1 - 5, t1 + 5)
|
|
||||||
if t1 < 0:
|
|
||||||
return None
|
|
||||||
|
|
||||||
# ... and the third
|
|
||||||
t2 = (t1 * 3) // 2
|
|
||||||
t2 = trough(din, t2 - 5, t2 + 4)
|
|
||||||
if t2 < 0:
|
|
||||||
return None
|
|
||||||
|
|
||||||
# If we can find a fourth cycle then use that for the extra
|
|
||||||
# precision otherwise report whatever we've found
|
|
||||||
t3 = (t2 * 4) // 3
|
|
||||||
t3 = trough(din, t3 - 4, t3 + 4)
|
|
||||||
if t3 < 0:
|
|
||||||
return (60 * 24 * 3) // t2
|
|
||||||
return (60 * 24 * 4) // t3
|
|
||||||
|
|
||||||
class HeartApp():
|
class HeartApp():
|
||||||
"""Heart Rate Sensing application.
|
"""Heart Rate Sensing application.
|
||||||
|
@ -125,40 +21,26 @@ class HeartApp():
|
||||||
draw.fill()
|
draw.fill()
|
||||||
draw.string('PPG graph', 0, 6, width=240)
|
draw.string('PPG graph', 0, 6, width=240)
|
||||||
|
|
||||||
self._hpf = Biquad(0.87033078, -1.74066156, 0.87033078, -1.72377617, 0.75754694)
|
|
||||||
self._agc = PTAGC(20, 0.971, 2)
|
|
||||||
self._lpf = Biquad(0.11595249, 0.23190498, 0.11595249, -0.72168143, 0.18549138)
|
|
||||||
|
|
||||||
self._x = 0
|
|
||||||
self._offset = wasp.watch.hrs.read_hrs()
|
|
||||||
|
|
||||||
self._hrdata = array.array('b')
|
|
||||||
|
|
||||||
wasp.system.request_tick(1000 // 8)
|
wasp.system.request_tick(1000 // 8)
|
||||||
|
|
||||||
|
self._hrdata = ppg.PPG(wasp.watch.hrs.read_hrs())
|
||||||
|
self._x = 0
|
||||||
|
|
||||||
def background(self):
|
def background(self):
|
||||||
wasp.watch.hrs.disable()
|
wasp.watch.hrs.disable()
|
||||||
del self._hpf
|
del self._hrdata
|
||||||
del self._agc
|
|
||||||
del self._lpf
|
|
||||||
|
|
||||||
def _subtick(self, ticks):
|
def _subtick(self, ticks):
|
||||||
"""Notify the application that its periodic tick is due."""
|
"""Notify the application that its periodic tick is due."""
|
||||||
draw = wasp.watch.drawable
|
draw = wasp.watch.drawable
|
||||||
|
|
||||||
spl = wasp.watch.hrs.read_hrs()
|
spl = self._hrdata.preprocess(wasp.watch.hrs.read_hrs())
|
||||||
spl -= self._offset
|
|
||||||
spl = self._hpf.step(spl)
|
|
||||||
spl = self._agc.step(spl)
|
|
||||||
spl = self._lpf.step(spl)
|
|
||||||
spl = int(spl)
|
|
||||||
|
|
||||||
self._hrdata.append(spl)
|
if len(self._hrdata.data) >= 240:
|
||||||
if len(self._hrdata) >= 240:
|
draw.string('{} bpm'.format(self._hrdata.get_heart_rate()),
|
||||||
draw.string('{} bpm'.format(get_hrs(self._hrdata)), 0, 6, width=240)
|
0, 6, width=240)
|
||||||
del self._hrdata
|
|
||||||
self._hrdata = array.array('b')
|
|
||||||
|
|
||||||
|
# Graph is orange by default...
|
||||||
color = 0xffc0
|
color = 0xffc0
|
||||||
|
|
||||||
# If the maths goes wrong lets show it in the chart!
|
# If the maths goes wrong lets show it in the chart!
|
||||||
|
|
|
@ -29,6 +29,7 @@ freeze('../..',
|
||||||
'fonts/sans28.py',
|
'fonts/sans28.py',
|
||||||
'fonts/sans36.py',
|
'fonts/sans36.py',
|
||||||
'icons.py',
|
'icons.py',
|
||||||
|
'ppg.py',
|
||||||
'shell.py',
|
'shell.py',
|
||||||
'wasp.py',
|
'wasp.py',
|
||||||
'widgets.py',
|
'widgets.py',
|
||||||
|
|
170
wasp/ppg.py
Normal file
170
wasp/ppg.py
Normal file
|
@ -0,0 +1,170 @@
|
||||||
|
# SPDX-License-Identifier: LGPL-3.0-or-later
|
||||||
|
# Copyright (C) 2020 Daniel Thompson
|
||||||
|
|
||||||
|
"""Photoplethysmogram (PPG) Signal Processing
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
|
Algorithms and signal processing primatives that can be used to convert
|
||||||
|
raw PPG signals into something useful.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import array
|
||||||
|
import micropython
|
||||||
|
|
||||||
|
@micropython.viper
|
||||||
|
def _compare(d1, d2, count: int, shift: int) -> int:
|
||||||
|
"""Compare two sequences of (signed) bytes and quantify how dissimilar
|
||||||
|
they are.
|
||||||
|
"""
|
||||||
|
p1 = ptr8(d1)
|
||||||
|
p2 = ptr8(d2)
|
||||||
|
|
||||||
|
e = 0
|
||||||
|
for i in range(count):
|
||||||
|
s1 = int(p1[i])
|
||||||
|
if s1 > 127:
|
||||||
|
s1 -= 256
|
||||||
|
|
||||||
|
s2 = int(p2[i])
|
||||||
|
if s2 > 127:
|
||||||
|
s2 -= 256
|
||||||
|
|
||||||
|
d = s1 - s2
|
||||||
|
e += d*d
|
||||||
|
return e
|
||||||
|
|
||||||
|
class Biquad():
|
||||||
|
"""Direct Form II Biquad Filter"""
|
||||||
|
|
||||||
|
def __init__(self, b0, b1, b2, a1, a2):
|
||||||
|
self._coeff = (b0, b1, b2, a1, a2)
|
||||||
|
self._v1 = 0
|
||||||
|
self._v2 = 0
|
||||||
|
|
||||||
|
def step(self, x):
|
||||||
|
c = self._coeff
|
||||||
|
v1 = self._v1
|
||||||
|
v2 = self._v2
|
||||||
|
|
||||||
|
v = x - (c[3] * v1) - (c[4] * v2)
|
||||||
|
y = (c[0] * v) + (c[1] * v1) + (c[2] * v2)
|
||||||
|
|
||||||
|
self._v2 = v1
|
||||||
|
self._v1 = v
|
||||||
|
return y
|
||||||
|
|
||||||
|
class PTAGC():
|
||||||
|
"""Peak Tracking Automatic Gain Control
|
||||||
|
|
||||||
|
In order for the correlation checks to work correctly we must
|
||||||
|
aggressively reject spikes caused by fast DC steps. Setting a
|
||||||
|
threshold based on the median is very effective at killing
|
||||||
|
spikes but needs an extra 1k for sample storage which isn't
|
||||||
|
really plausible for a microcontroller.
|
||||||
|
"""
|
||||||
|
def __init__(self, start, decay, threshold):
|
||||||
|
self._peak = start
|
||||||
|
self._decay = decay
|
||||||
|
self._boost = 1 / decay
|
||||||
|
self._threshold = threshold
|
||||||
|
|
||||||
|
def step(self, spl):
|
||||||
|
# peak tracking
|
||||||
|
peak = self._peak
|
||||||
|
if abs(spl) > peak:
|
||||||
|
peak *= self._boost
|
||||||
|
else:
|
||||||
|
peak *= self._decay
|
||||||
|
self._peak = peak
|
||||||
|
|
||||||
|
# rejection filter (clipper)
|
||||||
|
threshold = self._threshold
|
||||||
|
if spl > (peak * threshold) or spl < (peak * -threshold):
|
||||||
|
return 0
|
||||||
|
|
||||||
|
# booster
|
||||||
|
spl = 100 * spl / (2 * peak)
|
||||||
|
|
||||||
|
return spl
|
||||||
|
|
||||||
|
class PPG():
|
||||||
|
"""
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, spl):
|
||||||
|
self._offset = spl
|
||||||
|
self.data = array.array('b')
|
||||||
|
|
||||||
|
self._hpf = Biquad(0.87033078, -1.74066156, 0.87033078,
|
||||||
|
-1.72377617, 0.75754694)
|
||||||
|
self._agc = PTAGC(20, 0.971, 2)
|
||||||
|
self._lpf = Biquad(0.11595249, 0.23190498, 0.11595249,
|
||||||
|
-0.72168143, 0.18549138)
|
||||||
|
|
||||||
|
def preprocess(self, spl):
|
||||||
|
"""Preprocess a PPG sample.
|
||||||
|
|
||||||
|
Must be called at 24Hz for accurate heart rate calculations.
|
||||||
|
"""
|
||||||
|
spl -= self._offset
|
||||||
|
spl = self._hpf.step(spl)
|
||||||
|
spl = self._agc.step(spl)
|
||||||
|
spl = self._lpf.step(spl)
|
||||||
|
spl = int(spl)
|
||||||
|
|
||||||
|
self.data.append(spl)
|
||||||
|
return spl
|
||||||
|
|
||||||
|
def _get_heart_rate(self):
|
||||||
|
def compare(d, shift):
|
||||||
|
return _compare(d[shift:], d[:-shift], len(d)-shift, shift)
|
||||||
|
|
||||||
|
def trough(d, mn, mx):
|
||||||
|
z2 = compare(d, mn-2)
|
||||||
|
z1 = compare(d, mn-1)
|
||||||
|
for i in range(mn, mx+1):
|
||||||
|
z = compare(d, i)
|
||||||
|
if z2 > z1 and z1 < z:
|
||||||
|
return i
|
||||||
|
z2 = z1
|
||||||
|
z1 = z
|
||||||
|
|
||||||
|
return -1
|
||||||
|
|
||||||
|
data = memoryview(self.data)
|
||||||
|
|
||||||
|
# Search initially from ~210 to 30 bpm
|
||||||
|
t0 = trough(data, 7, 48)
|
||||||
|
if t0 < 0:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Check the second cycle ...
|
||||||
|
t1 = t0 * 2
|
||||||
|
t1 = trough(data, t1 - 5, t1 + 5)
|
||||||
|
if t1 < 0:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# ... and the third
|
||||||
|
t2 = (t1 * 3) // 2
|
||||||
|
t2 = trough(data, t2 - 5, t2 + 4)
|
||||||
|
if t2 < 0:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# If we can find a fourth cycle then use that for the extra
|
||||||
|
# precision otherwise report whatever we've found
|
||||||
|
t3 = (t2 * 4) // 3
|
||||||
|
t3 = trough(data, t3 - 4, t3 + 4)
|
||||||
|
if t3 < 0:
|
||||||
|
return (60 * 24 * 3) // t2
|
||||||
|
return (60 * 24 * 4) // t3
|
||||||
|
|
||||||
|
def get_heart_rate(self):
|
||||||
|
if len(self.data) < 200:
|
||||||
|
return None
|
||||||
|
|
||||||
|
hr = self._get_heart_rate()
|
||||||
|
|
||||||
|
# Clear out the accumulated data
|
||||||
|
self.data = array.array('b')
|
||||||
|
|
||||||
|
return hr
|
Loading…
Reference in a new issue