VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testdriver/txsclient.py@ 70541

Last change on this file since 70541 was 70541, checked in by vboxsync, 7 years ago

ValidationKit: txsCdWait/ISO fix.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 79.6 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: txsclient.py 70541 2018-01-11 14:53:57Z vboxsync $
3# pylint: disable=C0302
4
5"""
6Test eXecution Service Client.
7"""
8__copyright__ = \
9"""
10Copyright (C) 2010-2017 Oracle Corporation
11
12This file is part of VirtualBox Open Source Edition (OSE), as
13available from http://www.215389.xyz. This file is free software;
14you can redistribute it and/or modify it under the terms of the GNU
15General Public License (GPL) as published by the Free Software
16Foundation, in version 2 as it comes in the "COPYING" file of the
17VirtualBox OSE distribution. VirtualBox OSE is distributed in the
18hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
19
20The contents of this file may alternatively be used under the terms
21of the Common Development and Distribution License Version 1.0
22(CDDL) only, as it comes in the "COPYING.CDDL" file of the
23VirtualBox OSE distribution, in which case the provisions of the
24CDDL are applicable instead of those of the GPL.
25
26You may elect to license modified versions of this file under the
27terms and conditions of either the GPL or the CDDL or both.
28"""
29__version__ = "$Revision: 70541 $"
30
31# Standard Python imports.
32import array;
33import errno;
34import os;
35import select;
36import socket;
37import sys;
38import threading;
39import time;
40import types;
41import zlib;
42import uuid;
43
44# Validation Kit imports.
45from common import utils;
46from testdriver import base;
47from testdriver import reporter;
48from testdriver.base import TdTaskBase;
49
50# Python 3 hacks:
51if sys.version_info[0] >= 3:
52 long = int; # pylint: disable=redefined-builtin,invalid-name
53
54#
55# Helpers for decoding data received from the TXS.
56# These are used by both the Session and Transport classes.
57#
58
def getU32(abData, off):
    """
    Decodes the little endian unsigned 32-bit value stored in the four
    items of abData starting at index off.
    """
    uValue = abData[off];
    uFactor = 256;
    for offByte in range(off + 1, off + 4):
        # Each following byte is 8 bits more significant than the previous.
        uValue = uValue + abData[offByte] * uFactor;
        uFactor = uFactor * 256;
    return uValue;
65
def getSZ(abData, off, sDefault = None):
    """
    Get a zero-terminated UTF-8 string field.

    abData:     The packet data; sliced and converted to bytes, so it is
                expected to be an array.array('B').
    off:        Offset of the first byte of the string.
    sDefault:   What to return if the string is invalid.

    Returns the decoded string (without the terminator).
    Returns sDefault if the string isn't terminated, off is beyond the
    data, or the bytes cannot be decoded (the latter is logged).
    """
    cchStr = getSZLen(abData, off);
    if cchStr >= 0:
        abStr = abData[off:(off + cchStr)];
        try:
            # array.tostring() was removed in Python 3.9, while tobytes() is
            # missing in Python 2.  Use whichever is available.
            if hasattr(abStr, 'tobytes'):
                return abStr.tobytes().decode('utf_8');
            return abStr.tostring().decode('utf_8');
        except:
            reporter.errorXcpt('getSZ(,%u)' % (off));
    return sDefault;
79
def getSZLen(abData, off):
    """
    Determines the byte length of the zero-terminated string field starting
    at offset off, not counting the terminator.

    Returns -1 if off is beyond the data packet or if no terminator is found
    before the end of the data.
    """
    for offCur in range(off, len(abData)):
        if abData[offCur] == 0:
            return offCur - off;
    # Either off was past the end (empty range) or we ran out of data.
    return -1;
96
def isValidOpcodeEncoding(sOpcode):
    """
    Checks if the specified opcode is valid or not.

    A valid opcode is exactly 8 characters long.  The first two characters
    must be upper case letters, the remaining six may additionally be digits,
    dash, underscore or space (padding).

    Returns True on success.
    Returns False if it is invalid, details in the log.
    """
    sSet1 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
    sSet2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_ ";
    if len(sOpcode) != 8:
        reporter.error("invalid opcode length: %s" % (len(sOpcode)));
        return False;
    # Note: the ranges used to be (0, 1) and (2, 7), which left character
    #       #1 and the last character (#7) completely unchecked.
    for i in range(0, 2):
        if sSet1.find(sOpcode[i]) < 0:
            reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
            return False;
    for i in range(2, 8):
        if sSet2.find(sOpcode[i]) < 0:
            reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
            return False;
    return True;
117
118#
119# Helper for encoding data sent to the TXS.
120#
121
def u32ToByteArray(u32):
    """
    Encodes the u32 value as a little endian byte (B) array with four
    items, least significant byte first.
    """
    abRet = array.array('B');
    for cShift in (0, 8, 16, 24):
        abRet.append((u32 >> cShift) & 0xff);
    return abRet;
129
130
131
class TransportBase(object):
    """
    Base class for the transport layer.

    Subclasses override the raw byte primitives (connect, disconnect,
    sendBytes, recvBytes, isConnectionOk, isRecvPending), while this class
    implements the message framing on top of them: a 16 byte header
    (length, CRC-32, 8 character opcode) followed by the payload, padded to
    a multiple of 16 bytes.
    """

    def __init__(self, sCaller):
        self.sDbgCreated = '%s: %s' % (utils.getTimePrefix(), sCaller);
        self.fDummy = 0;
        # Header that was received without its payload; reused by the next recvMsg call.
        self.abReadAheadHdr = array.array('B');

    def toString(self):
        """
        Stringify the instance for logging and debugging.
        """
        return '<%s: abReadAheadHdr=%s, sDbgCreated=%s>' % (type(self).__name__, self.abReadAheadHdr, self.sDbgCreated);

    def __str__(self):
        return self.toString();

    def cancelConnect(self):
        """
        Cancels any pending connect() call.
        Returns None;
        """
        return None;

    def connect(self, cMsTimeout):
        """
        Quietly attempts to connect to the TXS.

        Returns True on success.
        Returns False on retryable errors (no logging).
        Returns None on fatal errors with details in the log.

        Override this method, don't call super.
        """
        _ = cMsTimeout;
        return False;

    def disconnect(self, fQuiet = False):
        """
        Disconnect from the TXS.

        Returns True.

        Override this method, don't call super.
        """
        _ = fQuiet;
        return True;

    def sendBytes(self, abBuf, cMsTimeout):
        """
        Sends the bytes in the buffer abBuf to the TXS.

        Returns True on success.
        Returns False on failure and error details in the log.

        Override this method, don't call super.

        Remarks: len(abBuf) is always a multiple of 16.
        """
        _ = abBuf; _ = cMsTimeout;
        return False;

    def recvBytes(self, cb, cMsTimeout, fNoDataOk):
        """
        Receive cb number of bytes from the TXS.

        Returns the bytes (array('B')) on success.
        Returns None on failure and error details in the log.

        Override this method, don't call super.

        Remarks: cb is always a multiple of 16.
        """
        _ = cb; _ = cMsTimeout; _ = fNoDataOk;
        return None;

    def isConnectionOk(self):
        """
        Checks if the connection is OK.

        Returns True if it is.
        Returns False if it isn't (caller should call disconnect).

        Override this method, don't call super.
        """
        return True;

    def isRecvPending(self, cMsTimeout = 0):
        """
        Checks if there is incoming bytes, optionally waiting cMsTimeout
        milliseconds for something to arrive.

        Returns True if there is, False if there isn't.

        Override this method, don't call super.
        """
        _ = cMsTimeout;
        return False;

    def sendMsgInt(self, sOpcode, cMsTimeout, abPayload = array.array('B')):
        """
        Sends a message (opcode + encoded payload).

        sOpcode:    The opcode, 2 to 8 characters.  Padded with spaces to 8.
        cMsTimeout: Timeout in milliseconds, passed on to sendBytes.
        abPayload:  The already encoded payload (array('B')).  The default
                    is a shared empty array; it is only read, never modified.

        Returns True on success.
        Returns False on failure and error details in the log.
        """
        # Fix + check the opcode.
        if len(sOpcode) < 2:
            reporter.fatal('sendMsgInt: invalid opcode length: %d (\"%s\")' % (len(sOpcode), sOpcode));
            return False;
        sOpcode = sOpcode.ljust(8);
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('sendMsgInt: invalid opcode encoding: \"%s\"' % (sOpcode));
            return False;

        # Start construct the message.
        cbMsg = 16 + len(abPayload);
        abMsg = array.array('B');
        abMsg.extend(u32ToByteArray(cbMsg));
        abMsg.extend((0, 0, 0, 0));     # uCrc32, filled in below.
        try:
            abMsg.extend(array.array('B', [ord(ch) for ch in sOpcode]));
            if abPayload:
                abMsg.extend(abPayload);
        except:
            reporter.fatalXcpt('sendMsgInt: packing problem...');
            return False;

        # checksum it, pad it and send it off.  The CRC covers the opcode
        # and the payload, but not the padding.
        uCrc32 = zlib.crc32(abMsg[8:]);
        abMsg[4:8] = u32ToByteArray(uCrc32);

        while len(abMsg) % 16:
            abMsg.append(0);

        reporter.log2('sendMsgInt: op=%s len=%d to=%d' % (sOpcode, len(abMsg), cMsTimeout));
        return self.sendBytes(abMsg, cMsTimeout);

    def recvMsg(self, cMsTimeout, fNoDataOk = False):
        """
        Receives a message from the TXS.

        cMsTimeout: Timeout in milliseconds, passed on to recvBytes.
        fNoDataOk:  Passed to recvBytes for the header read.  It also
                    suppresses the log entry when the payload is missing.

        Returns the message three-tuple: length, opcode, payload.
        Returns (None, None, None) on failure and error details in the log.
        """

        # Read the header, unless a previous call already got it but failed
        # to receive the payload.
        if self.abReadAheadHdr:
            assert(len(self.abReadAheadHdr) == 16);
            abHdr = self.abReadAheadHdr;
            self.abReadAheadHdr = array.array('B');
        else:
            abHdr = self.recvBytes(16, cMsTimeout, fNoDataOk);
            if abHdr is None:
                return (None, None, None);
            if len(abHdr) != 16:
                reporter.fatal('recvBytes(16) returns %d bytes!' % (len(abHdr)));
                return (None, None, None);

        # Unpack and validate the header.
        cbMsg = getU32(abHdr, 0);
        uCrc32 = getU32(abHdr, 4);
        abOpcode = abHdr[8:16];
        try:
            # array.tostring() was removed in Python 3.9, while tobytes() is
            # missing in Python 2.  Use whichever is available.
            if hasattr(abOpcode, 'tobytes'):
                sOpcode = abOpcode.tobytes().decode('ascii');
            else:
                sOpcode = abOpcode.tostring().decode('ascii');
        except UnicodeDecodeError:
            # Garbage in the header, treat it like any other invalid opcode.
            reporter.fatalXcpt('recvMsg: opcode is not valid ASCII');
            return (None, None, None);

        if cbMsg < 16:
            reporter.fatal('recvMsg: message length is out of range: %s (min 16 bytes)' % (cbMsg));
            return (None, None, None);
        if cbMsg > 1024*1024:
            reporter.fatal('recvMsg: message length is out of range: %s (max 1MB)' % (cbMsg));
            return (None, None, None);
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('recvMsg: invalid opcode \"%s\"' % (sOpcode));
            return (None, None, None);

        # Get the payload (if any), dropping the padding.
        abPayload = array.array('B');
        if cbMsg > 16:
            if cbMsg % 16:
                cbPadding = 16 - (cbMsg % 16);
            else:
                cbPadding = 0;
            abPayload = self.recvBytes(cbMsg - 16 + cbPadding, cMsTimeout, False);
            if abPayload is None:
                # Keep the header so the next call can retry the payload.
                self.abReadAheadHdr = abHdr;
                if not fNoDataOk:
                    reporter.log('recvMsg: failed to recv payload bytes!');
                return (None, None, None);

            while cbPadding > 0:
                abPayload.pop();
                cbPadding = cbPadding - 1;

        # Check the CRC-32.  A zero CRC in the header skips the check.
        if uCrc32 != 0:
            uActualCrc32 = zlib.crc32(abHdr[8:]);
            if cbMsg > 16:
                uActualCrc32 = zlib.crc32(abPayload, uActualCrc32);
            uActualCrc32 = uActualCrc32 & 0xffffffff;
            if uCrc32 != uActualCrc32:
                reporter.fatal('recvMsg: crc error: expected %s, got %s' % (hex(uCrc32), hex(uActualCrc32)));
                return (None, None, None);

        reporter.log2('recvMsg: op=%s len=%d' % (sOpcode, len(abPayload)));
        return (cbMsg, sOpcode, abPayload);

    def sendMsg(self, sOpcode, cMsTimeout, aoPayload = ()):
        """
        Sends a message (opcode + payload tuple).

        The payload items are encoded according to their type: strings as
        zero terminated UTF-8, integers as little endian uint32_t and
        array.array objects are appended as they are.

        Returns True on success.
        Returns False on failure and error details in the log.
        Returns None if you pass the incorrectly typed parameters.
        """
        # Encode the payload.
        abPayload = array.array('B');
        for o in aoPayload:
            try:
                if utils.isString(o):
                    if sys.version_info[0] >= 3:
                        abPayload.extend(o.encode('utf_8'));
                    else:
                        # the primitive approach...
                        sUtf8 = o.encode('utf_8');
                        for ch in sUtf8:
                            abPayload.append(ord(ch))
                    abPayload.append(0);
                elif isinstance(o, (int, long)):
                    # types.LongType does not exist in Python 3, so check
                    # against int and long (aliased to int for Python 3 at
                    # the top of this file) instead.
                    if o < 0 or o > 0xffffffff:
                        reporter.fatal('sendMsg: uint32_t payload is out of range: %s' % (hex(o)));
                        return None;
                    abPayload.extend(u32ToByteArray(o));
                elif isinstance(o, array.array):
                    abPayload.extend(o);
                else:
                    reporter.fatal('sendMsg: unexpected payload type: %s (%s) (aoPayload=%s)' % (type(o), o, aoPayload));
                    return None;
            except:
                reporter.fatalXcpt('sendMsg: screwed up the encoding code...');
                return None;
        return self.sendMsgInt(sOpcode, cMsTimeout, abPayload);
