Finish RPC service server side implementation

This commit is contained in:
Kelven Yang 2012-11-28 18:03:20 -08:00
parent 1d75063217
commit fc16e1ea1a
4 changed files with 136 additions and 14 deletions

View File

@ -32,6 +32,8 @@ public interface RpcClientCall {
Object getContextParam(String key);
<T> RpcClientCall addCallbackListener(RpcCallbackListener<T> listener);
RpcClientCall setCallbackDispatcherTarget(Object target);
RpcClientCall setOneway();
void apply();

View File

@ -33,7 +33,8 @@ public class RpcClientCallImpl implements RpcClientCall {
private boolean _oneway = false;
private List<RpcCallbackListener> _callbackListeners = new ArrayList<RpcCallbackListener>();
private Object _callbackDispatcherTarget;
private RpcProvider _rpcProvider;
private long _startTickInMs;
private long _callTag;
@ -95,6 +96,13 @@ public class RpcClientCallImpl implements RpcClientCall {
_callbackListeners.add(listener);
return this;
}
@Override
public RpcClientCall setCallbackDispatcherTarget(Object target) {
_callbackDispatcherTarget = target;
return this;
}
@Override
public RpcClientCall setOneway() {
@ -189,11 +197,16 @@ public class RpcClientCallImpl implements RpcClientCall {
_responseDone = true;
_responseLock.notifyAll();
}
assert(_rpcProvider.getMessageSerializer() != null);
Object resultObject = _rpcProvider.getMessageSerializer().serializeFrom(result);
for(RpcCallbackListener listener: _callbackListeners)
listener.onSuccess(resultObject);
if(_callbackListeners.size() > 0) {
assert(_rpcProvider.getMessageSerializer() != null);
Object resultObject = _rpcProvider.getMessageSerializer().serializeFrom(result);
for(RpcCallbackListener listener: _callbackListeners)
listener.onSuccess(resultObject);
} else {
if(_callbackDispatcherTarget != null)
RpcCallbackDispatcher.dispatch(_callbackDispatcherTarget, this);
}
}
public void complete(RpcException e) {
@ -205,7 +218,12 @@ public class RpcClientCallImpl implements RpcClientCall {
_responseLock.notifyAll();
}
for(RpcCallbackListener listener: _callbackListeners)
listener.onFailure(e);
if(_callbackListeners.size() > 0) {
for(RpcCallbackListener listener: _callbackListeners)
listener.onFailure(e);
} else {
if(_callbackDispatcherTarget != null)
RpcCallbackDispatcher.dispatch(_callbackDispatcherTarget, this);
}
}
}

View File

@ -50,9 +50,9 @@ public class RpcProviderImpl implements RpcProvider {
Object pdu = _messageSerializer.serializeFrom(message);
if(pdu instanceof RpcCallRequestPdu) {
handleCallRequestPdu((RpcCallRequestPdu)pdu);
handleCallRequestPdu(senderEndpointAddress, targetEndpointAddress, (RpcCallRequestPdu)pdu);
} else if(pdu instanceof RpcCallResponsePdu) {
handleCallResponsePdu((RpcCallResponsePdu)pdu);
handleCallResponsePdu(senderEndpointAddress, targetEndpointAddress, (RpcCallResponsePdu)pdu);
} else {
assert(false);
}
@ -81,7 +81,7 @@ public class RpcProviderImpl implements RpcProvider {
_serviceEndpoints.remove(rpcEndpoint);
}
}
@Override
public RpcClientCall newCall(String sourceAddress, String targetAddress) {
long callTag = getNextCallTag();
@ -124,11 +124,42 @@ public class RpcProviderImpl implements RpcProvider {
return tag;
}
private void handleCallRequestPdu(RpcCallRequestPdu pdu) {
// ???
private void handleCallRequestPdu(String sourceAddress, String targetAddress, RpcCallRequestPdu pdu) {
try {
RpcServerCall call = new RpcServerCallImpl(this, sourceAddress, targetAddress, pdu);
// TODO, we are trying to avoid locking when calling into callbacks
// this can be optimized later
List<RpcServiceEndpoint> endpoints = new ArrayList<RpcServiceEndpoint>();
synchronized(_serviceEndpoints) {
endpoints.addAll(_serviceEndpoints);
}
for(RpcServiceEndpoint endpoint : endpoints) {
if(RpcServiceDispatcher.dispatch(endpoint, call))
return;
}
RpcCallResponsePdu responsePdu = new RpcCallResponsePdu();
responsePdu.setCommand(pdu.getCommand());
responsePdu.setRequestStartTick(pdu.getRequestStartTick());
responsePdu.setRequestTag(pdu.getRequestTag());
responsePdu.setResult(RpcCallResponsePdu.RESULT_HANDLER_NOT_EXIST);
sendRpcPdu(targetAddress, sourceAddress, _messageSerializer.serializeTo(RpcCallResponsePdu.class, responsePdu));
} catch (Throwable e) {
RpcCallResponsePdu responsePdu = new RpcCallResponsePdu();
responsePdu.setCommand(pdu.getCommand());
responsePdu.setRequestStartTick(pdu.getRequestStartTick());
responsePdu.setRequestTag(pdu.getRequestTag());
responsePdu.setResult(RpcCallResponsePdu.RESULT_HANDLER_EXCEPTION);
sendRpcPdu(targetAddress, sourceAddress, _messageSerializer.serializeTo(RpcCallResponsePdu.class, responsePdu));
}
}
private void handleCallResponsePdu(RpcCallResponsePdu pdu) {
private void handleCallResponsePdu(String sourceAddress, String targetAddress, RpcCallResponsePdu pdu) {
RpcClientCallImpl call = null;
synchronized(this) {

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
public class RpcServerCallImpl implements RpcServerCall {
private RpcProvider _rpcProvider;
private String _sourceAddress;
private String _targetAddress;
private RpcCallRequestPdu _requestPdu;
public RpcServerCallImpl(RpcProvider provider, String sourceAddress, String targetAddress,
RpcCallRequestPdu requestPdu) {
_rpcProvider = provider;
_sourceAddress = sourceAddress;
_targetAddress = targetAddress;
_requestPdu = requestPdu;
}
@Override
public String getCommand() {
assert(_requestPdu != null);
return _requestPdu.getCommand();
}
@Override
public Object getCommandArgument() {
if(_requestPdu.getSerializedCommandArg() == null)
return null;
assert(_rpcProvider.getMessageSerializer() != null);
return _rpcProvider.getMessageSerializer().serializeFrom(_requestPdu.getSerializedCommandArg());
}
@Override
public void completeCall(Object returnObject) {
assert(_sourceAddress != null);
assert(_targetAddress != null);
RpcCallResponsePdu pdu = new RpcCallResponsePdu();
pdu.setCommand(_requestPdu.getCommand());
pdu.setRequestTag(_requestPdu.getRequestTag());
pdu.setRequestStartTick(_requestPdu.getRequestStartTick());
pdu.setRequestStartTick(RpcCallResponsePdu.RESULT_SUCCESSFUL);
if(returnObject != null) {
assert(_rpcProvider.getMessageSerializer() != null);
pdu.setSerializedResult(_rpcProvider.getMessageSerializer().serializeTo(returnObject.getClass(), returnObject));
}
_rpcProvider.sendRpcPdu(_targetAddress, _sourceAddress,
_rpcProvider.getMessageSerializer().serializeTo(RpcCallResponsePdu.class, pdu));
}
}