This GitHub repository contains my cloud connectivity code for IoT devices and applications. It focuses on establishing secure and efficient communication between IoT devices and the IoT cloud network. The code aims to optimize real-time operations, data processing, and storage within the IoT cloud infrastructure.
Design By Muhammad Raheel
Click on the following link to create a Blynk Cloud account.
https://blynk.cloud/dashboard/register
- Enter email ID, then click on "Sign Up". You will receive a verification email.
- Click on "Create Password" in the email, set the password, then click on "Next".
- Enter your first name, click on Done.
After that, the Blynk Cloud dashboard will open.
After signing up, confirm your email, then enter a password.
then enter password
then enter profile name
If you see this prompt, skip this step:
click the "Cancel" button.
First, you have to create a template in the Blynk cloud.
- Click on New Template.
- Enter a template name, select ESP8266 as the hardware, and set the connection type to WiFi.
- Then click on DONE.
You will get the BLYNK_TEMPLATE_ID and BLYNK_DEVICE_NAME after creating the template.
After clicking, you will get the device auth key.

import network
def do_connect(ssid="IoT_Device", password='Thejudgementday@'):
    """Join a WiFi network in station mode and block until connected.

    The previous body called ``connect()`` twice in a row, first with a
    placeholder SSID and then with the real one. A single call with
    overridable credentials replaces both; the defaults are the values the
    second call used, so ``do_connect()`` with no arguments still targets the
    same network.

    Args:
        ssid: Name of the access point to join.
        password: Passphrase for that access point.
    """
    sta_if = network.WLAN(network.STA_IF)
    if not sta_if.isconnected():
        print('connecting to network...')
        sta_if.active(True)
        # Print the scan result so a wrong SSID is easy to spot on the console.
        print(sta_if.scan())
        sta_if.connect(ssid, password)
        # Block until the interface reports a connection.
        while not sta_if.isconnected():
            pass
    print('network config:', sta_if.ifconfig())


do_connect()
# Copyright (c) 2015-2019 Volodymyr Shymanskyy. See the file LICENSE for copying permission.
__version__ = "1.0.0"
import struct
import time
import sys
import os
# Choose the millisecond clock and socket timeout depending on whether the
# `machine` module can be imported.
try:
    import machine
except ImportError:
    # `machine` is unavailable: supply a pass-through `const`, derive
    # milliseconds from time.time(), and use a short socket timeout.
    def const(x):
        return x

    def gettime():
        return int(time.time() * 1000)

    SOCK_TIMEOUT = 0.05
else:
    def gettime():
        return time.ticks_ms()

    SOCK_TIMEOUT = 0
def dummy(*args):
    """Accept any positional arguments and do nothing (default logger)."""
    return None
# Command codes carried in the first header byte of every frame.
MSG_RSP = const(0)
MSG_LOGIN = const(2)
MSG_PING = const(6)
MSG_TWEET = const(12)
MSG_NOTIFY = const(14)
MSG_BRIDGE = const(15)
MSG_HW_SYNC = const(16)
MSG_INTERNAL = const(17)
MSG_PROPERTY = const(19)
MSG_HW = const(20)
MSG_HW_LOGIN = const(29)
MSG_EVENT_LOG = const(64)
MSG_REDIRECT = const(41) # TODO: not implemented
MSG_DBG_PRINT = const(55) # TODO: not implemented
# Status codes compared against the length/status field of a MSG_RSP frame.
STA_SUCCESS = const(200)
STA_INVALID_TOKEN = const(9)
# Values taken by BlynkProtocol.state.
DISCONNECTED = const(0)
CONNECTING = const(1)
CONNECTED = const(2)
# Print the startup banner with the library version and sys.platform.
print("""
___ __ __
/ _ )/ /_ _____ / /__
/ _ / / // / _ \\/ '_/
/____/_/\\_, /_//_/_/\\_\\
/___/ for Python v""" + __version__ + " (" + sys.platform + ")\n")
class EventEmitter:
    """Tiny event registry that keeps one callback per event name."""

    def __init__(self):
        # Maps event name -> the single callback registered for it.
        self._cbks = {}

    def on(self, evt, f=None):
        """Register ``f`` for ``evt``; without ``f``, return a decorator.

        Registering again for the same event replaces the earlier callback.
        """
        if not f:
            def register(handler):
                self._cbks[evt] = handler
                return handler
            return register
        self._cbks[evt] = f

    def emit(self, evt, *a, **kv):
        """Call the callback registered for ``evt``, if there is one."""
        if evt not in self._cbks:
            return
        callback = self._cbks[evt]
        callback(*a, **kv)
