デバイス
今回使ったデバイスは、SonyのラズパイカメラIMX500というものです。

写真を撮ってみる
では早速写真を撮っていきます。ラズパイへの接続イメージは下図を参照してください。

接続したら、ラズパイ内にフォルダーを作ります。テキストを作り下記のコードをペーストし、フォルダー名をcam_test.pyにします。
from picamera2 import Picamera2
import cv2
import paho.mqtt.client as mqtt
import json
# MQTTブローカーの設定
broker = "localhost"
port = 1883
topic = "camera/faces"
# MQTTクライアントの設定
client = mqtt.Client()
# コネクションのコールバック関数(接続後に呼ばれる)
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
# データをPublish
client.publish(topic, json.dumps(face_count)) # JSONを文字列として送信
print(f"Data published to topic {topic}")
# カメラを初期化
camera = Picamera2()
camera.configure(camera.create_preview_configuration(main={"format": 'XRGB8888', "size": (640, 480)}))
# カメラを開始
camera.start()
# 画像をキャプチャ
img = camera.capture_array()
# カメラを停止
camera.stop()
# 顔検出器を初期化
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
# モデルが正しく読み込まれているか確認
if face_cascade.empty():
print("顔検出モデルが読み込まれていません。")
exit()
# グレースケール変換
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# 顔を検出
faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
face_count = len(faces)
print(f"検出された顔の数: {face_count}")
# 検出した顔を描画
for (x, y, w, h) in faces:
cv2.rectangle(img, (x, y), (x + w, y + h), (255, 0, 0), 2)
# 画像を保存
if not cv2.imwrite('test2.jpg', img):
print("test2.jpg の保存に失敗しました。")
exit()
print("顔検出後の画像が保存されました: test2.jpg")
# MQTTブローカーへの接続
client.on_connect = on_connect # 接続時に呼ばれるコールバック関数を設定
client.connect(broker, port, 60) # ブローカーのIPとポートを指定
# ループを開始して、接続とメッセージ送信を待機
client.loop_start()
# MQTT処理を終了する場合(処理が完了した後に呼び出す)
client.loop_stop()
client.disconnect()
Code language: PHP (php)
そうしましたら、terminalで「python3 動作したいフォルダー」を呼びます。今回は「python3 cam_test.py」と打ちます。
するとカメラが写真を撮り、指定したフォルダーに保存されます。

ファイルの保存先は、コードの# 画像を保存
if not cv2.imwrite(‘test2.jpg’, img):
print(“test2.jpg の保存に失敗しました。”)
exit()
の部分で決めています。
この部分を変えることで保存先を変えることができます。
次に、もう1つテキストを作り下記のコードをコピーし、名前を「cam_cars.py」というフォルダーにします。
import cv2
import paho.mqtt.client as mqtt
import json
# MQTTブローカーの設定
broker = "localhost"
port = 1883
topic = "camera/cars"
# MQTTクライアントの設定
client = mqtt.Client()
# コネクションのコールバック関数(接続後に呼ばれる)
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
# データをPublish
client.publish(topic, json.dumps(car_count)) # JSONを文字列として送信
print(f"Data published to topic {topic}")
# 車検出器を初期化
car_cascade = cv2.CascadeClassifier('haarcascade_car.xml') # モデルファイルのパスを指定
# モデルが正しく読み込まれているか確認
if car_cascade.empty():
print("車検出モデルが読み込まれていません。")
exit()
# 画像を読み込む(事前に画像があることが前提)
img = cv2.imread('parking.jpg')
if img is None:
print("parking.jpg の読み込みに失敗しました。")
exit()
# グレースケール変換
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# 車を検出 scaleFactor=検出分母と思って大丈夫、minNeighbors=重なった部分について、サイズはpx数を参照
cars = car_cascade.detectMultiScale(gray, scaleFactor=1.008, minNeighbors=1, minSize=(70, 70), maxSize=(115, 115))
car_count = len(cars)
print(f"検出された車の数: {car_count}")
# 検出した車を描画
for (x, y, w, h) in cars:
cv2.rectangle(img, (x, y), (x + w, y + h), (0, 255, 0), 2) # 車を緑色の矩形で描画
# 画像を保存
if not cv2.imwrite('test3.jpg', img):
print("test3.jpg の保存に失敗しました。")
exit()
print("車検出後の画像が保存されました: test3.jpg")
# MQTTブローカーへの接続
client.on_connect = on_connect # 接続時に呼ばれるコールバック関数を設定
client.connect(broker, port, 60) # ブローカーのIPとポートを指定
# ループを開始して、接続とメッセージ送信を待機
client.loop_start()
# MQTT処理を終了する場合(処理が完了した後に呼び出す)
client.loop_stop()
client.disconnect()
Code language: PHP (php)
このコードは、保存された画像から車を検出するという内容になります。
どこからか車の画像を用意し、読み込むファイルを指定してあげます。(指定場所は下記参照)
# 画像を読み込む(事前に画像があることが前提)
img = cv2.imread(‘読み込むファイル名’)
if img is None:
print(“読み込むファイル名 の読み込みに失敗しました。”)
exit()
これで、terminalで動作をかけると、車個検出数と画像の保存先が返ってきます。

