Add more RPC implementation code

This commit is contained in:
Kelven Yang 2012-11-21 17:10:47 -08:00
parent 550f4fbcdc
commit d4cb74c77b
14 changed files with 607 additions and 83 deletions

View File

@ -22,10 +22,24 @@
<artifactId>cloud-framework-ipc</artifactId>
<version>4.1.0-SNAPSHOT</version>
<dependencies>
<!--
<dependency>
<groupId>org.hornetq</groupId>
<artifactId>hornetq-core-client</artifactId>
<version>snap-r9548</version>
</dependency>
-->
<dependency>
<groupId>org.apache.cloudstack</groupId>
<artifactId>cloud-core</artifactId>
<version>4.1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.cloudstack</groupId>
<artifactId>cloud-utils</artifactId>
<version>4.1.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -18,7 +18,10 @@
*/
package org.apache.cloudstack.framework.messaging;
import org.apache.log4j.Logger;
public class ComponentEndpoint implements RpcServiceEndpoint, Subscriber {
private static final Logger s_logger = Logger.getLogger(ComponentEndpoint.class);
private TransportEndpoint transportEndpoint;
private RpcProvider rpcProvider;
@ -56,6 +59,7 @@ public class ComponentEndpoint implements RpcServiceEndpoint, Subscriber {
try {
EventDispatcher.dispatch(this, subject, senderAddress, args);
} catch(RuntimeException e) {
s_logger.error("Unhandled exception", e);
}
}
}

View File

@ -1,19 +1,21 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;

View File

@ -1,19 +1,21 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
import com.google.gson.Gson;
public class JsonMessageSerializer implements MessageSerializer {
// this will be injected from external to allow installation of
// type adapters need by upper layer applications
private Gson _gson;
private OnwireClassRegistry _clzRegistry;
public JsonMessageSerializer() {
}
public Gson getGson() {
return _gson;
}
public void setGson(Gson gson) {
_gson = gson;
}
public OnwireClassRegistry getOnwireClassRegistry() {
return _clzRegistry;
}
public void setOnwireClassRegistry(OnwireClassRegistry clzRegistry) {
_clzRegistry = clzRegistry;
}
@Override
public <T> String serializeTo(Class<?> clz, T object) {
assert(clz != null);
assert(object != null);
StringBuffer sbuf = new StringBuffer();
OnwireName onwire = clz.getAnnotation(OnwireName.class);
if(onwire == null)
throw new RuntimeException("Class " + clz.getCanonicalName() + " is not declared to be onwire");
sbuf.append(onwire.name()).append("|");
sbuf.append(_gson.toJson(object));
return sbuf.toString();
}
@Override
public <T> T serializeFrom(String message) {
assert(message != null);
int contentStartPos = message.indexOf('|');
if(contentStartPos < 0)
throw new RuntimeException("Invalid on-wire message format");
String onwireName = message.substring(0, contentStartPos);
Class<?> clz = _clzRegistry.getOnwireClass(onwireName);
if(clz == null)
throw new RuntimeException("Onwire class is not registered. name: " + onwireName);
return (T)_gson.fromJson(message.substring(contentStartPos + 1), clz);
}
}

View File

