This GitHub repository contains my cloud connectivity code for IoT devices and applications. It focuses on establishing secure and efficient communication between IoT devices and the IoT cloud network. The code aims to optimize real-time operations, data processing, and storage within the IoT cloud infrastructure.
Design By Muhammad Raheel
Click on the following link to create a Blynk Cloud account.
https://blynk.cloud/dashboard/register
- Enter email ID, then click on "Sign Up". You will receive a verification email.
- Click on Create Password in the email, Then set the password, click on Next.
- Enter your first name, click on Done.
After that Blynk cloud dashboard will open.
after signup confirm email then enter password
then enter password
then enter profile name
if you see this then skip this process
Click Cancle Button
First, you have to create a template in the Blynk cloud.
- Click on New Template.
- Enter a template name, select the hardware as ESP8266, and the connection type will WiFi.
- Then click on DONE.
You will get the BLYNK_TEMPLATE_ID and BLYNK_DEVICE_NAME after creating the temple.
After click you will get Device Auth key
import network def do_connect(): sta_if = network.WLAN(network.STA_IF) if not sta_if.isconnected(): print('connecting to network...') sta_if.active(True) print(sta_if.scan()) sta_if.connect("wifi Router SSID", "Wifi Router Password") sta_if.connect("IoT_Device",'Thejudgementday@') while not sta_if.isconnected(): pass print('network config:', sta_if.ifconfig()) do_connect()
# Copyright (c) 2015-2019 Volodymyr Shymanskyy. See the file LICENSE for copying permission. __version__ = "1.0.0" import struct import time import sys import os try: import machine gettime = lambda: time.ticks_ms() SOCK_TIMEOUT = 0 except ImportError: const = lambda x: x gettime = lambda: int(time.time() * 1000) SOCK_TIMEOUT = 0.05 def dummy(*args): pass MSG_RSP = const(0) MSG_LOGIN = const(2) MSG_PING = const(6) MSG_TWEET = const(12) MSG_NOTIFY = const(14) MSG_BRIDGE = const(15) MSG_HW_SYNC = const(16) MSG_INTERNAL = const(17) MSG_PROPERTY = const(19) MSG_HW = const(20) MSG_HW_LOGIN = const(29) MSG_EVENT_LOG = const(64) MSG_REDIRECT = const(41) # TODO: not implemented MSG_DBG_PRINT = const(55) # TODO: not implemented STA_SUCCESS = const(200) STA_INVALID_TOKEN = const(9) DISCONNECTED = const(0) CONNECTING = const(1) CONNECTED = const(2) print(""" ___ __ __ / _ )/ /_ _____ / /__ / _ / / // / _ \\/ '_/ /____/_/\\_, /_//_/_/\\_\\ /___/ for Python v""" + __version__ + " (" + sys.platform + ")\n") class EventEmitter: def __init__(self): self._cbks = {} def on(self, evt, f=None): if f: self._cbks[evt] = f else: def D(f): self._cbks[evt] = f return f return D def emit(self, evt, *a, **kv): if evt in self._cbks: self._cbks[evt](*a, **kv) class BlynkProtocol(EventEmitter): def __init__(self, auth, tmpl_id=None, fw_ver=None, heartbeat=50, buffin=1024, log=None): EventEmitter.__init__(self) self.heartbeat = heartbeat*1000 self.buffin = buffin self.log = log or dummy self.auth = auth self.tmpl_id = tmpl_id self.fw_ver = fw_ver self.state = DISCONNECTED self.connect() def virtual_write(self, pin, *val): self._send(MSG_HW, 'vw', pin, *val) def send_internal(self, pin, *val): self._send(MSG_INTERNAL, pin, *val) def set_property(self, pin, prop, *val): self._send(MSG_PROPERTY, pin, prop, *val) def sync_virtual(self, *pins): self._send(MSG_HW_SYNC, 'vr', *pins) def log_event(self, *val): self._send(MSG_EVENT_LOG, *val) def _send(self, cmd, *args, **kwargs): if 'id' in kwargs: id = kwargs.get('id') else: id = self.msg_id self.msg_id += 1 if self.msg_id > 0xFFFF: self.msg_id = 1 if cmd == MSG_RSP: data = b'' dlen = args[0] else: data = ('\0'.join(map(str, args))).encode('utf8') dlen = len(data) self.log('<', cmd, id, '|', *args) msg = struct.pack("!BHH", cmd, id, dlen) + data self.lastSend = gettime() self._write(msg) def connect(self): if self.state != DISCONNECTED: return self.msg_id = 1 (self.lastRecv, self.lastSend, self.lastPing) = (gettime(), 0, 0) self.bin = b"" self.state = CONNECTING self._send(MSG_HW_LOGIN, self.auth) def disconnect(self): if self.state == DISCONNECTED: return self.bin = b"" self.state = DISCONNECTED self.emit('disconnected') def process(self, data=None): if not (self.state == CONNECTING or self.state == CONNECTED): return now = gettime() if now - self.lastRecv > self.heartbeat+(self.heartbeat//2): return self.disconnect() if (now - self.lastPing > self.heartbeat//10 and (now - self.lastSend > self.heartbeat or now - self.lastRecv > self.heartbeat)): self._send(MSG_PING) self.lastPing = now if data != None and len(data): self.bin += data while True: if len(self.bin) < 5: break cmd, i, dlen = struct.unpack("!BHH", self.bin[:5]) if i == 0: return self.disconnect() self.lastRecv = now if cmd == MSG_RSP: self.bin = self.bin[5:] self.log('>', cmd, i, '|', dlen) if self.state == CONNECTING and i == 1: if dlen == STA_SUCCESS: self.state = CONNECTED dt = now - self.lastSend info = ['ver', __version__, 'h-beat', self.heartbeat//1000, 'buff-in', self.buffin, 'dev', sys.platform+'-py'] if self.tmpl_id: info.extend(['tmpl', self.tmpl_id]) info.extend(['fw-type', self.tmpl_id]) if self.fw_ver: info.extend(['fw', self.fw_ver]) self._send(MSG_INTERNAL, *info) try: self.emit('connected', ping=dt) except TypeError: self.emit('connected') else: if dlen == STA_INVALID_TOKEN: self.emit("invalid_auth") print("Invalid auth token") return self.disconnect() else: if dlen >= self.buffin: print("Cmd too big: ", dlen) return self.disconnect() if len(self.bin) < 5+dlen: break data = self.bin[5:5+dlen] self.bin = self.bin[5+dlen:] args = list(map(lambda x: x.decode('utf8'), data.split(b'\0'))) self.log('>', cmd, i, '|', ','.join(args)) if cmd == MSG_PING: self._send(MSG_RSP, STA_SUCCESS, id=i) elif cmd == MSG_HW or cmd == MSG_BRIDGE: if args[0] == 'vw': self.emit("V"+args[1], args[2:]) self.emit("V*", args[1], args[2:]) elif cmd == MSG_INTERNAL: self.emit("internal:"+args[0], args[1:]) elif cmd == MSG_REDIRECT: self.emit("redirect", args[0], int(args[1])) else: print("Unexpected command: ", cmd) return self.disconnect() import socket class Blynk(BlynkProtocol): def __init__(self, auth, **kwargs): self.insecure = kwargs.pop('insecure', False) self.server = kwargs.pop('server', 'blynk.cloud') self.port = kwargs.pop('port', 80 if self.insecure else 443) BlynkProtocol.__init__(self, auth, **kwargs) self.on('redirect', self.redirect) def redirect(self, server, port): self.server = server self.port = port self.disconnect() self.connect() def connect(self): print('Connecting to %s:%d...' % (self.server, self.port)) s = socket.socket() s.connect(socket.getaddrinfo(self.server, self.port)[0][-1]) try: s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) except: pass if self.insecure: self.conn = s else: try: import ussl ssl_context = ussl except ImportError: import ssl ssl_context = ssl.create_default_context() self.conn = ssl_context.wrap_socket(s, server_hostname=self.server) try: self.conn.settimeout(SOCK_TIMEOUT) except: s.settimeout(SOCK_TIMEOUT) BlynkProtocol.connect(self) def _write(self, data): #print('<', data) self.conn.write(data) # TODO: handle disconnect def run(self): data = b'' try: data = self.conn.read(self.buffin) #print('>', data) except KeyboardInterrupt: raise except socket.timeout: # No data received, call process to send ping messages when needed pass except: # TODO: handle disconnect return self.process(data)
import BlynkLib import network BLYNK_AUTH = 'yqD8Pga0PduvJHOvahcRI3wH-Cj-cKju' wifi = network.WLAN(network.STA_IF) wifi.active(True) while not wifi.isconnected(): pass print('IP:', wifi.ifconfig()[0]) blynk = BlynkLib.Blynk(BLYNK_AUTH) @blynk.on("connected") def blynk_connected(ping): print("Connecting.............") print('Blynk ready. Ping:', ping, 'ms') print("Connected!") while True: blynk.run()
import BlynkLib import network import machine import time from machine import Pin led1=Pin(14,Pin.OUT) def do_connect(): sta_if = network.WLAN(network.STA_IF) if not sta_if.isconnected(): print('connecting to network...') sta_if.active(True) print(sta_if.scan()) sta_if.connect("Raheel-Desktop", "Raheel42") while not sta_if.isconnected(): pass print('network config:', sta_if.ifconfig()) do_connect() BLYNK_AUTH = "xl8Tswb2Ug-WoHoU3CclBzHO8RYzQw53" blynk = BlynkLib.Blynk(BLYNK_AUTH) @blynk.on("connected") def blynk_connected(ping): print("Connecting.................") print('Blynk ready. Ping:', ping, 'ms') print("Connected!") @blynk.on("V0") def terminal_write_handler(value): print('V0: {}'.format(value)) blynk.virtual_write(0, "You wrote: " + value[0]) # Run blynk in the main thread: while True: blynk.run()
import BlynkLib import network import machine import time from machine import Pin led1=Pin(14,Pin.OUT) def do_connect(): sta_if = network.WLAN(network.STA_IF) if not sta_if.isconnected(): print('connecting to network...') sta_if.active(True) print(sta_if.scan()) sta_if.connect("Raheel-Desktop", "Raheel42") while not sta_if.isconnected(): pass print('network config:', sta_if.ifconfig()) do_connect() BLYNK_AUTH = "xl8Tswb2Ug-WoHoU3CclBzHO8RYzQw53" blynk = BlynkLib.Blynk(BLYNK_AUTH) @blynk.on("connected") def blynk_connected(ping): print("Connecting.................") print('Blynk ready. Ping:', ping, 'ms') print("Connected!") # Define your functions def function1(): # Logic for function 1 blynk.virtual_write(0, 'Function 1 executed') print("FUNCTION 1 Executed") def function2(): # Logic for function 2 blynk.virtual_write(0, 'Function 2 executed') print("FUNCTION 2 Executed") def function3(): # Logic for function 3 blynk.virtual_write(0, 'Function 3 executed') print("FUNCTION 3 Executed") # Register a handler for the terminal widget @blynk.on('V0') def terminal_handler(value): # Check the received command from the terminal if value[0] == 'function1': function1() elif value[0] == 'function2': function2() elif value[0] == 'function3': function3() else: blynk.virtual_write(0, 'Invalid function name') # Run blynk in the main thread: while True: blynk.run() # `led_blynk.py`import BlynkLib import network import machine import time from machine import Pin led1=Pin(14,Pin.OUT) led2=Pin(13,Pin.OUT) BLYNK_AUTH = 'yqD8Pga0PduvJHOvahcRI3wH-Cj-cKju' wifi = network.WLAN(network.STA_IF) wifi.active(True) while not wifi.isconnected(): pass print('IP:', wifi.ifconfig()[0]) blynk = BlynkLib.Blynk(BLYNK_AUTH) tmr_start_time = time.time() @blynk.on("connected") def blynk_connected(ping,value): print("Connecting.................") print('Blynk ready. Ping:', ping, 'ms') print("Connected!") @blynk.on("V1") def v3_write_handler(value): if int(value[0])==1: led1.on() else: led1.off() # Run blynk in the main thread: while True: blynk.run()import BlynkLib import network import time from machine import Pin led1=Pin(14,Pin.OUT) led2=Pin(13,Pin.OUT) BLYNK_AUTH = 'yqD8Pga0PduvJHOvahcRI3wH-Cj-cKju' wifi = network.WLAN(network.STA_IF) wifi.active(True) while not wifi.isconnected(): pass print('IP:', wifi.ifconfig()[0]) blynk = BlynkLib.Blynk(BLYNK_AUTH) tmr_start_time = time.time() @blynk.on("connected") def blynk_connected(ping): print('Blynk ready. Ping:', ping, 'ms') print("Connected!") @blynk.on("V1") def v3_write_handler(value): if int(value[0])==1: led1.on() else: led1.off() @blynk.on("V2") def v2_write_handler(value): led2.value(int(value[0])) if int(value[0])==1: led2.on() else: led2.off() while True: blynk.run()from time import sleep import dht from machine import Pin import BlynkLib import network import machine import time from machine import Pin import dht sensor = dht.DHT11(Pin(33)) BLYNK_AUTH = 'yqD8Pga0PduvJHOvahcRI3wH-Cj-cKju' wifi = network.WLAN(network.STA_IF) wifi.active(True) while not wifi.isconnected(): pass print('IP:', wifi.ifconfig()[0]) blynk = BlynkLib.Blynk(BLYNK_AUTH) tmr_start_time = time.time() @blynk.on("connected") def blynk_connected(ping): print('Blynk ready. Ping:', ping, 'ms') print("Connected!") while True: sensor.measure() temp = sensor.temperature() hum = sensor.humidity() print('Air Temperature: %3.1f C' %temp) print('Air Humidity: %3.1f %%' %hum) blynk.virtual_write('13',temp) blynk.virtual_write('14',hum) sleep(1) # Run blynk in the main thread: while True: blynk.run()from time import sleep from hcsr04 import HCSR04 import dht from machine import Pin import BlynkLib import network import machine import time from machine import Pin import dht sensor = HCSR04(trigger_pin=26, echo_pin=27, echo_timeout_us=10000) BLYNK_AUTH = 'yqD8Pga0PduvJHOvahcRI3wH-Cj-cKju' wifi = network.WLAN(network.STA_IF) wifi.active(True) while not wifi.isconnected(): pass print('IP:', wifi.ifconfig()[0]) blynk = BlynkLib.Blynk(BLYNK_AUTH) tmr_start_time = time.time() @blynk.on("connected") def blynk_connected(ping): print('Blynk ready. Ping:', ping, 'ms') print("Connected!") while True: distance = sensor.distance_cm() print('Distance:', distance, 'cm') blynk.virtual_write("20",distance) sleep(1) # Run blynk in the main thread: while True: blynk.run()
goto this link
https://thingspeak.com/loginClick create one then fill all fields
We will define the channels by entering the a proper name, description, and up to 8 fields can be used to name the parameter. For the Field 1 and Field 2 we have named Temperature and humidity. These field values that you set can be modified later. These values will be in Degree Centigrade and Relative Humidity in %. Once you update the name, click on Save.
Once you have saved the channel, you will be automatically redirected to the “Private View” tab. Here the mapped fields are shown as a diagram. You will find the “Channel ID” (we will need it later). Below You will also see API Keys option.
Later, click on the “API Keys” tab. The two values of “Write API key” and “Read API key” are equally necessary for us to write or retrieve data. Copy these keys and keep it safe as we need to put it in the code.
import machine import urequests from machine import Pin import time, network import dht sensor = dht.DHT11(Pin(26)) HTTP_HEADERS = {'Content-Type': 'application/json'} THINGSPEAK_WRITE_API_KEY = '07D8NPQGAVVGAV99' UPDATE_TIME_INTERVAL = 5000 # in ms last_update = time.ticks_ms() # Configure ESP32 as Station sta_if=network.WLAN(network.STA_IF) sta_if.active(True) print('network config:', sta_if.ifconfig()) while True: if time.ticks_ms() - last_update >= UPDATE_TIME_INTERVAL: while True: sensor.measure() temp = sensor.temperature() hum = sensor.humidity() print('Air Temperature: %3.1f C' %temp) print('Air Humidity: %3.1f %%' %hum) time.sleep(1) readings = {'field1':temp, 'field2':hum} request = urequests.post( 'http://api.thingspeak.com/update?api_key=' + THINGSPEAK_WRITE_API_KEY, json = readings, headers = HTTP_HEADERS ) request.close() print(readings)
Click here for getting API => https://www.callmebot.com/
Click here for url encoding codes =>> https://www.w3schools.com/tags/ref_urlencode.ASPtry: import urequests as requests except: import requests import network #Your network credentials ssid = 'Raheel-Desktop' password = 'Raheel42' def connect_wifi(ssid, password): #Connect to your network station = network.WLAN(network.STA_IF) station.active(True) station.connect(ssid, password) while station.isconnected() == False: pass print('Connection successful') print(station.ifconfig()) def send_message(phone_number, api_key, message): #set your host URL url = 'https://api.callmebot.com/whatsapp.php?phone='+phone_number+'&text='+message+'&apikey='+api_key #make the request response = requests.get(url) #check if it was successful if response.status_code == 200: print('Success!') else: print('Error') print(response.text) # Connect to WiFi connect_wifi(ssid, password) #Your phone number in international format phone_number = '+923452510056' #Your callmebot API key api_key = '9775846' # Send message to WhatsApp "Hello" message = 'Hello%20from%20ESP32%20%28micropython%29' #YOUR MESSAGE HERE (URL ENCODED)https://www.urlencoder.io/ send_message(phone_number, api_key, message)
Copyright © Muhammad Raheel
You can Contact me with this link or number
[email protected]
+923452510056