# meta

import numpy as np

from skimage import io

from skimage.color import rgb2lab, deltaE_cie76

import time

import schedule

import paho.mqtt.client as paho

from paho import mqtt

from picamera import PiCamera

import piexif

import os

 

# setting callbacks for different events to see if it works, print the message etc.

def on_connect(client, userdata, flags, rc, properties=None):
    """Print the code received with the connection acknowledgement.

    Only ``rc`` is used; the remaining parameters are accepted so the
    function matches the callback signature and are otherwise ignored.
    """
    template = "CONNACK received with code %s."
    print(template % rc)


# with this callback you can see if your publish was successful

def on_publish(client, userdata, mid, properties=None):
    """Print the message id handed to the publish callback.

    Only ``mid`` is used; the other parameters are ignored.
    """
    prefix = "mid: "
    print(prefix + str(mid))


# print which topic was subscribed to

def on_subscribe(client, userdata, mid, granted_qos, properties=None):
    """Print the message id and granted QoS handed to the subscribe callback.

    Only ``mid`` and ``granted_qos`` are used; the other parameters are
    ignored.
    """
    details = " ".join(map(str, (mid, granted_qos)))
    print("Subscribed: " + details)


# print message, useful for checking if it was successful

def on_message(client, userdata, msg):
    """Print the topic, QoS and payload of a received message on one line.

    ``msg`` must expose ``topic``, ``qos`` and ``payload`` attributes; the
    topic is used as-is while QoS and payload are converted with ``str``.
    """
    parts = (msg.topic, str(msg.qos), str(msg.payload))
    print(" ".join(parts))


# using MQTT version 5 here, for 3.1.1: MQTTv311, 3.1: MQTTv31

# userdata is user defined data of any type, updated by user_data_set()

# client_id is the given name of the client

client = paho.Client(client_id="cameraMac", userdata=None, protocol=paho.MQTTv5)

# Register every callback before connecting so that none of them is missing
# once the network loop starts delivering events.
client.on_connect = on_connect
client.on_subscribe = on_subscribe
client.on_message = on_message
client.on_publish = on_publish

# Broker settings. Each value can be overridden through an environment
# variable; the defaults are the values that were previously hard-coded.
# NOTE(review): the default password is still present in this file. Rotate it
# and supply MQTT_PASSWORD from the environment instead.
MQTT_HOST = os.environ.get(
    "MQTT_HOST", "bd3a7b0413ea40f98a9bca5a2b86bdd4.s2.eu.hivemq.cloud"
)
MQTT_PORT = int(os.environ.get("MQTT_PORT", "8883"))
MQTT_USERNAME = os.environ.get("MQTT_USERNAME", "verticalfarm")
MQTT_PASSWORD = os.environ.get("MQTT_PASSWORD", "Farm2023")

# enable TLS for a secure connection (must be configured before connecting)
client.tls_set(tls_version=mqtt.client.ssl.PROTOCOL_TLS)

# set username and password
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)

# connect to HiveMQ Cloud; 8883 is the standard port for MQTT over TLS
client.connect(MQTT_HOST, MQTT_PORT)


def get_pct_color(img_rgb, rgb_color, threshold=10) -> str:
    """Return the percentage of pixels whose colour is close to ``rgb_color``.

    The image and the reference colour are both converted with ``rgb2lab``
    and compared with ``deltaE_cie76``; a pixel counts as a match when its
    colour difference is strictly below ``threshold``.

    Args:
        img_rgb: Image array; its first two axes are taken as the pixel grid
            when computing the total pixel count.
        rgb_color: Reference colour as three channel values (cast to uint8).
        threshold: Exclusive upper bound on the colour difference for a pixel
            to count as matching.

    Returns:
        The share of matching pixels in percent, formatted as a string with
        two decimals (for example ``"12.34"``).
    """
    img_lab = rgb2lab(img_rgb)

    # Wrap the colour as a 1x1 "image" so it goes through the same conversion
    # as the picture and broadcasts against it in the comparison below.
    reference_lab = rgb2lab(np.uint8(np.asarray([[rgb_color]])))
    delta = deltaE_cie76(reference_lab, img_lab)

    # Count the matches directly instead of materialising two full index
    # arrays with np.where only to take the length of one of them.
    matching = np.count_nonzero(delta < threshold)
    nb_pixel = img_rgb.shape[0] * img_rgb.shape[1]

    return f"{matching / nb_pixel * 100:.2f}"

        

# capture a series of images and publish the green-pixel percentage of each

def pixelPub(num_images=10, image_dir="images"):
    """Capture a burst of photos and publish each one's green-pixel percentage.

    Every capture is saved as a JPEG under ``image_dir`` and read back. The
    share of pixels close to the reference green is computed with
    ``get_pct_color``, and the string ``"<percentage>,<exif>"`` is published
    to ``greenPixelPct/<timestamp>`` with QoS 1.

    Args:
        num_images: Number of photos to capture in one burst.
        image_dir: Directory the captured files are written to; created if
            it does not exist.
    """
    green = [0, 160, 0]

    # Create the output directory once instead of re-checking on every frame.
    os.makedirs(image_dir, exist_ok=True)

    # The context manager closes the camera when the burst is done. Without
    # it the device stays claimed by this run and the next scheduled run
    # cannot open it.
    with PiCamera() as camera:
        camera.start_preview()
        try:
            # presumably a warm-up delay before the first capture
            time.sleep(2)

            for i in range(num_images):
                timestamp = int(time.time())
                filename = f"image_{timestamp}_{i}.jpg"
                filepath = os.path.join(image_dir, filename)

                camera.capture(filepath)
                img_rgb = io.imread(filepath)

                # get the metadata using piexif and encode it as bytes
                exif_dict = piexif.load(filepath)
                exif_bytes = piexif.dump(exif_dict)

                # send the pixel percentage and metadata
                green_pct = get_pct_color(img_rgb, green, 50)
                # NOTE(review): exif_bytes is a bytes object, so the f-string
                # embeds its repr (b'...'). Left unchanged because subscribers
                # may rely on the current payload format.
                payload = f"{green_pct},{exif_bytes}"
                client.publish(f"greenPixelPct/{timestamp}", payload=payload, qos=1)
        finally:
            # Stop the preview even when a capture or publish fails.
            camera.stop_preview()

    

# Run the capture job every hour and additionally once a day at 10:30.
schedule.every().hour.do(pixelPub)
schedule.every().day.at("10:30").do(pixelPub)

# Service MQTT network traffic in a background thread. The previous
# client.loop_forever() call sat after the infinite loop below and was never
# reached, so acknowledgements and keepalives were never processed.
client.loop_start()

try:
    while True:
        schedule.run_pending()
        time.sleep(10)
finally:
    # Shut the connection down cleanly when the script is interrupted.
    client.disconnect()
    client.loop_stop()

# Post a Comment
#
# My Instagram
#
# Made with by Pi-girl | Copyright © 2020 Pi-girl