RemotePyInterpreter¶
A PyObjC Example without documentation
Sources¶
AsyncPythonInterpreter.py¶
__all__ = ["AsyncPythonInterpreter"]
import fcntl
import os
import socket
import sys
import objc
from Foundation import (
NSObject,
NSUserDefaults,
NSLog,
NSFileHandle,
NSNotificationCenter,
NSFileHandleConnectionAcceptedNotification,
NSTask,
NSTaskDidTerminateNotification,
NSFileHandleNotificationFileHandleItem,
NSFileHandleReadCompletionNotification,
NSFileHandleError,
NSData,
NSFileHandleNotificationDataItem,
)
IMPORT_MODULES = ["netrepr", "remote_console", "remote_pipe", "remote_bootstrap"]
source = []
for fn in IMPORT_MODULES:
for line in open(fn + ".py"):
source.append(line)
source.append("\n\n")
SOURCE = repr("".join(source)) + "\n"
def bind_and_listen(hostport):
if isinstance(hostport, str):
host, port = hostport.split(":")
hostport = (host, int(port))
serversock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# set close-on-exec
if hasattr(fcntl, "FD_CLOEXEC"):
old = fcntl.fcntl(serversock.fileno(), fcntl.F_GETFD)
fcntl.fcntl(serversock.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC)
# allow the address to be re-used in a reasonable amount of time
if os.name == "posix" and sys.platform != "cygwin":
serversock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversock.bind(hostport)
serversock.listen(5)
return serversock
class AsyncPythonInterpreter(NSObject):
commandReactor = objc.IBOutlet("commandReactor")
def init(self):
self = super().init()
self.host = None
self.port = None
self.interpreterPath = None
self.scriptPath = None
self.commandReactor = None
self.serverSocket = None
self.serverFileHandle = None
self.buffer = ""
self.serverFileHandle = None
self.remoteFileHandle = None
self.childTask = None
return self
def initWithHost_port_interpreterPath_scriptPath_commandReactor_(
self, host, port, interpreterPath, scriptPath, commandReactor
):
self = self.init()
self.host = host
self.port = port
self.interpreterPath = interpreterPath
self.scriptPath = scriptPath
self.commandReactor = commandReactor
self.serverSocket = None
return self
def awakeFromNib(self):
defaults = NSUserDefaults.standardUserDefaults()
def default(k, v, typeCheck=None):
rval = defaults.objectForKey_(k)
if typeCheck is not None and rval is not None:
try:
rval = typeCheck(rval)
except TypeError:
NSLog(
"%s failed type check %s with value %s",
k,
typeCheck.__name__,
rval,
)
rval = None
if rval is None:
defaults.setObject_forKey_(v, k)
rval = v
return rval
self.host = default("AsyncPythonInterpreterInterpreterHost", "127.0.0.1", str)
self.port = default("AsyncPythonInterpreterInterpreterPort", 0, int)
self.interpreterPath = default(
"AsyncPythonInterpreterInterpreterPath", "/usr/bin/python", str
)
self.scriptPath = (
type(self).bundleForClass().pathForResource_ofType_("tcpinterpreter", "py")
)
def connect(self):
# NSLog(u'connect')
self.serverSocket = bind_and_listen((self.host, self.port))
self.serverFileHandle = NSFileHandle.alloc().initWithFileDescriptor_(
self.serverSocket.fileno()
)
nc = NSNotificationCenter.defaultCenter()
nc.addObserver_selector_name_object_(
self,
"remoteSocketAccepted:",
NSFileHandleConnectionAcceptedNotification,
self.serverFileHandle,
)
self.serverFileHandle.acceptConnectionInBackgroundAndNotify()
self.remoteFileHandle = None
for k in os.environ.keys():
if k.startswith("PYTHON"):
del os.environ[k]
self.childTask = NSTask.launchedTaskWithLaunchPath_arguments_(
self.interpreterPath,
[self.scriptPath, repr(self.serverSocket.getsockname())],
)
nc.addObserver_selector_name_object_(
self, "childTaskTerminated:", NSTaskDidTerminateNotification, self.childTask
)
return self
def remoteSocketAccepted_(self, notification):
# NSLog(u'remoteSocketAccepted_')
self.serverFileHandle.closeFile()
self.serverFileHandle = None
ui = notification.userInfo()
self.remoteFileHandle = ui.objectForKey_(NSFileHandleNotificationFileHandleItem)
nc = NSNotificationCenter.defaultCenter()
nc.addObserver_selector_name_object_(
self,
"remoteFileHandleReadCompleted:",
NSFileHandleReadCompletionNotification,
self.remoteFileHandle,
)
self.writeBytes_(SOURCE)
self.remoteFileHandle.readInBackgroundAndNotify()
self.commandReactor.connectionEstablished_(self)
NSNotificationCenter.defaultCenter().postNotificationName_object_(
"AsyncPythonInterpreterOpened", self
)
def remoteFileHandleReadCompleted_(self, notification):
# NSLog(u'remoteFileHandleReadCompleted_')
ui = notification.userInfo()
newData = ui.objectForKey_(NSFileHandleNotificationDataItem)
if newData is None:
self.close()
NSLog("Error: %@", ui.objectForKey_(NSFileHandleError))
return
data_bytes = newData.bytes()[:]
if len(data_bytes) == 0:
self.close()
return
self.remoteFileHandle.readInBackgroundAndNotify()
start = len(self.buffer)
buff = self.buffer + newData.bytes()[:]
# NSLog(u'current buffer: %s', buff)
lines = []
while True:
linebreak = buff.find("\n", start) + 1
if linebreak == 0:
break
lines.append(buff[:linebreak])
buff = buff[linebreak:]
start = 0
# NSLog(u'lines: %s', lines)
self.buffer = buff
for line in lines:
self.commandReactor.lineReceived_fromConnection_(line, self)
def writeBytes_(self, data):
# NSLog(u'Writing bytes: %s', data)
try:
self.remoteFileHandle.writeData_(
NSData.dataWithBytes_length_(data, len(data))
)
except objc.error:
self.close()
# NSLog(u'bytes written.')
def childTaskTerminated_(self, notification):
# NSLog(u'childTaskTerminated_')
self.close()
def closeServerFileHandle(self):
# NSLog(u'closeServerFileHandle')
if self.serverFileHandle is not None:
try:
self.serverFileHandle.closeFile()
except objc.error:
pass
self.serverFileHandle = None
def closeRemoteFileHandle(self):
# NSLog(u'closeRemoteFileHandle')
if self.remoteFileHandle is not None:
try:
self.remoteFileHandle.closeFile()
except objc.error:
pass
self.remoteFileHandle = None
def terminateChildTask(self):
# NSLog(u'terminateChildTask')
if self.childTask is not None:
try:
self.childTask.terminate()
except objc.error:
pass
self.childTask = None
def close(self):
# NSLog(u'close')
NSNotificationCenter.defaultCenter().removeObserver_(self)
self.finalClose()
NSNotificationCenter.defaultCenter().postNotificationName_object_(
"AsyncPythonInterpreterClosed", self
)
def finalClose(self):
if self.commandReactor is not None:
self.commandReactor.connectionClosed_(self)
self.commandReactor = None
self.closeServerFileHandle()
self.closeRemoteFileHandle()
self.terminateChildTask()
def test_console():
from PyObjCTools import AppHelper
from ConsoleReactor import ConsoleReactor
host = "127.0.0.1"
port = 0
interpreterPath = sys.executable
scriptPath = os.path.abspath("tcpinterpreter.py")
commandReactor = ConsoleReactor.alloc().init()
interp = AsyncPythonInterpreter.alloc().initWithHost_port_interpreterPath_scriptPath_commandReactor_( # noqa: B950
host, port, interpreterPath, scriptPath, commandReactor
)
interp.connect()
class ThisEventLoopStopper(NSObject):
def interpFinished_(self, notification):
AppHelper.stopEventLoop()
stopper = ThisEventLoopStopper.alloc().init()
NSNotificationCenter.defaultCenter().addObserver_selector_name_object_(
stopper, "interpFinished:", "AsyncPythonInterpreterClosed", interp
)
AppHelper.runConsoleEventLoop(installInterrupt=True)
def main():
test_console()
if __name__ == "__main__":
main()
ConsoleReactor.py¶
import sys
from Foundation import NSObject, NSLog
from netrepr import NetRepr, RemoteObjectPool, RemoteObjectReference
__all__ = ["ConsoleReactor"]
class ConsoleReactor(NSObject):
def init(self):
self = super().init()
self.pool = None
self.netReprCenter = None
self.connection = None
self.commands = {}
return self
def connectionEstablished_(self, connection):
# NSLog(u'connectionEstablished_')
self.connection = connection
self.pool = RemoteObjectPool(self.writeCode_)
self.netReprCenter = NetRepr(self.pool)
def connectionClosed_(self, connection):
# NSLog(u'connectionClosed_')
self.connection = None
self.pool = None
self.netReprCenter = None
def writeCode_(self, code):
# NSLog(u'writeCode_')
self.connection.writeBytes_(repr(code) + "\n")
def netEval_(self, s):
# NSLog(u'netEval_')
return eval(s, self.pool.namespace, self.pool.namespace)
def lineReceived_fromConnection_(self, lineReceived, connection):
# NSLog(u'lineReceived_fromConnection_')
code = lineReceived.rstrip()
if not code:
return
self.pool.push()
command = map(self.netEval_, eval(code))
try:
self.handleCommand_(command)
finally:
self.pool.pop()
def handleCommand_(self, command):
# NSLog(u'handleCommand_')
basic = command[0]
sel = "handle%sCommand:" % (basic.capitalize())
cmd = command[1:]
if not self.respondsToSelector_(sel):
NSLog("%r does not respond to %s", self, command)
else:
self.performSelector_withObject_(sel, cmd)
getattr(self, sel.replace(":", "_"))(cmd)
def handleRespondCommand_(self, command):
self.doCallback_sequence_args_(
self.commands.pop(command[0]), command[0], map(self.netEval_, command[1:])
)
def sendResult_sequence_(self, rval, seq):
nr = self.netReprCenter
code = f"__result__[{seq!r}] = {nr.netrepr(rval)}"
self.writeCode_(code)
def sendException_sequence_(self, e, seq):
nr = self.netReprCenter
code = "raise " + nr.netrepr_exception(e)
print("forwarding:", code)
self.writeCode_(code)
def doCallback_sequence_args_(self, callback, seq, args):
# nr = self.netReprCenter
try:
rval = callback(*args)
except Exception as e:
self.sendException_sequence_(e, seq)
else:
self.sendResult_sequence_(rval, seq)
def deferCallback_sequence_value_(self, callback, seq, value):
self.commands[seq] = callback
self.writeCode_(f"pipe.respond({seq!r}, netrepr({value}))")
def handleExpectCommand_(self, command):
# NSLog(u'handleExpectCommand_')
seq = command[0]
name = command[1]
args = command[2:]
netrepr = self.netReprCenter.netrepr
if name == "RemoteConsole.raw_input":
self.doCallback_sequence_args_(input, seq, args)
elif name == "RemoteConsole.write":
self.doCallback_sequence_args_(sys.stdout.write, seq, args)
elif name == "RemoteConsole.displayhook":
obj = args[0]
def displayhook_respond(reprobject):
print(reprobject)
def displayhook_local(obj):
if obj is not None:
displayhook_respond(repr(obj))
if isinstance(obj, RemoteObjectReference):
self.deferCallback_sequence_value_(
displayhook_respond, seq, f"repr({netrepr(obj)})"
)
else:
self.doCallback_sequence_args_(displayhook_local, seq, args)
elif name.startswith("RemoteFileLike."):
fh = getattr(sys, args[0])
meth = getattr(fh, name[len("RemoteFileLike.") :]) # noqa: E203
self.doCallback_sequence_args_(meth, seq, args[1:])
elif name == "RemoteConsole.initialize":
self.doCallback_sequence_args_(lambda *args: None, seq, args)
else:
self.doCallback_sequence_args_(
NSLog, seq, ["%r does not respond to expect %r", self, command]
)
def close(self):
if self.connection is not None:
self.writeCode_("raise SystemExit")
self.pool = None
self.netReprCenter = None
self.connection = None
self.commands = None
RemotePyInterpreter.py¶
import objc
from Cocoa import (
NSLog,
NSDocument,
NSFont,
NSColor,
NSAttributedString,
NSFontAttributeName,
NSForegroundColorAttributeName,
)
# from AsyncPythonInterpreter import *
from ConsoleReactor import ConsoleReactor
from netrepr import RemoteObjectReference
from PyObjCTools import AppHelper
def ensure_unicode(s):
if not isinstance(s, str):
s = str(s, "utf-8", "replace")
return s
class RemotePyInterpreterReactor(ConsoleReactor):
delegate = objc.IBOutlet()
def handleExpectCommand_(self, command):
print(command)
seq = command[0]
name = command[1]
args = command[2:]
netrepr = self.netReprCenter.netrepr
if name == "RemoteConsole.raw_input":
prompt = ensure_unicode(args[0])
def input_received(line):
self.sendResult_sequence_(line, seq)
self.delegate.expectCodeInput_withPrompt_(input_received, prompt)
elif name == "RemoteConsole.write":
args = [ensure_unicode(args[0]), "code"]
self.doCallback_sequence_args_(
self.delegate.writeString_forOutput_, seq, args
)
elif name == "RemoteConsole.displayhook":
obj = args[0]
def displayhook_respond(reprobject):
self.delegate.writeString_forOutput_(
ensure_unicode(reprobject) + "\n", "code"
)
def displayhook_local(obj):
if obj is not None:
displayhook_respond(repr(obj))
if isinstance(obj, RemoteObjectReference):
self.deferCallback_sequence_value_(
displayhook_respond, seq, f"repr({netrepr(obj)})"
)
else:
self.doCallback_sequence_args_(displayhook_local, seq, args)
elif name.startswith("RemoteFileLike."):
method = name[len("RemoteFileLike.") :] # noqa: E203
if method == "write":
style, msg = map(ensure_unicode, args)
args = [msg, style]
self.doCallback_sequence_args_(
self.delegate.writeString_forOutput_, seq, args
)
elif method == "readline":
def input_received(line):
self.sendResult_sequence_(line, seq)
self.delegate.expectCodeInput_withPrompt_(input_received, "")
else:
self.doCallback_sequence_args_(
NSLog, seq, ["%s does not respond to expect %s", self, command]
)
elif name == "RemoteConsole.initialize":
def gotTitle(repr_versioninfo, executable, pid):
self.delegate.setVersion_executable_pid_(
".".join(map(str, self.netEval_(repr_versioninfo)[:3])),
ensure_unicode(executable),
pid,
)
self.doCallback_sequence_args_(gotTitle, seq, args)
# fh = getattr(sys, args[0])
# meth = getattr(fh, name[len('RemoteFileLike.'):])
# self.doCallback_sequence_args_(meth, seq, args[1:])
else:
self.doCallback_sequence_args_(
NSLog, seq, ["%s does not respond to expect %s", self, command]
)
def close(self):
super().close()
self.delegate = None
class PseudoUTF8Input:
softspace = 0
def __init__(self, readlinemethod):
self._buffer = ""
self._readline = readlinemethod
def read(self, chars=None):
if chars is None:
if self._buffer:
rval = self._buffer
self._buffer = ""
if rval.endswith("\r"):
rval = rval[:-1] + "\n"
return rval.encode("utf-8")
else:
return self._readline("\x04")[:-1].encode("utf-8")
else:
while len(self._buffer) < chars:
self._buffer += self._readline("\x04\r")
if self._buffer.endswith("\x04"):
self._buffer = self._buffer[:-1]
break
rval, self._buffer = self._buffer[:chars], self._buffer[chars:]
return rval.encode("utf-8").replace("\r", "\n")
def readline(self):
if "\r" not in self._buffer:
self._buffer += self._readline("\x04\r")
if self._buffer.endswith("\x04"):
rval = self._buffer[:-1].encode("utf-8")
elif self._buffer.endswith("\r"):
rval = self._buffer[:-1].encode("utf-8") + "\n"
self._buffer = ""
return rval
DEBUG_DELEGATE = 0
PASSTHROUGH = ("deleteBackward:", "complete:", "moveRight:", "moveLeft:")
class RemotePyInterpreterDocument(NSDocument):
"""
PyInterpreter is a delegate/controller for a NSTextView,
turning it into a full featured interactive Python interpreter.
"""
commandReactor = objc.IBOutlet()
interpreter = objc.IBOutlet()
textView = objc.IBOutlet()
def expectCodeInput_withPrompt_(self, callback, prompt):
self.writeString_forOutput_(prompt, "code")
self.setCharacterIndexForInput_(self.lengthOfTextView())
self.p_input_callbacks.append(callback)
self.flushCallbacks()
def flushCallbacks(self):
while self.p_input_lines and self.p_input_callbacks:
self.p_input_callbacks.pop(0)(self.p_input_lines.pop(0))
def setupTextView(self):
self.textView.setFont_(self.font())
self.textView.setContinuousSpellCheckingEnabled_(False)
self.textView.setRichText_(False)
self.setCharacterIndexForInput_(0)
def setVersion_executable_pid_(self, version, executable, pid):
self.version = version
self.pid = pid
self.executable = executable
self.setFileName_(executable)
def displayName(self):
if not hasattr(self, "version"):
return "Starting..."
return f"Python {self.version} - {self.executable} - {self.pid}"
def updateChangeCount_(self, val):
return
def windowWillClose_(self, window):
if self.commandReactor is not None:
self.commandReactor.close()
self.commandReactor = None
if self.interpreter is not None:
self.interpreter.close()
self.interpreter = None
def windowNibName(self):
return "RemotePyInterpreterDocument"
def isDocumentEdited(self):
return False
def awakeFromNib(self):
self.setFont_(NSFont.userFixedPitchFontOfSize_(10))
self.p_colors = {
"stderr": NSColor.redColor(),
"stdout": NSColor.blueColor(),
"code": NSColor.blackColor(),
}
self.setHistoryLength_(50)
self.setHistoryView_(0)
self.setInteracting_(False)
self.setAutoScroll_(True)
self.setSingleLineInteraction_(False)
self.p_history = [""]
self.p_input_callbacks = []
self.p_input_lines = []
self.setupTextView()
self.interpreter.connect()
#
# Modal input dialog support
#
# def p_nestedRunLoopReaderUntilEOLchars_(self, eolchars):
# """
# This makes the baby jesus cry.
# I want co-routines.
# """
# app = NSApplication.sharedApplication()
# window = self.textView.window()
# self.setCharacterIndexForInput_(self.lengthOfTextView())
# # change the color.. eh
# self.textView.setTypingAttributes_({
# NSFontAttributeName: self.font(),
# NSForegroundColorAttributeName: self.colorForName_(u'code'),
# })
# while True:
# event = app.nextEventMatchingMask_untilDate_inMode_dequeue_(
# NSAnyEventMask,
# NSDate.distantFuture(),
# NSDefaultRunLoopMode,
# True)
# if (event.type() == NSKeyDown) and (event.window() is window):
# eol = event.characters()
# if eol in eolchars:
# break
# app.sendEvent_(event)
# cl = self.currentLine()
# if eol == u'\r':
# self.writeNewLine()
# return cl + eol
def executeLine_(self, line):
self.addHistoryLine_(line)
self.p_input_lines.append(line)
self.flushCallbacks()
self.p_history = filter(None, self.p_history)
self.p_history.append("")
self.setHistoryView_(len(self.p_history) - 1)
def executeInteractiveLine_(self, line):
self.setInteracting_(True)
try:
self.executeLine_(line)
finally:
self.setInteracting_(False)
def replaceLineWithCode_(self, s):
idx = self.characterIndexForInput()
ts = self.textView.textStorage()
s = self.formatString_forOutput_(s, "code")
ts.replaceCharactersInRange_withAttributedString_(
(idx, len(ts.mutableString()) - idx), s
)
#
# History functions
#
def addHistoryLine_(self, line):
line = line.rstrip("\n")
if self.p_history[-1] == line:
return False
if not line:
return False
self.p_history.append(line)
if len(self.p_history) > self.historyLength():
self.p_history.pop(0)
return True
def historyDown_(self, sender):
if self.p_historyView == (len(self.p_history) - 1):
return
self.p_history[self.p_historyView] = self.currentLine()
self.p_historyView += 1
self.replaceLineWithCode_(self.p_history[self.p_historyView])
self.moveToEndOfLine_(self)
def historyUp_(self, sender):
if self.p_historyView == 0:
return
self.p_history[self.p_historyView] = self.currentLine()
self.p_historyView -= 1
self.replaceLineWithCode_(self.p_history[self.p_historyView])
self.moveToEndOfLine_(self)
#
# Convenience methods to create/write decorated text
#
def formatString_forOutput_(self, s, name):
return NSAttributedString.alloc().initWithString_attributes_(
s,
{
NSFontAttributeName: self.font(),
NSForegroundColorAttributeName: self.colorForName_(name),
},
)
def writeString_forOutput_(self, s, name):
s = self.formatString_forOutput_(s, name)
self.textView.textStorage().appendAttributedString_(s)
if self.isAutoScroll():
self.textView.scrollRangeToVisible_((self.lengthOfTextView(), 0))
def writeNewLine(self):
self.writeString_forOutput_("\n", "code")
def colorForName_(self, name):
return self.p_colors[name]
def setColor_forName_(self, color, name):
self.p_colors[name] = color
#
# Convenience methods for manipulating the NSTextView
#
def currentLine(self):
return self.textView.textStorage().mutableString()[
self.characterIndexForInput() : # noqa: E203
]
def moveAndScrollToIndex_(self, idx):
self.textView.scrollRangeToVisible_((idx, 0))
self.textView.setSelectedRange_((idx, 0))
def lengthOfTextView(self):
return len(self.textView.textStorage().mutableString())
#
# NSTextViewDelegate methods
#
def textView_completions_forPartialWordRange_indexOfSelectedItem_(
self, aTextView, completions, begin_length, index
):
# NOTE:
# this will probably have to be tricky in order to be asynchronous..
# either by:
# nesting a run loop (bleh)
# polling the subprocess (bleh)
# returning nothing and calling self.textView.complete_ later
begin, length = begin_length
return None, 0
if False:
txt = self.textView.textStorage().mutableString()
end = begin + length
while (begin > 0) and (txt[begin].isalnum() or txt[begin] in "._"):
begin -= 1
while not txt[begin].isalnum():
begin += 1
return self.p_console.recommendCompletionsFor(txt[begin:end])
def textView_shouldChangeTextInRange_replacementString_(
self, aTextView, aRange, newString
):
begin, length = aRange
lastLocation = self.characterIndexForInput()
if begin < lastLocation:
# no editing anywhere but the interactive line
return False
newString = newString.replace("\r", "\n")
if "\n" in newString:
if begin != lastLocation:
# no pasting multiline unless you're at the end
# of the interactive line
return False
# multiline paste support
# self.clearLine()
newString = self.currentLine() + newString
for s in newString.strip().split("\n"):
self.writeString_forOutput_(s + "\n", "code")
self.executeLine_(s)
return False
return True
def textView_willChangeSelectionFromCharacterRange_toCharacterRange_(
self, aTextView, fromRange, toRange
):
begin, length = toRange
if (
self.singleLineInteraction()
and length == 0
and begin < self.characterIndexForInput()
):
# no cursor movement off the interactive line
return fromRange
else:
return toRange
def textView_doCommandBySelector_(self, aTextView, aSelector):
# deleteForward: is ctrl-d
if self.isInteracting():
if aSelector == "insertNewline:":
self.writeNewLine()
return False
responder = getattr(self, aSelector.replace(":", "_"), None)
if responder is not None:
responder(aTextView)
return True
else:
if DEBUG_DELEGATE and aSelector not in PASSTHROUGH:
print(aSelector)
return False
#
# doCommandBySelector "posers" on the textView
#
def insertTabIgnoringFieldEditor_(self, sender):
# this isn't terribly necessary, b/c F5 and opt-esc do completion
# but why not
sender.complete_(self)
def moveToBeginningOfLine_(self, sender):
self.moveAndScrollToIndex_(self.characterIndexForInput())
def moveToEndOfLine_(self, sender):
self.moveAndScrollToIndex_(self.lengthOfTextView())
def moveToBeginningOfLineAndModifySelection_(self, sender):
begin, length = self.textView.selectedRange()
pos = self.characterIndexForInput()
if begin + length > pos:
self.textView.setSelectedRange_((pos, begin + length - pos))
else:
self.moveToBeginningOfLine_(sender)
def moveToEndOfLineAndModifySelection_(self, sender):
begin, length = self.textView.selectedRange()
pos = max(self.characterIndexForInput(), begin)
self.textView.setSelectedRange_((pos, self.lengthOfTextView()))
def insertNewline_(self, sender):
line = self.currentLine()
self.writeNewLine()
self.executeInteractiveLine_(line)
moveToBeginningOfParagraph_ = moveToBeginningOfLine_
moveToEndOfParagraph_ = moveToEndOfLine_
insertNewlineIgnoringFieldEditor_ = insertNewline_
moveDown_ = historyDown_
moveUp_ = historyUp_
#
# Accessors
#
def historyLength(self):
return self.p_historyLength
def setHistoryLength_(self, length):
self.p_historyLength = length
def font(self):
return self.p_font
def setFont_(self, font):
self.p_font = font
def isInteracting(self):
return self.p_interacting
def setInteracting_(self, v):
self.p_interacting = v
def isAutoScroll(self):
return self.p_autoScroll
def setAutoScroll_(self, v):
self.p_autoScroll = v
def characterIndexForInput(self):
return self.p_characterIndexForInput
def setCharacterIndexForInput_(self, idx):
self.p_characterIndexForInput = idx
self.moveAndScrollToIndex_(idx)
def historyView(self):
return self.p_historyView
def setHistoryView_(self, v):
self.p_historyView = v
def singleLineInteraction(self):
return self.p_singleLineInteraction
def setSingleLineInteraction_(self, v):
self.p_singleLineInteraction = v
if __name__ == "__main__":
AppHelper.runEventLoop(installInterrupt=True)
netrepr.py¶
import itertools
def type_string(obj):
if isinstance(obj, type):
objType = obj.__class__
else:
objType = type(obj)
return getattr(objType, "__module__", "-") + "." + objType.__name__
class NetRepr:
def __init__(self, objectPool):
self.objectPool = objectPool
self.cache = {}
self._identfactory = itertools.count()
def clear(self):
self.cache.clear()
self._identfactory = itertools.count()
def netrepr_tuple(self, obj):
return repr(tuple(itertools.imap(self.netrepr, obj)))
def netrepr_list(self, obj):
return repr(map(self.netrepr, obj))
def netrepr_exception(self, e):
cls = e.__class__
if cls.__module__ == "exceptions":
rval = cls.__name__ + self.netrepr_tuple(e.args)
else:
rval = "Exception({!r})".format(
f"[Remote] {cls.__module__}.{cls.__name__} {e}",
)
return rval
def netrepr(self, obj):
if obj is None:
return "None"
objtype = type(obj)
if objtype is int or objtype is float:
return repr(obj)
elif objtype is str:
if True:
return repr(obj)
else:
# "intern" these
obj_id = id(obj)
cached = self.get(self.cache, obj_id, None)
if cached is None:
# ident = next(self._identfactory)
self.cache[obj_id] = f"__cached__({obj_id!r})"
cached = f"__cache__({obj_id!r}, {obj!r})"
return cached
return self.netrepr_default(obj)
def netrepr_default(self, obj):
method = getattr(obj, "__netrepr__", None)
if method is None:
method = self.objectPool.referenceForObject(obj).__netrepr__
return method()
class BaseObjectPool:
def __init__(self):
self.idents = {}
self.refs = {}
self.pools = []
def referenceForIdent(self, ident):
return self.idents[ident]
def base_alloc(self, ref, ident):
self.refs[ref] = ident
self.idents[ident] = ref
def base_dealloc(self, ref, ident):
del self.refs[ref]
del self.idents[ident]
def autorelease(self, ref):
if not self.pools:
raise RuntimeError(f"no autoreleasepool for {ref!r}")
pool = self.pools[-1]
pool[ref] = pool.get(ref, 0) + 1
def push(self):
# print "pushed pool"
self.pools.append({})
def pop(self):
if not self.pools:
raise RuntimeError("popped too many pools")
# print "popped pool"
pool = self.pools.pop()
for ref, count in pool.items():
ref.release(count)
def referenceForObject(self, obj):
raise TypeError(
f"Can not create a reference to {obj!r}, the bridge is unidirectional"
)
class RemoteObjectPool(BaseObjectPool):
def __init__(self, writecode):
BaseObjectPool.__init__(self)
self.writecode = writecode
self.namespace = {"None": None, "__ref__": self.referenceForRemoteIdent}
def referenceForRemoteIdent(self, ident, type_string):
rval = self.idents.get(ident)
if rval is None:
rval = RemoteObjectReference(self, ident, type_string)
return rval
class ObjectPool(BaseObjectPool):
def __init__(self):
BaseObjectPool.__init__(self)
self._identfactory = itertools.count()
self.obj_ids = {}
self.namespace = {"__obj__": self.objectForIdent}
def object_alloc(self, ref, obj_id):
self.obj_ids[obj_id] = ref
def object_dealloc(self, ref, obj_id):
del self.obj_ids[obj_id]
def objectForIdent(self, ident):
return self.referenceForIdent(ident).obj
def referenceForObject(self, obj):
obj_id = id(obj)
rval = self.obj_ids.get(obj_id)
if rval is None:
ident = next(self._identfactory)
rval = ObjectReference(self, ident, type_string(obj), obj, obj_id)
rval = rval.alloc().autorelease()
return rval
class BaseObjectReference:
def __init__(self, objectPool, ident, type_string):
self.ident = ident
self.type_string = type_string
self.objectPool = objectPool
self.retainCount = 1
def retain(self, count=1):
# print "%r.retain(%d)" % (self, count)
self.retainCount += count
return self
def alloc(self):
self.objectPool.base_alloc(self, self.ident)
return self
def dealloc(self):
self.objectPool.base_dealloc(self, self.ident)
self.retainCount = -1
def release(self, count=1):
# print "%r.release(%d)" % (self, count)
newCount = self.retainCount - count
# print " newCount = %d" % (newCount,)
if newCount == 0:
self.dealloc()
elif newCount < 0:
raise ValueError(
"Reference %r over-released (%r -> %r)"
% (self, self.retainCount, newCount)
)
self.retainCount = newCount
return self
def autorelease(self):
# print "%s.autorelease()" % (self,)
self.objectPool.autorelease(self)
return self
def __repr__(self):
return f"{type(self).__name__}({self.ident!r}, {self.type_string!r})"
class RemoteObjectReference(BaseObjectReference):
def __netrepr__(self):
return f"__obj__({self.ident!r})"
class ObjectReference(BaseObjectReference):
def __init__(self, objectPool, ident, type_string, obj, obj_id):
BaseObjectReference.__init__(self, objectPool, ident, type_string)
self.obj = obj
self.obj_id = id(obj)
def alloc(self):
self = BaseObjectReference.alloc(self)
self.objectPool.object_alloc(self, self.obj_id)
return self
def dealloc(self):
self.objectPool.object_dealloc(self, self.obj_id)
self.obj = None
self.obj_id = -1
BaseObjectReference.dealloc(self)
def __netrepr__(self):
return f"__ref__({self.ident!r}, {self.type_string!r})"
def test_netrepr():
pool = ObjectPool()
pool.push()
netrepr = NetRepr(pool).netrepr
assert netrepr("foo") == repr("foo")
ref = pool.referenceForObject(object)
assert ref.obj is object
assert ref is pool.referenceForObject(object)
assert ref.retainCount == 1
refrepr = netrepr(ref)
assert refrepr == netrepr(ref)
ref.retain()
assert ref.retainCount == 2
pool.pop()
pool.push()
assert ref.retainCount == 1
def __ref__(ident, type_string):
return pool.referenceForIdent(ident)
netref = eval(refrepr)
assert netref is ref
assert netref.obj is object
ref.release()
pool.pop()
assert ref.obj is None
remote_bootstrap.py¶
__file__ = "<RemotePyInterpreterClient>"
import sys
# pool = ObjectPool()
# netReprCenter = NetRepr(pool)
# netrepr = netReprCenter.netrepr
# netrepr_tuple = netReprCenter.netrepr_tuple
# netrepr_list = netReprCenter.netrepr_list
# netrepr_exception = netReprCenter.netrepr_exception
namespace = globals()
# namespace.update(pool.namespace)
__main__ = sys.modules["__main__"]
# assert namespace is not __main__.__dict__
# pipe = RemotePipe(__runsocketcode__, __clientfile__, netReprCenter, namespace, pool)
# interp = RemoteConsole(pipe, locals=__main__.__dict__)
# interp.interact()
remote_console.py¶
import keyword
import os
import sys
from code import InteractiveConsole, softspace
try:
import __builtin__
except ImportError:
import builtins as __builtin__
class RemoteConsole(InteractiveConsole):
def __init__(self, pipe, **kw):
self.pipe = pipe
self.buffer = None
InteractiveConsole.__init__(self, **kw)
self.locals["__interpreter__"] = self
def raw_input(self, prompt=""):
return self.pipe.expect("RemoteConsole.raw_input", prompt)
def write(self, msg):
return self.pipe.expect("RemoteConsole.write", msg)
def resetbuffer(self):
self.lastbuffer = self.buffer
InteractiveConsole.resetbuffer(self)
def displayhook(self, value):
if value is not None:
__builtin__._ = value
return self.pipe.expect("RemoteConsole.displayhook", value)
def excepthook(self, exc_type, value, traceback):
return self.pipe.expect("RemoteConsole.excepthook", exc_type, value, traceback)
def runcode(self, code):
try:
exec(code, self.locals)
except SystemExit:
raise
except: # noqa: E722, B001
self.showtraceback()
else:
if softspace(sys.stdout, 0):
print
def interact(self):
old_raw_input = __builtin__.raw_input
old_displayhook = sys.displayhook
old_excepthook = sys.excepthook
old_stdin = sys.stdin
old_stdout = sys.stdout
old_stderr = sys.stderr
old_help = __builtin__.help
old_quit = __builtin__.quit
__builtin__.raw_input = self.raw_input
__builtin__.help = "Close window to exit."
__builtin__.quit = "Close window to exit."
sys.displayhook = self.displayhook
sys.excepthook = self.excepthook
sys.stdin = self.pipe.stdin
sys.stdout = self.pipe.stdout
sys.stderr = self.pipe.stderr
try:
self.pipe.expect(
"RemoteConsole.initialize",
repr(sys.version_info),
sys.executable,
os.getpid(),
)
InteractiveConsole.interact(self)
finally:
__builtin__.raw_input = old_raw_input
__builtin__.help = old_help
__builtin__.quit = old_quit
sys.displayhook = old_displayhook
sys.excepthook = old_excepthook
sys.stdin = old_stdin
sys.stdout = old_stdout
sys.stderr = old_stderr
def recommendCompletionsFor(self, word):
parts = word.split(".")
if len(parts) > 1:
# has a . so it must be a module or class or something
# using eval, which shouldn't normally have side effects
# unless there's descriptors/metaclasses doing some nasty
# get magic
objname = ".".join(parts[:-1])
try:
obj = eval(objname, self.locals)
except: # noqa: E722, B001
return None, 0
wordlower = parts[-1].lower()
if wordlower == "":
# they just punched in a dot, so list all attributes
# that don't look private or special
prefix = ".".join(parts[-2:])
check = [
(prefix + _method)
for _method in dir(obj)
if _method[:1] != "_" and _method.lower().startswith(wordlower)
]
else:
# they started typing the method name
check = filter(lambda s: s.lower().startswith(wordlower), dir(obj))
else:
# no dots, must be in the normal namespaces.. no eval necessary
check = set(dir(__builtin__))
check.update(keyword.kwlist)
check.update(self.locals)
wordlower = parts[-1].lower()
check = filter(lambda s: s.lower().startswith(wordlower), check)
check.sort()
return check, 0
remote_pipe.py¶
import itertools
def as_unicode(s, encoding="utf-8"):
typ = type(s)
if typ is str:
pass
elif issubclass(typ, str):
s = str(s)
elif issubclass(typ, bytes):
s = str(s, encoding, "replace")
else:
raise TypeError(f"expecting basestring, not {typ.__name__}")
return s
def as_str(s, encoding="utf-8"):
typ = type(s)
if typ is bytes:
pass
elif issubclass(typ, bytes):
s = bytes(s)
elif issubclass(typ, str):
s = s.encode(encoding)
else:
raise TypeError(f"expecting basestring, not {typ.__name__}")
return s
class RemotePipe:
def __init__(self, runcode, clientfile, netReprCenter, namespace, pool):
self.runcode = runcode
self.pool = pool
self.clientfile = clientfile
self.namespace = namespace
self.result = self.namespace["__result__"] = {}
self.netReprCenter = netReprCenter
self.netrepr_list = netReprCenter.netrepr_list
self.sequence = itertools.count()
self.stdin = RemoteFileLike(self, "stdin")
self.stdout = RemoteFileLike(self, "stdout")
self.stderr = RemoteFileLike(self, "stderr")
def send(self, *args):
self.clientfile.write(self.netrepr_list(args) + "\n")
self.clientfile.flush()
def respond(self, *args):
self.send("respond", *args)
def expect(self, *args):
self.pool.push()
try:
return self._expect(*args)
finally:
self.pool.pop()
def _expect(self, *args):
ident = next(self.sequence)
self.send("expect", ident, *args)
while ident not in self.result:
self.runcode(self.clientfile, self.namespace)
return self.result.pop(ident)
class RemoteFileLike:
softspace = 0
closed = False
encoding = "utf-8"
def __init__(self, pipe, ident):
self.pipe = pipe
self.ident = ident
def __iter__(self):
while True:
rval = self.readline()
if not rval:
break
yield rval
def write(self, s):
s = as_unicode(s, self.encoding)
self.pipe.expect("RemoteFileLike.write", self.ident, s)
def writelines(self, lines):
for line in lines:
self.write(line)
def close(self):
self.closed = True
def flush(self):
pass
def isatty(self):
return True
def read(self, size=-1):
return as_str(
self.pipe.expect("RemoteFileLike.read", self.ident, size), self.encoding
)
def readline(self, size=-1):
return as_str(
self.pipe.expect("RemoteFileLike.readline", self.ident, size), self.encoding
)
def readlines(self):
return list(self)
setup.py¶
"""
Script for building the example.
Usage:
python3 setup.py py2app
"""
from setuptools import setup
plist = {
"CFBundleIdentifier": "net.sf.pyobjc.RemotePyInterpreter",
"CFBundleDocumentTypes": [
{
"CFBundleTypeExtensions": ["RemotePyInterpreter", "*"],
"CFBundleTypeName": "RemotePyInterpreter Session",
"CFBundleTypeRole": "Editor",
"NSDocumentClass": "RemotePyInterpreterDocument",
}
],
}
REMOTE_REQUIREMENTS = [
"tcpinterpreter",
"netrepr",
"remote_console",
"remote_pipe",
"remote_bootstrap",
]
DATA_FILES = ["English.lproj"] + [(s + ".py") for s in REMOTE_REQUIREMENTS]
setup(
app=["RemotePyInterpreter.py"],
data_files=DATA_FILES,
options={"py2app": {"plist": plist}},
setup_requires=["py2app", "pyobjc-framework-Cocoa"],
)
tcpinterpreter.py¶
#! /usr/bin/env python
"""
start socket based minimal readline exec server
"""
import socket
import sys
def runsocketcode(clientfile, g):
try:
source = clientfile.readline().rstrip()
except Exception:
raise SystemExit
if not source:
raise SystemExit
source = eval(source)
co = compile(source + "\n", "<remote-source>", "exec")
exec(co, g)
def serveonce(clientsock, name="stdin"):
clientfile = clientsock.makefile("r+b", 0)
g = {
"__name__": "__socketclient__",
"__file__": f"<{name}>",
"__clientsock__": clientsock,
"__clientfile__": clientfile,
"__runsocketcode__": runsocketcode,
}
try:
runsocketcode(clientfile, g)
finally:
clientfile.close()
clientsock.close()
def real_main():
import sys
hostport = eval(sys.argv[1])
clientsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
clientsock.connect(hostport)
serveonce(clientsock)
def main():
newglobals = {
"__builtins__": sys.modules["__builtin__"],
"__doc__": None,
"__name__": "__main__",
}
sourcefile = __file__
g = globals()
g.clear()
g.update(newglobals)
serverglobals = {"__name__": "__socketclient__"}
with open(sourcefile) as fp:
sourcecode = fp.read()
exec(sourcecode, serverglobals, serverglobals)
if __name__ == "__main__":
main()
elif __name__ == "__socketclient__":
real_main()
test_client.py¶
import fcntl
import os
import socket
import sys
from subprocess import PIPE, Popen
from netrepr import NetRepr, RemoteObjectPool, RemoteObjectReference
IMPORT_MODULES = ["netrepr", "remote_console", "remote_pipe", "remote_bootstrap"]
source = []
for fn in IMPORT_MODULES:
for line in open(fn + ".py"):
source.append(line)
source.append("\n\n")
SOURCE = repr("".join(source)) + "\n"
def bind_and_listen(hostport):
if isinstance(hostport, str):
host, port = hostport.split(":")
hostport = (host, int(port))
serversock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# set close-on-exec
if hasattr(fcntl, "FD_CLOEXEC"):
old = fcntl.fcntl(serversock.fileno(), fcntl.F_GETFD)
fcntl.fcntl(serversock.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC)
# allow the address to be re-used in a reasonable amount of time
if os.name == "posix" and sys.platform != "cygwin":
serversock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversock.bind(hostport)
serversock.listen(5)
return serversock
def open_connection(executable=sys.executable):
serversock = bind_and_listen(("127.0.0.1", 0))
hostport = serversock.getsockname()
proc = Popen(
[executable, "tcpinterpreter.py", repr(hostport)],
stdin=PIPE,
stdout=PIPE,
stderr=PIPE,
close_fds=True,
)
clientsock, address = serversock.accept()
serversock.shutdown(2)
return clientsock, proc
def start_client(clientsock):
f = clientsock.makefile("r+b", 0)
f.write(SOURCE)
f.flush()
return f
def client_loop(f):
def writecode(code):
# print('[code: %r]' % (code,))
f.write(repr(code) + "\n")
pool = RemoteObjectPool(writecode)
netRepr = NetRepr(pool)
netrepr = netRepr.netrepr
def neteval(s):
return eval(s, pool.namespace, pool.namespace)
while True:
code = f.readline().rstrip()
pool.push()
try:
if not code:
break
command = eval(code)
basic = eval(command[0])
if basic == "expect":
seq = eval(command[1])
name = eval(command[2])
args = map(neteval, command[3:])
code = None
rval = None
if name == "RemoteConsole.input":
try:
rval = input(*args)
except EOFError:
code = "raise EOFError"
elif name == "RemoteConsole.write":
sys.stdout.write(args[0])
elif name == "RemoteConsole.displayhook":
pass
obj = args[0]
if obj is None:
pass
elif isinstance(obj, RemoteObjectReference):
writecode(f'interp.write(repr({netrepr(obj)}) + "\\n")')
else:
print(repr(obj))
elif name.startswith("RemoteFileLike."):
fh = getattr(sys, args[0])
meth = getattr(fh, name[len("RemoteFileLike.") :]) # noqa: E203
rval = meth(*args[1:])
else:
print(name, args)
if code is None:
code = f"__result__[{seq!r}] = {rval!r}"
writecode(code)
finally:
pool.pop()
def main():
clientsock, proc = open_connection()
f = start_client(clientsock)
try:
client_loop(f)
finally:
f.close()
clientsock.close()
proc.stdin.close()
print("[stdout]", proc.stdout.read())
print("[stderr]", proc.stderr.read())
if __name__ == "__main__":
main()