Skip to content

Commit f2e10d3

Browse files
committed
tsize and metrics in packethook; fixes msoulier#103
now shows remaining amount to transfer in progress messages if the client was started with --tsize also fixes partftpy_client so it sends tsize on upload
1 parent 20cc6a9 commit f2e10d3

File tree

5 files changed

+76
-22
lines changed

5 files changed

+76
-22
lines changed

partftpy/TftpClient.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,11 @@ def download(
5454
"""This method initiates a tftp download from the configured remote
5555
host, requesting the filename passed. It writes the file to output,
5656
which can be a file-like object or a path to a local file. If a
57-
packethook is provided, it must be a function that takes a single
58-
parameter, which will be a copy of each DAT packet received in the
59-
form of a TftpPacketDAT object. The timeout parameter may be used to
57+
packethook is provided, it must be a function that takes two
58+
parameters, the first being a copy of each packet received in the
59+
form of a TftpPacket object, and the second being the TftpContext
60+
for this transfer, which can be inspected for more accurate statistics,
61+
progress estimates and such. The timeout parameter may be used to
6062
override the default SOCK_TIMEOUT setting, which is the amount of time
6163
that the client will wait for a receive packet to arrive.
6264
The retires parameter may be used to override the default DEF_TIMEOUT_RETRIES
@@ -108,9 +110,11 @@ def upload(
108110
"""This method initiates a tftp upload to the configured remote host,
109111
uploading the filename passed. It reads the file from input, which
110112
can be a file-like object or a path to a local file. If a packethook
111-
is provided, it must be a function that takes a single parameter,
112-
which will be a copy of each DAT packet sent in the form of a
113-
TftpPacketDAT object. The timeout parameter may be used to override
113+
is provided, it must be a function that takes two parameters,
114+
the first being a copy of each packet received in the form of
115+
a TftpPacket object, and the second being the TftpContext for
116+
this transfer, which can be inspected for more accurate statistics,
117+
progress estimates, etc. The timeout parameter may be used to override
114118
the default SOCK_TIMEOUT setting, which is the amount of time that
115119
the client will wait for a DAT packet to be ACKd by the server.
116120
The retires parameter may be used to override the default DEF_TIMEOUT_RETRIES

partftpy/TftpContexts.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
from .TftpShared import *
2525
from .TftpStates import *
2626

27+
if TYPE_CHECKING:
28+
from typing import Optional
29+
2730
log = logging.getLogger("partftpy.TftpContext")
2831

2932
###############################################################################
@@ -35,10 +38,13 @@ class TftpMetrics(object):
3538
"""A class representing metrics of the transfer."""
3639

3740
def __init__(self):
38-
# Bytes transferred
41+
# Set by context if available
42+
self.tsize = 0
43+
# Transfer counters
3944
self.bytes = 0
40-
# Bytes re-sent
45+
self.packets = 0
4146
self.resent_bytes = 0
47+
self.resent_packets = 0
4248
# Duplicate packets received
4349
self.dups = {}
4450
self.dupcount = 0
@@ -134,7 +140,7 @@ def __init__(
134140
# FIXME: does this belong in metrics?
135141
self.last_update = 0
136142
# The last packet we sent, if applicable, to make resending easy.
137-
self.last_pkt = None
143+
self.last_pkt = None # type: Optional[TftpPacket]
138144
# Count the number of retry attempts.
139145
self.retry_count = 0
140146
# Flag to signal timeout error when waiting for ACK of the current block
@@ -254,7 +260,7 @@ def cycle(self):
254260
# kinds of packets. This way, the client is privy to things like
255261
# negotiated options.
256262
if self.packethook:
257-
self.packethook(recvpkt)
263+
self.packethook(recvpkt, self)
258264

259265
# And handle it, possibly changing state.
260266
self.state = self.state.handle(recvpkt, raddress, rport)
@@ -367,6 +373,10 @@ def start(self):
367373
log.info(" filename -> %s", self.file_to_transfer)
368374
log.info(" options -> %s", self.options)
369375

376+
tsize = self.options.get("tsize")
377+
if tsize:
378+
self.metrics.tsize = tsize
379+
370380
self.metrics.start_time = time.time()
371381
log.debug("Set metrics.start_time to %s", self.metrics.start_time)
372382

partftpy/TftpStates.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
from .TftpPacketTypes import *
2222
from .TftpShared import *
2323

24+
if TYPE_CHECKING:
25+
from .TftpContexts import TftpContext
26+
27+
2428
log = logging.getLogger("partftpy.TftpStates")
2529

2630
###############################################################################
@@ -35,7 +39,7 @@ def __init__(self, context):
3539
"""Constructor for setting up common instance variables. The involved
3640
file object is required, since in tftp there's always a file
3741
involved."""
38-
self.context = context
42+
self.context = context # type: TftpContext
3943

4044
def handle(self, pkt, raddress, rport):
4145
"""An abstract method for handling a packet. It is expected to return
@@ -50,6 +54,9 @@ def handleOACK(self, pkt):
5054
log.info("Successful negotiation of options")
5155
# Set options to OACK options
5256
self.context.options = pkt.options
57+
tsize = pkt.options.get("tsize")
58+
if tsize:
59+
self.context.metrics.tsize = tsize
5360
for k, v in self.context.options.items():
5461
log.info(" %s = %s", k, v)
5562
else:
@@ -112,6 +119,7 @@ def sendDAT(self):
112119
dat.data = buffer
113120
dat.blocknumber = blocknumber
114121
self.context.metrics.bytes += len(dat.data)
122+
self.context.metrics.packets += 1
115123
# Testing hook
116124
if NETWORK_UNRELIABILITY > 0 and random.randrange(NETWORK_UNRELIABILITY) == 0:
117125
log.warning("Skipping DAT packet %d for testing", dat.blocknumber)
@@ -122,7 +130,7 @@ def sendDAT(self):
122130
)
123131
self.context.metrics.last_dat_time = time.time()
124132
if self.context.packethook:
125-
self.context.packethook(dat)
133+
self.context.packethook(dat, self.context)
126134
self.context.last_pkt = dat
127135
return finished
128136

@@ -175,6 +183,7 @@ def resendLast(self):
175183
assert self.context.last_pkt is not None
176184
log.warning("Resending packet %s on sessions %s", self.context.last_pkt, self)
177185
self.context.metrics.resent_bytes += len(self.context.last_pkt.buffer)
186+
self.context.metrics.resent_packets += 1
178187
self.context.metrics.add_dup(self.context.last_pkt)
179188
sendto_port = self.context.tidport
180189
if not sendto_port:
@@ -186,9 +195,10 @@ def resendLast(self):
186195
self.context.last_pkt.encode().buffer, (self.context.host, sendto_port)
187196
)
188197
if self.context.packethook:
189-
self.context.packethook(self.context.last_pkt)
198+
self.context.packethook(self.context.last_pkt, self.context)
190199

191200
def handleDat(self, pkt):
201+
# type: (TftpPacket) -> TftpState
192202
"""This method handles a DAT packet during a client download, or a
193203
server upload."""
194204
log.debug("Handling DAT packet - block %d", pkt.blocknumber)
@@ -202,6 +212,7 @@ def handleDat(self, pkt):
202212
log.debug("Writing %d bytes to output file", len(pkt.data))
203213
self.context.fileobj.write(pkt.data)
204214
self.context.metrics.bytes += len(pkt.data)
215+
self.context.metrics.packets += 1
205216
# Check for end-of-file, any less than full data packet.
206217
if len(pkt.data) < self.context.options["blksize"]:
207218
log.info("End of file detected")
@@ -354,6 +365,7 @@ def handle(self, pkt, raddress, rport):
354365
tsize = str(self.context.fileobj.tell())
355366
self.context.fileobj.seek(0, 0)
356367
self.context.options["tsize"] = tsize
368+
self.context.metrics.tsize = tsize
357369

358370
if sendoack:
359371
# Note, next_block is 0 here since that's the proper

partftpy/bin/partftpy_client.py

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@
66
import os
77
import socket
88
import sys
9+
import threading
10+
import time
911
from optparse import OptionParser
1012

1113
import partftpy.TftpPacketTypes
1214
from partftpy.TftpClient import TftpClient
1315
from partftpy.TftpShared import TftpException
16+
from partftpy.TftpContexts import TftpContext
1417

1518
log = logging.getLogger("partftpy")
1619
log.setLevel(logging.INFO)
@@ -112,18 +115,40 @@ def main():
112115

113116
class Progress(object):
114117
def __init__(self, out):
115-
self.progress = 0
116-
self.pkts = 0
117118
self.out = out
119+
self.metrics = None
120+
self.thr = threading.Thread(target=self._print_progress)
121+
self.thr.daemon = True
122+
self.thr.start()
118123

119-
def progresshook(self, pkt):
124+
def progresshook(self, pkt, ctx):
125+
# type: (bytes, TftpContext) -> None
120126
if isinstance(pkt, partftpy.TftpPacketTypes.TftpPacketDAT):
121-
self.pkts += 1
122-
self.progress += len(pkt.data)
123-
self.out("Transferred %d bytes, %d pkts" % (self.progress, self.pkts))
127+
self.metrics = ctx.metrics
124128
elif isinstance(pkt, partftpy.TftpPacketTypes.TftpPacketOACK):
125129
self.out("Received OACK, options are: %s" % pkt.options)
126130

131+
def _print_progress(self):
132+
while True:
133+
time.sleep(0.5)
134+
if not self.metrics:
135+
continue
136+
metrics = self.metrics
137+
self.metrics = None
138+
139+
pkts = metrics.packets
140+
nbytes = metrics.bytes
141+
left = metrics.tsize - nbytes
142+
if left < 0:
143+
self.out("Transferred %d pkts, %d bytes", pkts, nbytes)
144+
else:
145+
self.out(
146+
"Transferred %d pkts, %d bytes, %d bytes left",
147+
pkts,
148+
nbytes,
149+
left,
150+
)
151+
127152
if options.debug:
128153
log.setLevel(logging.DEBUG)
129154
# increase the verbosity of the formatter
@@ -139,8 +164,11 @@ def progresshook(self, pkt):
139164
tftp_options = {}
140165
if options.blksize:
141166
tftp_options["blksize"] = int(options.blksize)
142-
if options.tsize:
167+
if options.tsize and options.download:
143168
tftp_options["tsize"] = 0
169+
if options.tsize and options.upload and options.input != "-":
170+
fn = options.input or options.upload
171+
tftp_options["tsize"] = os.path.getsize(fn)
144172

145173
fam = socket.AF_INET6 if ":" in options.host else socket.AF_INET
146174

t/test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ def testServerDownloadWithStopNow(self, output="/tmp/out"):
495495
stopped_early = False
496496
time.sleep(1)
497497

498-
def delay_hook(pkt):
498+
def delay_hook(pkt, ctx):
499499
time.sleep(0.005) # 5ms
500500

501501
client.download("640KBFILE", output, delay_hook)
@@ -539,7 +539,7 @@ def testServerDownloadWithStopNotNow(self, output="/tmp/out"):
539539
# parent - let the server start
540540
time.sleep(1)
541541

542-
def delay_hook(pkt):
542+
def delay_hook(pkt, ctx):
543543
time.sleep(0.005) # 5ms
544544

545545
client.download("640KBFILE", output, delay_hook)

0 commit comments

Comments
 (0)