382
383
384class Session(TdTaskBase):
385 """
386 A Test eXecution Service (TXS) client session.
387 """
388
def __init__(self, oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = False):
    """
    Construct a TXS session.

    The constructor immediately kicks off the "connecting" task
    (taskConnect) using the given transport and timeout.  Raises
    base.GenError if that task cannot be started.
    """
    TdTaskBase.__init__(self, utils.getCallerName());

    # Connection related state.
    self.oTransport         = oTransport;
    self.fTryConnect        = fTryConnect;
    self.fScrewedUpMsgState = False;

    # State of the current task.
    self.sStatus            = "";
    self.cMsTimeout         = 0;
    self.msStart            = 0;
    self.fErr               = True;     # Whether to report errors as error.
    self.oThread            = None;
    self.fnTask             = self.taskDummy;
    self.aTaskArgs          = None;

    # Results of the last task.
    self.oTaskRc            = None;
    self.t3oReply           = (None, None, None);

    fStarted = self.startTask(cMsTimeout, False, "connecting", self.taskConnect, (cMsIdleFudge,));
    if not fStarted:
        raise base.GenError("startTask failed");
412
def __del__(self):
    """Destructor: cancels whatever task may still be pending."""
    _ = self.cancelTask();
416
def toString(self):
    """
    Stringify the session (base task info plus all the session state) for
    logging and debugging.
    """
    sFormat = '<%s fnTask=%s, aTaskArgs=%s, sStatus=%s, oTaskRc=%s, cMsTimeout=%s,' \
              ' msStart=%s, fTryConnect=%s, fErr=%s, fScrewedUpMsgState=%s, t3oReply=%s oTransport=%s, oThread=%s>';
    aoValues = (
        TdTaskBase.toString(self),
        self.fnTask,
        self.aTaskArgs,
        self.sStatus,
        self.oTaskRc,
        self.cMsTimeout,
        self.msStart,
        self.fTryConnect,
        self.fErr,
        self.fScrewedUpMsgState,
        self.t3oReply,
        self.oTransport,
        self.oThread,
    );
    return sFormat % aoValues;
422
def taskDummy(self):
    """
    Default task method which always raises an exception.  Running it means
    the task state handling is broken somewhere.
    """
    oXcpt = Exception();
    raise oXcpt;
426
def startTask(self, cMsTimeout, fIgnoreErrors, sStatus, fnTask, aArgs = ()):
    """
    Starts a new task on a fresh daemon thread.

    cMsTimeout:     The task timeout in milliseconds.  Values less than
                    500 ms will be adjusted to 500 ms.  This means it is
                    OK to use negative value.
    fIgnoreErrors:  When set, problems are not reported as errors.
    sStatus:        The task status.
    fnTask:         The method that'll execute the task.
    aArgs:          Arguments to pass to fnTask.

    Returns True on success, False + error in log on failure.
    """
    fReportErrors = not fIgnoreErrors;

    # Get rid of any previous task first.
    if not self.cancelTask():
        reporter.maybeErr(fReportErrors, 'txsclient.Session.startTask: failed to cancel previous task.');
        return False;

    # Claim the session by switching from idle to the "setup" status.
    self.lockTask();
    fWasIdle = self.sStatus == "";
    if fWasIdle:
        self.sStatus  = "setup";
        self.oTaskRc  = None;
        self.t3oReply = (None, None, None);
        self.resetTaskLocked();
    self.unlockTask();
    if not fWasIdle:
        reporter.maybeErr(fReportErrors, 'txsclient.Session.startTask: race.');
        return False;

    # Set up the task parameters and the worker thread.
    self.cMsTimeout = max(cMsTimeout, 500);
    self.fErr       = fReportErrors;
    self.fnTask     = fnTask;
    self.aTaskArgs  = aArgs;
    sThreadName     = 'TXS-%s' % (sStatus);
    self.oThread    = threading.Thread(target=self.taskThread, args=(), name=sThreadName);
    self.oThread.setDaemon(True);
    self.msStart    = base.timestampMilli();

    # Publish the real status before letting the thread loose.
    self.lockTask();
    self.sStatus = sStatus;
    self.unlockTask();
    self.oThread.start();

    return True;
470
def cancelTask(self, fSync = True):
    """
    Attempts to cancel any pending tasks.

    fSync:  Whether to wait (up to 61 seconds) for the task thread to
            terminate.

    Returns success indicator (True/False):
      - True if there is no task, or if the task thread terminated while
        we waited for it.
      - False if a task is being set up, is already being cancelled, if we
        didn't wait (fSync is False), or if the thread is still running
        after the wait.
    """
    self.lockTask();

    if self.sStatus == "":
        self.unlockTask();
        return True;
    if self.sStatus == "setup":
        self.unlockTask();
        return False;
    if self.sStatus == "cancelled":
        self.unlockTask();
        return False;

    reporter.log('txsclient: cancelling "%s"...' % (self.sStatus));
    if self.sStatus == 'connecting':
        self.oTransport.cancelConnect();

    self.sStatus = "cancelled";
    oThread = self.oThread;
    self.unlockTask();

    if not fSync:
        return False;

    oThread.join(61.0);
    # Success means the thread is gone.  (This used to return isAlive(),
    # which both inverted the result and no longer exists in Python 3.9+.)
    return not oThread.is_alive();
501
def taskThread(self):
    """
    The task thread function.
    This does some housekeeping activities around the real task method call:
    it runs self.fnTask with self.aTaskArgs (unless already cancelled),
    stores the result in self.oTaskRc, resets the status and signals the
    task.  An exception in the task method is logged and yields None.
    """
    # Must be initialized here, or the cancelled path below would hit an
    # unbound local variable when logging and storing the result.
    oTaskRc = None;
    if not self.isCancelled():
        try:
            fnTask = self.fnTask;
            oTaskRc = fnTask(*self.aTaskArgs);
        except:
            reporter.fatalXcpt('taskThread', 15);
            oTaskRc = None;
    else:
        reporter.log('taskThread: cancelled already');

    self.lockTask();

    reporter.log('taskThread: signalling task with status "%s", oTaskRc=%s' % (self.sStatus, oTaskRc));
    self.oTaskRc = oTaskRc;
    self.oThread = None;
    self.sStatus = '';
    self.signalTaskLocked();

    self.unlockTask();
    return None;
527
def isCancelled(self):
    """
    Internal method which reads the status under the task lock and tells
    whether it is "cancelled".
    """
    self.lockTask();
    sCurStatus = self.sStatus;
    self.unlockTask();
    return sCurStatus == "cancelled";
536
def hasTimedOut(self):
    """Internal method returning True when no time is left for the task, otherwise False."""
    return self.getMsLeft() <= 0;
543
def getMsLeft(self, cMsMin = 0, cMsMax = -1):
    """
    Calculates how many milliseconds remain of the task timeout.

    cMsMin: Lower limit for the return value.
    cMsMax: Upper limit for the return value, ignored unless positive.
    """
    cMsUsed = base.timestampMilli() - self.msStart;
    if cMsUsed < 0:
        # Start time lies in the future.
        return cMsMin;

    cMsRemaining = self.cMsTimeout - cMsUsed;
    if cMsRemaining <= cMsMin:
        return cMsMin;
    if cMsMax > 0 and cMsRemaining > cMsMax:
        return cMsMax;
    return cMsRemaining;
555
def recvReply(self, cMsTimeout = None, fNoDataOk = False):
    """
    Receives a message via the transport's recvMsg and remembers the
    resulting three-tuple in self.t3oReply for later inspection.

    When cMsTimeout is None the remaining task time (at least 500 ms) is
    used.  Returns the (cbMsg, sOpcode, abPayload) tuple from recvMsg.
    """
    cMsEffective = self.getMsLeft(500) if cMsTimeout is None else cMsTimeout;
    cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(cMsEffective, fNoDataOk);
    t3oReply = (cbMsg, sOpcode, abPayload);

    self.lockTask();
    self.t3oReply = t3oReply;
    self.unlockTask();
    return t3oReply;
