Intern

Paspberry PiのAIカメラ IMX500をいじってみた

デバイス

今回使ったデバイスは、SonyのラズパイカメラIMX500というものです。

写真を撮ってみる

では早速写真を撮っていきます。ラズパイへの接続イメージは下図を参照してください。

接続したら、ラズパイ内にフォルダーを作ります。テキストを作り下記のコードをペーストし、フォルダー名をcam_test.pyにします。

from picamera2 import Picamera2
import cv2
import paho.mqtt.client as mqtt
import json

# MQTTブローカーの設定
broker = "localhost"
port = 1883
topic = "camera/faces"

# MQTTクライアントの設定
client = mqtt.Client()

# コネクションのコールバック関数(接続後に呼ばれる)
def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    # データをPublish
    client.publish(topic, json.dumps(face_count))  # JSONを文字列として送信
    print(f"Data published to topic {topic}")

# カメラを初期化
camera = Picamera2()
camera.configure(camera.create_preview_configuration(main={"format": 'XRGB8888', "size": (640, 480)}))

# カメラを開始
camera.start()

# 画像をキャプチャ
img = camera.capture_array()

# カメラを停止
camera.stop()

# 顔検出器を初期化
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

# モデルが正しく読み込まれているか確認
if face_cascade.empty():
    print("顔検出モデルが読み込まれていません。")
    exit()

# グレースケール変換
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

# 顔を検出
faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

face_count = len(faces)

print(f"検出された顔の数: {face_count}")

# 検出した顔を描画
for (x, y, w, h) in faces:
    cv2.rectangle(img, (x, y), (x + w, y + h), (255, 0, 0), 2)

# 画像を保存
if not cv2.imwrite('test2.jpg', img):
    print("test2.jpg の保存に失敗しました。")
    exit()

print("顔検出後の画像が保存されました: test2.jpg")

# MQTTブローカーへの接続
client.on_connect = on_connect  # 接続時に呼ばれるコールバック関数を設定
client.connect(broker, port, 60)  # ブローカーのIPとポートを指定

# ループを開始して、接続とメッセージ送信を待機
client.loop_start()

# MQTT処理を終了する場合(処理が完了した後に呼び出す)
client.loop_stop()
client.disconnect()Code language: PHP (php)

そうしましたら、terminalで「python3 動作したいフォルダー」を呼びます。今回は「python3 cam_test.py」と打ちます。
するとカメラが写真を撮り、指定したフォルダーに保存されます。

ファイルの保存先は、コードの# 画像を保存
if not cv2.imwrite(‘test2.jpg’, img):
print(“test2.jpg の保存に失敗しました。”)
exit()
の部分で決めています。
この部分を変えることで保存先を変えることができます。

次に、もう1つテキストを作り下記のコードをコピーし、名前を「cam_cars.py」というフォルダーにします。

import cv2
import paho.mqtt.client as mqtt
import json

# MQTTブローカーの設定
broker = "localhost"
port = 1883
topic = "camera/cars"

# MQTTクライアントの設定
client = mqtt.Client()

# コネクションのコールバック関数(接続後に呼ばれる)
def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    # データをPublish
    client.publish(topic, json.dumps(car_count))  # JSONを文字列として送信
    print(f"Data published to topic {topic}")

# 車検出器を初期化
car_cascade = cv2.CascadeClassifier('haarcascade_car.xml')  # モデルファイルのパスを指定

# モデルが正しく読み込まれているか確認
if car_cascade.empty():
    print("車検出モデルが読み込まれていません。")
    exit()

# 画像を読み込む(事前に画像があることが前提)
img = cv2.imread('parking.jpg')
if img is None:
    print("parking.jpg の読み込みに失敗しました。")
    exit()

# グレースケール変換
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

# 車を検出 scaleFactor=検出分母と思って大丈夫、minNeighbors=重なった部分について、サイズはpx数を参照
cars = car_cascade.detectMultiScale(gray, scaleFactor=1.008, minNeighbors=1, minSize=(70, 70), maxSize=(115, 115))

car_count = len(cars)

print(f"検出された車の数: {car_count}")