class BlynkProtocol(EventEmitter):
    """Transport-independent Blynk protocol state machine.

    Every frame is a 5-byte ``!BHH`` header (command, message id, and either
    the payload length or, for ``MSG_RSP``, a status code) followed by a
    payload of NUL-separated UTF-8 fields. The class performs no I/O itself:
    it calls ``self._write(bytes)``, which is not defined here and must be
    supplied by a subclass, and expects received bytes to be handed to
    :meth:`process`.
    """

    def __init__(self, auth, tmpl_id=None, fw_ver=None, heartbeat=50, buffin=1024, log=None):
        """Store the session settings and immediately start the login.

        Args:
            auth: Device auth token sent in the login frame.
            tmpl_id: Optional template id reported to the server after login.
            fw_ver: Optional firmware version reported after login.
            heartbeat: Heartbeat period in seconds (stored in milliseconds).
            buffin: Input buffer size; a payload of this size or larger
                causes a disconnect.
            log: Optional callable used to trace frames; defaults to a no-op.
        """
        EventEmitter.__init__(self)
        self.heartbeat = heartbeat*1000
        self.buffin = buffin
        self.log = log or dummy
        self.auth = auth
        self.tmpl_id = tmpl_id
        self.fw_ver = fw_ver
        self.state = DISCONNECTED
        self.connect()

    def virtual_write(self, pin, *val):
        """Send a ``vw`` hardware frame writing ``val`` to virtual pin ``pin``."""
        self._send(MSG_HW, 'vw', pin, *val)

    def send_internal(self, pin, *val):
        """Send a ``MSG_INTERNAL`` frame with the given fields."""
        self._send(MSG_INTERNAL, pin, *val)

    def set_property(self, pin, prop, *val):
        """Send a ``MSG_PROPERTY`` frame for ``prop`` on ``pin``."""
        self._send(MSG_PROPERTY, pin, prop, *val)

    def sync_virtual(self, *pins):
        """Send a ``vr`` sync frame for the listed virtual pins."""
        self._send(MSG_HW_SYNC, 'vr', *pins)

    def log_event(self, *val):
        """Send a ``MSG_EVENT_LOG`` frame with the given fields."""
        self._send(MSG_EVENT_LOG, *val)

    def _send(self, cmd, *args, **kwargs):
        """Build one frame and pass it to ``self._write``.

        Args:
            cmd: Command code for the header.
            *args: Payload fields; for ``MSG_RSP`` the first one is the
                status code placed in the header and no payload is sent.
            **kwargs: ``id`` overrides the auto-incremented message id
                (used when replying to a received frame).
        """
        if 'id' in kwargs:
            msg_id = kwargs.get('id')
        else:
            msg_id = self.msg_id
            self.msg_id += 1
            # The id field is 16 bits wide and 0 is treated as invalid on
            # receipt, so wrap back to 1.
            if self.msg_id > 0xFFFF:
                self.msg_id = 1

        if cmd == MSG_RSP:
            data = b''
            dlen = args[0]
        else:
            data = ('\0'.join(map(str, args))).encode('utf8')
            dlen = len(data)

        self.log('<', cmd, msg_id, '|', *args)
        msg = struct.pack("!BHH", cmd, msg_id, dlen) + data
        self.lastSend = gettime()
        self._write(msg)

    def connect(self):
        """Reset the session state and send the login frame (if disconnected)."""
        if self.state != DISCONNECTED:
            return
        self.msg_id = 1
        (self.lastRecv, self.lastSend, self.lastPing) = (gettime(), 0, 0)
        self.bin = b""
        self.state = CONNECTING
        self._send(MSG_HW_LOGIN, self.auth)

    def disconnect(self):
        """Drop buffered input, mark the session closed, emit 'disconnected'."""
        if self.state == DISCONNECTED:
            return
        self.bin = b""
        self.state = DISCONNECTED
        self.emit('disconnected')

    def process(self, data=None):
        """Run heartbeat checks and parse any complete frames.

        Args:
            data: Newly received bytes, or ``None``/empty when nothing was
                read; the heartbeat logic still runs in that case.
        """
        if not (self.state == CONNECTING or self.state == CONNECTED):
            return

        # NOTE(review): timestamps are compared by plain subtraction; when
        # gettime() is time.ticks_ms() the counter can wrap - confirm whether
        # ticks_diff() should be used here.
        now = gettime()
        if now - self.lastRecv > self.heartbeat+(self.heartbeat//2):
            return self.disconnect()

        # Ping when the link has been idle for a heartbeat period, but not
        # more often than once per tenth of that period.
        if (now - self.lastPing > self.heartbeat//10 and
                (now - self.lastSend > self.heartbeat or
                 now - self.lastRecv > self.heartbeat)):
            self._send(MSG_PING)
            self.lastPing = now

        if data:
            self.bin += data

        while True:
            if len(self.bin) < 5:
                break

            cmd, i, dlen = struct.unpack("!BHH", self.bin[:5])
            if i == 0:
                return self.disconnect()

            self.lastRecv = now
            if cmd == MSG_RSP:
                # Responses carry a status code in place of a payload length.
                self.bin = self.bin[5:]

                self.log('>', cmd, i, '|', dlen)
                if self.state == CONNECTING and i == 1:
                    if dlen == STA_SUCCESS:
                        self.state = CONNECTED
                        dt = now - self.lastSend
                        info = ['ver', __version__, 'h-beat', self.heartbeat//1000,
                                'buff-in', self.buffin, 'dev', sys.platform+'-py']
                        if self.tmpl_id:
                            info.extend(['tmpl', self.tmpl_id])
                            info.extend(['fw-type', self.tmpl_id])
                        if self.fw_ver:
                            info.extend(['fw', self.fw_ver])
                        self._send(MSG_INTERNAL, *info)
                        # Handlers may or may not accept a `ping` keyword;
                        # retry without it when the first call is rejected.
                        try:
                            self.emit('connected', ping=dt)
                        except TypeError:
                            self.emit('connected')
                    else:
                        if dlen == STA_INVALID_TOKEN:
                            self.emit("invalid_auth")
                            print("Invalid auth token")
                        return self.disconnect()
            else:
                if dlen >= self.buffin:
                    print("Cmd too big: ", dlen)
                    return self.disconnect()

                # Wait for the rest of the payload to arrive.
                if len(self.bin) < 5+dlen:
                    break

                data = self.bin[5:5+dlen]
                self.bin = self.bin[5+dlen:]

                args = [field.decode('utf8') for field in data.split(b'\0')]

                self.log('>', cmd, i, '|', ','.join(args))
                if cmd == MSG_PING:
                    self._send(MSG_RSP, STA_SUCCESS, id=i)
                elif cmd == MSG_HW or cmd == MSG_BRIDGE:
                    # A 'vw' frame without a pin field is ignored instead of
                    # raising IndexError.
                    if args[0] == 'vw' and len(args) > 1:
                        self.emit("V"+args[1], args[2:])
                        self.emit("V*", args[1], args[2:])
                elif cmd == MSG_INTERNAL:
                    self.emit("internal:"+args[0], args[1:])
                elif cmd == MSG_REDIRECT:
                    # Needs both a host and a port field to be usable.
                    if len(args) > 1:
                        self.emit("redirect", args[0], int(args[1]))
                else:
                    print("Unexpected command: ", cmd)
                    return self.disconnect()
import socket
class Blynk(BlynkProtocol):
    """Blynk client that runs the protocol over a TCP socket, with TLS by default."""

    def __init__(self, auth, **kwargs):
        """Open the connection and start the login.

        Args:
            auth: Device auth token.
            **kwargs: ``insecure`` (default False) disables TLS, ``server``
                (default 'blynk.cloud') and ``port`` (default 80 when
                insecure, else 443) select the endpoint; everything else is
                forwarded to ``BlynkProtocol.__init__``.
        """
        self.insecure = kwargs.pop('insecure', False)
        self.server = kwargs.pop('server', 'blynk.cloud')
        self.port = kwargs.pop('port', 80 if self.insecure else 443)
        BlynkProtocol.__init__(self, auth, **kwargs)
        self.on('redirect', self.redirect)

    def redirect(self, server, port):
        """Reconnect to the endpoint named in a redirect frame."""
        self.server = server
        self.port = port
        self.disconnect()
        # Release the old socket before opening a new one; previously it was
        # simply abandoned.
        old_conn = getattr(self, 'conn', None)
        if old_conn is not None:
            try:
                old_conn.close()
            except Exception:
                pass  # best effort: the old link is being discarded anyway
        self.connect()

    def connect(self):
        """Open the socket (wrapping it in TLS unless insecure) and log in."""
        print('Connecting to %s:%d...' % (self.server, self.port))
        s = socket.socket()
        try:
            s.connect(socket.getaddrinfo(self.server, self.port)[0][-1])
            try:
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            except Exception:
                pass  # TCP_NODELAY is optional; carry on if it cannot be set
            if self.insecure:
                self.conn = s
            else:
                try:
                    import ussl
                    ssl_context = ussl
                except ImportError:
                    import ssl
                    ssl_context = ssl.create_default_context()
                self.conn = ssl_context.wrap_socket(s, server_hostname=self.server)
            try:
                self.conn.settimeout(SOCK_TIMEOUT)
            except Exception:
                # Fall back to the raw socket when the wrapper cannot do it.
                s.settimeout(SOCK_TIMEOUT)
        except Exception:
            # Do not leak the socket when connecting or TLS setup fails.
            s.close()
            raise
        BlynkProtocol.connect(self)

    def _write(self, data):
        """Send raw frame bytes over the connection."""
        #print('<', data)
        self.conn.write(data)
        # TODO: handle disconnect

    def run(self):
        """Read whatever is available and feed it to the protocol engine.

        Call this repeatedly from the main loop; it also drives heartbeats.
        """
        data = b''
        try:
            data = self.conn.read(self.buffin)
            #print('>', data)
        except KeyboardInterrupt:
            raise
        except socket.timeout:
            # No data received, call process to send ping messages when needed
            pass
        except Exception: # TODO: handle disconnect
            # Exception (not a bare except) so SystemExit still propagates.
            return
        self.process(data)
import BlynkLib
import network
# Device auth token for the Blynk connection.
BLYNK_AUTH = 'yqD8Pga0PduvJHOvahcRI3wH-Cj-cKju'

wifi = network.WLAN(network.STA_IF)
wifi.active(True)
# NOTE(review): wifi.connect() is never called in this script; it presumably
# relies on a connection established earlier - confirm.
while not wifi.isconnected():
    pass
print('IP:', wifi.ifconfig()[0])

blynk = BlynkLib.Blynk(BLYNK_AUTH)


def blynk_connected(ping):
    """Print a short status report once the Blynk login has succeeded."""
    print("Connecting.............")
    print('Blynk ready. Ping:', ping, 'ms')
    print("Connected!")


# Same registration as the decorator form, written as an explicit call.
blynk.on("connected", blynk_connected)

# Drive the Blynk client forever.
while True:
    blynk.run()
import BlynkLib
import network
import machine
import time
from machine import Pin
led1 = Pin(14, Pin.OUT)


def do_connect():
    """Join the configured WiFi network and print the interface settings."""
    sta_if = network.WLAN(network.STA_IF)
    if sta_if.isconnected():
        print('network config:', sta_if.ifconfig())
        return
    print('connecting to network...')
    sta_if.active(True)
    print(sta_if.scan())
    sta_if.connect("Raheel-Desktop", "Raheel42")
    # Block until the interface reports a connection.
    while not sta_if.isconnected():
        pass
    print('network config:', sta_if.ifconfig())


do_connect()

BLYNK_AUTH = "xl8Tswb2Ug-WoHoU3CclBzHO8RYzQw53"
blynk = BlynkLib.Blynk(BLYNK_AUTH)


def blynk_connected(ping):
    """Print a short status report once the Blynk login has succeeded."""
    print("Connecting.................")
    print('Blynk ready. Ping:', ping, 'ms')
    print("Connected!")


def terminal_write_handler(value):
    """Echo the first field written to virtual pin V0 back to the same pin."""
    print('V0: {}'.format(value))
    reply = "You wrote: " + value[0]
    blynk.virtual_write(0, reply)


# Same registrations as the decorator form, written as explicit calls.
blynk.on("connected", blynk_connected)
blynk.on("V0", terminal_write_handler)

# Run blynk in the main thread:
while True:
    blynk.run()
import BlynkLib
import network
import machine
import time
from machine import Pin
led1 = Pin(14, Pin.OUT)


def do_connect():
    """Join the configured WiFi network and print the interface settings."""
    sta_if = network.WLAN(network.STA_IF)
    if sta_if.isconnected():
        print('network config:', sta_if.ifconfig())
        return
    print('connecting to network...')
    sta_if.active(True)
    print(sta_if.scan())
    sta_if.connect("Raheel-Desktop", "Raheel42")
    # Block until the interface reports a connection.
    while not sta_if.isconnected():
        pass
    print('network config:', sta_if.ifconfig())


do_connect()

BLYNK_AUTH = "xl8Tswb2Ug-WoHoU3CclBzHO8RYzQw53"
blynk = BlynkLib.Blynk(BLYNK_AUTH)


def blynk_connected(ping):
    """Print a short status report once the Blynk login has succeeded."""
    print("Connecting.................")
    print('Blynk ready. Ping:', ping, 'ms')
    print("Connected!")


blynk.on("connected", blynk_connected)


# Functions that can be triggered by name from the terminal widget.
def function1():
    """Placeholder action 1: report execution on V0 and on the console."""
    blynk.virtual_write(0, 'Function 1 executed')
    print("FUNCTION 1 Executed")


def function2():
    """Placeholder action 2: report execution on V0 and on the console."""
    blynk.virtual_write(0, 'Function 2 executed')
    print("FUNCTION 2 Executed")


def function3():
    """Placeholder action 3: report execution on V0 and on the console."""
    blynk.virtual_write(0, 'Function 3 executed')
    print("FUNCTION 3 Executed")


def terminal_handler(value):
    """Run the function named by the first field written to V0."""
    actions = {
        'function1': function1,
        'function2': function2,
        'function3': function3,
    }
    action = actions.get(value[0])
    if action is None:
        blynk.virtual_write(0, 'Invalid function name')
    else:
        action()


# Register the handler for the terminal widget on V0.
blynk.on('V0', terminal_handler)

# Run blynk in the main thread:
while True:
    blynk.run()
# `led_blynk.py`
import BlynkLib
import network
import machine
import time
from machine import Pin
led1=Pin(14,Pin.OUT)
led2=Pin(13,Pin.OUT)
BLYNK_AUTH = 'yqD8Pga0PduvJHOvahcRI3wH-Cj-cKju'
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
# NOTE(review): wifi.connect() is never called in this script; it presumably
# relies on a connection established earlier - confirm.
while not wifi.isconnected():
    pass
print('IP:', wifi.ifconfig()[0])
blynk = BlynkLib.Blynk(BLYNK_AUTH)
tmr_start_time = time.time()


@blynk.on("connected")
def blynk_connected(ping, value=None):
    """Print a short status report once the Blynk login has succeeded.

    The 'connected' event is emitted with only a ``ping`` keyword (and, as a
    fallback, with no arguments), so a required ``value`` parameter made both
    attempts raise TypeError and stopped the main loop. ``value`` is kept for
    compatibility but is now optional and unused.
    """
    print("Connecting.................")
    print('Blynk ready. Ping:', ping, 'ms')
    print("Connected!")


@blynk.on("V1")
def v3_write_handler(value):
    """Switch led1 on when V1 receives 1, off for any other integer."""
    if int(value[0])==1:
        led1.on()
    else:
        led1.off()


# Run blynk in the main thread:
while True:
    blynk.run()
import BlynkLib
import network
import time
from machine import Pin
led1 = Pin(14, Pin.OUT)
led2 = Pin(13, Pin.OUT)

BLYNK_AUTH = 'yqD8Pga0PduvJHOvahcRI3wH-Cj-cKju'

wifi = network.WLAN(network.STA_IF)
wifi.active(True)
# NOTE(review): wifi.connect() is never called in this script; it presumably
# relies on a connection established earlier - confirm.
while not wifi.isconnected():
    pass
print('IP:', wifi.ifconfig()[0])

blynk = BlynkLib.Blynk(BLYNK_AUTH)
tmr_start_time = time.time()


def blynk_connected(ping):
    """Print a short status report once the Blynk login has succeeded."""
    print('Blynk ready. Ping:', ping, 'ms')
    print("Connected!")


def v3_write_handler(value):
    """Switch led1 on when V1 receives 1, off for any other integer."""
    requested = int(value[0])
    if requested != 1:
        led1.off()
    else:
        led1.on()


def v2_write_handler(value):
    """Drive led2 from the integer written to V2."""
    requested = int(value[0])
    led2.value(requested)
    if requested != 1:
        led2.off()
    else:
        led2.on()


# Same registrations as the decorator form, written as explicit calls.
blynk.on("connected", blynk_connected)
blynk.on("V1", v3_write_handler)
blynk.on("V2", v2_write_handler)

while True:
    blynk.run()
from time import sleep
import dht
from machine import Pin
import BlynkLib
import network
import machine
import time
from machine import Pin
import dht
sensor = dht.DHT11(Pin(33))
BLYNK_AUTH = 'yqD8Pga0PduvJHOvahcRI3wH-Cj-cKju'
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
# NOTE(review): wifi.connect() is never called in this script; it presumably
# relies on a connection established earlier - confirm.
while not wifi.isconnected():
    pass
print('IP:', wifi.ifconfig()[0])
blynk = BlynkLib.Blynk(BLYNK_AUTH)
tmr_start_time = time.time()


@blynk.on("connected")
def blynk_connected(ping):
    """Print a short status report once the Blynk login has succeeded."""
    print('Blynk ready. Ping:', ping, 'ms')
    print("Connected!")


# Period between sensor reports, matching the 1 s sleep used previously.
REPORT_INTERVAL_MS = 1000
last_report = time.ticks_ms()

# The script used to have two consecutive `while True` loops: the sensor loop
# never exited, so the loop calling blynk.run() was unreachable and the
# connection was never serviced. Both jobs now share one loop.
while True:
    blynk.run()
    now = time.ticks_ms()
    # ticks_diff handles the wrap-around of the millisecond counter.
    if time.ticks_diff(now, last_report) >= REPORT_INTERVAL_MS:
        last_report = now
        sensor.measure()
        temp = sensor.temperature()
        hum = sensor.humidity()
        print('Air Temperature: %3.1f C' %temp)
        print('Air Humidity: %3.1f %%' %hum)
        blynk.virtual_write('13',temp)
        blynk.virtual_write('14',hum)
from time import sleep
from hcsr04 import HCSR04
import dht
from machine import Pin
import BlynkLib
import network
import machine
import time
from machine import Pin
import dht
sensor = HCSR04(trigger_pin=26, echo_pin=27, echo_timeout_us=10000)
BLYNK_AUTH = 'yqD8Pga0PduvJHOvahcRI3wH-Cj-cKju'
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
# NOTE(review): wifi.connect() is never called in this script; it presumably
# relies on a connection established earlier - confirm.
while not wifi.isconnected():
    pass
print('IP:', wifi.ifconfig()[0])
blynk = BlynkLib.Blynk(BLYNK_AUTH)
tmr_start_time = time.time()


@blynk.on("connected")
def blynk_connected(ping):
    """Print a short status report once the Blynk login has succeeded."""
    print('Blynk ready. Ping:', ping, 'ms')
    print("Connected!")


# Period between distance reports, matching the 1 s sleep used previously.
REPORT_INTERVAL_MS = 1000
last_report = time.ticks_ms()

# The script used to have two consecutive `while True` loops: the sensor loop
# never exited, so the loop calling blynk.run() was unreachable and the
# connection was never serviced. Both jobs now share one loop.
while True:
    blynk.run()
    now = time.ticks_ms()
    # ticks_diff handles the wrap-around of the millisecond counter.
    if time.ticks_diff(now, last_report) >= REPORT_INTERVAL_MS:
        last_report = now
        distance = sensor.distance_cm()
        print('Distance:', distance, 'cm')
        blynk.virtual_write("20",distance)
Go to this link: https://thingspeak.com/login. Click "Create one", then fill in all the fields.
We will define the channel by entering a proper name and description; up to 8 fields can be used to name the parameters. We have named Field 1 and Field 2 "Temperature" and "Humidity". The field values that you set can be modified later. These values will be in degrees Centigrade and % relative humidity. Once you have updated the names, click on Save.

Once you have saved the channel, you will be automatically redirected to the “Private View” tab. Here the mapped fields are shown as charts. You will find the “Channel ID” (we will need it later). Below it you will also see the “API Keys” option.
Next, click on the “API Keys” tab. The “Write API key” and “Read API key” are both needed to write or retrieve data. Copy these keys and keep them safe, as we need to put them in the code.
import machine
import urequests
from machine import Pin
import time, network
import dht
sensor = dht.DHT11(Pin(26))
HTTP_HEADERS = {'Content-Type': 'application/json'}
THINGSPEAK_WRITE_API_KEY = '07D8NPQGAVVGAV99'
UPDATE_TIME_INTERVAL = 5000 # in ms
last_update = time.ticks_ms()
# Configure ESP32 as Station
# NOTE(review): the interface is activated but connect() is never called in
# this script; it presumably relies on a connection made earlier - confirm.
sta_if=network.WLAN(network.STA_IF)
sta_if.active(True)
print('network config:', sta_if.ifconfig())

# Previously `last_update` was never refreshed and the measuring code sat in
# a second `while True`, so UPDATE_TIME_INTERVAL only delayed the first upload
# and readings were then posted roughly every second. One reading is now
# uploaded per interval.
while True:
    now = time.ticks_ms()
    # ticks_diff handles the wrap-around of the millisecond counter.
    if time.ticks_diff(now, last_update) >= UPDATE_TIME_INTERVAL:
        last_update = now
        sensor.measure()
        temp = sensor.temperature()
        hum = sensor.humidity()
        print('Air Temperature: %3.1f C' %temp)
        print('Air Humidity: %3.1f %%' %hum)
        readings = {'field1':temp, 'field2':hum}
        request = urequests.post( 'http://api.thingspeak.com/update?api_key=' + THINGSPEAK_WRITE_API_KEY,
                                  json = readings, headers = HTTP_HEADERS )
        # Always release the response so its socket is not leaked.
        request.close()
        print(readings)
    # Short pause so the polling loop does not spin at full speed.
    time.sleep_ms(100)
Get your CallMeBot API key here => https://www.callmebot.com/
URL-encoding reference codes =>> https://www.w3schools.com/tags/ref_urlencode.ASP
try:
import urequests as requests
except:
import requests
import network
# WiFi network credentials passed to connect_wifi(); replace with your own.
ssid = 'Raheel-Desktop'
password = 'Raheel42'
def connect_wifi(ssid, password):
    """Join the given WiFi network in station mode and wait until connected.

    Args:
        ssid: Name of the access point to join.
        password: Passphrase for that access point.
    """
    station = network.WLAN(network.STA_IF)
    station.active(True)
    station.connect(ssid, password)
    # Block until the interface reports a connection.
    while not station.isconnected():
        pass
    print('Connection successful')
    print(station.ifconfig())
def send_message(phone_number, api_key, message):
    """Send a WhatsApp message through the CallMeBot HTTP API.

    Args:
        phone_number: Recipient number, inserted into the URL as given.
        api_key: CallMeBot API key.
        message: Message text; it is inserted into the URL unchanged, so it
            must already be URL-encoded by the caller.
    """
    # Build the request URL from its query parameters.
    url = ('https://api.callmebot.com/whatsapp.php?phone=' + phone_number
           + '&text=' + message + '&apikey=' + api_key)
    response = requests.get(url)
    try:
        if response.status_code == 200:
            print('Success!')
        else:
            print('Error')
            print(response.text)
    finally:
        # The response was previously never closed, leaving its socket open.
        response.close()
# Connect to WiFi before making any HTTP request
connect_wifi(ssid, password)
# Recipient phone number in international format (with country code)
phone_number = '+923452510056'
# CallMeBot API key
api_key = '9775846'
# Message to send; it must already be URL-encoded because send_message()
# inserts it into the URL unchanged
message = 'Hello%20from%20ESP32%20%28micropython%29' # your message, URL-encoded (see https://www.urlencoder.io/)
send_message(phone_number, api_key, message)
Copyright © Muhammad Raheel
You can Contact me with this link or number
mraheel.naseem@gmail.com
+923452510056


