@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.jar.JarEntry;
import java.util.jar.JarInputStream;
import org.springframework.stereotype.Component;
//
// Finding classes in a given package code is taken and modified from
// Credit: http://internna.blogspot.com/2007/11/java-5-retrieving-all-classes-from.html
//
@Component
public class OnwireClassRegistry {
private List<String> packages = new ArrayList<String>();
private Map<String, Class<?>> registry = new HashMap<String, Class<?>>();
public OnwireClassRegistry() {
}
public void addPackage(String packageName) {
packages.add(packageName);
}
public void scan() {
Set<Class<?>> classes = new HashSet<Class<?>>();
for(String pkg : packages) {
classes.addAll(getClasses(pkg));
}
for(Class<?> clz : classes) {
OnwireName onwire = clz.getAnnotation(OnwireName.class);
assert(onwire.name() != null);
registry.put(onwire.name(), clz);
}
}
public Class<?> getOnwireClass(String onwireName) {
return registry.get(onwireName);
}
static Set<Class<?>> getClasses(String packageName) {
ClassLoader loader = Thread.currentThread().getContextClassLoader();
return getClasses(loader, packageName);
}
//
// Following helper methods can be put in a separated helper class,
// will do that later
//
static Set<Class<?>> getClasses(ClassLoader loader, String packageName) {
Set<Class<?>> classes = new HashSet<Class<?>>();
String path = packageName.replace('.', '/');
try {
Enumeration<URL> resources = loader.getResources(path);
if (resources != null) {
while (resources.hasMoreElements()) {
String filePath = resources.nextElement().getFile();
// WINDOWS HACK
if(filePath.indexOf("%20") > 0)
filePath = filePath.replaceAll("%20", " ");
if (filePath != null) {
if ((filePath.indexOf("!") > 0) && (filePath.indexOf(".jar") > 0)) {
String jarPath = filePath.substring(0, filePath.indexOf("!"))
.substring(filePath.indexOf(":") + 1);
// WINDOWS HACK
if (jarPath.indexOf(":") >= 0) jarPath = jarPath.substring(1);
classes.addAll(getFromJARFile(jarPath, path));
} else {
classes.addAll(getFromDirectory(new File(filePath), packageName));
}
}
}
}
} catch(IOException e) {
} catch(ClassNotFoundException e) {
}
return classes;
}
static Set<Class<?>> getFromDirectory(File directory, String packageName) throws ClassNotFoundException {
Set<Class<?>> classes = new HashSet<Class<?>>();
if (directory.exists()) {
for (String file : directory.list()) {
if (file.endsWith(".class")) {
String name = packageName + '.' + stripFilenameExtension(file);
try {
Class<?> clazz = Class.forName(name);
classes.add(clazz);
} catch(ClassNotFoundException e) {
} catch(Exception e) {
}
} else {
File f = new File(directory.getPath() + "/" + file);
if(f.isDirectory()) {
classes.addAll(getFromDirectory(f, packageName + "." + file));
}
}
}
}
return classes;
}
static Set<Class<?>> getFromJARFile(String jar, String packageName) throws IOException, ClassNotFoundException {
Set<Class<?>> classes = new HashSet<Class<?>>();
JarInputStream jarFile = new JarInputStream(new FileInputStream(jar));
JarEntry jarEntry;
do {
jarEntry = jarFile.getNextJarEntry();
if (jarEntry != null) {
String className = jarEntry.getName();
if (className.endsWith(".class")) {
className = stripFilenameExtension(className);
if (className.startsWith(packageName)) {
try {
Class<?> clz = Class.forName(className.replace('/', '.'));
classes.add(clz);
} catch(ClassNotFoundException e) {
} catch(NoClassDefFoundError e) {
}
}
}
}
} while (jarEntry != null);
return classes;
}
static String stripFilenameExtension(String file) {
return file.substring(0, file.lastIndexOf('.'));
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface OnwireName {
String name();
}

View File

@ -1,19 +1,21 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;

View File

@ -1,27 +1,27 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
import java.util.concurrent.TimeUnit;
public interface RpcClientCall {
String getCommand();
RpcClientCall setCommand(String cmd);
RpcClientCall setTimeout(TimeUnit timeout);
RpcClientCall setTimeout(int timeoutMilliseconds);
RpcClientCall setCommandArg(Object arg);
Object getCommandArg();
@ -30,6 +30,7 @@ public interface RpcClientCall {
Object getContextParam(String key);
<T> RpcClientCall addCallbackListener(RpcCallbackListener<T> listener);
RpcClientCall setOneway();
void apply();
void cancel();

View File

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
import java.util.HashMap;
import java.util.Map;
public class RpcClientCallImpl implements RpcClientCall {
private String _command;
private Object _commandArg;
private int _timeoutMilliseconds;
private Map<String, Object> _contextParams = new HashMap<String, Object>();
public RpcClientCallImpl() {
}
@Override
public String getCommand() {
// TODO Auto-generated method stub
return null;
}
@Override
public RpcClientCall setCommand(String cmd) {
_command = cmd;
return this;
}
@Override
public RpcClientCall setTimeout(int timeoutMilliseconds) {
_timeoutMilliseconds = timeoutMilliseconds;
return this;
}
@Override
public RpcClientCall setCommandArg(Object arg) {
_commandArg = arg;
return this;
}
@Override
public Object getCommandArg() {
return _commandArg;
}
@Override
public RpcClientCall setContextParam(String key, Object param) {
assert(key != null);
_contextParams.put(key, param);
return this;
}
@Override
public Object getContextParam(String key) {
// TODO Auto-generated method stub
return null;
}
@Override
public <T> RpcClientCall addCallbackListener(RpcCallbackListener<T> listener) {
// TODO Auto-generated method stub
return null;
}
@Override
public RpcClientCall setOneway() {
// TODO Auto-generated method stub
return null;
}
@Override
public void apply() {
// TODO Auto-generated method stub
}
@Override
public void cancel() {
// TODO Auto-generated method stub
}
@Override
public <T> T get() {
// TODO Auto-generated method stub
return null;
}
}

View File

@ -1,19 +1,21 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
public class RpcException extends RuntimeException {

View File

@ -1,3 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
public class RpcIOException extends RpcException {

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
import java.util.ArrayList;
import java.util.List;
public class RpcProviderImpl implements RpcProvider {
private MessageSerializer _messageSerializer;
private List<RpcServiceEndpoint> _serviceEndpoints = new ArrayList<RpcServiceEndpoint>();
private TransportProvider _transportProvider;
public RpcProviderImpl() {
}
public TransportProvider getTransportProvider() {
return _transportProvider;
}
public void setTransportProvider(TransportProvider transportProvider) {
_transportProvider = transportProvider;
}
@Override
public void onTransportMessage(String senderEndpointAddress,
String targetEndpointAddress, String multiplexer, String message) {
// TODO Auto-generated method stub
}
@Override
public void setMessageSerializer(MessageSerializer messageSerializer) {
_messageSerializer = messageSerializer;
}
@Override
public MessageSerializer getMessageSerializer() {
return _messageSerializer;
}
@Override
public void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint) {
synchronized(_serviceEndpoints) {
_serviceEndpoints.add(rpcEndpoint);
}
}
@Override
public void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint) {
synchronized(_serviceEndpoints) {
_serviceEndpoints.remove(rpcEndpoint);
}
}
@Override
public RpcClientCall target(String target) {
// TODO Auto-generated method stub
return null;
}
}

View File

@ -1,3 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
public class RpcTimeoutException extends RpcException {