RemotePyInterpreter

A PyObjC Example without documentation

Sources

AsyncPythonInterpreter.py

__all__ = ["AsyncPythonInterpreter"]

import fcntl
import os
import socket
import sys

import objc
from Foundation import (
    NSObject,
    NSUserDefaults,
    NSLog,
    NSFileHandle,
    NSNotificationCenter,
    NSFileHandleConnectionAcceptedNotification,
    NSTask,
    NSTaskDidTerminateNotification,
    NSFileHandleNotificationFileHandleItem,
    NSFileHandleReadCompletionNotification,
    NSFileHandleError,
    NSData,
    NSFileHandleNotificationDataItem,
)


IMPORT_MODULES = ["netrepr", "remote_console", "remote_pipe", "remote_bootstrap"]
source = []
for fn in IMPORT_MODULES:
    for line in open(fn + ".py"):
        source.append(line)
    source.append("\n\n")
SOURCE = repr("".join(source)) + "\n"


def bind_and_listen(hostport):
    if isinstance(hostport, str):
        host, port = hostport.split(":")
        hostport = (host, int(port))
    serversock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # set close-on-exec
    if hasattr(fcntl, "FD_CLOEXEC"):
        old = fcntl.fcntl(serversock.fileno(), fcntl.F_GETFD)
        fcntl.fcntl(serversock.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC)
    # allow the address to be re-used in a reasonable amount of time
    if os.name == "posix" and sys.platform != "cygwin":
        serversock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    serversock.bind(hostport)
    serversock.listen(5)
    return serversock


class AsyncPythonInterpreter(NSObject):
    commandReactor = objc.IBOutlet("commandReactor")

    def init(self):
        self = super().init()
        self.host = None
        self.port = None
        self.interpreterPath = None
        self.scriptPath = None
        self.commandReactor = None
        self.serverSocket = None
        self.serverFileHandle = None
        self.buffer = ""
        self.serverFileHandle = None
        self.remoteFileHandle = None
        self.childTask = None
        return self

    def initWithHost_port_interpreterPath_scriptPath_commandReactor_(
        self, host, port, interpreterPath, scriptPath, commandReactor
    ):
        self = self.init()
        self.host = host
        self.port = port
        self.interpreterPath = interpreterPath
        self.scriptPath = scriptPath
        self.commandReactor = commandReactor
        self.serverSocket = None
        return self

    def awakeFromNib(self):
        defaults = NSUserDefaults.standardUserDefaults()

        def default(k, v, typeCheck=None):
            rval = defaults.objectForKey_(k)
            if typeCheck is not None and rval is not None:
                try:
                    rval = typeCheck(rval)
                except TypeError:
                    NSLog(
                        "%s failed type check %s with value %s",
                        k,
                        typeCheck.__name__,
                        rval,
                    )
                    rval = None
            if rval is None:
                defaults.setObject_forKey_(v, k)
                rval = v
            return rval

        self.host = default("AsyncPythonInterpreterInterpreterHost", "127.0.0.1", str)
        self.port = default("AsyncPythonInterpreterInterpreterPort", 0, int)
        self.interpreterPath = default(
            "AsyncPythonInterpreterInterpreterPath", "/usr/bin/python", str
        )
        self.scriptPath = (
            type(self).bundleForClass().pathForResource_ofType_("tcpinterpreter", "py")
        )

    def connect(self):
        # NSLog(u'connect')
        self.serverSocket = bind_and_listen((self.host, self.port))
        self.serverFileHandle = NSFileHandle.alloc().initWithFileDescriptor_(
            self.serverSocket.fileno()
        )
        nc = NSNotificationCenter.defaultCenter()
        nc.addObserver_selector_name_object_(
            self,
            "remoteSocketAccepted:",
            NSFileHandleConnectionAcceptedNotification,
            self.serverFileHandle,
        )
        self.serverFileHandle.acceptConnectionInBackgroundAndNotify()
        self.remoteFileHandle = None
        for k in os.environ.keys():
            if k.startswith("PYTHON"):
                del os.environ[k]
        self.childTask = NSTask.launchedTaskWithLaunchPath_arguments_(
            self.interpreterPath,
            [self.scriptPath, repr(self.serverSocket.getsockname())],
        )
        nc.addObserver_selector_name_object_(
            self, "childTaskTerminated:", NSTaskDidTerminateNotification, self.childTask
        )
        return self

    def remoteSocketAccepted_(self, notification):
        # NSLog(u'remoteSocketAccepted_')
        self.serverFileHandle.closeFile()
        self.serverFileHandle = None
        ui = notification.userInfo()
        self.remoteFileHandle = ui.objectForKey_(NSFileHandleNotificationFileHandleItem)
        nc = NSNotificationCenter.defaultCenter()
        nc.addObserver_selector_name_object_(
            self,
            "remoteFileHandleReadCompleted:",
            NSFileHandleReadCompletionNotification,
            self.remoteFileHandle,
        )
        self.writeBytes_(SOURCE)
        self.remoteFileHandle.readInBackgroundAndNotify()
        self.commandReactor.connectionEstablished_(self)
        NSNotificationCenter.defaultCenter().postNotificationName_object_(
            "AsyncPythonInterpreterOpened", self
        )

    def remoteFileHandleReadCompleted_(self, notification):
        # NSLog(u'remoteFileHandleReadCompleted_')
        ui = notification.userInfo()
        newData = ui.objectForKey_(NSFileHandleNotificationDataItem)
        if newData is None:
            self.close()
            NSLog("Error: %@", ui.objectForKey_(NSFileHandleError))
            return
        data_bytes = newData.bytes()[:]
        if len(data_bytes) == 0:
            self.close()
            return
        self.remoteFileHandle.readInBackgroundAndNotify()
        start = len(self.buffer)
        buff = self.buffer + newData.bytes()[:]
        # NSLog(u'current buffer: %s', buff)
        lines = []
        while True:
            linebreak = buff.find("\n", start) + 1
            if linebreak == 0:
                break
            lines.append(buff[:linebreak])
            buff = buff[linebreak:]
            start = 0
        # NSLog(u'lines: %s', lines)
        self.buffer = buff
        for line in lines:
            self.commandReactor.lineReceived_fromConnection_(line, self)

    def writeBytes_(self, data):
        # NSLog(u'Writing bytes: %s', data)
        try:
            self.remoteFileHandle.writeData_(
                NSData.dataWithBytes_length_(data, len(data))
            )
        except objc.error:
            self.close()
        # NSLog(u'bytes written.')

    def childTaskTerminated_(self, notification):
        # NSLog(u'childTaskTerminated_')
        self.close()

    def closeServerFileHandle(self):
        # NSLog(u'closeServerFileHandle')
        if self.serverFileHandle is not None:
            try:
                self.serverFileHandle.closeFile()
            except objc.error:
                pass
            self.serverFileHandle = None

    def closeRemoteFileHandle(self):
        # NSLog(u'closeRemoteFileHandle')
        if self.remoteFileHandle is not None:
            try:
                self.remoteFileHandle.closeFile()
            except objc.error:
                pass
            self.remoteFileHandle = None

    def terminateChildTask(self):
        # NSLog(u'terminateChildTask')
        if self.childTask is not None:
            try:
                self.childTask.terminate()
            except objc.error:
                pass
            self.childTask = None

    def close(self):
        # NSLog(u'close')
        NSNotificationCenter.defaultCenter().removeObserver_(self)
        self.finalClose()
        NSNotificationCenter.defaultCenter().postNotificationName_object_(
            "AsyncPythonInterpreterClosed", self
        )

    def finalClose(self):
        if self.commandReactor is not None:
            self.commandReactor.connectionClosed_(self)
            self.commandReactor = None
        self.closeServerFileHandle()
        self.closeRemoteFileHandle()
        self.terminateChildTask()


