Raspberry Pi 4 B IoT Data Collection: Single Workcell

In order to examine consolidation of production data and program management, it is prudent to create a LAN IoT solution for a single WorkCell as an experiment. The feasibility and best practices can be determined from this scope while limiting research cost and potential consequential complications.

In furthering this experiment, notes have been taken regarding Workcell 11 and Workcell 13. The Information is presented below:

Workcell 11Workcell 13
Degate ControllerPick-and-Place ControllerDegate ControllerPick-and-Place Controller
30iA – USB Gen1.130iB – USB Gen1.130iA – USB Gen1.130iB – USB Gen1.1
RS232C (25pin)RS232C (25pin)
Press: Van Dorn 1760UI: Pathfinder5000Press:UI: Pathfinder5000
RJGNode: 8RJGNode: 1
SN: 12EX500258SN: 22EX500036

Experimental Goals:

Topology Map:

Phase I:

  1. Connect a Raspberry Pi 4 with appropriate networking OS to the LAN
  2. Connect 2x WiFI Dongles compatible with the FANUC Controller’s old USB to the 30iA and 30iB
  3. Program Laptop or other computer for use in the Shop to set up python network scripts, SQL database, FANUC programming transformation calls, WorkCell Geometry Data etc.

According to my initial research, the USB Gen 1.1 should have legacy compatibility with these devices:

Edimax EW-7811Un V2 – N150 Wi-Fi 4 Nano USB Adapter (1-11-25 Listing)

Currently they are priced around $9.98 USB, so about $20 to find out how well they work for this experiment.

10 Lot DB25 Male to RJ45 Network 8P8C Adapter Converter Gold-Pin Connector RS232 (New) (1-15-2025 Listing)

The DB25 Male to RJ56 connection can allow for the use of standard Cat 5 or Cat 6 cable instead of USB connections. Additional USB to RJ45 adapters may be needed on the Pi depending on the number of data devices connected. List price is $19.99 USD for 10 of them. They only have 8 pins however.

New USB 2.0 to RS232 COM Port 9 Pin Serial DB25 DB9 Adapter Cable Converter (New) (1-13-25 Listing)

To connect RJ3 series RS-232-C to the raspberry Pi 4 B, USB Gen 2.0 compatibility. $8.85 USD

Plugable 2.5G USB Ethernet Adapter To 2.5 Gigabit RJ45 LAN Network Adapter/Cable (New) (1-13-25 Listing)

To connect Raspberry Pi 4 B to RJ3i series with RJ45 Option 2 Installed. $14.99 USD.

Raspberry Pi 4 Model B 2019 Quad Core 64 Bit WiFi Bluetooth (4GB) (WY) (Pre-Owned) (1-11-25 Listing)

This is a used Raspberry Pi 4 B with enough RAM for networking and Data processing purposes. It is listed at $49.99 USD as of 1-11-25. New Models with accessories run about $75.00 USD as of the same date.

Finally a laptop, which I need to get a new one anyway sometime this month. I plan on developing the networking and python scripting infrastructure on it in an Arch Linux OS and running it as a system service once the appropriate Virtual Environment and Scripts are constructed. UI can consist of a Webserver on port 80 of this device for compatibility with other OS being used at the factory. (Ugh… windows, why?)

Raspberry Pi OS will most likely be Raspberry Pi OS Lite (minimal, stable, less overhead).

Here are some initial script stubs:

FANUC Data Collection Interface (And RJG Stubs for future Dev)

import socket
import threading
import time
import logging
from datetime import datetime
import sqlite3
from pathlib import Path
import focas
import pycomm3
import paramiko

