diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentContainer.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentContainer.java new file mode 100644 index 00000000000..1d0f2741451 --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentContainer.java @@ -0,0 +1,5 @@ +package org.apache.cloudstack.framework.messaging; + +public interface ComponentContainer { + ComponentEndpoint wireComponent(ComponentEndpoint endpoint, String predefinedAddress); +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java index 442f9869b8b..92443e5df9c 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java @@ -45,27 +45,12 @@ public class ComponentEndpoint implements RpcEndpoint, Subscriber { rpcProvider.registerRpcEndpoint(this); } - // it will throw RpcRuntimeException in case of transport - public String call(RpcCallContext callContext, String targetAddress, String command, Object cmdArg) - { - return rpcProvider.call(this, callContext, targetAddress, command, cmdArg); - } - - public RpcClientCall asyncCall(RpcCallContext callContext, String targetAddress, String command, Object cmdArg) { - return rpcProvider.asyncCall(this, callContext, targetAddress, command, cmdArg); - } - @Override public void onCallReceive(RpcServerCall call) { // TODO Auto-generated method stub // implement annotation based call dispatching } - @Override - public void onCallReturn(RpcClientCall call, Object returnObject, RpcException e) { - // ??? - } - @Override public void onPublishEvent(String subject, String senderAddress, Object args) { // TODO diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessageSerializer.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessageSerializer.java index 3aafd85a679..d07a3add483 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessageSerializer.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessageSerializer.java @@ -19,6 +19,6 @@ package org.apache.cloudstack.framework.messaging; public interface MessageSerializer { - String serializeTo(Object object); - Object serializeFrom(String message); + String serializeTo(Class clz, T object); + T serializeFrom(String message); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallContext.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallContext.java deleted file mode 100644 index 2d379b3a37e..00000000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallContext.java +++ /dev/null @@ -1,56 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package org.apache.cloudstack.framework.messaging; - -import java.util.HashMap; -import java.util.Map; - -public class RpcCallContext { - private final static int DEFAULT_RPC_TIMEOUT = 10000; - - Map _contextMap = new HashMap(); - int _timeoutMilliSeconds = DEFAULT_RPC_TIMEOUT; - String _pipeline; - - public RpcCallContext() { - } - - public int getTimeoutMilliSeconds() { - return _timeoutMilliSeconds; - } - - public void setTimeoutMilliSeconds(int timeoutMilliseconds) { - _timeoutMilliSeconds = timeoutMilliseconds; - } - - public void setPipeline(String pipeName) { - _pipeline = pipeName; - } - - public String getPipeline() { - return _pipeline; - } - - @SuppressWarnings("unchecked") - public T getContextParameter(String key) { - return (T)_contextMap.get(key); - } - - public void setContextParameter(String key, Object object) { - _contextMap.put(key, object); - } -} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallbackListener.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallbackListener.java new file mode 100644 index 00000000000..729c41d7e93 --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallbackListener.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cloudstack.framework.messaging; + +public interface RpcCallbackListener { + void onSuccess(T result); + void onFailure(RpcException e); +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java index 7d9cd8f5886..5a1e9c47e12 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java @@ -16,10 +16,23 @@ // under the License. package org.apache.cloudstack.framework.messaging; +import java.util.concurrent.TimeUnit; + public interface RpcClientCall { - String getCommand(); - Object getCommandArgument(); - RpcCallContext getCallContext(); + RpcClientCall setCommand(String cmd); + RpcClientCall setPipeline(String pipeline); + RpcClientCall setTimeout(TimeUnit timeout); + RpcClientCall setCommandArg(Object arg); + Object getCommandArg(); + + RpcClientCall setContextParam(String key, Object param); + Object getContextParam(String key); + + RpcClientCall addCallbackListener(RpcCallbackListener listener); + + void apply(); void cancel(); + + T get(); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcEndpoint.java index ff007286045..375c1d3520d 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcEndpoint.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcEndpoint.java @@ -20,5 +20,4 @@ package org.apache.cloudstack.framework.messaging; public interface RpcEndpoint { void onCallReceive(RpcServerCall call); - void onCallReturn(RpcClientCall call, Object returnObject, RpcException e); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java index 95217cdec1f..547a81acc77 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java @@ -24,7 +24,6 @@ public interface RpcProvider extends TransportMultiplexier { void registerRpcEndpoint(RpcEndpoint rpcEndpoint); void unregisteRpcEndpoint(RpcEndpoint rpcEndpoint); - - String call(RpcEndpoint endpoint, RpcCallContext callContext, String targetAddress, String command, Object cmdArg); - RpcClientCall asyncCall(RpcEndpoint endpoint, RpcCallContext callContext, String targetAddress, String command, Object cmdArg); + + RpcClientCall target(String target); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCall.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCall.java index 7380fb2091a..b6dd943cbc9 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCall.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCall.java @@ -21,7 +21,6 @@ package org.apache.cloudstack.framework.messaging; public interface RpcServerCall { String getCommand(); Object getCommandArgument(); - String getRequestTag(); // for receiver to response call void completeCall(Object returnObject); diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java index 6bca5668567..91ec86f49ea 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java @@ -19,6 +19,8 @@ package org.apache.cloudstack.framework.messaging; public interface TransportEndpoint { + String getEndpointAddress(); + void onAttachConfirm(boolean bSuccess, String endpointAddress); void onDetachIndication(String endpointAddress); diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java index 3c8878ff997..e12ddcf48d1 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java @@ -23,6 +23,12 @@ import org.apache.cloudstack.framework.messaging.TransportMultiplexier; public class ClientTransportEndpoint implements TransportEndpoint { + @Override + public String getEndpointAddress() { + // ??? + return ""; + } + @Override public void onAttachConfirm(boolean bSuccess, String endpointAddress) { // TODO Auto-generated method stub