def test_console():
    from PyObjCTools import AppHelper
    from ConsoleReactor import ConsoleReactor

    host = "127.0.0.1"
    port = 0
    interpreterPath = sys.executable
    scriptPath = os.path.abspath("tcpinterpreter.py")
    commandReactor = ConsoleReactor.alloc().init()
    interp = AsyncPythonInterpreter.alloc().initWithHost_port_interpreterPath_scriptPath_commandReactor_(  # noqa: B950
        host, port, interpreterPath, scriptPath, commandReactor
    )
    interp.connect()

    class ThisEventLoopStopper(NSObject):
        def interpFinished_(self, notification):
            AppHelper.stopEventLoop()

    stopper = ThisEventLoopStopper.alloc().init()
    NSNotificationCenter.defaultCenter().addObserver_selector_name_object_(
        stopper, "interpFinished:", "AsyncPythonInterpreterClosed", interp
    )
    AppHelper.runConsoleEventLoop(installInterrupt=True)


def main():
    test_console()


if __name__ == "__main__":
    main()

ConsoleReactor.py

import sys

from Foundation import NSObject, NSLog
from netrepr import NetRepr, RemoteObjectPool, RemoteObjectReference

__all__ = ["ConsoleReactor"]


class ConsoleReactor(NSObject):
    def init(self):
        self = super().init()
        self.pool = None
        self.netReprCenter = None
        self.connection = None
        self.commands = {}
        return self

    def connectionEstablished_(self, connection):
        # NSLog(u'connectionEstablished_')
        self.connection = connection
        self.pool = RemoteObjectPool(self.writeCode_)
        self.netReprCenter = NetRepr(self.pool)

    def connectionClosed_(self, connection):
        # NSLog(u'connectionClosed_')
        self.connection = None
        self.pool = None
        self.netReprCenter = None

    def writeCode_(self, code):
        # NSLog(u'writeCode_')
        self.connection.writeBytes_(repr(code) + "\n")

    def netEval_(self, s):
        # NSLog(u'netEval_')
        return eval(s, self.pool.namespace, self.pool.namespace)

    def lineReceived_fromConnection_(self, lineReceived, connection):
        # NSLog(u'lineReceived_fromConnection_')
        code = lineReceived.rstrip()
        if not code:
            return
        self.pool.push()
        command = map(self.netEval_, eval(code))
        try:
            self.handleCommand_(command)
        finally:
            self.pool.pop()

    def handleCommand_(self, command):
        # NSLog(u'handleCommand_')
        basic = command[0]
        sel = "handle%sCommand:" % (basic.capitalize())
        cmd = command[1:]
        if not self.respondsToSelector_(sel):
            NSLog("%r does not respond to %s", self, command)
        else:
            self.performSelector_withObject_(sel, cmd)
            getattr(self, sel.replace(":", "_"))(cmd)

    def handleRespondCommand_(self, command):
        self.doCallback_sequence_args_(
            self.commands.pop(command[0]), command[0], map(self.netEval_, command[1:])
        )

    def sendResult_sequence_(self, rval, seq):
        nr = self.netReprCenter
        code = f"__result__[{seq!r}] = {nr.netrepr(rval)}"
        self.writeCode_(code)

    def sendException_sequence_(self, e, seq):
        nr = self.netReprCenter
        code = "raise " + nr.netrepr_exception(e)
        print("forwarding:", code)
        self.writeCode_(code)

    def doCallback_sequence_args_(self, callback, seq, args):
        # nr = self.netReprCenter
        try:
            rval = callback(*args)
        except Exception as e:
            self.sendException_sequence_(e, seq)
        else:
            self.sendResult_sequence_(rval, seq)

    def deferCallback_sequence_value_(self, callback, seq, value):
        self.commands[seq] = callback
        self.writeCode_(f"pipe.respond({seq!r}, netrepr({value}))")

    def handleExpectCommand_(self, command):
        # NSLog(u'handleExpectCommand_')
        seq = command[0]
        name = command[1]
        args = command[2:]
        netrepr = self.netReprCenter.netrepr
        if name == "RemoteConsole.raw_input":
            self.doCallback_sequence_args_(input, seq, args)
        elif name == "RemoteConsole.write":
            self.doCallback_sequence_args_(sys.stdout.write, seq, args)
        elif name == "RemoteConsole.displayhook":
            obj = args[0]

            def displayhook_respond(reprobject):
                print(reprobject)

            def displayhook_local(obj):
                if obj is not None:
                    displayhook_respond(repr(obj))

            if isinstance(obj, RemoteObjectReference):
                self.deferCallback_sequence_value_(
                    displayhook_respond, seq, f"repr({netrepr(obj)})"
                )
            else:
                self.doCallback_sequence_args_(displayhook_local, seq, args)
        elif name.startswith("RemoteFileLike."):
            fh = getattr(sys, args[0])
            meth = getattr(fh, name[len("RemoteFileLike.") :])  # noqa: E203
            self.doCallback_sequence_args_(meth, seq, args[1:])
        elif name == "RemoteConsole.initialize":
            self.doCallback_sequence_args_(lambda *args: None, seq, args)
        else:
            self.doCallback_sequence_args_(
                NSLog, seq, ["%r does not respond to expect %r", self, command]
            )

    def close(self):
        if self.connection is not None:
            self.writeCode_("raise SystemExit")
        self.pool = None
        self.netReprCenter = None
        self.connection = None
        self.commands = None