568
def recvAck(self, fNoDataOk = False):
    """
    Receives an ACK or error response from the TXS.

    Returns True when the reply opcode is ACK.
    Returns False when no reply was received (timeout or transport error).
    Returns a (sOpcode, sDetails) tuple for any other reply.  The opcode is
    stripped and doubles as details if the payload holds no valid string.
    """
    cbReply, sReplyOpcode, abReplyPayload = self.recvReply(None, fNoDataOk);
    if cbReply is None:
        return False;

    sReplyOpcode = sReplyOpcode.strip();
    if sReplyOpcode != "ACK":
        return (sReplyOpcode, getSZ(abReplyPayload, 0, sReplyOpcode));
    return True;
585
def recvAckLogged(self, sCommand, fNoDataOk = False):
    """
    Calls recvAck and reports anything but an ACK via reporter.maybeErr.

    Returns True on success (ACK).
    Returns False on timeout, transport error and errors signalled by TXS.
    When fNoDataOk is set nothing is reported and the recvAck result is
    returned unmodified.
    """
    rc = self.recvAck(fNoDataOk);
    if rc is True or fNoDataOk:
        return rc;

    if rc is False:
        reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
        return rc;

    # An (opcode, details) tuple from the TXS.
    reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, rc[0], rc[1]));
    return False;
600
def recvTrueFalse(self, sCommand):
    """
    Receives a TRUE/FALSE response from the TXS.

    sCommand:   The command we're waiting for the reply to, for logging.

    Returns True on TRUE, False on FALSE and None on error/other (logged).
    """
    cbMsg, sOpcode, abPayload = self.recvReply();
    if cbMsg is None:
        # Note: the messages here used to claim to come from recvAckLogged.
        reporter.maybeErr(self.fErr, 'recvTrueFalse: %s transport error' % (sCommand));
        return None;

    sOpcode = sOpcode.strip()
    if sOpcode == "TRUE":
        return True;
    if sOpcode == "FALSE":
        return False;
    reporter.maybeErr(self.fErr, 'recvTrueFalse: %s response was %s: %s' % (sCommand, sOpcode, getSZ(abPayload, 0, sOpcode)));
    return None;
618
def sendMsg(self, sOpcode, aoPayload = (), cMsTimeout = None):
    """
    Sends a message via the transport's sendMsg.  When cMsTimeout is None
    the remaining task time (at least 500 ms) is used as timeout.
    """
    cMsEffective = self.getMsLeft(500) if cMsTimeout is None else cMsTimeout;
    return self.oTransport.sendMsg(sOpcode, cMsEffective, aoPayload);
626
def asyncToSync(self, fnAsync, *aArgs):
    """
    Turns an asynchronous task into a synchronous operation: starts it by
    calling fnAsync(*aArgs), waits for the task to complete (task timeout
    plus 5 seconds) and fetches the result.

    Returns False on failure, task return status on success.
    """
    rcStart = fnAsync(*aArgs);
    if rcStart is False:
        reporter.log2('asyncToSync(%s): returns False (#1)' % (fnAsync));
        return rcStart;

    if self.waitForTask(self.cMsTimeout + 5000) is False:
        reporter.maybeErrXcpt(self.fErr, 'asyncToSync: waitForTask failed...');
        self.cancelTask();
        return False;

    return self.getResult();
648
649 #
650 # Connection tasks.
651 #
652
def taskConnect(self, cMsIdleFudge):
    """
    Keeps trying to connect to the TXS until it works, fails fatally, times
    out or gets cancelled.

    Returns the taskGreet result once connected, None on a fatal connect
    error and False on timeout or cancellation.
    """
    while True:
        if self.isCancelled():
            if not self.fTryConnect:
                reporter.maybeErr(self.fErr, 'taskConnect: cancelled');
            return False;

        reporter.log2('taskConnect: connecting ...');
        rcConnect = self.oTransport.connect(self.getMsLeft(500));
        if rcConnect is True:
            reporter.log('taskConnect: succeeded');
            return self.taskGreet(cMsIdleFudge);
        if rcConnect is None:
            reporter.log2('taskConnect: unable to connect');
            return None;

        # Retryable failure, give up if we're out of time.
        if self.hasTimedOut():
            reporter.log2('taskConnect: timed out');
            if not self.fTryConnect:
                reporter.maybeErr(self.fErr, 'taskConnect: timed out');
            return False;

        # Nap for up to a second before the next attempt.
        time.sleep(self.getMsLeft(1, 1000) / 1000.0);
673
def taskGreet(self, cMsIdleFudge):
    """
    Sends HOWDY to the TXS and waits for the ACK.  On success it sleeps
    one second per started 1000 ms of cMsIdleFudge, on failure it
    disconnects the transport.  Returns the send/ACK status.
    """
    rc = self.sendMsg("HOWDY", ());
    if rc is True:
        rc = self.recvAckLogged("HOWDY", self.fTryConnect);

    if rc is not True:
        self.oTransport.disconnect(self.fTryConnect);
        return rc;

    cMsFudgeLeft = cMsIdleFudge;
    while cMsFudgeLeft > 0:
        cMsFudgeLeft -= 1000;
        time.sleep(1);
    return rc;
686
def taskBye(self):
    """
    Sends BYE to the TXS, waits for the ACK if sending worked, and then
    disconnects the transport regardless of the outcome.
    """
    rcSend = self.sendMsg("BYE");
    rcRet  = self.recvAckLogged("BYE") if rcSend is True else rcSend;
    self.oTransport.disconnect();
    return rcRet;
694
def taskUuid(self):
    """
    Queries the TXS UUID.

    Returns the UUID string enclosed in curly brackets on success.
    Returns the sendMsg status if the request could not be sent and False
    (with logging) if the reply is missing or malformed.
    """
    rcSend = self.sendMsg("UUID");
    if rcSend is not True:
        return rcSend;

    cbMsg, sOpcode, abPayload = self.recvReply();
    if cbMsg is None:
        reporter.maybeErr(self.fErr, 'taskUuid got 3xNone from recvReply.');
        return False;

    sOpcode = sOpcode.strip();
    if sOpcode != "ACK UUID":
        reporter.maybeErr(self.fErr, 'taskUuid got a bad reply: %s' % (sOpcode,));
        return False;

    sUuid = getSZ(abPayload, 0);
    if sUuid is None:
        reporter.maybeErr(self.fErr, 'taskUuid did not get a UUID string.');
        return False;

    # Validate the string by feeding it to the UUID class.
    sUuid = '{%s}' % (sUuid,);
    try:
        _ = uuid.UUID(sUuid);
    except:
        reporter.errorXcpt('taskUuid got an invalid UUID string %s' % (sUuid,));
        return False;
    return sUuid;
