Merge pull request #31 from nekromant/progressbar

Implement progress callback and tqdm progressbar
This commit is contained in:
Grigori Goronzy 2017-10-22 15:55:45 +02:00 committed by GitHub
commit a5e1cc26ee
2 changed files with 26 additions and 21 deletions

View File

@ -11,7 +11,7 @@ python:
before_install:
- sudo apt install rpm dpkg-dev debhelper dh-python python3-setuptools fakeroot python3-serial python3-yaml
install:
- pip install pyserial pyusb
- pip install pyserial pyusb tqdm
script:
- python setup.py build
- python setup.py test

View File

@ -34,6 +34,7 @@ from stcgal.utils import Utils
from stcgal.options import Stc89Option, Stc12Option, Stc12AOption, Stc15Option, Stc15AOption
from abc import ABC, abstractmethod
import functools
import tqdm
try:
import usb.core, usb.util
@ -83,6 +84,22 @@ class StcBaseProtocol(ABC):
self.debug = False
self.status_packet = None
self.protocol_name = None
self.bar = None
self.progress_cb = self.progress_bar_cb
def progress_text_cb(self, current, written, maximum):
print(current, written, maximum)
def progress_bar_cb(self, current, written, maximum):
if not self.bar:
self.bar = tqdm.tqdm(
total = maximum,
unit = " Bytes",
desc = "Writing flash"
)
self.bar.update(written)
if current == maximum:
self.bar.close()
def dump_packet(self, data, receive=True):
if self.debug:
@ -553,8 +570,6 @@ class Stc89Protocol(StcBaseProtocol):
as the block size (depends on MCU's RAM size).
"""
print("Writing %d bytes: " % len(data), end="")
sys.stdout.flush()
for i in range(0, len(data), self.PROGRAM_BLOCKSIZE):
packet = bytes(3)
packet += struct.pack(">H", i)
@ -568,9 +583,8 @@ class Stc89Protocol(StcBaseProtocol):
raise StcProtocolException("incorrect magic in write packet")
elif len(response) < 2 or response[1] != csum:
raise StcProtocolException("verification checksum mismatch")
print(".", end="")
sys.stdout.flush()
print(" done")
self.progress_cb(i, self.PROGRAM_BLOCKSIZE, len(data))
self.progress_cb(len(data), self.PROGRAM_BLOCKSIZE, len(data))
def program_options(self):
"""Program option byte into flash"""
@ -940,8 +954,6 @@ class Stc12BaseProtocol(StcBaseProtocol):
as the block size (depends on MCU's RAM size).
"""
print("Writing %d bytes: " % len(data), end="")
sys.stdout.flush()
for i in range(0, len(data), self.PROGRAM_BLOCKSIZE):
packet = bytes(3)
packet += struct.pack(">H", i)
@ -952,9 +964,8 @@ class Stc12BaseProtocol(StcBaseProtocol):
response = self.read_packet()
if response[0] != 0x00:
raise StcProtocolException("incorrect magic in write packet")
print(".", end="")
sys.stdout.flush()
print(" done")
self.progress_cb(i, self.PROGRAM_BLOCKSIZE, len(data))
self.progress_cb(len(data), self.PROGRAM_BLOCKSIZE, len(data))
print("Finishing write: ", end="")
sys.stdout.flush()
@ -1463,8 +1474,6 @@ class Stc15Protocol(Stc15AProtocol):
def program_flash(self, data):
"""Program the MCU's flash memory."""
print("Writing %d bytes: " % len(data), end="")
sys.stdout.flush()
for i in range(0, len(data), self.PROGRAM_BLOCKSIZE):
packet = bytes([0x22]) if i == 0 else bytes([0x02])
packet += struct.pack(">H", i)
@ -1476,9 +1485,8 @@ class Stc15Protocol(Stc15AProtocol):
response = self.read_packet()
if len(response) < 2 or response[0] != 0x02 or response[1] != 0x54:
raise StcProtocolException("incorrect magic in write packet")
print(".", end="")
sys.stdout.flush()
print(" done")
self.progress_cb(i, self.PROGRAM_BLOCKSIZE, len(data))
self.progress_cb(len(data), self.PROGRAM_BLOCKSIZE, len(data))
# BSL 7.2+ needs a write finish packet according to dumps
if self.bsl_version >= 0x72:
@ -1661,8 +1669,6 @@ class StcUsb15Protocol(Stc15Protocol):
def program_flash(self, data):
"""Program the MCU's flash memory."""
print("Writing %d bytes: " % len(data), end="")
sys.stdout.flush()
for i in range(0, len(data), self.PROGRAM_BLOCKSIZE):
packet = data[i:i+self.PROGRAM_BLOCKSIZE]
while len(packet) < self.PROGRAM_BLOCKSIZE: packet += b"\x00"
@ -1672,9 +1678,8 @@ class StcUsb15Protocol(Stc15Protocol):
response = self.read_packet()
if response[0] != 0x02 or response[1] != 0x54:
raise StcProtocolException("incorrect magic in write packet")
print(".", end="")
sys.stdout.flush()
print(" done")
self.progress_cb(i, self.PROGRAM_BLOCKSIZE, len(data))
self.progress_cb(len(data), self.PROGRAM_BLOCKSIZE, len(data))
def program_options(self):
print("Setting options: ", end="")