RemotePyInterpreter.py

import objc

from Cocoa import (
    NSLog,
    NSDocument,
    NSFont,
    NSColor,
    NSAttributedString,
    NSFontAttributeName,
    NSForegroundColorAttributeName,
)

# from AsyncPythonInterpreter import *
from ConsoleReactor import ConsoleReactor
from netrepr import RemoteObjectReference
from PyObjCTools import AppHelper


def ensure_unicode(s):
    if not isinstance(s, str):
        s = str(s, "utf-8", "replace")
    return s


class RemotePyInterpreterReactor(ConsoleReactor):
    delegate = objc.IBOutlet()

    def handleExpectCommand_(self, command):
        print(command)
        seq = command[0]
        name = command[1]
        args = command[2:]
        netrepr = self.netReprCenter.netrepr
        if name == "RemoteConsole.raw_input":
            prompt = ensure_unicode(args[0])

            def input_received(line):
                self.sendResult_sequence_(line, seq)

            self.delegate.expectCodeInput_withPrompt_(input_received, prompt)
        elif name == "RemoteConsole.write":
            args = [ensure_unicode(args[0]), "code"]
            self.doCallback_sequence_args_(
                self.delegate.writeString_forOutput_, seq, args
            )
        elif name == "RemoteConsole.displayhook":
            obj = args[0]

            def displayhook_respond(reprobject):
                self.delegate.writeString_forOutput_(
                    ensure_unicode(reprobject) + "\n", "code"
                )

            def displayhook_local(obj):
                if obj is not None:
                    displayhook_respond(repr(obj))

            if isinstance(obj, RemoteObjectReference):
                self.deferCallback_sequence_value_(
                    displayhook_respond, seq, f"repr({netrepr(obj)})"
                )
            else:
                self.doCallback_sequence_args_(displayhook_local, seq, args)
        elif name.startswith("RemoteFileLike."):
            method = name[len("RemoteFileLike.") :]  # noqa: E203
            if method == "write":
                style, msg = map(ensure_unicode, args)
                args = [msg, style]
                self.doCallback_sequence_args_(
                    self.delegate.writeString_forOutput_, seq, args
                )

            elif method == "readline":

                def input_received(line):
                    self.sendResult_sequence_(line, seq)

                self.delegate.expectCodeInput_withPrompt_(input_received, "")

            else:
                self.doCallback_sequence_args_(
                    NSLog, seq, ["%s does not respond to expect %s", self, command]
                )
        elif name == "RemoteConsole.initialize":

            def gotTitle(repr_versioninfo, executable, pid):
                self.delegate.setVersion_executable_pid_(
                    ".".join(map(str, self.netEval_(repr_versioninfo)[:3])),
                    ensure_unicode(executable),
                    pid,
                )

            self.doCallback_sequence_args_(gotTitle, seq, args)
        #    fh = getattr(sys, args[0])
        #    meth = getattr(fh, name[len('RemoteFileLike.'):])
        #    self.doCallback_sequence_args_(meth, seq, args[1:])
        else:
            self.doCallback_sequence_args_(
                NSLog, seq, ["%s does not respond to expect %s", self, command]
            )

    def close(self):
        super().close()
        self.delegate = None


class PseudoUTF8Input:
    softspace = 0

    def __init__(self, readlinemethod):
        self._buffer = ""
        self._readline = readlinemethod

    def read(self, chars=None):
        if chars is None:
            if self._buffer:
                rval = self._buffer
                self._buffer = ""
                if rval.endswith("\r"):
                    rval = rval[:-1] + "\n"
                return rval.encode("utf-8")
            else:
                return self._readline("\x04")[:-1].encode("utf-8")
        else:
            while len(self._buffer) < chars:
                self._buffer += self._readline("\x04\r")
                if self._buffer.endswith("\x04"):
                    self._buffer = self._buffer[:-1]
                    break
            rval, self._buffer = self._buffer[:chars], self._buffer[chars:]
            return rval.encode("utf-8").replace("\r", "\n")

    def readline(self):
        if "\r" not in self._buffer:
            self._buffer += self._readline("\x04\r")
        if self._buffer.endswith("\x04"):
            rval = self._buffer[:-1].encode("utf-8")
        elif self._buffer.endswith("\r"):
            rval = self._buffer[:-1].encode("utf-8") + "\n"
        self._buffer = ""

        return rval


DEBUG_DELEGATE = 0
PASSTHROUGH = ("deleteBackward:", "complete:", "moveRight:", "moveLeft:")