719
720 #
721 # Process task
722 # pylint: disable=C0111
723 #
724
def taskExecEx(self, sExecName, fFlags, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser): # pylint: disable=R0913,R0914,R0915,C0301
    """
    Executes a program on the TXS side, feeding it standard input and
    collecting its output.

    sExecName:  The program to execute.
    fFlags:     Execution flags, sent as an uint32_t.
    asArgs:     The program arguments.
    asAddEnv:   Environment changes.
    oStdIn, oStdOut, oStdErr, oTestPipe:
                Each is either a string (sent to the TXS as is), None (sent
                as an empty string), or a file like object which is piped
                ('|') and gets a running CRC-32 attached to it as
                uTxsClientCrc32 for the duration of the call.  Non-string
                objects still referenced at the end are closed.
    sAsUser:    The user to execute the program as.

    Returns True if the process completed with 'PROC OK', False if it
    completed with any other status, and None on failure, in which case
    self.t3oReply is set to a fake 'EXECFAIL' reply holding the reason.
    """
    # Construct the payload.
    aoPayload = [long(fFlags), '%s' % (sExecName), long(len(asArgs))];
    for sArg in asArgs:
        aoPayload.append('%s' % (sArg));
    aoPayload.append(long(len(asAddEnv)));
    for sPutEnv in asAddEnv:
        aoPayload.append('%s' % (sPutEnv));
    for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
        if utils.isString(o):
            aoPayload.append(o);
        elif o is not None:
            aoPayload.append('|');
            # Must be a bytes literal, zlib.crc32 rejects str in Python 3.
            o.uTxsClientCrc32 = zlib.crc32(b'');
        else:
            aoPayload.append('');
    aoPayload.append('%s' % (sAsUser));
    aoPayload.append(long(self.cMsTimeout));

    # Kick of the EXEC command.
    rc = self.sendMsg('EXEC', aoPayload)
    if rc is True:
        rc = self.recvAckLogged('EXEC');
    if rc is True:
        # Loop till the process completes, feed input to the TXS and
        # receive output from it.
        sFailure = "";
        msPendingInputReply = None;
        cbMsg, sOpcode, abPayload = (None, None, None);
        while True:
            # Pending input?
            if      msPendingInputReply is None \
                and oStdIn is not None \
                and not utils.isString(oStdIn):
                try:
                    sInput = oStdIn.read(65536);
                except:
                    reporter.errorXcpt('read standard in');
                    sFailure = 'exception reading stdin';
                    rc = None;
                    break;
                if sInput:
                    oStdIn.uTxsClientCrc32 = zlib.crc32(sInput, oStdIn.uTxsClientCrc32);
                    # Convert to a byte array before handing it of to sendMsg or the string
                    # will get some zero termination added breaking the CRC (and injecting
                    # unwanted bytes).
                    abInput = array.array('B', sInput);
                    rc = self.sendMsg('STDIN', (long(oStdIn.uTxsClientCrc32 & 0xffffffff), abInput));
                    if rc is not True:
                        sFailure = 'sendMsg failure';
                        break;
                    msPendingInputReply = base.timestampMilli();
                    continue;

                # End of the input stream.
                rc = self.sendMsg('STDINEOS');
                oStdIn = None;
                if rc is not True:
                    sFailure = 'sendMsg failure';
                    break;
                msPendingInputReply = base.timestampMilli();

            # Wait for input (500 ms timeout).
            if cbMsg is None:
                cbMsg, sOpcode, abPayload = self.recvReply(cMsTimeout=500, fNoDataOk=True);
            if cbMsg is None:
                # Check for time out before restarting the loop.
                # Note! Only doing timeout checking here does mean that
                #       the TXS may prevent us from timing out by
                #       flooding us with data.  This is unlikely though.
                if      self.hasTimedOut() \
                    and (   msPendingInputReply is None \
                         or base.timestampMilli() - msPendingInputReply > 30000):
                    reporter.maybeErr(self.fErr, 'taskExecEx: timed out');
                    sFailure = 'timeout';
                    rc = None;
                    break;
                # Check that the connection is OK.
                if not self.oTransport.isConnectionOk():
                    self.oTransport.disconnect();
                    sFailure = 'disconnected';
                    rc = False;
                    break;
                continue;

            # Handle the response.
            sOpcode = sOpcode.rstrip();
            if sOpcode == 'STDOUT':
                oOut = oStdOut;
            elif sOpcode == 'STDERR':
                oOut = oStdErr;
            elif sOpcode == 'TESTPIPE':
                oOut = oTestPipe;
            else:
                oOut = None;
            if oOut is not None:
                # Output from the process.
                if len(abPayload) < 4:
                    sFailure = 'malformed output packet (%s, %u bytes)' % (sOpcode, cbMsg);
                    reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
                uStreamCrc32 = getU32(abPayload, 0);
                oOut.uTxsClientCrc32 = zlib.crc32(abPayload[4:], oOut.uTxsClientCrc32);
                if uStreamCrc32 != (oOut.uTxsClientCrc32 & 0xffffffff):
                    sFailure = 'crc error - mine=%#x their=%#x (%s, %u bytes)' \
                        % (oOut.uTxsClientCrc32 & 0xffffffff, uStreamCrc32, sOpcode, cbMsg);
                    reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
                try:
                    oOut.write(abPayload[4:]);
                except:
                    sFailure = 'exception writing %s' % (sOpcode);
                    reporter.errorXcpt('taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
            elif sOpcode == 'STDINIGN' and msPendingInputReply is not None:
                # Standard input is ignored.  Ignore this condition for now.
                msPendingInputReply = None;
                reporter.log('taskExecEx: Standard input is ignored... why?');
                # oStdIn is None if this is the reply to STDINEOS, so there
                # might not be any CRC attribute to drop.
                if oStdIn is not None and not utils.isString(oStdIn):
                    del oStdIn.uTxsClientCrc32;
                oStdIn = '/dev/null';
            elif  (sOpcode == 'STDINMEM' or sOpcode == 'STDINBAD' or sOpcode == 'STDINCRC')\
              and msPendingInputReply is not None:
                # TXS STDIN error, abort.
                # TODO: STDINMEM - consider undoing the previous stdin read and try resubmitt it.
                msPendingInputReply = None;
                sFailure = 'TXS is out of memory for std input buffering';
                reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                rc = None;
                break;
            elif sOpcode == 'ACK' and msPendingInputReply is not None:
                msPendingInputReply = None;
            elif sOpcode.startswith('PROC '):
                # Process status message, handle it outside the loop.
                rc = True;
                break;
            else:
                sFailure = 'Unexpected opcode %s' % (sOpcode);
                reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                rc = None;
                break;
            # Clear the message.
            cbMsg, sOpcode, abPayload = (None, None, None);

        # If we sent an STDIN packet and didn't get a reply yet, we'll give
        # TXS some 5 seconds to reply to this.  If we don't wait here we'll
        # get screwed later on if we mix it up with the reply to some other
        # command.  Hackish.
        if msPendingInputReply is not None:
            cbMsg2, sOpcode2, abPayload2 = self.oTransport.recvMsg(5000);
            if cbMsg2 is not None:
                reporter.log('taskExecEx: Out of order STDIN, got reply: %s, %s, %s [ignored]'
                             % (cbMsg2, sOpcode2, abPayload2));
                msPendingInputReply = None;
            else:
                reporter.maybeErr(self.fErr, 'taskExecEx: Pending STDIN, no reply after 5 secs!');
                self.fScrewedUpMsgState = True;

        # Parse the exit status (True), abort (None) or do nothing (False).
        if rc is True:
            if sOpcode != 'PROC OK':
                # Do proper parsing some other day if needed:
                #   PROC TOK, PROC TOA, PROC DWN, PROC DOO,
                #   PROC NOK + rc, PROC SIG + sig, PROC ABD, FAILED.
                rc = False;
        else:
            if rc is None:
                # Abort it.
                reporter.log('taskExecEx: sending ABORT...');
                rc = self.sendMsg('ABORT');
                while rc is True:
                    cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(30000);
                    if cbMsg is None:
                        reporter.maybeErr(self.fErr, 'taskExecEx: Pending ABORT, no reply after 30 secs!')
                        self.fScrewedUpMsgState = True;
                        break;
                    if sOpcode.startswith('PROC '):
                        reporter.log('taskExecEx: ABORT reply: %s, %s, %s [ignored]' % (cbMsg, sOpcode, abPayload));
                        break;
                    reporter.log('taskExecEx: ABORT in process, ignoring reply: %s, %s, %s' % (cbMsg, sOpcode, abPayload));
                    # Check that the connection is OK before looping.
                    if not self.oTransport.isConnectionOk():
                        self.oTransport.disconnect();
                        break;

            # Fake response with the reason why we quit.
            if sFailure is not None:
                self.t3oReply = (0, 'EXECFAIL', sFailure);
            rc = None;
    else:
        rc = None;

    # Cleanup.
    for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
        if o is not None and not utils.isString(o):
            del o.uTxsClientCrc32;      # pylint: disable=E1103
            # Make sure all files are closed
            o.close();                  # pylint: disable=E1103
    reporter.log('taskExecEx: returns %s' % (rc));
    return rc;
926
927 #
928 # Admin tasks
929 #
930
def hlpRebootShutdownWaitForAck(self, sCmd):
    """
    Wait for reboot/shutdown ACK.

    sCmd:   The command we're waiting for the reply to, for logging.

    After a successful ACK this polls for up to 5 seconds for the server to
    disconnect (or send something) before disconnecting the transport.

    Returns the recvAckLogged status.
    """
    rc = self.recvAckLogged(sCmd);
    if rc is True:
        # poll a little while for server to disconnect.  (The elapsed time
        # check used to be inverted, so the loop never did any polling.)
        uMsStart = base.timestampMilli();
        while self.oTransport.isConnectionOk() \
          and base.timestampMilli() - uMsStart < 5000:
            if self.oTransport.isRecvPending(min(500, self.getMsLeft())):
                break;
        self.oTransport.disconnect();
    return rc;
943
def taskReboot(self):
    """Sends REBOOT to the TXS and, if that worked, waits for the ACK."""
    rcSend = self.sendMsg('REBOOT');
    if rcSend is not True:
        return rcSend;
    return self.hlpRebootShutdownWaitForAck('REBOOT');
949
def taskShutdown(self):
    """Sends SHUTDOWN to the TXS and, if that worked, waits for the ACK."""
    rcSend = self.sendMsg('SHUTDOWN');
    if rcSend is not True:
        return rcSend;
    return self.hlpRebootShutdownWaitForAck('SHUTDOWN');
955
956 #
957 # CD/DVD control tasks.
958 #
959
960 ## TODO
961
962 #
963 # File system tasks
964 #
965
def taskMkDir(self, sRemoteDir, fMode):
    """Sends MKDIR (mode + directory) to the TXS and returns the send/ACK status."""
    rcSend = self.sendMsg('MKDIR', (fMode, sRemoteDir));
    if rcSend is not True:
        return rcSend;
    return self.recvAckLogged('MKDIR');
971
def taskMkDirPath(self, sRemoteDir, fMode):
    """Sends MKDRPATH (mode + directory) to the TXS and returns the send/ACK status."""
    rcSend = self.sendMsg('MKDRPATH', (fMode, sRemoteDir));
    if rcSend is not True:
        return rcSend;
    return self.recvAckLogged('MKDRPATH');
977
def taskMkSymlink(self, sLinkTarget, sLink):
    """Sends MKSYMLNK (target + link name) to the TXS and returns the send/ACK status."""
    rcSend = self.sendMsg('MKSYMLNK', (sLinkTarget, sLink));
    if rcSend is not True:
        return rcSend;
    return self.recvAckLogged('MKSYMLNK');
983
def taskRmDir(self, sRemoteDir):
    """Sends RMDIR for the given directory to the TXS and returns the send/ACK status."""
    rcSend = self.sendMsg('RMDIR', (sRemoteDir,));
    if rcSend is not True:
        return rcSend;
    return self.recvAckLogged('RMDIR');
989
def taskRmFile(self, sRemoteFile):
    """Sends RMFILE for the given file to the TXS and returns the send/ACK status."""
    rcSend = self.sendMsg('RMFILE', (sRemoteFile,));
    if rcSend is not True:
        return rcSend;
    return self.recvAckLogged('RMFILE');
995
def taskRmSymlink(self, sRemoteSymlink):
    """Sends RMSYMLNK for the given symlink to the TXS and returns the send/ACK status."""
    rcSend = self.sendMsg('RMSYMLNK', (sRemoteSymlink,));
    if rcSend is not True:
        return rcSend;
    return self.recvAckLogged('RMSYMLNK');
1001
def taskRmTree(self, sRemoteTree):
    """Sends RMTREE for the given tree to the TXS and returns the send/ACK status."""
    rcSend = self.sendMsg('RMTREE', (sRemoteTree,));
    if rcSend is not True:
        return rcSend;
    return self.recvAckLogged('RMTREE');
1007
1008 #def "CHMOD "
1009 #def "CHOWN "
1010 #def "CHGRP "
1011
def taskIsDir(self, sRemoteDir):
    """Sends ISDIR to the TXS; returns the send status on failure, else the TRUE/FALSE reply."""
    rcSend = self.sendMsg('ISDIR', (sRemoteDir,));
    if rcSend is not True:
        return rcSend;
    return self.recvTrueFalse('ISDIR');
1017
def taskIsFile(self, sRemoteFile):
    """Sends ISFILE to the TXS; returns the send status on failure, else the TRUE/FALSE reply."""
    rcSend = self.sendMsg('ISFILE', (sRemoteFile,));
    if rcSend is not True:
        return rcSend;
    return self.recvTrueFalse('ISFILE');
1023
def taskIsSymlink(self, sRemoteSymlink):
    """Sends ISSYMLNK to the TXS; returns the send status on failure, else the TRUE/FALSE reply."""
    rcSend = self.sendMsg('ISSYMLNK', (sRemoteSymlink,));
    if rcSend is not True:
        return rcSend;
    return self.recvTrueFalse('ISSYMLNK');
1029
1030 #def "STAT "
1031 #def "LSTAT "
1032 #def "LIST "
1033
def taskUploadFile(self, sLocalFile, sRemoteFile):
    """
    Uploads the local file sLocalFile to sRemoteFile on the TXS side.

    The local file is opened for binary reading first, so that a missing
    or unreadable file fails before TXS is bothered; the transfer itself
    is done by taskUploadCommon.

    Returns the taskUploadCommon result, or False (logged) when the local
    file cannot be opened.
    """
    try:
        oLocalFile = utils.openNoInherit(sLocalFile, 'rb');
    except:
        reporter.errorXcpt('taskUpload: failed to open "%s"' % (sLocalFile));
        return False;

    # Common cause with taskUploadString.  The finally clause guarantees the
    # local file handle is released even if the transfer raises.
    try:
        return self.taskUploadCommon(oLocalFile, sRemoteFile);
    finally:
        oLocalFile.close();
1051
def taskUploadString(self, sContent, sRemoteFile):
    """
    Uploads the in-memory string sContent to sRemoteFile by presenting it
    to taskUploadCommon through a minimal read()-only file-like wrapper.

    Returns whatever taskUploadCommon returns.
    """
    class InStringFile(object): # pylint: disable=R0903
        """Read-only, forward-only file-like view of a string."""
        def __init__(self, sContent):
            self.sContent = sContent;
            self.off      = 0;

        def read(self, cbMax):
            """Returns the next chunk of at most cbMax items, "" at the end."""
            offEnd = len(self.sContent);
            if self.off == offEnd:
                return "";
            sChunk = self.sContent[self.off:min(self.off + cbMax, offEnd)];
            self.off += len(sChunk);
            return sChunk;

    return self.taskUploadCommon(InStringFile(sContent), sRemoteFile);
1072
def taskUploadCommon(self, oLocalFile, sRemoteFile):
    """
    Common worker used by taskUploadFile and taskUploadString.

    oLocalFile only needs a read(cbMax) method returning a string/bytes
    chunk, empty at end of file.  The data is pushed in chunks of up to
    64 KB, each carrying the running CRC-32 of the stream so far, and the
    transfer is terminated by a 'DATA EOF' packet.

    Returns True on success, otherwise False (or the failing send/ACK
    result when the initial 'PUT FILE' handshake does not succeed).
    """
    # Command + ACK.
    rc = self.sendMsg('PUT FILE', (sRemoteFile,));
    if rc is True:
        rc = self.recvAckLogged('PUT FILE');
    if rc is True:
        #
        # Push data packets until eof.
        #
        # Note: crc32() requires a bytes-like object on Python 3; b"" is an
        #       ordinary str on Python 2, so this works for both.
        uMyCrc32 = zlib.crc32(b"");
        while True:
            # Read up to 64 KB of data.
            try:
                sRaw = oLocalFile.read(65536);
            except:
                rc = None;  # I/O error -> ABORT below.
                break;

            # Convert to a byte array (text strings go character by character).
            abBuf = array.array('B');
            if utils.isString(sRaw):
                abBuf.extend([ord(ch) for ch in sRaw]);
            else:
                abBuf.extend(sRaw);
            sRaw = None;

            # Update the file stream CRC and send it off.  An empty buffer
            # means end of file.
            uMyCrc32 = zlib.crc32(abBuf, uMyCrc32);
            if not abBuf:
                rc = self.sendMsg('DATA EOF', (long(uMyCrc32 & 0xffffffff), ));
            else:
                rc = self.sendMsg('DATA    ', (long(uMyCrc32 & 0xffffffff), abBuf));
            if rc is False:
                break;

            # Wait for the reply.
            rc = self.recvAck();
            if rc is not True:
                if rc is False:
                    reporter.maybeErr(self.fErr, 'taskUpload: transport error waiting for ACK');
                else:
                    reporter.maybeErr(self.fErr, 'taskUpload: DATA response was %s: %s' % (rc[0], rc[1]));
                    rc = False;
                break;

            # EOF?
            if not abBuf:
                break;

        # Send ABORT on ACK and I/O errors.
        if rc is None:
            rc = self.sendMsg('ABORT');
            if rc is True:
                self.recvAckLogged('ABORT');
            rc = False;
    return rc;
1131
def taskDownloadFile(self, sRemoteFile, sLocalFile):
    """
    Downloads sRemoteFile from the TXS side into the local file sLocalFile.

    The local file is created/truncated up front and the transfer is done
    by taskDownloadCommon.  When the transfer result is False, the partial
    local file is removed again (removal errors are logged).

    Returns the taskDownloadCommon result, or False (logged) when the local
    file cannot be opened.
    """
    try:
        oLocalFile = utils.openNoInherit(sLocalFile, 'wb');
    except:
        reporter.errorXcpt('taskDownload: failed to open "%s"' % (sLocalFile));
        return False;

    # Always release the handle, also when the transfer raises.  It must be
    # closed before the os.remove() below.
    try:
        rc = self.taskDownloadCommon(sRemoteFile, oLocalFile);
    finally:
        oLocalFile.close();

    if rc is False:
        try:
            os.remove(sLocalFile);
        except:
            reporter.errorXcpt();
    return rc;
1148
def taskDownloadString(self, sRemoteFile):
    """
    Downloads sRemoteFile into memory by giving taskDownloadCommon a
    write()-only collector object.

    Returns the concatenated chunks when taskDownloadCommon returned True
    ('' if nothing was written), otherwise the taskDownloadCommon result.
    """
    class OutStringFile(object): # pylint: disable=R0903
        """Write-only file-like object gathering the chunks in a list."""
        def __init__(self):
            self.asContent = [];

        def write(self, sBuf):
            """Appends the chunk to the list."""
            self.asContent.append(sBuf);
            return None;

    oCollector = OutStringFile();
    rc = self.taskDownloadCommon(sRemoteFile, oCollector);
    if rc is not True:
        return rc;
    # Joining an empty list yields '', so no special case is needed.
    return ''.join(oCollector.asContent);
1167
def taskDownloadCommon(self, sRemoteFile, oLocalFile):
    """
    Common worker for taskDownloadFile and taskDownloadString.

    Sends 'GET FILE' and then processes 'DATA' packets until 'DATA EOF'.
    The first four payload bytes of each packet hold the CRC-32 of the
    stream so far, which is checked against our own running CRC.  Each
    accepted packet is written to oLocalFile (which only needs a write()
    method) and answered with 'ACK'; validation and I/O errors are
    answered with 'NACK'.

    Returns True on success, False on failure.
    """
    rc = self.sendMsg('GET FILE', (sRemoteFile,))
    if rc is True:
        #
        # Process data packets until eof.
        #
        # Note: crc32() requires a bytes-like object on Python 3; b"" is an
        #       ordinary str on Python 2, so this works for both.
        uMyCrc32 = zlib.crc32(b"");
        while rc is True:
            cbMsg, sOpcode, abPayload = self.recvReply();
            if cbMsg is None:
                reporter.maybeErr(self.fErr, 'taskDownload got 3xNone from recvReply.');
                rc = None;
                break;

            # Validate.
            sOpcode = sOpcode.rstrip();
            if sOpcode != 'DATA' and sOpcode != 'DATA EOF':
                reporter.maybeErr(self.fErr, 'taskDownload got a error reply: opcode="%s" details="%s"'
                                  % (sOpcode, getSZ(abPayload, 0, "None")));
                rc = False;
                break;
            if sOpcode == 'DATA' and len(abPayload) < 4:
                reporter.maybeErr(self.fErr, 'taskDownload got a bad DATA packet: len=%u' % (len(abPayload)));
                rc = None;
                break;
            if sOpcode == 'DATA EOF' and len(abPayload) != 4:
                reporter.maybeErr(self.fErr, 'taskDownload got a bad EOF packet: len=%u' % (len(abPayload)));
                rc = None;
                break;

            # Check the CRC (common for both packets).
            uCrc32 = getU32(abPayload, 0);
            if sOpcode == 'DATA':
                uMyCrc32 = zlib.crc32(abPayload[4:], uMyCrc32);
            if uCrc32 != (uMyCrc32 & 0xffffffff):
                reporter.maybeErr(self.fErr, 'taskDownload got a bad CRC: mycrc=%s remotecrc=%s'
                                  % (hex(uMyCrc32), hex(uCrc32)));
                rc = None;
                break;
            if sOpcode == 'DATA EOF':
                rc = self.sendMsg('ACK');
                break;

            # Finally, push the data to the file.  array.tostring() was
            # removed in Python 3.9, while tobytes() is missing on Python 2,
            # so pick whichever is there.
            try:
                abData = abPayload[4:];
                if hasattr(abData, 'tobytes'):
                    oLocalFile.write(abData.tobytes());
                else:
                    oLocalFile.write(abData.tostring());
            except:
                reporter.errorXcpt('I/O error writing to "%s"' % (sRemoteFile));
                rc = None;
                break;
            rc = self.sendMsg('ACK');

        # Send NACK on validation and I/O errors.
        if rc is None:
            rc = self.sendMsg('NACK');
            rc = False;
    return rc;
1226
def taskUnpackFile(self, sRemoteFile, sRemoteDir):
    """
    Sends UNPKFILE with (sRemoteFile, sRemoteDir) as payload and, if the
    send succeeded, returns the logged ACK result; otherwise the send result.
    """
    rcSend = self.sendMsg('UNPKFILE', (sRemoteFile, sRemoteDir));
    if rcSend is not True:
        return rcSend;
    return self.recvAckLogged('UNPKFILE');
1232
1233 # pylint: enable=C0111
1234
1235
1236 #
1237 # Public methods - generic task queries
1238 #
1239
def isSuccess(self):
    """
    Returns True if the task completed successfully, otherwise False.

    Success means the status string is empty and the task result is
    neither False nor None.
    """
    self.lockTask();
    sStatus, oTaskRc = self.sStatus, self.oTaskRc;
    self.unlockTask();
    if sStatus != "":
        return False;
    return not (oTaskRc is False or oTaskRc is None);
1251
def getResult(self):
    """
    Returns the result of a completed task.
    Returns None while the status string is non-empty (task not completed
    yet) or when there was no previous task.
    """
    self.lockTask();
    sStatus, oTaskRc = self.sStatus, self.oTaskRc;
    self.unlockTask();
    return None if sStatus != "" else oTaskRc;
1264
def getLastReply(self):
    """
    Returns the last reply three-tuple: cbMsg, sOpcode, abPayload.
    Returns a None, None, None three-tuple if there was no last reply.
    """
    self.lockTask();
    try:
        return self.t3oReply;
    finally:
        self.unlockTask();
1274
1275 #
1276 # Public methods - connection.
1277 #
1278
def asyncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a disconnect task (self.taskBye, task name "bye").

    Returns the startTask result: True on success, False on failure (logged).

    The task itself returns True on success and False on failure.
    """
    fnTask = self.taskBye;
    return self.startTask(cMsTimeout, fIgnoreErrors, "bye", fnTask);
1288
def syncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version: runs asyncDisconnect through asyncToSync."""
    fnAsync = self.asyncDisconnect;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1292
def asyncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a task for getting the TXS UUID.

    Returns True on success, False on failure (logged).

    The task returns UUID string (in {}) on success and False on failure.
    """
    # The task name used to be "bye", a copy & paste leftover from
    # asyncDisconnect that mislabelled this task.
    return self.startTask(cMsTimeout, fIgnoreErrors, "uuid", self.taskUuid);
1302
def syncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version: runs asyncUuid through asyncToSync."""
    fnAsync = self.asyncUuid;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1306
1307 #
1308 # Public methods - execution.
1309 #
1310
def asyncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=R0913
                oStdIn = None, oStdOut = None, oStdErr = None, oTestPipe = None,
                sAsUser = "", cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Kicks off an exec process task (self.taskExecEx, task name "exec").

    Returns True on success, False on failure (logged).

    Task result:
        - True if the process exited normally with status code 0.
        - None on failure prior to executing the process.
        - False if the process exited with a different status or in an
          abnormal manner.
    Both None and False are logged, and further information can be
    obtained via getLastReply().

    The oStdIn, oStdOut, oStdErr and oTestPipe parameters say what to do
    with the respective stream.  None means no special action: output
    goes wherever TXS sends its output, and likewise for input.
        - Pass '/dev/null' to send to / read from the bitbucket.
        - Pass a remote filename to redirect to/from that file.
        - Pass '>>' followed by a remote filename to append to that file.
        - Pass a file like object to pipe the stream to/from the TXS
          client.  StdIn requires a non-blocking read() method, the others
          a write() method.  Watch out for deadlock conditions between
          StdIn and StdOut/StdErr/TestPipe piping.
    """
    # The second task argument is always zero here.
    tTaskArgs = (sExecName, long(0), asArgs, asAddEnv, oStdIn,
                 oStdOut, oStdErr, oTestPipe, sAsUser);
    return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx, tTaskArgs);
1339
def syncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=R0913
               oStdIn = '/dev/null', oStdOut = '/dev/null',
               oStdErr = '/dev/null', oTestPipe = '/dev/null',
               sAsUser = '', cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Synchronous version: runs asyncExecEx through asyncToSync.  Note that
    all four streams default to '/dev/null' here.
    """
    tAsyncArgs = (sExecName, asArgs, asAddEnv, oStdIn, oStdOut,
                  oStdErr, oTestPipe, sAsUser, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncExecEx, *tAsyncArgs);
1347
def asyncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = "", fWithTestPipe = True, sPrefix = '', \
              cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Kicks off an exec process test task (self.taskExecEx, task name "exec").

    Returns True on success, False on failure (logged).

    Task result:
        - True if the process exited normally with status code 0.
        - None on failure prior to executing the process.
        - False if the process exited with a different status or in an
          abnormal manner.
    Both None and False are logged, and further information can be
    obtained via getLastReply().

    Standard input is taken from /dev/null.  Standard output and standard
    error go through reporter.FileWrapper objects named '<sPrefix>stdout'
    and '<sPrefix>stderr'.  The test pipe goes to a
    reporter.FileWrapperTestPipe object when fWithTestPipe is set, and to
    /dev/null otherwise.
    """
    oStdOut   = reporter.FileWrapper('%sstdout' % sPrefix);
    oStdErr   = reporter.FileWrapper('%sstderr' % sPrefix);
    oTestPipe = reporter.FileWrapperTestPipe() if fWithTestPipe else '/dev/null';

    tTaskArgs = (sExecName, long(0), asArgs, asAddEnv, '/dev/null', oStdOut, oStdErr, oTestPipe, sAsUser);
    return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx, tTaskArgs);
1374
def syncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = '', fWithTestPipe = True, sPrefix = '',
             cMsTimeout = 3600000, fIgnoreErrors = False):
    """Synchronous version: runs asyncExec through asyncToSync."""
    tAsyncArgs = (sExecName, asArgs, asAddEnv, sAsUser, fWithTestPipe, sPrefix, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncExec, *tAsyncArgs);
1380
1381 #
1382 # Public methods - system control (reboot, shutdown)
1383 #
1384
def asyncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a reboot task (self.taskReboot, task name "reboot").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).  The
    session will be disconnected on successful task completion.
    """
    tTaskArgs = ();
    return self.startTask(cMsTimeout, fIgnoreErrors, "reboot", self.taskReboot, tTaskArgs);
1395
def syncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version: runs asyncReboot through asyncToSync."""
    fnAsync = self.asyncReboot;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1399
def asyncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a shutdown task (self.taskShutdown, task name "shutdown").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = ();
    return self.startTask(cMsTimeout, fIgnoreErrors, "shutdown", self.taskShutdown, tTaskArgs);
1409
def syncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version: runs asyncShutdown through asyncToSync."""
    fnAsync = self.asyncShutdown;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1413
1414
1415 #
1416 # Public methods - file system
1417 #
1418
def asyncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a mkdir task (self.taskMkDir, task name "mkDir").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sRemoteDir, long(fMode));
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkDir", self.taskMkDir, tTaskArgs);
1428
def syncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version: runs asyncMkDir through asyncToSync."""
    fnAsync = self.asyncMkDir;
    return self.asyncToSync(fnAsync, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1432
def asyncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a mkdir -p task (self.taskMkDirPath, task name "mkDirPath").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sRemoteDir, long(fMode));
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkDirPath", self.taskMkDirPath, tTaskArgs);
1442
def syncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version: runs asyncMkDirPath through asyncToSync."""
    fnAsync = self.asyncMkDirPath;
    return self.asyncToSync(fnAsync, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1446
def asyncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a symlink task (self.taskMkSymlink, task name "mkSymlink").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sLinkTarget, sLink);
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkSymlink", self.taskMkSymlink, tTaskArgs);
1456
def syncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version: runs asyncMkSymlink through asyncToSync."""
    fnAsync = self.asyncMkSymlink;
    return self.asyncToSync(fnAsync, sLinkTarget, sLink, cMsTimeout, fIgnoreErrors);
1460
def asyncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a rmdir task (self.taskRmDir, task name "rmDir").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sRemoteDir,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmDir", self.taskRmDir, tTaskArgs);
1470
def syncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version: runs asyncRmDir through asyncToSync."""
    fnAsync = self.asyncRmDir;
    return self.asyncToSync(fnAsync, sRemoteDir, cMsTimeout, fIgnoreErrors);
