Raspberry Pi 4 B IoT Data Collection: Single Workcell
In order to examine consolidation of production data and program management, it is prudent to create a LAN IoT solution for a single WorkCell as an experiment. The feasibility and best practices can be determined from this scope while limiting research cost and potential consequential complications.
In furthering this experiment, notes have been taken regarding Workcell 11 and Workcell 13. The Information is presented below:
Workcell 11 | Workcell 13 | ||
Degate Controller | Pick-and-Place Controller | Degate Controller | Pick-and-Place Controller |
30iA – USB Gen1.1 | 30iB – USB Gen1.1 | 30iA – USB Gen1.1 | 30iB – USB Gen1.1 |
RS232C (25pin) | RS232C (25pin) | ||
Press: Van Dorn 1760 | UI: Pathfinder5000 | Press: | UI: Pathfinder5000 |
RJG | Node: 8 | RJG | Node: 1 |
SN: 12EX500258 | SN: 22EX500036 |
Experimental Goals:
Topology Map:

Phase I:
- Connect a Raspberry Pi 4 with appropriate networking OS to the LAN
- Connect 2x WiFI Dongles compatible with the FANUC Controller’s old USB to the 30iA and 30iB
- Program Laptop or other computer for use in the Shop to set up python network scripts, SQL database, FANUC programming transformation calls, WorkCell Geometry Data etc.
According to my initial research, the USB Gen 1.1 should have legacy compatibility with these devices:
Edimax EW-7811Un V2 – N150 Wi-Fi 4 Nano USB Adapter (1-11-25 Listing)

Currently they are priced around $9.98 USB, so about $20 to find out how well they work for this experiment.

10 Lot DB25 Male to RJ45 Network 8P8C Adapter Converter Gold-Pin Connector RS232 (New) (1-15-2025 Listing)

The DB25 Male to RJ56 connection can allow for the use of standard Cat 5 or Cat 6 cable instead of USB connections. Additional USB to RJ45 adapters may be needed on the Pi depending on the number of data devices connected. List price is $19.99 USD for 10 of them. They only have 8 pins however.
New USB 2.0 to RS232 COM Port 9 Pin Serial DB25 DB9 Adapter Cable Converter (New) (1-13-25 Listing)

To connect RJ3 series RS-232-C to the raspberry Pi 4 B, USB Gen 2.0 compatibility. $8.85 USD

Plugable 2.5G USB Ethernet Adapter To 2.5 Gigabit RJ45 LAN Network Adapter/Cable (New) (1-13-25 Listing)

To connect Raspberry Pi 4 B to RJ3i series with RJ45 Option 2 Installed. $14.99 USD.
Raspberry Pi 4 Model B 2019 Quad Core 64 Bit WiFi Bluetooth (4GB) (WY) (Pre-Owned) (1-11-25 Listing)

