1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.io.IOException;
23 import java.net.PortUnreachableException;
24 import java.nio.channels.ClosedSelectorException;
25 import java.util.ArrayList;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.mina.core.buffer.IoBuffer;
37 import org.apache.mina.core.file.FileRegion;
38 import org.apache.mina.core.filterchain.IoFilterChain;
39 import org.apache.mina.core.filterchain.IoFilterChainBuilder;
40 import org.apache.mina.core.future.DefaultIoFuture;
41 import org.apache.mina.core.service.AbstractIoService;
42 import org.apache.mina.core.service.IoProcessor;
43 import org.apache.mina.core.service.IoServiceListenerSupport;
44 import org.apache.mina.core.session.AbstractIoSession;
45 import org.apache.mina.core.session.IoSession;
46 import org.apache.mina.core.session.IoSessionConfig;
47 import org.apache.mina.core.session.SessionState;
48 import org.apache.mina.core.write.WriteRequest;
49 import org.apache.mina.core.write.WriteRequestQueue;
50 import org.apache.mina.core.write.WriteToClosedSessionException;
51 import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
52 import org.apache.mina.util.ExceptionMonitor;
53 import org.apache.mina.util.NamePreservingRunnable;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57
58
59
60
61
62
63
64
65
66
67 public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
68
/** Logger for this class; note that it is registered under the {@link IoProcessor} name. */
private final static Logger LOG = LoggerFactory.getLogger(IoProcessor.class);

/** NOTE(review): not referenced anywhere in the visible part of this class. */
private static final int WRITE_SPIN_COUNT = 256;

/**
 * Value passed to {@link #select(long)} by the processor loop. It is also the
 * minimum difference between two {@code System.currentTimeMillis()} readings
 * required before idle notification runs again.
 */
private static final long SELECT_TIMEOUT = 1000L;

/** Per concrete class counter used to number the thread names of new processors. */
private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<Class<?>, AtomicInteger>();

/** Name handed to {@link NamePreservingRunnable} when the processor task is started. */
private final String threadName;

/** Executor on which the processor task is run. */
private final Executor executor;

/** Sessions handed to {@link #add} that the processor loop has not initialized yet. */
private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();

/** Sessions waiting to be destroyed by the processor loop. */
private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>();

/** Sessions that have been scheduled for a flush of their write request queue. */
private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();

/** Sessions whose read/write interest must be re-evaluated by the processor loop. */
private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>();

/** The currently installed processor task, or {@code null} when none is running. */
private final AtomicReference<Processor> processorRef = new AtomicReference<Processor>();

/** Timestamp (from {@code System.currentTimeMillis()}) of the last idle check. */
private long lastIdleCheckTime;

/** Guards the transition to the disposing state and the call to {@link #doDispose()}. */
private final Object disposalLock = new Object();

/** Set once {@link #dispose()} has been requested. */
private volatile boolean disposing;

/** Set once {@link #dispose()} has finished waiting for the processor task. */
private volatile boolean disposed;

/** Completed by the processor task when it terminates; {@link #dispose()} waits on it. */
private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);

/**
 * Read and cleared by the processor loop to tell a deliberate wakeup from a
 * spurious return of {@code select}. It is never set to {@code true} in this
 * class; presumably subclasses set it in {@link #wakeup()} - TODO confirm.
 */
protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);
123
124
125
126
127
128
129
130
/**
 * Creates a processor whose polling task will be run on the given executor.
 *
 * @param executor the executor used to run the internal processor task
 * @throws IllegalArgumentException if {@code executor} is {@code null}
 */
protected AbstractPollingIoProcessor(Executor executor) {
    if (null == executor) {
        throw new IllegalArgumentException("executor");
    }

    // The two assignments are independent of each other.
    this.executor = executor;
    this.threadName = nextThreadName();
}
139
140
141
142
143
144
145
146
147
148 private String nextThreadName() {
149 Class<?> cls = getClass();
150 int newThreadId;
151
152 AtomicInteger threadId = threadIds.putIfAbsent(cls, new AtomicInteger(1));
153
154 if (threadId == null) {
155 newThreadId = 1;
156 } else {
157
158 newThreadId = threadId.incrementAndGet();
159 }
160
161
162 return cls.getSimpleName() + '-' + newThreadId;
163 }
164
165
166
167
/**
 * @return {@code true} once {@link #dispose()} has been requested on this processor
 */