# 検出した車を描画
for (x, y, w, h) in cars:
    cv2.rectangle(img, (x, y), (x + w, y + h), (0, 255, 0), 2)  # 車を緑色の矩形で描画

# 画像を保存
if not cv2.imwrite('test3.jpg', img):
    print("test3.jpg の保存に失敗しました。")
    exit()

print("車検出後の画像が保存されました: test3.jpg")


# MQTTブローカーへの接続
client.on_connect = on_connect  # 接続時に呼ばれるコールバック関数を設定
client.connect(broker, port, 60)  # ブローカーのIPとポートを指定

# ループを開始して、接続とメッセージ送信を待機
client.loop_start()

# MQTT処理を終了する場合(処理が完了した後に呼び出す)
client.loop_stop()
client.disconnect()Code language: PHP (php)

このコードは、保存された画像から車を検出するという内容になります。
どこからか車の画像を用意し、読み込むファイルを指定してあげます。(指定場所は下記参照)
# 画像を読み込む(事前に画像があることが前提)
img = cv2.imread(‘読み込むファイル名’)
if img is None:
print(“読み込むファイル名 の読み込みに失敗しました。”)
exit()
これで、terminalで動作をかけると、車個検出数と画像の保存先が返ってきます。

他にもオブジェクトやQRコードなどの検出もできました。

オブジェクト検出

from picamera2 import Picamera2
import paho.mqtt.client as mqtt
import json
import cv2

# YOLOモデルのパス設定
model_cfg = "/home/pi/yolo/yolov3.cfg"  # YOLOの設定ファイル
model_weights = "/home/pi/yolo/yolov3.weights"  # YOLOの重みファイル
class_names_file = "/home/pi/yolo/coco.names"  # 検出するクラス名

# クラス名を読み込む
with open(class_names_file, 'r') as f:
    class_names = f.read().strip().split('\n')

# YOLOネットワークの読み込み
net = cv2.dnn.readNetFromDarknet(model_cfg, model_weights)
net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)

# MQTTブローカーの設定
broker = "localhost"
port = 1883
topic = "camera/objects"

# MQTTクライアントの設定
client = mqtt.Client()

# コネクションのコールバック関数(接続後に呼ばれる)
def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    # データをPublish
    client.publish(topic, json.dumps(read_objects))  # JSONを文字列として送信
    print(f"Data published to topic {topic}")

# カメラを初期化
camera = Picamera2()
camera.configure(camera.create_preview_configuration(main={"format": 'XRGB8888', "size": (2028, 1520)}))

# カメラを開始
camera.start()

# 画像をキャプチャ
image = camera.capture_array()

# カメラを停止
camera.stop()

# 画像をRGBに変換(チャンネルを3にする)
image_rgb = cv2.cvtColor(image, cv2.COLOR_RGBA2RGB)

# YOLO物体検出処理
blob = cv2.dnn.blobFromImage(image_rgb, 0.00392, (416, 416), (0, 0, 0), True, crop=False)

# ネットワークに入力
net.setInput(blob)

# 出力層を取得
layer_names = net.getLayerNames()
output_layers = [layer_names[i - 1] for i in net.getUnconnectedOutLayers()]

# 物体検出
layer_outputs = net.forward(output_layers)

# 検出結果を解析
boxes = []
confidences = []
class_ids = []
h, w = image.shape[:2]

for output in layer_outputs:
    for detection in output:
        scores = detection[5:]
        class_id = int(scores.argmax())
        confidence = scores[class_id]
        if confidence > 0.5:  # 信頼度閾値
            # 検出された物体の座標
            center_x, center_y, box_w, box_h = (detection[0:4] * [w, h, w, h]).astype("int")
            x = int(center_x - box_w / 2)
            y = int(center_y - box_h / 2)
            
            # バウンディングボックスを格納
            boxes.append([x, y, box_w, box_h])
            confidences.append(float(confidence))
            class_ids.append(class_id)

# NMSを適用して重複する検出を削除
indices = cv2.dnn.NMSBoxes(boxes, confidences, score_threshold=0.5, nms_threshold=0.4)

# 検出結果を描画
read_objects = []