This is a used Raspberry Pi 4 B with enough RAM for networking and Data processing purposes. It is listed at $49.99 USD as of 1-11-25. New Models with accessories run about $75.00 USD as of the same date.
Finally a laptop, which I need to get a new one anyway sometime this month. I plan on developing the networking and python scripting infrastructure on it in an Arch Linux OS and running it as a system service once the appropriate Virtual Environment and Scripts are constructed. UI can consist of a Webserver on port 80 of this device for compatibility with other OS being used at the factory. (Ugh… windows, why?)
Raspberry Pi OS will most likely be Raspberry Pi OS Lite (minimal, stable, less overhead).
Here are some initial script stubs:
FANUC Data Collection Interface (And RJG Stubs for future Dev)
import socket
import threading
import time
import logging
from datetime import datetime
import sqlite3
from pathlib import Path
import focas
import pycomm3
import paramiko
class FanucInterface:
def __init__(self, config):
self.config = config
self.setup_logging()
self.setup_database()
self.running = False
def setup_logging(self):
logging.basicConfig(
filename='fanuc_interface.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def setup_database(self):
self.conn = sqlite3.connect('production_data.db')
self.cursor = self.conn.cursor()
# Create tables for storing machine data
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS machine_data (
timestamp TEXT,
machine_id TEXT,
program_number TEXT,
status TEXT,
cycle_time REAL,
alarm_code TEXT
)
''')
self.conn.commit()
def connect_fanuc(self, ip_address, port=8193):
"""Connect to FANUC controller via FOCAS"""
try:
handle = focas.connect(ip_address, port)
logging.info(f"Connected to FANUC at {ip_address}")
return handle
except Exception as e:
logging.error(f"Failed to connect to FANUC: {e}")
return None
def read_program(self, handle, program_number):
"""Read program from FANUC controller"""
try:
program = focas.read_program(handle, program_number)
return program
except Exception as e:
logging.error(f"Failed to read program {program_number}: {e}")
return None
def write_program(self, handle, program_number, program_content):
"""Write program to FANUC controller"""
try:
focas.write_program(handle, program_number, program_content)
logging.info(f"Program {program_number} written successfully")
return True
except Exception as e:
logging.error(f"Failed to write program {program_number}: {e}")
return False
def collect_machine_data(self, handle):
"""Collect current machine status and data"""
try:
status = focas.read_status(handle)
alarms = focas.read_alarms(handle)
program = focas.read_current_program(handle)
position = focas.read_position(handle)
data = {
'timestamp': datetime.now().isoformat(),
'status': status,
'program': program,
'position': position,
'alarms': alarms
}
self.store_data(data)
return data
except Exception as e:
logging.error(f"Failed to collect machine data: {e}")
return None
def connect_rjg(self, ip_address, port):
"""Connect to RJG system via Ethernet/IP"""
try:
comm = pycomm3.LogixDriver(f'{{ip_address}}')
return comm
except Exception as e:
logging.error(f"Failed to connect to RJG: {e}")
return None
def read_rjg_data(self, comm):
"""Read data from RJG system"""
try:
# Example tags - modify based on your RJG setup
data = {
'timestamp': datetime.now().isoformat(),
'pressure': comm.read('Pressure')[1],
'temperature': comm.read('Temperature')[1],
'cycle_time': comm.read('CycleTime')[1]
}
self.store_data(data, 'rjg_data')
return data
except Exception as e:
logging.error(f"Failed to read RJG data: {e}")
return None
def store_data(self, data, table='machine_data'):
"""Store collected data in SQLite database"""
try:
columns = ', '.join(data.keys())
placeholders = ':' + ', :'.join(data.keys())
query = f'INSERT INTO {table} ({columns}) VALUES ({placeholders})'
self.cursor.execute(query, data)
self.conn.commit()
except Exception as e:
logging.error(f"Failed to store data: {e}")
def start_monitoring(self):
"""Start continuous monitoring of all systems"""
self.running = True
self.monitor_thread = threading.Thread(target=self._monitoring_loop)
self.monitor_thread.start()
def stop_monitoring(self):
"""Stop monitoring systems"""
self.running = False
if hasattr(self, 'monitor_thread'):
self.monitor_thread.join()
def _monitoring_loop(self):
"""Main monitoring loop"""
fanuc_30iB = self.connect_fanuc(self.config['fanuc_30iB_ip'])
fanuc_30iA = self.connect_fanuc(self.config['fanuc_30iA_ip'])
rjg_comm = self.connect_rjg(self.config['rjg_ip'], self.config['rjg_port'])
while self.running:
try:
if fanuc_30iB:
data_30iB = self.collect_machine_data(fanuc_30iB)
if fanuc_30iA:
data_30iA = self.collect_machine_data(fanuc_30iA)
if rjg_comm:
rjg_data = self.read_rjg_data(rjg_comm)
time.sleep(self.config['polling_interval'])
except Exception as e:
logging.error(f"Error in monitoring loop: {e}")
time.sleep(5) # Wait before retrying
def __del__(self):
"""Cleanup on object destruction"""
if hasattr(self, 'conn'):
self.conn.close()
# Example configuration
config = {
'fanuc_30iB_ip': '192.168.1.100',
'fanuc_30iA_ip': '192.168.1.101',
'rjg_ip': '192.168.1.102',
'rjg_port': 44818, # Standard Ethernet/IP port
'polling_interval': 1.0 # seconds
}
# Usage example
if __name__ == "__main__":
interface = FanucInterface(config)
try:
interface.start_monitoring()
# Keep the main thread alive
while True:
time.sleep(1)
except KeyboardInterrupt:
interface.stop_monitoring()
Remember this is a speculative first pass script. Any suggestions are appreciated.
Packages needed in virtual environment on Pi device:
pip install focas-python pycomm3 paramiko sqlite3 logging
Key Features:
- Multiple Protocol Support:
- FOCAS library for FANUC communication
- Ethernet/IP for RJG communication
- Built-in error handling and reconnection logic
- Data Collection:
- Machine status
- Program information
- Alarm conditions
- RJG process data
- Automatic SQLite storage
- Program Management:
- Read/Write capabilities for robot programs
- Program backup functionality
- Version control support
- Diagnostics:
- Real-time status monitoring
- Alarm tracking
- Performance metrics
- Comprehensive logging
Setup Steps:
- Install Raspberry Pi OS Lite
- Configure static IP for the Pi
- Install required Python packages
- Configure network settings for all devices
- Set up the database structure
- Configure logging
- Set up automatic startup service
Security Considerations:
- Use VLANs to isolate machine network
- Implement proper firewall rules
- Use strong WiFi security (WPA2-Enterprise if possible)
- Regular security updates
- Access control for program modifications
This system should give you a reliable interface for managing both FANUC controllers and collecting RJG data. The modular design allows for easy expansion and maintenance.
(Generated By Claude AI)
As far as the Shop computer goes, I intend to run Arch btw. Arch Linux provides the most versatile and open OS for software development along any dependency path.
Here are some of the basic first pass scripts and environment configurations:
Arch Linux OS Environment Setup:
sudo pacman -S python python-pip git sqlite mariadb python-virtualenv
sudo pacman -S python-poetry dbeaver # For database management
sudo pacman -S qtcreator qt6-base # If you want to build a GUI
Most likely I will be using yay instead of pacman in order to incorporate AUR packages, possibly developing and maintaining an AUR package for these purposes.
Virtual Environment Setup:
python -m venv venv
source venv/bin/activate
pip install PyQt6 pandas matplotlib numpy focas-python pycomm3
Depending on system constraints, I would prefer to use miniconda3 as it provides more flexibility in using different versions of python, which most likely will be needed when we get around to using machine learning algorithms on this setup for homebrew copilot, vision recognition models and other technologies as determined to be effective and useful in practical applications.
Database Setup:
sqlite3 production_data.db
Shop Computer Script:
import sys
import sqlite3
from datetime import datetime, timedelta
import pandas as pd
from PyQt6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
QHBoxLayout, QTableWidget, QTableWidgetItem, QPushButton,
QLabel, QComboBox, QTabWidget, QTextEdit)
from PyQt6.QtCore import Qt, QTimer
import matplotlib.pyplot as plt
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
class ShopFloorClient(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("Shop Floor Monitor")
self.setGeometry(100, 100, 1200, 800)
# Create main widget and layout
main_widget = QWidget()
self.setCentralWidget(main_widget)
layout = QVBoxLayout(main_widget)
# Create tab widget
tabs = QTabWidget()
layout.addWidget(tabs)
# Add tabs
tabs.addTab(self.create_monitor_tab(), "Live Monitor")
tabs.addTab(self.create_program_tab(), "Programs")
tabs.addTab(self.create_analysis_tab(), "Analysis")
# Setup database connection
self.setup_database()
# Start update timer
self.timer = QTimer()
self.timer.timeout.connect(self.update_data)
self.timer.start(1000) # Update every second
def setup_database(self):
"""Initialize database connection"""
try:
self.conn = sqlite3.connect('production_data.db')
self.cursor = self.conn.cursor()
except Exception as e:
print(f"Database connection error: {e}")
def create_monitor_tab(self):
"""Create the live monitoring tab"""
tab = QWidget()
layout = QVBoxLayout(tab)
# Machine selection
machine_layout = QHBoxLayout()
machine_label = QLabel("Select Machine:")
self.machine_combo = QComboBox()
self.machine_combo.addItems(["FANUC 30iB", "FANUC 30iA", "RJG"])
machine_layout.addWidget(machine_label)
machine_layout.addWidget(self.machine_combo)
machine_layout.addStretch()
layout.addLayout(machine_layout)
# Status display
self.status_table = QTableWidget()
self.status_table.setColumnCount(4)
self.status_table.setHorizontalHeaderLabels(["Parameter", "Value", "Status", "Last Update"])
layout.addWidget(self.status_table)
# Alarm display
alarm_label = QLabel("Active Alarms:")
layout.addWidget(alarm_label)
self.alarm_text = QTextEdit()
self.alarm_text.setReadOnly(True)
layout.addWidget(self.alarm_text)
return tab
def create_program_tab(self):
"""Create the program management tab"""
tab = QWidget()
layout = QVBoxLayout(tab)
# Program selection
prog_layout = QHBoxLayout()
self.program_combo = QComboBox()
upload_btn = QPushButton("Upload")
download_btn = QPushButton("Download")
prog_layout.addWidget(QLabel("Program:"))
prog_layout.addWidget(self.program_combo)
prog_layout.addWidget(upload_btn)
prog_layout.addWidget(download_btn)
layout.addLayout(prog_layout)
# Program editor
self.program_editor = QTextEdit()
layout.addWidget(self.program_editor)
# Connect buttons
upload_btn.clicked.connect(self.upload_program)
download_btn.clicked.connect(self.download_program)
return tab
def create_analysis_tab(self):
"""Create the data analysis tab"""
tab = QWidget()
layout = QVBoxLayout(tab)
# Time range selection
time_layout = QHBoxLayout()
self.time_combo = QComboBox()
self.time_combo.addItems(["Last Hour", "Last Shift", "Last Day", "Last Week"])
time_layout.addWidget(QLabel("Time Range:"))
time_layout.addWidget(self.time_combo)
layout.addLayout(time_layout)
# Create matplotlib figure
self.figure, self.ax = plt.subplots()
self.canvas = FigureCanvas(self.figure)
layout.addWidget(self.canvas)
# Analysis buttons
button_layout = QHBoxLayout()
cycle_btn = QPushButton("Cycle Time Analysis")
alarm_btn = QPushButton("Alarm Analysis")
export_btn = QPushButton("Export Data")
button_layout.addWidget(cycle_btn)
button_layout.addWidget(alarm_btn)
button_layout.addWidget(export_btn)
layout.addLayout(button_layout)
# Connect buttons
cycle_btn.clicked.connect(self.analyze_cycle_time)
alarm_btn.clicked.connect(self.analyze_alarms)
export_btn.clicked.connect(self.export_data)
return tab
def update_data(self):
"""Update live data displays"""
try:
machine = self.machine_combo.currentText()
# Query latest data for selected machine
self.cursor.execute("""
SELECT * FROM machine_data
WHERE machine_id = ?
ORDER BY timestamp DESC LIMIT 1
""", (machine,))
data = self.cursor.fetchone()
if data:
self.update_status_table(data)
self.update_alarms(data)
except Exception as e:
print(f"Update error: {e}")
def update_status_table(self, data):
"""Update the status table with latest data"""
self.status_table.setRowCount(0)
for key, value in data.items():
row = self.status_table.rowCount()
self.status_table.insertRow(row)
self.status_table.setItem(row, 0, QTableWidgetItem(key))
self.status_table.setItem(row, 1, QTableWidgetItem(str(value)))
def update_alarms(self, data):
"""Update alarm display"""
if 'alarms' in data and data['alarms']:
self.alarm_text.setText('\n'.join(data['alarms']))
else:
self.alarm_text.setText("No active alarms")
def upload_program(self):
"""Upload program to selected machine"""
program = self.program_editor.toPlainText()
machine = self.machine_combo.currentText()
program_number = self.program_combo.currentText()
# Implement program upload logic here
def download_program(self):
"""Download program from selected machine"""
machine = self.machine_combo.currentText()
program_number = self.program_combo.currentText()
# Implement program download logic here
def analyze_cycle_time(self):
"""Perform cycle time analysis"""
time_range = self.time_combo.currentText()
# Query and analyze cycle time data
self.ax.clear()
# Plot cycle time data
self.canvas.draw()
def analyze_alarms(self):
"""Perform alarm analysis"""
time_range = self.time_combo.currentText()
# Query and analyze alarm data
self.ax.clear()
# Plot alarm data
self.canvas.draw()
def export_data(self):
"""Export analysis data to CSV"""
time_range = self.time_combo.currentText()
# Query data and export to CSV
def closeEvent(self, event):
"""Clean up on close"""
self.timer.stop()
self.conn.close()
event.accept()
if __name__ == '__main__':
app = QApplication(sys.argv)
window = ShopFloorClient()
window.show()
sys.exit(app.exec())
Once again, this is a draft. Likely development will proceed as follows:
- Attain functional Data Management and Transfer
- Harden Security
- Iterations of development to improve functionality, compatibility and security as time progresses.
Key Features of the Shop Client:
- Live monitoring of all connected machines
- Program upload/download capability
- Real-time status updates
- Alarm monitoring
- Data analysis tools
- Export functionality for reports
System Service Unit File:
[Unit]
Description=Shop Floor Monitoring Client
After=network.target
[Service]
Type=simple
User=YOUR_USERNAME
WorkingDirectory=/path/to/application
Environment=DISPLAY=:0
ExecStart=/path/to/venv/bin/python shop_client.py
Restart=always
[Install]
WantedBy=multi-user.target
So there you have it, an initial speculative toplogy for WorkCell IoT from the Shop. Please let me know your concerns and suggestions. Thank you.