class FanucInterface:
    def __init__(self, config):
        self.config = config
        self.setup_logging()
        self.setup_database()
        self.running = False
        
    def setup_logging(self):
        logging.basicConfig(
            filename='fanuc_interface.log',
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        
    def setup_database(self):
        self.conn = sqlite3.connect('production_data.db')
        self.cursor = self.conn.cursor()
        # Create tables for storing machine data
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS machine_data (
                timestamp TEXT,
                machine_id TEXT,
                program_number TEXT,
                status TEXT,
                cycle_time REAL,
                alarm_code TEXT
            )
        ''')
        self.conn.commit()

    def connect_fanuc(self, ip_address, port=8193):
        """Connect to FANUC controller via FOCAS"""
        try:
            handle = focas.connect(ip_address, port)
            logging.info(f"Connected to FANUC at {ip_address}")
            return handle
        except Exception as e:
            logging.error(f"Failed to connect to FANUC: {e}")
            return None

    def read_program(self, handle, program_number):
        """Read program from FANUC controller"""
        try:
            program = focas.read_program(handle, program_number)
            return program
        except Exception as e:
            logging.error(f"Failed to read program {program_number}: {e}")
            return None

    def write_program(self, handle, program_number, program_content):
        """Write program to FANUC controller"""
        try:
            focas.write_program(handle, program_number, program_content)
            logging.info(f"Program {program_number} written successfully")
            return True
        except Exception as e:
            logging.error(f"Failed to write program {program_number}: {e}")
            return False

    def collect_machine_data(self, handle):
        """Collect current machine status and data"""
        try:
            status = focas.read_status(handle)
            alarms = focas.read_alarms(handle)
            program = focas.read_current_program(handle)
            position = focas.read_position(handle)
            
            data = {
                'timestamp': datetime.now().isoformat(),
                'status': status,
                'program': program,
                'position': position,
                'alarms': alarms
            }
            
            self.store_data(data)
            return data
        except Exception as e:
            logging.error(f"Failed to collect machine data: {e}")
            return None

    def connect_rjg(self, ip_address, port):
        """Connect to RJG system via Ethernet/IP"""
        try:
            comm = pycomm3.LogixDriver(f'{{ip_address}}')
            return comm
        except Exception as e:
            logging.error(f"Failed to connect to RJG: {e}")
            return None

    def read_rjg_data(self, comm):
        """Read data from RJG system"""
        try:
            # Example tags - modify based on your RJG setup
            data = {
                'timestamp': datetime.now().isoformat(),
                'pressure': comm.read('Pressure')[1],
                'temperature': comm.read('Temperature')[1],
                'cycle_time': comm.read('CycleTime')[1]
            }
            self.store_data(data, 'rjg_data')
            return data
        except Exception as e:
            logging.error(f"Failed to read RJG data: {e}")
            return None

    def store_data(self, data, table='machine_data'):
        """Store collected data in SQLite database"""
        try:
            columns = ', '.join(data.keys())
            placeholders = ':' + ', :'.join(data.keys())
            query = f'INSERT INTO {table} ({columns}) VALUES ({placeholders})'
            self.cursor.execute(query, data)
            self.conn.commit()
        except Exception as e:
            logging.error(f"Failed to store data: {e}")

    def start_monitoring(self):
        """Start continuous monitoring of all systems"""
        self.running = True
        self.monitor_thread = threading.Thread(target=self._monitoring_loop)
        self.monitor_thread.start()

    def stop_monitoring(self):
        """Stop monitoring systems"""
        self.running = False
        if hasattr(self, 'monitor_thread'):
            self.monitor_thread.join()

    def _monitoring_loop(self):
        """Main monitoring loop"""
        fanuc_30iB = self.connect_fanuc(self.config['fanuc_30iB_ip'])
        fanuc_30iA = self.connect_fanuc(self.config['fanuc_30iA_ip'])
        rjg_comm = self.connect_rjg(self.config['rjg_ip'], self.config['rjg_port'])

        while self.running:
            try:
                if fanuc_30iB:
                    data_30iB = self.collect_machine_data(fanuc_30iB)
                if fanuc_30iA:
                    data_30iA = self.collect_machine_data(fanuc_30iA)
                if rjg_comm:
                    rjg_data = self.read_rjg_data(rjg_comm)
                    
                time.sleep(self.config['polling_interval'])
                
            except Exception as e:
                logging.error(f"Error in monitoring loop: {e}")
                time.sleep(5)  # Wait before retrying

    def __del__(self):
        """Cleanup on object destruction"""
        if hasattr(self, 'conn'):
            self.conn.close()

# Example configuration
config = {
    'fanuc_30iB_ip': '192.168.1.100',
    'fanuc_30iA_ip': '192.168.1.101',
    'rjg_ip': '192.168.1.102',
    'rjg_port': 44818,  # Standard Ethernet/IP port
    'polling_interval': 1.0  # seconds
}

# Usage example
if __name__ == "__main__":
    interface = FanucInterface(config)
    try:
        interface.start_monitoring()
        # Keep the main thread alive
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        interface.stop_monitoring()

Remember this is a speculative first pass script. Any suggestions are appreciated.

Packages needed in virtual environment on Pi device:

pip install focas-python pycomm3 paramiko sqlite3 logging

Key Features:

  1. Multiple Protocol Support:
    • FOCAS library for FANUC communication
    • Ethernet/IP for RJG communication
    • Built-in error handling and reconnection logic
  2. Data Collection:
    • Machine status
    • Program information
    • Alarm conditions
    • RJG process data
    • Automatic SQLite storage
  3. Program Management:
    • Read/Write capabilities for robot programs
    • Program backup functionality
    • Version control support
  4. Diagnostics:
    • Real-time status monitoring
    • Alarm tracking
    • Performance metrics
    • Comprehensive logging

Setup Steps:

  1. Install Raspberry Pi OS Lite
  2. Configure static IP for the Pi
  3. Install required Python packages
  4. Configure network settings for all devices
  5. Set up the database structure
  6. Configure logging
  7. Set up automatic startup service

Security Considerations:

  1. Use VLANs to isolate machine network
  2. Implement proper firewall rules
  3. Use strong WiFi security (WPA2-Enterprise if possible)
  4. Regular security updates
  5. Access control for program modifications

This system should give you a reliable interface for managing both FANUC controllers and collecting RJG data. The modular design allows for easy expansion and maintenance.

(Generated By Claude AI)

As far as the Shop computer goes, I intend to run Arch btw. Arch Linux provides the most versatile and open OS for software development along any dependency path.

Here are some of the basic first pass scripts and environment configurations:

Arch Linux OS Environment Setup:

sudo pacman -S python python-pip git sqlite mariadb python-virtualenv
sudo pacman -S python-poetry dbeaver # For database management
sudo pacman -S qtcreator qt6-base # If you want to build a GUI

Most likely I will be using yay instead of pacman in order to incorporate AUR packages, possibly developing and maintaining an AUR package for these purposes.

Virtual Environment Setup:

python -m venv venv
source venv/bin/activate
pip install PyQt6 pandas matplotlib numpy focas-python pycomm3

Depending on system constraints, I would prefer to use miniconda3 as it provides more flexibility in using different versions of python, which most likely will be needed when we get around to using machine learning algorithms on this setup for homebrew copilot, vision recognition models and other technologies as determined to be effective and useful in practical applications.

Database Setup:

sqlite3 production_data.db

Shop Computer Script:

import sys
import sqlite3
from datetime import datetime, timedelta
import pandas as pd
from PyQt6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, 
                           QHBoxLayout, QTableWidget, QTableWidgetItem, QPushButton, 
                           QLabel, QComboBox, QTabWidget, QTextEdit)
from PyQt6.QtCore import Qt, QTimer
import matplotlib.pyplot as plt
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas

class ShopFloorClient(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("Shop Floor Monitor")
        self.setGeometry(100, 100, 1200, 800)
        
        # Create main widget and layout
        main_widget = QWidget()
        self.setCentralWidget(main_widget)
        layout = QVBoxLayout(main_widget)
        
        # Create tab widget
        tabs = QTabWidget()
        layout.addWidget(tabs)
        
        # Add tabs
        tabs.addTab(self.create_monitor_tab(), "Live Monitor")
        tabs.addTab(self.create_program_tab(), "Programs")
        tabs.addTab(self.create_analysis_tab(), "Analysis")
        
        # Setup database connection
        self.setup_database()
        
        # Start update timer
        self.timer = QTimer()
        self.timer.timeout.connect(self.update_data)
        self.timer.start(1000)  # Update every second
        
    def setup_database(self):
        """Initialize database connection"""
        try:
            self.conn = sqlite3.connect('production_data.db')
            self.cursor = self.conn.cursor()
        except Exception as e:
            print(f"Database connection error: {e}")
            
    def create_monitor_tab(self):
        """Create the live monitoring tab"""
        tab = QWidget()
        layout = QVBoxLayout(tab)
        
        # Machine selection
        machine_layout = QHBoxLayout()
        machine_label = QLabel("Select Machine:")
        self.machine_combo = QComboBox()
        self.machine_combo.addItems(["FANUC 30iB", "FANUC 30iA", "RJG"])
        machine_layout.addWidget(machine_label)
        machine_layout.addWidget(self.machine_combo)
        machine_layout.addStretch()
        layout.addLayout(machine_layout)
        
        # Status display
        self.status_table = QTableWidget()
        self.status_table.setColumnCount(4)
        self.status_table.setHorizontalHeaderLabels(["Parameter", "Value", "Status", "Last Update"])
        layout.addWidget(self.status_table)
        
        # Alarm display
        alarm_label = QLabel("Active Alarms:")
        layout.addWidget(alarm_label)
        self.alarm_text = QTextEdit()
        self.alarm_text.setReadOnly(True)
        layout.addWidget(self.alarm_text)
        
        return tab
        
    def create_program_tab(self):
        """Create the program management tab"""
        tab = QWidget()
        layout = QVBoxLayout(tab)
        
        # Program selection
        prog_layout = QHBoxLayout()
        self.program_combo = QComboBox()
        upload_btn = QPushButton("Upload")
        download_btn = QPushButton("Download")
        prog_layout.addWidget(QLabel("Program:"))
        prog_layout.addWidget(self.program_combo)
        prog_layout.addWidget(upload_btn)
        prog_layout.addWidget(download_btn)
        layout.addLayout(prog_layout)
        
        # Program editor
        self.program_editor = QTextEdit()
        layout.addWidget(self.program_editor)
        
        # Connect buttons
        upload_btn.clicked.connect(self.upload_program)
        download_btn.clicked.connect(self.download_program)
        
        return tab
        
    def create_analysis_tab(self):
        """Create the data analysis tab"""
        tab = QWidget()
        layout = QVBoxLayout(tab)
        
        # Time range selection
        time_layout = QHBoxLayout()
        self.time_combo = QComboBox()
        self.time_combo.addItems(["Last Hour", "Last Shift", "Last Day", "Last Week"])
        time_layout.addWidget(QLabel("Time Range:"))
        time_layout.addWidget(self.time_combo)
        layout.addLayout(time_layout)
        
        # Create matplotlib figure
        self.figure, self.ax = plt.subplots()
        self.canvas = FigureCanvas(self.figure)
        layout.addWidget(self.canvas)
        
        # Analysis buttons
        button_layout = QHBoxLayout()
        cycle_btn = QPushButton("Cycle Time Analysis")
        alarm_btn = QPushButton("Alarm Analysis")
        export_btn = QPushButton("Export Data")
        button_layout.addWidget(cycle_btn)
        button_layout.addWidget(alarm_btn)
        button_layout.addWidget(export_btn)
        layout.addLayout(button_layout)
        
        # Connect buttons
        cycle_btn.clicked.connect(self.analyze_cycle_time)
        alarm_btn.clicked.connect(self.analyze_alarms)
        export_btn.clicked.connect(self.export_data)
        
        return tab
    
    def update_data(self):
        """Update live data displays"""
        try:
            machine = self.machine_combo.currentText()
            # Query latest data for selected machine
            self.cursor.execute("""
                SELECT * FROM machine_data 
                WHERE machine_id = ? 
                ORDER BY timestamp DESC LIMIT 1
            """, (machine,))
            data = self.cursor.fetchone()
            
            if data:
                self.update_status_table(data)
                self.update_alarms(data)
        except Exception as e:
            print(f"Update error: {e}")
            
    def update_status_table(self, data):
        """Update the status table with latest data"""
        self.status_table.setRowCount(0)
        for key, value in data.items():
            row = self.status_table.rowCount()
            self.status_table.insertRow(row)
            self.status_table.setItem(row, 0, QTableWidgetItem(key))
            self.status_table.setItem(row, 1, QTableWidgetItem(str(value)))
            
    def update_alarms(self, data):
        """Update alarm display"""
        if 'alarms' in data and data['alarms']:
            self.alarm_text.setText('\n'.join(data['alarms']))
        else:
            self.alarm_text.setText("No active alarms")
            
    def upload_program(self):
        """Upload program to selected machine"""
        program = self.program_editor.toPlainText()
        machine = self.machine_combo.currentText()
        program_number = self.program_combo.currentText()
        # Implement program upload logic here
        
    def download_program(self):
        """Download program from selected machine"""
        machine = self.machine_combo.currentText()
        program_number = self.program_combo.currentText()
        # Implement program download logic here
        
    def analyze_cycle_time(self):
        """Perform cycle time analysis"""
        time_range = self.time_combo.currentText()
        # Query and analyze cycle time data
        self.ax.clear()
        # Plot cycle time data
        self.canvas.draw()
        
    def analyze_alarms(self):
        """Perform alarm analysis"""
        time_range = self.time_combo.currentText()
        # Query and analyze alarm data
        self.ax.clear()
        # Plot alarm data
        self.canvas.draw()
        
    def export_data(self):
        """Export analysis data to CSV"""
        time_range = self.time_combo.currentText()
        # Query data and export to CSV
        
    def closeEvent(self, event):
        """Clean up on close"""
        self.timer.stop()
        self.conn.close()
        event.accept()

if __name__ == '__main__':
    app = QApplication(sys.argv)
    window = ShopFloorClient()
    window.show()
    sys.exit(app.exec())

Once again, this is a draft. Likely development will proceed as follows:

  1. Attain functional Data Management and Transfer
  2. Harden Security
  3. Iterations of development to improve functionality, compatibility and security as time progresses.

Key Features of the Shop Client:

  1. Live monitoring of all connected machines
  2. Program upload/download capability
  3. Real-time status updates
  4. Alarm monitoring
  5. Data analysis tools
  6. Export functionality for reports

System Service Unit File:

[Unit]
Description=Shop Floor Monitoring Client
After=network.target

[Service]
Type=simple
User=YOUR_USERNAME
WorkingDirectory=/path/to/application
Environment=DISPLAY=:0
ExecStart=/path/to/venv/bin/python shop_client.py
Restart=always

[Install]
WantedBy=multi-user.target

So there you have it, an initial speculative toplogy for WorkCell IoT from the Shop. Please let me know your concerns and suggestions. Thank you.

Leave a Reply

Your email address will not be published. Required fields are marked *