aboutsummaryrefslogtreecommitdiff
path: root/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py
blob: 6f32dcacd353aaeb34f6b2b32d9f2043e27882e9 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197

from __future__ import print_function


import re
import select
import threading
import traceback
import codecs

from six.moves import queue


def _handle_output_packet_string(packet_contents):
    if (not packet_contents) or (len(packet_contents) < 1):
        return None
    elif packet_contents[0] != "O":
        return None
    elif packet_contents == "OK":
        return None
    else:
        return packet_contents[1:].decode("hex")


def _dump_queue(the_queue):
    while not the_queue.empty():
        print(codecs.encode(the_queue.get(True), "string_escape"))
        print("\n")


class PumpQueues(object):

    def __init__(self):
        self._output_queue = queue.Queue()
        self._packet_queue = queue.Queue()

    def output_queue(self):
        return self._output_queue

    def packet_queue(self):
        return self._packet_queue

    def verify_queues_empty(self):
        # Warn if there is any content left in any of the queues.
        # That would represent unmatched packets.
        if not self.output_queue().empty():
            print("warning: output queue entries still exist:")
            _dump_queue(self.output_queue())
            print("from here:")
            traceback.print_stack()

        if not self.packet_queue().empty():
            print("warning: packet queue entries still exist:")
            _dump_queue(self.packet_queue())
            print("from here:")
            traceback.print_stack()


class SocketPacketPump(object):
    """A threaded packet reader that partitions packets into two streams.

    All incoming $O packet content is accumulated with the current accumulation
    state put into the OutputQueue.

    All other incoming packets are placed in the packet queue.

    A select thread can be started and stopped, and runs to place packet
    content into the two queues.
    """

    _GDB_REMOTE_PACKET_REGEX = re.compile(r'^\$([^\#]*)#[0-9a-fA-F]{2}')

    def __init__(self, pump_socket, pump_queues, logger=None):
        if not pump_socket:
            raise Exception("pump_socket cannot be None")

        self._thread = None
        self._stop_thread = False
        self._socket = pump_socket
        self._logger = logger
        self._receive_buffer = ""
        self._accumulated_output = ""
        self._pump_queues = pump_queues

    def __enter__(self):
        """Support the python 'with' statement.

        Start the pump thread."""
        self.start_pump_thread()
        return self

    def __exit__(self, exit_type, value, the_traceback):
        """Support the python 'with' statement.

        Shut down the pump thread."""
        self.stop_pump_thread()

    def start_pump_thread(self):
        if self._thread:
            raise Exception("pump thread is already running")
        self._stop_thread = False
        self._thread = threading.Thread(target=self._run_method)
        self._thread.start()

    def stop_pump_thread(self):
        self._stop_thread = True
        if self._thread:
            self._thread.join()

    def _process_new_bytes(self, new_bytes):
        if not new_bytes:
            return
        if len(new_bytes) < 1:
            return

        # Add new bytes to our accumulated unprocessed packet bytes.
        self._receive_buffer += new_bytes

        # Parse fully-formed packets into individual packets.
        has_more = len(self._receive_buffer) > 0
        while has_more:
            if len(self._receive_buffer) <= 0:
                has_more = False
            # handle '+' ack
            elif self._receive_buffer[0] == "+":
                self._pump_queues.packet_queue().put("+")
                self._receive_buffer = self._receive_buffer[1:]
                if self._logger:
                    self._logger.debug(
                        "parsed packet from stub: +\n" +
                        "new receive_buffer: {}".format(
                            self._receive_buffer))
            else:
                packet_match = self._GDB_REMOTE_PACKET_REGEX.match(
                    self._receive_buffer)
                if packet_match:
                    # Our receive buffer matches a packet at the
                    # start of the receive buffer.
                    new_output_content = _handle_output_packet_string(
                        packet_match.group(1))
                    if new_output_content:
                        # This was an $O packet with new content.
                        self._accumulated_output += new_output_content
                        self._pump_queues.output_queue().put(self._accumulated_output)
                    else:
                        # Any packet other than $O.
                        self._pump_queues.packet_queue().put(packet_match.group(0))

                    # Remove the parsed packet from the receive
                    # buffer.
                    self._receive_buffer = self._receive_buffer[
                        len(packet_match.group(0)):]
                    if self._logger:
                        self._logger.debug(
                            "parsed packet from stub: " +
                            packet_match.group(0))
                        self._logger.debug(
                            "new receive_buffer: " +
                            self._receive_buffer)
                else:
                    # We don't have enough in the receive bufferto make a full
                    # packet. Stop trying until we read more.
                    has_more = False

    def _run_method(self):
        self._receive_buffer = ""
        self._accumulated_output = ""

        if self._logger:
            self._logger.info("socket pump starting")

        # Keep looping around until we're asked to stop the thread.
        while not self._stop_thread:
            can_read, _, _ = select.select([self._socket], [], [], 0)
            if can_read and self._socket in can_read:
                try:
                    new_bytes = self._socket.recv(4096)
                    if self._logger and new_bytes and len(new_bytes) > 0:
                        self._logger.debug(
                            "pump received bytes: {}".format(new_bytes))
                except:
                    # Likely a closed socket.  Done with the pump thread.
                    if self._logger:
                        self._logger.debug(
                            "socket read failed, stopping pump read thread\n" +
                            traceback.format_exc(3))
                    break
                self._process_new_bytes(new_bytes)

        if self._logger:
            self._logger.info("socket pump exiting")

    def get_accumulated_output(self):
        return self._accumulated_output

    def get_receive_buffer(self):
        return self._receive_buffer