class RemotePyInterpreterDocument(NSDocument):
    """
    PyInterpreter is a delegate/controller for a NSTextView,
    turning it into a full featured interactive Python interpreter.
    """

    commandReactor = objc.IBOutlet()
    interpreter = objc.IBOutlet()
    textView = objc.IBOutlet()

    def expectCodeInput_withPrompt_(self, callback, prompt):
        self.writeString_forOutput_(prompt, "code")
        self.setCharacterIndexForInput_(self.lengthOfTextView())
        self.p_input_callbacks.append(callback)
        self.flushCallbacks()

    def flushCallbacks(self):
        while self.p_input_lines and self.p_input_callbacks:
            self.p_input_callbacks.pop(0)(self.p_input_lines.pop(0))

    def setupTextView(self):
        self.textView.setFont_(self.font())
        self.textView.setContinuousSpellCheckingEnabled_(False)
        self.textView.setRichText_(False)
        self.setCharacterIndexForInput_(0)

    def setVersion_executable_pid_(self, version, executable, pid):
        self.version = version
        self.pid = pid
        self.executable = executable
        self.setFileName_(executable)

    def displayName(self):
        if not hasattr(self, "version"):
            return "Starting..."
        return f"Python {self.version} - {self.executable} - {self.pid}"

    def updateChangeCount_(self, val):
        return

    def windowWillClose_(self, window):
        if self.commandReactor is not None:
            self.commandReactor.close()
            self.commandReactor = None
        if self.interpreter is not None:
            self.interpreter.close()
            self.interpreter = None

    def windowNibName(self):
        return "RemotePyInterpreterDocument"

    def isDocumentEdited(self):
        return False

    def awakeFromNib(self):
        self.setFont_(NSFont.userFixedPitchFontOfSize_(10))
        self.p_colors = {
            "stderr": NSColor.redColor(),
            "stdout": NSColor.blueColor(),
            "code": NSColor.blackColor(),
        }
        self.setHistoryLength_(50)
        self.setHistoryView_(0)
        self.setInteracting_(False)
        self.setAutoScroll_(True)
        self.setSingleLineInteraction_(False)
        self.p_history = [""]
        self.p_input_callbacks = []
        self.p_input_lines = []
        self.setupTextView()
        self.interpreter.connect()

    #
    #  Modal input dialog support
    #

    # def p_nestedRunLoopReaderUntilEOLchars_(self, eolchars):
    #    """
    #    This makes the baby jesus cry.

    #    I want co-routines.
    #    """
    #    app = NSApplication.sharedApplication()
    #    window = self.textView.window()
    #    self.setCharacterIndexForInput_(self.lengthOfTextView())
    #    # change the color.. eh
    #    self.textView.setTypingAttributes_({
    #        NSFontAttributeName: self.font(),
    #        NSForegroundColorAttributeName: self.colorForName_(u'code'),
    #    })
    #    while True:
    #        event = app.nextEventMatchingMask_untilDate_inMode_dequeue_(
    #            NSAnyEventMask,
    #            NSDate.distantFuture(),
    #            NSDefaultRunLoopMode,
    #            True)
    #        if (event.type() == NSKeyDown) and (event.window() is window):
    #            eol = event.characters()
    #            if eol in eolchars:
    #                break
    #        app.sendEvent_(event)
    #    cl = self.currentLine()
    #    if eol == u'\r':
    #        self.writeNewLine()
    #    return cl + eol

    def executeLine_(self, line):
        self.addHistoryLine_(line)
        self.p_input_lines.append(line)
        self.flushCallbacks()
        self.p_history = filter(None, self.p_history)
        self.p_history.append("")
        self.setHistoryView_(len(self.p_history) - 1)

    def executeInteractiveLine_(self, line):
        self.setInteracting_(True)
        try:
            self.executeLine_(line)
        finally:
            self.setInteracting_(False)

    def replaceLineWithCode_(self, s):
        idx = self.characterIndexForInput()
        ts = self.textView.textStorage()
        s = self.formatString_forOutput_(s, "code")
        ts.replaceCharactersInRange_withAttributedString_(
            (idx, len(ts.mutableString()) - idx), s
        )

    #
    #  History functions
    #

    def addHistoryLine_(self, line):
        line = line.rstrip("\n")
        if self.p_history[-1] == line:
            return False
        if not line:
            return False
        self.p_history.append(line)
        if len(self.p_history) > self.historyLength():
            self.p_history.pop(0)
        return True

    def historyDown_(self, sender):
        if self.p_historyView == (len(self.p_history) - 1):
            return
        self.p_history[self.p_historyView] = self.currentLine()
        self.p_historyView += 1
        self.replaceLineWithCode_(self.p_history[self.p_historyView])
        self.moveToEndOfLine_(self)

    def historyUp_(self, sender):
        if self.p_historyView == 0:
            return
        self.p_history[self.p_historyView] = self.currentLine()
        self.p_historyView -= 1
        self.replaceLineWithCode_(self.p_history[self.p_historyView])
        self.moveToEndOfLine_(self)

    #
    #  Convenience methods to create/write decorated text
    #

    def formatString_forOutput_(self, s, name):
        return NSAttributedString.alloc().initWithString_attributes_(
            s,
            {
                NSFontAttributeName: self.font(),
                NSForegroundColorAttributeName: self.colorForName_(name),
            },
        )

    def writeString_forOutput_(self, s, name):
        s = self.formatString_forOutput_(s, name)
        self.textView.textStorage().appendAttributedString_(s)
        if self.isAutoScroll():
            self.textView.scrollRangeToVisible_((self.lengthOfTextView(), 0))

    def writeNewLine(self):
        self.writeString_forOutput_("\n", "code")

    def colorForName_(self, name):
        return self.p_colors[name]

    def setColor_forName_(self, color, name):
        self.p_colors[name] = color

    #
    #  Convenience methods for manipulating the NSTextView
    #

    def currentLine(self):
        return self.textView.textStorage().mutableString()[
            self.characterIndexForInput() :  # noqa: E203
        ]

    def moveAndScrollToIndex_(self, idx):
        self.textView.scrollRangeToVisible_((idx, 0))
        self.textView.setSelectedRange_((idx, 0))

    def lengthOfTextView(self):
        return len(self.textView.textStorage().mutableString())

    #
    #  NSTextViewDelegate methods
    #

    def textView_completions_forPartialWordRange_indexOfSelectedItem_(
        self, aTextView, completions, begin_length, index
    ):
        # NOTE:
        # this will probably have to be tricky in order to be asynchronous..
        # either by:
        #     nesting a run loop (bleh)
        #     polling the subprocess (bleh)
        #     returning nothing and calling self.textView.complete_ later
        begin, length = begin_length
        return None, 0

        if False:
            txt = self.textView.textStorage().mutableString()
            end = begin + length
            while (begin > 0) and (txt[begin].isalnum() or txt[begin] in "._"):
                begin -= 1
            while not txt[begin].isalnum():
                begin += 1
            return self.p_console.recommendCompletionsFor(txt[begin:end])

    def textView_shouldChangeTextInRange_replacementString_(
        self, aTextView, aRange, newString
    ):
        begin, length = aRange
        lastLocation = self.characterIndexForInput()
        if begin < lastLocation:
            # no editing anywhere but the interactive line
            return False
        newString = newString.replace("\r", "\n")
        if "\n" in newString:
            if begin != lastLocation:
                # no pasting multiline unless you're at the end
                # of the interactive line
                return False
            # multiline paste support
            # self.clearLine()
            newString = self.currentLine() + newString
            for s in newString.strip().split("\n"):
                self.writeString_forOutput_(s + "\n", "code")
                self.executeLine_(s)
            return False
        return True

    def textView_willChangeSelectionFromCharacterRange_toCharacterRange_(
        self, aTextView, fromRange, toRange
    ):
        begin, length = toRange
        if (
            self.singleLineInteraction()
            and length == 0
            and begin < self.characterIndexForInput()
        ):
            # no cursor movement off the interactive line
            return fromRange
        else:
            return toRange

    def textView_doCommandBySelector_(self, aTextView, aSelector):
        # deleteForward: is ctrl-d
        if self.isInteracting():
            if aSelector == "insertNewline:":
                self.writeNewLine()
            return False
        responder = getattr(self, aSelector.replace(":", "_"), None)
        if responder is not None:
            responder(aTextView)
            return True
        else:
            if DEBUG_DELEGATE and aSelector not in PASSTHROUGH:
                print(aSelector)
            return False

    #
    #  doCommandBySelector "posers" on the textView
    #

    def insertTabIgnoringFieldEditor_(self, sender):
        # this isn't terribly necessary, b/c F5 and opt-esc do completion
        # but why not
        sender.complete_(self)

    def moveToBeginningOfLine_(self, sender):
        self.moveAndScrollToIndex_(self.characterIndexForInput())

    def moveToEndOfLine_(self, sender):
        self.moveAndScrollToIndex_(self.lengthOfTextView())

    def moveToBeginningOfLineAndModifySelection_(self, sender):
        begin, length = self.textView.selectedRange()
        pos = self.characterIndexForInput()
        if begin + length > pos:
            self.textView.setSelectedRange_((pos, begin + length - pos))
        else:
            self.moveToBeginningOfLine_(sender)

    def moveToEndOfLineAndModifySelection_(self, sender):
        begin, length = self.textView.selectedRange()
        pos = max(self.characterIndexForInput(), begin)
        self.textView.setSelectedRange_((pos, self.lengthOfTextView()))

    def insertNewline_(self, sender):
        line = self.currentLine()
        self.writeNewLine()
        self.executeInteractiveLine_(line)

    moveToBeginningOfParagraph_ = moveToBeginningOfLine_
    moveToEndOfParagraph_ = moveToEndOfLine_
    insertNewlineIgnoringFieldEditor_ = insertNewline_
    moveDown_ = historyDown_
    moveUp_ = historyUp_

    #
    #  Accessors
    #

    def historyLength(self):
        return self.p_historyLength

    def setHistoryLength_(self, length):
        self.p_historyLength = length

    def font(self):
        return self.p_font

    def setFont_(self, font):
        self.p_font = font

    def isInteracting(self):
        return self.p_interacting

    def setInteracting_(self, v):
        self.p_interacting = v

    def isAutoScroll(self):
        return self.p_autoScroll

    def setAutoScroll_(self, v):
        self.p_autoScroll = v

    def characterIndexForInput(self):
        return self.p_characterIndexForInput

    def setCharacterIndexForInput_(self, idx):
        self.p_characterIndexForInput = idx
        self.moveAndScrollToIndex_(idx)

    def historyView(self):
        return self.p_historyView

    def setHistoryView_(self, v):
        self.p_historyView = v

    def singleLineInteraction(self):
        return self.p_singleLineInteraction

    def setSingleLineInteraction_(self, v):
        self.p_singleLineInteraction = v


