ti_kit_board_communication.main

  1import os
  2import time
  3from typing import Any, Dict, Optional, Self
  4
  5import serial
  6
  7
  8class TiKitBoard:
  9    """
 10    This class provides a mean to establish a connection between Purdue's FYE TI Kit board and the connected computer.
 11    Additionally, this class provides useful methods to write information to the host computer, allowing data to persist
 12    beyond sessions.
 13    """
 14
 15    def __init__(
 16        self: Self,
 17        port: str,
 18        baud_rate: int = 9600,
 19        max_retries: int = 3,
 20        special_ending_character: bytes = b"\n",
 21        storage_file_path: str = "data.txt",
 22    ) -> None:
 23        """
 24        Parameters
 25        ----------
 26        `port`:
 27            The port the TI Kit Board is connected to. This value is accessible in the Energia IDE.
 28        `baud_rate`:
 29            The number of signal changes or "symbols" that occur per second. Communication between the board and host computer will
 30            only work if the `baud_rate` given to the class and on the TI Kit Board's side is the same. `9600` is what most people use.
 31        `max_retries`:
 32            If the TI Kit board can't be found instantly, a connection request will be attempted after one second. This process will keep
 33            happening until the number of retries equals `max_retries`. You may call `connect_with_retries()` method to retry the entire
 34            process again.
 35        `special_ending_character`:
 36            Sending and reading requests by the board and the computer both work by reading new strings that are waiting in the serial
 37            connection until `special_ending_character` is encountered. Both the computer and the board expect the same
 38            `special_ending_character`.
 39        `storage_file_path`:
 40            This parameter controls the name of the text file that's made for persistent memory. If you delete the text file, all the data
 41            contained inside will be lost.
 42        """
 43
 44        self.connected: bool = False
 45
 46        self.port: str = port
 47        self.baud_rate: int = baud_rate
 48        self.max_retries: int = max_retries
 49        self.serial: Optional[serial.Serial] = None
 50        self.special_ending_character: bytes = special_ending_character
 51
 52        self.storage_file_path: str = storage_file_path
 53        self.storage: Dict[str, Any] = {}
 54
 55    def _init_storage(self: Self) -> None:
 56        self.storage.clear()
 57
 58        if os.path.isfile(self.storage_file_path):
 59            with open(self.storage_file_path, "r") as f:
 60                for line in f.readlines():
 61                    key, _, value = line.partition("=")
 62                    self.storage[key] = value
 63
 64        with open(self.storage_file_path, "w") as f:
 65            f.close()
 66
 67    def _write_to_storage(self: Self) -> None:
 68        if not self.connected or self.serial is None:
 69            return
 70
 71        with open(self.storage_file_path, "w") as f:
 72            for key, value in self.storage.items():
 73                f.write(f"{key}={value}")
 74
 75    def connect_with_retries(self: Self, max_retries: Optional[int] = None) -> None:
 76        """Open the serial connection, retrying on failure.
 77
 78        Parameters
 79        ----------
 80        `max_retries`:
 81            Maximum number of connection attempts. If not given,
 82            uses the instance's ``max_retries``.
 83        """
 84
 85        if max_retries is None:
 86            max_retries = self.max_retries
 87
 88        num_tries: int = 0
 89        self.connected = False
 90        self.serial = None
 91
 92        while num_tries < max_retries and not self.connected:
 93            try:
 94                print("Trying to connect...")
 95                self.serial = serial.Serial(self.port, self.baud_rate, timeout=1)
 96                print("Successfully connected.")
 97                self.connected = True
 98            except serial.SerialException:
 99                print("Failed to connect. Trying again in one second...\n")
