import numpy as np
from skimage import io
from skimage.color import rgb2lab, deltaE_cie76
import time
import schedule
import paho.mqtt.client as paho
from paho import mqtt
from picamera import PiCamera
import piexif
import os
# setting callbacks for different events to see if it works, print the message etc.
def on_connect(client, userdata, flags, rc, properties=None):
print("CONNACK received with code %s." % rc)
# with this callback you can see if your publish was successful
def on_publish(client, userdata, mid, properties=None):
print("mid: " + str(mid))
# print which topic was subscribed to
def on_subscribe(client, userdata, mid, granted_qos, properties=None):
print("Subscribed: " + str(mid) + " " + str(granted_qos))
# print message, useful for checking if it was successful
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.qos) + " " + str(msg.payload))
# using MQTT version 5 here, for 3.1.1: MQTTv311, 3.1: MQTTv31
# userdata is user defined data of any type, updated by user_data_set()
# client_id is the given name of the client
client = paho.Client(client_id="cameraMac", userdata=None, protocol=paho.MQTTv5)
client.on_connect = on_connect
# enable TLS for secure connection
client.tls_set(tls_version=mqtt.client.ssl.PROTOCOL_TLS)
# set username and password
client.username_pw_set("verticalfarm", "Farm2023")
# connect to HiveMQ Cloud on port 8883 (default for MQTT)
client.connect("bd3a7b0413ea40f98a9bca5a2b86bdd4.s2.eu.hivemq.cloud", 8883)
# setting callbacks, use separate functions like above for better visibility
client.on_subscribe = on_subscribe
client.on_message = on_message
client.on_publish = on_publish
def get_pct_color(img_rgb, rgb_color, threshold=10):
img_lab = rgb2lab(img_rgb)
rgb_color_3d = np.uint8(np.asarray([[rgb_color]]))
rgb_color_lab = rgb2lab(rgb_color_3d)
delta = deltaE_cie76(rgb_color_lab, img_lab)
x_positions, y_positions = np.where(delta < threshold)
nb_pixel = img_rgb.shape[0] * img_rgb.shape[1]
pct_color = len(x_positions) / nb_pixel
pct_color2dec = ('%.2f' %(pct_color * 100))
return (pct_color2dec)
# send the pixel
def pixelPub():
camera = PiCamera()
camera.start_preview()
time.sleep(2)
for i in range(10):
timestamp = int(time.time())
filename = f"image_{timestamp}_{i}.jpg"
filepath = os.path.join("images", filename)
# create the images directory if it doesn't exist
if not os.path.exists("images"):
os.makedirs("images")
camera.capture(filepath)
img_rgb = io.imread(filepath)
green = [0, 160, 0]
# get the metadata using piexif and encode it as bytes
exif_dict = piexif.load(filepath)
exif_bytes = piexif.dump(exif_dict)
# send the pixel percentage and metadata
green_pct = get_pct_color(img_rgb, green, 50)
payload = f"{green_pct},{exif_bytes}"
client.publish(f"greenPixelPct/{timestamp}", payload=payload, qos=1)
camera.stop_preview()
schedule.every().hour.do(pixelPub)
schedule.every().day.at("10:30").do(pixelPub)
while True:
schedule.run_pending()
time.sleep(10)
client.loop_forever()
Post a Comment