public final boolean isDisposing() {
    return disposing;
}
171
172
173
174
/**
 * @return {@code true} once a call to {@link #dispose()} has finished waiting
 *         for the processor task to terminate
 */
public final boolean isDisposed() {
    return disposed;
}
178
179
180
181
182 public final void dispose() {
183 if (disposed || disposing) {
184 return;
185 }
186
187 synchronized (disposalLock) {
188 disposing = true;
189 startupProcessor();
190 }
191
192 disposalFuture.awaitUninterruptibly();
193 disposed = true;
194 }
195
196
197
198
199
200
201
/**
 * Hook invoked by the processor task, while holding the disposal lock, after
 * the polling loop has exited and disposal was requested. Presumably the place
 * where subclasses release their polling resources - confirm in subclasses.
 *
 * @throws Exception if the subclass fails to release its resources; the caller
 *         reports it to the {@link ExceptionMonitor}
 */
protected abstract void doDispose() throws Exception;
203
204
205
206
207
208
209
210
211
212
/**
 * Polls for sessions that are ready for I/O. The processor loop calls this
 * with {@code SELECT_TIMEOUT} at the start of every iteration.
 *
 * @param timeout the value supplied by the caller; presumably the maximum time
 *        to block, in milliseconds - TODO confirm against implementations
 * @return a count that the loop treats as "something was selected" when it is
 *         greater than zero
 * @throws Exception if polling fails
 */
protected abstract int select(long timeout) throws Exception;
214
215
216
217
218
219
220
221
/**
 * Polls for sessions that are ready for I/O, without a timeout argument.
 * NOTE(review): not called in the visible part of this class; presumably blocks
 * until at least one session is ready or {@link #wakeup()} is called.
 *
 * @return presumably the number of ready sessions - confirm against implementations
 * @throws Exception if polling fails
 */
protected abstract int select() throws Exception;
223
224
225
226
227
228
229
/**
 * Consulted by the processor loop, together with the new-session queue, to
 * decide whether the processor task may terminate when it manages no session.
 *
 * @return presumably {@code true} when no session is registered with the
 *         underlying selector - confirm against implementations
 */
protected abstract boolean isSelectorEmpty();
231
232
233
234
/**
 * Called after work has been queued for the processor task (new session,
 * flush, traffic-mask change, disposal). Presumably interrupts a blocking
 * {@link #select(long)} so the queued work is picked up promptly.
 */
protected abstract void wakeup();
236
237
238
239
240
241
242
/**
 * Supplies the sessions this class iterates over for idle notification and,
 * during disposal, for scheduling removal.
 *
 * @return an iterator over the sessions; presumably all sessions currently
 *         managed by this processor - confirm against implementations
 */
protected abstract Iterator<S> allSessions();
244
245
246
247
248
249
/**
 * Supplies the sessions to process after a {@code select} call reported a
 * positive count. The caller invokes {@link Iterator#remove()} after handling
 * each session, so the returned iterator must support removal.
 *
 * @return an iterator over the sessions selected by the last poll
 */
protected abstract Iterator<S> selectedSessions();
251
252
253
254
255
256
257
258
/**
 * Reports the lifecycle state of a session. Callers in this class handle
 * {@code OPENING}, {@code OPENED} and {@code CLOSING}; any other value makes
 * them throw {@link IllegalStateException}.
 *
 * @param session the session to inspect
 * @return the current state of the session
 */
protected abstract SessionState getState(S session);
260
261
262
263
264
265
266
267
/**
 * Checked for every selected session; a writable session that is not write
 * suspended gets scheduled for a flush.
 *
 * @param session the session to inspect
 * @return {@code true} if the session can currently be written to
 */
protected abstract boolean isWritable(S session);
269
270
271
272
273
274
275
276
/**
 * Checked for every selected session; a readable session that is not read
 * suspended is read immediately.
 *
 * @param session the session to inspect
 * @return {@code true} if the session currently has data to read
 */
protected abstract boolean isReadable(S session);
278
279
280
281
282
283
284
285
286
/**
 * Turns the write interest of a session on or off. This class disables it at
 * the start of a flush and re-enables it when a write could not be completed.
 *
 * @param session the session to update
 * @param isInterested {@code true} to be notified when the session becomes writable
 * @throws Exception if the interest cannot be changed
 */
protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;
288
289
290
291
292
293
294
295
296
/**
 * Turns the read interest of a session on or off. This class calls it from
 * {@link #updateTrafficControl} with the negation of the session's
 * read-suspended flag.
 *
 * @param session the session to update
 * @param isInterested {@code true} to be notified when the session becomes readable
 * @throws Exception if the interest cannot be changed
 */
protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;
298
299
300
301
302
303
304
305
/**
 * NOTE(review): not called in the visible part of this class.
 *
 * @param session the session to inspect
 * @return presumably {@code true} if read interest is currently enabled for the session
 */
protected abstract boolean isInterestedInRead(S session);
307
308
309
310
311
312
313
314
/**
 * NOTE(review): not called in the visible part of this class.
 *
 * @param session the session to inspect
 * @return presumably {@code true} if write interest is currently enabled for the session
 */
protected abstract boolean isInterestedInWrite(S session);
316
317
318
319
320
321
322
/**
 * Called by the processor task for every session taken from the new-session
 * queue, before its filter chain is built. If this (or a later setup step)
 * throws, the session is passed to {@link #destroy}.
 *
 * @param session the session to initialize
 * @throws Exception if the session cannot be initialized
 */
protected abstract void init(S session) throws Exception;
324
325
326
327
328
329
330
331
332
/**
 * Called by the processor task when a session is removed, and when adding a
 * session failed. Presumably releases the underlying channel of the session.
 *
 * @param session the session to destroy
 * @throws Exception if the session cannot be destroyed cleanly
 */
protected abstract void destroy(S session) throws Exception;
334
335
336
337
338
339
340
341
342
343
344
345
346
/**
 * Reads data from the session into the given buffer.
 *
 * @param session the session to read from
 * @param buf the buffer that receives the data
 * @return the caller treats a positive value as the number of bytes read and
 *         schedules the session for removal when the value is negative
 * @throws Exception if the read fails
 */
protected abstract int read(S session, IoBuffer buf) throws Exception;
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
/**
 * Writes data from the given buffer to the session.
 *
 * @param session the session to write to
 * @param buf the buffer holding the data to write
 * @param length the number of bytes the caller asks to write; the caller never
 *        passes more than {@code buf.remaining()}
 * @return the number of bytes written; the caller treats {@code 0} as "nothing
 *         could be written" and re-enables write interest
 * @throws Exception if the write fails
 */
protected abstract int write(S session, IoBuffer buf, int length) throws Exception;
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
/**
 * Writes part of a file region to the session.
 *
 * @param session the session to write to
 * @param region the file region to transfer from
 * @param length the number of bytes the caller asks to transfer; never more
 *        than the remaining bytes of the region
 * @return the number of bytes transferred; the caller passes this value to
 *         {@code region.update(...)}
 * @throws Exception if the transfer fails
 */
protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;
383
384
385
386
387 public final void add(S session) {
388 if (disposed || disposing) {
389 throw new IllegalStateException("Already disposed.");
390 }
391
392
393 newSessions.add(session);
394 startupProcessor();
395 }
396
397
398
399
400 public final void remove(S session) {
401 scheduleRemove(session);
402 startupProcessor();
403 }
404
/**
 * Queues a session for removal without starting or waking the processor task;
 * the queue is drained on the processor loop's next pass.
 *
 * @param session the session to remove
 */
private void scheduleRemove(S session) {
    removingSessions.add(session);
}
408
409
410
411
412 public final void flush(S session) {
413
414
415 if (session.setScheduledForFlush(true)) {
416 flushingSessions.add(session);
417 wakeup();
418 }
419 }
420
421 private void scheduleFlush(S session) {
422
423
424 if (session.setScheduledForFlush(true)) {
425 flushingSessions.add(session);
426 }
427 }
428
429
430
431
/**
 * Queues a session so that the processor task re-evaluates its read and write
 * interest, then wakes the processor task.
 *
 * @param session the session whose traffic mask changed
 */
public final void updateTrafficMask(S session) {
    trafficControllingSessions.add(session);
    wakeup();
}
436
437
438
439
440
441 private void startupProcessor() {
442 Processor processor = processorRef.get();
443
444 if (processor == null) {
445 processor = new Processor();
446
447 if (processorRef.compareAndSet(null, processor)) {
448 executor.execute(new NamePreservingRunnable(processor, threadName));
449 }
450 }
451
452
453
454 wakeup();
455 }
456
457
458
459
460
461
462
463
464
465 abstract protected void registerNewSelector() throws IOException;
466
467
468
469
470
471
472
473
474
475
476 abstract protected boolean isBrokenConnection() throws IOException;
477
478
479
480
481
482
483
484 private int handleNewSessions() {
485 int addedSessions = 0;
486
487 for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
488 if (addNow(session)) {
489
490 addedSessions++;
491 }
492 }
493
494 return addedSessions;
495 }
496
497
498
499
500
501
502
503
504
505
506 private boolean addNow(S session) {
507 boolean registered = false;
508
509 try {
510 init(session);
511 registered = true;
512
513
514 IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
515 chainBuilder.buildFilterChain(session.getFilterChain());
516
517
518
519
520 IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
521 listeners.fireSessionCreated(session);
522 } catch (Throwable e) {
523 ExceptionMonitor.getInstance().exceptionCaught(e);
524
525 try {
526 destroy(session);
527 } catch (Exception e1) {
528 ExceptionMonitor.getInstance().exceptionCaught(e1);
529 } finally {
530 registered = false;
531 }
532 }
533
534 return registered;
535 }
536
537 private int removeSessions() {
538 int removedSessions = 0;
539
540 for (S session = removingSessions.poll(); session != null; session = removingSessions.poll()) {
541 SessionState state = getState(session);
542
543
544 switch (state) {
545 case OPENED:
546
547 if (removeNow(session)) {
548 removedSessions++;
549 }
550
551 break;
552
553 case CLOSING:
554
555 break;
556
557 case OPENING:
558
559
560 newSessions.remove(session);
561
562 if (removeNow(session)) {
563 removedSessions++;
564 }
565
566 break;
567
568 default:
569 throw new IllegalStateException(String.valueOf(state));
570 }
571 }
572
573 return removedSessions;
574 }
575
576 private boolean removeNow(S session) {
577 clearWriteRequestQueue(session);
578
579 try {
580 destroy(session);
581 return true;
582 } catch (Exception e) {
583 IoFilterChain filterChain = session.getFilterChain();
584 filterChain.fireExceptionCaught(e);
585 } finally {
586 clearWriteRequestQueue(session);
587 ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
588 }
589 return false;
590 }
591
592 private void clearWriteRequestQueue(S session) {
593 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
594 WriteRequest req;
595
596 List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
597
598 if ((req = writeRequestQueue.poll(session)) != null) {
599 Object message = req.getMessage();
600
601 if (message instanceof IoBuffer) {
602 IoBuffer buf = (IoBuffer) message;
603
604
605
606 if (buf.hasRemaining()) {
607 buf.reset();
608 failedRequests.add(req);
609 } else {
610 IoFilterChain filterChain = session.getFilterChain();
611 filterChain.fireMessageSent(req);
612 }
613 } else {
614 failedRequests.add(req);
615 }
616
617
618 while ((req = writeRequestQueue.poll(session)) != null) {
619 failedRequests.add(req);
620 }
621 }
622
623
624 if (!failedRequests.isEmpty()) {
625 WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
626
627 for (WriteRequest r : failedRequests) {
628 session.decreaseScheduledBytesAndMessages(r);
629 r.getFuture().setException(cause);
630 }
631
632 IoFilterChain filterChain = session.getFilterChain();
633 filterChain.fireExceptionCaught(cause);
634 }
635 }
636
637 private void process() throws Exception {
638 for (Iterator<S> i = selectedSessions(); i.hasNext();) {
639 S session = i.next();
640 process(session);
641 i.remove();
642 }
643 }
644
645
646
647
648 private void process(S session) {
649
650 if (isReadable(session) && !session.isReadSuspended()) {
651 read(session);
652 }
653
654
655 if (isWritable(session) && !session.isWriteSuspended()) {
656
657 if (session.setScheduledForFlush(true)) {
658 flushingSessions.add(session);
659 }
660 }
661 }
662
663 private void read(S session) {
664 IoSessionConfig config = session.getConfig();
665 int bufferSize = config.getReadBufferSize();
666 IoBuffer buf = IoBuffer.allocate(bufferSize);
667
668 final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
669
670 try {
671 int readBytes = 0;
672 int ret;
673
674 try {
675 if (hasFragmentation) {
676
677 while ((ret = read(session, buf)) > 0) {
678 readBytes += ret;
679
680 if (!buf.hasRemaining()) {
681 break;
682 }
683 }
684 } else {
685 ret = read(session, buf);
686
687 if (ret > 0) {
688 readBytes = ret;
689 }
690 }
691 } finally {
692 buf.flip();
693 }
694
695 if (readBytes > 0) {
696 IoFilterChain filterChain = session.getFilterChain();
697 filterChain.fireMessageReceived(buf);
698 buf = null;
699
700 if (hasFragmentation) {
701 if (readBytes << 1 < config.getReadBufferSize()) {
702 session.decreaseReadBufferSize();
703 } else if (readBytes == config.getReadBufferSize()) {
704 session.increaseReadBufferSize();
705 }
706 }
707 }
708
709 if (ret < 0) {
710 scheduleRemove(session);
711 }
712 } catch (Throwable e) {
713 if (e instanceof IOException) {
714 if (!(e instanceof PortUnreachableException)
715 || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
716 || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
717 scheduleRemove(session);
718 }
719 }
720
721 IoFilterChain filterChain = session.getFilterChain();
722 filterChain.fireExceptionCaught(e);
723 }
724 }
725
726 private static String byteArrayToHex(byte[] barray) {
727 char[] c = new char[barray.length * 2];
728 int pos = 0;
729
730 for (byte b : barray) {
731 int bb = (b & 0x00FF) >> 4;
732 c[pos++] = (char) (bb > 9 ? bb + 0x37 : bb + 0x30);
733 bb = b & 0x0F;
734 c[pos++] = (char) (bb > 9 ? bb + 0x37 : bb + 0x30);
735 if (pos > 60) {
736 break;
737 }
738 }
739
740 return new String(c);
741 }
742
743 private void notifyIdleSessions(long currentTime) throws Exception {
744
745 if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
746 lastIdleCheckTime = currentTime;
747 AbstractIoSession.notifyIdleness(allSessions(), currentTime);
748 }
749 }
750
751
752
753
754 private void flush(long currentTime) {
755 if (flushingSessions.isEmpty()) {
756 return;
757 }
758
759 do {
760 S session = flushingSessions.poll();
761
762 if (session == null) {
763
764 break;
765 }
766
767
768
769 session.unscheduledForFlush();
770
771 SessionState state = getState(session);
772
773 switch (state) {
774 case OPENED:
775 try {
776 boolean flushedAll = flushNow(session, currentTime);
777
778 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
779 && !session.isScheduledForFlush()) {
780 scheduleFlush(session);
781 }
782 } catch (Exception e) {
783 scheduleRemove(session);
784 IoFilterChain filterChain = session.getFilterChain();
785 filterChain.fireExceptionCaught(e);
786 }
787
788 break;
789
790 case CLOSING:
791
792 break;
793
794 case OPENING:
795
796
797
798 scheduleFlush(session);
799 return;
800
801 default:
802 throw new IllegalStateException(String.valueOf(state));
803 }
804
805 } while (!flushingSessions.isEmpty());
806 }
807
808 private boolean flushNow(S session, long currentTime) {
809 if (!session.isConnected()) {
810 scheduleRemove(session);
811 return false;
812 }
813
814 final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
815
816 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
817
818
819
820
821 final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
822 + (session.getConfig().getMaxReadBufferSize() >>> 1);
823 int writtenBytes = 0;
824 WriteRequest req = null;
825
826 try {
827
828 setInterestedInWrite(session, false);
829
830 do {
831
832 req = session.getCurrentWriteRequest();
833
834 if (req == null) {
835 req = writeRequestQueue.poll(session);
836
837 if (req == null) {
838 break;
839 }
840
841 session.setCurrentWriteRequest(req);
842 }
843
844 int localWrittenBytes = 0;
845 Object message = req.getMessage();
846
847 if (message instanceof IoBuffer) {
848 localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
849 currentTime);
850
851 if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
852
853 writtenBytes += localWrittenBytes;
854 setInterestedInWrite(session, true);
855 return false;
856 }
857 } else if (message instanceof FileRegion) {
858 localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
859 currentTime);
860
861
862
863
864
865
866 if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
867 writtenBytes += localWrittenBytes;
868 setInterestedInWrite(session, true);
869 return false;
870 }
871 } else {
872 throw new IllegalStateException("Don't know how to handle message of type '"
873 + message.getClass().getName() + "'. Are you missing a protocol encoder?");
874 }
875
876 if (localWrittenBytes == 0) {
877
878 setInterestedInWrite(session, true);
879 return false;
880 }
881
882 writtenBytes += localWrittenBytes;
883
884 if (writtenBytes >= maxWrittenBytes) {
885
886 scheduleFlush(session);
887 return false;
888 }
889 } while (writtenBytes < maxWrittenBytes);
890 } catch (Exception e) {
891 if (req != null) {
892 req.getFuture().setException(e);
893 }
894
895 IoFilterChain filterChain = session.getFilterChain();
896 filterChain.fireExceptionCaught(e);
897 return false;
898 }
899
900 return true;
901 }
902
903 private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
904 throws Exception {
905 IoBuffer buf = (IoBuffer) req.getMessage();
906 int localWrittenBytes = 0;
907
908 if (buf.hasRemaining()) {
909 int length;
910
911 if (hasFragmentation) {
912 length = Math.min(buf.remaining(), maxLength);
913 } else {
914 length = buf.remaining();
915 }
916
917 try {
918 localWrittenBytes = write(session, buf, length);
919 } catch (IOException ioe) {
920
921
922 session.close(true);
923 }
924
925 }
926
927 session.increaseWrittenBytes(localWrittenBytes, currentTime);
928
929 if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
930
931 int pos = buf.position();
932 buf.reset();
933
934 fireMessageSent(session, req);
935
936
937 buf.position(pos);
938 }
939
940 return localWrittenBytes;
941 }
942
943 private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
944 throws Exception {
945 int localWrittenBytes;
946 FileRegion region = (FileRegion) req.getMessage();
947
948 if (region.getRemainingBytes() > 0) {
949 int length;
950
951 if (hasFragmentation) {
952 length = (int) Math.min(region.getRemainingBytes(), maxLength);
953 } else {
954 length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
955 }
956
957 localWrittenBytes = transferFile(session, region, length);
958 region.update(localWrittenBytes);
959 } else {
960 localWrittenBytes = 0;
961 }
962
963 session.increaseWrittenBytes(localWrittenBytes, currentTime);
964
965 if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) {
966 fireMessageSent(session, req);
967 }
968
969 return localWrittenBytes;
970 }
971
972 private void fireMessageSent(S session, WriteRequest req) {
973 session.setCurrentWriteRequest(null);
974 IoFilterChain filterChain = session.getFilterChain();
975 filterChain.fireMessageSent(req);
976 }
977
978
979
980
981 private void updateTrafficMask() {
982 int queueSize = trafficControllingSessions.size();
983
984 while (queueSize > 0) {
985 S session = trafficControllingSessions.poll();
986
987 if (session == null) {
988
989 return;
990 }
991
992 SessionState state = getState(session);
993
994 switch (state) {
995 case OPENED:
996 updateTrafficControl(session);
997
998 break;
999
1000 case CLOSING:
1001 break;
1002
1003 case OPENING:
1004
1005
1006
1007
1008 trafficControllingSessions.add(session);
1009 break;
1010
1011 default:
1012 throw new IllegalStateException(String.valueOf(state));
1013 }
1014
1015
1016
1017
1018
1019 queueSize--;
1020 }
1021 }
1022
1023
1024
1025
1026 public void updateTrafficControl(S session) {
1027
1028 try {
1029 setInterestedInRead(session, !session.isReadSuspended());
1030 } catch (Exception e) {
1031 IoFilterChain filterChain = session.getFilterChain();
1032 filterChain.fireExceptionCaught(e);
1033 }
1034
1035 try {
1036 setInterestedInWrite(session,
1037 !session.getWriteRequestQueue().isEmpty(session) && !session.isWriteSuspended());
1038 } catch (Exception e) {
1039 IoFilterChain filterChain = session.getFilterChain();
1040 filterChain.fireExceptionCaught(e);
1041 }
1042 }
1043
1044
1045
1046
1047
1048
1049
/**
 * The polling task of the enclosing processor. It runs on the executor for as
 * long as there is at least one session to manage (or one waiting to be
 * added), and performs the final disposal when it exits.
 */
private class Processor implements Runnable {
    /**
     * Runs the polling loop: select, register new sessions, update traffic
     * masks, process selected sessions, flush, remove, notify idleness. The
     * loop ends when no session is left or the selector has been closed.
     */
    public void run() {
        assert (processorRef.get() == this);

        // Number of sessions currently managed by this task.
        int nSessions = 0;
        lastIdleCheckTime = System.currentTimeMillis();

        for (;;) {
            try {
                // Time the select call so that a suspiciously quick empty
                // return can be detected below.
                long t0 = System.currentTimeMillis();
                int selected = select(SELECT_TIMEOUT);
                long t1 = System.currentTimeMillis();
                long delta = (t1 - t0);

                // Nothing selected, no wakeup recorded, and select came back in
                // under 100 ms. NOTE(review): this looks like a workaround for a
                // selector that returns immediately in a loop - confirm.
                if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
                    if (isBrokenConnection()) {
                        LOG.warn("Broken connection");

                        // Clear the wakeup flag and poll again.
                        wakeupCalled.getAndSet(false);

                        continue;
                    } else {
                        LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));

                        // No broken connection was reported: ask the subclass
                        // to replace the selector.
                        registerNewSelector();
                    }

                    // Clear the wakeup flag and poll again.
                    wakeupCalled.getAndSet(false);

                    continue;
                }

                // Register the sessions queued by add().
                nSessions += handleNewSessions();

                updateTrafficMask();

                // Handle the selected sessions only if select reported any.
                if (selected > 0) {
                    process();
                }

                // Write the pending requests.
                long currentTime = System.currentTimeMillis();
                flush(currentTime);

                // Destroy the sessions queued for removal.
                nSessions -= removeSessions();

                // Fire idle events (at most once per SELECT_TIMEOUT).
                notifyIdleSessions(currentTime);

                // No session left: release the processor slot first so that a
                // concurrent add() can start a new task, then decide whether
                // this task may exit.
                if (nSessions == 0) {
                    processorRef.set(null);

                    if (newSessions.isEmpty() && isSelectorEmpty()) {
                        // Nothing is pending: this task is done.
                        assert (processorRef.get() != this);
                        break;
                    }

                    assert (processorRef.get() != this);

                    // Work arrived in the meantime: try to take the slot back.
                    if (!processorRef.compareAndSet(null, this)) {
                        // Another task was installed; let it do the work.
                        assert (processorRef.get() != this);
                        break;
                    }

                    assert (processorRef.get() == this);
                }

                // Disposal was requested: schedule every session for removal
                // and make sure the next select returns promptly.
                if (isDisposing()) {
                    for (Iterator<S> i = allSessions(); i.hasNext();) {
                        scheduleRemove(i.next());
                    }

                    wakeup();
                }
            } catch (ClosedSelectorException cse) {
                // The selector was closed: stop polling.
                break;
            } catch (Throwable t) {
                ExceptionMonitor.getInstance().exceptionCaught(t);

                // Pause before retrying so that a persistent failure does not
                // turn into a busy loop.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    ExceptionMonitor.getInstance().exceptionCaught(e1);
                }
            }
        }

        // The loop has ended: dispose if requested, and always complete the
        // future that dispose() is waiting on.
        try {
            synchronized (disposalLock) {
                if (disposing) {
                    doDispose();
                }
            }
        } catch (Throwable t) {
            ExceptionMonitor.getInstance().exceptionCaught(t);
        } finally {
            disposalFuture.setValue(true);
        }
    }
}
1182 }