100                num_tries += 1
101                time.sleep(1)
102
103        self._init_storage()
104
105    def read_serial(self: Self) -> Optional[bytes]:
106        """Read a single message from the serial port.
107
108        Returns
109        -------
110        `Optional[bytes]`
111            The bytes read (without the ending character), or `None`
112            if no data is available or the board is disconnected.
113        """
114
115        if not self.connected or self.serial is None:
116            return
117
118        try:
119            if self.serial.in_waiting:
120                data: bytes = self.serial.read_until(self.special_ending_character)[:-1]
121                print(f"Incoming data: {data}")
122                return data
123
124        except (OSError, serial.SerialException, AttributeError):
125            self.connected = False
126            try:
127                self.serial.close()
128            except Exception:
129                pass
130            self.serial = None
131            print("No longer have serial in check_serial")
132
133    def is_board_connected(self: Self) -> bool:
134        """Checks if the TI Kit Board is connected.
135
136        Returns
137        -------
138        `bool`
139            returns `True` if the board is connected, else `False` if no board is found.
140        """
141
142        if self.serial is None or not getattr(self.serial, "is_open", False):
143            self.connected = False
144            return False
145
146        try:
147            _ = self.serial.in_waiting
148            self.connected = True
149        except (OSError, serial.SerialException, AttributeError):
150            self.connected = False
151
152            try:
153                self.serial.close()
154            except Exception:
155                pass
156            self.serial = None
157            print("No longer have serial in is_board_connected")
158        return self.connected
159
160    def send_message(self: Self, message: bytes) -> None:
161        """Sends a message to the board if connected, else nothing is done."""
162
163        if not self.connected or self.serial is None:
164            return
165
166        self.serial.write(message + self.special_ending_character)
167
168    def write_key_to_storage(self: Self, key: str, value: Any) -> None:
169        """Adds a key-value pair and saves it to memory if the board is connected."""
170
171        if not self.connected or self.serial is None:
172            return
173
174        self.storage[key] = value
175
176        self._write_to_storage()
177
178    def remove_key_from_storage(self: Self, key: str) -> None:
179        """Removes a key-value pair and saves it to memory if the board is connected."""
180
181        if not self.connected or self.serial is None:
182            return
183
184        _ = self.storage.pop(key, None)
185
186        self._write_to_storage()
187
188    def get_value_from_storage(self: Self, key: str) -> Optional[Any]:
189        """Retrieves a value from memory if the board is connected.
190
191        Returns
192        -------
193        `Optional[Any]`
194            returns `None` if the key isn't found or if the board isn't connected, else returns the value.
195        """
196
197        if not self.connected or self.serial is None:
198            return
199
200        return self.storage.get(key, None)
201
202    def get_full_storage(self: Self) -> Optional[Dict[str, Any]]:
203        """Retrieves the full dictionary if the board is connected.
204
205        Returns
206        -------
207        `Optional[Any]`
208            returns `None` if the board isn't connected, else returns the full dictionary.
209        """
210
211        if not self.connected or self.serial is None:
212            return
213
214        return self.storage
class TiKitBoard:
  9class TiKitBoard:
 10    """
 11    This class provides a mean to establish a connection between Purdue's FYE TI Kit board and the connected computer.
 12    Additionally, this class provides useful methods to write information to the host computer, allowing data to persist
 13    beyond sessions.
 14    """
 15
 16    def __init__(
 17        self: Self,
 18        port: str,
 19        baud_rate: int = 9600,
 20        max_retries: int = 3,
 21        special_ending_character: bytes = b"\n",
 22        storage_file_path: str = "data.txt",
 23    ) -> None:
 24        """
 25        Parameters
 26        ----------
 27        `port`:
 28            The port the TI Kit Board is connected to. This value is accessible in the Energia IDE.
 29        `baud_rate`:
 30            The number of signal changes or "symbols" that occur per second. Communication between the board and host computer will
 31            only work if the `baud_rate` given to the class and on the TI Kit Board's side is the same. `9600` is what most people use.
 32        `max_retries`:
 33            If the TI Kit board can't be found instantly, a connection request will be attempted after one second. This process will keep
 34            happening until the number of retries equals `max_retries`. You may call `connect_with_retries()` method to retry the entire
 35            process again.
 36        `special_ending_character`:
 37            Sending and reading requests by the board and the computer both work by reading new strings that are waiting in the serial
 38            connection until `special_ending_character` is encountered. Both the computer and the board expect the same
 39            `special_ending_character`.
 40        `storage_file_path`:
 41            This parameter controls the name of the text file that's made for persistent memory. If you delete the text file, all the data
 42            contained inside will be lost.
 43        """
 44
 45        self.connected: bool = False
 46
 47        self.port: str = port
 48        self.baud_rate: int = baud_rate
 49        self.max_retries: int = max_retries
 50        self.serial: Optional[serial.Serial] = None
 51        self.special_ending_character: bytes = special_ending_character
 52
 53        self.storage_file_path: str = storage_file_path
 54        self.storage: Dict[str, Any] = {}
 55
 56    def _init_storage(self: Self) -> None:
 57        self.storage.clear()
 58
 59        if os.path.isfile(self.storage_file_path):
 60            with open(self.storage_file_path, "r") as f:
 61                for line in f.readlines():
 62                    key, _, value = line.partition("=")
 63                    self.storage[key] = value
 64
 65        with open(self.storage_file_path, "w") as f:
 66            f.close()
 67
 68    def _write_to_storage(self: Self) -> None:
 69        if not self.connected or self.serial is None:
 70            return
 71
 72        with open(self.storage_file_path, "w") as f:
 73            for key, value in self.storage.items():
 74                f.write(f"{key}={value}")
 75
 76    def connect_with_retries(self: Self, max_retries: Optional[int] = None) -> None:
 77        """Open the serial connection, retrying on failure.
 78
 79        Parameters
 80        ----------
 81        `max_retries`:
 82            Maximum number of connection attempts. If not given,
 83            uses the instance's ``max_retries``.
 84        """
 85
 86        if max_retries is None:
 87            max_retries = self.max_retries
 88
 89        num_tries: int = 0
 90        self.connected = False
 91        self.serial = None
 92
 93        while num_tries < max_retries and not self.connected:
 94            try:
 95                print("Trying to connect...")
 96                self.serial = serial.Serial(self.port, self.baud_rate, timeout=1)
 97                print("Successfully connected.")
 98                self.connected = True
 99            except serial.SerialException:
