//===----- RPCUTils.h - Basic tilities for building RPC APIs ----*- C++ -*-===// // // The LLVM Compiler Infrastructure // // This file is distributed under the University of Illinois Open Source // License. See LICENSE.TXT for details. // //===----------------------------------------------------------------------===// // // Basic utilities for building RPC APIs. // //===----------------------------------------------------------------------===// #ifndef LLVM_EXECUTIONENGINE_ORC_RPCUTILS_H #define LLVM_EXECUTIONENGINE_ORC_RPCUTILS_H #include #include #include "llvm/ADT/Optional.h" #include "llvm/ADT/STLExtras.h" #include "llvm/ExecutionEngine/Orc/OrcError.h" #ifdef _MSC_VER // concrt.h depends on eh.h for __uncaught_exception declaration // even if we disable exceptions. #include // Disable warnings from ppltasks.h transitively included by . #pragma warning(push) #pragma warning(disable : 4530) #pragma warning(disable : 4062) #endif #include #ifdef _MSC_VER #pragma warning(pop) #endif namespace llvm { namespace orc { namespace remote { /// Describes reserved RPC Function Ids. /// /// The default implementation will serve for integer and enum function id /// types. If you want to use a custom type as your FunctionId you can /// specialize this class and provide unique values for InvalidId, /// ResponseId and FirstValidId. template class RPCFunctionIdTraits { public: static const T InvalidId = static_cast(0); static const T ResponseId = static_cast(1); static const T FirstValidId = static_cast(2); }; // Base class containing utilities that require partial specialization. // These cannot be included in RPC, as template class members cannot be // partially specialized. class RPCBase { protected: // RPC Function description type. // // This class provides the information and operations needed to support the // RPC primitive operations (call, expect, etc) for a given function. It // is specialized for void and non-void functions to deal with the differences // betwen the two. Both specializations have the same interface: // // Id - The function's unique identifier. // OptionalReturn - The return type for asyncronous calls. // ErrorReturn - The return type for synchronous calls. // optionalToErrorReturn - Conversion from a valid OptionalReturn to an // ErrorReturn. // readResult - Deserialize a result from a channel. // abandon - Abandon a promised (asynchronous) result. // respond - Retun a result on the channel. template class FunctionHelper {}; // RPC Function description specialization for non-void functions. template class FunctionHelper { public: static_assert(FuncId != RPCFunctionIdTraits::InvalidId && FuncId != RPCFunctionIdTraits::ResponseId, "Cannot define custom function with InvalidId or ResponseId. " "Please use RPCFunctionTraits::FirstValidId."); static const FunctionIdT Id = FuncId; typedef Optional OptionalReturn; typedef Expected ErrorReturn; static ErrorReturn optionalToErrorReturn(OptionalReturn &&V) { assert(V && "Return value not available"); return std::move(*V); } template static Error readResult(ChannelT &C, std::promise &P) { RetT Val; auto Err = deserialize(C, Val); auto Err2 = endReceiveMessage(C); Err = joinErrors(std::move(Err), std::move(Err2)); if (Err) { P.set_value(OptionalReturn()); return Err; } P.set_value(std::move(Val)); return Error::success(); } static void abandon(std::promise &P) { P.set_value(OptionalReturn()); } template static Error respond(ChannelT &C, SequenceNumberT SeqNo, ErrorReturn &Result) { FunctionIdT ResponseId = RPCFunctionIdTraits::ResponseId; // If the handler returned an error then bail out with that. if (!Result) return Result.takeError(); // Otherwise open a new message on the channel and send the result. if (auto Err = startSendMessage(C)) return Err; if (auto Err = serializeSeq(C, ResponseId, SeqNo, *Result)) return Err; return endSendMessage(C); } }; // RPC Function description specialization for void functions. template class FunctionHelper { public: static_assert(FuncId != RPCFunctionIdTraits::InvalidId && FuncId != RPCFunctionIdTraits::ResponseId, "Cannot define custom function with InvalidId or ResponseId. " "Please use RPCFunctionTraits::FirstValidId."); static const FunctionIdT Id = FuncId; typedef bool OptionalReturn; typedef Error ErrorReturn; static ErrorReturn optionalToErrorReturn(OptionalReturn &&V) { assert(V && "Return value not available"); return Error::success(); } template static Error readResult(ChannelT &C, std::promise &P) { // Void functions don't have anything to deserialize, so we're good. P.set_value(true); return endReceiveMessage(C); } static void abandon(std::promise &P) { P.set_value(false); } template static Error respond(ChannelT &C, SequenceNumberT SeqNo, ErrorReturn &Result) { const FunctionIdT ResponseId = RPCFunctionIdTraits::ResponseId; // If the handler returned an error then bail out with that. if (Result) return std::move(Result); // Otherwise open a new message on the channel and send the result. if (auto Err = startSendMessage(C)) return Err; if (auto Err = serializeSeq(C, ResponseId, SeqNo)) return Err; return endSendMessage(C); } }; // Helper for the call primitive. template class CallHelper; template class CallHelper> { public: static Error call(ChannelT &C, SequenceNumberT SeqNo, const ArgTs &... Args) { if (auto Err = startSendMessage(C)) return Err; if (auto Err = serializeSeq(C, FuncId, SeqNo, Args...)) return Err; return endSendMessage(C); } }; // Helper for handle primitive. template class HandlerHelper; template class HandlerHelper> { public: template static Error handle(ChannelT &C, HandlerT Handler) { return readAndHandle(C, Handler, llvm::index_sequence_for()); } private: typedef FunctionHelper Func; template static Error readAndHandle(ChannelT &C, HandlerT Handler, llvm::index_sequence _) { std::tuple RPCArgs; SequenceNumberT SeqNo; // GCC 4.7 and 4.8 incorrectly issue a -Wunused-but-set-variable warning // for RPCArgs. Void cast RPCArgs to work around this for now. // FIXME: Remove this workaround once we can assume a working GCC version. (void)RPCArgs; if (auto Err = deserializeSeq(C, SeqNo, std::get(RPCArgs)...)) return Err; // We've deserialized the arguments, so unlock the channel for reading // before we call the handler. This allows recursive RPC calls. if (auto Err = endReceiveMessage(C)) return Err; // Run the handler and get the result. auto Result = Handler(std::get(RPCArgs)...); // Return the result to the client. return Func::template respond(C, SeqNo, Result); } }; // Helper for wrapping member functions up as functors. template class MemberFnWrapper { public: typedef RetT (ClassT::*MethodT)(ArgTs...); MemberFnWrapper(ClassT &Instance, MethodT Method) : Instance(Instance), Method(Method) {} RetT operator()(ArgTs &... Args) { return (Instance.*Method)(Args...); } private: ClassT &Instance; MethodT Method; }; // Helper that provides a Functor for deserializing arguments. template class ReadArgs { public: Error operator()() { return Error::success(); } }; template class ReadArgs : public ReadArgs { public: ReadArgs(ArgT &Arg, ArgTs &... Args) : ReadArgs(Args...), Arg(Arg) {} Error operator()(ArgT &ArgVal, ArgTs &... ArgVals) { this->Arg = std::move(ArgVal); return ReadArgs::operator()(ArgVals...); } private: ArgT &Arg; }; }; /// Contains primitive utilities for defining, calling and handling calls to /// remote procedures. ChannelT is a bidirectional stream conforming to the /// RPCChannel interface (see RPCChannel.h), and FunctionIdT is a procedure /// identifier type that must be serializable on ChannelT. /// /// These utilities support the construction of very primitive RPC utilities. /// Their intent is to ensure correct serialization and deserialization of /// procedure arguments, and to keep the client and server's view of the API in /// sync. /// /// These utilities do not support return values. These can be handled by /// declaring a corresponding '.*Response' procedure and expecting it after a /// call). They also do not support versioning: the client and server *must* be /// compiled with the same procedure definitions. /// /// /// /// Overview (see comments individual types/methods for details): /// /// Function : /// /// associates a unique serializable id with an argument list. /// /// /// call(Channel, Args...) : /// /// Calls the remote procedure 'Func' by serializing Func's id followed by its /// arguments and sending the resulting bytes to 'Channel'. /// /// /// handle(Channel, : /// /// Handles a call to 'Func' by deserializing its arguments and calling the /// given functor. This assumes that the id for 'Func' has already been /// deserialized. /// /// expect(Channel, : /// /// The same as 'handle', except that the procedure id should not have been /// read yet. Expect will deserialize the id and assert that it matches Func's /// id. If it does not, and unexpected RPC call error is returned. template class RPC : public RPCBase { public: /// RPC default constructor. RPC() = default; /// RPC instances cannot be copied. RPC(const RPC &) = delete; /// RPC instances cannot be copied. RPC &operator=(const RPC &) = delete; /// RPC move constructor. // FIXME: Remove once MSVC can synthesize move ops. RPC(RPC &&Other) : SequenceNumberMgr(std::move(Other.SequenceNumberMgr)), OutstandingResults(std::move(Other.OutstandingResults)) {} /// RPC move assignment. // FIXME: Remove once MSVC can synthesize move ops. RPC &operator=(RPC &&Other) { SequenceNumberMgr = std::move(Other.SequenceNumberMgr); OutstandingResults = std::move(Other.OutstandingResults); return *this; } /// Utility class for defining/referring to RPC procedures. /// /// Typedefs of this utility are used when calling/handling remote procedures. /// /// FuncId should be a unique value of FunctionIdT (i.e. not used with any /// other Function typedef in the RPC API being defined. /// /// the template argument Ts... gives the argument list for the remote /// procedure. /// /// E.g. /// /// typedef Function<0, bool> Func1; /// typedef Function<1, std::string, std::vector> Func2; /// /// if (auto Err = call(Channel, true)) /// /* handle Err */; /// /// if (auto Err = expect(Channel, /// [](std::string &S, std::vector &V) { /// // Stuff. /// return Error::success(); /// }) /// /* handle Err */; /// template using Function = FunctionHelper; /// Return type for asynchronous call primitives. template using AsyncCallResult = std::future; /// Return type for asynchronous call-with-seq primitives. template using AsyncCallWithSeqResult = std::pair, SequenceNumberT>; /// Serialize Args... to channel C, but do not call C.send(). /// /// Returns an error (on serialization failure) or a pair of: /// (1) A future Optional (or future for void functions), and /// (2) A sequence number. /// /// This utility function is primarily used for single-threaded mode support, /// where the sequence number can be used to wait for the corresponding /// result. In multi-threaded mode the appendCallAsync method, which does not /// return the sequence numeber, should be preferred. template Expected> appendCallAsyncWithSeq(ChannelT &C, const ArgTs &... Args) { auto SeqNo = SequenceNumberMgr.getSequenceNumber(); std::promise Promise; auto Result = Promise.get_future(); OutstandingResults[SeqNo] = createOutstandingResult(std::move(Promise)); if (auto Err = CallHelper::call(C, SeqNo, Args...)) { abandonOutstandingResults(); return std::move(Err); } else return AsyncCallWithSeqResult(std::move(Result), SeqNo); } /// The same as appendCallAsyncWithSeq, except that it calls C.send() to /// flush the channel after serializing the call. template Expected> callAsyncWithSeq(ChannelT &C, const ArgTs &... Args) { auto Result = appendCallAsyncWithSeq(C, Args...); if (!Result) return Result; if (auto Err = C.send()) { abandonOutstandingResults(); return std::move(Err); } return Result; } /// Serialize Args... to channel C, but do not call send. /// Returns an error if serialization fails, otherwise returns a /// std::future> (or a future for void functions). template Expected> appendCallAsync(ChannelT &C, const ArgTs &... Args) { auto ResAndSeqOrErr = appendCallAsyncWithSeq(C, Args...); if (ResAndSeqOrErr) return std::move(ResAndSeqOrErr->first); return ResAndSeqOrErr.getError(); } /// The same as appendCallAsync, except that it calls C.send to flush the /// channel after serializing the call. template Expected> callAsync(ChannelT &C, const ArgTs &... Args) { auto ResAndSeqOrErr = callAsyncWithSeq(C, Args...); if (ResAndSeqOrErr) return std::move(ResAndSeqOrErr->first); return ResAndSeqOrErr.getError(); } /// This can be used in single-threaded mode. template typename Func::ErrorReturn callSTHandling(ChannelT &C, HandleFtor &HandleOther, const ArgTs &... Args) { if (auto ResultAndSeqNoOrErr = callAsyncWithSeq(C, Args...)) { auto &ResultAndSeqNo = *ResultAndSeqNoOrErr; if (auto Err = waitForResult(C, ResultAndSeqNo.second, HandleOther)) return std::move(Err); return Func::optionalToErrorReturn(ResultAndSeqNo.first.get()); } else return ResultAndSeqNoOrErr.takeError(); } // This can be used in single-threaded mode. template typename Func::ErrorReturn callST(ChannelT &C, const ArgTs &... Args) { return callSTHandling(C, handleNone, Args...); } /// Start receiving a new function call. /// /// Calls startReceiveMessage on the channel, then deserializes a FunctionId /// into Id. Error startReceivingFunction(ChannelT &C, FunctionIdT &Id) { if (auto Err = startReceiveMessage(C)) return Err; return deserialize(C, Id); } /// Deserialize args for Func from C and call Handler. The signature of /// handler must conform to 'Error(Args...)' where Args... matches /// the arguments used in the Func typedef. template static Error handle(ChannelT &C, HandlerT Handler) { return HandlerHelper::handle(C, Handler); } /// Helper version of 'handle' for calling member functions. template static Error handle(ChannelT &C, ClassT &Instance, RetT (ClassT::*HandlerMethod)(ArgTs...)) { return handle( C, MemberFnWrapper(Instance, HandlerMethod)); } /// Deserialize a FunctionIdT from C and verify it matches the id for Func. /// If the id does match, deserialize the arguments and call the handler /// (similarly to handle). /// If the id does not match, return an unexpect RPC call error and do not /// deserialize any further bytes. template Error expect(ChannelT &C, HandlerT Handler) { FunctionIdT FuncId; if (auto Err = startReceivingFunction(C, FuncId)) return std::move(Err); if (FuncId != Func::Id) return orcError(OrcErrorCode::UnexpectedRPCCall); return handle(C, Handler); } /// Helper version of expect for calling member functions. template static Error expect(ChannelT &C, ClassT &Instance, Error (ClassT::*HandlerMethod)(ArgTs...)) { return expect( C, MemberFnWrapper(Instance, HandlerMethod)); } /// Helper for handling setter procedures - this method returns a functor that /// sets the variables referred to by Args... to values deserialized from the /// channel. /// E.g. /// /// typedef Function<0, bool, int> Func1; /// /// ... /// bool B; /// int I; /// if (auto Err = expect(Channel, readArgs(B, I))) /// /* Handle Args */ ; /// template static ReadArgs readArgs(ArgTs &... Args) { return ReadArgs(Args...); } /// Read a response from Channel. /// This should be called from the receive loop to retrieve results. Error handleResponse(ChannelT &C, SequenceNumberT *SeqNoRet = nullptr) { SequenceNumberT SeqNo; if (auto Err = deserialize(C, SeqNo)) { abandonOutstandingResults(); return Err; } if (SeqNoRet) *SeqNoRet = SeqNo; auto I = OutstandingResults.find(SeqNo); if (I == OutstandingResults.end()) { abandonOutstandingResults(); return orcError(OrcErrorCode::UnexpectedRPCResponse); } if (auto Err = I->second->readResult(C)) { abandonOutstandingResults(); // FIXME: Release sequence numbers? return Err; } OutstandingResults.erase(I); SequenceNumberMgr.releaseSequenceNumber(SeqNo); return Error::success(); } // Loop waiting for a result with the given sequence number. // This can be used as a receive loop if the user doesn't have a default. template Error waitForResult(ChannelT &C, SequenceNumberT TgtSeqNo, HandleOtherFtor &HandleOther = handleNone) { bool GotTgtResult = false; while (!GotTgtResult) { FunctionIdT Id = RPCFunctionIdTraits::InvalidId; if (auto Err = startReceivingFunction(C, Id)) return Err; if (Id == RPCFunctionIdTraits::ResponseId) { SequenceNumberT SeqNo; if (auto Err = handleResponse(C, &SeqNo)) return Err; GotTgtResult = (SeqNo == TgtSeqNo); } else if (auto Err = HandleOther(C, Id)) return Err; } return Error::success(); } // Default handler for 'other' (non-response) functions when waiting for a // result from the channel. static Error handleNone(ChannelT &, FunctionIdT) { return orcError(OrcErrorCode::UnexpectedRPCCall); }; private: // Manage sequence numbers. class SequenceNumberManager { public: SequenceNumberManager() = default; SequenceNumberManager(const SequenceNumberManager &) = delete; SequenceNumberManager &operator=(const SequenceNumberManager &) = delete; SequenceNumberManager(SequenceNumberManager &&Other) : NextSequenceNumber(std::move(Other.NextSequenceNumber)), FreeSequenceNumbers(std::move(Other.FreeSequenceNumbers)) {} SequenceNumberManager &operator=(SequenceNumberManager &&Other) { NextSequenceNumber = std::move(Other.NextSequenceNumber); FreeSequenceNumbers = std::move(Other.FreeSequenceNumbers); } void reset() { std::lock_guard Lock(SeqNoLock); NextSequenceNumber = 0; FreeSequenceNumbers.clear(); } SequenceNumberT getSequenceNumber() { std::lock_guard Lock(SeqNoLock); if (FreeSequenceNumbers.empty()) return NextSequenceNumber++; auto SequenceNumber = FreeSequenceNumbers.back(); FreeSequenceNumbers.pop_back(); return SequenceNumber; } void releaseSequenceNumber(SequenceNumberT SequenceNumber) { std::lock_guard Lock(SeqNoLock); FreeSequenceNumbers.push_back(SequenceNumber); } private: std::mutex SeqNoLock; SequenceNumberT NextSequenceNumber = 0; std::vector FreeSequenceNumbers; }; // Base class for results that haven't been returned from the other end of the // RPC connection yet. class OutstandingResult { public: virtual ~OutstandingResult() {} virtual Error readResult(ChannelT &C) = 0; virtual void abandon() = 0; }; // Outstanding results for a specific function. template class OutstandingResultImpl : public OutstandingResult { private: public: OutstandingResultImpl(std::promise &&P) : P(std::move(P)) {} Error readResult(ChannelT &C) override { return Func::readResult(C, P); } void abandon() override { Func::abandon(P); } private: std::promise P; }; // Create an outstanding result for the given function. template std::unique_ptr createOutstandingResult(std::promise &&P) { return llvm::make_unique>(std::move(P)); } // Abandon all outstanding results. void abandonOutstandingResults() { for (auto &KV : OutstandingResults) KV.second->abandon(); OutstandingResults.clear(); SequenceNumberMgr.reset(); } SequenceNumberManager SequenceNumberMgr; std::map> OutstandingResults; }; } // end namespace remote } // end namespace orc } // end namespace llvm #endif