if __name__ == "__main__":
    AppHelper.runEventLoop(installInterrupt=True)

netrepr.py

import itertools


def type_string(obj):
    if isinstance(obj, type):
        objType = obj.__class__
    else:
        objType = type(obj)
    return getattr(objType, "__module__", "-") + "." + objType.__name__


class NetRepr:
    def __init__(self, objectPool):
        self.objectPool = objectPool
        self.cache = {}
        self._identfactory = itertools.count()

    def clear(self):
        self.cache.clear()
        self._identfactory = itertools.count()

    def netrepr_tuple(self, obj):
        return repr(tuple(itertools.imap(self.netrepr, obj)))

    def netrepr_list(self, obj):
        return repr(map(self.netrepr, obj))

    def netrepr_exception(self, e):
        cls = e.__class__
        if cls.__module__ == "exceptions":
            rval = cls.__name__ + self.netrepr_tuple(e.args)
        else:
            rval = "Exception({!r})".format(
                f"[Remote] {cls.__module__}.{cls.__name__} {e}",
            )
        return rval

    def netrepr(self, obj):
        if obj is None:
            return "None"
        objtype = type(obj)
        if objtype is int or objtype is float:
            return repr(obj)
        elif objtype is str:
            if True:
                return repr(obj)
            else:
                # "intern" these
                obj_id = id(obj)
                cached = self.get(self.cache, obj_id, None)
                if cached is None:
                    # ident = next(self._identfactory)
                    self.cache[obj_id] = f"__cached__({obj_id!r})"
                    cached = f"__cache__({obj_id!r}, {obj!r})"
                return cached
        return self.netrepr_default(obj)

    def netrepr_default(self, obj):
        method = getattr(obj, "__netrepr__", None)
        if method is None:
            method = self.objectPool.referenceForObject(obj).__netrepr__
        return method()