他にもオブジェクトやQRコードなどの検出もできました。
オブジェクト検出
from picamera2 import Picamera2
import paho.mqtt.client as mqtt
import json
import cv2
# YOLOモデルのパス設定
model_cfg = "/home/pi/yolo/yolov3.cfg" # YOLOの設定ファイル
model_weights = "/home/pi/yolo/yolov3.weights" # YOLOの重みファイル
class_names_file = "/home/pi/yolo/coco.names" # 検出するクラス名
# クラス名を読み込む
with open(class_names_file, 'r') as f:
class_names = f.read().strip().split('\n')
# YOLOネットワークの読み込み
net = cv2.dnn.readNetFromDarknet(model_cfg, model_weights)
net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)
# MQTTブローカーの設定
broker = "localhost"
port = 1883
topic = "camera/objects"
# MQTTクライアントの設定
client = mqtt.Client()
# コネクションのコールバック関数(接続後に呼ばれる)
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
# データをPublish
client.publish(topic, json.dumps(read_objects)) # JSONを文字列として送信
print(f"Data published to topic {topic}")
# カメラを初期化
camera = Picamera2()
camera.configure(camera.create_preview_configuration(main={"format": 'XRGB8888', "size": (2028, 1520)}))
# カメラを開始
camera.start()
# 画像をキャプチャ
image = camera.capture_array()
# カメラを停止
camera.stop()
# 画像をRGBに変換(チャンネルを3にする)
image_rgb = cv2.cvtColor(image, cv2.COLOR_RGBA2RGB)
# YOLO物体検出処理
blob = cv2.dnn.blobFromImage(image_rgb, 0.00392, (416, 416), (0, 0, 0), True, crop=False)
# ネットワークに入力
net.setInput(blob)
# 出力層を取得
layer_names = net.getLayerNames()
output_layers = [layer_names[i - 1] for i in net.getUnconnectedOutLayers()]
# 物体検出
layer_outputs = net.forward(output_layers)
# 検出結果を解析
boxes = []
confidences = []
class_ids = []
h, w = image.shape[:2]
for output in layer_outputs:
for detection in output:
scores = detection[5:]
class_id = int(scores.argmax())
confidence = scores[class_id]
if confidence > 0.5: # 信頼度閾値
# 検出された物体の座標
center_x, center_y, box_w, box_h = (detection[0:4] * [w, h, w, h]).astype("int")
x = int(center_x - box_w / 2)
y = int(center_y - box_h / 2)
# バウンディングボックスを格納
boxes.append([x, y, box_w, box_h])
confidences.append(float(confidence))
class_ids.append(class_id)
# NMSを適用して重複する検出を削除
indices = cv2.dnn.NMSBoxes(boxes, confidences, score_threshold=0.5, nms_threshold=0.4)
# 検出結果を描画
read_objects = []
if len(indices) > 0:
for i in indices.flatten():
x, y, w, h = boxes[i]
label = class_names[class_ids[i]]
confidence = confidences[i]
# 検出結果を保存
read_objects.append({
"object": label,
"confidence": float(confidence)
})
# バウンディングボックスを描画
cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 2)
text = f"{label}: {confidence:.2f}"
cv2.putText(image, text, (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
# 検出結果がなければ通知
if not read_objects:
print("物体が検出されませんでした。")
else:
print(f"検出された物体: {read_objects}")
# 画像を保存
if not cv2.imwrite('test_obj.jpg', image):
print("test_obj.jpg の保存に失敗しました。")
exit()
print("物体検出後の画像が保存されました: test_obj.jpg")
# MQTTブローカーへの接続
client.on_connect = on_connect # 接続時に呼ばれるコールバック関数を設定
client.connect(broker, port, 60) # ブローカーのIPとポートを指定
# ループを開始して、接続とメッセージ送信を待機
client.loop_start()
# MQTT処理を終了する場合(処理が完了した後に呼び出す)
client.loop_stop()
client.disconnect()
Code language: PHP (php)
QRコード検出
from picamera2 import Picamera2
import cv2
import paho.mqtt.client as mqtt
import json
# MQTTブローカーの設定
broker = "localhost"
port = 1883
topic = "camera/qrcodes"
# MQTTクライアントの設定
client = mqtt.Client()
# コネクションのコールバック関数(接続後に呼ばれる)
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
# データをPublish
client.publish(topic, json.dumps(qr_result)) # JSONを文字列として送信
print(f"Data published to topic {topic}")
# カメラを初期化
camera = Picamera2()
camera.configure(camera.create_preview_configuration(main={"format": 'XRGB8888', "size": (1280, 960)}))
# カメラを開始
camera.start()
# 画像をキャプチャ
image = camera.capture_array()
# カメラを停止
camera.stop()
# QRコード検出器を初期化
qr_detector = cv2.QRCodeDetector()
# QRコードを検出
data, points, _ = qr_detector.detectAndDecode(image)
qr_result = []
if data:
print(f"検出されたQRコード: {data}")
qr_result.append({"data": data, "points": points.tolist() if points is not None else []})
# 検出結果を描画(QRコードが検出された場合)
if points is not None:
points = points[0] # 頂点座標を取得
for i in range(len(points)):
pt1 = tuple(map(int, points[i])) # 整数に変換
pt2 = tuple(map(int, points[(i + 1) % len(points)])) # 次の点とループ
cv2.line(image, pt1, pt2, (0, 255, 0), 2) # 緑色の線で描画
else:
print("QRコードが検出されませんでした。")
# 画像を保存
if not cv2.imwrite('test_qr.jpg', image):
print("test_qr.jpg の保存に失敗しました。")
exit()
print("QRコード検出後の画像が保存されました: test_qr.jpg")
# MQTTブローカーへの接続
client.on_connect = on_connect # 接続時に呼ばれるコールバック関数を設定
client.connect(broker, port, 60) # ブローカーのIPとポートを指定
# ループを開始して、接続とメッセージ送信を待機
client.loop_start()
# MQTT処理を終了する場合(処理が完了した後に呼び出す)
client.loop_stop()
client.disconnect()
Code language: PHP (php)
保存先と読み取り先を同じにすれば、実際に撮った画像から車やQRの検出をすることも可能です。
Node-REDとの接続
Node-REDの接続は、コード内に記載があります。(下記参照)
# MQTTブローカーの設定
broker = “IPアドレス”
port = 1883
topic = “好きな名前”
こちらの設定をすることで、mqtt通信でNode-REDにデータを送ることができます。

Node-RED内の設定は、セキュリティーとかはなくコード内で設定したIPアドレス、トピックを設定だけで通信が取れます。


これをlocalhostではなく、他のサーバーに繋ぐ際は、コード内のIPアドレスをしっかりしていできれば、localのNode-REDを経由せず直接通信することができます。