100                print("Failed to connect. Trying again in one second...\n")
101                num_tries += 1
102                time.sleep(1)
103
104        self._init_storage()
105
106    def read_serial(self: Self) -> Optional[bytes]:
107        """Read a single message from the serial port.
108
109        Returns
110        -------
111        `Optional[bytes]`
112            The bytes read (without the ending character), or `None`
113            if no data is available or the board is disconnected.
114        """
115
116        if not self.connected or self.serial is None:
117            return
118
119        try:
120            if self.serial.in_waiting:
121                data: bytes = self.serial.read_until(self.special_ending_character)[:-1]
122                print(f"Incoming data: {data}")
123                return data
124
125        except (OSError, serial.SerialException, AttributeError):
126            self.connected = False
127            try:
128                self.serial.close()
129            except Exception:
130                pass
131            self.serial = None
132            print("No longer have serial in check_serial")
133
134    def is_board_connected(self: Self) -> bool:
135        """Checks if the TI Kit Board is connected.
136
137        Returns
138        -------
139        `bool`
140            returns `True` if the board is connected, else `False` if no board is found.
141        """
142
143        if self.serial is None or not getattr(self.serial, "is_open", False):
144            self.connected = False
145            return False
146
147        try:
148            _ = self.serial.in_waiting
149            self.connected = True
150        except (OSError, serial.SerialException, AttributeError):
151            self.connected = False
152
153            try:
154                self.serial.close()
155            except Exception:
156                pass
157            self.serial = None
158            print("No longer have serial in is_board_connected")
159        return self.connected
160
161    def send_message(self: Self, message: bytes) -> None:
162        """Sends a message to the board if connected, else nothing is done."""
163
164        if not self.connected or self.serial is None:
165            return
166
167        self.serial.write(message + self.special_ending_character)
168
169    def write_key_to_storage(self: Self, key: str, value: Any) -> None:
170        """Adds a key-value pair and saves it to memory if the board is connected."""
171
172        if not self.connected or self.serial is None:
173            return
174
175        self.storage[key] = value
176
177        self._write_to_storage()
178
179    def remove_key_from_storage(self: Self, key: str) -> None:
180        """Removes a key-value pair and saves it to memory if the board is connected."""
181
182        if not self.connected or self.serial is None:
183            return
184
185        _ = self.storage.pop(key, None)
186
187        self._write_to_storage()
188
189    def get_value_from_storage(self: Self, key: str) -> Optional[Any]:
190        """Retrieves a value from memory if the board is connected.
191
192        Returns
193        -------
194        `Optional[Any]`
195            returns `None` if the key isn't found or if the board isn't connected, else returns the value.
196        """
197
198        if not self.connected or self.serial is None:
199            return
200
201        return self.storage.get(key, None)
202
203    def get_full_storage(self: Self) -> Optional[Dict[str, Any]]:
204        """Retrieves the full dictionary if the board is connected.
205
206        Returns
207        -------
208        `Optional[Any]`
209            returns `None` if the board isn't connected, else returns the full dictionary.
210        """
211
212        if not self.connected or self.serial is None:
213            return
214
215        return self.storage

