这几天都在折腾图片这个事,受制于单片机的不稳定性很简单的图片经常花花绿绿的。。。 考虑到自己写传输需要分段还要写校验协议,不但麻烦而且速度慢还浪费数据流量,不如直接用别人的成品库,由于自己有个自用服务器于是直接用request库传送图片。
首先是PC端对PC端的收发
这是电脑给服务器发送图片的发送端写法
import requests
import binascii #二进制转字符串一下
url = 'http://192.168.3.190:5000/updata'
with open('5.jpg','rb') as f:
img=f.read()
img=binascii.b2a_base64(img) # 转换中-
d = {'jpg': img} # 字典
r = requests.post(url, data=d) #发送
print (r.text) #服务器回传的数据
对应的服务器简单写法如下
from flask import Flask,request
import binascii
app = Flask(__name__)
@app.route('/updata',methods=['POST','GET'])
def updata():
a=request.form.get('jpg')
a=binascii.a2b_base64(a)
with open('555.jpg','wb') as f:
f.write(a)
return '收到了你的数据了%d'%len(a)
if __name__ == '__main__':
app.run('0.0.0.0',5000)
上面方法优点是既可以传简短的数据也可以传图片,而且可以直接用get方法获取值,相对比较简单,但是这个方法必须是PC间的方式。单片机不行~单片机要改一改如下:
如果使用ESP32的CAM版本带摄像头的这玩意或者ESP32普通版本传送图片则:
ESP32端的写法
如果固件自己没有requests库,先贴一个这个库
import usocket
class Response:
def __init__(self, f):
self.raw = f
self.encoding = "utf-8"
self._cached = None
def close(self):
if self.raw:
self.raw.close()
self.raw = None
self._cached = None
@property
def content(self):
if self._cached is None:
self._cached = self.raw.read()
self.raw.close()
self.raw = None
return self._cached
@property
def text(self):
return str(self.content, self.encoding)
def json(self):
import ujson
return ujson.loads(self.content)
def request(method, url, data=None, json=None, headers={}, stream=None,params=None):
try:
proto, dummy, host, path = url.split("/", 3)
except ValueError:
proto, dummy, host = url.split("/", 2)
path = ""
if proto == "http:":
port = 80
elif proto == "https:":
import ussl
port = 443
else:
raise ValueError("Unsupported protocol: " + proto)
if ":" in host:
host, port = host.split(":", 1)
port = int(port)
if params:
path = path + "?"
for k in params:
path = path + '&'+k+'='+params[k]
ai = usocket.getaddrinfo(host, port)
addr = ai[0][4]
s = usocket.socket()
s.connect(addr)
if proto == "https:":
s = ussl.wrap_socket(s)
s.write(b"%s /%s HTTP/1.0\r\n" % (method, path))
if not "Host" in headers:
s.write(b"Host: %s\r\n" % host)
# Iterate over keys to avoid tuple alloc
for k in headers:
s.write(k)
s.write(b": ")
s.write(headers[k])
s.write(b"\r\n")
if json is not None:
assert data is None
import ujson
data = ujson.dumps(json)
if data:
s.write(b"Content-Length: %d\r\n" % len(data))
s.write(b"\r\n")
if data:
s.write(data)
l = s.readline()
protover, status, msg = l.split(None, 2)
status = int(status)
#print(protover, status, msg)
while True:
l = s.readline()
if not l or l == b"\r\n":
break
#print(l)
if l.startswith(b"Transfer-Encoding:"):
if b"chunked" in l:
raise ValueError("Unsupported " + l)
elif l.startswith(b"Location:") and not 200 <= status <= 299:
raise NotImplementedError("Redirects not yet supported")
resp = Response(s)
resp.status_code = status
resp.reason = msg.rstrip()
return resp
def head(url, **kw):
return request("HEAD", url, **kw)
def get(url, **kw):
return request("GET", url, **kw)
def post(url, **kw):
return request("POST", url, **kw)
def put(url, **kw):
return request("PUT", url, **kw)
def patch(url, **kw):
return request("PATCH", url, **kw)
def delete(url, **kw):
return request("DELETE", url, **kw)
ESP32端的写法
from time import sleep
import network
####################
# wifi类
####################
class Sta:
WIFI_SSID = "300king"
WIFI_PWD = "13704677369"
wlan = None
def __init__(self, wifi_ssid='', wifi_pwd=''):
network.WLAN(network.AP_IF).active(False) # disable access point
self.wlan = network.WLAN(network.STA_IF)
self.wlan.active(True)
if wifi_ssid == '':
self.wifi_ssid = Sta.WIFI_SSID
self.wifi_pwd = Sta.WIFI_PWD
else:
self.wifi_ssid = wifi_ssid
self.wifi_pwd = wifi_pwd
def connect(self, wifi_ssid='', wifi_pwd=''):
if wifi_ssid != '':
self.wifi_ssid = wifi_ssid
self.wifi_pwd = wifi_pwd
if not self.wlan.isconnected():
self.wlan.connect(self.wifi_ssid, self.wifi_pwd)
def status(self):
if self.wlan.isconnected():
return self.wlan.ifconfig()
else:
return ()
def wait(self):
cnt = 30
while cnt > 0:
print("Waiting ..." )
# con(self.wifi_ssid, self.wifi_pwd) # Connect to an WIFI_SSID
if self.wlan.isconnected():
print("Connected to %s" % self.wifi_ssid)
print('network config:', self.wlan.ifconfig())
cnt = 0
else:
sleep(5)
cnt -= 5
return
def scan(self):
return self.wlan.scan() # Scan for available access points
if __name__=='__main__':
a=Sta()
a.connect()
a.wait()
##############
# 上面是联网,下面才是发送
#############
import urequests as requests
import binascii,json
url = 'http://192.168.3.190:5000/updata' #服务器地址直接指向POST接收位置
with open('3333.jpg','rb') as f: #已经有的图片发送
img=f.read()
r = requests.post(url, data=img)
#CAM自己没有requests库,粘贴过来的库要求直接发送二进制,这里不排除别的库可以发送字典但是没有测试
print(r.text) #接收服务器数据
r.close()
服务器端的写法
from flask import Flask,render_template,request
app = Flask(__name__)
@app.route('/updata',methods=['POST','GET'])
def updata():
a=request.get_data() #直接接收数据就是二进制直接保存得了
with open('555.jpg','wb') as f:
f.write(a)
return '老子收到了你的数据了%d'%len(a)
if __name__ == '__main__':
app.run('0.0.0.0',5000)
经过升级的写法: a=request.get_data() 这个接收的是全部数据,也就是说你可以先把所有数据用JSON打包好用request 发过来由服务器全读出来在依次解出数据格式,如下
######################
#客户端写法
#####################
import urequest as requests
import binascii,json
url = 'http://192.168.3.190:5000/updata' #服务器地址直接指向POST接收位置
with open('3333.jpg','rb') as f: #已经有的图片发送
img=f.read()
aaa={'jpg':binascii.b2a_base64(img)} #数据包装,先转字符串
aaa=json.dumps(aaa) #再转JSON
r = requests.post(url, data=aaa) #发包装好的json
print(r.text) #服务器发回的数据
r.close()
######################################
#服务端如下:
######################################
from flask import Flask,render_template,request
import binascii,json
app = Flask(__name__)
@app.route('/')
def hello_world(): # put application's code here
return render_template('login.html')
@app.route('/updata',methods=['POST','GET'])
def updata():
a=request.get_data()
a=json.loads(a)
a=binascii.a2b_base64(a['jpg'])
with open('555.jpg','wb') as f:
f.write(a)
return '老子收到了你的数据了%d'%len(a)
if __name__ == '__main__':
app.run('0.0.0.0',5000)
如果文章或资源对您有帮助,欢迎打赏作者。一路走来,感谢有您!
txttool.com 说一段 esp56物联 查询128 IP查询