modify RPC API flavor to be in Java Future<T> style

This commit is contained in:
Kelven Yang 2012-11-19 17:36:56 -08:00
parent bcff47d4b0
commit 17f2af409e
11 changed files with 57 additions and 81 deletions

View File

@ -0,0 +1,5 @@
package org.apache.cloudstack.framework.messaging;
public interface ComponentContainer {
ComponentEndpoint wireComponent(ComponentEndpoint endpoint, String predefinedAddress);
}

View File

@ -45,27 +45,12 @@ public class ComponentEndpoint implements RpcEndpoint, Subscriber {
rpcProvider.registerRpcEndpoint(this);
}
// it will throw RpcRuntimeException in case of transport
public String call(RpcCallContext callContext, String targetAddress, String command, Object cmdArg)
{
return rpcProvider.call(this, callContext, targetAddress, command, cmdArg);
}
public RpcClientCall asyncCall(RpcCallContext callContext, String targetAddress, String command, Object cmdArg) {
return rpcProvider.asyncCall(this, callContext, targetAddress, command, cmdArg);
}
@Override
public void onCallReceive(RpcServerCall call) {
// TODO Auto-generated method stub
// implement annotation based call dispatching
}
@Override
public void onCallReturn(RpcClientCall call, Object returnObject, RpcException e) {
// ???
}
@Override
public void onPublishEvent(String subject, String senderAddress, Object args) {
// TODO

View File

@ -19,6 +19,6 @@
package org.apache.cloudstack.framework.messaging;
public interface MessageSerializer {
String serializeTo(Object object);
Object serializeFrom(String message);
<T>String serializeTo(Class<?> clz, T object);
<T> T serializeFrom(String message);
}

View File

@ -1,56 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.framework.messaging;
import java.util.HashMap;
import java.util.Map;
public class RpcCallContext {
private final static int DEFAULT_RPC_TIMEOUT = 10000;
Map<String, Object> _contextMap = new HashMap<String, Object>();
int _timeoutMilliSeconds = DEFAULT_RPC_TIMEOUT;
String _pipeline;
public RpcCallContext() {
}
public int getTimeoutMilliSeconds() {
return _timeoutMilliSeconds;
}
public void setTimeoutMilliSeconds(int timeoutMilliseconds) {
_timeoutMilliSeconds = timeoutMilliseconds;
}
public void setPipeline(String pipeName) {
_pipeline = pipeName;
}
public String getPipeline() {
return _pipeline;
}
@SuppressWarnings("unchecked")
public <T> T getContextParameter(String key) {
return (T)_contextMap.get(key);
}
public void setContextParameter(String key, Object object) {
_contextMap.put(key, object);
}
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
public interface RpcCallbackListener<T> {
void onSuccess(T result);
void onFailure(RpcException e);
}

View File

@ -16,10 +16,23 @@
// under the License.
package org.apache.cloudstack.framework.messaging;
import java.util.concurrent.TimeUnit;
public interface RpcClientCall {
String getCommand();
Object getCommandArgument();
RpcCallContext getCallContext();
RpcClientCall setCommand(String cmd);
RpcClientCall setPipeline(String pipeline);
RpcClientCall setTimeout(TimeUnit timeout);
RpcClientCall setCommandArg(Object arg);
Object getCommandArg();
RpcClientCall setContextParam(String key, Object param);
Object getContextParam(String key);
<T> RpcClientCall addCallbackListener(RpcCallbackListener<T> listener);
void apply();
void cancel();
<T> T get();
}

View File

@ -20,5 +20,4 @@ package org.apache.cloudstack.framework.messaging;
public interface RpcEndpoint {
void onCallReceive(RpcServerCall call);
void onCallReturn(RpcClientCall call, Object returnObject, RpcException e);
}

View File

@ -24,7 +24,6 @@ public interface RpcProvider extends TransportMultiplexier {
void registerRpcEndpoint(RpcEndpoint rpcEndpoint);
void unregisteRpcEndpoint(RpcEndpoint rpcEndpoint);
String call(RpcEndpoint endpoint, RpcCallContext callContext, String targetAddress, String command, Object cmdArg);
RpcClientCall asyncCall(RpcEndpoint endpoint, RpcCallContext callContext, String targetAddress, String command, Object cmdArg);
RpcClientCall target(String target);
}

View File

@ -21,7 +21,6 @@ package org.apache.cloudstack.framework.messaging;
public interface RpcServerCall {
String getCommand();
Object getCommandArgument();
String getRequestTag();
// for receiver to response call
void completeCall(Object returnObject);

View File

@ -19,6 +19,8 @@
package org.apache.cloudstack.framework.messaging;
public interface TransportEndpoint {
String getEndpointAddress();
void onAttachConfirm(boolean bSuccess, String endpointAddress);
void onDetachIndication(String endpointAddress);

View File

@ -23,6 +23,12 @@ import org.apache.cloudstack.framework.messaging.TransportMultiplexier;
public class ClientTransportEndpoint implements TransportEndpoint {
@Override
public String getEndpointAddress() {
// ???
return "";
}
@Override
public void onAttachConfirm(boolean bSuccess, String endpointAddress) {
// TODO Auto-generated method stub