1474
def asyncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a rmfile task (self.taskRmFile, task name "rmFile").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sRemoteFile,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmFile", self.taskRmFile, tTaskArgs);
1484
def syncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version: runs asyncRmFile through asyncToSync."""
    fnAsync = self.asyncRmFile;
    return self.asyncToSync(fnAsync, sRemoteFile, cMsTimeout, fIgnoreErrors);
1488
def asyncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a rmsymlink task (self.taskRmSymlink, task name "rmSymlink").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sRemoteSymlink,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmSymlink", self.taskRmSymlink, tTaskArgs);
1498
def syncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version: runs asyncRmSymlink through asyncToSync."""
    fnAsync = self.asyncRmSymlink;
    return self.asyncToSync(fnAsync, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1502
def asyncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a rmtree task (self.taskRmTree, task name "rmTree").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sRemoteTree,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmTree", self.taskRmTree, tTaskArgs);
1512
def syncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version: runs asyncRmTree through asyncToSync."""
    fnAsync = self.asyncRmTree;
    return self.asyncToSync(fnAsync, sRemoteTree, cMsTimeout, fIgnoreErrors);
1516
1517 #def "CHMOD "
1518 #def "CHOWN "
1519 #def "CHGRP "
1520
def asyncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off an is-dir query task (self.taskIsDir, task name "isDir").

    Returns True on success, False on failure (logged).

    The task returns True if it's a directory, False if it isn't, and
    None on error (logged).
    """
    tTaskArgs = (sRemoteDir,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isDir", self.taskIsDir, tTaskArgs);
1531
def syncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version: runs asyncIsDir through asyncToSync."""
    fnAsync = self.asyncIsDir;
    return self.asyncToSync(fnAsync, sRemoteDir, cMsTimeout, fIgnoreErrors);
1535
def asyncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off an is-file query task (self.taskIsFile, task name "isFile").

    Returns True on success, False on failure (logged).

    The task returns True if it's a file, False if it isn't, and None on
    error (logged).
    """
    tTaskArgs = (sRemoteFile,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isFile", self.taskIsFile, tTaskArgs);
1546
def syncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version: runs asyncIsFile through asyncToSync."""
    fnAsync = self.asyncIsFile;
    return self.asyncToSync(fnAsync, sRemoteFile, cMsTimeout, fIgnoreErrors);
1550
def asyncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off an is-symbolic-link query task (self.taskIsSymlink, task
    name "isSymlink").

    Returns True on success, False on failure (logged).

    The task returns True if it's a symbolic link, False if it isn't, and
    None on error (logged).
    """
    tTaskArgs = (sRemoteSymlink,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isSymlink", self.taskIsSymlink, tTaskArgs);
1561
def syncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version: runs asyncIsSymlink through asyncToSync."""
    fnAsync = self.asyncIsSymlink;
    return self.asyncToSync(fnAsync, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1565
1566 #def "STAT "
1567 #def "LSTAT "
1568 #def "LIST "
1569
def asyncUploadFile(self, sLocalFile, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off an upload file task (self.taskUploadFile, task name "upload").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sLocalFile, sRemoteFile);
    return self.startTask(cMsTimeout, fIgnoreErrors, "upload", self.taskUploadFile, tTaskArgs);
1579
def syncUploadFile(self, sLocalFile, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version: runs asyncUploadFile through asyncToSync."""
    fnAsync = self.asyncUploadFile;
    return self.asyncToSync(fnAsync, sLocalFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1583
def asyncUploadString(self, sContent, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off an upload string task (self.taskUploadString, task name
    "uploadString").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sContent, sRemoteFile);
    return self.startTask(cMsTimeout, fIgnoreErrors, "uploadString", self.taskUploadString, tTaskArgs);
1593
def syncUploadString(self, sContent, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version: runs asyncUploadString through asyncToSync."""
    fnAsync = self.asyncUploadString;
    return self.asyncToSync(fnAsync, sContent, sRemoteFile, cMsTimeout, fIgnoreErrors);
1597
def asyncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a download file task (self.taskDownloadFile, task name
    "downloadFile").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sRemoteFile, sLocalFile);
    return self.startTask(cMsTimeout, fIgnoreErrors, "downloadFile", self.taskDownloadFile, tTaskArgs);
1607
def syncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version: runs asyncDownloadFile through asyncToSync."""
    fnAsync = self.asyncDownloadFile;
    return self.asyncToSync(fnAsync, sRemoteFile, sLocalFile, cMsTimeout, fIgnoreErrors);
1611
def asyncDownloadString(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a download string task (self.taskDownloadString, task name
    "downloadString").

    Returns True on success, False on failure (logged).

    The task returns a byte string on success, False on failure (logged).
    """
    tTaskArgs = (sRemoteFile,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "downloadString", self.taskDownloadString, tTaskArgs);
1621
def syncDownloadString(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version: runs asyncDownloadString through asyncToSync."""
    fnAsync = self.asyncDownloadString;
    return self.asyncToSync(fnAsync, sRemoteFile, cMsTimeout, fIgnoreErrors);
1625
def asyncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off an unpack file task (self.taskUnpackFile, task name
    "unpackFile").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sRemoteFile, sRemoteDir);
    return self.startTask(cMsTimeout, fIgnoreErrors, "unpackFile", self.taskUnpackFile, tTaskArgs);
1636
def syncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version: runs asyncUnpackFile through asyncToSync."""
    fnAsync = self.asyncUnpackFile;
    return self.asyncToSync(fnAsync, sRemoteFile, sRemoteDir, cMsTimeout, fIgnoreErrors);
1640
1641
1642class TransportTcp(TransportBase):
1643 """
1644 TCP transport layer for the TXS client session class.
1645 """
1646
def __init__(self, sHostname, uPort, fReversedSetup):
    """
    Stores the connection parameters and resets the connection state.
    Nothing is connected here; the session will call us back to make the
    connection later on its worker thread.
    """
    TransportBase.__init__(self, utils.getCallerName());

    # No explicit port given: 5042 when fReversedSetup is False, else 5048.
    if uPort is None:
        uPort = 5042 if fReversedSetup is False else 5048;

    # Parameters.
    self.sHostname        = sHostname;
    self.fReversedSetup   = fReversedSetup;
    self.uPort            = uPort;

    # Connection state.
    self.oSocket          = None;
    self.oWakeupW         = None;
    self.oWakeupR         = None;
    self.fConnectCanceled = False;
    self.fIsConnecting    = False;
    self.oCv              = threading.Condition();
    self.abReadAhead      = array.array('B');
1663
def toString(self):
    """Returns a string with the base class description plus our own state members."""
    tValues = (TransportBase.toString(self), self.sHostname, self.fReversedSetup, self.uPort, self.oSocket,
               self.fConnectCanceled, self.fIsConnecting, self.oCv, self.abReadAhead);
    sFormat = '<%s sHostname=%s, fReversedSetup=%s, uPort=%s, oSocket=%s,' \
              ' fConnectCanceled=%s, fIsConnecting=%s, oCv=%s, abReadAhead=%s>';
    return sFormat % tValues;
1669
def __isInProgressXcpt(self, oXcpt):
    """
    In progress exception?

    True if oXcpt is a socket.error carrying EINPROGRESS or EWOULDBLOCK
    (the latter is what Windows reports), False for everything else.
    Never raises.
    """
    try:
        if isinstance(oXcpt, socket.error):
            for sErrName in ('EINPROGRESS', 'EWOULDBLOCK'):
                # Each comparison is guarded on its own, like a constant the
                # errno module lacks on this platform.
                try:
                    if oXcpt.errno == getattr(errno, sErrName):
                        return True;
                except: pass;
    except:
        pass;
    return False;
1686
def __isWouldBlockXcpt(self, oXcpt):
    """
    Would block exception?

    True if oXcpt is a socket.error carrying EWOULDBLOCK or EAGAIN, False
    for everything else.  Never raises.
    """
    try:
        if isinstance(oXcpt, socket.error):
            for sErrName in ('EWOULDBLOCK', 'EAGAIN'):
                # Each comparison is guarded on its own, like a constant the
                # errno module lacks on this platform.
                try:
                    if oXcpt.errno == getattr(errno, sErrName):
                        return True;
                except: pass;
    except:
        pass;
    return False;
1702
def __isConnectionReset(self, oXcpt):
    """
    Connection reset by peer or others.

    True if oXcpt is a socket.error carrying ECONNRESET or ENETRESET,
    False for everything else.  Never raises.
    """
    try:
        if isinstance(oXcpt, socket.error):
            for sErrName in ('ECONNRESET', 'ENETRESET'):
                # Each comparison is guarded on its own, like a constant the
                # errno module lacks on this platform.
                try:
                    if oXcpt.errno == getattr(errno, sErrName):
                        return True;
                except: pass;
    except:
        pass;
    return False;
1718
def _closeWakeupSockets(self):
    """
    Closes the wakeup sockets, read end first, clearing each member before
    closing it.  Caller should own the CV.
    """
    for sMember in ('oWakeupR', 'oWakeupW'):
        oWakeup = getattr(self, sMember);
        setattr(self, sMember, None);
        if oWakeup is not None:
            oWakeup.close();
    return None;
1732
def cancelConnect(self):
    """
    Cancels a connect() that may be in progress.

    Sets fConnectCanceled and, if a connect is in progress, closes the
    connecting socket and pokes the write end of the wakeup socket pair
    (send + shutdown + close), which connect() includes in its select()
    call.  Everything is done while holding the CV.
    """
    # This is bad stuff.
    self.oCv.acquire();
    try:
        reporter.log2('TransportTcp::cancelConnect: fIsConnecting=%s oSocket=%s' % (self.fIsConnecting, self.oSocket));
        self.fConnectCanceled = True;
        if self.fIsConnecting:
            oSocket = self.oSocket;
            self.oSocket = None;
            if oSocket is not None:
                reporter.log2('TransportTcp::cancelConnect: closing the socket');
                oSocket.close();

            oWakeupW = self.oWakeupW;
            self.oWakeupW = None;
            if oWakeupW is not None:
                reporter.log2('TransportTcp::cancelConnect: wakeup call');
                # Sockets only take bytes on Python 3; a b'' literal is a
                # plain str on Python 2.
                try:    oWakeupW.send(b'cancelled!\n');
                except: reporter.logXcpt();
                try:    oWakeupW.shutdown(socket.SHUT_WR);
                except: reporter.logXcpt();
                oWakeupW.close();
    finally:
        # Never leave the CV locked, or connect()/disconnect() would hang.
        self.oCv.release();
1755
def _connectAsServer(self, oSocket, oWakeupR, cMsTimeout):
    """
    Connects to the TXS server as server, i.e. the reversed setup.

    Binds oSocket to (sHostname, uPort), listens and accepts one client,
    waiting up to cMsTimeout milliseconds in select() on both oSocket and
    oWakeupR (the latter lets cancelConnect interrupt the wait).

    Returns True when a client was accepted, False when the accept timed
    out and None on failure or cancellation.  Unexpected socket errors
    from accept/select are re-raised.
    """
    assert(self.fReversedSetup);

    reporter.log2('TransportTcp::_connectAsServer: oSocket=%s, cMsTimeout=%u' % (oSocket, cMsTimeout));

    # Workaround for bind() failure...
    try:
        oSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1);
    except:
        reporter.errorXcpt('socket.setsockopt(SO_REUSEADDR) failed');
        return None;

    # Bind the socket and make it listen.
    try:
        oSocket.bind((self.sHostname, self.uPort));
    except:
        reporter.errorXcpt('socket.bind((%s,%s)) failed' % (self.sHostname, self.uPort));
        return None;
    try:
        oSocket.listen(1);
    except:
        reporter.errorXcpt('socket.listen(1) failed');
        return None;

    # Accept connections.
    oClientSocket = None;
    tClientAddr   = None;
    try:
        (oClientSocket, tClientAddr) = oSocket.accept();
    except socket.error as oXcpt:
        if not self.__isInProgressXcpt(oXcpt):
            raise;

        # Do the actual waiting.
        reporter.log2('TransportTcp::accept: operation in progress (%s)...' % (oXcpt,));
        try:
            select.select([oSocket, oWakeupR], [], [oSocket, oWakeupR], cMsTimeout / 1000.0);
        except socket.error as oXcpt2:
            # Note: use .errno; exceptions cannot be indexed on Python 3.
            if oXcpt2.errno != errno.EBADF or not self.fConnectCanceled:
                raise;
            reporter.log('socket.select() on accept was canceled');
            return None;
        except:
            reporter.logXcpt('socket.select() on accept');

        # Try accept again.
        try:
            (oClientSocket, tClientAddr) = oSocket.accept();
        except socket.error as oXcpt3:
            if not self.__isInProgressXcpt(oXcpt3):
                if oXcpt3.errno != errno.EBADF or not self.fConnectCanceled:
                    raise;
                reporter.log('socket.accept() was canceled');
                return None;
            reporter.log('socket.accept() timed out');
            return False;
        except:
            reporter.errorXcpt('socket.accept() failed');
            return None;
    except:
        reporter.errorXcpt('socket.accept() failed');
        return None;

    # Store the connected socket and throw away the server socket.
    self.oCv.acquire();
    try:
        if not self.fConnectCanceled:
            self.oSocket.close();
            self.oSocket   = oClientSocket;
            self.sHostname = "%s:%s" % (tClientAddr[0], tClientAddr[1]);
        else:
            # Cancelled while we were accepting: nobody will take ownership
            # of the client socket, so close it instead of leaking it.
            oClientSocket.close();
    finally:
        self.oCv.release();
    return True;
1828
def _connectAsClient(self, oSocket, oWakeupR, cMsTimeout):
    """
    Connects to the TXS server as client.

    Starts a connect to (sHostname, uPort) on oSocket and, when the connect
    is reported as being in progress, waits up to cMsTimeout milliseconds
    in select() for it to complete.  oWakeupR is included in the select so
    cancelConnect can interrupt the wait.

    Returns True when connected, False when the caller should try again
    (connection refused, host/network unreachable or down, interrupted,
    timed out) and None on other failures (logged as fatal unless the
    failure is EBADF caused by a cancel).
    """
    assert(not self.fReversedSetup);

    # Connect w/ timeouts.
    rc = None;
    try:
        oSocket.connect((self.sHostname, self.uPort));
        rc = True;
    except socket.error as oXcpt:
        # Note: use .errno; exceptions cannot be indexed on Python 3.
        iRc = oXcpt.errno;
        if self.__isInProgressXcpt(oXcpt):
            # Do the actual waiting.
            reporter.log2('TransportTcp::connect: operation in progress (%s)...' % (oXcpt,));
            try:
                ttRc = select.select([oWakeupR], [oSocket], [oSocket, oWakeupR], cMsTimeout / 1000.0);
                if len(ttRc[1]) + len(ttRc[2]) == 0:
                    raise socket.error(errno.ETIMEDOUT, 'select timed out');
                iRc = oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR);
                rc = iRc == 0;
            except socket.error as oXcpt2:
                iRc = oXcpt2.errno;
            except:
                iRc = -42;
                reporter.fatalXcpt('socket.select() on connect failed');

        if rc is True:
            pass;
        elif  iRc == errno.ECONNREFUSED \
           or iRc == errno.EHOSTUNREACH \
           or iRc == errno.EINTR \
           or iRc == errno.ENETDOWN \
           or iRc == errno.ENETUNREACH \
           or iRc == errno.ETIMEDOUT:
            rc = False; # try again.
        else:
            if iRc != errno.EBADF or not self.fConnectCanceled:
                reporter.fatalXcpt('socket.connect((%s,%s)) failed; iRc=%s' % (self.sHostname, self.uPort, iRc));
        reporter.log2('TransportTcp::connect: rc=%s iRc=%s' % (rc, iRc));
    except:
        reporter.fatalXcpt('socket.connect((%s,%s)) failed' % (self.sHostname, self.uPort));
    return rc;
1871
1872
def connect(self, cMsTimeout):
    """
    Makes the TCP connection, either as client or as server (reversed
    setup), using a non-blocking socket and waiting at most cMsTimeout
    milliseconds.

    Returns True when connected, False when the connect worker reported a
    retryable failure or the connect was cancelled after succeeding, and
    None on other failures, including a cancel requested before starting.
    """
    # Create a non-blocking socket.
    reporter.log2('TransportTcp::connect: cMsTimeout=%s sHostname=%s uPort=%s' % (cMsTimeout, self.sHostname, self.uPort));
    try:
        oSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
    except:
        reporter.fatalXcpt('socket.socket() failed');
        return None;
    try:
        oSocket.setblocking(0);
    except:
        oSocket.close();
        reporter.fatalXcpt('socket.setblocking(0) failed');
        return None;

    # Create wakeup socket pair for unix (select doesn't wake up on socket close on Linux).
    oWakeupR = None;
    oWakeupW = None;
    if hasattr(socket, 'socketpair'):
        try:    (oWakeupR, oWakeupW) = socket.socketpair();         # pylint: disable=E1101
        except: reporter.logXcpt('socket.socketpair() failed');

    # Update the state.
    self.oCv.acquire();
    rc = None;
    if not self.fConnectCanceled:
        self.oSocket       = oSocket;
        self.oWakeupW      = oWakeupW;
        self.oWakeupR      = oWakeupR;
        self.fIsConnecting = True;
        self.oCv.release();

        # Try connect.
        if oWakeupR is None:
            oWakeupR = oSocket; # Avoid select failure.
        if self.fReversedSetup:
            rc = self._connectAsServer(oSocket, oWakeupR, cMsTimeout);
        else:
            rc = self._connectAsClient(oSocket, oWakeupR, cMsTimeout);
        oSocket = None;

        # Update the state and cleanup on failure/cancel.
        self.oCv.acquire();
        if rc is True and self.fConnectCanceled:
            rc = False;
        self.fIsConnecting = False;

    if rc is not True:
        if self.oSocket is not None:
            self.oSocket.close();
            self.oSocket = None;
        self._closeWakeupSockets();

        # Cancelled before the sockets were published to self: they are
        # still only referenced by our locals, so close them here rather
        # than leaking them.  (oSocket is None on every other path.)
        if oSocket is not None:
            oSocket.close();
            if oWakeupR is not None:
                oWakeupR.close();
            if oWakeupW is not None:
                oWakeupW.close();
    self.oCv.release();

    reporter.log2('TransportTcp::connect: returning %s' % (rc,));
    return rc;
1929
def disconnect(self, fQuiet = False):
    """
    Disconnects from the peer, if connected, and closes the wakeup sockets.

    A connected socket is first shut down for writing and drained so the
    connection is closed gracefully; a shutdown failure is reported unless
    fQuiet is set.  The socket itself and the wakeup sockets are then
    closed while holding the CV.
    """
    fHadSocket = self.oSocket is not None;
    if fHadSocket:
        self.abReadAhead = array.array('B');

        # Try a shutting down the socket gracefully (draining it).
        try:
            self.oSocket.shutdown(socket.SHUT_WR);
        except:
            if not fQuiet:
                reporter.error('shutdown(SHUT_WR)');
        try:
            self.oSocket.setblocking(0);    # just in case it's not set.
            sData = "1";
            while sData:
                sData = self.oSocket.recv(16384);
        except:
            pass;

    # Close it.  The finally clause makes sure the CV is released even if
    # closing raises, as a CV left locked would hang connect()/cancelConnect().
    self.oCv.acquire();
    try:
        if fHadSocket and self.oSocket is not None:
            try:    self.oSocket.setblocking(1);
            except: pass;
            self.oSocket.close();
            self.oSocket = None;
        self._closeWakeupSockets();
    finally:
        self.oCv.release();
1958
def sendBytes(self, abBuf, cMsTimeout):
    """
    Sends all of abBuf to the peer, waiting at most cMsTimeout milliseconds
    for the socket to accept the data.

    A first send is attempted straight away; only if that is partial or
    would block does the method fall back to a select() + send() loop.

    Returns True once every byte has been handed to the socket.  Returns
    False, after reporting an error, when there is no connection, the
    timeout expires, select() fails or signals an exceptional condition,
    or send() fails with something self.__isWouldBlockXcpt() rejects.
    """
    if self.oSocket is None:
        reporter.error('TransportTcp.sendBytes: No connection.');
        return False;

    # Try send it all.
    try:
        cbSent = self.oSocket.send(abBuf);
        if cbSent == len(abBuf):
            return True;
    except Exception as oXcpt:
        if not self.__isWouldBlockXcpt(oXcpt):
            reporter.errorXcpt('TransportTcp.sendBytes: %s bytes' % (len(abBuf),));
            return False;
        cbSent = 0; # Nothing went out, the timed loop below starts from scratch.

    # Do a timed send.
    msStart = base.timestampMilli();
    while True:
        cMsElapsed = base.timestampMilli() - msStart;
        if cMsElapsed > cMsTimeout:
            reporter.error('TransportTcp.sendBytes: %s bytes timed out (1)' % (len(abBuf),));
            break;

        # Wait for the socket to become writable (or fail).  Only the
        # select call itself is guarded so that reporting problems aren't
        # mistaken for select failures.
        try:
            ttRc = select.select([], [self.oSocket], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
        except Exception:
            reporter.errorXcpt('TransportTcp.sendBytes: select failed');
            break;
        if ttRc[2] and not ttRc[1]:
            reporter.error('TransportTcp.sendBytes: select returned with exception');
            break;
        if not ttRc[1]:
            reporter.error('TransportTcp.sendBytes: %s bytes timed out (2)' % (len(abBuf),));
            break;

        # Try send more.
        try:
            cbSent += self.oSocket.send(abBuf[cbSent:]);
            if cbSent == len(abBuf):
                return True;
        except Exception as oXcpt:
            # A would-block error just means another round of waiting.
            if not self.__isWouldBlockXcpt(oXcpt):
                reporter.errorXcpt('TransportTcp.sendBytes: %s bytes' % (len(abBuf),));
                break;

    return False;
2007
def __returnReadAheadBytes(self, cb):
    """
    Internal worker for recvBytes.

    Removes the first cb bytes from self.abReadAhead and returns them; the
    caller must make sure that many bytes are buffered.
    """
    abBuffered = self.abReadAhead;
    assert(cb <= len(abBuffered));
    self.abReadAhead = abBuffered[cb:];
    return abBuffered[:cb];
2014
def recvBytes(self, cb, cMsTimeout, fNoDataOk):
    """
    Receives exactly cb bytes from the peer, waiting at most cMsTimeout
    milliseconds for them to arrive.

    Incoming data is accumulated in self.abReadAhead.  When the wait times
    out, whatever has been received so far stays in that buffer.

    fNoDataOk suppresses the error report when the wait ends (timeout, or
    an exception self.__isConnectionReset() accepts) while the read-ahead
    buffer is empty.

    Returns the first cb bytes of the read-ahead buffer on success.
    Returns None when there is no connection, on timeout, on select/recv
    failure, or when the peer has shut the connection down (disconnect()
    is called in that last case).
    """
    if self.oSocket is None:
        reporter.error('TransportTcp.recvBytes(%s,%s): No connection.' % (cb, cMsTimeout));
        return None;

    # Try read in some more data without bothering with timeout handling first.
    if len(self.abReadAhead) < cb:
        try:
            abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
            if abBuf:
                self.abReadAhead.extend(array.array('B', abBuf));
        except Exception as oXcpt:
            if not self.__isWouldBlockXcpt(oXcpt):
                reporter.errorXcpt('TransportTcp.recvBytes: 0/%s bytes' % (cb,));
                return None;

    if len(self.abReadAhead) >= cb:
        return self.__returnReadAheadBytes(cb);

    # Timeout loop.
    msStart = base.timestampMilli();
    while True:
        cMsElapsed = base.timestampMilli() - msStart;
        if cMsElapsed > cMsTimeout:
            if not fNoDataOk or self.abReadAhead:
                reporter.error('TransportTcp.recvBytes: %s/%s bytes timed out (1)' % (len(self.abReadAhead), cb));
            break;

        # Wait for the socket to become readable (or fail).  Only the
        # select call itself is guarded so that reporting problems aren't
        # mistaken for select failures.
        try:
            ttRc = select.select([self.oSocket], [], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
        except Exception:
            reporter.errorXcpt('TransportTcp.recvBytes: select failed');
            break;
        if ttRc[2] and not ttRc[0]:
            reporter.error('TransportTcp.recvBytes: select returned with exception');
            break;
        if not ttRc[0]:
            if not fNoDataOk or self.abReadAhead:
                reporter.error('TransportTcp.recvBytes: %s/%s bytes timed out (2) fNoDataOk=%s'
                               % (len(self.abReadAhead), cb, fNoDataOk));
            break;

        # Try read more.
        try:
            abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
            if not abBuf:
                # Readable but no data: the peer has closed its end.
                reporter.error('TransportTcp.recvBytes: %s/%s bytes (%s) - connection has been shut down'
                               % (len(self.abReadAhead), cb, fNoDataOk));
                self.disconnect();
                return None;

            self.abReadAhead.extend(array.array('B', abBuf));

        except Exception as oXcpt:
            reporter.log('recv => exception %s' % (oXcpt,));
            # A would-block error just means another round of waiting.
            if not self.__isWouldBlockXcpt(oXcpt):
                if not fNoDataOk or not self.__isConnectionReset(oXcpt) or self.abReadAhead:
                    reporter.errorXcpt('TransportTcp.recvBytes: %s/%s bytes (%s)' % (len(self.abReadAhead), cb, fNoDataOk));
                break;

        # Done?
        if len(self.abReadAhead) >= cb:
            return self.__returnReadAheadBytes(cb);

    return None;
2082
def isConnectionOk(self):
    """
    Checks whether the TCP connection still looks usable.

    The socket is polled (zero timeout) for an exceptional condition and
    then probed by sending zero bytes.

    Returns True if both checks pass.  Returns False if there is no socket,
    select() flags the socket, or either call raises.
    """
    if self.oSocket is None:
        return False;
    try:
        ttRc = select.select([], [], [self.oSocket], 0.0);
        if ttRc[2]:
            return False;

        self.oSocket.send(array.array('B')); # send zero bytes.
    except Exception:
        # Any socket/select error means the connection is not okay, but
        # KeyboardInterrupt and SystemExit are left to propagate.
        return False;
    return True;
2095
def isRecvPending(self, cMsTimeout = 0):
    """
    Checks whether the socket is readable, waiting up to cMsTimeout
    milliseconds for that to happen.

    Returns False only when select() completes without reporting the socket
    as readable.  Returns True when it is readable and also when select()
    raises (e.g. no socket).
    NOTE(review): presumably the True-on-error result is there so that the
    caller goes on to receive and gets the error reported there - confirm
    against the callers.
    """
    try:
        ttRc = select.select([self.oSocket], [], [], cMsTimeout / 1000.0);
        if not ttRc[0]:
            return False;
    except Exception:
        # Deliberately ignored (see docstring); KeyboardInterrupt and
        # SystemExit are left to propagate.
        pass;
    return True;
2104
2105
def openTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0):
    """
    Opens a connection to a Test Execution Service via TCP, given its name.

    A TransportTcp is created from sHostname, uPort and fReversedSetup and
    handed to a new Session together with cMsTimeout and cMsIdleFudge.

    Returns the Session object.  If creating either object raises, the
    exception is reported and None is returned instead.
    """
    tArgs = (cMsTimeout, sHostname, uPort, fReversedSetup, cMsIdleFudge);
    reporter.log2('openTcpSession(%s, %s, %s, %s, %s)' % tArgs);
    try:
        oSession = Session(TransportTcp(sHostname, uPort, fReversedSetup), cMsTimeout, cMsIdleFudge);
    except:
        reporter.errorXcpt(None, 15);
        oSession = None;
    return oSession;
2119
2120
def tryOpenTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0):
    """
    Tries to open a connection to a Test Execution Service via TCP, given its name.

    Same as openTcpSession, except that the Session is created with
    fTryConnect = True so that a connection failure isn't logged as an
    error, and that the call itself isn't logged.

    Returns the Session object.  If creating the transport or the session
    raises, the exception is reported and None is returned instead.
    """
    try:
        oSession = Session(TransportTcp(sHostname, uPort, fReversedSetup), cMsTimeout, cMsIdleFudge,
                           fTryConnect = True);
    except:
        reporter.errorXcpt(None, 15);
        oSession = None;
    return oSession;
2135
Note: See TracBrowser for help on using the repository browser.

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette