View Javadoc

1   /**
2   * Copyright (C) 2007 EDIT
3   * European Distributed Institute of Taxonomy 
4   * http://www.e-taxonomy.eu
5   * 
6   * The contents of this file are subject to the Mozilla Public License Version 1.1
7   * See LICENSE.TXT at the top of this package for the full license terms.
8   */
9   
10  package eu.etaxonomy.cdm.io.berlinModel.in;
11  
12  import java.sql.ResultSet;
13  import java.sql.SQLException;
14  import java.util.ArrayList;
15  import java.util.HashMap;
16  import java.util.HashSet;
17  import java.util.List;
18  import java.util.Map;
19  import java.util.Set;
20  import java.util.SortedSet;
21  import java.util.TreeSet;
22  import java.util.UUID;
23  
24  import org.apache.commons.lang.StringUtils;
25  import org.apache.log4j.Logger;
26  import org.springframework.stereotype.Component;
27  
28  import eu.etaxonomy.cdm.common.CdmUtils;
29  import eu.etaxonomy.cdm.io.berlinModel.in.validation.BerlinModelCommonNamesImportValidator;
30  import eu.etaxonomy.cdm.io.common.IOValidator;
31  import eu.etaxonomy.cdm.io.common.ResultSetPartitioner;
32  import eu.etaxonomy.cdm.io.common.Source;
33  import eu.etaxonomy.cdm.model.common.Annotation;
34  import eu.etaxonomy.cdm.model.common.AnnotationType;
35  import eu.etaxonomy.cdm.model.common.CdmBase;
36  import eu.etaxonomy.cdm.model.common.DescriptionElementSource;
37  import eu.etaxonomy.cdm.model.common.Extension;
38  import eu.etaxonomy.cdm.model.common.ExtensionType;
39  import eu.etaxonomy.cdm.model.common.Language;
40  import eu.etaxonomy.cdm.model.common.Representation;
41  import eu.etaxonomy.cdm.model.description.CommonTaxonName;
42  import eu.etaxonomy.cdm.model.description.TaxonDescription;
43  import eu.etaxonomy.cdm.model.location.NamedArea;
44  import eu.etaxonomy.cdm.model.location.TdwgArea;
45  import eu.etaxonomy.cdm.model.name.TaxonNameBase;
46  import eu.etaxonomy.cdm.model.reference.ReferenceBase;
47  import eu.etaxonomy.cdm.model.taxon.Taxon;
48  import eu.etaxonomy.cdm.model.taxon.TaxonBase;
49  
50  /**
51   * FIXME TO BE IMPLEMENTED (Common names)
52   * 
53   * @author a.mueller
54   * @created 20.03.2008
55   * @version 1.0
56   */
57  @Component
58  public class BerlinModelCommonNamesImport  extends BerlinModelImportBase {
59  	private static final Logger logger = Logger.getLogger(BerlinModelCommonNamesImport.class);
60  
61  	public static final UUID REFERENCE_LANGUAGE_ISO639_2_UUID = UUID.fromString("40c4f8dd-3d9c-44a4-b77a-76e137a89a5f");
62  	public static final UUID REFERENCE_LANGUAGE_STRING_UUID = UUID.fromString("2a1b678f-c27d-48c1-b43e-98fd0d426305");
63  	public static final UUID STATUS_ANNOTATION_UUID = UUID.fromString("e3f7b80a-1286-458d-812c-5e818f731968");
64  	
65  	public static final String NAMESPACE = "common name";
66  	
67  	
68  	private static int modCount = 10000;
69  	private static final String pluralString = "common names";
70  	private static final String dbTableName = "emCommonName";
71  
72  
73  	//map that stores the regions (named areas) and makes them accessible via the regionFk
74  	private Map<String, NamedArea> regionMap = new HashMap<String, NamedArea>();
75  
76  	
77  
78  	public BerlinModelCommonNamesImport(){
79  		super();
80  	}
81  	
82  	
83  
84  	@Override
85  	protected String getIdQuery() {
86  		String result = " SELECT CommonNameId FROM emCommonName ";
87  		return result;
88  	}
89  
90  
91  
92  	/* (non-Javadoc)
93  	 * @see eu.etaxonomy.cdm.io.berlinModel.in.BerlinModelImportBase#getRecordQuery(eu.etaxonomy.cdm.io.berlinModel.in.BerlinModelImportConfigurator)
94  	 */
95  	@Override
96  	protected String getRecordQuery(BerlinModelImportConfigurator config) {
97  		String recordQuery = "";
98  		recordQuery = 
99  			" SELECT emCommonName.CommonNameId, emCommonName.CommonName, PTaxon.RIdentifier AS taxonId, emCommonName.PTNameFk, emCommonName.RefFk AS refId, emCommonName.Status, " + 
100 				" emCommonName.RegionFks, emCommonName.MisNameRefFk, emCommonName.NameInSourceFk , emCommonName.Created_When, emCommonName.Updated_When, emCommonName.Created_Who, emCommonName.Updated_Who, emCommonName.Note as Notes," + 
101         		" regionLanguage.Language AS regionLanguage, languageCommonName.Language, languageCommonName.LanguageOriginal, languageCommonName.ISO639_1, languageCommonName.ISO639_2, " + 
102         		" emLanguageRegion.Region, emLanguageReference.RefFk as languageRefRefFk, emLanguageReference.ReferenceShort, " + 
103         		" emLanguageReference.ReferenceLong, emLanguageReference.LanguageFk, languageReferenceLanguage.Language AS refLanguage, " +
104         		" languageReferenceLanguage.ISO639_2 AS refLanguageIso639_2, regionLanguage.ISO639_2 AS regionLanguageIso, " +
105         		" misappliedTaxon.RIdentifier AS misappliedTaxonId " + 
106         	" FROM emLanguage as regionLanguage RIGHT OUTER JOIN " + 
107         		" emLanguageRegion ON regionLanguage.LanguageId = emLanguageRegion.LanguageFk RIGHT OUTER JOIN " +
108         		" emLanguage AS languageReferenceLanguage RIGHT OUTER JOIN " +
109         		" emLanguageReference ON languageReferenceLanguage.LanguageId = emLanguageReference.LanguageFk RIGHT OUTER JOIN " +
110         		" emCommonName INNER JOIN " +
111         		" PTaxon ON emCommonName.PTNameFk = PTaxon.PTNameFk AND emCommonName.PTRefFk = PTaxon.PTRefFk ON " + 
112         		" emLanguageReference.ReferenceId = emCommonName.LanguageRefFk LEFT OUTER JOIN " +
113         		" emLanguage AS languageCommonName ON emCommonName.LanguageFk = languageCommonName.LanguageId ON " + 
114         		" emLanguageRegion.RegionId = emCommonName.RegionFks LEFT OUTER JOIN " +
115         		" PTaxon as misappliedTaxon ON emCommonName.PTNameFk = misappliedTaxon.PTNameFk AND emCommonName.MisNameRefFk = misappliedTaxon.PTRefFk " + 
116 			" WHERE emCommonName.CommonNameId IN (" + ID_LIST_TOKEN + ")";
117 		return recordQuery;
118 	}
119 	
120 	
121 
122 	@Override
123 	protected boolean doInvoke(BerlinModelImportState state) {
124 		boolean result = true;
125 		try {
126 			result &= makeRegions(state);
127 		} catch (Exception e) {
128 			logger.error("Error when creating common name regions:" + e.getMessage());
129 			result = false;
130 		}
131 		result &= super.doInvoke(state);
132 		return result;
133 	}
134 	
135 	/**
136 	 * @param state 
137 	 * 
138 	 */
139 	private boolean makeRegions(BerlinModelImportState state) {
140 		boolean result = true;
141 		try {
142 			SortedSet<Integer> regionFks = new TreeSet<Integer>();
143 			Source source = state.getConfig().getSource();
144 			
145 			result = getRegionFks(result, regionFks, source);
146 			//concat filter string
147 			String sqlWhere = getSqlWhere(regionFks);
148 			
149 			//get E+M - TDWG Mapping
150 			Map<String, String> emTdwgMap = getEmTdwgMap(source);
151 			//fill regionMap
152 			fillRegionMap(source, sqlWhere, emTdwgMap);
153 			
154 			return result;
155 		} catch (NumberFormatException e) {
156 			e.printStackTrace();
157 			return false;
158 		} catch (SQLException e) {
159 			e.printStackTrace();
160 			return false;
161 		}
162 	}
163 
164 
165 	/* (non-Javadoc)
166 	 * @see eu.etaxonomy.cdm.io.berlinModel.in.IPartitionedIO#doPartition(eu.etaxonomy.cdm.io.berlinModel.in.ResultSetPartitioner, eu.etaxonomy.cdm.io.berlinModel.in.BerlinModelImportState)
167 	 */
168 	public boolean doPartition(ResultSetPartitioner partitioner, BerlinModelImportState state)  {
169 		boolean success = true ;
170 		BerlinModelImportConfigurator config = state.getConfig();
171 		Set<TaxonBase> taxaToSave = new HashSet<TaxonBase>();
172 		Map<String, Taxon> taxonMap = (Map<String, Taxon>) partitioner.getObjectMap(BerlinModelTaxonImport.NAMESPACE);
173 		Map<String, TaxonNameBase> taxonNameMap = (Map<String, TaxonNameBase>) partitioner.getObjectMap(BerlinModelTaxonNameImport.NAMESPACE);
174 		
175 		Map<String, ReferenceBase> biblioRefMap = (Map<String, ReferenceBase>) partitioner.getObjectMap(BerlinModelReferenceImport.BIBLIO_REFERENCE_NAMESPACE);
176 		Map<String, ReferenceBase> nomRefMap = (Map<String, ReferenceBase>) partitioner.getObjectMap(BerlinModelReferenceImport.NOM_REFERENCE_NAMESPACE);
177 		
178 		Map<String, Language> iso6392Map = new HashMap<String, Language>();
179 		
180 	//	logger.warn("MisappliedNameRefFk  not yet implemented for Common Names");
181 		
182 		ResultSet rs = partitioner.getResultSet();
183 		try{
184 			while (rs.next()){
185 
186 				//create TaxonName element
187 				Object commonNameId = rs.getObject("CommonNameId");
188 				int taxonId = rs.getInt("taxonId");
189 				Object refId = rs.getObject("refId");
190 				Object ptNameFk = rs.getObject("PTNameFk");
191 				String commonNameString = rs.getString("CommonName");
192 				String iso639_2 = rs.getString("ISO639_2");
193 				String iso639_1 = rs.getString("ISO639_1");
194 				String languageString = rs.getString("Language");
195 				String originalLanguageString = rs.getString("LanguageOriginal");
196 				Object misNameRefFk = rs.getObject("MisNameRefFk");
197 				Object languageRefRefFk = rs.getObject("languageRefRefFk");
198 				String refLanguage = rs.getString("refLanguage");
199 				String refLanguageIso639_2 = rs.getString("refLanguageIso639_2");
200 				String status = rs.getString("Status");
201 				Object nameInSourceFk = rs.getObject("NameInSourceFk");
202 				Object misappliedTaxonId = rs.getObject("misappliedTaxonId");
203 				
204 				//regions
205 				String region = rs.getString("Region");
206 				String regionFks  = rs.getString("RegionFks");
207 				String[] regionFkSplit = regionFks.split(",");
208 				
209 				//commonNameString
210 				if (CdmUtils.isEmpty(commonNameString)){
211 					String message = "CommonName is empty or null. Do not import record for taxon " + taxonId;
212 					logger.warn(message);
213 					continue;
214 				}
215 				
216 				//taxon
217 				Taxon taxon = null;
218 				TaxonBase taxonBase = null;
219 				taxonBase  = taxonMap.get(String.valueOf(taxonId));
220 				if (taxonBase == null){
221 					logger.warn("Taxon (" + taxonId + ") could not be found. Common name " + commonNameString + " not imported");
222 					continue;
223 				}else if (! taxonBase.isInstanceOf(Taxon.class)){
224 					logger.warn("taxon (" + taxonId + ") is not accepted. Can't import common name " +  commonNameId);
225 					continue;
226 				}else{
227 					taxon = CdmBase.deproxy(taxonBase, Taxon.class);
228 				}
229 				
230 				//Language
231 				Language language = getAndHandleLanguage(iso6392Map, iso639_2, iso639_1, languageString, originalLanguageString);
232 				
233 				//CommonTaxonName
234 				List<CommonTaxonName> commonTaxonNames = new ArrayList<CommonTaxonName>();
235 				for (String regionFk : regionFkSplit){ 
236 					CommonTaxonName commonTaxonName;
237 					if (commonTaxonNames.size() == 0){
238 						commonTaxonName = CommonTaxonName.NewInstance(commonNameString, language);
239 					}else{
240 						commonTaxonName = (CommonTaxonName)commonTaxonNames.get(0).clone();
241 					}
242 					commonTaxonNames.add(commonTaxonName);
243 					regionFk = regionFk.trim();
244 					NamedArea area = regionMap.get(regionFk);
245 					if (area == null){
246 						logger.warn("Area for " + regionFk + " not defined.");
247 					}else{
248 						commonTaxonName.setArea(area);
249 						TaxonDescription description = getDescription(taxon);
250 						description.addElement(commonTaxonName);
251 					}
252 				}
253 					
254 				//Reference/Source
255 				String strRefId = String.valueOf(refId);
256 				String languageRefFk = String.valueOf(languageRefRefFk);
257 				if (! CdmUtils.nullSafeEqual(strRefId, languageRefFk)){
258 					logger.warn("CommonName.RefFk (" + CdmUtils.Nz(strRefId) + ") and LanguageReference.RefFk " + CdmUtils.Nz(languageRefFk) + " are not equal. I will import only languageRefFk");
259 				}
260 						
261 				ReferenceBase reference = getReferenceOnlyFromMaps(biblioRefMap, nomRefMap, String.valueOf(languageRefRefFk));
262 				String microCitation = null;
263 				String originalNameString = null;
264 				
265 				TaxonNameBase nameUsedInSource = taxonNameMap.get(String.valueOf(nameInSourceFk));
266 				if (nameInSourceFk != null && nameUsedInSource == null){
267 					logger.warn("Name used in source (" + nameInSourceFk + ") was not found");
268 				}
269 				DescriptionElementSource source = DescriptionElementSource.NewInstance(reference, microCitation, nameUsedInSource, originalNameString);
270 				for (CommonTaxonName commonTaxonName : commonTaxonNames){
271 					commonTaxonName.addSource(source);
272 				}
273 				
274 				//MisNameRef
275 				if (misNameRefFk != null){
276 					//Taxon misappliedName = getMisappliedName(biblioRefMap, nomRefMap, misNameRefFk, taxon);
277 					Taxon misappliedName = null;
278 					if (misappliedTaxonId != null){
279 						misappliedName = taxonMap.get(String.valueOf(misappliedTaxonId));
280 					}else{
281 						TaxonNameBase taxonName = taxonNameMap.get(String.valueOf(ptNameFk));
282 						ReferenceBase sec = getReferenceOnlyFromMaps(biblioRefMap, nomRefMap, String.valueOf(misNameRefFk));
283 						if (taxonName == null || sec == null){
284 							logger.info("Taxon name or misapplied name reference is null for common name " + commonNameId);
285 						}else{
286 							misappliedName = Taxon.NewInstance(taxonName, sec);
287 							taxaToSave.add(misappliedName);
288 						}
289 					}
290 					if (misappliedName != null){
291 						taxon.addMisappliedName(misappliedName, config.getSourceReference(), null);
292 						TaxonDescription misappliedNameDescription = getDescription(misappliedName);
293 						try {
294 							for (CommonTaxonName commonTaxonName : commonTaxonNames){
295 								CommonTaxonName commonNameClone = (CommonTaxonName)commonTaxonName.clone();
296 								misappliedNameDescription.addElement(commonNameClone);
297 							}
298 						} catch (CloneNotSupportedException e) {
299 							e.printStackTrace();
300 						}	
301 					}else{
302 						logger.warn("Misapplied name is null for common name " + commonNameId);
303 					}
304 					
305 				}
306 				
307 				
308 				//reference extensions
309 				if (reference != null){
310 					if (CdmUtils.isNotEmpty(refLanguage)){
311 						ExtensionType refLanguageExtensionType = getExtensionType( state, REFERENCE_LANGUAGE_STRING_UUID, "reference language","The language of the reference","ref. lang.");
312 						Extension.NewInstance(reference, refLanguage, refLanguageExtensionType);
313 					}
314 					
315 					if (CdmUtils.isNotEmpty(refLanguageIso639_2)){
316 						ExtensionType refLanguageIsoExtensionType = getExtensionType( state, REFERENCE_LANGUAGE_ISO639_2_UUID, "reference language iso 639-2","The iso 639-2 code of the references language","ref. lang. 639-2");
317 						Extension.NewInstance(reference, refLanguageIso639_2, refLanguageIsoExtensionType);
318 					}
319 				}else if (CdmUtils.isNotEmpty(refLanguage) || CdmUtils.isNotEmpty(refLanguageIso639_2)){
320 					logger.warn("Reference is null (" + languageRefRefFk + ") but refLanguage (" + CdmUtils.Nz(refLanguage) + ") or iso639_2 (" + CdmUtils.Nz(refLanguageIso639_2) + ") was not null");
321 				}
322 				
323 				//status
324 				if (CdmUtils.isNotEmpty(status)){
325 					AnnotationType statusAnnotationType = getAnnotationType( state, STATUS_ANNOTATION_UUID, "status","The status of this object","status");
326 					Annotation annotation = Annotation.NewInstance(status, statusAnnotationType, Language.DEFAULT());
327 					for (CommonTaxonName commonTaxonName : commonTaxonNames){
328 						commonTaxonName.addAnnotation(annotation);
329 					}
330 					
331 				}
332 				
333 				//Notes
334 				for (CommonTaxonName commonTaxonName : commonTaxonNames){
335 					doIdCreatedUpdatedNotes(state, commonTaxonName, rs, String.valueOf(commonNameId), NAMESPACE);
336 				}
337 				partitioner.startDoSave();
338 				taxaToSave.add(taxon);
339 
340 			}
341 		} catch (SQLException e) {
342 			logger.error("SQLException:" +  e);
343 			return false;
344 		} catch (ClassCastException e) {
345 			e.printStackTrace();
346 		} catch (CloneNotSupportedException e) {
347 			logger.error("Clone not supported by CommonTaxonName");
348 			e.printStackTrace();
349 			return false;
350 		}
351 	
352 			
353 		//	logger.info( i + " names handled");
354 		getTaxonService().save(taxaToSave);
355 		return success;
356 
357 	}
358 
359 
360 
361 	/**
362 	 * Not used anymore. Use MisappliedName RIdentifier instead
363 	 * @param biblioRefMap
364 	 * @param nomRefMap
365 	 * @param misNameRefFk
366 	 * @param taxon
367 	 */
368 	private boolean isFirstMisappliedName = true;
369 	private Taxon getMisappliedName(Map<String, ReferenceBase> biblioRefMap, Map<String, ReferenceBase> nomRefMap, 
370 			Object misNameRefFk, Taxon taxon) {
371 		Taxon misappliedTaxon = null;
372 		ReferenceBase misNameRef = getReferenceOnlyFromMaps(biblioRefMap, nomRefMap, String.valueOf(misNameRefFk));
373 		misappliedTaxon = Taxon.NewInstance(taxon.getName(), misNameRef);
374 		Set<String> includeProperty = new HashSet<String>();
375 		try {
376 //			//IMatchStrategy matchStrategy = DefaultMatchStrategy.NewInstance(TaxonBase.class);
377 //			//List<TaxonBase> misappliedList1 = getCommonService().findMatching(misappliedTaxon, matchStrategy);
378 			List<TaxonBase> misappliedList = getTaxonService().list(misappliedTaxon, includeProperty, null, null, null, null);
379 			if (misappliedList.size() > 0){
380 				misappliedTaxon = CdmBase.deproxy(misappliedList.get(0), Taxon.class);
381 			}
382 		} catch (ClassCastException e) {
383 			logger.error(e.getMessage());
384 			if (isFirstMisappliedName){
385 				e.printStackTrace();
386 				isFirstMisappliedName = false;
387 			}
388 		}
389 		return misappliedTaxon;
390 	}
391 
392 
393 
394 	/**
395 	 * @param iso6392Map
396 	 * @param iso639_2
397 	 * @param languageString
398 	 * @param originalLanguageString
399 	 * @return
400 	 */
401 	private Language getAndHandleLanguage(Map<String, Language> iso639Map,	String iso639_2, String iso639_1, String languageString, String originalLanguageString) {
402 		Language language;
403 		if (CdmUtils.isNotEmpty(iso639_2)|| CdmUtils.isNotEmpty(iso639_1)  ){
404 			//TODO test performance, implement in state
405 			language = getLanguageFromIsoMap(iso639Map, iso639_2, iso639_1);
406 			
407 			if (language == null){
408 				language = getTermService().getLanguageByIso(iso639_2);
409 				iso639Map.put(iso639_2, language);
410 				if (language == null){
411 					language = getTermService().getLanguageByIso(iso639_1);
412 					iso639Map.put(iso639_1, language);
413 				}
414 				if (language == null){
415 					logger.warn("Language for code ISO693-2 '" + iso639_2 + "' and ISO693-1 '" + iso639_1 + "' was not found");
416 				}
417 			}
418 		}else{
419 			logger.warn("language ISO 639_1 and ISO 639_2 were empty for " + languageString);
420 			language = null;
421 		}
422 		addOriginalLanguage(language, originalLanguageString);
423 		return language;
424 	}
425 
426 
427 	/**
428 	 * @param iso639Map
429 	 * @param iso639_2
430 	 * @param iso639_1
431 	 * @return
432 	 */
433 	private Language getLanguageFromIsoMap(Map<String, Language> iso639Map,
434 			String iso639_2, String iso639_1) {
435 		Language language;
436 		language = iso639Map.get(iso639_2);
437 		if (language == null){
438 			language = iso639Map.get(iso639_1);
439 		}
440 		return language;
441 	}
442 
443 	/**
444 	 * @param language
445 	 * @param originalLanguageString
446 	 */
447 	private void addOriginalLanguage(Language language,	String originalLanguageString) {
448 		if (CdmUtils.isEmpty(originalLanguageString)){
449 			return;
450 		}else if (language == null){
451 			logger.warn("Language could not be defined, but originalLanguageString exists: " + originalLanguageString);
452 		}else {
453 			Representation representation = language.getRepresentation(language);
454 			if (representation == null){
455 				language.addRepresentation(Representation.NewInstance(originalLanguageString, originalLanguageString, originalLanguageString, language));
456 				getTermService().saveOrUpdate(language);
457 			}
458 		}
459 		
460 	}
461 	
462 
463 
464 	/**
465 	 * @param result
466 	 * @param regionFks
467 	 * @param source
468 	 * @return
469 	 * @throws SQLException
470 	 */
471 	private boolean getRegionFks(boolean result, SortedSet<Integer> regionFks,
472 			Source source) throws SQLException {
473 		String sql = " SELECT DISTINCT RegionFks FROM emCommonName";
474 		ResultSet rs = source.getResultSet(sql);
475 		while (rs.next()){
476 			String strRegionFks = rs.getString("RegionFks"); 
477 			String[] regionFkArray = strRegionFks.split(",");
478 			for (String regionFk: regionFkArray){
479 				regionFk = regionFk.trim();
480 				if (! StringUtils.isNumeric(regionFk)){
481 					result = false;
482 					logger.warn("RegionFk is not numeric: " + regionFk);
483 				}else{
484 					regionFks.add(Integer.valueOf(regionFk));
485 				}
486 			}
487 		}
488 		return result;
489 	}
490 
491 
492 
493 	/**
494 	 * @param source
495 	 * @param sqlWhere
496 	 * @param emTdwgMap
497 	 * @throws SQLException
498 	 */
499 	private void fillRegionMap(Source source, String sqlWhere,
500 			Map<String, String> emTdwgMap) throws SQLException {
501 		String sql;
502 		ResultSet rs;
503 		sql = " SELECT RegionId, Region FROM emLanguageRegion WHERE RegionId IN ("+ sqlWhere+ ") ";
504 		rs = source.getResultSet(sql);
505 		while (rs.next()){
506 			Object regionId = rs.getObject("RegionId");
507 			String region = rs.getString("Region");
508 			String[] splitRegion = region.split("-");
509 			if (splitRegion.length <= 1){
510 				NamedArea newArea = NamedArea.NewInstance(region, region, null);
511 				getTermService().save(newArea);
512 				regionMap.put(String.valueOf(regionId), newArea);
513 				logger.warn("Found new area: " +  region);
514 			}else if (splitRegion.length == 2){
515 				String emCode = splitRegion[1].trim();
516 				String tdwgCode = emTdwgMap.get(emCode);
517 				if (StringUtils.isNotBlank(tdwgCode) ){
518 					NamedArea tdwgArea = getNamedArea(tdwgCode);
519 					regionMap.put(String.valueOf(regionId), tdwgArea);
520 				}else{
521 					logger.warn("emCode did not map to valid tdwgCode: " +  CdmUtils.Nz(emCode) + "->" + CdmUtils.Nz(tdwgCode));
522 				}
523 			}
524 		}
525 	}
526 
527 
528 	/**
529 	 * @param tdwgCode
530 	 */
531 	private NamedArea getNamedArea(String tdwgCode) {
532 		NamedArea area;
533 		if (tdwgCode.equalsIgnoreCase("Ab")){
534 			area = NamedArea.NewInstance("Azerbaijan (including Nakhichevan)", "Azerbaijan & Nakhichevan", "Ab");
535 			getTermService().save(area);
536 		}else if (tdwgCode.equalsIgnoreCase("Rf")){
537 			area = NamedArea.NewInstance("The Russian Federation", "The Russian Federation", "Rf");
538 			getTermService().save(area);
539 		}else if (tdwgCode.equalsIgnoreCase("Uk")){
540 			area = NamedArea.NewInstance("Ukraine (including Crimea)", "Ukraine & Crimea", "Uk");
541 			getTermService().save(area);
542 		}else{
543 			area = TdwgArea.getAreaByTdwgAbbreviation(tdwgCode);
544 		}
545 		return area;
546 	}
547 
548 
549 
550 	/**
551 	 * @param regionFks
552 	 * @return
553 	 */
554 	private String getSqlWhere(SortedSet<Integer> regionFks) {
555 		String sqlWhere = "";
556 		for (Integer regionFk : regionFks){
557 			sqlWhere += regionFk + ","; 
558 		}
559 		sqlWhere = sqlWhere.substring(0, sqlWhere.length()-1);
560 		return sqlWhere;
561 	}
562 
563 
564 
565 	/**
566 	 * @param source
567 	 * @throws SQLException
568 	 */
569 	private Map<String, String> getEmTdwgMap(Source source) throws SQLException {
570 		String sql;
571 		ResultSet rs;
572 		Map<String, String> emTdwgMap = new HashMap<String, String>();
573 		sql = " SELECT EmCode, TDWGCode FROM emArea ";
574 		rs = source.getResultSet(sql);
575 		while (rs.next()){
576 			String emCode = rs.getString("EMCode");
577 			String TDWGCode = rs.getString("TDWGCode");
578 			if (StringUtils.isNotBlank(emCode) ){
579 				emCode = emCode.trim();
580 				if (emCode.equalsIgnoreCase("Ab") || emCode.equalsIgnoreCase("Rf")|| emCode.equalsIgnoreCase("Uk") ){
581 					emTdwgMap.put(emCode, emCode);
582 				}else if (StringUtils.isNotBlank(TDWGCode)){
583 					emTdwgMap.put(emCode, TDWGCode.trim());
584 				}
585 			}
586 		}
587 		return emTdwgMap;
588 	}
589 
590 
591 
592 
593 	/**
594 	 * Returns the first non-image gallery description. Creates a new one if no description exists.
595 	 * @param taxon
596 	 * @return
597 	 */
598 	private TaxonDescription getDescription(Taxon taxon) {
599 		TaxonDescription result = null;
600 		for (TaxonDescription taxonDescription : taxon.getDescriptions()){
601 			if (! taxonDescription.isImageGallery()){
602 				result = taxonDescription;
603 			}
604 		}
605 		if (result == null){
606 			result = TaxonDescription.NewInstance(taxon);
607 		}
608 		return result;
609 	}
610 
611 	/* (non-Javadoc)
612 	 * @see eu.etaxonomy.cdm.io.berlinModel.in.IPartitionedIO#getRelatedObjectsForPartition(java.sql.ResultSet)
613 	 */
614 	public Map<Object, Map<String, ? extends CdmBase>> getRelatedObjectsForPartition(ResultSet rs) {
615 		String nameSpace;
616 		Class cdmClass;
617 		Set<String> idSet;
618 		Map<Object, Map<String, ? extends CdmBase>> result = new HashMap<Object, Map<String, ? extends CdmBase>>();
619 		
620 		try{
621 			Set<String> taxonIdSet = new HashSet<String>();
622 			Set<String> nameIdSet = new HashSet<String>();
623 			Set<String> referenceIdSet = new HashSet<String>();
624 			while (rs.next()){
625 				handleForeignKey(rs, taxonIdSet, "taxonId");
626 				handleForeignKey(rs, taxonIdSet, "misappliedTaxonId");
627 				handleForeignKey(rs, referenceIdSet, "refId");
628 				handleForeignKey(rs, referenceIdSet, "languageRefRefFk");
629 				handleForeignKey(rs, nameIdSet, "NameInSourceFk");
630 				handleForeignKey(rs, nameIdSet, "PTNameFk");
631 				handleForeignKey(rs, referenceIdSet, "MisNameRefFk");
632 			}
633 			
634 			//name map
635 			nameSpace = BerlinModelTaxonNameImport.NAMESPACE;
636 			cdmClass = TaxonNameBase.class;
637 			idSet = nameIdSet;
638 			Map<String, TaxonNameBase> nameMap = (Map<String, TaxonNameBase>)getCommonService().getSourcedObjectsByIdInSource(cdmClass, idSet, nameSpace);
639 			result.put(nameSpace, nameMap);
640 
641 			//name map
642 			nameSpace = BerlinModelTaxonImport.NAMESPACE;
643 			cdmClass = Taxon.class;
644 			idSet = taxonIdSet;
645 			Map<String, TaxonNameBase> taxonMap = (Map<String, TaxonNameBase>)getCommonService().getSourcedObjectsByIdInSource(cdmClass, idSet, nameSpace);
646 			result.put(nameSpace, taxonMap);
647 
648 			//nom reference map
649 			nameSpace = BerlinModelReferenceImport.NOM_REFERENCE_NAMESPACE;
650 			cdmClass = ReferenceBase.class;
651 			idSet = referenceIdSet;
652 			Map<String, ReferenceBase> nomReferenceMap = (Map<String, ReferenceBase>)getCommonService().getSourcedObjectsByIdInSource(cdmClass, idSet, nameSpace);
653 			result.put(nameSpace, nomReferenceMap);
654 
655 			//biblio reference map
656 			nameSpace = BerlinModelReferenceImport.BIBLIO_REFERENCE_NAMESPACE;
657 			cdmClass = ReferenceBase.class;
658 			idSet = referenceIdSet;
659 			Map<String, ReferenceBase> biblioReferenceMap = (Map<String, ReferenceBase>)getCommonService().getSourcedObjectsByIdInSource(cdmClass, idSet, nameSpace);
660 			result.put(nameSpace, biblioReferenceMap);
661 
662 		} catch (SQLException e) {
663 			throw new RuntimeException(e);
664 		} catch (NullPointerException nep){
665 			logger.error("NullPointerException in getRelatedObjectsForPartition()");
666 		}
667 		return result;
668 	}
669 		
670 
671 
672 	/* (non-Javadoc)
673 	 * @see eu.etaxonomy.cdm.io.common.CdmIoBase#doCheck(eu.etaxonomy.cdm.io.common.IoStateBase)
674 	 */
675 	@Override
676 	protected boolean doCheck(BerlinModelImportState state){
677 		IOValidator<BerlinModelImportState> validator = new BerlinModelCommonNamesImportValidator();
678 		return validator.validate(state);
679 	}
680 						
681 						
682 	/* (non-Javadoc)
683 	 * @see eu.etaxonomy.cdm.io.berlinModel.in.BerlinModelImportBase#getTableName()
684 	 */
685 	@Override
686 	protected String getTableName() {
687 		return dbTableName;
688 	}
689 							
690 	/* (non-Javadoc)
691 	 * @see eu.etaxonomy.cdm.io.berlinModel.in.BerlinModelImportBase#getPluralString()
692 	 */
693 	@Override
694 	public String getPluralString() {
695 		return pluralString;
696 	}
697 
698 	/* (non-Javadoc)
699 	 * @see eu.etaxonomy.cdm.io.common.CdmIoBase#isIgnore(eu.etaxonomy.cdm.io.common.IImportConfigurator)
700 	 */
701 	protected boolean isIgnore(BerlinModelImportState state){
702 		return ! state.getConfig().isDoCommonNames();
703 	}
704 
705 }