class BaseObjectPool:
    def __init__(self):
        self.idents = {}
        self.refs = {}
        self.pools = []

    def referenceForIdent(self, ident):
        return self.idents[ident]

    def base_alloc(self, ref, ident):
        self.refs[ref] = ident
        self.idents[ident] = ref

    def base_dealloc(self, ref, ident):
        del self.refs[ref]
        del self.idents[ident]

    def autorelease(self, ref):
        if not self.pools:
            raise RuntimeError(f"no autoreleasepool for {ref!r}")
        pool = self.pools[-1]
        pool[ref] = pool.get(ref, 0) + 1

    def push(self):
        # print "pushed pool"
        self.pools.append({})

    def pop(self):
        if not self.pools:
            raise RuntimeError("popped too many pools")
        # print "popped pool"
        pool = self.pools.pop()
        for ref, count in pool.items():
            ref.release(count)

    def referenceForObject(self, obj):
        raise TypeError(
            f"Can not create a reference to {obj!r}, the bridge is unidirectional"
        )


class RemoteObjectPool(BaseObjectPool):
    def __init__(self, writecode):
        BaseObjectPool.__init__(self)
        self.writecode = writecode
        self.namespace = {"None": None, "__ref__": self.referenceForRemoteIdent}

    def referenceForRemoteIdent(self, ident, type_string):
        rval = self.idents.get(ident)
        if rval is None:
            rval = RemoteObjectReference(self, ident, type_string)
        return rval


class ObjectPool(BaseObjectPool):
    def __init__(self):
        BaseObjectPool.__init__(self)
        self._identfactory = itertools.count()
        self.obj_ids = {}
        self.namespace = {"__obj__": self.objectForIdent}

    def object_alloc(self, ref, obj_id):
        self.obj_ids[obj_id] = ref

    def object_dealloc(self, ref, obj_id):
        del self.obj_ids[obj_id]

    def objectForIdent(self, ident):
        return self.referenceForIdent(ident).obj

    def referenceForObject(self, obj):
        obj_id = id(obj)
        rval = self.obj_ids.get(obj_id)
        if rval is None:
            ident = next(self._identfactory)
            rval = ObjectReference(self, ident, type_string(obj), obj, obj_id)
            rval = rval.alloc().autorelease()
        return rval


class BaseObjectReference:
    def __init__(self, objectPool, ident, type_string):
        self.ident = ident
        self.type_string = type_string
        self.objectPool = objectPool
        self.retainCount = 1

    def retain(self, count=1):
        # print "%r.retain(%d)" % (self, count)
        self.retainCount += count
        return self

    def alloc(self):
        self.objectPool.base_alloc(self, self.ident)
        return self

    def dealloc(self):
        self.objectPool.base_dealloc(self, self.ident)
        self.retainCount = -1

    def release(self, count=1):
        # print "%r.release(%d)" % (self, count)
        newCount = self.retainCount - count
        # print "  newCount = %d" % (newCount,)
        if newCount == 0:
            self.dealloc()
        elif newCount < 0:
            raise ValueError(
                "Reference %r over-released (%r -> %r)"
                % (self, self.retainCount, newCount)
            )
        self.retainCount = newCount
        return self

    def autorelease(self):
        # print "%s.autorelease()" % (self,)
        self.objectPool.autorelease(self)
        return self

    def __repr__(self):
        return f"{type(self).__name__}({self.ident!r}, {self.type_string!r})"


class RemoteObjectReference(BaseObjectReference):
    def __netrepr__(self):
        return f"__obj__({self.ident!r})"


class ObjectReference(BaseObjectReference):
    def __init__(self, objectPool, ident, type_string, obj, obj_id):
        BaseObjectReference.__init__(self, objectPool, ident, type_string)
        self.obj = obj
        self.obj_id = id(obj)

    def alloc(self):
        self = BaseObjectReference.alloc(self)
        self.objectPool.object_alloc(self, self.obj_id)
        return self

    def dealloc(self):
        self.objectPool.object_dealloc(self, self.obj_id)
        self.obj = None
        self.obj_id = -1
        BaseObjectReference.dealloc(self)

    def __netrepr__(self):
        return f"__ref__({self.ident!r}, {self.type_string!r})"


def test_netrepr():
    pool = ObjectPool()
    pool.push()
    netrepr = NetRepr(pool).netrepr
    assert netrepr("foo") == repr("foo")
    ref = pool.referenceForObject(object)
    assert ref.obj is object
    assert ref is pool.referenceForObject(object)
    assert ref.retainCount == 1
    refrepr = netrepr(ref)
    assert refrepr == netrepr(ref)
    ref.retain()
    assert ref.retainCount == 2
    pool.pop()
    pool.push()
    assert ref.retainCount == 1

    def __ref__(ident, type_string):
        return pool.referenceForIdent(ident)

    netref = eval(refrepr)
    assert netref is ref
    assert netref.obj is object
    ref.release()
    pool.pop()
    assert ref.obj is None

remote_bootstrap.py

__file__ = "<RemotePyInterpreterClient>"
import sys

# pool = ObjectPool()
# netReprCenter = NetRepr(pool)
# netrepr = netReprCenter.netrepr
# netrepr_tuple = netReprCenter.netrepr_tuple
# netrepr_list = netReprCenter.netrepr_list
# netrepr_exception = netReprCenter.netrepr_exception
namespace = globals()
# namespace.update(pool.namespace)
__main__ = sys.modules["__main__"]
# assert namespace is not __main__.__dict__
# pipe = RemotePipe(__runsocketcode__, __clientfile__, netReprCenter, namespace, pool)
# interp = RemoteConsole(pipe, locals=__main__.__dict__)
# interp.interact()

remote_console.py

import keyword
import os
import sys
from code import InteractiveConsole, softspace

try:
    import __builtin__
except ImportError:
    import builtins as __builtin__