This class provides a mean to establish a connection between Purdue's FYE TI Kit board and the connected computer. Additionally, this class provides useful methods to write information to the host computer, allowing data to persist beyond sessions.

TiKitBoard( port: str, baud_rate: int = 9600, max_retries: int = 3, special_ending_character: bytes = b'\n', storage_file_path: str = 'data.txt')
16    def __init__(
17        self: Self,
18        port: str,
19        baud_rate: int = 9600,
20        max_retries: int = 3,
21        special_ending_character: bytes = b"\n",
22        storage_file_path: str = "data.txt",
23    ) -> None:
24        """
25        Parameters
26        ----------
27        `port`:
28            The port the TI Kit Board is connected to. This value is accessible in the Energia IDE.
29        `baud_rate`:
30            The number of signal changes or "symbols" that occur per second. Communication between the board and host computer will
31            only work if the `baud_rate` given to the class and on the TI Kit Board's side is the same. `9600` is what most people use.
32        `max_retries`:
33            If the TI Kit board can't be found instantly, a connection request will be attempted after one second. This process will keep
34            happening until the number of retries equals `max_retries`. You may call `connect_with_retries()` method to retry the entire
35            process again.
36        `special_ending_character`:
37            Sending and reading requests by the board and the computer both work by reading new strings that are waiting in the serial
38            connection until `special_ending_character` is encountered. Both the computer and the board expect the same
39            `special_ending_character`.
40        `storage_file_path`:
41            This parameter controls the name of the text file that's made for persistent memory. If you delete the text file, all the data
42            contained inside will be lost.
43        """
44
45        self.connected: bool = False
46
47        self.port: str = port
48        self.baud_rate: int = baud_rate
49        self.max_retries: int = max_retries
50        self.serial: Optional[serial.Serial] = None
51        self.special_ending_character: bytes = special_ending_character
52
53        self.storage_file_path: str = storage_file_path
54        self.storage: Dict[str, Any] = {}

Parameters

port: The port the TI Kit Board is connected to. This value is accessible in the Energia IDE. baud_rate: The number of signal changes or "symbols" that occur per second. Communication between the board and host computer will only work if the baud_rate given to the class and on the TI Kit Board's side is the same. 9600 is what most people use. max_retries: If the TI Kit board can't be found instantly, a connection request will be attempted after one second. This process will keep happening until the number of retries equals max_retries. You may call connect_with_retries() method to retry the entire process again. special_ending_character: Sending and reading requests by the board and the computer both work by reading new strings that are waiting in the serial connection until special_ending_character is encountered. Both the computer and the board expect the same special_ending_character. storage_file_path: This parameter controls the name of the text file that's made for persistent memory. If you delete the text file, all the data contained inside will be lost.

connected: bool
port: str
baud_rate: int
max_retries: int
serial: Optional[serial.serialposix.Serial]
special_ending_character: bytes
storage_file_path: str
storage: Dict[str, Any]
def connect_with_retries(self: Self, max_retries: Optional[int] = None) -> None:
 76    def connect_with_retries(self: Self, max_retries: Optional[int] = None) -> None:
 77        """Open the serial connection, retrying on failure.
 78
 79        Parameters
 80        ----------
 81        `max_retries`:
 82            Maximum number of connection attempts. If not given,
 83            uses the instance's ``max_retries``.
 84        """
 85
 86        if max_retries is None:
 87            max_retries = self.max_retries
 88
 89        num_tries: int = 0
 90        self.connected = False
 91        self.serial = None
 92
 93        while num_tries < max_retries and not self.connected:
 94            try:
 95                print("Trying to connect...")
 96                self.serial = serial.Serial(self.port, self.baud_rate, timeout=1)
 97                print("Successfully connected.")
 98                self.connected = True
 99            except serial.SerialException:
100                print("Failed to connect. Trying again in one second...\n")
101                num_tries += 1
102                time.sleep(1)
103
104        self._init_storage()