if len(indices) > 0:
    for i in indices.flatten():
        x, y, w, h = boxes[i]
        label = class_names[class_ids[i]]
        confidence = confidences[i]
        
        # 検出結果を保存
        read_objects.append({
            "object": label,
            "confidence": float(confidence)
        })
        
        # バウンディングボックスを描画
        cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 2)
        text = f"{label}: {confidence:.2f}"
        cv2.putText(image, text, (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

# 検出結果がなければ通知
if not read_objects:
    print("物体が検出されませんでした。")
else:
    print(f"検出された物体: {read_objects}")

# 画像を保存
if not cv2.imwrite('test_obj.jpg', image):
    print("test_obj.jpg の保存に失敗しました。")
    exit()

print("物体検出後の画像が保存されました: test_obj.jpg")

# MQTTブローカーへの接続
client.on_connect = on_connect  # 接続時に呼ばれるコールバック関数を設定
client.connect(broker, port, 60)  # ブローカーのIPとポートを指定

# ループを開始して、接続とメッセージ送信を待機
client.loop_start()

# MQTT処理を終了する場合(処理が完了した後に呼び出す)
client.loop_stop()
client.disconnect()Code language: PHP (php)

QRコード検出

from picamera2 import Picamera2
import cv2
import paho.mqtt.client as mqtt
import json

# MQTTブローカーの設定
broker = "localhost"
port = 1883
topic = "camera/qrcodes"

# MQTTクライアントの設定
client = mqtt.Client()

# コネクションのコールバック関数(接続後に呼ばれる)
def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    # データをPublish
    client.publish(topic, json.dumps(qr_result))  # JSONを文字列として送信
    print(f"Data published to topic {topic}")

# カメラを初期化
camera = Picamera2()
camera.configure(camera.create_preview_configuration(main={"format": 'XRGB8888', "size": (1280, 960)}))

# カメラを開始
camera.start()

# 画像をキャプチャ
image = camera.capture_array()

# カメラを停止
camera.stop()

# QRコード検出器を初期化
qr_detector = cv2.QRCodeDetector()

# QRコードを検出
data, points, _ = qr_detector.detectAndDecode(image)

qr_result = []
if data:
    print(f"検出されたQRコード: {data}")
    qr_result.append({"data": data, "points": points.tolist() if points is not None else []})

    # 検出結果を描画(QRコードが検出された場合)
    if points is not None:
        points = points[0]  # 頂点座標を取得
        for i in range(len(points)):
            pt1 = tuple(map(int, points[i]))  # 整数に変換
            pt2 = tuple(map(int, points[(i + 1) % len(points)]))  # 次の点とループ
            cv2.line(image, pt1, pt2, (0, 255, 0), 2)  # 緑色の線で描画
else:
    print("QRコードが検出されませんでした。")

# 画像を保存
if not cv2.imwrite('test_qr.jpg', image):
    print("test_qr.jpg の保存に失敗しました。")
    exit()

print("QRコード検出後の画像が保存されました: test_qr.jpg")

# MQTTブローカーへの接続
client.on_connect = on_connect  # 接続時に呼ばれるコールバック関数を設定
client.connect(broker, port, 60)  # ブローカーのIPとポートを指定

# ループを開始して、接続とメッセージ送信を待機
client.loop_start()

# MQTT処理を終了する場合(処理が完了した後に呼び出す)
client.loop_stop()
client.disconnect()
Code language: PHP (php)

保存先と読み取り先を同じにすれば、実際に撮った画像から車やQRの検出をすることも可能です。

Node-REDとの接続

Node-REDの接続は、コード内に記載があります。(下記参照)
# MQTTブローカーの設定
broker = “IPアドレス”
port = 1883
topic = “好きな名前”
こちらの設定をすることで、mqtt通信でNode-REDにデータを送ることができます。

Node-RED内の設定は、セキュリティーとかはなくコード内で設定したIPアドレス、トピックを設定だけで通信が取れます。

これをlocalhostではなく、他のサーバーに繋ぐ際は、コード内のIPアドレスをしっかりしていできれば、localのNode-REDを経由せず直接通信することができます。