class RemoteConsole(InteractiveConsole):
    def __init__(self, pipe, **kw):
        self.pipe = pipe
        self.buffer = None
        InteractiveConsole.__init__(self, **kw)
        self.locals["__interpreter__"] = self

    def raw_input(self, prompt=""):
        return self.pipe.expect("RemoteConsole.raw_input", prompt)

    def write(self, msg):
        return self.pipe.expect("RemoteConsole.write", msg)

    def resetbuffer(self):
        self.lastbuffer = self.buffer
        InteractiveConsole.resetbuffer(self)

    def displayhook(self, value):
        if value is not None:
            __builtin__._ = value
        return self.pipe.expect("RemoteConsole.displayhook", value)

    def excepthook(self, exc_type, value, traceback):
        return self.pipe.expect("RemoteConsole.excepthook", exc_type, value, traceback)

    def runcode(self, code):
        try:
            exec(code, self.locals)
        except SystemExit:
            raise
        except:  # noqa: E722, B001
            self.showtraceback()
        else:
            if softspace(sys.stdout, 0):
                print

    def interact(self):
        old_raw_input = __builtin__.raw_input
        old_displayhook = sys.displayhook
        old_excepthook = sys.excepthook
        old_stdin = sys.stdin
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        old_help = __builtin__.help
        old_quit = __builtin__.quit
        __builtin__.raw_input = self.raw_input
        __builtin__.help = "Close window to exit."
        __builtin__.quit = "Close window to exit."
        sys.displayhook = self.displayhook
        sys.excepthook = self.excepthook
        sys.stdin = self.pipe.stdin
        sys.stdout = self.pipe.stdout
        sys.stderr = self.pipe.stderr
        try:
            self.pipe.expect(
                "RemoteConsole.initialize",
                repr(sys.version_info),
                sys.executable,
                os.getpid(),
            )
            InteractiveConsole.interact(self)
        finally:
            __builtin__.raw_input = old_raw_input
            __builtin__.help = old_help
            __builtin__.quit = old_quit
            sys.displayhook = old_displayhook
            sys.excepthook = old_excepthook
            sys.stdin = old_stdin
            sys.stdout = old_stdout
            sys.stderr = old_stderr

    def recommendCompletionsFor(self, word):
        parts = word.split(".")
        if len(parts) > 1:
            # has a . so it must be a module or class or something
            # using eval, which shouldn't normally have side effects
            # unless there's descriptors/metaclasses doing some nasty
            # get magic
            objname = ".".join(parts[:-1])
            try:
                obj = eval(objname, self.locals)
            except:  # noqa: E722, B001
                return None, 0
            wordlower = parts[-1].lower()
            if wordlower == "":
                # they just punched in a dot, so list all attributes
                # that don't look private or special
                prefix = ".".join(parts[-2:])
                check = [
                    (prefix + _method)
                    for _method in dir(obj)
                    if _method[:1] != "_" and _method.lower().startswith(wordlower)
                ]
            else:
                # they started typing the method name
                check = filter(lambda s: s.lower().startswith(wordlower), dir(obj))
        else:
            # no dots, must be in the normal namespaces.. no eval necessary
            check = set(dir(__builtin__))
            check.update(keyword.kwlist)
            check.update(self.locals)
            wordlower = parts[-1].lower()
            check = filter(lambda s: s.lower().startswith(wordlower), check)
        check.sort()
        return check, 0

remote_pipe.py

import itertools


def as_unicode(s, encoding="utf-8"):
    typ = type(s)
    if typ is str:
        pass
    elif issubclass(typ, str):
        s = str(s)
    elif issubclass(typ, bytes):
        s = str(s, encoding, "replace")
    else:
        raise TypeError(f"expecting basestring, not {typ.__name__}")
    return s


def as_str(s, encoding="utf-8"):
    typ = type(s)
    if typ is bytes:
        pass
    elif issubclass(typ, bytes):
        s = bytes(s)
    elif issubclass(typ, str):
        s = s.encode(encoding)
    else:
        raise TypeError(f"expecting basestring, not {typ.__name__}")
    return s


class RemotePipe:
    def __init__(self, runcode, clientfile, netReprCenter, namespace, pool):
        self.runcode = runcode
        self.pool = pool
        self.clientfile = clientfile
        self.namespace = namespace
        self.result = self.namespace["__result__"] = {}
        self.netReprCenter = netReprCenter
        self.netrepr_list = netReprCenter.netrepr_list
        self.sequence = itertools.count()
        self.stdin = RemoteFileLike(self, "stdin")
        self.stdout = RemoteFileLike(self, "stdout")
        self.stderr = RemoteFileLike(self, "stderr")

    def send(self, *args):
        self.clientfile.write(self.netrepr_list(args) + "\n")
        self.clientfile.flush()

    def respond(self, *args):
        self.send("respond", *args)

    def expect(self, *args):
        self.pool.push()
        try:
            return self._expect(*args)
        finally:
            self.pool.pop()

    def _expect(self, *args):
        ident = next(self.sequence)
        self.send("expect", ident, *args)
        while ident not in self.result:
            self.runcode(self.clientfile, self.namespace)
        return self.result.pop(ident)


class RemoteFileLike:
    softspace = 0
    closed = False
    encoding = "utf-8"

    def __init__(self, pipe, ident):
        self.pipe = pipe
        self.ident = ident

    def __iter__(self):
        while True:
            rval = self.readline()
            if not rval:
                break
            yield rval

    def write(self, s):
        s = as_unicode(s, self.encoding)
        self.pipe.expect("RemoteFileLike.write", self.ident, s)

    def writelines(self, lines):
        for line in lines:
            self.write(line)

    def close(self):
        self.closed = True

    def flush(self):
        pass

    def isatty(self):
        return True

    def read(self, size=-1):
        return as_str(
            self.pipe.expect("RemoteFileLike.read", self.ident, size), self.encoding
        )

    def readline(self, size=-1):
        return as_str(
            self.pipe.expect("RemoteFileLike.readline", self.ident, size), self.encoding
        )

    def readlines(self):
        return list(self)

