Introduction
Significant number of users asked about real time plotting examples in Python and tried to use matplotlib for it. Matplotlib is a great library, but its primary focus is offline data. For real time visualization tools like PyQT and Kivy work better. Here we will use pyqtgraph which is built on top of PyQT. Despite the fact that this example is only for Python for now, it shows the basic idea which remains the same across different programming languages and GUI frameworks.
Also, feel free to check Matthijs blog post about visualization in Julia.
Full code for this example can be found here.
Installing dependencies
All you need to install is BrainFlow and pyqtgraph.
python -m pip install -U brainflow
python -m pip install pyqtgraph==0.12.1
Or use requirements.txt file
pyqtgraph requires PyQT preinstalled, on unix like systems process of installation is different based on OS. On Fedora it’s as simple as:
sudo dnf install PyQt5
We recommend to use system package managers like dnf, apt, brew, etc instead installation using pip.
Real time plot using pyqtgraph
There are a lot of tutorials about pyqtgraph, we recommend this one.
Let’s copypaste code from it:
from PyQt5 import QtWidgets, QtCore
from pyqtgraph import PlotWidget, plot
import pyqtgraph as pg
import sys # We need sys so that we can pass argv to QApplication
import os
from random import randint
class MainWindow(QtWidgets.QMainWindow):
def __init__(self, *args, **kwargs):
super(MainWindow, self).__init__(*args, **kwargs)
self.graphWidget = pg.PlotWidget()
self.setCentralWidget(self.graphWidget)
self.x = list(range(100)) # 100 time points
self.y = [randint(0,100) for _ in range(100)] # 100 data points
self.graphWidget.setBackground('w')
pen = pg.mkPen(color=(255, 0, 0))
self.data_line = self.graphWidget.plot(self.x, self.y, pen=pen)
self.timer = QtCore.QTimer()
self.timer.setInterval(50)
self.timer.timeout.connect(self.update_plot_data)
self.timer.start()
def update_plot_data(self):
self.x = self.x[1:] # Remove the first y element.
self.x.append(self.x[-1] + 1) # Add a new value 1 higher than the last.
self.y = self.y[1:] # Remove the first
self.y.append(randint(0,100)) # Add a new random value.
self.data_line.setData(self.x, self.y) # Update the data.
app = QtWidgets.QApplication(sys.argv)
w = MainWindow()
w.show()
sys.exit(app.exec_())
Getting some data from BrainFlow
We will use brainflow_get_data.py from code samples as a starting point.
import argparse
import time
import numpy as np
import brainflow
from brainflow.board_shim import BoardShim, BrainFlowInputParams
from brainflow.data_filter import DataFilter, FilterTypes, AggOperations
def main():
BoardShim.enable_dev_board_logger()
parser = argparse.ArgumentParser()
# use docs to check which parameters are required for specific board, e.g. for Cyton - set serial port
parser.add_argument('--timeout', type=int, help='timeout for device discovery or connection', required=False,
default=0)
parser.add_argument('--ip-port', type=int, help='ip port', required=False, default=0)
parser.add_argument('--ip-protocol', type=int, help='ip protocol, check IpProtocolType enum', required=False,
default=0)
parser.add_argument('--ip-address', type=str, help='ip address', required=False, default='')
parser.add_argument('--serial-port', type=str, help='serial port', required=False, default='')
parser.add_argument('--mac-address', type=str, help='mac address', required=False, default='')
parser.add_argument('--other-info', type=str, help='other info', required=False, default='')
parser.add_argument('--streamer-params', type=str, help='streamer params', required=False, default='')
parser.add_argument('--serial-number', type=str, help='serial number', required=False, default='')
parser.add_argument('--board-id', type=int, help='board id, check docs to get a list of supported boards',
required=True)
parser.add_argument('--file', type=str, help='file', required=False, default='')
args = parser.parse_args()
params = BrainFlowInputParams()
params.ip_port = args.ip_port
params.serial_port = args.serial_port
params.mac_address = args.mac_address
params.other_info = args.other_info
params.serial_number = args.serial_number
params.ip_address = args.ip_address
params.ip_protocol = args.ip_protocol
params.timeout = args.timeout
params.file = args.file
board = BoardShim(args.board_id, params)
board.prepare_session()
# board.start_stream () # use this for default options
board.start_stream(45000, args.streamer_params)
time.sleep(10)
# data = board.get_current_board_data (256) # get latest 256 packages or less, doesnt remove them from internal buffer
data = board.get_board_data() # get all data and remove it from internal buffer
board.stop_stream()
board.release_session()
print(data)
if __name__ == "__main__":
main()
Mixing it all together
We are one step away from this result.
We will not get data in the main
function and in the main thread, but we want to prepare and release session there and pass board object to visualization class to access data. So, let’s patch our main function a little:
def main():
BoardShim.enable_dev_board_logger()
logging.basicConfig(level=logging.DEBUG)
parser = argparse.ArgumentParser()
# use docs to check which parameters are required for specific board, e.g. for Cyton - set serial port
parser.add_argument('--timeout', type=int, help='timeout for device discovery or connection', required=False,
default=0)
parser.add_argument('--ip-port', type=int, help='ip port', required=False, default=0)
parser.add_argument('--ip-protocol', type=int, help='ip protocol, check IpProtocolType enum', required=False,
default=0)
parser.add_argument('--ip-address', type=str, help='ip address', required=False, default='')
parser.add_argument('--serial-port', type=str, help='serial port', required=False, default='')
parser.add_argument('--mac-address', type=str, help='mac address', required=False, default='')
parser.add_argument('--other-info', type=str, help='other info', required=False, default='')
parser.add_argument('--streamer-params', type=str, help='streamer params', required=False, default='')
parser.add_argument('--serial-number', type=str, help='serial number', required=False, default='')
parser.add_argument('--board-id', type=int, help='board id, check docs to get a list of supported boards',
required=False, default=BoardIds.SYNTHETIC_BOARD)
parser.add_argument('--file', type=str, help='file', required=False, default='')
args = parser.parse_args()
params = BrainFlowInputParams()
params.ip_port = args.ip_port
params.serial_port = args.serial_port
params.mac_address = args.mac_address
params.other_info = args.other_info
params.serial_number = args.serial_number
params.ip_address = args.ip_address
params.ip_protocol = args.ip_protocol
params.timeout = args.timeout
params.file = args.file
try:
board_shim = BoardShim(args.board_id, params)
board_shim.prepare_session()
board_shim.start_stream(450000, args.streamer_params)
g = Graph(board_shim)
except BaseException as e:
logging.warning('Exception', exc_info=True)
finally:
logging.info('End')
if board_shim.is_prepared():
logging.info('Releasing session')
board_shim.release_session()
And patch our pyqtgraph part:
class Graph:
def __init__(self, board_shim):
self.board_id = board_shim.get_board_id()
self.board_shim = board_shim
self.exg_channels = BoardShim.get_exg_channels(self.board_id)
self.sampling_rate = BoardShim.get_sampling_rate(self.board_id)
self.update_speed_ms = 50
self.window_size = 4
self.num_points = self.window_size * self.sampling_rate
self.app = QtGui.QApplication([])
self.win = pg.GraphicsWindow(title='BrainFlow Plot',size=(800, 600))
self._init_timeseries()
timer = QtCore.QTimer()
timer.timeout.connect(self.update)
timer.start(self.update_speed_ms)
QtGui.QApplication.instance().exec_()
def _init_timeseries(self):
self.plots = list()
self.curves = list()
for i in range(len(self.exg_channels)):
p = self.win.addPlot(row=i,col=0)
self.plots.append(p)
curve = p.plot()
self.curves.append(curve)
def update(self):
data = self.board_shim.get_current_board_data(self.num_points)
avg_bands = [0, 0, 0, 0, 0]
for count, channel in enumerate(self.exg_channels):
# plot timeseries
self.curves[count].setData(data[channel].tolist())
self.app.processEvents()
Note: here we use get_current_board_data
method which doesn’t remove data from the internal buffer. It allows you to implement sliding window using single method and wo any efforts
Finally, let’s add a title to our plot and some filters.
import argparse
import logging
import pyqtgraph as pg
from brainflow.board_shim import BoardShim, BrainFlowInputParams, BoardIds
from brainflow.data_filter import DataFilter, FilterTypes, DetrendOperations
from pyqtgraph.Qt import QtGui, QtCore
class Graph:
def __init__(self, board_shim):
self.board_id = board_shim.get_board_id()
self.board_shim = board_shim
self.exg_channels = BoardShim.get_exg_channels(self.board_id)
self.sampling_rate = BoardShim.get_sampling_rate(self.board_id)
self.update_speed_ms = 50
self.window_size = 4
self.num_points = self.window_size * self.sampling_rate
self.app = QtGui.QApplication([])
self.win = pg.GraphicsWindow(title='BrainFlow Plot', size=(800, 600))
self._init_timeseries()
timer = QtCore.QTimer()
timer.timeout.connect(self.update)
timer.start(self.update_speed_ms)
QtGui.QApplication.instance().exec_()
def _init_timeseries(self):
self.plots = list()
self.curves = list()
for i in range(len(self.exg_channels)):
p = self.win.addPlot(row=i, col=0)
p.showAxis('left', False)
p.setMenuEnabled('left', False)
p.showAxis('bottom', False)
p.setMenuEnabled('bottom', False)
if i == 0:
p.setTitle('TimeSeries Plot')
self.plots.append(p)
curve = p.plot()
self.curves.append(curve)
def update(self):
data = self.board_shim.get_current_board_data(self.num_points)
for count, channel in enumerate(self.exg_channels):
# plot timeseries
DataFilter.detrend(data[channel], DetrendOperations.CONSTANT.value)
DataFilter.perform_bandpass(data[channel], self.sampling_rate, 3.0, 45.0, 2,
FilterTypes.BUTTERWORTH.value, 0)
DataFilter.perform_bandstop(data[channel], self.sampling_rate, 48.0, 52.0, 2,
FilterTypes.BUTTERWORTH.value, 0)
DataFilter.perform_bandstop(data[channel], self.sampling_rate, 58.0, 62.0, 2,
FilterTypes.BUTTERWORTH.value, 0)
self.curves[count].setData(data[channel].tolist())
self.app.processEvents()
def main():
BoardShim.enable_dev_board_logger()
logging.basicConfig(level=logging.DEBUG)
parser = argparse.ArgumentParser()
# use docs to check which parameters are required for specific board, e.g. for Cyton - set serial port
parser.add_argument('--timeout', type=int, help='timeout for device discovery or connection', required=False,
default=0)
parser.add_argument('--ip-port', type=int, help='ip port', required=False, default=0)
parser.add_argument('--ip-protocol', type=int, help='ip protocol, check IpProtocolType enum', required=False,
default=0)
parser.add_argument('--ip-address', type=str, help='ip address', required=False, default='')
parser.add_argument('--serial-port', type=str, help='serial port', required=False, default='')
parser.add_argument('--mac-address', type=str, help='mac address', required=False, default='')
parser.add_argument('--other-info', type=str, help='other info', required=False, default='')
parser.add_argument('--streamer-params', type=str, help='streamer params', required=False, default='')
parser.add_argument('--serial-number', type=str, help='serial number', required=False, default='')
parser.add_argument('--board-id', type=int, help='board id, check docs to get a list of supported boards',
required=False, default=BoardIds.SYNTHETIC_BOARD)
parser.add_argument('--file', type=str, help='file', required=False, default='')
args = parser.parse_args()
params = BrainFlowInputParams()
params.ip_port = args.ip_port
params.serial_port = args.serial_port
params.mac_address = args.mac_address
params.other_info = args.other_info
params.serial_number = args.serial_number
params.ip_address = args.ip_address
params.ip_protocol = args.ip_protocol
params.timeout = args.timeout
params.file = args.file
try:
board_shim = BoardShim(args.board_id, params)
board_shim.prepare_session()
board_shim.start_stream(450000, args.streamer_params)
Graph(board_shim)
except BaseException:
logging.warning('Exception', exc_info=True)
finally:
logging.info('End')
if board_shim.is_prepared():
logging.info('Releasing session')
board_shim.release_session()
if __name__ == '__main__':
main()