Open the serial connection, retrying on failure.

Parameters

max_retries: Maximum number of connection attempts. If not given, uses the instance's max_retries.

def read_serial(self: Self) -> Optional[bytes]:
106    def read_serial(self: Self) -> Optional[bytes]:
107        """Read a single message from the serial port.
108
109        Returns
110        -------
111        `Optional[bytes]`
112            The bytes read (without the ending character), or `None`
113            if no data is available or the board is disconnected.
114        """
115
116        if not self.connected or self.serial is None:
117            return
118
119        try:
120            if self.serial.in_waiting:
121                data: bytes = self.serial.read_until(self.special_ending_character)[:-1]
122                print(f"Incoming data: {data}")
123                return data
124
125        except (OSError, serial.SerialException, AttributeError):
126            self.connected = False
127            try:
128                self.serial.close()
129            except Exception:
130                pass
131            self.serial = None
132            print("No longer have serial in check_serial")

Read a single message from the serial port.

Returns

Optional[bytes] The bytes read (without the ending character), or None if no data is available or the board is disconnected.

def is_board_connected(self: Self) -> bool:
134    def is_board_connected(self: Self) -> bool:
135        """Checks if the TI Kit Board is connected.
136
137        Returns
138        -------
139        `bool`
140            returns `True` if the board is connected, else `False` if no board is found.
141        """
142
143        if self.serial is None or not getattr(self.serial, "is_open", False):
144            self.connected = False
145            return False
146
147        try:
148            _ = self.serial.in_waiting
149            self.connected = True
150        except (OSError, serial.SerialException, AttributeError):
151            self.connected = False
152
153            try:
154                self.serial.close()
155            except Exception:
156                pass
157            self.serial = None
158            print("No longer have serial in is_board_connected")
159        return self.connected

Checks if the TI Kit Board is connected.

Returns

bool returns True if the board is connected, else False if no board is found.

def send_message(self: Self, message: bytes) -> None:
161    def send_message(self: Self, message: bytes) -> None:
162        """Sends a message to the board if connected, else nothing is done."""
163
164        if not self.connected or self.serial is None:
165            return
166
167        self.serial.write(message + self.special_ending_character)

Sends a message to the board if connected, else nothing is done.

def write_key_to_storage(self: Self, key: str, value: Any) -> None:
169    def write_key_to_storage(self: Self, key: str, value: Any) -> None:
170        """Adds a key-value pair and saves it to memory if the board is connected."""
171
172        if not self.connected or self.serial is None:
173            return
174
175        self.storage[key] = value
176
177        self._write_to_storage()

Adds a key-value pair and saves it to memory if the board is connected.

def remove_key_from_storage(self: Self, key: str) -> None:
179    def remove_key_from_storage(self: Self, key: str) -> None:
180        """Removes a key-value pair and saves it to memory if the board is connected."""
181
182        if not self.connected or self.serial is None:
183            return
184
185        _ = self.storage.pop(key, None)
186
187        self._write_to_storage()

Removes a key-value pair and saves it to memory if the board is connected.

def get_value_from_storage(self: Self, key: str) -> Optional[Any]:
189    def get_value_from_storage(self: Self, key: str) -> Optional[Any]:
190        """Retrieves a value from memory if the board is connected.
191
192        Returns
193        -------
194        `Optional[Any]`
195            returns `None` if the key isn't found or if the board isn't connected, else returns the value.
196        """
197
198        if not self.connected or self.serial is None:
199            return
200
201        return self.storage.get(key, None)

Retrieves a value from memory if the board is connected.

Returns

Optional[Any] returns None if the key isn't found or if the board isn't connected, else returns the value.

def get_full_storage(self: Self) -> Optional[Dict[str, Any]]:
203    def get_full_storage(self: Self) -> Optional[Dict[str, Any]]:
204        """Retrieves the full dictionary if the board is connected.
205
206        Returns
207        -------
208        `Optional[Any]`
209            returns `None` if the board isn't connected, else returns the full dictionary.
210        """
211
212        if not self.connected or self.serial is None:
213            return
214
215        return self.storage

Retrieves the full dictionary if the board is connected.

Returns

Optional[Any] returns None if the board isn't connected, else returns the full dictionary.