setup.py

"""
Script for building the example.

Usage:
    python3 setup.py py2app
"""

from setuptools import setup

plist = {
    "CFBundleIdentifier": "net.sf.pyobjc.RemotePyInterpreter",
    "CFBundleDocumentTypes": [
        {
            "CFBundleTypeExtensions": ["RemotePyInterpreter", "*"],
            "CFBundleTypeName": "RemotePyInterpreter Session",
            "CFBundleTypeRole": "Editor",
            "NSDocumentClass": "RemotePyInterpreterDocument",
        }
    ],
}

REMOTE_REQUIREMENTS = [
    "tcpinterpreter",
    "netrepr",
    "remote_console",
    "remote_pipe",
    "remote_bootstrap",
]

DATA_FILES = ["English.lproj"] + [(s + ".py") for s in REMOTE_REQUIREMENTS]

setup(
    app=["RemotePyInterpreter.py"],
    data_files=DATA_FILES,
    options={"py2app": {"plist": plist}},
    setup_requires=["py2app", "pyobjc-framework-Cocoa"],
)

tcpinterpreter.py

#! /usr/bin/env python

"""
    start socket based minimal readline exec server
"""
import socket
import sys


def runsocketcode(clientfile, g):
    try:
        source = clientfile.readline().rstrip()
    except Exception:
        raise SystemExit
    if not source:
        raise SystemExit
    source = eval(source)
    co = compile(source + "\n", "<remote-source>", "exec")
    exec(co, g)


def serveonce(clientsock, name="stdin"):
    clientfile = clientsock.makefile("r+b", 0)
    g = {
        "__name__": "__socketclient__",
        "__file__": f"<{name}>",
        "__clientsock__": clientsock,
        "__clientfile__": clientfile,
        "__runsocketcode__": runsocketcode,
    }
    try:
        runsocketcode(clientfile, g)
    finally:
        clientfile.close()
        clientsock.close()


def real_main():
    import sys

    hostport = eval(sys.argv[1])
    clientsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    clientsock.connect(hostport)
    serveonce(clientsock)


def main():
    newglobals = {
        "__builtins__": sys.modules["__builtin__"],
        "__doc__": None,
        "__name__": "__main__",
    }
    sourcefile = __file__
    g = globals()
    g.clear()
    g.update(newglobals)
    serverglobals = {"__name__": "__socketclient__"}
    with open(sourcefile) as fp:
        sourcecode = fp.read()
    exec(sourcecode, serverglobals, serverglobals)


if __name__ == "__main__":
    main()
elif __name__ == "__socketclient__":
    real_main()

test_client.py

import fcntl
import os
import socket
import sys
from subprocess import PIPE, Popen

from netrepr import NetRepr, RemoteObjectPool, RemoteObjectReference


IMPORT_MODULES = ["netrepr", "remote_console", "remote_pipe", "remote_bootstrap"]
source = []
for fn in IMPORT_MODULES:
    for line in open(fn + ".py"):
        source.append(line)
    source.append("\n\n")
SOURCE = repr("".join(source)) + "\n"


def bind_and_listen(hostport):
    if isinstance(hostport, str):
        host, port = hostport.split(":")
        hostport = (host, int(port))
    serversock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # set close-on-exec
    if hasattr(fcntl, "FD_CLOEXEC"):
        old = fcntl.fcntl(serversock.fileno(), fcntl.F_GETFD)
        fcntl.fcntl(serversock.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC)
    # allow the address to be re-used in a reasonable amount of time
    if os.name == "posix" and sys.platform != "cygwin":
        serversock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    serversock.bind(hostport)
    serversock.listen(5)
    return serversock


def open_connection(executable=sys.executable):
    serversock = bind_and_listen(("127.0.0.1", 0))
    hostport = serversock.getsockname()
    proc = Popen(
        [executable, "tcpinterpreter.py", repr(hostport)],
        stdin=PIPE,
        stdout=PIPE,
        stderr=PIPE,
        close_fds=True,
    )
    clientsock, address = serversock.accept()
    serversock.shutdown(2)
    return clientsock, proc


def start_client(clientsock):
    f = clientsock.makefile("r+b", 0)
    f.write(SOURCE)
    f.flush()
    return f


def client_loop(f):
    def writecode(code):
        # print('[code: %r]' % (code,))
        f.write(repr(code) + "\n")

    pool = RemoteObjectPool(writecode)
    netRepr = NetRepr(pool)
    netrepr = netRepr.netrepr

    def neteval(s):
        return eval(s, pool.namespace, pool.namespace)

    while True:
        code = f.readline().rstrip()
        pool.push()
        try:
            if not code:
                break
            command = eval(code)
            basic = eval(command[0])
            if basic == "expect":
                seq = eval(command[1])
                name = eval(command[2])
                args = map(neteval, command[3:])
                code = None
                rval = None
                if name == "RemoteConsole.input":
                    try:
                        rval = input(*args)
                    except EOFError:
                        code = "raise EOFError"
                elif name == "RemoteConsole.write":
                    sys.stdout.write(args[0])
                elif name == "RemoteConsole.displayhook":
                    pass
                    obj = args[0]
                    if obj is None:
                        pass
                    elif isinstance(obj, RemoteObjectReference):
                        writecode(f'interp.write(repr({netrepr(obj)}) + "\\n")')
                    else:
                        print(repr(obj))
                elif name.startswith("RemoteFileLike."):
                    fh = getattr(sys, args[0])
                    meth = getattr(fh, name[len("RemoteFileLike.") :])  # noqa: E203
                    rval = meth(*args[1:])
                else:
                    print(name, args)
                if code is None:
                    code = f"__result__[{seq!r}] = {rval!r}"
                writecode(code)
        finally:
            pool.pop()


def main():
    clientsock, proc = open_connection()
    f = start_client(clientsock)
    try:
        client_loop(f)
    finally:
        f.close()
        clientsock.close()
        proc.stdin.close()
        print("[stdout]", proc.stdout.read())
        print("[stderr]", proc.stderr.read())


if __name__ == "__main__":
    main()