protocols.py: Implement progress callback and tqdm progressbar

Signed-off-by: Andrew Andrianov <andrew@ncrmnt.org>
This commit is contained in:
Andrew Andrianov 2017-10-15 16:07:12 +03:00
parent e0bda73fed
commit 092fbdc842

View File

@ -34,6 +34,7 @@ from stcgal.utils import Utils
from stcgal.options import Stc89Option, Stc12Option, Stc12AOption, Stc15Option, Stc15AOption
from abc import ABC, abstractmethod
import functools
import tqdm
try:
import usb.core, usb.util
@ -83,6 +84,22 @@ class StcBaseProtocol(ABC):
self.debug = False
self.status_packet = None
self.protocol_name = None
self.bar = None
self.progress_cb = self.progress_bar_cb
def progress_text_cb(self, current, written, maximum):
print(current, written, maximum)
def progress_bar_cb(self, current, written, maximum):
if not self.bar:
self.bar = tqdm.tqdm(
total = maximum,
unit = " Bytes",
desc = "Writing flash"
)
self.bar.update(written)
if current == maximum:
self.bar.close()
def dump_packet(self, data, receive=True):
if self.debug:
@ -553,8 +570,6 @@ class Stc89Protocol(StcBaseProtocol):
as the block size (depends on MCU's RAM size).
"""
print("Writing %d bytes: " % len(data), end="")
sys.stdout.flush()
for i in range(0, len(data), self.PROGRAM_BLOCKSIZE):
packet = bytes(3)
packet += struct.pack(">H", i)
@ -568,9 +583,8 @@ class Stc89Protocol(StcBaseProtocol):
raise StcProtocolException("incorrect magic in write packet")
elif len(response) < 2 or response[1] != csum:
raise StcProtocolException("verification checksum mismatch")
print(".", end="")
sys.stdout.flush()
print(" done")
self.progress_cb(i, self.PROGRAM_BLOCKSIZE, len(data))
self.progress_cb(len(data), self.PROGRAM_BLOCKSIZE, len(data))
def program_options(self):
"""Program option byte into flash"""
@ -940,8 +954,6 @@ class Stc12BaseProtocol(StcBaseProtocol):
as the block size (depends on MCU's RAM size).
"""
print("Writing %d bytes: " % len(data), end="")
sys.stdout.flush()
for i in range(0, len(data), self.PROGRAM_BLOCKSIZE):
packet = bytes(3)
packet += struct.pack(">H", i)
@ -952,9 +964,8 @@ class Stc12BaseProtocol(StcBaseProtocol):
response = self.read_packet()
if response[0] != 0x00:
raise StcProtocolException("incorrect magic in write packet")
print(".", end="")
sys.stdout.flush()
print(" done")
self.progress_cb(i, self.PROGRAM_BLOCKSIZE, len(data))
self.progress_cb(len(data), self.PROGRAM_BLOCKSIZE, len(data))
print("Finishing write: ", end="")
sys.stdout.flush()
@ -1463,8 +1474,6 @@ class Stc15Protocol(Stc15AProtocol):
def program_flash(self, data):
"""Program the MCU's flash memory."""
print("Writing %d bytes: " % len(data), end="")
sys.stdout.flush()
for i in range(0, len(data), self.PROGRAM_BLOCKSIZE):
packet = bytes([0x22]) if i == 0 else bytes([0x02])
packet += struct.pack(">H", i)
@ -1476,9 +1485,8 @@ class Stc15Protocol(Stc15AProtocol):
response = self.read_packet()
if len(response) < 2 or response[0] != 0x02 or response[1] != 0x54:
raise StcProtocolException("incorrect magic in write packet")
print(".", end="")
sys.stdout.flush()
print(" done")
self.progress_cb(i, self.PROGRAM_BLOCKSIZE, len(data))
self.progress_cb(len(data), self.PROGRAM_BLOCKSIZE, len(data))
# BSL 7.2+ needs a write finish packet according to dumps
if self.bsl_version >= 0x72:
@ -1661,8 +1669,6 @@ class StcUsb15Protocol(Stc15Protocol):
def program_flash(self, data):
"""Program the MCU's flash memory."""
print("Writing %d bytes: " % len(data), end="")
sys.stdout.flush()
for i in range(0, len(data), self.PROGRAM_BLOCKSIZE):
packet = data[i:i+self.PROGRAM_BLOCKSIZE]
while len(packet) < self.PROGRAM_BLOCKSIZE: packet += b"\x00"
@ -1672,9 +1678,8 @@ class StcUsb15Protocol(Stc15Protocol):
response = self.read_packet()
if response[0] != 0x02 or response[1] != 0x54:
raise StcProtocolException("incorrect magic in write packet")
print(".", end="")
sys.stdout.flush()
print(" done")
self.progress_cb(i, self.PROGRAM_BLOCKSIZE, len(data))
self.progress_cb(len(data), self.PROGRAM_BLOCKSIZE, len(data))
def program_options(self):
print("Setting options: ", end="")