""" Implementation of the MKS ASCII Protocol as specified in "ASCII Protocol V1.8.pdf" """ __author__ = "Georg Schlisio 0: break time.sleep(1) tries -= 1 if tries == 0: logging.error(f"Connection not successfully established: received no greeting. Exiting.") exit(201) message = self.message_queue.pop(0) assert message.type == "MKSRGA", f"Unexpected message type: {message.type}" assert message.parms[message.type] in ("Single", "Multi"), f"Unexpected connection type: {message.parms[message.type]}" assert message.parms[message.type] == "Single", f"Connection to QMS Servers of type 'Multi' is not supported." # To support this, implement the commands "Sensors" and "Select" assert "Protocol_Revision" in message.parms assert "Min_Compatibility" in message.parms self._remote_protocol_version = float(message.parms["Protocol_Revision"]) self._remote_min_protocol_version = float(message.parms["Min_Compatibility"]) if self._remote_min_protocol_version > self._protocol_version: logging.error(f"Version mismatches: remote requires V{self._remote_min_protocol_version} but we have only V{self._protocol_version}, exiting") exit(202) def _disconnect(self): """ Terminate connection :return: """ # send message "Release" self._s.send(composer(mtype="Release")) # TODO: replace sleep with actual check for "Release Ok" time.sleep(2) self._s.close() self._connected = False def _check_for_messages(self): """ Read socket buffer, parse messages and enqueue them into the message queue :return: """ rawinput = self._s.recv(4096).decode("ASCII").split("\r\r") if len(rawinput) > 1: logging.info(f"found {len(rawinput) - 1} messages") for i in range(len(rawinput) - 1): self.message_queue.append(ParseMessage(rawinput[i])) if __name__ == "__main__": p = MKS_ASCII_Protocol(ip="127.0.0.1") p.run()